asfgit closed pull request #3233: NIFI-5892 Wait timestamp lingers, potentially
messing up downstream w…
URL: https://github.com/apache/nifi/pull/3233
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one made
from a fork) once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index cfec15e263..e2975560ac 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -84,7 +84,8 @@
)
@WritesAttributes({
@WritesAttribute(attribute = "wait.start.timestamp", description =
"All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
- + "initial epoch timestamp when the file first entered this processor.
This is used to determine the expiration time of the FlowFile."),
+ + "initial epoch timestamp when the file first entered this processor.
This is used to determine the expiration time of the FlowFile. "
+ + "This attribute is not written when the FlowFile is transferred to
failure or success"),
@WritesAttribute(attribute = "wait.counter.<counterName>", description
= "If a signal exists when the processor runs, "
+ "each count value in the signal is copied.")
})
@@ -314,6 +315,8 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
final Consumer<FlowFile> transferToFailure = flowFile -> {
flowFile = session.penalize(flowFile);
+ // This flowFile is now failed, our tracking is done, clear the
timer
+ flowFile = clearWaitState(session, flowFile);
getFlowFilesFor.apply(REL_FAILURE).add(flowFile);
};
@@ -328,9 +331,19 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
relationship = Relationship.SELF;
}
}
-
+ final Relationship finalRelationship = relationship;
final List<FlowFile> flowFilesWithSignalAttributes =
routedFlowFiles.getValue().stream()
- .map(f -> copySignalAttributes(session, f,
signalRef.get(), originalSignalCounts,
replaceOriginalAttributes)).collect(Collectors.toList());
+ .map(f -> {
+ if (REL_SUCCESS.equals(finalRelationship)) {
+ // These flowFiles will be exiting the wait, clear
the timer
+ f = clearWaitState(session, f);
+ }
+ return copySignalAttributes(session, f,
signalRef.get(),
+ originalSignalCounts,
+ replaceOriginalAttributes);
+ })
+ .collect(Collectors.toList());
+
session.transfer(flowFilesWithSignalAttributes, relationship);
};
@@ -470,6 +483,10 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
}
+ private FlowFile clearWaitState(final ProcessSession session, final
FlowFile flowFile) {
+ return session.removeAttribute(flowFile, WAIT_START_TIMESTAMP);
+ }
+
private FlowFile copySignalAttributes(final ProcessSession session, final
FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount,
final boolean replaceOriginal) {
if (signal == null) {
return flowFile;
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
index a4df2f37e9..360567b534 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
@@ -69,6 +69,8 @@ public void testWait() throws InitializationException {
// no cache key attribute
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+ // timestamp must be present
+
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0).assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState();
}
@@ -101,6 +103,7 @@ public void testExpired() throws InitializationException,
InterruptedException {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile ff =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState();
runner.enqueue(ff);
@@ -126,7 +129,7 @@ public void testCounterExpired() throws
InitializationException, InterruptedExce
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile ff =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-
+ ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState();
runner.enqueue(ff);
@@ -164,6 +167,7 @@ public void testBadWaitStartTimestamp() throws
InitializationException, Interrup
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
+
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState();
}
@@ -178,6 +182,7 @@ public void testEmptyReleaseSignal() throws
InitializationException, Interrupted
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
+
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState();
}
@@ -232,6 +237,8 @@ public void testReplaceAttributes() throws
InitializationException, IOException
final MockFlowFile outputFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // timer cleared
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache
assertEquals("notifyValue",
outputFlowFile.getAttribute("notify.only"));
// show that uuid was not overwritten
@@ -276,6 +283,8 @@ public void testKeepOriginalAttributes() throws
InitializationException, IOExcep
final MockFlowFile outputFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // timer cleared
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache
assertEquals("notifyValue",
outputFlowFile.getAttribute("notify.only"));
// show that uuid was not overwritten
@@ -317,7 +326,7 @@ public void testWaitForTotalCount() throws
InitializationException, IOException
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile waitingFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/*
* 2nd iteration.
*/
@@ -331,6 +340,7 @@ public void testWaitForTotalCount() throws
InitializationException, IOException
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3.
waitingFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/*
* 3rd iteration.
@@ -342,6 +352,7 @@ public void testWaitForTotalCount() throws
InitializationException, IOException
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3.
waitingFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/*
* 4th iteration.
@@ -357,6 +368,9 @@ public void testWaitForTotalCount() throws
InitializationException, IOException
final MockFlowFile outputFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // wait timer cleared
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+
// show a new attribute was copied from the cache
assertEquals("notifyValue",
outputFlowFile.getAttribute("notify.only"));
// show that uuid was not overwritten
@@ -401,6 +415,7 @@ public void testWaitForSpecificCount() throws
InitializationException, IOExcepti
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile waitingFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/*
* 2nd iteration.
@@ -415,6 +430,7 @@ public void testWaitForSpecificCount() throws
InitializationException, IOExcepti
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since counter-B doesn't reach to 2.
waitingFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/*
* 3rd iteration.
@@ -429,6 +445,7 @@ public void testWaitForSpecificCount() throws
InitializationException, IOExcepti
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3.
waitingFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/*
* 4th iteration.
@@ -444,6 +461,8 @@ public void testWaitForSpecificCount() throws
InitializationException, IOExcepti
final MockFlowFile outputFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // wait timer cleared
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache
assertEquals("notifyValue",
outputFlowFile.getAttribute("notify.only"));
// show that uuid was not overwritten
@@ -498,6 +517,8 @@ public void testDecrementCache() throws
ConcurrentModificationException, IOExcep
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
MockFlowFile outputFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // timer cleared
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");
// expect counter to be decremented to 0 and releasable count remains
1.
@@ -514,6 +535,8 @@ public void testDecrementCache() throws
ConcurrentModificationException, IOExcep
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
outputFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // timer cleared
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// All counters are consumed.
outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services