Repository: samza Updated Branches: refs/heads/master 89beb1fcc -> 9d404bc51
SAMZA-1706: lazy initialization for eventhub system producer We are seeing a slow shutdown issue with eventhub system producers for users who only use the eventhub consumer (Samza creates both the consumer and the producer together, regardless of which one is actually used). As a workaround, add lazy initialization to the producer to avoid the slow shutdown. Author: Hai Lu <[email protected]> Reviewers: Jagadish <[email protected]> Closes #511 from lhaiesp/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9d404bc5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9d404bc5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9d404bc5 Branch: refs/heads/master Commit: 9d404bc515a4cbc81bb9922d8d44b02ef1e84fcf Parents: 89beb1f Author: Hai Lu <[email protected]> Authored: Wed May 9 15:32:44 2018 -0700 Committer: Jagadish <[email protected]> Committed: Wed May 9 15:32:44 2018 -0700 ---------------------------------------------------------------------- .../producer/EventHubSystemProducer.java | 82 ++++++++++++-------- 1 file changed, 49 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9d404bc5/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java index 690e59e..b9afea7 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java @@ -95,6 +95,10 @@ public class EventHubSystemProducer extends AsyncSystemProducer { private volatile boolean isStarted = 
false; + // We implement lazy initialization for producer as a workaround for + // slow shutdown issue. + private boolean isInitialized = false; + /** * Per partition event hub client. Partitions from the same stream may share the same client, * depends on config PerPartitionConnection. See {@link EventHubConfig} @@ -127,9 +131,12 @@ public class EventHubSystemProducer extends AsyncSystemProducer { this.interceptors = interceptors; this.maxMessageSize = config.getSkipMessagesLargerThan(systemName); this.eventHubClientManagerFactory = eventHubClientManagerFactory; + } + + private void init() { + LOG.info("Initializing EventHubSystemProducer"); // Fetches the stream ids List<String> streamIds = config.getStreams(systemName); - // Create and initiate connections to Event Hubs // even if PerPartitionConnection == true, we still need a stream level event hub for initial metadata (fetching // partition count) @@ -139,6 +146,40 @@ public class EventHubSystemProducer extends AsyncSystemProducer { perStreamEventHubClientManagers.put(streamId, ehClient); ehClient.init(); } + + // Create partition senders if required + if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) { + // Create all partition senders + perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) -> { + EventHubClient ehClient = samzaEventHubClient.getEventHubClient(); + + try { + Map<Integer, PartitionSender> partitionSenders = new HashMap<>(); + long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName); + Integer numPartitions = + ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount(); + + for (int i = 0; i < numPartitions; i++) { + String partitionId = String.valueOf(i); + EventHubClientManager perPartitionClientManager = + createOrGetEventHubClientManagerForPartition(streamId, i); + PartitionSender partitionSender = + perPartitionClientManager.getEventHubClient().createPartitionSenderSync(partitionId); + 
partitionSenders.put(i, partitionSender); + } + + streamPartitionSenders.put(streamId, partitionSenders); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + String msg = "Failed to fetch number of Event Hub partitions for partition sender creation"; + throw new SamzaException(msg, e); + } catch (EventHubException | IllegalArgumentException e) { + String msg = "Creation of partition sender failed with exception"; + throw new SamzaException(msg, e); + } + }); + } + isInitialized = true; + LOG.info("EventHubSystemProducer initialized."); } @Override @@ -183,38 +224,6 @@ public class EventHubSystemProducer extends AsyncSystemProducer { super.start(); LOG.info("Starting system producer."); - // Create partition senders if required - if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) { - // Create all partition senders - perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) -> { - EventHubClient ehClient = samzaEventHubClient.getEventHubClient(); - - try { - Map<Integer, PartitionSender> partitionSenders = new HashMap<>(); - long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName); - Integer numPartitions = - ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount(); - - for (int i = 0; i < numPartitions; i++) { - String partitionId = String.valueOf(i); - EventHubClientManager perPartitionClientManager = - createOrGetEventHubClientManagerForPartition(streamId, i); - PartitionSender partitionSender = - perPartitionClientManager.getEventHubClient().createPartitionSenderSync(partitionId); - partitionSenders.put(i, partitionSender); - } - - streamPartitionSenders.put(streamId, partitionSenders); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - String msg = "Failed to fetch number of Event Hub partitions for partition sender creation"; - throw new SamzaException(msg, e); - } catch (EventHubException | IllegalArgumentException e) { - 
String msg = "Creation of partition sender failed with exception"; - throw new SamzaException(msg, e); - } - }); - } - // Initiate metrics streamIds.forEach((streamId) -> { eventSkipRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_SKIP_RATE)); @@ -245,6 +254,10 @@ public class EventHubSystemProducer extends AsyncSystemProducer { if (!isStarted) { throw new SamzaException("Trying to call send before the producer is started."); } + if (!isInitialized) { + // lazy initialization on the first send + init(); + } String streamId = config.getStreamId(envelope.getSystemStream().getStream()); @@ -371,6 +384,9 @@ public class EventHubSystemProducer extends AsyncSystemProducer { .forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS)); perPartitionEventHubClients.clear(); } + isStarted = false; + isInitialized = false; + LOG.info("EventHubSystemProducer stopped."); } /**
