Repository: samza
Updated Branches:
  refs/heads/master 42bac30ad -> 608e4e0b7


SAMZA-1534: Fix the visualization in job graph with the new PartitionBy Op

It seems the stream and the partitionBy op have the same id, so in rendering I 
appended a "-stream" suffix to the id of each stream node. Also resolved the 
run.id collision issue.

Author: xiliu <[email protected]>

Reviewers: Jagadish V <[email protected]>

Closes #385 from xinyuiscool/SAMZA-1534


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/608e4e0b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/608e4e0b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/608e4e0b

Branch: refs/heads/master
Commit: 608e4e0b7635aa00a9f1e01647de8d978051d2cf
Parents: 42bac30
Author: Xinyu Liu <[email protected]>
Authored: Tue Dec 12 12:16:39 2017 -0800
Committer: xiliu <[email protected]>
Committed: Tue Dec 12 12:16:39 2017 -0800

----------------------------------------------------------------------
 .../samza/execution/JobGraphJsonGenerator.java  |  6 +-
 .../samza/runtime/RemoteApplicationRunner.java  |  7 +-
 .../execution/TestJobGraphJsonGenerator.java    | 75 ++++++++++++++++++++
 .../src/main/visualizer/js/planToDagre.js       |  6 +-
 4 files changed, 87 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/608e4e0b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 2729fa3..2be01af 100644
--- 
a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -28,19 +28,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.table.TableSpec;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 
-
 /**
  * This class generates the JSON representation of the {@link JobGraph}.
  */
@@ -220,6 +219,9 @@ import org.codehaus.jackson.map.ObjectMapper;
     if (spec instanceof OutputOperatorSpec) {
       OutputStreamImpl outputStream = ((OutputOperatorSpec) 
spec).getOutputStream();
       map.put("outputStreamId", outputStream.getStreamSpec().getId());
+    } else if (spec instanceof PartitionByOperatorSpec) {
+      OutputStreamImpl outputStream = ((PartitionByOperatorSpec) 
spec).getOutputStream();
+      map.put("outputStreamId", outputStream.getStreamSpec().getId());
     }
 
     if (spec instanceof StreamTableJoinOperatorSpec) {

http://git-wip-us.apache.org/repos/asf/samza/blob/608e4e0b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
 
b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index 3e046af..1ead841 100644
--- 
a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ 
b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -33,6 +33,8 @@ import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.UUID;
+
 
 /**
  * This class implements the {@link ApplicationRunner} that runs the 
applications in a remote cluster
@@ -57,8 +59,9 @@ public class RemoteApplicationRunner extends 
AbstractApplicationRunner {
   @Override
   public void run(StreamApplication app) {
     try {
-      // TODO: this is a tmp solution and the run.id generation will be 
addressed in another JIRA
-      String runId = String.valueOf(System.currentTimeMillis());
+      // TODO: run.id needs to be set for standalone: SAMZA-1531
+      // run.id is based on current system time with the most significant bits 
in UUID (8 digits) to avoid collision
+      String runId = String.valueOf(System.currentTimeMillis()) + "-" + 
UUID.randomUUID().toString().substring(0, 8);
       LOG.info("The run id for this run is {}", runId);
 
       // 1. initialize and plan

http://git-wip-us.apache.org/repos/asf/samza/blob/608e4e0b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index ba5c922..b48c82d 100644
--- 
a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -27,10 +27,13 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.LongSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -48,6 +51,12 @@ import static org.mockito.Mockito.when;
 
 public class TestJobGraphJsonGenerator {
 
+  public class PageViewEvent {
+    String getCountry() {
+      return "";
+    }
+  }
+
   @Test
   public void test() throws Exception {
 
@@ -147,4 +156,70 @@ public class TestJobGraphJsonGenerator {
     assertEquals(2, nodes.sinkStreams.size());
     assertEquals(2, nodes.intermediateStreams.size());
   }
+
+  @Test
+  public void test2() throws Exception {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "test-app");
+    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
+    Config config = new MapConfig(configMap);
+
+    StreamSpec input = new StreamSpec("PageView", 
"hdfs:/user/dummy/PageViewEvent", "hdfs");
+    StreamSpec output = new StreamSpec("PageViewCount", "PageViewCount", 
"kafka");
+
+    ApplicationRunner runner = mock(ApplicationRunner.class);
+    when(runner.getStreamSpec("PageView")).thenReturn(input);
+    when(runner.getStreamSpec("PageViewCount")).thenReturn(output);
+
+    // intermediate streams used in tests
+    when(runner.getStreamSpec("test-app-1-partition_by-keyed-by-country"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-keyed-by-country", 
"test-app-1-partition_by-keyed-by-country", "kafka"));
+
+    // set up external partition count
+    Map<String, Integer> system1Map = new HashMap<>();
+    system1Map.put("hdfs:/user/dummy/PageViewEvent", 512);
+    Map<String, Integer> system2Map = new HashMap<>();
+    system2Map.put("PageViewCount", 16);
+
+    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
+    SystemAdmin systemAdmin1 = createSystemAdmin(system1Map);
+    SystemAdmin systemAdmin2 = createSystemAdmin(system2Map);
+    systemAdmins.put("hdfs", systemAdmin1);
+    systemAdmins.put("kafka", systemAdmin2);
+    StreamManager streamManager = new StreamManager(systemAdmins);
+
+    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    MessageStream<KV<String, PageViewEvent>> inputStream = 
streamGraph.getInputStream("PageView");
+    inputStream
+        .partitionBy(kv -> kv.getValue().getCountry(), kv -> kv.getValue(), 
"keyed-by-country")
+        .window(Windows.keyedTumblingWindow(kv -> kv.getValue().getCountry(),
+            Duration.ofSeconds(10L),
+            () -> 0L,
+            (m, c) -> c + 1L,
+            new StringSerde(),
+            new LongSerde()), "count-by-country")
+        .map(pane -> new KV<>(pane.getKey().getKey(), pane.getMessage()))
+        .sendTo(streamGraph.getOutputStream("PageViewCount"));
+
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+    ExecutionPlan plan = planner.plan(streamGraph);
+    String json = plan.getPlanAsJson();
+    System.out.println(json);
+
+    // deserialize
+    ObjectMapper mapper = new ObjectMapper();
+    JobGraphJsonGenerator.JobGraphJson nodes = mapper.readValue(json, 
JobGraphJsonGenerator.JobGraphJson.class);
+    JobGraphJsonGenerator.OperatorGraphJson operatorGraphJson = 
nodes.jobs.get(0).operatorGraph;
+    assertEquals(2, operatorGraphJson.inputStreams.size());
+    assertEquals(4, operatorGraphJson.operators.size());
+    assertEquals(1, nodes.sourceStreams.size());
+    assertEquals(1, nodes.sinkStreams.size());
+    assertEquals(1, nodes.intermediateStreams.size());
+
+    // verify partitionBy op output to the intermdiate stream of the same id
+    
assertEquals(operatorGraphJson.operators.get("test-app-1-partition_by-keyed-by-country").get("outputStreamId"),
+        "test-app-1-partition_by-keyed-by-country");
+    
assertEquals(operatorGraphJson.operators.get("test-app-1-send_to-5").get("outputStreamId"),
+        "PageViewCount");
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/608e4e0b/samza-shell/src/main/visualizer/js/planToDagre.js
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/visualizer/js/planToDagre.js 
b/samza-shell/src/main/visualizer/js/planToDagre.js
index 0421c33..77adaa2 100644
--- a/samza-shell/src/main/visualizer/js/planToDagre.js
+++ b/samza-shell/src/main/visualizer/js/planToDagre.js
@@ -36,7 +36,7 @@ function planToDagre(data) {
       labelVal += "<li>PhysicalName: " + stream.streamSpec.physicalName + 
"</li>"
       labelVal += "<li>PartitionCount: " + stream.streamSpec.partitionCount + 
"</li>"
       labelVal += "</ul></div>"
-      g.setNode(streamId,  { label: labelVal, labelType: "html", shape: 
"ellipse", class: streamClasses[i] });
+      g.setNode(streamId + "-stream",  { label: labelVal, labelType: "html", 
shape: "ellipse", class: streamClasses[i] });
     }
   }
 
@@ -67,7 +67,7 @@ function planToDagre(data) {
     for (var k = 0; k < inputs.length; k++) {
       var input = inputs[k];
       for (var m = 0; m < input.nextOperatorIds.length; m++) {
-        g.setEdge(input.streamId, input.nextOperatorIds[m]);
+        g.setEdge(input.streamId + "-stream", input.nextOperatorIds[m]);
       }
     }
 
@@ -78,7 +78,7 @@ function planToDagre(data) {
         g.setEdge(opId, operator.nextOperatorIds[j]);
       }
       if (typeof(operator.outputStreamId) !== 'undefined') {
-        g.setEdge(opId, operator.outputStreamId);
+        g.setEdge(opId, operator.outputStreamId + "-stream");
       }
     }
   }

Reply via email to