asfgit closed pull request #3233: NIFI-5892 Wait timestamp lingers, potentially messing up downstream w… URL: https://github.com/apache/nifi/pull/3233
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java index cfec15e263..e2975560ac 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java @@ -84,7 +84,8 @@ ) @WritesAttributes({ @WritesAttribute(attribute = "wait.start.timestamp", description = "All FlowFiles will have an attribute 'wait.start.timestamp', which sets the " - + "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile."), + + "initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile. " + + "This attribute is not written when the FlowFile is transferred to failure or success"), @WritesAttribute(attribute = "wait.counter.<counterName>", description = "If a signal exists when the processor runs, " + "each count value in the signal is copied.") }) @@ -314,6 +315,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final Consumer<FlowFile> transferToFailure = flowFile -> { flowFile = session.penalize(flowFile); + // This flowFile is now failed, our tracking is done, clear the timer + flowFile = clearWaitState(session, flowFile); getFlowFilesFor.apply(REL_FAILURE).add(flowFile); }; @@ -328,9 +331,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session relationship = Relationship.SELF; } } - + final Relationship finalRelationship = relationship; final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream() - .map(f -> copySignalAttributes(session, f, signalRef.get(), originalSignalCounts, replaceOriginalAttributes)).collect(Collectors.toList()); + .map(f -> { + if (REL_SUCCESS.equals(finalRelationship)) { + // These flowFiles will be exiting the wait, clear the timer + f = clearWaitState(session, f); + } + return copySignalAttributes(session, f, signalRef.get(), + originalSignalCounts, + replaceOriginalAttributes); + }) + .collect(Collectors.toList()); + session.transfer(flowFilesWithSignalAttributes, relationship); }; @@ -470,6 +483,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } + private FlowFile clearWaitState(final ProcessSession session, final FlowFile flowFile) { + return session.removeAttribute(flowFile, WAIT_START_TIMESTAMP); + } + private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount, final boolean replaceOriginal) { if (signal == null) { return flowFile; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java index a4df2f37e9..360567b534 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java @@ -69,6 +69,8 @@ public void testWait() throws InitializationException { // no cache key attribute runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); + // timestamp must be present + runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0).assertAttributeExists(Wait.WAIT_START_TIMESTAMP); runner.clearTransferState(); } @@ -101,6 +103,7 @@ public void testExpired() throws InitializationException, InterruptedException { runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); runner.clearTransferState(); runner.enqueue(ff); @@ -126,7 +129,7 @@ public void testCounterExpired() throws InitializationException, InterruptedExce runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); MockFlowFile ff = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - + ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); runner.clearTransferState(); runner.enqueue(ff); @@ -164,6 +167,7 @@ public void testBadWaitStartTimestamp() throws InitializationException, Interrup runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1); + runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); runner.clearTransferState(); } @@ -178,6 +182,7 @@ public void testEmptyReleaseSignal() throws InitializationException, Interrupted runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1); runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total"); + runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); runner.clearTransferState(); } @@ -232,6 +237,8 @@ public void testReplaceAttributes() throws InitializationException, IOException final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + // timer cleared + outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // show a new attribute was copied from the cache assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); // show that uuid was not overwritten @@ -276,6 +283,8 @@ public void testKeepOriginalAttributes() throws InitializationException, IOExcep final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + // timer cleared + outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // show a new attribute was copied from the cache assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); // show that uuid was not overwritten @@ -317,7 +326,7 @@ public void testWaitForTotalCount() throws InitializationException, IOException runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); - + waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); /* * 2nd iteration. */ @@ -331,6 +340,7 @@ public void testWaitForTotalCount() throws InitializationException, IOException runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); // Still waiting since total count doesn't reach to 3. waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); /* * 3rd iteration. @@ -342,6 +352,7 @@ public void testWaitForTotalCount() throws InitializationException, IOException runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); // Still waiting since total count doesn't reach to 3. waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); /* * 4th iteration. @@ -357,6 +368,9 @@ public void testWaitForTotalCount() throws InitializationException, IOException final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + // wait timer cleared + outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); + // show a new attribute was copied from the cache assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); // show that uuid was not overwritten @@ -401,6 +415,7 @@ public void testWaitForSpecificCount() throws InitializationException, IOExcepti runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); /* * 2nd iteration. @@ -415,6 +430,7 @@ public void testWaitForSpecificCount() throws InitializationException, IOExcepti runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); // Still waiting since counter-B doesn't reach to 2. waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); /* * 3rd iteration. @@ -429,6 +445,7 @@ public void testWaitForSpecificCount() throws InitializationException, IOExcepti runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1); // Still waiting since total count doesn't reach to 3. waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0); + waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP); /* * 4th iteration. @@ -444,6 +461,8 @@ public void testWaitForSpecificCount() throws InitializationException, IOExcepti final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + // wait timer cleared + outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // show a new attribute was copied from the cache assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only")); // show that uuid was not overwritten @@ -498,6 +517,8 @@ public void testDecrementCache() throws ConcurrentModificationException, IOExcep runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + // timer cleared + outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); outputFlowFile.assertAttributeEquals("wait.counter.counter", "2"); // expect counter to be decremented to 0 and releasable count remains 1. @@ -514,6 +535,8 @@ public void testDecrementCache() throws ConcurrentModificationException, IOExcep runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + // timer cleared + outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP); // All counters are consumed. outputFlowFile.assertAttributeEquals("wait.counter.counter", "0"); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services