Repository: samza
Updated Branches:
  refs/heads/master b668b5bea -> 1a7e27097


SAMZA-2019: for 1 partition broadcast topic generate topic#0 config

+ address few review comments

Author: Boris S <[email protected]>
Author: Boris S <[email protected]>
Author: Boris Shkolnik <[email protected]>

Reviewers: xiliu <[email protected]>

Closes #846 from sborya/isBroadcast1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1a7e2709
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1a7e2709
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1a7e2709

Branch: refs/heads/master
Commit: 1a7e27097c7509863437e96b1c392e3886ca67ab
Parents: b668b5b
Author: Boris S <[email protected]>
Authored: Wed Dec 5 14:13:50 2018 -0800
Committer: Boris S <[email protected]>
Committed: Wed Dec 5 14:13:50 2018 -0800

----------------------------------------------------------------------
 .../JobNodeConfigurationGenerator.java          |  6 +++++-
 .../org/apache/samza/execution/StreamEdge.java  | 20 ++++++++++----------
 2 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1a7e2709/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
 
b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index a762dec..761fb05 100644
--- 
a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -105,7 +105,11 @@ import org.slf4j.LoggerFactory;
     for (StreamEdge inEdge : inEdges.values()) {
       String formattedSystemStream = inEdge.getName();
       if (inEdge.isBroadcast()) {
-        broadcastInputs.add(formattedSystemStream + "#[0-" + 
(inEdge.getPartitionCount() - 1) + "]");
+        if (inEdge.getPartitionCount() > 1) {
+          broadcastInputs.add(formattedSystemStream + "#[0-" + 
(inEdge.getPartitionCount() - 1) + "]");
+        } else {
+          broadcastInputs.add(formattedSystemStream + "#0");
+        }
       } else {
         inputs.add(formattedSystemStream);
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/1a7e2709/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java 
b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index ffced0f..051abcf 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -55,7 +55,7 @@ public class StreamEdge {
     this.isIntermediate = isIntermediate;
     // broadcast can be configured either by an operator or via the configs
     this.isBroadcast =
-          isBroadcast || (config == null) ? false : new 
StreamConfig(config).getBroadcastEnabled(streamSpec.toSystemStream());
+          isBroadcast || new 
StreamConfig(config).getBroadcastEnabled(streamSpec.toSystemStream());
     this.config = config;
     if (isBroadcast && isIntermediate) {
       partitions = 1;
@@ -113,21 +113,21 @@ public class StreamEdge {
   }
 
   Config generateConfig() {
-    Map<String, String> newConfig = new HashMap<>();
+    Map<String, String> streamConfig = new HashMap<>();
     StreamSpec spec = getStreamSpec();
-    newConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
spec.getId()), spec.getSystemName());
-    newConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), 
spec.getId()), spec.getPhysicalName());
+    streamConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), 
spec.getId()), spec.getSystemName());
+    streamConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), 
spec.getId()), spec.getPhysicalName());
     if (isIntermediate()) {
-      
newConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), 
spec.getId()), "true");
-      
newConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(),
 spec.getId()), "true");
-      
newConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(),
 spec.getId()), "oldest");
-      newConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), 
spec.getId()), String.valueOf(Integer.MAX_VALUE));
+      
streamConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), 
spec.getId()), "true");
+      
streamConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(),
 spec.getId()), "true");
+      
streamConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(),
 spec.getId()), "oldest");
+      streamConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), 
spec.getId()), String.valueOf(Integer.MAX_VALUE));
     }
     spec.getConfig().forEach((property, value) -> {
-        newConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(), 
spec.getId()) + property, value);
+        streamConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(), 
spec.getId()) + property, value);
       });
 
-    return new MapConfig(newConfig);
+    return new MapConfig(streamConfig);
   }
 
   public boolean isBroadcast() {

Reply via email to