Repository: samza Updated Branches: refs/heads/master 3f9b96704 -> 9f8682138
SAMZA-1615: Fix a couple of issues in ControlMessageSender Two issues I found during testing: 1) medaDataCache.getSystemStreamMetadata(): if we pass in partitionOnly to be true, it will actually not store the metadata into the cache, resulting every call being another query to kafka. I turned off the partitionOnly in order to make it in the cache. 2) change the log for info to debug. Author: xinyuiscool <[email protected]> Reviewers: Boris S <[email protected]> Closes #444 from xinyuiscool/SAMZA-1615 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9f868213 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9f868213 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9f868213 Branch: refs/heads/master Commit: 9f8682138e5ec115ff38fb9882305e6921b13f96 Parents: 3f9b967 Author: Xinyu Liu <[email protected]> Authored: Thu Mar 15 16:47:22 2018 -0700 Committer: xiliu <[email protected]> Committed: Thu Mar 15 16:47:22 2018 -0700 ---------------------------------------------------------------------- .../samza/operators/impl/ControlMessageSender.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9f868213/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java b/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java index 3bdc361..4afca92 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java @@ -19,6 +19,7 @@ package org.apache.samza.operators.impl; +import org.apache.samza.SamzaException; import org.apache.samza.system.ControlMessage; import org.apache.samza.system.MessageType; import org.apache.samza.system.OutgoingMessageEnvelope; @@ -29,12 +30,16 @@ import org.apache.samza.task.MessageCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** * This is a helper class to broadcast control messages to each partition of an intermediate stream */ class ControlMessageSender { private static final Logger LOG = LoggerFactory.getLogger(ControlMessageSender.class); + private static final Map<SystemStream, Integer> PARTITION_COUNT_CACHE = new ConcurrentHashMap<>(); private final StreamMetadataCache metadataCache; @@ -43,9 +48,15 @@ class ControlMessageSender { } void send(ControlMessage message, SystemStream systemStream, MessageCollector collector) { - SystemStreamMetadata metadata = metadataCache.getSystemStreamMetadata(systemStream, true); - int partitionCount = metadata.getSystemStreamPartitionMetadata().size(); - LOG.info(String.format("Broadcast %s message from task %s to %s with %s partition", + Integer partitionCount = PARTITION_COUNT_CACHE.computeIfAbsent(systemStream, ss -> { + SystemStreamMetadata metadata = metadataCache.getSystemStreamMetadata(ss, true); + if (metadata == null) { + throw new SamzaException("Unable to find metadata for stream " + systemStream); + } + return metadata.getSystemStreamPartitionMetadata().size(); + }); + + LOG.debug(String.format("Broadcast %s message from task %s to %s with %s partition", MessageType.of(message).name(), message.getTaskName(), systemStream, partitionCount)); for (int i = 0; i < partitionCount; i++) {
