Repository: samza
Updated Branches:
  refs/heads/master 89beb1fcc -> 9d404bc51


SAMZA-1706: lazy initialization for eventhub system producer

We are seeing a slow-shutdown issue with eventhub system producers for users
who only use the eventhub consumer (Samza always creates both the consumer and
the producer together, regardless of which one is actually used). As a
workaround, add lazy initialization to the producer so that it is initialized
only on the first send, which avoids the slow shutdown.

Author: Hai Lu <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes #511 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9d404bc5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9d404bc5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9d404bc5

Branch: refs/heads/master
Commit: 9d404bc515a4cbc81bb9922d8d44b02ef1e84fcf
Parents: 89beb1f
Author: Hai Lu <[email protected]>
Authored: Wed May 9 15:32:44 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Wed May 9 15:32:44 2018 -0700

----------------------------------------------------------------------
 .../producer/EventHubSystemProducer.java        | 82 ++++++++++++--------
 1 file changed, 49 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9d404bc5/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index 690e59e..b9afea7 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -95,6 +95,10 @@ public class EventHubSystemProducer extends 
AsyncSystemProducer {
 
   private volatile boolean isStarted = false;
 
+  // We implement lazy initialization for producer as a workaround for
+  // slow shutdown issue.
+  private boolean isInitialized = false;
+
   /**
    * Per partition event hub client. Partitions from the same stream may share 
the same client,
    * depends on config PerPartitionConnection. See {@link EventHubConfig}
@@ -127,9 +131,12 @@ public class EventHubSystemProducer extends 
AsyncSystemProducer {
     this.interceptors = interceptors;
     this.maxMessageSize = config.getSkipMessagesLargerThan(systemName);
     this.eventHubClientManagerFactory = eventHubClientManagerFactory;
+  }
+
+  private void init() {
+    LOG.info("Initializing EventHubSystemProducer");
     // Fetches the stream ids
     List<String> streamIds = config.getStreams(systemName);
-
     // Create and initiate connections to Event Hubs
     // even if PerPartitionConnection == true, we still need a stream level 
event hub for initial metadata (fetching
     // partition count)
@@ -139,6 +146,40 @@ public class EventHubSystemProducer extends 
AsyncSystemProducer {
       perStreamEventHubClientManagers.put(streamId, ehClient);
       ehClient.init();
     }
+
+    // Create partition senders if required
+    if 
(PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
+      // Create all partition senders
+      perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) 
-> {
+          EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
+
+          try {
+            Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
+            long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
+            Integer numPartitions =
+                ehClient.getRuntimeInformation().get(timeoutMs, 
TimeUnit.MILLISECONDS).getPartitionCount();
+
+            for (int i = 0; i < numPartitions; i++) {
+              String partitionId = String.valueOf(i);
+              EventHubClientManager perPartitionClientManager =
+                  createOrGetEventHubClientManagerForPartition(streamId, i);
+              PartitionSender partitionSender =
+                  
perPartitionClientManager.getEventHubClient().createPartitionSenderSync(partitionId);
+              partitionSenders.put(i, partitionSender);
+            }
+
+            streamPartitionSenders.put(streamId, partitionSenders);
+          } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
+            String msg = "Failed to fetch number of Event Hub partitions for 
partition sender creation";
+            throw new SamzaException(msg, e);
+          } catch (EventHubException | IllegalArgumentException e) {
+            String msg = "Creation of partition sender failed with exception";
+            throw new SamzaException(msg, e);
+          }
+        });
+    }
+    isInitialized = true;
+    LOG.info("EventHubSystemProducer initialized.");
   }
 
   @Override
@@ -183,38 +224,6 @@ public class EventHubSystemProducer extends 
AsyncSystemProducer {
     super.start();
     LOG.info("Starting system producer.");
 
-    // Create partition senders if required
-    if 
(PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
-      // Create all partition senders
-      perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) 
-> {
-          EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
-
-          try {
-            Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
-            long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
-            Integer numPartitions =
-                ehClient.getRuntimeInformation().get(timeoutMs, 
TimeUnit.MILLISECONDS).getPartitionCount();
-
-            for (int i = 0; i < numPartitions; i++) {
-              String partitionId = String.valueOf(i);
-              EventHubClientManager perPartitionClientManager =
-                  createOrGetEventHubClientManagerForPartition(streamId, i);
-              PartitionSender partitionSender =
-                  
perPartitionClientManager.getEventHubClient().createPartitionSenderSync(partitionId);
-              partitionSenders.put(i, partitionSender);
-            }
-
-            streamPartitionSenders.put(streamId, partitionSenders);
-          } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-            String msg = "Failed to fetch number of Event Hub partitions for 
partition sender creation";
-            throw new SamzaException(msg, e);
-          } catch (EventHubException | IllegalArgumentException e) {
-            String msg = "Creation of partition sender failed with exception";
-            throw new SamzaException(msg, e);
-          }
-        });
-    }
-
     // Initiate metrics
     streamIds.forEach((streamId) -> {
         eventSkipRate.put(streamId, metricsRegistry.newCounter(streamId, 
EVENT_SKIP_RATE));
@@ -245,6 +254,10 @@ public class EventHubSystemProducer extends 
AsyncSystemProducer {
     if (!isStarted) {
       throw new SamzaException("Trying to call send before the producer is 
started.");
     }
+    if (!isInitialized) {
+      // lazy initialization on the first send
+      init();
+    }
 
     String streamId = 
config.getStreamId(envelope.getSystemStream().getStream());
 
@@ -371,6 +384,9 @@ public class EventHubSystemProducer extends 
AsyncSystemProducer {
           .forEach(ehClient -> 
ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
       perPartitionEventHubClients.clear();
     }
+    isStarted = false;
+    isInitialized = false;
+    LOG.info("EventHubSystemProducer stopped.");
   }
 
   /**

Reply via email to