This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/main by this push:
new 994cbc3dd8 Check number of copies and throw clean error, fixes #6517
(#6543)
994cbc3dd8 is described below
commit 994cbc3dd858b6a05421b41a2757fa8bf7a5f6de
Author: Hans Van Akelyen <[email protected]>
AuthorDate: Tue Feb 10 21:07:04 2026 +0100
Check number of copies and throw clean error, fixes #6517 (#6543)
---
.../java/org/apache/hop/pipeline/Pipeline.java | 298 +++++----------------
.../pipeline/transform/TransformInitThread.java | 8 +
.../hopgui/file/pipeline/HopGuiPipelineGraph.java | 46 +++-
.../ui/hopgui/messages/messages_en_US.properties | 9 +-
4 files changed, 122 insertions(+), 239 deletions(-)
diff --git a/engine/src/main/java/org/apache/hop/pipeline/Pipeline.java
b/engine/src/main/java/org/apache/hop/pipeline/Pipeline.java
index c4e5564a98..9f978dbdba 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/Pipeline.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/Pipeline.java
@@ -42,6 +42,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Getter;
+import lombok.Setter;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.vfs2.FileName;
import org.apache.commons.vfs2.FileObject;
@@ -180,26 +182,20 @@ public abstract class Pipeline
/** The parent logging object interface (this could be a pipeline or a
workflow). */
private ILoggingObject parent;
- /** Indicates that we want to do a topological sort of the transforms in a
GUI. */
- private boolean sortingTransformsTopologically;
+ @Getter @Setter private boolean sortingTransformsTopologically;
/** Indicates that we are running in preview mode... */
private boolean preview;
- /** Keeps track of when this pipeline started preparation */
- private Date executionStartDate;
+ @Setter private Date executionStartDate;
- /** Keeps track of when this pipeline ended preparation */
- private Date executionEndDate;
+ @Setter private Date executionEndDate;
- /** The variable bindings for the pipeline. */
- private IVariables variables = new Variables();
+ @Getter private IVariables variables = new Variables();
- /** A list of all the row sets. */
- public List<IRowSet> rowsets;
+ @Getter public List<IRowSet> rowsets;
- /** A list of all the transforms. */
- private List<TransformMetaDataCombi> transforms;
+ @Getter private List<TransformMetaDataCombi> transforms;
/** Constant indicating a dispatch type of 1-to-1. */
public static final int TYPE_DISP_1_1 = 1;
@@ -253,8 +249,7 @@ public abstract class Pipeline
public static final String CONFIGURATION_IN_EXPORT_FILENAME =
"__pipeline_execution_configuration__.xml";
- /** Whether safe mode is enabled. */
- private boolean safeModeEnabled;
+ @Setter private boolean safeModeEnabled;
/** Int value for storage pipeline statuses */
private final AtomicInteger status;
@@ -262,7 +257,7 @@ public abstract class Pipeline
/** Boolean to check if pipeline is already stopped */
private final AtomicBoolean isAlreadyStopped = new AtomicBoolean(false);
- /** Plugins can use this to add additional data samplers to the pipeline. */
+ @Getter @Setter
protected List<IExecutionDataSampler<? extends IExecutionDataSamplerStore>>
dataSamplers;
/**
@@ -292,8 +287,7 @@ public abstract class Pipeline
/** Whether the pipeline is ready to start. */
private boolean readyToStart;
- /** Transform performance snapshots. */
- private Map<String, List<PerformanceSnapShot>> transformPerformanceSnapShots;
+ @Getter @Setter private Map<String, List<PerformanceSnapShot>>
transformPerformanceSnapShots;
/** The transform performance snapshot timer. */
private Timer transformPerformanceSnapShotTimer;
@@ -326,11 +320,9 @@ public abstract class Pipeline
/** The last transform performance snapshot sequence number added. */
private int lastTransformPerformanceSnapshotSeqNrAdded;
- /** The active sub-pipelines. */
- private Map<String, IPipelineEngine> activeSubPipelines;
+ @Setter @Getter private Map<String, IPipelineEngine> activeSubPipelines;
- /** The active subjobs */
- private Map<String, IWorkflowEngine<WorkflowMeta>> activeSubWorkflows;
+ @Getter @Setter private Map<String, IWorkflowEngine<WorkflowMeta>>
activeSubWorkflows;
/** The transform performance snapshot size limit. */
private int transformPerformanceSnapshotSizeLimit;
@@ -346,26 +338,24 @@ public abstract class Pipeline
private Result previousResult;
- protected List<RowMetaAndData> resultRows;
+ @Getter @Setter protected List<RowMetaAndData> resultRows;
- protected List<ResultFile> resultFiles;
+ @Getter @Setter protected List<ResultFile> resultFiles;
/** The command line arguments for the pipeline. */
protected String[] arguments;
- private HttpServletResponse servletResponse;
+ @Getter private HttpServletResponse servletResponse;
- private HttpServletRequest servletRequest;
+ @Setter @Getter private HttpServletRequest servletRequest;
private final Map<String, Object> extensionDataMap;
- protected int rowSetSize;
+ @Getter @Setter protected int rowSetSize;
- /** Whether the feedback is shown. */
- protected boolean feedbackShown;
+ @Setter protected boolean feedbackShown;
- /** The feedback size. */
- protected int feedbackSize;
+ @Setter protected int feedbackSize;
/** Instantiates a new pipeline. */
public Pipeline() {
@@ -639,18 +629,17 @@ public abstract class Pipeline
// How many times do we start the source transform?
int thisCopies = thisTransform.getCopies(this);
-
if (thisCopies < 0) {
- // This can only happen if a variable is used that didn't resolve to
a positive integer
- // value
- //
- throw new HopException(
- BaseMessages.getString(
- PKG, "Pipeline.Log.TransformCopiesNotCorrectlyDefined",
thisTransform.getName()));
+ thisCopies =
+ 1; // use 1 for rowset math; transform will be marked failed
during allocation
}
// How many times do we start the target transform?
int nextCopies = nextTransform.getCopies(this);
+ if (nextCopies < 0) {
+ nextCopies =
+ 1; // use 1 for rowset math; transform will be marked failed
during allocation
+ }
// Are we re-partitioning?
boolean repartitioning;
@@ -802,6 +791,43 @@ public abstract class Pipeline
// How many copies are launched of this transform?
int nrCopies = transformMeta.getCopies(this);
+ if (nrCopies <= 0) {
+ // Variable didn't resolve or resolved to 0 - follow same pattern as
transform init
+ // failure: add one combi with errors=1 and STATUS_STOPPED so pipeline
ends cleanly
+ String copiesStr =
+ transformMeta.getCopiesString() != null ?
transformMeta.getCopiesString() : "?";
+ log.logError(
+ BaseMessages.getString(
+ PKG,
+ "Pipeline.Log.TransformCopiesNotCorrectlyDefined",
+ copiesStr,
+ transformMeta.getName()));
+
+ TransformMetaDataCombi combi = new TransformMetaDataCombi();
+ combi.transformName = transformMeta.getName();
+ combi.copy = 0;
+ combi.transformMeta = transformMeta;
+ combi.meta = transformMeta.getTransform();
+ ITransformData data = combi.meta.createTransformData();
+ combi.data = data;
+ ITransform transform =
+ combi.meta.createTransform(transformMeta, data, 0, pipelineMeta,
this);
+ transform.initializeFrom(this);
+ transform.setMetadataProvider(metadataProvider);
+ combi.transform = transform;
+
+ transform.setErrors(1);
+ combi.data.setStatus(ComponentExecutionStatus.STATUS_STOPPED);
+
+ if (combi.transform instanceof ILoggingObject) {
+ ILogChannel logChannel = combi.transform.getLogChannel();
+ logChannel.setLogLevel(logLevel);
+ logChannel.setGatheringMetrics(log.isGatheringMetrics());
+ }
+ transforms.add(combi);
+ continue;
+ }
+
if (log.isDebug()) {
log.logDebug(
BaseMessages.getString(
@@ -888,6 +914,9 @@ public abstract class Pipeline
// Metadata wise we need to do the same trick in PipelineMeta
//
for (TransformMetaDataCombi combi : transforms) {
+ if (combi.data.getStatus() == ComponentExecutionStatus.STATUS_STOPPED) {
+ continue; // pre-failed (e.g. invalid copies), init was skipped
+ }
if (combi.transformMeta.isDoingErrorHandling()) {
combi.transform.identifyErrorOutput();
}
@@ -1845,22 +1874,6 @@ public abstract class Pipeline
return null;
}
- /**
- * Gets sortingTransformsTopologically
- *
- * @return value of sortingTransformsTopologically
- */
- public boolean isSortingTransformsTopologically() {
- return sortingTransformsTopologically;
- }
-
- /**
- * @param sortingTransformsTopologically The sortingTransformsTopologically
to set
- */
- public void setSortingTransformsTopologically(boolean
sortingTransformsTopologically) {
- this.sortingTransformsTopologically = sortingTransformsTopologically;
- }
-
/**
* Gets the meta-data for the pipeline.
*
@@ -1881,24 +1894,6 @@ public abstract class Pipeline
this.pipelineMeta = pipelineMeta;
}
- /**
- * Gets the rowsets for the pipeline.
- *
- * @return a list of rowsets
- */
- public List<IRowSet> getRowsets() {
- return rowsets;
- }
-
- /**
- * Gets a list of transforms in the pipeline.
- *
- * @return a list of the transforms in the pipeline
- */
- public List<TransformMetaDataCombi> getTransforms() {
- return transforms;
- }
-
protected void setTransforms(List<TransformMetaDataCombi> transforms) {
this.transforms = transforms;
}
@@ -1976,15 +1971,6 @@ public abstract class Pipeline
return list;
}
- /**
- * Turn on safe mode during running: the pipeline will run slower but with
more checking enabled.
- *
- * @param safeModeEnabled true for safe mode
- */
- public void setSafeModeEnabled(boolean safeModeEnabled) {
- this.safeModeEnabled = safeModeEnabled;
- }
-
/**
* Checks whether safe mode is enabled.
*
@@ -2485,26 +2471,6 @@ public abstract class Pipeline
this.preview = preview;
}
- /**
- * Gets a named list (map) of transform performance snapshots.
- *
- * @return a named list (map) of transform performance snapshots
- */
- public Map<String, List<PerformanceSnapShot>>
getTransformPerformanceSnapShots() {
- return transformPerformanceSnapShots;
- }
-
- /**
- * Sets the named list (map) of transform performance snapshots.
- *
- * @param transformPerformanceSnapShots a named list (map) of transform
performance snapshots to
- * set
- */
- public void setTransformPerformanceSnapShots(
- Map<String, List<PerformanceSnapShot>> transformPerformanceSnapShots) {
- this.transformPerformanceSnapShots = transformPerformanceSnapShots;
- }
-
@Override
public void addExecutionStartedListener(
IExecutionStartedListener<IPipelineEngine<PipelineMeta>> listener) {
@@ -2878,15 +2844,6 @@ public abstract class Pipeline
return activeSubWorkflows.get(subWorkflowName);
}
- /**
- * Gets the active sub-workflows.
- *
- * @return a map (by name) of the active sub-workflows
- */
- public Map<String, IWorkflowEngine<WorkflowMeta>> getActiveSubWorkflows() {
- return activeSubWorkflows;
- }
-
/**
* Gets the container object ID.
*
@@ -2984,22 +2941,6 @@ public abstract class Pipeline
}
}
- public List<ResultFile> getResultFiles() {
- return resultFiles;
- }
-
- public void setResultFiles(List<ResultFile> resultFiles) {
- this.resultFiles = resultFiles;
- }
-
- public List<RowMetaAndData> getResultRows() {
- return resultRows;
- }
-
- public void setResultRows(List<RowMetaAndData> resultRows) {
- this.resultRows = resultRows;
- }
-
@Override
public Result getPreviousResult() {
return previousResult;
@@ -3064,18 +3005,6 @@ public abstract class Pipeline
this.servletResponse = response;
}
- public HttpServletResponse getServletResponse() {
- return servletResponse;
- }
-
- public void setServletRequest(HttpServletRequest request) {
- this.servletRequest = request;
- }
-
- public HttpServletRequest getServletRequest() {
- return servletRequest;
- }
-
public synchronized void doTopologySortOfTransforms() {
// The bubble sort algorithm in contrast to the QuickSort or MergeSort
// algorithms
@@ -3224,13 +3153,6 @@ public abstract class Pipeline
return executionStartDate;
}
- /**
- * @param executionStartDate The executionStartDate to set
- */
- public void setExecutionStartDate(Date executionStartDate) {
- this.executionStartDate = executionStartDate;
- }
-
/**
* Gets executionEndDate
*
@@ -3241,13 +3163,6 @@ public abstract class Pipeline
return executionEndDate;
}
- /**
- * @param executionEndDate The executionEndDate to set
- */
- public void setExecutionEndDate(Date executionEndDate) {
- this.executionEndDate = executionEndDate;
- }
-
@Override
public Map<String, Object> getExtensionDataMap() {
return extensionDataMap;
@@ -3522,22 +3437,6 @@ public abstract class Pipeline
executionFinishedListeners.add(listener);
}
- /**
- * Gets rowSetSize
- *
- * @return value of rowSetSize
- */
- public int getRowSetSize() {
- return rowSetSize;
- }
-
- /**
- * @param rowSetSize The rowSetSize to set
- */
- public void setRowSetSize(int rowSetSize) {
- this.rowSetSize = rowSetSize;
- }
-
/**
* Gets feedbackShown
*
@@ -3548,13 +3447,6 @@ public abstract class Pipeline
return feedbackShown;
}
- /**
- * @param feedbackShown The feedbackShown to set
- */
- public void setFeedbackShown(boolean feedbackShown) {
- this.feedbackShown = feedbackShown;
- }
-
/**
* Gets feedbackSize
*
@@ -3565,13 +3457,6 @@ public abstract class Pipeline
return feedbackSize;
}
- /**
- * @param feedbackSize The feedbackSize to set
- */
- public void setFeedbackSize(int feedbackSize) {
- this.feedbackSize = feedbackSize;
- }
-
/**
* @deprecated Gets executionStartedListeners
* @return value of executionStartedListeners
@@ -3630,38 +3515,6 @@ public abstract class Pipeline
this.pipelineRunConfiguration = pipelineRunConfiguration;
}
- /**
- * Gets activeSubPipelines
- *
- * @return value of activeSubPipelines
- */
- public Map<String, IPipelineEngine> getActiveSubPipelines() {
- return activeSubPipelines;
- }
-
- /**
- * @param activeSubPipelines The activeSubPipelines to set
- */
- public void setActiveSubPipelines(Map<String, IPipelineEngine>
activeSubPipelines) {
- this.activeSubPipelines = activeSubPipelines;
- }
-
- /**
- * @param activeSubWorkflows The activeSubWorkflows to set
- */
- public void setActiveSubWorkflows(Map<String, IWorkflowEngine<WorkflowMeta>>
activeSubWorkflows) {
- this.activeSubWorkflows = activeSubWorkflows;
- }
-
- /**
- * Gets variables
- *
- * @return value of variables
- */
- public IVariables getVariables() {
- return variables;
- }
-
/**
* @param variables The variables to set
*/
@@ -3669,25 +3522,6 @@ public abstract class Pipeline
this.variables = variables;
}
- /**
- * Gets dataSamplers
- *
- * @return value of dataSamplers
- */
- public List<IExecutionDataSampler<? extends IExecutionDataSamplerStore>>
getDataSamplers() {
- return dataSamplers;
- }
-
- /**
- * Sets dataSamplers
- *
- * @param dataSamplers value of dataSamplers
- */
- public void setDataSamplers(
- List<IExecutionDataSampler<? extends IExecutionDataSamplerStore>>
dataSamplers) {
- this.dataSamplers = dataSamplers;
- }
-
@Override
public <Store extends IExecutionDataSamplerStore, Sampler extends
IExecutionDataSampler<Store>>
void addExecutionDataSampler(Sampler sampler) {
diff --git
a/engine/src/main/java/org/apache/hop/pipeline/transform/TransformInitThread.java
b/engine/src/main/java/org/apache/hop/pipeline/transform/TransformInitThread.java
index 9aa2f8e190..c1ecd4362e 100644
---
a/engine/src/main/java/org/apache/hop/pipeline/transform/TransformInitThread.java
+++
b/engine/src/main/java/org/apache/hop/pipeline/transform/TransformInitThread.java
@@ -60,6 +60,14 @@ public class TransformInitThread implements Runnable {
return;
}
+ // Already marked as failed (e.g. invalid number of copies) - skip init,
same outcome as init
+ // failure
+ if (combi.data.getStatus() == ComponentExecutionStatus.STATUS_STOPPED) {
+ finished = true;
+ ok = false;
+ return;
+ }
+
try {
combi.transform.getLogChannel().snap(Metrics.METRIC_TRANSFORM_INIT_START);
diff --git
a/ui/src/main/java/org/apache/hop/ui/hopgui/file/pipeline/HopGuiPipelineGraph.java
b/ui/src/main/java/org/apache/hop/ui/hopgui/file/pipeline/HopGuiPipelineGraph.java
index 7819fa8b51..3cfeed8cef 100644
---
a/ui/src/main/java/org/apache/hop/ui/hopgui/file/pipeline/HopGuiPipelineGraph.java
+++
b/ui/src/main/java/org/apache/hop/ui/hopgui/file/pipeline/HopGuiPipelineGraph.java
@@ -2518,15 +2518,46 @@ public class HopGuiPipelineGraph extends
HopGuiAbstractGraph
copies(transformMeta);
}
+ /**
+ * Validates that the number of copies is an integer of at least 1 or a variable (${VARIABLE}).
+ *
+ * @param value the string to validate
+ * @return true if valid (integer of 1 or higher, or ${...} variable pattern); false otherwise
+ */
+ private boolean isValidCopiesString(String value) {
+ if (Utils.isEmpty(value)) {
+ return false;
+ }
+ String trimmed = value.trim();
+ // Positive integer: needs a non-zero digit so "0"/"00" are rejected (engine fails on <= 0)
+ if (trimmed.matches("0*[1-9]\\d*")) {
+ return true;
+ }
+ // Variable pattern: ${VARIABLE}
+ return trimmed.matches("\\$\\{[^}]+\\}");
+ }
+
public void copies(TransformMeta transformMeta) {
final boolean multipleOK = checkNumberOfCopies(pipelineMeta,
transformMeta);
selectedTransforms = null;
- String tt = BaseMessages.getString(PKG,
"PipelineGraph.Dialog.NrOfCopiesOfTransform.Title");
- String mt = BaseMessages.getString(PKG,
"PipelineGraph.Dialog.NrOfCopiesOfTransform.Message");
EnterStringDialog nd =
- new EnterStringDialog(hopShell(), transformMeta.getCopiesString(), tt,
mt, true, variables);
+ new EnterStringDialog(
+ hopShell(),
+ transformMeta.getCopiesString(),
+ BaseMessages.getString(PKG,
"PipelineGraph.Dialog.NrOfCopiesOfTransform.Title"),
+ BaseMessages.getString(PKG,
"PipelineGraph.Dialog.NrOfCopiesOfTransform.Message"),
+ true,
+ variables);
String cop = nd.open();
if (!Utils.isEmpty(cop)) {
+ cop = cop.trim();
+ if (!isValidCopiesString(cop)) {
+ modalMessageDialog(
+ BaseMessages.getString(PKG,
"PipelineGraph.Dialog.InvalidNrOfCopies.Title"),
+ BaseMessages.getString(PKG,
"PipelineGraph.Dialog.InvalidNrOfCopies.Message"),
+ SWT.OK | SWT.ICON_ERROR);
+ return;
+ }
int copies = Const.toInt(hopGui.getVariables().resolve(cop), -1);
if (copies > 1 && !multipleOK) {
@@ -4893,12 +4924,19 @@ public class HopGuiPipelineGraph extends
HopGuiAbstractGraph
private void checkErrorVisuals() {
if (pipeline.getErrors() > 0) {
// Get the logging text and filter it out. Store it in the
transformLogMap...
+ // Use non-empty placeholder when log is null/empty so the transform is
still marked red
+ // (e.g. invalid copies transform never ran init so has no log output).
//
transformLogMap = new HashMap<>();
for (IEngineComponent component : pipeline.getComponents()) {
if (component.getErrors() > 0) {
String logText = component.getLogText();
- transformLogMap.put(component.getName(), logText);
+ transformLogMap.put(
+ component.getName(),
+ Utils.isEmpty(logText)
+ ? BaseMessages.getString(
+ PKG,
"PipelineGraph.Dialog.TransformHadErrors.SeePipelineLog")
+ : logText);
}
}
diff --git
a/ui/src/main/resources/org/apache/hop/ui/hopgui/messages/messages_en_US.properties
b/ui/src/main/resources/org/apache/hop/ui/hopgui/messages/messages_en_US.properties
index fef49ce307..2167ff8df9 100644
---
a/ui/src/main/resources/org/apache/hop/ui/hopgui/messages/messages_en_US.properties
+++
b/ui/src/main/resources/org/apache/hop/ui/hopgui/messages/messages_en_US.properties
@@ -174,13 +174,16 @@ PipelineGraph.Dialog.LoopAfterHopEnabled.Title=Error
PipelineGraph.Dialog.MultipleCopiesAreNotAllowedHere.Message=Sorry, multiple
copies of a transform are not allowed here.\nWe suggest that you insert a Dummy
transform here.\nThe next transform(s) can then be launched in multiple copies
as usual.
PipelineGraph.Dialog.MultipleCopiesAreNotAllowedHere.Title=Sorry
PipelineGraph.Dialog.NoteEditor.Title=Notes
-PipelineGraph.Dialog.NrOfCopiesOfTransform.Message=Number of copies (1 or
higher)
+PipelineGraph.Dialog.InvalidNrOfCopies.Message=Number of copies must be a
positive integer (e.g. 1, 2) or a variable.
+PipelineGraph.Dialog.InvalidNrOfCopies.Title=Invalid number of copies
+PipelineGraph.Dialog.NrOfCopiesOfTransform.Message=Number of copies (1 or
higher), or variable
PipelineGraph.Dialog.NrOfCopiesOfTransform.Title=Nr of copies of transform...
PipelineGraph.Dialog.Option.SplitHop.DoNotAskAgain=Don''t ask again
PipelineGraph.Dialog.SplitHop.Message=Do you want to split this hop?
PipelineGraph.Dialog.SplitHop.Title=Split hop?
PipelineGraph.Dialog.TransformDescription.Message=Transform description\:
PipelineGraph.Dialog.TransformDescription.Title=Transform description dialog
+PipelineGraph.Dialog.TransformHadErrors.SeePipelineLog=Transform had errors
(see pipeline log).
PipelineGraph.EditTransform.Tooltip=Edit the properties of this transform
PipelineGraph.ExecutionResultsPanel.CloseButton.Tooltip=Close the execution
results panel
PipelineGraph.ExecutionResultsPanel.MaxButton.Tooltip=Maximize the execution
results panel
@@ -211,7 +214,7 @@ PipelineGraph.TransformOutputConnector.Tooltip=Click on
this output connector to
PipelineGraph.TransformSupportsErrorHandling.Tooltip=This transform supports
error handling. Click on this icon to create a new link.
PipelineGraph.ViewOutput.OutputDialog.First.Text=First
PipelineGraph.ViewOutput.OutputDialog.Header=Output of {0}
-PipelineGraph.ViewOutput.OutputDialog.Last.Text=Last (REVERSE ORDER!)
+PipelineGraph.ViewOutput.OutputDialog.Last.Text=Last (REVERSE ORDER!)
PipelineGraph.ViewOutput.OutputDialog.OutputRows.Text=output rows of transform
{0}
PipelineGraph.ViewOutput.OutputDialog.Random.Text=Random
PipelineLog.Button.ClearLog=\ &Clear log
@@ -284,4 +287,4 @@ WorkflowLog.Dialog.SaveJobBeforeRunning.Title=Save workflow
WorkflowLog.System.ERROR=ERROR
WorkflowLog.System.EXCEPTION=EXCEPTION
PipelineLog.Dialog.Pause.Tooltip=Pause the logs
-PipelineLog.Dialog.Resume.Tooltip=Resume the logs
+PipelineLog.Dialog.Resume.Tooltip=Resume the logs
\ No newline at end of file