Repository: nifi
Updated Branches:
  refs/heads/master 769530bea -> b5550ffcf


NIFI-2834 Better Configs for Fetch and Timeout

NIFI-2834 Better Configs for Fetch and Timeout

NIFI-2834 Better Configs for Fetch and Timeout

NIFI-2834 ofSec to of Mils

This closes #1167


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b5550ffc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b5550ffc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b5550ffc

Branch: refs/heads/master
Commit: b5550ffcf5d3b968551b91a41f146b18dcc0a9ec
Parents: 769530b
Author: Joe N <[email protected]>
Authored: Sat Oct 29 11:48:20 2016 -0400
Committer: Oleg Zhurakousky <[email protected]>
Committed: Wed Nov 2 16:10:37 2016 -0400

----------------------------------------------------------------------
 .../azure/eventhub/GetAzureEventHub.java        | 33 +++++++++++++++++++-
 .../azure/eventhub/GetAzureEventHubTest.java    |  4 +++
 2 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b5550ffc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index 0455fe9..12ea1ba 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -40,6 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -121,6 +122,20 @@ public class GetAzureEventHub extends AbstractProcessor {
             .expressionLanguageSupported(false)
             .required(false)
             .build();
+    static final PropertyDescriptor RECEIVER_FETCH_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Partition Recivier Fetch Size")
+            .description("The number of events that a receiver should fetch 
from an EventHubs partition before returning. Default(100)")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .required(false)
+            .build();
+    static final PropertyDescriptor RECEIVER_FETCH_TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("Partiton Receiver Timeout (millseconds)")
+            .description("The amount of time a Partition Receiver should wait 
to receive the Fetch Size before returning. Default(60000)")
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .required(false)
+            .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -130,6 +145,8 @@ public class GetAzureEventHub extends AbstractProcessor {
     private final ConcurrentMap<String, PartitionReceiver> 
partitionToReceiverMap = new ConcurrentHashMap<>();
     private volatile BlockingQueue<String> partitionNames = new 
LinkedBlockingQueue<>();
     private volatile Instant configuredEnqueueTime;
+    private volatile int receiverFetchSize;
+    private volatile Duration receiverFetchTimeout;
     private EventHubClient eventHubClient;
 
     private final static List<PropertyDescriptor> propertyDescriptors;
@@ -148,6 +165,9 @@ public class GetAzureEventHub extends AbstractProcessor {
         _propertyDescriptors.add(NUM_PARTITIONS);
         _propertyDescriptors.add(CONSUMER_GROUP);
         _propertyDescriptors.add(ENQUEUE_TIME);
+        _propertyDescriptors.add(RECEIVER_FETCH_SIZE);
+        _propertyDescriptors.add(RECEIVER_FETCH_TIMEOUT);
+
         propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
 
         Set<Relationship> _relationships = new HashSet<>();
@@ -201,6 +221,7 @@ public class GetAzureEventHub extends AbstractProcessor {
                     partitionId,
                     configuredEnqueueTime == null ? Instant.now() : 
configuredEnqueueTime).get();
 
+            receiver.setReceiveTimeout(receiverFetchTimeout == null ? 
Duration.ofMillis(60000) : receiverFetchTimeout);
             partitionToReceiverMap.put(partitionId, receiver);
             return receiver;
 
@@ -222,7 +243,7 @@ public class GetAzureEventHub extends AbstractProcessor {
         final PartitionReceiver receiver;
         try {
             receiver = getReceiver(context, partitionId);
-            return receiver.receive(100).get();
+            return receiver.receive(receiverFetchSize).get();
         } catch (final IOException | ServiceBusException | ExecutionException 
| InterruptedException e) {
             throw new ProcessException(e);
         }
@@ -264,6 +285,16 @@ public class GetAzureEventHub extends AbstractProcessor {
         } else {
             configuredEnqueueTime = null;
         }
+        if(context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
+            receiverFetchSize = 
context.getProperty(RECEIVER_FETCH_SIZE).asInteger();
+        } else {
+            receiverFetchSize = 100;
+        }
+        if(context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
+            receiverFetchTimeout = 
Duration.ofMillis(context.getProperty(RECEIVER_FETCH_TIMEOUT).asLong());
+        } else {
+            receiverFetchTimeout = null;
+        }
 
         final String connectionString = new ConnectionStringBuilder(namespace, 
eventHubName, policyName, policyKey).toString();
         setupReceiver(connectionString);

http://git-wip-us.apache.org/repos/asf/nifi/blob/b5550ffc/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index a63458f..951384a 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -64,6 +64,10 @@ public class GetAzureEventHubTest {
         testRunner.assertValid();
         
testRunner.setProperty(GetAzureEventHub.ENQUEUE_TIME,"2015-12-22T21:55:10.000Z");
         testRunner.assertValid();
+        testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_SIZE, "5");
+        testRunner.assertValid();
+        
testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000");
+        testRunner.assertValid();
     }
     
     @Test

Reply via email to