Repository: samza Updated Branches: refs/heads/master 42bac30ad -> 608e4e0b7
SAMZA-1534: Fix the visualization in job graph with the new PartitionBy Op The stream and the partitionBy op have the same id, so when rendering I append a "-stream" suffix to the id of each stream node to keep the two apart. Also resolved the run.id collision issue. Author: xiliu <[email protected]> Reviewers: Jagadish V <[email protected]> Closes #385 from xinyuiscool/SAMZA-1534 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/608e4e0b Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/608e4e0b Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/608e4e0b Branch: refs/heads/master Commit: 608e4e0b7635aa00a9f1e01647de8d978051d2cf Parents: 42bac30 Author: Xinyu Liu <[email protected]> Authored: Tue Dec 12 12:16:39 2017 -0800 Committer: xiliu <[email protected]> Committed: Tue Dec 12 12:16:39 2017 -0800 ---------------------------------------------------------------------- .../samza/execution/JobGraphJsonGenerator.java | 6 +- .../samza/runtime/RemoteApplicationRunner.java | 7 +- .../execution/TestJobGraphJsonGenerator.java | 75 ++++++++++++++++++++ .../src/main/visualizer/js/planToDagre.js | 6 +- 4 files changed, 87 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/608e4e0b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java index 2729fa3..2be01af 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java @@ -28,19 +28,18 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; - import 
org.apache.samza.config.ApplicationConfig; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.operators.spec.OutputOperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; +import org.apache.samza.operators.spec.PartitionByOperatorSpec; import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec; import org.apache.samza.table.TableSpec; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.ObjectMapper; - /** * This class generates the JSON representation of the {@link JobGraph}. */ @@ -220,6 +219,9 @@ import org.codehaus.jackson.map.ObjectMapper; if (spec instanceof OutputOperatorSpec) { OutputStreamImpl outputStream = ((OutputOperatorSpec) spec).getOutputStream(); map.put("outputStreamId", outputStream.getStreamSpec().getId()); + } else if (spec instanceof PartitionByOperatorSpec) { + OutputStreamImpl outputStream = ((PartitionByOperatorSpec) spec).getOutputStream(); + map.put("outputStreamId", outputStream.getStreamSpec().getId()); } if (spec instanceof StreamTableJoinOperatorSpec) { http://git-wip-us.apache.org/repos/asf/samza/blob/608e4e0b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index 3e046af..1ead841 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -33,6 +33,8 @@ import org.apache.samza.metrics.MetricsRegistryMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.UUID; + /** * This class implements the {@link ApplicationRunner} that runs 
the applications in a remote cluster @@ -57,8 +59,9 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { @Override public void run(StreamApplication app) { try { - // TODO: this is a tmp solution and the run.id generation will be addressed in another JIRA - String runId = String.valueOf(System.currentTimeMillis()); + // TODO: run.id needs to be set for standalone: SAMZA-1531 + // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision + String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8); LOG.info("The run id for this run is {}", runId); // 1. initialize and plan http://git-wip-us.apache.org/repos/asf/samza/blob/608e4e0b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java index ba5c922..b48c82d 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java @@ -27,10 +27,13 @@ import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.windows.Windows; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.LongSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; +import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; import 
org.codehaus.jackson.map.ObjectMapper; @@ -48,6 +51,12 @@ import static org.mockito.Mockito.when; public class TestJobGraphJsonGenerator { + public class PageViewEvent { + String getCountry() { + return ""; + } + } + @Test public void test() throws Exception { @@ -147,4 +156,70 @@ public class TestJobGraphJsonGenerator { assertEquals(2, nodes.sinkStreams.size()); assertEquals(2, nodes.intermediateStreams.size()); } + + @Test + public void test2() throws Exception { + Map<String, String> configMap = new HashMap<>(); + configMap.put(JobConfig.JOB_NAME(), "test-app"); + configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system"); + Config config = new MapConfig(configMap); + + StreamSpec input = new StreamSpec("PageView", "hdfs:/user/dummy/PageViewEvent", "hdfs"); + StreamSpec output = new StreamSpec("PageViewCount", "PageViewCount", "kafka"); + + ApplicationRunner runner = mock(ApplicationRunner.class); + when(runner.getStreamSpec("PageView")).thenReturn(input); + when(runner.getStreamSpec("PageViewCount")).thenReturn(output); + + // intermediate streams used in tests + when(runner.getStreamSpec("test-app-1-partition_by-keyed-by-country")) + .thenReturn(new StreamSpec("test-app-1-partition_by-keyed-by-country", "test-app-1-partition_by-keyed-by-country", "kafka")); + + // set up external partition count + Map<String, Integer> system1Map = new HashMap<>(); + system1Map.put("hdfs:/user/dummy/PageViewEvent", 512); + Map<String, Integer> system2Map = new HashMap<>(); + system2Map.put("PageViewCount", 16); + + Map<String, SystemAdmin> systemAdmins = new HashMap<>(); + SystemAdmin systemAdmin1 = createSystemAdmin(system1Map); + SystemAdmin systemAdmin2 = createSystemAdmin(system2Map); + systemAdmins.put("hdfs", systemAdmin1); + systemAdmins.put("kafka", systemAdmin2); + StreamManager streamManager = new StreamManager(systemAdmins); + + StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); + MessageStream<KV<String, PageViewEvent>> inputStream = 
streamGraph.getInputStream("PageView"); + inputStream + .partitionBy(kv -> kv.getValue().getCountry(), kv -> kv.getValue(), "keyed-by-country") + .window(Windows.keyedTumblingWindow(kv -> kv.getValue().getCountry(), + Duration.ofSeconds(10L), + () -> 0L, + (m, c) -> c + 1L, + new StringSerde(), + new LongSerde()), "count-by-country") + .map(pane -> new KV<>(pane.getKey().getKey(), pane.getMessage())) + .sendTo(streamGraph.getOutputStream("PageViewCount")); + + ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); + ExecutionPlan plan = planner.plan(streamGraph); + String json = plan.getPlanAsJson(); + System.out.println(json); + + // deserialize + ObjectMapper mapper = new ObjectMapper(); + JobGraphJsonGenerator.JobGraphJson nodes = mapper.readValue(json, JobGraphJsonGenerator.JobGraphJson.class); + JobGraphJsonGenerator.OperatorGraphJson operatorGraphJson = nodes.jobs.get(0).operatorGraph; + assertEquals(2, operatorGraphJson.inputStreams.size()); + assertEquals(4, operatorGraphJson.operators.size()); + assertEquals(1, nodes.sourceStreams.size()); + assertEquals(1, nodes.sinkStreams.size()); + assertEquals(1, nodes.intermediateStreams.size()); + + // verify partitionBy op output to the intermdiate stream of the same id + assertEquals(operatorGraphJson.operators.get("test-app-1-partition_by-keyed-by-country").get("outputStreamId"), + "test-app-1-partition_by-keyed-by-country"); + assertEquals(operatorGraphJson.operators.get("test-app-1-send_to-5").get("outputStreamId"), + "PageViewCount"); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/608e4e0b/samza-shell/src/main/visualizer/js/planToDagre.js ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/visualizer/js/planToDagre.js b/samza-shell/src/main/visualizer/js/planToDagre.js index 0421c33..77adaa2 100644 --- a/samza-shell/src/main/visualizer/js/planToDagre.js +++ b/samza-shell/src/main/visualizer/js/planToDagre.js @@ -36,7 +36,7 @@ 
function planToDagre(data) { labelVal += "<li>PhysicalName: " + stream.streamSpec.physicalName + "</li>" labelVal += "<li>PartitionCount: " + stream.streamSpec.partitionCount + "</li>" labelVal += "</ul></div>" - g.setNode(streamId, { label: labelVal, labelType: "html", shape: "ellipse", class: streamClasses[i] }); + g.setNode(streamId + "-stream", { label: labelVal, labelType: "html", shape: "ellipse", class: streamClasses[i] }); } } @@ -67,7 +67,7 @@ function planToDagre(data) { for (var k = 0; k < inputs.length; k++) { var input = inputs[k]; for (var m = 0; m < input.nextOperatorIds.length; m++) { - g.setEdge(input.streamId, input.nextOperatorIds[m]); + g.setEdge(input.streamId + "-stream", input.nextOperatorIds[m]); } } @@ -78,7 +78,7 @@ function planToDagre(data) { g.setEdge(opId, operator.nextOperatorIds[j]); } if (typeof(operator.outputStreamId) !== 'undefined') { - g.setEdge(opId, operator.outputStreamId); + g.setEdge(opId, operator.outputStreamId + "-stream"); } } }
