This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 1330b92 NIFI-5892 Wait timestamp lingers, potentially messing up
downstream wait-notify pairs Clear the wait timestamp when transferring to
failure or success
1330b92 is described below
commit 1330b92cfa01f9dd04bade7a05a047ad6f080c4b
Author: Otto Fowler <[email protected]>
AuthorDate: Fri Dec 21 09:15:24 2018 -0500
NIFI-5892 Wait timestamp lingers, potentially messing up downstream
wait-notify pairs
Clear the wait timestamp when transferring to failure or success
replace explicit attribute clear with function call, refactor and integrate
into existing tests per review
This closes #3233.
Signed-off-by: Koji Kawamura <[email protected]>
---
.../org/apache/nifi/processors/standard/Wait.java | 23 +++++++++++++++---
.../apache/nifi/processors/standard/TestWait.java | 27 ++++++++++++++++++++--
2 files changed, 45 insertions(+), 5 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index cfec15e..e297556 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -84,7 +84,8 @@ import static
org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJE
)
@WritesAttributes({
@WritesAttribute(attribute = "wait.start.timestamp", description =
"All FlowFiles will have an attribute 'wait.start.timestamp', which sets the "
- + "initial epoch timestamp when the file first entered this processor.
This is used to determine the expiration time of the FlowFile."),
+ + "initial epoch timestamp when the file first entered this processor.
This is used to determine the expiration time of the FlowFile. "
+ + "This attribute is not written when the FlowFile is transferred to
failure or success"),
@WritesAttribute(attribute = "wait.counter.<counterName>", description
= "If a signal exists when the processor runs, "
+ "each count value in the signal is copied.")
})
@@ -314,6 +315,8 @@ public class Wait extends AbstractProcessor {
final Consumer<FlowFile> transferToFailure = flowFile -> {
flowFile = session.penalize(flowFile);
+ // This flowFile is now failed, our tracking is done, clear the
timer
+ flowFile = clearWaitState(session, flowFile);
getFlowFilesFor.apply(REL_FAILURE).add(flowFile);
};
@@ -328,9 +331,19 @@ public class Wait extends AbstractProcessor {
relationship = Relationship.SELF;
}
}
-
+ final Relationship finalRelationship = relationship;
final List<FlowFile> flowFilesWithSignalAttributes =
routedFlowFiles.getValue().stream()
- .map(f -> copySignalAttributes(session, f,
signalRef.get(), originalSignalCounts,
replaceOriginalAttributes)).collect(Collectors.toList());
+ .map(f -> {
+ if (REL_SUCCESS.equals(finalRelationship)) {
+ // These flowFiles will be exiting the wait, clear
the timer
+ f = clearWaitState(session, f);
+ }
+ return copySignalAttributes(session, f,
signalRef.get(),
+ originalSignalCounts,
+ replaceOriginalAttributes);
+ })
+ .collect(Collectors.toList());
+
session.transfer(flowFilesWithSignalAttributes, relationship);
};
@@ -470,6 +483,10 @@ public class Wait extends AbstractProcessor {
}
+ private FlowFile clearWaitState(final ProcessSession session, final
FlowFile flowFile) {
+ return session.removeAttribute(flowFile, WAIT_START_TIMESTAMP);
+ }
+
private FlowFile copySignalAttributes(final ProcessSession session, final
FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount,
final boolean replaceOriginal) {
if (signal == null) {
return flowFile;
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
index 5b5b6fc..2ccb2fe 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
@@ -69,6 +69,8 @@ public class TestWait {
// no cache key attribute
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
+ // timestamp must be present
+
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0).assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState();
}
@@ -101,6 +103,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile ff =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState();
runner.enqueue(ff);
@@ -126,7 +129,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile ff =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-
+ ff.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState();
runner.enqueue(ff);
@@ -164,6 +167,7 @@ public class TestWait {
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
+
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState();
}
@@ -178,6 +182,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists("wait.counter.total");
+
runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0).assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
runner.clearTransferState();
}
@@ -231,6 +236,8 @@ public class TestWait {
final MockFlowFile outputFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // timer cleared
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache
assertEquals("notifyValue",
outputFlowFile.getAttribute("notify.only"));
// show that the original attributes are still there
@@ -272,6 +279,8 @@ public class TestWait {
final MockFlowFile outputFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // timer cleared
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache
assertEquals("notifyValue",
outputFlowFile.getAttribute("notify.only"));
// show that the original attributes are still there
@@ -310,7 +319,7 @@ public class TestWait {
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile waitingFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
-
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/*
* 2nd iteration.
*/
@@ -324,6 +333,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3.
waitingFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/*
* 3rd iteration.
@@ -335,6 +345,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3.
waitingFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/*
* 4th iteration.
@@ -350,6 +361,9 @@ public class TestWait {
final MockFlowFile outputFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // wait timer cleared
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
+
// show a new attribute was copied from the cache
assertEquals("notifyValue",
outputFlowFile.getAttribute("notify.only"));
// show that the original attributes are still there
@@ -391,6 +405,7 @@ public class TestWait {
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
MockFlowFile waitingFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/*
* 2nd iteration.
@@ -405,6 +420,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since counter-B doesn't reach to 2.
waitingFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/*
* 3rd iteration.
@@ -419,6 +435,7 @@ public class TestWait {
runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
// Still waiting since total count doesn't reach to 3.
waitingFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
+ waitingFlowFile.assertAttributeExists(Wait.WAIT_START_TIMESTAMP);
/*
* 4th iteration.
@@ -434,6 +451,8 @@ public class TestWait {
final MockFlowFile outputFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // wait timer cleared
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// show a new attribute was copied from the cache
assertEquals("notifyValue",
outputFlowFile.getAttribute("notify.only"));
// show that the original attributes are still there
@@ -486,6 +505,8 @@ public class TestWait {
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
MockFlowFile outputFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // timer cleared
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");
// expect counter to be decremented to 0 and releasable count remains
1.
@@ -502,6 +523,8 @@ public class TestWait {
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
outputFlowFile =
runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+ // timer cleared
+ outputFlowFile.assertAttributeNotExists(Wait.WAIT_START_TIMESTAMP);
// All counters are consumed.
outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");