Repository: samza Updated Branches: refs/heads/master b668b5bea -> 1a7e27097
SAMZA-2019: for 1 partition broadcast topic generate topic#0 config + address few review comments Author: Boris S <[email protected]> Author: Boris S <[email protected]> Author: Boris Shkolnik <[email protected]> Reviewers: xiliu <[email protected]> Closes #846 from sborya/isBroadcast1 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1a7e2709 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1a7e2709 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1a7e2709 Branch: refs/heads/master Commit: 1a7e27097c7509863437e96b1c392e3886ca67ab Parents: b668b5b Author: Boris S <[email protected]> Authored: Wed Dec 5 14:13:50 2018 -0800 Committer: Boris S <[email protected]> Committed: Wed Dec 5 14:13:50 2018 -0800 ---------------------------------------------------------------------- .../JobNodeConfigurationGenerator.java | 6 +++++- .../org/apache/samza/execution/StreamEdge.java | 20 ++++++++++---------- 2 files changed, 15 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1a7e2709/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java index a762dec..761fb05 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java @@ -105,7 +105,11 @@ import org.slf4j.LoggerFactory; for (StreamEdge inEdge : inEdges.values()) { String formattedSystemStream = inEdge.getName(); if (inEdge.isBroadcast()) { - broadcastInputs.add(formattedSystemStream + "#[0-" + (inEdge.getPartitionCount() - 1) + "]"); + if (inEdge.getPartitionCount() > 1) { + broadcastInputs.add(formattedSystemStream + "#[0-" + (inEdge.getPartitionCount() - 1) + "]"); + } else { + broadcastInputs.add(formattedSystemStream + "#0"); + } } else { inputs.add(formattedSystemStream); } http://git-wip-us.apache.org/repos/asf/samza/blob/1a7e2709/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java index ffced0f..051abcf 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java @@ -55,7 +55,7 @@ public class StreamEdge { this.isIntermediate = isIntermediate; // broadcast can be configured either by an operator or via the configs this.isBroadcast = - isBroadcast || (config == null) ? false : new StreamConfig(config).getBroadcastEnabled(streamSpec.toSystemStream()); + isBroadcast || new StreamConfig(config).getBroadcastEnabled(streamSpec.toSystemStream()); this.config = config; if (isBroadcast && isIntermediate) { partitions = 1; @@ -113,21 +113,21 @@ public class StreamEdge { } Config generateConfig() { - Map<String, String> newConfig = new HashMap<>(); + Map<String, String> streamConfig = new HashMap<>(); StreamSpec spec = getStreamSpec(); - newConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), spec.getId()), spec.getSystemName()); - newConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName()); + streamConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), spec.getId()), spec.getSystemName()); + streamConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName()); if (isIntermediate()) { - newConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true"); - newConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), spec.getId()), "true"); - newConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest"); - newConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE)); + streamConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true"); + streamConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), spec.getId()), "true"); + streamConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest"); + streamConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE)); } spec.getConfig().forEach((property, value) -> { - newConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value); + streamConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value); }); - return new MapConfig(newConfig); + return new MapConfig(streamConfig); } public boolean isBroadcast() {
