Repository: samza
Updated Branches:
  refs/heads/master 553ce33b1 -> 6f1b3db2c


SAMZA-1172: Fix for the topological sort to handle single-node loop

In the processor graph, the topological sort failed to add nodes to the 
visited set during graph traversal. This caused a wrong graph to be generated 
for a single-node loop. This patch fixes the problem.

Also fixed the maxPartition method, which did not handle an empty collection correctly.

Added a few new unit tests for these changes. Also adjusted the timing of the 
existing async commit unit tests so that they run more reliably. In the long 
term, we need to fix the timer inside the AsyncRunLoop tests.

Author: Xinyu Liu <[email protected]>

Reviewers: Jacob Maes <[email protected]>

Closes #100 from xinyuiscool/SAMZA-1172


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6f1b3db2
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6f1b3db2
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6f1b3db2

Branch: refs/heads/master
Commit: 6f1b3db2c4a1a2ef22ec023c6b3ed9f54766ae3e
Parents: 553ce33
Author: Xinyu Liu <[email protected]>
Authored: Wed Mar 29 10:49:25 2017 -0700
Committer: Xinyu Liu <[email protected]>
Committed: Wed Mar 29 10:49:25 2017 -0700

----------------------------------------------------------------------
 .../samza/execution/ExecutionPlanner.java       |  4 +--
 .../apache/samza/execution/ProcessorGraph.java  |  6 ++++
 .../samza/execution/TestExecutionPlanner.java   | 23 ++++++++++++++
 .../samza/execution/TestProcessorGraph.java     | 32 ++++++++++++++++++++
 .../org/apache/samza/task/TestAsyncRunLoop.java |  6 ++--
 5 files changed, 65 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java 
b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 77790a8..ca2e71e 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -302,8 +302,8 @@ public class ExecutionPlanner {
     }
   }
 
-  private static int maxPartition(Collection<StreamEdge> edges) {
-    return 
edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).get();
+  /* package private */ static int maxPartition(Collection<StreamEdge> edges) {
+    return 
edges.stream().map(StreamEdge::getPartitionCount).reduce(Integer::max).orElse(StreamEdge.PARTITIONS_UNKNOWN);
   }
 
   private static StreamSpec createStreamSpec(StreamEdge edge) {

http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java 
b/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
index d94a9eb..13755ae 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ProcessorGraph.java
@@ -276,6 +276,10 @@ public class ProcessorGraph {
    */
   /* package private */ List<ProcessorNode> topologicalSort() {
     Collection<ProcessorNode> pnodes = nodes.values();
+    if (pnodes.size() == 1) {
+      return new ArrayList<>(pnodes);
+    }
+
     Queue<ProcessorNode> q = new ArrayDeque<>();
     Map<String, Long> indegree = new HashMap<>();
     Set<ProcessorNode> visited = new HashSet<>();
@@ -337,6 +341,7 @@ public class ProcessorGraph {
           }
           // start from the node with minimal input edge again
           q.add(minNode);
+          visited.add(minNode);
         } else {
           // all the remaining nodes should be reachable from sources
           // start from sources again to find the next node that hasn't been 
visited
@@ -344,6 +349,7 @@ public class ProcessorGraph {
               .filter(node -> !visited.contains(node))
               .findAny().get();
           q.add(nextNode);
+          visited.add(nextNode);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index fa02e04..b69eec6 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -20,6 +20,9 @@
 package org.apache.samza.execution;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -44,6 +47,7 @@ import org.apache.samza.task.TaskCoordinator;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 
@@ -279,4 +283,23 @@ public class TestExecutionPlanner {
         assertTrue(edge.getPartitionCount() == 64); // max of input1 and 
output1
       });
   }
+
+  @Test
+  public void testMaxPartition() {
+    Collection<StreamEdge> edges = new ArrayList<>();
+    StreamEdge edge = new StreamEdge(input1);
+    edge.setPartitionCount(2);
+    edges.add(edge);
+    edge = new StreamEdge(input2);
+    edge.setPartitionCount(32);
+    edges.add(edge);
+    edge = new StreamEdge(input3);
+    edge.setPartitionCount(16);
+    edges.add(edge);
+
+    assertEquals(ExecutionPlanner.maxPartition(edges), 32);
+
+    edges = Collections.emptyList();
+    assertEquals(ExecutionPlanner.maxPartition(edges), 
StreamEdge.PARTITIONS_UNKNOWN);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java 
b/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
index 2bdf529..2f89d91 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestProcessorGraph.java
@@ -27,6 +27,7 @@ import org.apache.samza.system.StreamSpec;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 
@@ -34,6 +35,8 @@ public class TestProcessorGraph {
 
   ProcessorGraph graph1;
   ProcessorGraph graph2;
+  ProcessorGraph graph3;
+  ProcessorGraph graph4;
   int streamSeq = 0;
 
   private StreamSpec genStream() {
@@ -88,6 +91,24 @@ public class TestProcessorGraph {
     graph2.addIntermediateStream(genStream(), "5", "5");
     graph2.addIntermediateStream(genStream(), "5", "7");
     graph2.addSink(genStream(), "7");
+
+    /**
+     * graph3 is a graph with self loops
+     * 1<->1 -> 2<->2
+     */
+    graph3 = new ProcessorGraph(null);
+    graph3.addSource(genStream(), "1");
+    graph3.addIntermediateStream(genStream(), "1", "1");
+    graph3.addIntermediateStream(genStream(), "1", "2");
+    graph3.addIntermediateStream(genStream(), "2", "2");
+
+    /**
+     * graph4 is a graph of single-loop node
+     * 1<->1
+     */
+    graph4 = new ProcessorGraph(null);
+    graph4.addSource(genStream(), "1");
+    graph4.addIntermediateStream(genStream(), "1", "1");
   }
 
   @Test
@@ -194,5 +215,16 @@ public class TestProcessorGraph {
     assertTrue(idxMap2.get("6") > idxMap2.get("1"));
     assertTrue(idxMap2.get("5") > idxMap2.get("4"));
     assertTrue(idxMap2.get("7") > idxMap2.get("5"));
+
+    //test graph3
+    List<ProcessorNode> sortedNodes3 = graph3.topologicalSort();
+    assertTrue(sortedNodes3.size() == 2);
+    assertEquals(sortedNodes3.get(0).getId(), "1");
+    assertEquals(sortedNodes3.get(1).getId(), "2");
+
+    //test graph4
+    List<ProcessorNode> sortedNodes4 = graph4.topologicalSort();
+    assertTrue(sortedNodes4.size() == 1);
+    assertEquals(sortedNodes4.get(0).getId(), "1");
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6f1b3db2/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 31cbe79..60dcd26 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -47,7 +47,6 @@ import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.TestSystemConsumers;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import scala.Option;
 import scala.collection.JavaConversions;
@@ -575,7 +574,7 @@ public class TestAsyncRunLoop {
       });
 
     runLoop.run();
-    callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+    callbackExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
 
     verify(offsetManager, atLeastOnce()).checkpoint(taskName0);
     assertEquals(3, task0.processed);
@@ -585,7 +584,6 @@ public class TestAsyncRunLoop {
   }
 
   @Test
-  @Ignore
   public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws 
InterruptedException {
     TestTask task0 = new TestTask(true, true, false);
 
@@ -631,6 +629,6 @@ public class TestAsyncRunLoop {
 
     runLoop.run();
 
-    callbackExecutor.awaitTermination(100, TimeUnit.MILLISECONDS);
+    callbackExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
   }
 }

Reply via email to