Repository: samza Updated Branches: refs/heads/0.14.1 44d6f40f7 -> 64ea33d7d
SAMZA-1706: lazy initialization for eventhub system producer We are seeing a slow shutdown issue with eventhub system producers for users who only use the eventhub consumer (Samza always creates both the consumer and the producer together, regardless of which one is actually used). As a workaround, add lazy initialization to the producer to avoid the slow shutdown. Author: Hai Lu <[email protected]> Reviewers: Jagadish <[email protected]> Closes #511 from lhaiesp/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/64ea33d7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/64ea33d7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/64ea33d7 Branch: refs/heads/0.14.1 Commit: 64ea33d7d89c5f7f934a0cc8290f3ad3f925e98a Parents: 44d6f40 Author: Hai Lu <[email protected]> Authored: Wed May 9 15:32:44 2018 -0700 Committer: xiliu <[email protected]> Committed: Thu May 10 09:19:59 2018 -0700 ---------------------------------------------------------------------- .../producer/EventHubSystemProducer.java | 82 ++++++++++++-------- 1 file changed, 49 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/64ea33d7/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java index 690e59e..b9afea7 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java @@ -95,6 +95,10 @@ public class EventHubSystemProducer extends AsyncSystemProducer { private volatile boolean isStarted = false; 
+ // We implement lazy initialization for producer as a workaround for + // slow shutdown issue. + private boolean isInitialized = false; + /** * Per partition event hub client. Partitions from the same stream may share the same client, * depends on config PerPartitionConnection. See {@link EventHubConfig} @@ -127,9 +131,12 @@ public class EventHubSystemProducer extends AsyncSystemProducer { this.interceptors = interceptors; this.maxMessageSize = config.getSkipMessagesLargerThan(systemName); this.eventHubClientManagerFactory = eventHubClientManagerFactory; + } + + private void init() { + LOG.info("Initializing EventHubSystemProducer"); // Fetches the stream ids List<String> streamIds = config.getStreams(systemName); - // Create and initiate connections to Event Hubs // even if PerPartitionConnection == true, we still need a stream level event hub for initial metadata (fetching // partition count) @@ -139,6 +146,40 @@ public class EventHubSystemProducer extends AsyncSystemProducer { perStreamEventHubClientManagers.put(streamId, ehClient); ehClient.init(); } + + // Create partition senders if required + if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) { + // Create all partition senders + perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) -> { + EventHubClient ehClient = samzaEventHubClient.getEventHubClient(); + + try { + Map<Integer, PartitionSender> partitionSenders = new HashMap<>(); + long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName); + Integer numPartitions = + ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount(); + + for (int i = 0; i < numPartitions; i++) { + String partitionId = String.valueOf(i); + EventHubClientManager perPartitionClientManager = + createOrGetEventHubClientManagerForPartition(streamId, i); + PartitionSender partitionSender = + perPartitionClientManager.getEventHubClient().createPartitionSenderSync(partitionId); + partitionSenders.put(i, 
partitionSender); + } + + streamPartitionSenders.put(streamId, partitionSenders); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + String msg = "Failed to fetch number of Event Hub partitions for partition sender creation"; + throw new SamzaException(msg, e); + } catch (EventHubException | IllegalArgumentException e) { + String msg = "Creation of partition sender failed with exception"; + throw new SamzaException(msg, e); + } + }); + } + isInitialized = true; + LOG.info("EventHubSystemProducer initialized."); } @Override @@ -183,38 +224,6 @@ public class EventHubSystemProducer extends AsyncSystemProducer { super.start(); LOG.info("Starting system producer."); - // Create partition senders if required - if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) { - // Create all partition senders - perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) -> { - EventHubClient ehClient = samzaEventHubClient.getEventHubClient(); - - try { - Map<Integer, PartitionSender> partitionSenders = new HashMap<>(); - long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName); - Integer numPartitions = - ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount(); - - for (int i = 0; i < numPartitions; i++) { - String partitionId = String.valueOf(i); - EventHubClientManager perPartitionClientManager = - createOrGetEventHubClientManagerForPartition(streamId, i); - PartitionSender partitionSender = - perPartitionClientManager.getEventHubClient().createPartitionSenderSync(partitionId); - partitionSenders.put(i, partitionSender); - } - - streamPartitionSenders.put(streamId, partitionSenders); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - String msg = "Failed to fetch number of Event Hub partitions for partition sender creation"; - throw new SamzaException(msg, e); - } catch (EventHubException | IllegalArgumentException e) { - String msg = "Creation of 
partition sender failed with exception"; - throw new SamzaException(msg, e); - } - }); - } - // Initiate metrics streamIds.forEach((streamId) -> { eventSkipRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_SKIP_RATE)); @@ -245,6 +254,10 @@ public class EventHubSystemProducer extends AsyncSystemProducer { if (!isStarted) { throw new SamzaException("Trying to call send before the producer is started."); } + if (!isInitialized) { + // lazy initialization on the first send + init(); + } String streamId = config.getStreamId(envelope.getSystemStream().getStream()); @@ -371,6 +384,9 @@ public class EventHubSystemProducer extends AsyncSystemProducer { .forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS)); perPartitionEventHubClients.clear(); } + isStarted = false; + isInitialized = false; + LOG.info("EventHubSystemProducer stopped."); } /**
