This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1330b92  NIFI-5892 Wait timestamp lingers, potentially messing up 
downstream wait-notify pairs Clear the wait timestamp when transferring to 
failur or success
1330b92 is described below

commit 1330b92cfa01f9dd04bade7a05a047ad6f080c4b
Author: Otto Fowler <[email protected]>
AuthorDate: Fri Dec 21 09:15:24 2018 -0500

    NIFI-5892 Wait timestamp lingers, potentially messing up downstream 
wait-notify pairs
    Clear the wait timestamp when transferring to failur or success
    
    replace explicit attribute clear with function call, refactor and integrate 
into existing tests per review
    
    This closes #3233.
    
    Signed-off-by: Koji Kawamura <[email protected]>
---
 .../org/apache/nifi/processors/standard/Wait.java  | 23 +++++++++++++++---
 .../apache/nifi/processors/standard/TestWait.java  | 27 ++++++++++++++++++++--
 2 files changed, 45 insertions(+), 5 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index cfec15e..e297556 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -84,7 +84,8 @@ import static 
org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJE
 )
 @WritesAttributes({
         @WritesAttribute(attribute = "wait.start.timestamp", description = 
"All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
-        + "initial epoch timestamp when the file first entered this processor. 
 This is used to determine the expiration time of the FlowFile."),
+        + "initial epoch timestamp when the file first entered this processor. 
 This is used to determine the expiration time of the FlowFile.  "
+        + "This attribute is not written when the FlowFile is transferred to 
failure or success"),
         @WritesAttribute(attribute = "wait.counter.<counterName>", description 
= "If a signal exists when the processor runs, "
         + "each count value in the signal is copied.")
 })
@@ -314,6 +315,8 @@ public class Wait extends AbstractProcessor {
 
         final Consumer<FlowFile> transferToFailure = flowFile -> {
             flowFile = session.penalize(flowFile);
+            // This flowFile is now failed, our tracking is done, clear the 
timer
+            flowFile = clearWaitState(session, flowFile);
             getFlowFilesFor.apply(REL_FAILURE).add(flowFile);
         };
 
@@ -328,9 +331,19 @@ public class Wait extends AbstractProcessor {
                     relationship = Relationship.SELF;
                 }
             }
-
+            final Relationship finalRelationship = relationship;
             final List<FlowFile> flowFilesWithSignalAttributes = 
routedFlowFiles.getValue().stream()
-                    .map(f -> copySignalAttributes(session, f, 
signalRef.get(), originalSignalCounts, 
replaceOriginalAttributes)).collect(Collectors.toList());
+                    .map(f -> {
+                        if (REL_SUCCESS.equals(finalRelationship)) {
+                            // These flowFiles will be exiting the wait, clear 
the timer
+                            f = clearWaitState(session, f);
+                        }
+                        return copySignalAttributes(session, f, 
signalRef.get(),
+                            originalSignalCounts,
+                            replaceOriginalAttributes);
+                    })
+                    .collect(Collectors.toList());
+
             session.transfer(flowFilesWithSignalAttributes, relationship);
         };
 
@@ -470,6 +483,10 @@ public class Wait extends AbstractProcessor {
 
     }
 
+    private FlowFile clearWaitState(final ProcessSession session, final 
FlowFile flowFile) {
+        return session.removeAttribute(flowFile, WAIT_START_TIMESTAMP);
+    }
+
     private FlowFile copySignalAttributes(final ProcessSession session, final 
FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount, 
final boolean replaceOriginal) {
         if (signal == null) {
             return flowFile;
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
index 5b5b6fc..2ccb2fe 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
@@ -69,6 +69,8 @@ public class TestWait {
 
         // no cache key attribute
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+        // timestamp must be present
+        
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0).assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
         runner.clearTransferState();
     }
 
@@ -101,6 +103,7 @@ public class TestWait {
 
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile ff = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         runner.clearTransferState();
         runner.enqueue(ff);
@@ -126,7 +129,7 @@ public class TestWait {
 
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile ff = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-
+        ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
         runner.clearTransferState();
         runner.enqueue(ff);
 
@@ -164,6 +167,7 @@ public class TestWait {
         runner.run();
 
         runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
+        
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         runner.clearTransferState();
     }
 
@@ -178,6 +182,7 @@ public class TestWait {
 
         runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
         
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
+        
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         runner.clearTransferState();
     }
 
@@ -231,6 +236,8 @@ public class TestWait {
 
         final MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
 
+        // timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", 
outputFlowFile.getAttribute("notify.only"));
         // show that the original attributes are still there
@@ -272,6 +279,8 @@ public class TestWait {
 
         final MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
 
+        // timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", 
outputFlowFile.getAttribute("notify.only"));
         // show that the original attributes are still there
@@ -310,7 +319,7 @@ public class TestWait {
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile waitingFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
         /*
          * 2nd iteration.
          */
@@ -324,6 +333,7 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since total count doesn't reach to 3.
         waitingFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 3rd iteration.
@@ -335,6 +345,7 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since total count doesn't reach to 3.
         waitingFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 4th iteration.
@@ -350,6 +361,9 @@ public class TestWait {
 
         final MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
 
+        // wait timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", 
outputFlowFile.getAttribute("notify.only"));
         // show that the original attributes are still there
@@ -391,6 +405,7 @@ public class TestWait {
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         MockFlowFile waitingFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 2nd iteration.
@@ -405,6 +420,7 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since counter-B doesn't reach to 2.
         waitingFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 3rd iteration.
@@ -419,6 +435,7 @@ public class TestWait {
         runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
         // Still waiting since total count doesn't reach to 3.
         waitingFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+        waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
 
         /*
          * 4th iteration.
@@ -434,6 +451,8 @@ public class TestWait {
 
         final MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
 
+        // wait timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // show a new attribute was copied from the cache
         assertEquals("notifyValue", 
outputFlowFile.getAttribute("notify.only"));
         // show that the original attributes are still there
@@ -486,6 +505,8 @@ public class TestWait {
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
         MockFlowFile outputFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+        // timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");
 
         // expect counter to be decremented to 0 and releasable count remains 
1.
@@ -502,6 +523,8 @@ public class TestWait {
         runner.run();
         runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
         outputFlowFile = 
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+        // timer cleared
+        outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
         // All counters are consumed.
         outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");
 

Reply via email to