This is an automated email from the ASF dual-hosted git repository.
mcasters pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/master by this push:
new ab8255f3e7 Fix for issue #3056 : NullPointerException on pipelines
new 6354c7430e Merge pull request #3453 from mattcasters/master
ab8255f3e7 is described below
commit ab8255f3e7ca1be3371ddfea6b9b08859d7aba0e
Author: Matt Casters <[email protected]>
AuthorDate: Thu Nov 23 12:54:50 2023 +0100
Fix for issue #3056 : NullPointerException on pipelines
---
.../hop/pipeline/transform/BaseTransform.java | 58 +++++++++++++++++++---
1 file changed, 52 insertions(+), 6 deletions(-)
diff --git a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java
index a53cedd91a..4cbe10c34f 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java
@@ -1577,6 +1577,7 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
row = inputRowSet.getRowImmediate();
}
if (row != null) {
+ obtainInputRowMeta(row, inputRowSet);
incrementLinesRead();
}
} else {
@@ -1629,6 +1630,7 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
row = inputRowSet.getRowWait(waitingTime.get(), TimeUnit.MILLISECONDS);
boolean timeout = false;
if (row != null) {
+ obtainInputRowMeta(row, inputRowSet);
incrementLinesRead();
blockPointer++;
waitingTime.reset();
@@ -1663,6 +1665,7 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
inputRowSetsLock.writeLock().unlock();
}
} else {
+ obtainInputRowMeta(row, inputRowSet);
incrementLinesRead();
}
} else {
@@ -1691,17 +1694,12 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
nextInputStream();
inputRowSet = currentInputStream();
row = getRowFrom(inputRowSet);
+ obtainInputRowMeta(row, inputRowSet);
}
} finally {
inputRowSetsLock.readLock().unlock();
}
- // Also set the meta data on the first occurrence.
- // or if prevTransforms.length > 1 inputRowMeta can be changed
- if (inputRowMeta == null || prevTransforms.length > 1) {
- inputRowMeta = inputRowSet.getRowMeta();
- }
-
if (row != null) {
// OK, before we return the row, let's see if we need to check on mixing
// row compositions...
@@ -1721,6 +1719,54 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
return row;
}
+ /**
+ * The first non-null row we get we'll lock in the row metadata.
+ * For scenarios with multiple inputs, we move the metadata around (e.g. Merge Rows).
+ *
+ * @param row The input row (not null!)
+ * @param inputRowSet The row set we're reading from right now
+ */
+ private void obtainInputRowMeta(Object[] row, IRowSet inputRowSet) {
+ if (row==null) {
+ return;
+ }
+
+ // Set the row metadata on the first occurrence.
+ // If prevTransforms.length > 1, inputRowMeta can be changed as well.
+ //
+ if (inputRowMeta == null || prevTransforms.length > 1) {
+ inputRowMeta = inputRowSet.getRowMeta();
+ }
+
+ // Extra sanity check
+ //
+ if (row!=null && inputRowMeta == null) {
+ int nr = 0;
+ for (IRowSet rowSet : inputRowSets) {
+ log.logMinimal(
+ "===> Input row set #"
+ + nr
+ + ", done? "
+ + rowSet.isDone()
+ + ", size="
+ + rowSet.size()
+ + ", metadata? "
+ + (rowSet.getRowMeta() != null));
+ nr++;
+ }
+ log.logMinimal("===> Current input row set nr=" + currentInputRowSetNr);
+
+ throw new RuntimeException(
+ "No row metadata obtained for row "
+ + Arrays.toString(row)
+ + Const.CR
+ + "inputRowSet.getRowMeta()="
+ + inputRowSet.getRowMeta()
+ + ", inputRowSets.size()="
+ + inputRowSets.size());
+ }
+ }
+
/**
* IRowHandler controls how getRow/putRow are handled. The default IRowHandler will simply call
* {@link #handleGetRow()} and {@link #handlePutRow(IRowMeta, Object[])}