Repository: samza
Updated Branches:
  refs/heads/master 5154f9eb8 -> 2aa18b825


SAMZA-2013: Account for cycles in graph traversal within Execution Planner

Author: Ahmed Abdul Hamid <[email protected]>

Reviewers: Aditya Toomla <[email protected]>

Closes #832 from ahmedahamid/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2aa18b82
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2aa18b82
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2aa18b82

Branch: refs/heads/master
Commit: 2aa18b82570e9457f1e36b6fc064eadadfb2304d
Parents: 5154f9e
Author: Ahmed Abdul Hamid <[email protected]>
Authored: Thu Nov 29 19:23:46 2018 -0800
Committer: Prateek Maheshwari <[email protected]>
Committed: Thu Nov 29 19:23:46 2018 -0800

----------------------------------------------------------------------
 .../execution/OperatorSpecGraphAnalyzer.java    | 13 +++++++-
 .../samza/execution/TestExecutionPlanner.java   | 33 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2aa18b82/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
index 70a5848..b3ecfeb 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java
@@ -86,7 +86,23 @@ import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
       Function<OperatorSpec, Iterable<OperatorSpec>> getNextOpSpecs) {
+    // Seed one shared visited set with the start spec so that a cycle leading back to it cannot revisit it.
+    Set<OperatorSpec> visitedOpSpecs = new HashSet<>();
+    visitedOpSpecs.add(startOpSpec);
+    traverseHelper(startOpSpec, visitor, getNextOpSpecs, visitedOpSpecs);
+  }
+
+  /**
+   * Visits {@code startOpSpec} and then, depth-first, every operator spec reachable from it via
+   * {@code getNextOpSpecs} that is not yet in {@code visitedOpSpecs}. Because the visited set is shared by
+   * the whole traversal, each reachable spec is visited at most once and cycles do not cause endless recursion.
+   */
+  private static void traverseHelper(OperatorSpec startOpSpec, Consumer<OperatorSpec> visitor,
+      Function<OperatorSpec, Iterable<OperatorSpec>> getNextOpSpecs, Set<OperatorSpec> visitedOpSpecs) {
     visitor.accept(startOpSpec);
     for (OperatorSpec nextOpSpec : getNextOpSpecs.apply(startOpSpec)) {
-      traverse(nextOpSpec, visitor, getNextOpSpecs);
+      // Make sure we do not end up endlessly traversing cycles.
+      if (visitedOpSpecs.add(nextOpSpec)) {
+        traverseHelper(nextOpSpec, visitor, getNextOpSpecs, visitedOpSpecs);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2aa18b82/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 7908764..25d5071 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -420,6 +420,30 @@ public class TestExecutionPlanner {
       }, config);
   }
 
+  /**
+   * Builds a special stream-table join example in which a stream is joined with a table and the
+   * result is sent back to the same table. This example ensures {@link ExecutionPlanner} does not
+   * get stuck indefinitely traversing the virtual cycle between the stream-table-join and
+   * send-to-table operator specs.
+   *
+   * Such a virtual cycle is present to support computing the partitions of intermediate streams
+   * that participate in stream-table joins. Please refer to SAMZA SEP-16 for more details.
+   */
+  private StreamApplicationDescriptorImpl createStreamGraphWithStreamTableJoinAndSendToSameTable() {
+    return new StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor);
+
+        TableDescriptor tableDescriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor(
+          "table-id", new KVSerde(new StringSerde(), new StringSerde()));
+        Table table = appDesc.getTable(tableDescriptor);
+
+        messageStream1
+          .join(table, mock(StreamTableJoinFunction.class))
+          .sendTo(table);
+
+      }, config);
+  }
+
   @Before
   public void setup() {
     Map<String, String> configMap = new HashMap<>();
@@ -587,6 +611,15 @@ public class TestExecutionPlanner {
   }
 
   @Test
+  public void testHandlesVirtualStreamTableJoinCycles() {
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+    StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithStreamTableJoinAndSendToSameTable();
+
+    // No assertions needed: the test passes as long as planning terminates instead of getting stuck on the cycle.
+    planner.plan(graphSpec);
+  }
+
+  @Test
   public void testDefaultPartitions() {
     Map<String, String> map = new HashMap<>(config);
     map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));

Reply via email to