Repository: samza Updated Branches: refs/heads/master 553ce33b1 -> 6f1b3db2c
SAMZA-1172: Fix for the topological sort to handle single-node loop. In the processor graph, the topological sort missed adding nodes to the visited set during graph traversal. This caused a wrong graph to be generated for a single-node loop. This is fixed in the patch. Also fixed the maxPartition method, which did not handle an empty collection correctly. Added a few new unit tests for these. Also adjusted the timing of the previous async commit unit tests so they can run more reliably, and re-enabled the previously ignored testProcessBehaviourWhenAsyncCommitIsEnabled test. In the long term, we need to fix the timer inside the AsyncRunLoop tests. Author: Xinyu Liu <[email protected]> Reviewers: Jacob Maes <[email protected]> Closes #100 from xinyuiscool/SAMZA-1172 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6f1b3db2 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6f1b3db2 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6f1b3db2 Branch: refs/heads/master Commit: 6f1b3db2c4a1a2ef22ec023c6b3ed9f54766ae3e Parents: 553ce33 Author: Xinyu Liu <[email protected]> Authored: Wed Mar 29 10:49:25 2017 -0700 Committer: Xinyu Liu <[email protected]> Committed: Wed Mar 29 10:49:25 2017 -0700 ---------------------------------------------------------------------- .../samza/execution/ExecutionPlanner.java | 4 +-- .../apache/samza/execution/ProcessorGraph.java | 6 ++++ .../samza/execution/TestExecutionPlanner.java | 23 ++++++++++++++ .../samza/execution/TestProcessorGraph.java | 32 ++++++++++++++++++++ .../org/apache/samza/task/TestAsyncRunLoop.java | 6 ++-- 5 files changed, 65 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java 
b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java index 77790a8..ca2e71e 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java @@ -302,8 +302,8 @@ public class ExecutionPlanner { } } - private static int maxPartition(Collection<StreamEdge> edges) { - return edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).get(); + /* package private */ static int maxPartition(Collection<StreamEdge> edges) { + return edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).orElse(StreamEdge.PARTITIONS_UNKNOWN); } private static StreamSpec createStreamSpec(StreamEdge edge) { http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java b/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java index d94a9eb..13755ae 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java +++ b/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java @@ -276,6 +276,10 @@ public class ProcessorGraph { */ /* package private */ List<ProcessorNode> topologicalSort() { Collection<ProcessorNode> pnodes = nodes.values(); + if (pnodes.size() == 1) { + return new ArrayList<>(pnodes); + } + Queue<ProcessorNode> q = new ArrayDeque<>(); Map<String, Long> indegree = new HashMap<>(); Set<ProcessorNode> visited = new HashSet<>(); @@ -337,6 +341,7 @@ public class ProcessorGraph { } // start from the node with minimal input edge again q.add(minNode); + visited.add(minNode); } else { // all the remaining nodes should be reachable from sources // start from sources again to find the next node that hasn't been visited @@ -344,6 +349,7 @@ public class ProcessorGraph { 
.filter(node -> !visited.contains(node)) .findAny().get(); q.add(nextNode); + visited.add(nextNode); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index fa02e04..b69eec6 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -20,6 +20,9 @@ package org.apache.samza.execution; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -44,6 +47,7 @@ import org.apache.samza.task.TaskCoordinator; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -279,4 +283,23 @@ public class TestExecutionPlanner { assertTrue(edge.getPartitionCount() == 64); // max of input1 and output1 }); } + + @Test + public void testMaxPartition() { + Collection<StreamEdge> edges = new ArrayList<>(); + StreamEdge edge = new StreamEdge(input1); + edge.setPartitionCount(2); + edges.add(edge); + edge = new StreamEdge(input2); + edge.setPartitionCount(32); + edges.add(edge); + edge = new StreamEdge(input3); + edge.setPartitionCount(16); + edges.add(edge); + + assertEquals(ExecutionPlanner.maxPartition(edges), 32); + + edges = Collections.emptyList(); + assertEquals(ExecutionPlanner.maxPartition(edges), StreamEdge.PARTITIONS_UNKNOWN); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java index 2bdf529..2f89d91 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java @@ -27,6 +27,7 @@ import org.apache.samza.system.StreamSpec; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -34,6 +35,8 @@ public class TestProcessorGraph { ProcessorGraph graph1; ProcessorGraph graph2; + ProcessorGraph graph3; + ProcessorGraph graph4; int streamSeq = 0; private StreamSpec genStream() { @@ -88,6 +91,24 @@ public class TestProcessorGraph { graph2.addIntermediateStream(genStream(), "5", "5"); graph2.addIntermediateStream(genStream(), "5", "7"); graph2.addSink(genStream(), "7"); + + /** + * graph3 is a graph with self loops + * 1<->1 -> 2<->2 + */ + graph3 = new ProcessorGraph(null); + graph3.addSource(genStream(), "1"); + graph3.addIntermediateStream(genStream(), "1", "1"); + graph3.addIntermediateStream(genStream(), "1", "2"); + graph3.addIntermediateStream(genStream(), "2", "2"); + + /** + * graph4 is a graph of single-loop node + * 1<->1 + */ + graph4 = new ProcessorGraph(null); + graph4.addSource(genStream(), "1"); + graph4.addIntermediateStream(genStream(), "1", "1"); } @Test @@ -194,5 +215,16 @@ public class TestProcessorGraph { assertTrue(idxMap2.get("6") > idxMap2.get("1")); assertTrue(idxMap2.get("5") > idxMap2.get("4")); assertTrue(idxMap2.get("7") > idxMap2.get("5")); + + //test graph3 + List<ProcessorNode> sortedNodes3 = graph3.topologicalSort(); + assertTrue(sortedNodes3.size() == 2); + assertEquals(sortedNodes3.get(0).getId(), "1"); + assertEquals(sortedNodes3.get(1).getId(), "2"); + + //test graph4 + List<ProcessorNode> 
sortedNodes4 = graph4.topologicalSort(); + assertTrue(sortedNodes4.size() == 1); + assertEquals(sortedNodes4.get(0).getId(), "1"); } } http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index 31cbe79..60dcd26 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -47,7 +47,6 @@ import org.apache.samza.system.SystemConsumers; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.TestSystemConsumers; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import scala.Option; import scala.collection.JavaConversions; @@ -575,7 +574,7 @@ public class TestAsyncRunLoop { }); runLoop.run(); - callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS); + callbackExecutor.awaitTermination(500, TimeUnit.MILLISECONDS); verify(offsetManager, atLeastOnce()).checkpoint(taskName0); assertEquals(3, task0.processed); @@ -585,7 +584,6 @@ public class TestAsyncRunLoop { } @Test - @Ignore public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedException { TestTask task0 = new TestTask(true, true, false); @@ -631,6 +629,6 @@ public class TestAsyncRunLoop { runLoop.run(); - callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS); + callbackExecutor.awaitTermination(500, TimeUnit.MILLISECONDS); } }
