SAMZA-1434: Fix issues found in Hadoop. Fix the following bugs found when running Samza on Hadoop:
1. Hdfs allows output partitions to be 0 (empty folder) 2. Add null check for the changelog topic generation 3. Call getStreamSpec() instead of using streamSpec member in StreamEdge. This is due to getStreamSpec will do more transformation. 4. Bound the auto-generated intermediate topic partition by a certain count (256). Author: Xinyu Liu <[email protected]> Reviewers: Jagadish Venkatraman <[email protected]> Closes #307 from xinyuiscool/SAMZA-1434 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a1f01444 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a1f01444 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a1f01444 Branch: refs/heads/master Commit: a1f01444ec12f49684213cc69b1cce16ff0f8232 Parents: 2819cbc Author: Xinyu Liu <[email protected]> Authored: Fri Sep 29 15:05:55 2017 -0700 Committer: Xinyu Liu <[email protected]> Committed: Fri Sep 29 15:05:55 2017 -0700 ---------------------------------------------------------------------- samza-api/src/main/java/org/apache/samza/system/StreamSpec.java | 5 +++-- .../main/java/org/apache/samza/config/JavaStorageConfig.java | 4 +++- .../main/java/org/apache/samza/execution/ExecutionPlanner.java | 5 ++++- .../src/main/java/org/apache/samza/execution/StreamEdge.java | 3 ++- 4 files changed, 12 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a1f01444/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java index 8d7401a..6ea1a22 100644 --- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java +++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java @@ -158,8 +158,9 @@ public class StreamSpec { 
validateLogicalIdentifier("streamId", id); validateLogicalIdentifier("systemName", systemName); - if (partitionCount < 1) { - throw new IllegalArgumentException("Parameter 'partitionCount' must be greater than 0"); + // partition count being 0 is a valid use case in Hadoop when the output stream is an empty folder + if (partitionCount < 0) { + throw new IllegalArgumentException("Parameter 'partitionCount' must be >= 0"); } this.id = id; http://git-wip-us.apache.org/repos/asf/samza/blob/a1f01444/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java index 4e9a58a..34e5683 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java @@ -73,7 +73,9 @@ public class JavaStorageConfig extends MapConfig { systemStreamRes = systemStream; } - systemStreamRes = StreamManager.createUniqueNameForBatch(systemStreamRes, this); + if (systemStreamRes != null) { + systemStreamRes = StreamManager.createUniqueNameForBatch(systemStreamRes, this); + } return systemStreamRes; } http://git-wip-us.apache.org/repos/asf/samza/blob/a1f01444/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java index e258d13..998ea1e 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java @@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory; public class ExecutionPlanner { private static final Logger log = 
LoggerFactory.getLogger(ExecutionPlanner.class); + private static final int MAX_INFERRED_PARTITIONS = 256; + private final Config config; private final StreamManager streamManager; @@ -253,9 +255,10 @@ public class ExecutionPlanner { if (partitions < 0) { // use the following simple algo to figure out the partitions // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions)) + // partition will be further bounded by MAX_INFERRED_PARTITIONS. This is important when running in hadoop. int maxInPartitions = maxPartition(jobGraph.getSources()); int maxOutPartitions = maxPartition(jobGraph.getSinks()); - partitions = Math.max(maxInPartitions, maxOutPartitions); + partitions = Math.min(Math.max(maxInPartitions, maxOutPartitions), MAX_INFERRED_PARTITIONS); } for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) { if (edge.getPartitionCount() <= 0) { http://git-wip-us.apache.org/repos/asf/samza/blob/a1f01444/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java index f545490..792fde5 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java @@ -82,7 +82,8 @@ public class StreamEdge { } SystemStream getSystemStream() { - return new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName()); + StreamSpec spec = getStreamSpec(); + return new SystemStream(spec.getSystemName(), spec.getPhysicalName()); } String getFormattedSystemStream() {
