SAMZA-1434: Fix issues found in Hadoop

Fix the following bugs found when running Samza on hadoop:

1. Allow the output partition count to be 0, since HDFS permits an empty output folder
2. Add null check for the changelog topic generation
3. Call getStreamSpec() instead of using the streamSpec member directly in StreamEdge, 
because getStreamSpec() performs additional transformation.
4. Bound the auto-generated intermediate topic partition count to a maximum 
of 256.

Author: Xinyu Liu <[email protected]>

Reviewers: Jagadish Venkatraman <[email protected]>

Closes #307 from xinyuiscool/SAMZA-1434


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a1f01444
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a1f01444
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a1f01444

Branch: refs/heads/master
Commit: a1f01444ec12f49684213cc69b1cce16ff0f8232
Parents: 2819cbc
Author: Xinyu Liu <[email protected]>
Authored: Fri Sep 29 15:05:55 2017 -0700
Committer: Xinyu Liu <[email protected]>
Committed: Fri Sep 29 15:05:55 2017 -0700

----------------------------------------------------------------------
 samza-api/src/main/java/org/apache/samza/system/StreamSpec.java | 5 +++--
 .../main/java/org/apache/samza/config/JavaStorageConfig.java    | 4 +++-
 .../main/java/org/apache/samza/execution/ExecutionPlanner.java  | 5 ++++-
 .../src/main/java/org/apache/samza/execution/StreamEdge.java    | 3 ++-
 4 files changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a1f01444/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java 
b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index 8d7401a..6ea1a22 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -158,8 +158,9 @@ public class StreamSpec {
     validateLogicalIdentifier("streamId", id);
     validateLogicalIdentifier("systemName", systemName);
 
-    if (partitionCount < 1) {
-      throw new IllegalArgumentException("Parameter 'partitionCount' must be 
greater than 0");
+    // partition count being 0 is a valid use case in Hadoop when the output 
stream is an empty folder
+    if (partitionCount < 0) {
+      throw new IllegalArgumentException("Parameter 'partitionCount' must be 
>= 0");
     }
 
     this.id = id;

http://git-wip-us.apache.org/repos/asf/samza/blob/a1f01444/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
index 4e9a58a..34e5683 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JavaStorageConfig.java
@@ -73,7 +73,9 @@ public class JavaStorageConfig extends MapConfig {
       systemStreamRes = systemStream;
     }
 
-    systemStreamRes = StreamManager.createUniqueNameForBatch(systemStreamRes, 
this);
+    if (systemStreamRes != null) {
+      systemStreamRes = 
StreamManager.createUniqueNameForBatch(systemStreamRes, this);
+    }
     return systemStreamRes;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a1f01444/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java 
b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index e258d13..998ea1e 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory;
 public class ExecutionPlanner {
   private static final Logger log = 
LoggerFactory.getLogger(ExecutionPlanner.class);
 
+  private static final int MAX_INFERRED_PARTITIONS = 256;
+
   private final Config config;
   private final StreamManager streamManager;
 
@@ -253,9 +255,10 @@ public class ExecutionPlanner {
     if (partitions < 0) {
       // use the following simple algo to figure out the partitions
       // partition = MAX(MAX(Input topic partitions), MAX(Output topic 
partitions))
+      // partition will be further bounded by MAX_INFERRED_PARTITIONS. This is 
important when running in hadoop.
       int maxInPartitions = maxPartition(jobGraph.getSources());
       int maxOutPartitions = maxPartition(jobGraph.getSinks());
-      partitions = Math.max(maxInPartitions, maxOutPartitions);
+      partitions = Math.min(Math.max(maxInPartitions, maxOutPartitions), 
MAX_INFERRED_PARTITIONS);
     }
     for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {

http://git-wip-us.apache.org/repos/asf/samza/blob/a1f01444/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java 
b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index f545490..792fde5 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -82,7 +82,8 @@ public class StreamEdge {
   }
 
   SystemStream getSystemStream() {
-    return new SystemStream(streamSpec.getSystemName(), 
streamSpec.getPhysicalName());
+    StreamSpec spec = getStreamSpec();
+    return new SystemStream(spec.getSystemName(), spec.getPhysicalName());
   }
 
   String getFormattedSystemStream() {

Reply via email to