Repository: nifi
Updated Branches:
  refs/heads/master 9cfc13423 -> e46f4131f


NIFI-3452: Add Wait processor Wait Mode property
Ensure back-pressure is active until downstream processing completes.

This closes #1490.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e46f4131
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e46f4131
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e46f4131

Branch: refs/heads/master
Commit: e46f4131f9eade2eb9fe108241107688795904f5
Parents: 9cfc134
Author: Koji Kawamura <[email protected]>
Authored: Thu Feb 9 21:30:43 2017 +0900
Committer: Pierre Villard <[email protected]>
Committed: Tue Feb 14 19:15:27 2017 +0100

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/Wait.java   | 119 ++++++++++++-------
 .../nifi/processors/standard/TestWait.java      |  17 +++
 2 files changed, 95 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e46f4131/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index 0bd5ca6..8ae85d8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -82,24 +82,27 @@ public class Wait extends AbstractProcessor {
 
     // Identifies the distributed map cache client
     public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new 
PropertyDescriptor.Builder()
-        .name("Distributed Cache Service")
-        .description("The Controller Service that is used to check for release 
signals from a corresponding Notify processor")
-        .required(true)
-        .identifiesControllerService(AtomicDistributedMapCacheClient.class)
-        .build();
+            .name("distributed-cache-service")
+            .displayName("Distributed Cache Service")
+            .description("The Controller Service that is used to check for 
release signals from a corresponding Notify processor")
+            .required(true)
+            .identifiesControllerService(AtomicDistributedMapCacheClient.class)
+            .build();
 
     // Selects the FlowFile attribute or expression, whose value is used as 
cache key
     public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new 
PropertyDescriptor.Builder()
-        .name("Release Signal Identifier")
-        .description("A value, or the results of an Attribute Expression 
Language statement, which will " +
-            "be evaluated against a FlowFile in order to determine the release 
signal cache key")
-        .required(true)
-        
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING,
 true))
-        .expressionLanguageSupported(true)
-        .build();
+            .name("release-signal-id")
+            .displayName("Release Signal Identifier")
+            .description("A value, or the results of an Attribute Expression 
Language statement, which will " +
+                "be evaluated against a FlowFile in order to determine the 
release signal cache key")
+            .required(true)
+            
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING,
 true))
+            .expressionLanguageSupported(true)
+            .build();
 
     public static final PropertyDescriptor TARGET_SIGNAL_COUNT = new 
PropertyDescriptor.Builder()
-            .name("Target Signal Count")
+            .name("target-signal-count")
+            .displayName("Target Signal Count")
             .description("A value, or the results of an Attribute Expression 
Language statement, which will " +
                     "be evaluated against a FlowFile in order to determine the 
target signal count. " +
                     "This processor checks whether the signal count has 
reached this number. " +
@@ -112,7 +115,8 @@ public class Wait extends AbstractProcessor {
             .build();
 
     public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new 
PropertyDescriptor.Builder()
-            .name("Signal Counter Name")
+            .name("signal-counter-name")
+            .displayName("Signal Counter Name")
             .description("A value, or the results of an Attribute Expression 
Language statement, which will " +
                     "be evaluated against a FlowFile in order to determine the 
signal counter name. " +
                     "If not specified, this processor checks the total count 
in a signal.")
@@ -123,13 +127,14 @@ public class Wait extends AbstractProcessor {
 
     // Selects the FlowFile attribute or expression, whose value is used as 
cache key
     public static final PropertyDescriptor EXPIRATION_DURATION = new 
PropertyDescriptor.Builder()
-        .name("Expiration Duration")
-        .description("Indicates the duration after which waiting flow files 
will be routed to the 'expired' relationship")
-        .required(true)
-        .defaultValue("10 min")
-        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .build();
+            .name("expiration-duration")
+            .displayName("Expiration Duration")
+            .description("Indicates the duration after which waiting flow 
files will be routed to the 'expired' relationship")
+            .required(true)
+            .defaultValue("10 min")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
 
     public static final AllowableValue ATTRIBUTE_COPY_REPLACE = new 
AllowableValue("replace", "Replace if present",
             "When cached attributes are copied onto released FlowFiles, they 
replace any matching attributes.");
@@ -138,33 +143,54 @@ public class Wait extends AbstractProcessor {
             "Attributes on released FlowFiles are not overwritten by copied 
cached attributes.");
 
     public static final PropertyDescriptor ATTRIBUTE_COPY_MODE = new 
PropertyDescriptor.Builder()
-        .name("Attribute Copy Mode")
-        .description("Specifies how to handle attributes copied from flow 
files entering the Notify processor")
-        .defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue())
-        .required(true)
-        .allowableValues(ATTRIBUTE_COPY_REPLACE, ATTRIBUTE_COPY_KEEP_ORIGINAL)
-        .expressionLanguageSupported(false)
-        .build();
+            .name("attribute-copy-mode")
+            .displayName("Attribute Copy Mode")
+            .description("Specifies how to handle attributes copied from flow 
files entering the Notify processor")
+            .defaultValue(ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue())
+            .required(true)
+            .allowableValues(ATTRIBUTE_COPY_REPLACE, 
ATTRIBUTE_COPY_KEEP_ORIGINAL)
+            .expressionLanguageSupported(false)
+            .build();
+
+    public static final AllowableValue WAIT_MODE_TRANSFER_TO_WAIT = new 
AllowableValue("wait", "Transfer to wait relationship",
+            "Transfer a FlowFile to the 'wait' relationship when whose release 
signal has not been notified yet." +
+                    " This mode allows other incoming FlowFiles to be enqueued 
by moving FlowFiles into the wait relationship.");
+
+    public static final AllowableValue WAIT_MODE_KEEP_IN_UPSTREAM = new 
AllowableValue("keep", "Keep in the upstream connection",
+            "Transfer a FlowFile to the upstream connection where it comes 
from when whose release signal has not been notified yet." +
+                    " This mode helps keeping upstream connection being full 
so that the upstream source processor" +
+                    " will not be scheduled while back-pressure is active and 
limit incoming FlowFiles. ");
+
+    public static final PropertyDescriptor WAIT_MODE = new 
PropertyDescriptor.Builder()
+            .name("wait-mode")
+            .displayName("Wait Mode")
+            .description("Specifies how to handle a FlowFile waiting for a 
notify signal")
+            .defaultValue(WAIT_MODE_TRANSFER_TO_WAIT.getValue())
+            .required(true)
+            .allowableValues(WAIT_MODE_TRANSFER_TO_WAIT, 
WAIT_MODE_KEEP_IN_UPSTREAM)
+            .expressionLanguageSupported(false)
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("A FlowFile with a matching release signal in the cache 
will be routed to this relationship")
-        .build();
+            .name("success")
+            .description("A FlowFile with a matching release signal in the 
cache will be routed to this relationship")
+            .build();
 
     public static final Relationship REL_FAILURE = new Relationship.Builder()
-        .name("failure")
-        .description("When the cache cannot be reached, or if the Release 
Signal Identifier evaluates to null or empty, FlowFiles will be routed to this 
relationship")
-        .build();
+            .name("failure")
+            .description("When the cache cannot be reached, or if the Release 
Signal Identifier evaluates to null or empty, FlowFiles will be routed to this 
relationship")
+            .build();
 
     public static final Relationship REL_WAIT = new Relationship.Builder()
-        .name("wait")
-        .description("A FlowFile with no matching release signal in the cache 
will be routed to this relationship")
-        .build();
+            .name("wait")
+            .description("A FlowFile with no matching release signal in the 
cache will be routed to this relationship")
+            .build();
 
     public static final Relationship REL_EXPIRED = new Relationship.Builder()
-        .name("expired")
-        .description("A FlowFile that has exceeded the configured Expiration 
Duration will be routed to this relationship")
-        .build();
+            .name("expired")
+            .description("A FlowFile that has exceeded the configured 
Expiration Duration will be routed to this relationship")
+            .build();
+
     private final Set<Relationship> relationships;
 
     public Wait() {
@@ -185,6 +211,7 @@ public class Wait extends AbstractProcessor {
         descriptors.add(EXPIRATION_DURATION);
         descriptors.add(DISTRIBUTED_CACHE_SERVICE);
         descriptors.add(ATTRIBUTE_COPY_MODE);
+        descriptors.add(WAIT_MODE);
         return descriptors;
     }
 
@@ -259,7 +286,17 @@ public class Wait extends AbstractProcessor {
                 if (logger.isDebugEnabled()) {
                     logger.debug("No release signal yet for {} on FlowFile 
{}", new Object[] {signalId, flowFile});
                 }
-                session.transfer(flowFile, REL_WAIT);
+
+
+                final String waitMode = 
context.getProperty(WAIT_MODE).getValue();
+                if (WAIT_MODE_TRANSFER_TO_WAIT.getValue().equals(waitMode)) {
+                    session.transfer(flowFile, REL_WAIT);
+                } else if 
(WAIT_MODE_KEEP_IN_UPSTREAM.getValue().equals(waitMode)) {
+                    // Transfer to self.
+                    session.transfer(flowFile);
+                } else {
+                    throw new ProcessException("Unsupported wait mode " + 
waitMode + " was specified.");
+                }
                 return;
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e46f4131/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
index e1117d5..0ce0045 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+
 import org.apache.nifi.processors.standard.TestNotify.MockCacheClient;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
@@ -63,6 +64,22 @@ public class TestWait {
     }
 
     @Test
+    public void testWaitKeepInUpstreamConnection() throws 
InitializationException {
+        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, 
"${releaseSignalAttribute}");
+        runner.setProperty(Wait.WAIT_MODE, Wait.WAIT_MODE_KEEP_IN_UPSTREAM);
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("releaseSignalAttribute", "1");
+        runner.enqueue(new byte[]{}, props);
+
+        runner.run();
+
+        // The FlowFile stays in the upstream connection.
+        runner.assertQueueNotEmpty();
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testExpired() throws InitializationException, 
InterruptedException {
         runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, 
"${releaseSignalAttribute}");
         runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms");

Reply via email to