This is an automated email from the ASF dual-hosted git repository. hansva pushed a commit to branch 2.18.1-patch in repository https://gitbox.apache.org/repos/asf/hop.git
commit 0479fa479473ecfed7daa2273fa9f3cf587b7ad9 Author: Lance <[email protected]> AuthorDate: Fri May 29 21:33:28 2026 +0800 Fix NPE in SystemData transform, correct NONE type display (#7027) * Fix NPE in SystemData transform, correct NONE type display Signed-off-by: lance <[email protected]> * Fix NPE in SystemData transform, correct NONE type display Signed-off-by: lance <[email protected]> * Fix NPE in SystemData transform, correct NONE type display Signed-off-by: lance <[email protected]> --------- Signed-off-by: lance <[email protected]> --- .../pipeline/transforms/systemdata/SystemData.java | 1105 ++++++++------------ .../transforms/systemdata/SystemDataMeta.java | 1 + .../transforms/systemdata/SystemDataType.java | 6 +- .../transforms/systemdata/ManagementTests.java | 83 ++ .../transforms/systemdata/SystemDataDataTests.java | 72 ++ .../transforms/systemdata/SystemDataMetaTest.java | 94 +- .../transforms/systemdata/SystemDataTests.java | 382 +++++++ .../transforms/systemdata/SystemDataTypeTests.java | 111 ++ 8 files changed, 1195 insertions(+), 659 deletions(-) diff --git a/plugins/transforms/systemdata/src/main/java/org/apache/hop/pipeline/transforms/systemdata/SystemData.java b/plugins/transforms/systemdata/src/main/java/org/apache/hop/pipeline/transforms/systemdata/SystemData.java index f9056bf80c..7085699075 100644 --- a/plugins/transforms/systemdata/src/main/java/org/apache/hop/pipeline/transforms/systemdata/SystemData.java +++ b/plugins/transforms/systemdata/src/main/java/org/apache/hop/pipeline/transforms/systemdata/SystemData.java @@ -17,11 +17,96 @@ package org.apache.hop.pipeline.transforms.systemdata; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.AVAILABLE_PROCESSORS; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.COMMITTED_VIRTUAL_MEMORY_SIZE; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.COPYNR; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.CURRENT_PID; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.FILENAME; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.FREE_PHYSICAL_MEMORY_SIZE; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.FREE_SWAP_SPACE_SIZE; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.HOSTNAME; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.HOSTNAME_REAL; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.IP_ADDRESS; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.JVM_AVAILABLE_MEMORY; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.JVM_CPU_TIME; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.JVM_FREE_MEMORY; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.JVM_MAX_MEMORY; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.JVM_TOTAL_MEMORY; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.MODIFIED_DATE; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.MODIFIED_USER; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_DAY_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_DAY_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_MONTH_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_MONTH_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_QUARTER_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_QUARTER_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_WEEK_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_WEEK_END_US; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_WEEK_OPEN_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_WEEK_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_WEEK_START_US; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_YEAR_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.NEXT_YEAR_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PIPELINE_DATE_FROM; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PIPELINE_DATE_TO; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PIPELINE_NAME; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_ENTRY_NR; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_EXIT_STATUS; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_IS_STOPPED; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_LOG_TEXT; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_NR_ERRORS; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_NR_FILES; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_NR_FILES_RETRIEVED; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_NR_LINES_DELETED; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_NR_LINES_INPUT; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_NR_LINES_OUTPUT; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_NR_LINES_READ; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_NR_LINES_REJECTED; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_NR_LINES_UPDATED; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_NR_LINES_WRITTEN; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_NR_ROWS; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREVIOUS_RESULT_RESULT; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_DAY_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_DAY_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_MONTH_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_MONTH_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_QUARTER_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_QUARTER_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_WEEK_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_WEEK_END_US; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_WEEK_OPEN_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_WEEK_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_WEEK_START_US; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_YEAR_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.PREV_YEAR_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.SYSTEM_DATE; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.SYSTEM_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_DAY_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_DAY_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_MONTH_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_MONTH_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_QUARTER_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_QUARTER_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_WEEK_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_WEEK_END_US; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_WEEK_OPEN_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_WEEK_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_WEEK_START_US; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_YEAR_END; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.THIS_YEAR_START; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.TOTAL_PHYSICAL_MEMORY_SIZE; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.TOTAL_SWAP_SPACE_SIZE; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.WORKFLOW_DATE_FROM; +import static org.apache.hop.pipeline.transforms.systemdata.SystemDataType.WORKFLOW_DATE_TO; + import java.util.Calendar; import java.util.Date; +import java.util.EnumMap; import java.util.GregorianCalendar; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.function.Function; import org.apache.hop.core.Const; import org.apache.hop.core.Result; import org.apache.hop.core.exception.HopException; @@ -39,8 +124,24 @@ import org.apache.hop.pipeline.PipelineMeta; import org.apache.hop.pipeline.transform.BaseTransform; import org.apache.hop.pipeline.transform.TransformMeta; -/** Get information from the System or the supervising pipeline. */ +/** + * Get information from the System or the supervising pipeline. + * + * <p>The transform supports a wide range of system data types (see {@link SystemDataType}), + * including: + * + * <ul> + * <li>Pipeline and workflow execution timestamps + * <li>Date/time boundaries (day, week, month, quarter, year) + * <li>JVM and system metrics (memory, CPU, processors) + * <li>Environment information (hostname, IP, PID) + * <li>Previous execution result statistics + * </ul> + */ public class SystemData extends BaseTransform<SystemDataMeta, SystemDataData> { + private final Map<SystemDataType, ThrowingSupplier<Object>> resolvers = + new EnumMap<>(SystemDataType.class); + public SystemData( TransformMeta transformMeta, SystemDataMeta meta, @@ -51,656 +152,19 @@ public class SystemData extends BaseTransform<SystemDataMeta, SystemDataData> { super(transformMeta, meta, data, copyNr, pipelineMeta, pipeline); } - private Object[] getSystemData(IRowMeta inputRowMeta, Object[] inputRowData) throws HopException { - Object[] row = RowDataUtil.createResizedCopy(inputRowData, data.outputRowMeta.size()); - - for (int i = 0, index = inputRowMeta.size(); i < meta.getFields().size(); i++, index++) { - SystemDataMeta.SystemInfoField field = meta.getFields().get(i); - Calendar cal; - - switch (field.getFieldType()) { - case SYSTEM_START, PIPELINE_DATE_TO: - row[index] = getPipeline().getExecutionStartDate(); - break; - case SYSTEM_DATE: - row[index] = new Date(); - break; - case PIPELINE_DATE_FROM: - row[index] = - calculateStartRange( - getPipeline().getPipelineRunConfiguration().getExecutionInfoLocationName(), - ExecutionType.Pipeline, - getPipeline().getPipelineMeta().getName()); - break; - case WORKFLOW_DATE_FROM: - if (getPipeline().getParentWorkflow() != null) { - row[index] = - calculateStartRange( - getPipeline() - .getParentWorkflow() - .getWorkflowRunConfiguration() - .getExecutionInfoLocationName(), - ExecutionType.Workflow, - getPipeline().getParentWorkflow().getWorkflowMeta().getName()); - } - break; - case WORKFLOW_DATE_TO: - row[index] = getPipeline().getParentWorkflow().getExecutionStartDate(); - break; - case PREV_DAY_START: - cal = Calendar.getInstance(); - cal.add(Calendar.DAY_OF_MONTH, -1); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case PREV_DAY_END: - cal = Calendar.getInstance(); - cal.add(Calendar.DAY_OF_MONTH, -1); - cal.set(Calendar.HOUR_OF_DAY, 23); - cal.set(Calendar.MINUTE, 59); - cal.set(Calendar.SECOND, 59); - cal.set(Calendar.MILLISECOND, 999); - row[index] = cal.getTime(); - break; - case THIS_DAY_START: - cal = Calendar.getInstance(); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case THIS_DAY_END: - cal = Calendar.getInstance(); - cal.set(Calendar.HOUR_OF_DAY, 23); - cal.set(Calendar.MINUTE, 59); - cal.set(Calendar.SECOND, 59); - cal.set(Calendar.MILLISECOND, 999); - row[index] = cal.getTime(); - break; - case NEXT_DAY_START: - cal = Calendar.getInstance(); - cal.add(Calendar.DAY_OF_MONTH, 1); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case NEXT_DAY_END: - cal = Calendar.getInstance(); - cal.add(Calendar.DAY_OF_MONTH, 1); - cal.set(Calendar.HOUR_OF_DAY, 23); - cal.set(Calendar.MINUTE, 59); - cal.set(Calendar.SECOND, 59); - cal.set(Calendar.MILLISECOND, 999); - row[index] = cal.getTime(); - break; - case PREV_MONTH_START: - cal = Calendar.getInstance(); - cal.add(Calendar.MONTH, -1); - cal.set(Calendar.DAY_OF_MONTH, 1); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case PREV_MONTH_END: - cal = Calendar.getInstance(); - cal.add(Calendar.MONTH, -1); - cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DAY_OF_MONTH)); - cal.set(Calendar.HOUR_OF_DAY, 23); - cal.set(Calendar.MINUTE, 59); - cal.set(Calendar.SECOND, 59); - cal.set(Calendar.MILLISECOND, 999); - row[index] = cal.getTime(); - break; - case THIS_MONTH_START: - cal = Calendar.getInstance(); - cal.set(Calendar.DAY_OF_MONTH, 1); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case THIS_MONTH_END: - cal = Calendar.getInstance(); - cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DAY_OF_MONTH)); - cal.set(Calendar.HOUR_OF_DAY, 23); - cal.set(Calendar.MINUTE, 59); - cal.set(Calendar.SECOND, 59); - cal.set(Calendar.MILLISECOND, 999); - row[index] = cal.getTime(); - break; - case NEXT_MONTH_START: - cal = Calendar.getInstance(); - cal.add(Calendar.MONTH, 1); - cal.set(Calendar.DAY_OF_MONTH, 1); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case NEXT_MONTH_END: - cal = Calendar.getInstance(); - cal.add(Calendar.MONTH, 1); - cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DAY_OF_MONTH)); - cal.set(Calendar.HOUR_OF_DAY, 23); - cal.set(Calendar.MINUTE, 59); - cal.set(Calendar.SECOND, 59); - cal.set(Calendar.MILLISECOND, 999); - row[index] = cal.getTime(); - break; - case COPYNR: - row[index] = (long) getCopy(); - break; - case PIPELINE_NAME: - row[index] = getPipelineMeta().getName(); - break; - case MODIFIED_USER: - row[index] = getPipelineMeta().getModifiedUser(); - break; - case MODIFIED_DATE: - row[index] = getPipelineMeta().getModifiedDate(); - break; - case HOSTNAME_REAL: - row[index] = Const.getHostnameReal(); - break; - case HOSTNAME: - row[index] = Const.getHostname(); - break; - case IP_ADDRESS: - try { - row[index] = Const.getIPAddress(); - } catch (Exception e) { - throw new HopException(e); - } - break; - case FILENAME: - row[index] = getPipelineMeta().getFilename(); - break; - case CURRENT_PID: - row[index] = Management.getPID(); - break; - case JVM_TOTAL_MEMORY: - row[index] = Runtime.getRuntime().totalMemory(); - break; - case JVM_FREE_MEMORY: - row[index] = Runtime.getRuntime().freeMemory(); - break; - case JVM_MAX_MEMORY: - row[index] = Runtime.getRuntime().maxMemory(); - break; - case JVM_AVAILABLE_MEMORY: - Runtime rt = Runtime.getRuntime(); - row[index] = rt.freeMemory() + (rt.maxMemory() - rt.totalMemory()); - break; - case AVAILABLE_PROCESSORS: - row[index] = (long) Runtime.getRuntime().availableProcessors(); - break; - case JVM_CPU_TIME: - row[index] = Management.getJVMCpuTime() / 1000000; - break; - case TOTAL_PHYSICAL_MEMORY_SIZE: - row[index] = Management.getTotalPhysicalMemorySize(); - break; - case TOTAL_SWAP_SPACE_SIZE: - row[index] = Management.getTotalSwapSpaceSize(); - break; - case COMMITTED_VIRTUAL_MEMORY_SIZE: - row[index] = Management.getCommittedVirtualMemorySize(); - break; - case FREE_PHYSICAL_MEMORY_SIZE: - row[index] = Management.getFreePhysicalMemorySize(); - break; - case FREE_SWAP_SPACE_SIZE: - row[index] = Management.getFreeSwapSpaceSize(); - break; - - case PREV_WEEK_START: - cal = Calendar.getInstance(); - cal.add(Calendar.WEEK_OF_YEAR, -1); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case PREV_WEEK_END: - cal = Calendar.getInstance(); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, -1); - row[index] = cal.getTime(); - break; - case PREV_WEEK_OPEN_END: - cal = Calendar.getInstance(Locale.ROOT); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, -1); - cal.add(Calendar.DAY_OF_WEEK, -1); - row[index] = cal.getTime(); - break; - case PREV_WEEK_START_US: - cal = Calendar.getInstance(Locale.US); - cal.add(Calendar.WEEK_OF_YEAR, -1); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case PREV_WEEK_END_US: - cal = Calendar.getInstance(Locale.US); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, -1); - row[index] = cal.getTime(); - break; - case THIS_WEEK_START: - cal = Calendar.getInstance(); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case THIS_WEEK_END: - cal = Calendar.getInstance(); - cal.add(Calendar.WEEK_OF_YEAR, 1); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, -1); - row[index] = cal.getTime(); - break; - case THIS_WEEK_OPEN_END: - cal = Calendar.getInstance(Locale.ROOT); - cal.add(Calendar.WEEK_OF_YEAR, 1); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, -1); - cal.add(Calendar.DAY_OF_WEEK, -1); - row[index] = cal.getTime(); - break; - case THIS_WEEK_START_US: - cal = Calendar.getInstance(Locale.US); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case THIS_WEEK_END_US: - cal = Calendar.getInstance(Locale.US); - cal.add(Calendar.WEEK_OF_YEAR, 1); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, -1); - row[index] = cal.getTime(); - break; - case NEXT_WEEK_START: - cal = Calendar.getInstance(); - cal.add(Calendar.WEEK_OF_YEAR, 1); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case NEXT_WEEK_END: - cal = Calendar.getInstance(); - cal.add(Calendar.WEEK_OF_YEAR, 2); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, -1); - row[index] = cal.getTime(); - break; - case NEXT_WEEK_OPEN_END: - cal = Calendar.getInstance(Locale.ROOT); - cal.add(Calendar.WEEK_OF_YEAR, 2); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, -1); - cal.add(Calendar.DAY_OF_WEEK, -1); - row[index] = cal.getTime(); - break; - case NEXT_WEEK_START_US: - cal = Calendar.getInstance(Locale.US); - cal.add(Calendar.WEEK_OF_YEAR, 1); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case NEXT_WEEK_END_US: - cal = Calendar.getInstance(Locale.US); - cal.add(Calendar.WEEK_OF_YEAR, 2); - cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, -1); - row[index] = cal.getTime(); - break; - case PREV_QUARTER_START: - cal = Calendar.getInstance(); - cal.add(Calendar.MONTH, -3 - (cal.get(Calendar.MONTH) % 3)); - cal.set(Calendar.DAY_OF_MONTH, 1); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case PREV_QUARTER_END: - cal = Calendar.getInstance(); - cal.add(Calendar.MONTH, -1 - (cal.get(Calendar.MONTH) % 3)); - cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DATE)); - cal.set(Calendar.HOUR_OF_DAY, 23); - cal.set(Calendar.MINUTE, 59); - cal.set(Calendar.SECOND, 59); - cal.set(Calendar.MILLISECOND, 999); - row[index] = cal.getTime(); - break; - case THIS_QUARTER_START: - cal = Calendar.getInstance(); - cal.add(Calendar.MONTH, -(cal.get(Calendar.MONTH) % 3)); - cal.set(Calendar.DAY_OF_MONTH, 1); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case THIS_QUARTER_END: - cal = Calendar.getInstance(); - cal.add(Calendar.MONTH, 2 - (cal.get(Calendar.MONTH) % 3)); - cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DATE)); - cal.set(Calendar.HOUR_OF_DAY, 23); - cal.set(Calendar.MINUTE, 59); - cal.set(Calendar.SECOND, 59); - cal.set(Calendar.MILLISECOND, 999); - row[index] = cal.getTime(); - break; - case NEXT_QUARTER_START: - cal = Calendar.getInstance(); - cal.add(Calendar.MONTH, 3 - (cal.get(Calendar.MONTH) % 3)); - cal.set(Calendar.DAY_OF_MONTH, 1); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case NEXT_QUARTER_END: - cal = Calendar.getInstance(); - cal.add(Calendar.MONTH, 5 - (cal.get(Calendar.MONTH) % 3)); - cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DATE)); - cal.set(Calendar.HOUR_OF_DAY, 23); - cal.set(Calendar.MINUTE, 59); - cal.set(Calendar.SECOND, 59); - cal.set(Calendar.MILLISECOND, 999); - row[index] = cal.getTime(); - break; - case PREV_YEAR_START: - cal = Calendar.getInstance(); - cal.add(Calendar.YEAR, -1); - cal.set(Calendar.DAY_OF_YEAR, cal.getActualMinimum(Calendar.DATE)); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case PREV_YEAR_END: - cal = Calendar.getInstance(); - cal.set(Calendar.DAY_OF_YEAR, cal.getActualMinimum(Calendar.DATE)); - cal.add(Calendar.DAY_OF_YEAR, -1); - cal.set(Calendar.HOUR_OF_DAY, 23); - cal.set(Calendar.MINUTE, 59); - cal.set(Calendar.SECOND, 59); - cal.set(Calendar.MILLISECOND, 999); - row[index] = cal.getTime(); - break; - case THIS_YEAR_START: - cal = Calendar.getInstance(); - cal.set(Calendar.DAY_OF_YEAR, cal.getActualMinimum(Calendar.DATE)); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case THIS_YEAR_END: - cal = Calendar.getInstance(); - cal.add(Calendar.YEAR, 1); - cal.set(Calendar.DAY_OF_YEAR, cal.getActualMinimum(Calendar.DATE)); - cal.add(Calendar.DAY_OF_YEAR, -1); - cal.set(Calendar.HOUR_OF_DAY, 23); - cal.set(Calendar.MINUTE, 59); - cal.set(Calendar.SECOND, 59); - cal.set(Calendar.MILLISECOND, 999); - row[index] = cal.getTime(); - break; - case NEXT_YEAR_START: - cal = Calendar.getInstance(); - cal.add(Calendar.YEAR, 1); - cal.set(Calendar.DAY_OF_YEAR, cal.getActualMinimum(Calendar.DATE)); - cal.set(Calendar.HOUR_OF_DAY, 0); - cal.set(Calendar.MINUTE, 0); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - row[index] = cal.getTime(); - break; - case NEXT_YEAR_END: - cal = Calendar.getInstance(); - cal.add(Calendar.YEAR, 2); - cal.set(Calendar.DAY_OF_YEAR, cal.getActualMinimum(Calendar.DATE)); - cal.add(Calendar.DAY_OF_YEAR, -1); - cal.set(Calendar.HOUR_OF_DAY, 23); - cal.set(Calendar.MINUTE, 59); - cal.set(Calendar.SECOND, 59); - cal.set(Calendar.MILLISECOND, 999); - row[index] = cal.getTime(); - break; - case PREVIOUS_RESULT_RESULT: - Result previousResultResult = getPipeline().getPreviousResult(); - boolean result = false; - if (previousResultResult != null) { - result = previousResultResult.isResult(); - } - row[index] = result; - break; - case PREVIOUS_RESULT_EXIT_STATUS: - Result previousResultExitStatus = getPipeline().getPreviousResult(); - long exitStatus = 0; - if (previousResultExitStatus != null) { - exitStatus = previousResultExitStatus.getExitStatus(); - } - row[index] = exitStatus; - break; - case PREVIOUS_RESULT_ENTRY_NR: - Result previousResultEntryNr = getPipeline().getPreviousResult(); - long entryNr = 0; - if (previousResultEntryNr != null) { - entryNr = previousResultEntryNr.getEntryNr(); - } - row[index] = entryNr; - break; - case PREVIOUS_RESULT_NR_FILES: - Result previousResultNrFiles = getPipeline().getPreviousResult(); - long nrFiles = 0; - if (previousResultNrFiles != null) { - nrFiles = previousResultNrFiles.getResultFiles().size(); - } - row[index] = nrFiles; - break; - case PREVIOUS_RESULT_NR_FILES_RETRIEVED: - Result previousResultNrFilesRetrieves = getPipeline().getPreviousResult(); - long nrFilesRetrieved = 0; - if (previousResultNrFilesRetrieves != null) { - nrFilesRetrieved = previousResultNrFilesRetrieves.getNrFilesRetrieved(); - } - row[index] = nrFilesRetrieved; - break; - case PREVIOUS_RESULT_NR_LINES_DELETED: - Result previousResultNrLinesDeleted = getPipeline().getPreviousResult(); - long nrLinesDeleted = 0; - if (previousResultNrLinesDeleted != null) { - nrLinesDeleted = previousResultNrLinesDeleted.getNrLinesDeleted(); - } - row[index] = nrLinesDeleted; - break; - case PREVIOUS_RESULT_NR_LINES_INPUT: - Result previousResultNrLinesInput = getPipeline().getPreviousResult(); - long nrLinesInput = 0; - if (previousResultNrLinesInput != null) { - nrLinesInput = previousResultNrLinesInput.getNrLinesInput(); - } - row[index] = nrLinesInput; - break; - case PREVIOUS_RESULT_NR_LINES_OUTPUT: - Result previousResultNrLinesOutput = getPipeline().getPreviousResult(); - long nrLinesOutput = 0; - if (previousResultNrLinesOutput != null) { - nrLinesOutput = previousResultNrLinesOutput.getNrLinesOutput(); - } - row[index] = nrLinesOutput; - break; - case PREVIOUS_RESULT_NR_LINES_READ: - Result previousResultNrLinesRead = getPipeline().getPreviousResult(); - long nrLinesRead = 0; - if (previousResultNrLinesRead != null) { - nrLinesRead = previousResultNrLinesRead.getNrLinesRead(); - } - row[index] = nrLinesRead; - break; - case PREVIOUS_RESULT_NR_LINES_REJECTED: - Result previousResultNrLinesRejected = getPipeline().getPreviousResult(); - long nrLinesRejected = 0; - if (previousResultNrLinesRejected != null) { - nrLinesRejected = previousResultNrLinesRejected.getNrLinesRejected(); - } - row[index] = nrLinesRejected; - break; - case PREVIOUS_RESULT_NR_LINES_UPDATED: - Result previousResultNrLinesUpdated = getPipeline().getPreviousResult(); - long nrLinesUpdated = 0; - if (previousResultNrLinesUpdated != null) { - nrLinesUpdated = previousResultNrLinesUpdated.getNrLinesUpdated(); - } - row[index] = nrLinesUpdated; - break; - case PREVIOUS_RESULT_NR_LINES_WRITTEN: - Result previousResultNrLinesWritten = getPipeline().getPreviousResult(); - long nrLinesWritten = 0; - if (previousResultNrLinesWritten != null) { - nrLinesWritten = previousResultNrLinesWritten.getNrLinesWritten(); - } - row[index] = nrLinesWritten; - break; - case PREVIOUS_RESULT_NR_ROWS: - Result previousResultNrRows = getPipeline().getPreviousResult(); - long nrRows = 0; - if (previousResultNrRows != null) { - nrRows = previousResultNrRows.getRows().size(); - } - row[index] = nrRows; - break; - case PREVIOUS_RESULT_IS_STOPPED: - Result previousResultIsStopped = getPipeline().getPreviousResult(); - boolean stop = false; - if (previousResultIsStopped != null) { - stop = previousResultIsStopped.isStopped(); - } - row[index] = stop; - break; - case PREVIOUS_RESULT_NR_ERRORS: - Result previousResultNrErrors = getPipeline().getPreviousResult(); - long nrErrors = 0; - if (previousResultNrErrors != null) { - nrErrors = previousResultNrErrors.getNrErrors(); - } - row[index] = nrErrors; - break; - case PREVIOUS_RESULT_LOG_TEXT: - Result previousResultLogText = getPipeline().getPreviousResult(); - String errorReason = null; - if (previousResultLogText != null) { - errorReason = previousResultLogText.getLogText(); - } - row[index] = errorReason; - break; - default: - break; + /** Initializes the transform and prepares all field resolvers. */ + @Override + public boolean init() { + if (super.init()) { + List<TransformMeta> previous = getPipelineMeta().findPreviousTransforms(getTransformMeta()); + if (!Utils.isEmpty(previous)) { + data.readsRows = true; } - } - - return row; - } - /** Calculate the start of the data range for a pipeline. */ - private Date calculateStartRange(String locationName, ExecutionType executionType, String name) - throws HopException { - ExecutionInfoLocation location = loadLocation(metadataProvider, locationName); - if (location == null) { - // Nothing to look up! - // - return null; - } - IExecutionInfoLocation iLocation = location.getExecutionInfoLocation(); - - try { - iLocation.initialize(this, metadataProvider); - - // Look up the previous successful execution of a pipeline with the given name - // - Execution execution = iLocation.findPreviousSuccessfulExecution(executionType, name); - if (execution == null) { - // We can go back millions of years but that would probably confuse a lot of 3rd party - // systems. - // - return new GregorianCalendar(1900, Calendar.JANUARY, 1).getTime(); - } else { - return execution.getExecutionStartDate(); - } - } finally { - iLocation.close(); + initResolvers(); + return true; } - } - - private ExecutionInfoLocation loadLocation( - IHopMetadataProvider metadataProvider, String locationName) throws HopException { - return metadataProvider.getSerializer(ExecutionInfoLocation.class).load(resolve(locationName)); + return false; } @Override @@ -720,7 +184,8 @@ public class SystemData extends BaseTransform<SystemDataMeta, SystemDataData> { } } else { - row = new Object[] {}; // empty row + // empty row + row = new Object[] {}; incrementLinesRead(); if (first) { @@ -753,16 +218,342 @@ public class SystemData extends BaseTransform<SystemDataMeta, SystemDataData> { return true; } - @Override - public boolean init() { - if (super.init()) { - List<TransformMeta> previous = getPipelineMeta().findPreviousTransforms(getTransformMeta()); - if (!Utils.isEmpty(previous)) { - data.readsRows = true; + /** + * Populates system data fields into the output row. + * + * <p>This method iterates over configured fields and resolves each value using the corresponding + * resolver. + * + * @param inputRowMeta metadata of the input row + * @param inputRowData input row data + * @return a new row containing original data plus system fields + * @throws HopException if value resolution fails + */ + private Object[] getSystemData(IRowMeta inputRowMeta, Object[] inputRowData) throws HopException { + Object[] row = RowDataUtil.createResizedCopy(inputRowData, data.outputRowMeta.size()); + + for (int i = 0, index = inputRowMeta.size(); i < meta.getFields().size(); i++, index++) { + SystemDataMeta.SystemInfoField field = meta.getFields().get(i); + row[index] = resolveFieldValue(field.getFieldType()); + } + return row; + } + + /** + * Initializes all resolver mappings for supported {@link SystemDataType}. + * + * <p>This method registers suppliers for: + * + * <ul> + * <li>Core system fields (dates, pipeline info) + * <li>Date boundary calculations + * <li>JVM metrics + * <li>Previous execution results + * </ul> + * + * <p>Initialization is idempotent and executed only once. + */ + private void initResolvers() { + if (!resolvers.isEmpty()) { + return; + } + resolvers.put(SYSTEM_START, () -> getPipeline().getExecutionStartDate()); + resolvers.put(PIPELINE_DATE_TO, () -> getPipeline().getExecutionStartDate()); + resolvers.put(SYSTEM_DATE, Date::new); + resolvers.put(PIPELINE_DATE_FROM, this::pipelineDateFrom); + resolvers.put(WORKFLOW_DATE_FROM, this::workflowDateFrom); + resolvers.put(WORKFLOW_DATE_TO, this::workflowDateTo); + + resolvers.put(PREV_DAY_START, () -> dayBoundary(-1, true)); + resolvers.put(PREV_DAY_END, () -> dayBoundary(-1, false)); + resolvers.put(THIS_DAY_START, () -> dayBoundary(0, true)); + resolvers.put(THIS_DAY_END, () -> dayBoundary(0, false)); + resolvers.put(NEXT_DAY_START, () -> dayBoundary(1, true)); + resolvers.put(NEXT_DAY_END, () -> dayBoundary(1, false)); + + resolvers.put(PREV_MONTH_START, () -> monthBoundary(-1, true)); + resolvers.put(PREV_MONTH_END, () -> monthBoundary(-1, false)); + resolvers.put(THIS_MONTH_START, () -> monthBoundary(0, true)); + resolvers.put(THIS_MONTH_END, () -> monthBoundary(0, false)); + resolvers.put(NEXT_MONTH_START, () -> monthBoundary(1, true)); + resolvers.put(NEXT_MONTH_END, () -> monthBoundary(1, false)); + + resolvers.put(PREV_WEEK_START, () -> weekBoundary(-1, Locale.getDefault(), true, false)); + resolvers.put(PREV_WEEK_END, () -> weekBoundary(-1, Locale.getDefault(), false, false)); + resolvers.put(PREV_WEEK_OPEN_END, () -> weekBoundary(-1, Locale.ROOT, false, true)); + resolvers.put(PREV_WEEK_START_US, () -> weekBoundary(-1, Locale.US, true, false)); + resolvers.put(PREV_WEEK_END_US, () -> weekBoundary(-1, Locale.US, false, false)); + resolvers.put(THIS_WEEK_START, () -> weekBoundary(0, Locale.getDefault(), true, false)); + resolvers.put(THIS_WEEK_END, () -> weekBoundary(0, Locale.getDefault(), false, false)); + resolvers.put(THIS_WEEK_OPEN_END, () -> weekBoundary(0, Locale.ROOT, false, true)); + resolvers.put(THIS_WEEK_START_US, () -> weekBoundary(0, Locale.US, true, false)); + resolvers.put(THIS_WEEK_END_US, () -> weekBoundary(0, Locale.US, false, false)); + resolvers.put(NEXT_WEEK_START, () -> weekBoundary(1, Locale.getDefault(), true, false)); + resolvers.put(NEXT_WEEK_END, () -> weekBoundary(1, Locale.getDefault(), false, false)); + resolvers.put(NEXT_WEEK_OPEN_END, () -> weekBoundary(1, Locale.ROOT, false, true)); + resolvers.put(NEXT_WEEK_START_US, () -> weekBoundary(1, Locale.US, true, false)); + resolvers.put(NEXT_WEEK_END_US, () -> weekBoundary(1, Locale.US, false, false)); + + resolvers.put(PREV_QUARTER_START, () -> quarterBoundary(-1, true)); + resolvers.put(PREV_QUARTER_END, () -> quarterBoundary(-1, false)); + resolvers.put(THIS_QUARTER_START, () -> quarterBoundary(0, true)); + resolvers.put(THIS_QUARTER_END, () -> quarterBoundary(0, false)); + resolvers.put(NEXT_QUARTER_START, () -> quarterBoundary(1, true)); + resolvers.put(NEXT_QUARTER_END, () -> quarterBoundary(1, false)); + + resolvers.put(PREV_YEAR_START, () -> yearBoundary(-1, true)); + resolvers.put(PREV_YEAR_END, () -> yearBoundary(-1, false)); + resolvers.put(THIS_YEAR_START, () -> yearBoundary(0, true)); + resolvers.put(THIS_YEAR_END, () -> yearBoundary(0, false)); + resolvers.put(NEXT_YEAR_START, () -> yearBoundary(1, true)); + resolvers.put(NEXT_YEAR_END, () -> yearBoundary(1, false)); + + resolvers.put(COPYNR, () -> (long) getCopy()); + resolvers.put(PIPELINE_NAME, () -> getPipelineMeta().getName()); + resolvers.put(FILENAME, () -> getPipelineMeta().getFilename()); + resolvers.put(MODIFIED_USER, () -> getPipelineMeta().getModifiedUser()); + resolvers.put(MODIFIED_DATE, () -> getPipelineMeta().getModifiedDate()); + resolvers.put(HOSTNAME_REAL, Const::getHostnameReal); + resolvers.put(HOSTNAME, Const::getHostname); + resolvers.put(IP_ADDRESS, this::safeIpAddress); + resolvers.put(CURRENT_PID, Management::getPID); + + // init jvm + initResolversJvm(); + // init resolver result + initResolversResult(); + } + + private void initResolversJvm() { + resolvers.put(JVM_TOTAL_MEMORY, () -> Runtime.getRuntime().totalMemory()); + resolvers.put(JVM_FREE_MEMORY, () -> Runtime.getRuntime().freeMemory()); + resolvers.put(JVM_MAX_MEMORY, () -> Runtime.getRuntime().maxMemory()); + resolvers.put(JVM_AVAILABLE_MEMORY, this::jvmAvailableMemory); + resolvers.put(AVAILABLE_PROCESSORS, () -> (long) Runtime.getRuntime().availableProcessors()); + resolvers.put(JVM_CPU_TIME, () -> Management.getJVMCpuTime() / 1000000); + resolvers.put(TOTAL_PHYSICAL_MEMORY_SIZE, Management::getTotalPhysicalMemorySize); + resolvers.put(TOTAL_SWAP_SPACE_SIZE, Management::getTotalSwapSpaceSize); + resolvers.put(COMMITTED_VIRTUAL_MEMORY_SIZE, Management::getCommittedVirtualMemorySize); + resolvers.put(FREE_PHYSICAL_MEMORY_SIZE, Management::getFreePhysicalMemorySize); + resolvers.put(FREE_SWAP_SPACE_SIZE, Management::getFreeSwapSpaceSize); + } + + private void initResolversResult() { + resolvers.put( + PREVIOUS_RESULT_RESULT, () -> previousResult() != null && previousResult().isResult()); + resolvers.put(PREVIOUS_RESULT_EXIT_STATUS, fromPreResult(Result::getExitStatus, 0L)); + resolvers.put(PREVIOUS_RESULT_ENTRY_NR, fromPreResult(Result::getEntryNr, 0L)); + resolvers.put(PREVIOUS_RESULT_NR_FILES, fromPreResult(r -> r.getResultFiles().size(), 0L)); + resolvers.put( + PREVIOUS_RESULT_NR_FILES_RETRIEVED, fromPreResult(Result::getNrFilesRetrieved, 0L)); + resolvers.put(PREVIOUS_RESULT_NR_LINES_DELETED, fromPreResult(Result::getNrLinesDeleted, 0L)); + resolvers.put(PREVIOUS_RESULT_NR_LINES_INPUT, fromPreResult(Result::getNrLinesInput, 0L)); + resolvers.put(PREVIOUS_RESULT_NR_LINES_OUTPUT, fromPreResult(Result::getNrLinesOutput, 0L)); + resolvers.put(PREVIOUS_RESULT_NR_LINES_READ, fromPreResult(Result::getNrLinesRead, 0L)); + resolvers.put(PREVIOUS_RESULT_NR_LINES_REJECTED, fromPreResult(Result::getNrLinesRejected, 0L)); + resolvers.put(PREVIOUS_RESULT_NR_LINES_UPDATED, fromPreResult(Result::getNrLinesUpdated, 0L)); + resolvers.put(PREVIOUS_RESULT_NR_LINES_WRITTEN, fromPreResult(Result::getNrLinesWritten, 0L)); + resolvers.put(PREVIOUS_RESULT_NR_ROWS, fromPreResult(r -> r.getRows().size(), 0L)); + resolvers.put( + PREVIOUS_RESULT_IS_STOPPED, () -> previousResult() != null && previousResult().isStopped()); + resolvers.put(PREVIOUS_RESULT_NR_ERRORS, fromPreResult(Result::getNrErrors, 0L)); + resolvers.put(PREVIOUS_RESULT_LOG_TEXT, fromPreResult(Result::getLogText, null)); + } + + /** + * Creates a resolver that extracts a value from the previous execution result. + * + * <p>If the previous result is {@code null}, the provided default value is returned. + * + * @param func function to extract value from {@link Result} + * @param defaultValue fallback value when result is not available + * @param <T> return type + * @return a supplier that safely resolves the value + */ + private <T> ThrowingSupplier<T> fromPreResult(Function<Result, T> func, T defaultValue) { + return () -> { + Result r = previousResult(); + return r == null ? defaultValue : func.apply(r); + }; + } + + /** + * Resolves the value for a given system data type. + * + * @param type the system data type + * @return the resolved value, or {@code null} if no resolver exists + * @throws HopException if resolution fails + */ + private Object resolveFieldValue(SystemDataType type) throws HopException { + ThrowingSupplier<Object> supplier = resolvers.get(type); + return supplier == null ? null : supplier.get(); + } + + private Date pipelineDateFrom() throws HopException { + return calculateStartRange( + getPipeline().getPipelineRunConfiguration().getExecutionInfoLocationName(), + ExecutionType.Pipeline, + getPipeline().getPipelineMeta().getName()); + } + + private Date workflowDateFrom() throws HopException { + if (getPipeline().getParentWorkflow() == null) { + return null; + } + return calculateStartRange( + getPipeline() + .getParentWorkflow() + .getWorkflowRunConfiguration() + .getExecutionInfoLocationName(), + ExecutionType.Workflow, + getPipeline().getParentWorkflow().getWorkflowMeta().getName()); + } + + private Date workflowDateTo() { + return getPipeline().getParentWorkflow() == null + ? null + : getPipeline().getParentWorkflow().getExecutionStartDate(); + } + + /** + * Computes the start or end of a day relative to the current date. + * + * @param dayOffset offset from current day (e.g., -1 = previous day) + * @param start {@code true} for start of day, {@code false} for end of day + * @return calculated date + */ + private Date dayBoundary(int dayOffset, boolean start) { + Calendar cal = Calendar.getInstance(); + cal.add(Calendar.DAY_OF_MONTH, dayOffset); + setDayTime(cal, start); + return cal.getTime(); + } + + private Date monthBoundary(int monthOffset, boolean start) { + Calendar cal = Calendar.getInstance(); + cal.add(Calendar.MONTH, monthOffset); + cal.set(Calendar.DAY_OF_MONTH, start ? 1 : cal.getActualMaximum(Calendar.DAY_OF_MONTH)); + setDayTime(cal, start); + return cal.getTime(); + } + + private Date weekBoundary(int weekOffset, Locale locale, boolean start, boolean openEnd) { + Calendar cal = + locale == null || Locale.getDefault().equals(locale) + ? Calendar.getInstance() + : Calendar.getInstance(locale); + if (start) { + cal.add(Calendar.WEEK_OF_YEAR, weekOffset); + cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); + setDayTime(cal, true); + } else { + cal.add(Calendar.WEEK_OF_YEAR, weekOffset + 1); + cal.set(Calendar.DAY_OF_WEEK, cal.getFirstDayOfWeek()); + cal.set(Calendar.HOUR_OF_DAY, 0); + cal.set(Calendar.MINUTE, 0); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, -1); + if (openEnd) { + cal.add(Calendar.DAY_OF_WEEK, -1); } + } + return cal.getTime(); + } - return true; + private Date quarterBoundary(int quarterOffset, boolean start) { + Calendar cal = Calendar.getInstance(); + int monthShift = + start + ? (quarterOffset * 3) - (cal.get(Calendar.MONTH) % 3) + : (quarterOffset * 3 + 2) - (cal.get(Calendar.MONTH) % 3); + cal.add(Calendar.MONTH, monthShift); + cal.set(Calendar.DAY_OF_MONTH, start ? 1 : cal.getActualMaximum(Calendar.DATE)); + setDayTime(cal, start); + return cal.getTime(); + } + + private Date yearBoundary(int yearOffset, boolean start) { + Calendar cal = Calendar.getInstance(); + if (start) { + cal.add(Calendar.YEAR, yearOffset); + cal.set(Calendar.DAY_OF_YEAR, cal.getActualMinimum(Calendar.DATE)); + setDayTime(cal, true); + } else { + cal.add(Calendar.YEAR, yearOffset + 1); + cal.set(Calendar.DAY_OF_YEAR, cal.getActualMinimum(Calendar.DATE)); + cal.add(Calendar.DAY_OF_YEAR, -1); + setDayTime(cal, false); } - return false; + return cal.getTime(); + } + + private void setDayTime(Calendar cal, boolean start) { + cal.set(Calendar.HOUR_OF_DAY, start ? 0 : 23); + cal.set(Calendar.MINUTE, start ? 0 : 59); + cal.set(Calendar.SECOND, start ? 0 : 59); + cal.set(Calendar.MILLISECOND, start ? 0 : 999); + } + + private Object safeIpAddress() throws HopException { + try { + return Const.getIPAddress(); + } catch (Exception e) { + throw new HopException(e); + } + } + + private long jvmAvailableMemory() { + Runtime rt = Runtime.getRuntime(); + return rt.freeMemory() + (rt.maxMemory() - rt.totalMemory()); + } + + private Result previousResult() { + return getPipeline().getPreviousResult(); + } + + /** Calculate the start of the data range for a pipeline. */ + private Date calculateStartRange(String locationName, ExecutionType executionType, String name) + throws HopException { + ExecutionInfoLocation location = loadLocation(metadataProvider, locationName); + if (location == null) { + // Nothing to look up! + // + return null; + } + IExecutionInfoLocation iLocation = location.getExecutionInfoLocation(); + + try { + iLocation.initialize(this, metadataProvider); + + // Look up the previous successful execution of a pipeline with the given name + // + Execution execution = iLocation.findPreviousSuccessfulExecution(executionType, name); + if (execution == null) { + // We can go back millions of years but that would probably confuse a lot of 3rd party + // systems. + // + return new GregorianCalendar(1900, Calendar.JANUARY, 1).getTime(); + } else { + return execution.getExecutionStartDate(); + } + } finally { + iLocation.close(); + } + } + + private ExecutionInfoLocation loadLocation( + IHopMetadataProvider metadataProvider, String locationName) throws HopException { + return metadataProvider.getSerializer(ExecutionInfoLocation.class).load(resolve(locationName)); + } + + /** + * Functional interface similar to {@link java.util.function.Supplier}, but allows throwing {@link + * HopException}. + * + * @param <T> supplied value type + */ + @FunctionalInterface + private interface ThrowingSupplier<T> { + T get() throws HopException; } } diff --git a/plugins/transforms/systemdata/src/main/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataMeta.java b/plugins/transforms/systemdata/src/main/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataMeta.java index 9869683b62..b40491a6bb 100644 --- a/plugins/transforms/systemdata/src/main/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataMeta.java +++ b/plugins/transforms/systemdata/src/main/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataMeta.java @@ -163,6 +163,7 @@ public class SystemDataMeta extends BaseTransformMeta<SystemData, SystemDataData PREVIOUS_RESULT_EXIT_STATUS, PREVIOUS_RESULT_ENTRY_NR, PREVIOUS_RESULT_NR_ERRORS, + PREVIOUS_RESULT_NR_ROWS, PREVIOUS_RESULT_NR_FILES, PREVIOUS_RESULT_NR_FILES_RETRIEVED, PREVIOUS_RESULT_NR_LINES_DELETED, diff --git a/plugins/transforms/systemdata/src/main/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataType.java b/plugins/transforms/systemdata/src/main/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataType.java index ab87f8bae8..825f229d68 100644 --- a/plugins/transforms/systemdata/src/main/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataType.java +++ b/plugins/transforms/systemdata/src/main/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataType.java @@ -17,6 +17,7 @@ package org.apache.hop.pipeline.transforms.systemdata; +import java.util.Arrays; import lombok.Getter; import org.apache.hop.i18n.BaseMessages; import org.apache.hop.metadata.api.IEnumHasCodeAndDescription; @@ -132,7 +133,10 @@ public enum SystemDataType implements IEnumHasCodeAndDescription { } public static String[] getDescriptions() { - return IEnumHasCodeAndDescription.getDescriptions(SystemDataType.class); + return Arrays.stream(SystemDataType.values()) + .filter(t -> t != SystemDataType.NONE) + .map(SystemDataType::getDescription) + .toArray(String[]::new); } SystemDataType(String code, String descriptionName) { diff --git a/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/ManagementTests.java b/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/ManagementTests.java new file mode 100644 index 0000000000..30aa75b25f --- /dev/null +++ b/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/ManagementTests.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hop.pipeline.transforms.systemdata; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.lang.management.ThreadMXBean; +import org.junit.jupiter.api.Test; + +/** Unit test for {@link Management} */ +class ManagementTests { + + @Test + void testGetPID() { + long pid = Management.getPID(); + + assertTrue(pid > 0, "PID should be positive"); + } + + @Test + void testGetJVMCpuTime() { + long cpuTime = Management.getJVMCpuTime(); + + assertTrue(cpuTime >= 0, "CPU time should be >= 0"); + } + + @Test + void testMemoryMethods() { + assertTrue(Management.getFreePhysicalMemorySize() >= 0); + assertTrue(Management.getFreeSwapSpaceSize() >= 0); + assertTrue(Management.getTotalPhysicalMemorySize() > 0); + assertTrue(Management.getTotalSwapSpaceSize() >= 0); + assertTrue(Management.getCommittedVirtualMemorySize() >= 0); + } + + @Test + void testGetCpuTime() { + long threadId = Thread.currentThread().threadId(); + + long cpuTime = Management.getCpuTime(threadId); + assertTrue(cpuTime >= 0); + } + + @Test + void testGetCpuTime_notSupported() throws Exception { + ThreadMXBean mockBean = mock(ThreadMXBean.class); + when(mockBean.isThreadCpuTimeSupported()).thenReturn(false); + + var field = Management.class.getDeclaredField("threadBean"); + field.setAccessible(true); + field.set(null, mockBean); + + long result = Management.getCpuTime(1L); + assertEquals(0L, result); + } + + @Test + void testBeanCaching() { + long v1 = Management.getJVMCpuTime(); + long v2 = Management.getJVMCpuTime(); + + assertTrue(v1 >= 0); + assertTrue(v2 >= 0); + } +} diff --git a/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataDataTests.java b/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataDataTests.java new file mode 100644 index 0000000000..63bbda5e2a --- /dev/null +++ b/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataDataTests.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hop.pipeline.transforms.systemdata; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.hop.core.row.RowMeta; +import org.apache.hop.pipeline.transform.BaseTransformData; +import org.junit.jupiter.api.Test; + +/** Unit test for {@link SystemDataData} */ +class SystemDataDataTests { + + @Test + void testDefaultState() { + SystemDataData data = new SystemDataData(); + + assertFalse(data.readsRows, "readsRows should default to false"); + assertNull(data.outputRowMeta, "outputRowMeta should default to null"); + } + + @Test + void testFieldAssignment() { + SystemDataData data = new SystemDataData(); + + data.readsRows = true; + + RowMeta rowMeta = new RowMeta(); + data.outputRowMeta = rowMeta; + + assertTrue(data.readsRows); + assertSame(rowMeta, data.outputRowMeta); + } + + @Test + void testInheritance() { + SystemDataData data = new SystemDataData(); + + assertNotNull(data); + assertInstanceOf(BaseTransformData.class, data); + } + + @Test + void testThreadSafetyBasic() throws InterruptedException { + SystemDataData data = new SystemDataData(); + + Thread t = Thread.startVirtualThread(() -> data.readsRows = true); + t.join(); + + assertTrue(data.readsRows); + } +} diff --git a/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataMetaTest.java b/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataMetaTest.java index 4f50786adb..82d58801c2 100644 --- a/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataMetaTest.java +++ b/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataMetaTest.java @@ -20,17 +20,25 @@ package org.apache.hop.pipeline.transforms.systemdata; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; +import org.apache.hop.core.ICheckResult; +import org.apache.hop.core.row.IValueMeta; +import org.apache.hop.core.row.RowMeta; +import org.apache.hop.core.variables.Variables; import org.apache.hop.core.xml.XmlHandler; import org.apache.hop.metadata.serializer.memory.MemoryMetadataProvider; import org.apache.hop.metadata.serializer.xml.XmlMetadataUtil; import org.apache.hop.pipeline.transform.TransformMeta; import org.junit.jupiter.api.Test; +/** Unit test for {@link SystemDataMeta} */ class SystemDataMetaTest { @Test void testLoadSave() throws Exception { @@ -60,11 +68,95 @@ class SystemDataMetaTest { validate(metaCopy); } + @Test + void testGetFieldsDefaultType() throws Exception { + SystemDataMeta meta = new SystemDataMeta(); + + SystemDataMeta.SystemInfoField field = new SystemDataMeta.SystemInfoField(); + field.setFieldName("noneField"); + field.setFieldType(SystemDataType.NONE); + + meta.getFields().add(field); + + RowMeta rowMeta = new RowMeta(); + meta.getFields(rowMeta, "t", null, null, new Variables(), null); + + assertEquals(IValueMeta.TYPE_NONE, rowMeta.getValueMeta(0).getType()); + } + + @Test + void testCheckErrorWhenTypeNone() { + SystemDataMeta meta = new SystemDataMeta(); + + SystemDataMeta.SystemInfoField field = new SystemDataMeta.SystemInfoField(); + field.setFieldName("f1"); + field.setFieldType(SystemDataType.NONE); + + meta.getFields().add(field); + + List<ICheckResult> remarks = new ArrayList<>(); + meta.check(remarks, null, null, null, null, null, null, new Variables(), null); + + assertFalse(remarks.isEmpty()); + assertEquals(ICheckResult.TYPE_RESULT_ERROR, remarks.getFirst().getType()); + } + + @Test + void testCheckOk() { + SystemDataMeta meta = new SystemDataMeta(); + + SystemDataMeta.SystemInfoField field = new SystemDataMeta.SystemInfoField(); + field.setFieldName("f1"); + field.setFieldType(SystemDataType.SYSTEM_DATE); + + meta.getFields().add(field); + + List<ICheckResult> remarks = new ArrayList<>(); + meta.check(remarks, null, null, null, null, null, null, new Variables(), null); + + assertEquals(1, remarks.size()); + assertEquals(ICheckResult.TYPE_RESULT_OK, remarks.getFirst().getType()); + } + + @Test + void testCloneDeepCopy() { + SystemDataMeta meta = new SystemDataMeta(); + + SystemDataMeta.SystemInfoField field = new SystemDataMeta.SystemInfoField(); + field.setFieldName("f1"); + field.setFieldType(SystemDataType.SYSTEM_DATE); + + meta.getFields().add(field); + + SystemDataMeta cloned = (SystemDataMeta) meta.clone(); + + assertNotSame(meta, cloned); + assertEquals(meta.getFields().size(), cloned.getFields().size()); + + assertNotSame(meta.getFields().getFirst(), cloned.getFields().getFirst()); + } + + @Test + void testCopyConstructor() { + SystemDataMeta meta = new SystemDataMeta(); + + SystemDataMeta.SystemInfoField field = new SystemDataMeta.SystemInfoField(); + field.setFieldName("f1"); + field.setFieldType(SystemDataType.SYSTEM_DATE); + + meta.getFields().add(field); + + SystemDataMeta copy = new SystemDataMeta(meta); + + assertEquals(1, copy.getFields().size()); + assertNotSame(meta.getFields().getFirst(), copy.getFields().getFirst()); + } + private static void validate(SystemDataMeta meta) { assertNotNull(meta.getFields()); assertFalse(meta.getFields().isEmpty()); assertEquals(8, meta.getFields().size()); - SystemDataMeta.SystemInfoField f1 = meta.getFields().get(0); + SystemDataMeta.SystemInfoField f1 = meta.getFields().getFirst(); assertEquals("variable_sysdate", f1.getFieldName()); assertEquals(SystemDataType.SYSTEM_DATE, f1.getFieldType()); diff --git a/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataTests.java b/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataTests.java new file mode 100644 index 0000000000..c6dd0473d8 --- /dev/null +++ b/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataTests.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hop.pipeline.transforms.systemdata; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import org.apache.hop.core.Const; +import org.apache.hop.core.IRowSet; +import org.apache.hop.core.QueueRowSet; +import org.apache.hop.core.Result; +import org.apache.hop.core.ResultFile; +import org.apache.hop.core.exception.HopException; +import org.apache.hop.core.logging.ILoggingObject; +import org.apache.hop.core.row.IRowMeta; +import org.apache.hop.core.row.RowMeta; +import org.apache.hop.core.row.value.ValueMetaString; +import org.apache.hop.execution.Execution; +import org.apache.hop.execution.ExecutionInfoLocation; +import org.apache.hop.execution.IExecutionInfoLocation; +import org.apache.hop.junit.rules.RestoreHopEngineEnvironmentExtension; +import org.apache.hop.metadata.api.IHopMetadataProvider; +import org.apache.hop.metadata.api.IHopMetadataSerializer; +import org.apache.hop.pipeline.PipelineMeta; +import org.apache.hop.pipeline.config.PipelineRunConfiguration; +import org.apache.hop.pipeline.transform.TransformMeta; +import org.apache.hop.pipeline.transforms.mock.TransformMockHelper; +import org.apache.hop.workflow.WorkflowMeta; +import org.apache.hop.workflow.config.WorkflowRunConfiguration; +import org.apache.hop.workflow.engine.IWorkflowEngine; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedStatic; + +/** Unit test for {@link SystemData} */ +@ExtendWith(RestoreHopEngineEnvironmentExtension.class) +class SystemDataTests { + + private static final Date PIPELINE_START = new Date(1_700_000_000_000L); + private static final Date WORKFLOW_START = new Date(1_700_100_000_000L); + private static final Date PREVIOUS_EXECUTION_START = new Date(1_699_000_000_000L); + + private TransformMockHelper<SystemDataMeta, SystemDataData> helper; + private SystemDataMeta meta; + private SystemDataData data; + private IHopMetadataProvider metadataProvider; + private IExecutionInfoLocation executionInfoLocation; + + @BeforeEach + void setUp() throws Exception { + helper = new TransformMockHelper<>("SystemData", SystemDataMeta.class, SystemDataData.class); + when(helper.pipeline.isRunning()).thenReturn(true); + when(helper.pipeline.isStopped()).thenReturn(false); + when(helper.logChannelFactory.create(any(), any(ILoggingObject.class))) + .thenReturn(helper.iLogChannel); + + meta = new SystemDataMeta(); + data = new SystemDataData(); + metadataProvider = mock(IHopMetadataProvider.class); + + when(helper.pipeline.getExecutionStartDate()).thenReturn(PIPELINE_START); + when(helper.pipeline.getPipelineMeta()).thenReturn(helper.pipelineMeta); + when(helper.pipelineMeta.getName()).thenReturn("pipeline-name"); + when(helper.pipelineMeta.getModifiedUser()).thenReturn("tester"); + when(helper.pipelineMeta.getModifiedDate()).thenReturn(new Date(1_701_000_000_000L)); + when(helper.pipelineMeta.getFilename()).thenReturn("pipeline-file.hpl"); + + PipelineRunConfiguration pipelineRunConfiguration = new PipelineRunConfiguration(); + pipelineRunConfiguration.setExecutionInfoLocationName("pipeline-location"); + when(helper.pipeline.getPipelineRunConfiguration()).thenReturn(pipelineRunConfiguration); + @SuppressWarnings("unchecked") + IWorkflowEngine<WorkflowMeta> parentWorkflow = mock(IWorkflowEngine.class); + when(helper.pipeline.getParentWorkflow()).thenReturn(parentWorkflow); + when(parentWorkflow.getExecutionStartDate()).thenReturn(WORKFLOW_START); + WorkflowMeta workflowMeta = mock(WorkflowMeta.class); + when(workflowMeta.getName()).thenReturn("workflow-name"); + when(parentWorkflow.getWorkflowMeta()).thenReturn(workflowMeta); + WorkflowRunConfiguration workflowRunConfiguration = new WorkflowRunConfiguration(); + workflowRunConfiguration.setExecutionInfoLocationName("workflow-location"); + when(parentWorkflow.getWorkflowRunConfiguration()).thenReturn(workflowRunConfiguration); + + Result previousResult = createPreviousResult(); + when(helper.pipeline.getPreviousResult()).thenReturn(previousResult); + + executionInfoLocation = mock(IExecutionInfoLocation.class); + Execution execution = new Execution(); + execution.setExecutionStartDate(PREVIOUS_EXECUTION_START); + when(executionInfoLocation.findPreviousSuccessfulExecution(any(), any())).thenReturn(execution); + + @SuppressWarnings("unchecked") + IHopMetadataSerializer<ExecutionInfoLocation> serializer = mock(IHopMetadataSerializer.class); + when(metadataProvider.getSerializer(ExecutionInfoLocation.class)).thenReturn(serializer); + when(serializer.load(any())) + .thenAnswer( + invocation -> { + ExecutionInfoLocation location = new ExecutionInfoLocation(); + location.setExecutionInfoLocation(executionInfoLocation); + return location; + }); + } + + @Test + void processRow_readsInputAndEmitsAllConfiguredSystemFields() throws Exception { + List<SystemDataMeta.SystemInfoField> fields = new ArrayList<>(); + for (SystemDataType type : SystemDataType.values()) { + if (type == SystemDataType.HOSTNAME + || type == SystemDataType.HOSTNAME_REAL + || type == SystemDataType.IP_ADDRESS) { + continue; + } + SystemDataMeta.SystemInfoField field = new SystemDataMeta.SystemInfoField(); + field.setFieldName("f_" + type.name()); + field.setFieldType(type); + fields.add(field); + } + meta.setFields(fields); + + when(helper.pipelineMeta.findPreviousTransforms(any())) + .thenReturn(List.of(mock(TransformMeta.class))); + SystemData transform = newTransform(); + IRowMeta inputMeta = new RowMeta(); + inputMeta.addValueMeta(new ValueMetaString("input")); + transform.setInputRowMeta(inputMeta); + transform.addRowSetToInputRowSets(helper.getMockInputRowSet(new Object[][] {{"seed"}})); + QueueRowSet outputRowSet = new QueueRowSet(); + transform.addRowSetToOutputRowSets(outputRowSet); + + assertTrue(transform.init()); + transform.processRow(); + + Object[] output = outputRowSet.getRow(); + assertNotNull(output); + assertTrue(output.length >= fields.size()); + assertEquals("seed", output[0]); + + assertEquals( + PIPELINE_START, value(outputRowSet.getRowMeta(), output, SystemDataType.SYSTEM_START)); + assertEquals( + "pipeline-name", value(outputRowSet.getRowMeta(), output, SystemDataType.PIPELINE_NAME)); + assertEquals("tester", value(outputRowSet.getRowMeta(), output, SystemDataType.MODIFIED_USER)); + assertEquals( + "pipeline-file.hpl", value(outputRowSet.getRowMeta(), output, SystemDataType.FILENAME)); + assertEquals( + 0L, + ((Number) + Objects.requireNonNull( + value(outputRowSet.getRowMeta(), output, SystemDataType.COPYNR))) + .longValue()); + assertEquals( + true, value(outputRowSet.getRowMeta(), output, SystemDataType.PREVIOUS_RESULT_RESULT)); + assertEquals( + 11L, + ((Number) + Objects.requireNonNull( + value( + outputRowSet.getRowMeta(), + output, + SystemDataType.PREVIOUS_RESULT_EXIT_STATUS))) + .longValue()); + assertEquals( + "log-text", + value(outputRowSet.getRowMeta(), output, SystemDataType.PREVIOUS_RESULT_LOG_TEXT)); + assertNull(value(outputRowSet.getRowMeta(), output, SystemDataType.NONE)); + assertInstanceOf( + Date.class, value(outputRowSet.getRowMeta(), output, SystemDataType.THIS_DAY_START)); + } + + @Test + @Tag("slow") + void processRow_networkFieldsAreFastAndCoveredWithStaticMocking() throws Exception { + List<SystemDataMeta.SystemInfoField> fields = new ArrayList<>(); + fields.add(field("f_HOSTNAME", SystemDataType.HOSTNAME)); + fields.add(field("f_HOSTNAME_REAL", SystemDataType.HOSTNAME_REAL)); + fields.add(field("f_IP_ADDRESS", SystemDataType.IP_ADDRESS)); + meta.setFields(fields); + + when(helper.pipelineMeta.findPreviousTransforms(any())) + .thenReturn(List.of(mock(TransformMeta.class))); + SystemData transform = newTransform(); + transform.setInputRowMeta(new RowMeta()); + transform.addRowSetToInputRowSets(helper.getMockInputRowSet(new Object[][] {{}})); + QueueRowSet outputRowSet = new QueueRowSet(); + transform.addRowSetToOutputRowSets(outputRowSet); + + try (MockedStatic<Const> constMock = mockStatic(Const.class, CALLS_REAL_METHODS)) { + constMock.when(Const::getHostname).thenReturn("host"); + constMock.when(Const::getHostnameReal).thenReturn("lance"); + constMock.when(Const::getIPAddress).thenReturn("127.0.0.1"); + + assertTrue(transform.init()); + transform.processRow(); + } + + Object[] output = outputRowSet.getRow(); + assertNotNull(output); + assertEquals("host", valueByFieldName(outputRowSet.getRowMeta(), output, "f_HOSTNAME")); + assertEquals("lance", valueByFieldName(outputRowSet.getRowMeta(), output, "f_HOSTNAME_REAL")); + assertEquals("127.0.0.1", valueByFieldName(outputRowSet.getRowMeta(), output, "f_IP_ADDRESS")); + } + + @Test + void processRow_workflowDatesAreNullWhenParentWorkflowMissing() throws Exception { + meta.setFields( + List.of( + field("workflow_from", SystemDataType.WORKFLOW_DATE_FROM), + field("workflow_to", SystemDataType.WORKFLOW_DATE_TO))); + when(helper.pipeline.getParentWorkflow()).thenReturn(null); + when(helper.pipelineMeta.findPreviousTransforms(any())).thenReturn(new ArrayList<>()); + + SystemData transform = newTransform(); + transform.setInputRowMeta(new RowMeta()); + transform.addRowSetToOutputRowSets(new QueueRowSet()); + assertTrue(transform.init()); + assertFalse(transform.processRow()); + + IRowSet outputRowSet = transform.getOutputRowSets().getFirst(); + Object[] output = outputRowSet.getRow(); + assertNull(valueByFieldName(outputRowSet.getRowMeta(), output, "workflow_from")); + assertNull(valueByFieldName(outputRowSet.getRowMeta(), output, "workflow_to")); + } + + @Test + void processRow_pipelineDateFromFallsBackTo1900WhenNoPreviousExecution() throws Exception { + meta.setFields(List.of(field("pipeline_from", SystemDataType.PIPELINE_DATE_FROM))); + when(helper.pipelineMeta.findPreviousTransforms(any())).thenReturn(new ArrayList<>()); + when(executionInfoLocation.findPreviousSuccessfulExecution(any(), any())).thenReturn(null); + + SystemData transform = newTransform(); + transform.setInputRowMeta(new RowMeta()); + transform.addRowSetToOutputRowSets(new QueueRowSet()); + assertTrue(transform.init()); + assertFalse(transform.processRow()); + + IRowSet outputRowSet = transform.getOutputRowSets().getFirst(); + Object[] output = outputRowSet.getRow(); + Date value = (Date) valueByFieldName(outputRowSet.getRowMeta(), output, "pipeline_from"); + assertNotNull(value); + Calendar calendar = Calendar.getInstance(); + calendar.setTime(value); + assertEquals(1900, calendar.get(Calendar.YEAR)); + } + + @Test + void processRow_ipAddressFailureThrowsHopException() throws Exception { + meta.setFields(List.of(field("ip", SystemDataType.IP_ADDRESS))); + when(helper.pipelineMeta.findPreviousTransforms(any())).thenReturn(new ArrayList<>()); + SystemData transform = newTransform(); + transform.setInputRowMeta(new RowMeta()); + transform.addRowSetToOutputRowSets(new QueueRowSet()); + assertTrue(transform.init()); + + try (MockedStatic<Const> constMock = mockStatic(Const.class, CALLS_REAL_METHODS)) { + constMock + .when(Const::getIPAddress) + .thenThrow(new UnknownHostException("expected test exception")); + assertThrows(HopException.class, transform::processRow); + } + } + + @Test + void processRow_singleRowModeEmitsDefaultsWhenPreviousResultMissing() throws Exception { + List<SystemDataMeta.SystemInfoField> fields = new ArrayList<>(); + fields.add(field("nr_errors", SystemDataType.PREVIOUS_RESULT_NR_ERRORS)); + fields.add(field("result", SystemDataType.PREVIOUS_RESULT_RESULT)); + fields.add(field("log", SystemDataType.PREVIOUS_RESULT_LOG_TEXT)); + fields.add(field("pipeline_from", SystemDataType.PIPELINE_DATE_FROM)); + meta.setFields(fields); + + when(helper.pipelineMeta.findPreviousTransforms(any())).thenReturn(new ArrayList<>()); + when(helper.pipeline.getPreviousResult()).thenReturn(null); + @SuppressWarnings("unchecked") + IHopMetadataSerializer<ExecutionInfoLocation> serializer = mock(IHopMetadataSerializer.class); + when(metadataProvider.getSerializer(ExecutionInfoLocation.class)).thenReturn(serializer); + when(serializer.load(any())).thenReturn(null); + + SystemData transform = newTransform(); + transform.setInputRowMeta(new RowMeta()); + transform.addRowSetToOutputRowSets(new QueueRowSet()); + + assertTrue(transform.init()); + assertFalse(transform.processRow()); + + IRowSet outputRowSet = transform.getOutputRowSets().getFirst(); + Object[] output = outputRowSet.getRow(); + assertNotNull(output); + assertTrue(output.length >= 4); + assertEquals( + 0L, + ((Number) + Objects.requireNonNull( + valueByFieldName(outputRowSet.getRowMeta(), output, "nr_errors"))) + .longValue()); + assertEquals(false, valueByFieldName(outputRowSet.getRowMeta(), output, "result")); + assertNull(valueByFieldName(outputRowSet.getRowMeta(), output, "log")); + assertNull(valueByFieldName(outputRowSet.getRowMeta(), output, "pipeline_from")); + } + + private SystemData newTransform() { + TransformMeta tm = helper.transformMeta; + PipelineMeta pm = helper.pipelineMeta; + SystemData input = new SystemData(tm, meta, data, 0, pm, helper.pipeline); + input.setMetadataProvider(metadataProvider); + return input; + } + + private static SystemDataMeta.SystemInfoField field(String name, SystemDataType type) { + SystemDataMeta.SystemInfoField field = new SystemDataMeta.SystemInfoField(); + field.setFieldName(name); + field.setFieldType(type); + return field; + } + + private static Object value(IRowMeta rowMeta, Object[] output, SystemDataType type) { + int index = rowMeta.indexOfValue("f_" + type.name()); + if (index < 0) { + return null; + } + return output[index]; + } + + private static Object valueByFieldName(IRowMeta rowMeta, Object[] output, String fieldName) { + int index = rowMeta.indexOfValue(fieldName); + if (index < 0) { + return null; + } + return output[index]; + } + + private static Result createPreviousResult() { + Result previousResult = new Result(); + previousResult.setResult(true); + previousResult.setExitStatus(11); + previousResult.setEntryNr(12); + previousResult.setNrFilesRetrieved(13); + previousResult.setNrLinesDeleted(14); + previousResult.setNrLinesInput(15); + previousResult.setNrLinesOutput(16); + previousResult.setNrLinesRead(17); + previousResult.setNrLinesRejected(18); + previousResult.setNrLinesUpdated(19); + previousResult.setNrLinesWritten(20); + previousResult.setStopped(true); + previousResult.setNrErrors(21); + previousResult.setLogText("log-text"); + previousResult.getRows().add(null); + previousResult.getResultFiles().put("f1", mock(ResultFile.class)); + return previousResult; + } +} diff --git a/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataTypeTests.java b/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataTypeTests.java new file mode 100644 index 0000000000..8f17ebb9da --- /dev/null +++ b/plugins/transforms/systemdata/src/test/java/org/apache/hop/pipeline/transforms/systemdata/SystemDataTypeTests.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hop.pipeline.transforms.systemdata; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashSet; +import java.util.Set; +import org.junit.jupiter.api.Test; + +/** Unit test for {@link SystemDataType} */ +class SystemDataTypeTests { + + @Test + void testEnumBasicProperties() { + SystemDataType type = SystemDataType.SYSTEM_DATE; + + assertEquals("system date (variable)", type.getCode()); + assertNotNull(type.getDescription()); + assertFalse(type.getDescription().isEmpty()); + } + + @Test + void testNoneDefault() { + SystemDataType none = SystemDataType.NONE; + + assertEquals("", none.getCode()); + assertNotNull(none.getDescription()); + } + + @Test + void testLookupDescription_success() { + String desc = SystemDataType.SYSTEM_DATE.getDescription(); + + SystemDataType result = SystemDataType.lookupDescription(desc); + assertEquals(SystemDataType.SYSTEM_DATE, result); + } + + @Test + void testLookupDescription_notFound() { + SystemDataType result = SystemDataType.lookupDescription("not-exist"); + assertEquals(SystemDataType.NONE, result); + } + + @Test + void testGetDescriptions() { + String[] descriptions = SystemDataType.getDescriptions(); + + assertNotNull(descriptions); + assertTrue(descriptions.length > 0); + + boolean found = false; + for (String desc : descriptions) { + if (desc.equals(SystemDataType.SYSTEM_DATE.getDescription())) { + found = true; + break; + } + } + + assertTrue(found); + } + + @Test + void testAllEnumHaveCodeAndDescription() { + for (SystemDataType type : SystemDataType.values()) { + assertNotNull(type.getCode(), type.name() + " code is null"); + assertNotNull(type.getDescription(), type.name() + " description is null"); + } + } + + @Test + void testDescriptionUnique() { + Set<String> set = new HashSet<>(); + + for (SystemDataType type : SystemDataType.values()) { + String desc = type.getDescription(); + assertTrue(set.add(desc), "Duplicate description: " + desc); + } + } + + @Test + void testDescriptionNotFallbackKey() { + for (SystemDataType type : SystemDataType.values()) { + if (type.equals(SystemDataType.NONE)) { + continue; + } + + String desc = type.getDescription(); + assertFalse( + desc.contains("SystemDataMeta.TypeDesc."), "i18n not resolved for: " + type.name()); + } + } +}
