This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new ec0a99a STORM-3362 fixing 'EventHubSpout uses a blocking receiver in nextTuple()' issue
new c4aa01a Merge branch 'fixing_blocking_call_in_EventHubSpout' of https://github.com/CaperAi/storm into STORM-3362-merge
ec0a99a is described below
commit ec0a99a2fd747c0fe53f8dc7acb6c229e0e44d44
Author: York Yang <[email protected]>
AuthorDate: Tue Mar 26 10:19:21 2019 -0400
STORM-3362 fixing 'EventHubSpout uses a blocking receiver in nextTuple()' issue
---
.../org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java | 6 ++++++
.../org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java | 9 +++++++++
2 files changed, 15 insertions(+)
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
index 83dc850..c5d6303 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java
@@ -23,6 +23,7 @@ import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.servicebus.ServiceBusException;
import java.io.IOException;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -39,6 +40,7 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
private final String entityName;
private final String partitionId;
private final String consumerGroupName;
+ private final int receiverTimeoutInMillis;
private PartitionReceiver receiver;
private EventHubClient ehClient;
@@ -51,6 +53,7 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
this.entityName = config.getEntityPath();
this.partitionId = partitionId;
this.consumerGroupName = config.getConsumerGroupName();
+ this.receiverTimeoutInMillis = config.getReceiverTimeoutInMillis();
receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
receiveApiCallCount = new CountMetric();
receiveMessageCount = new CountMetric();
@@ -82,6 +85,9 @@ public class EventHubReceiverImpl implements IEventHubReceiver {
throw new RuntimeException("Eventhub receiver must have " +
"an offset or time to be created");
}
+ if (receiver != null) {
+ receiver.setReceiveTimeout(Duration.ofMillis(receiverTimeoutInMillis));
+ }
} catch (IOException e) {
logger.error("Exception in creating ehclient" + e.toString());
throw new EventHubException(e);
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
index cd27b11..3e486b5 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
@@ -42,6 +42,7 @@ public class EventHubSpoutConfig implements Serializable {
// disabling filter
private String connectionString;
private String topologyName;
+ private int receiverTimeoutInMillis = 1000; // default
private IEventDataScheme scheme = new StringEventDataScheme();
private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
private String outputStreamId;
@@ -252,4 +253,12 @@ public class EventHubSpoutConfig implements Serializable {
public void setOutputStreamId(String outputStreamId) {
this.outputStreamId = outputStreamId;
}
+
+ public int getReceiverTimeoutInMillis() {
+ return receiverTimeoutInMillis;
+ }
+
+ public void setReceiverTimeoutInMillis(int receiverTimeoutInMillis) {
+ this.receiverTimeoutInMillis = receiverTimeoutInMillis;
+ }
}