Repository: samza
Updated Branches:
  refs/heads/master 3f9b96704 -> 9f8682138


SAMZA-1615: Fix a couple of issues in ControlMessageSender

Two issues I found during testing: 1) medaDataCache.getSystemStreamMetadata(): 
if we pass in partitionOnly to be true, it will actually not store the metadata 
into the cache, resulting every call being another query to kafka. I turned off 
the partitionOnly in order to make it in the cache. 2) change the log for info 
to debug.

Author: xinyuiscool <[email protected]>

Reviewers: Boris S <[email protected]>

Closes #444 from xinyuiscool/SAMZA-1615


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9f868213
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9f868213
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9f868213

Branch: refs/heads/master
Commit: 9f8682138e5ec115ff38fb9882305e6921b13f96
Parents: 3f9b967
Author: Xinyu Liu <[email protected]>
Authored: Thu Mar 15 16:47:22 2018 -0700
Committer: xiliu <[email protected]>
Committed: Thu Mar 15 16:47:22 2018 -0700

----------------------------------------------------------------------
 .../samza/operators/impl/ControlMessageSender.java | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9f868213/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java
index 3bdc361..4afca92 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.operators.impl;
 
+import org.apache.samza.SamzaException;
 import org.apache.samza.system.ControlMessage;
 import org.apache.samza.system.MessageType;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -29,12 +30,16 @@ import org.apache.samza.task.MessageCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 
 /**
  * This is a helper class to broadcast control messages to each partition of 
an intermediate stream
  */
 class ControlMessageSender {
   private static final Logger LOG = 
LoggerFactory.getLogger(ControlMessageSender.class);
+  private static final Map<SystemStream, Integer> PARTITION_COUNT_CACHE = new 
ConcurrentHashMap<>();
 
   private final StreamMetadataCache metadataCache;
 
@@ -43,9 +48,15 @@ class ControlMessageSender {
   }
 
   void send(ControlMessage message, SystemStream systemStream, 
MessageCollector collector) {
-    SystemStreamMetadata metadata = 
metadataCache.getSystemStreamMetadata(systemStream, true);
-    int partitionCount = metadata.getSystemStreamPartitionMetadata().size();
-    LOG.info(String.format("Broadcast %s message from task %s to %s with %s 
partition",
+    Integer partitionCount = 
PARTITION_COUNT_CACHE.computeIfAbsent(systemStream, ss -> {
+        SystemStreamMetadata metadata = 
metadataCache.getSystemStreamMetadata(ss, true);
+        if (metadata == null) {
+          throw new SamzaException("Unable to find metadata for stream " + 
systemStream);
+        }
+        return metadata.getSystemStreamPartitionMetadata().size();
+      });
+
+    LOG.debug(String.format("Broadcast %s message from task %s to %s with %s 
partition",
         MessageType.of(message).name(), message.getTaskName(), systemStream, 
partitionCount));
 
     for (int i = 0; i < partitionCount; i++) {

Reply via email to