This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new ec0a99a  STORM-3362 fixing 'EventHubSpout uses a blocking receiver in nextTuple()' issue
     new c4aa01a  Merge branch 'fixing_blocking_call_in_EventHubSpout' of https://github.com/CaperAi/storm into STORM-3362-merge
ec0a99a is described below

commit ec0a99a2fd747c0fe53f8dc7acb6c229e0e44d44
Author: York Yang <[email protected]>
AuthorDate: Tue Mar 26 10:19:21 2019 -0400

    STORM-3362 fixing 'EventHubSpout uses a blocking receiver in nextTuple()' issue
---
 .../org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java   | 6 ++++++
 .../org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java    | 9 +++++++++
 2 files changed, 15 insertions(+)

diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
index 83dc850..c5d6303 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
@@ -23,6 +23,7 @@ import com.microsoft.azure.eventhubs.EventHubClient;
 import com.microsoft.azure.eventhubs.PartitionReceiver;
 import com.microsoft.azure.servicebus.ServiceBusException;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -39,6 +40,7 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
     private final String entityName;
     private final String partitionId;
     private final String consumerGroupName;
+    private final int receiverTimeoutInMillis;
 
     private PartitionReceiver receiver;
     private EventHubClient ehClient;
@@ -51,6 +53,7 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
         this.entityName = config.getEntityPath();
         this.partitionId = partitionId;
         this.consumerGroupName = config.getConsumerGroupName();
+        this.receiverTimeoutInMillis = config.getReceiverTimeoutInMillis();
         receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
         receiveApiCallCount = new CountMetric();
         receiveMessageCount = new CountMetric();
@@ -82,6 +85,9 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
                 throw new RuntimeException("Eventhub receiver must have " +
                                            "an offset or time to be created");
             }
+            if (receiver != null) {
+                receiver.setReceiveTimeout(Duration.ofMillis(receiverTimeoutInMillis));
+            }
         } catch (IOException e) {
             logger.error("Exception in creating ehclient" + e.toString());
             throw new EventHubException(e);
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
index cd27b11..3e486b5 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
@@ -42,6 +42,7 @@ public class EventHubSpoutConfig implements Serializable {
     // disabling filter
     private String connectionString;
     private String topologyName;
+    private int receiverTimeoutInMillis = 1000; // default
     private IEventDataScheme scheme = new StringEventDataScheme();
     private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
     private String outputStreamId;
@@ -252,4 +253,12 @@ public class EventHubSpoutConfig implements Serializable {
     public void setOutputStreamId(String outputStreamId) {
         this.outputStreamId = outputStreamId;
     }
+
+    public int getReceiverTimeoutInMillis() {
+        return receiverTimeoutInMillis;
+    }
+
+    public void setReceiverTimeoutInMillis(int receiverTimeoutInMillis) {
+        this.receiverTimeoutInMillis = receiverTimeoutInMillis;
+    }
 }

Reply via email to