Repository: samza Updated Branches: refs/heads/master 5154f9eb8 -> 2aa18b825
SAMZA-2013: Account for cycles in graph traversal within Execution Planner Author: Ahmed Abdul Hamid <[email protected]> Reviewers: Aditya Toomla <[email protected]> Closes #832 from ahmedahamid/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2aa18b82 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2aa18b82 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2aa18b82 Branch: refs/heads/master Commit: 2aa18b82570e9457f1e36b6fc064eadadfb2304d Parents: 5154f9e Author: Ahmed Abdul Hamid <[email protected]> Authored: Thu Nov 29 19:23:46 2018 -0800 Committer: Prateek Maheshwari <[email protected]> Committed: Thu Nov 29 19:23:46 2018 -0800 ---------------------------------------------------------------------- .../execution/OperatorSpecGraphAnalyzer.java | 13 +++++++- .../samza/execution/TestExecutionPlanner.java | 33 ++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2aa18b82/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java index 70a5848..b3ecfeb 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java +++ b/samza-core/src/main/java/org/apache/samza/execution/OperatorSpecGraphAnalyzer.java @@ -86,7 +86,18 @@ import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; Function<OperatorSpec, Iterable<OperatorSpec>> getNextOpSpecs) { visitor.accept(startOpSpec); for (OperatorSpec nextOpSpec : getNextOpSpecs.apply(startOpSpec)) { - traverse(nextOpSpec, visitor, getNextOpSpecs); + traverseHelper(startOpSpec, visitor, getNextOpSpecs, new HashSet<>()); + } + } + + private static void traverseHelper(OperatorSpec startOpSpec, Consumer<OperatorSpec> visitor, + Function<OperatorSpec, Iterable<OperatorSpec>> getNextOpSpecs, Set<OperatorSpec> visitedOpSpecs) { + visitor.accept(startOpSpec); + for (OperatorSpec nextOpSpec : getNextOpSpecs.apply(startOpSpec)) { + // Make sure we do not end up endlessly traversing cycles. + if (visitedOpSpecs.add(nextOpSpec)) { + traverseHelper(nextOpSpec, visitor, getNextOpSpecs, visitedOpSpecs); + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/2aa18b82/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index 7908764..25d5071 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -420,6 +420,30 @@ public class TestExecutionPlanner { }, config); } + private StreamApplicationDescriptorImpl createStreamGraphWithStreamTableJoinAndSendToSameTable() { + /** + * A special example of stream-table join where a stream is joined with a table, and the result is + * sent to the same table. This example is necessary to ensure {@link ExecutionPlanner} does not + * get stuck traversing the virtual cycle between stream-table-join and send-to-table operator specs + * indefinitely. + * + * The reason such virtual cycle is present is to support computing partitions of intermediate + * streams participating in stream-table joins. Please, refer to SAMZA SEP-16 for more details. + */ + return new StreamApplicationDescriptorImpl(appDesc -> { + MessageStream<KV<Object, Object>> messageStream1 = appDesc.getInputStream(input1Descriptor); + + TableDescriptor tableDescriptor = new TestLocalTableDescriptor.MockLocalTableDescriptor( + "table-id", new KVSerde(new StringSerde(), new StringSerde())); + Table table = appDesc.getTable(tableDescriptor); + + messageStream1 + .join(table, mock(StreamTableJoinFunction.class)) + .sendTo(table); + + }, config); + } + @Before public void setup() { Map<String, String> configMap = new HashMap<>(); @@ -587,6 +611,15 @@ public class TestExecutionPlanner { } @Test + public void testHandlesVirtualStreamTableJoinCycles() { + ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); + StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithStreamTableJoinAndSendToSameTable(); + + // Just make sure planning terminates. + planner.plan(graphSpec); + } + + @Test public void testDefaultPartitions() { Map<String, String> map = new HashMap<>(config); map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
