SAMZA-1204: Visualize StreamGraph and ExecutionPlan Once a Samza application (using the fluent API) is deployed, an execution plan will be generated by the ExecutionPlanner. The plan JSON will be written to a file (plan.json) under the ./plan directory, which also contains plan.html and the JavaScript files (js folder).
Author: Xinyu Liu <[email protected]> Author: xinyuiscool <[email protected]> Reviewers: Jake Maes <[email protected]> Closes #127 from xinyuiscool/SAMZA-1204 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b71b253d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b71b253d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b71b253d Branch: refs/heads/master Commit: b71b253d2ad6bf1dc68c7bc6bb2e30782d86c7ff Parents: ad1f161 Author: Xinyu Liu <[email protected]> Authored: Tue May 2 10:09:22 2017 -0700 Committer: Xinyu Liu <[email protected]> Committed: Tue May 2 10:09:22 2017 -0700 ---------------------------------------------------------------------- build.gradle | 3 + .../apache/samza/execution/ExecutionPlan.java | 11 +- .../samza/execution/ExecutionPlanner.java | 10 +- .../org/apache/samza/execution/JobGraph.java | 45 +++-- .../samza/execution/JobGraphJsonGenerator.java | 199 ++++++++++--------- .../org/apache/samza/execution/JobNode.java | 10 +- .../samza/operators/MessageStreamImpl.java | 30 +-- .../samza/operators/impl/RootOperatorImpl.java | 5 + .../samza/operators/spec/OperatorSpec.java | 6 + .../operators/spec/PartialJoinOperatorSpec.java | 8 + .../samza/operators/spec/SinkOperatorSpec.java | 8 + .../operators/spec/StreamOperatorSpec.java | 8 + .../operators/spec/WindowOperatorSpec.java | 8 + .../samza/operators/util/OperatorJsonUtils.java | 89 +++++++++ .../runtime/AbstractApplicationRunner.java | 27 +++ .../samza/runtime/LocalApplicationRunner.java | 1 + .../samza/runtime/RemoteApplicationRunner.java | 1 + .../samza/config/ShellCommandConfig.scala | 5 + .../samza/execution/TestExecutionPlanner.java | 15 +- .../apache/samza/execution/TestJobGraph.java | 82 ++++---- .../execution/TestJobGraphJsonGenerator.java | 4 +- .../samza/operators/impl/TestOperatorImpl.java | 5 + .../samza/operators/spec/TestOperatorSpecs.java | 6 +- samza-shell/src/main/assembly/src.xml | 8 + 
samza-shell/src/main/bash/run-app.sh | 9 + samza-shell/src/main/visualizer/js/d3.v3.min.js | 5 + .../src/main/visualizer/js/dagre-d3.min.js | 28 +++ .../src/main/visualizer/js/planToDagre.js | 91 +++++++++ samza-shell/src/main/visualizer/plan.html | 118 +++++++++++ 29 files changed, 651 insertions(+), 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index dc56077..74e5161 100644 --- a/build.gradle +++ b/build.gradle @@ -71,6 +71,8 @@ rat { '**/non-responsive.less', '**/ropa-sans.css', '**/syntax.css', + '**/d3.v3.min.js', + '**/dagre-d3.min.js', '.idea/**', '.reviewboardrc', 'docs/_site/**', @@ -396,6 +398,7 @@ project(":samza-shell") { classifier = 'dist' from 'src/main/bash' from 'src/main/resources' + from 'src/main/visualizer' } artifacts { http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java index 6e2b4c6..bde9bfb 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java +++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlan.java @@ -20,6 +20,7 @@ package org.apache.samza.execution; import java.util.List; +import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.config.JobConfig; import org.apache.samza.system.StreamSpec; @@ -28,10 +29,11 @@ import org.apache.samza.system.StreamSpec; * This interface represents Samza {@link org.apache.samza.application.StreamApplication} * plans for physical execution. 
*/ [email protected] public interface ExecutionPlan { /** - * Returns the configs for single stage job, in the order of topologically sort. + * Returns the configs for single stage job, in topological sort order. * @return list of job configs */ List<JobConfig> getJobConfigs(); @@ -43,9 +45,10 @@ public interface ExecutionPlan { List<StreamSpec> getIntermediateStreams(); /** - * Returns the JSON representation of the plan for visualization - * @return json string - * @throws Exception exception + * Returns the JSON representation of the plan. + * @return JSON string + * @throws Exception exception during JSON serialization, including {@link java.io.IOException} + * and {@link org.codehaus.jackson.JsonGenerationException} */ String getPlanAsJson() throws Exception; } http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java index ac39eb8..d763d84 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java @@ -61,6 +61,9 @@ public class ExecutionPlanner { // create physical job graph based on stream graph JobGraph jobGraph = createJobGraph(streamGraph); + // fetch the external streams partition info + updateExistingPartitions(jobGraph, streamManager); + if (!jobGraph.getIntermediateStreamEdges().isEmpty()) { // figure out the partitions for internal streams calculatePartitions(streamGraph, jobGraph); @@ -84,7 +87,7 @@ public class ExecutionPlanner { // For this phase, we have a single job node for the whole dag String jobName = config.get(JobConfig.JOB_NAME()); String jobId = config.get(JobConfig.JOB_ID(), "1"); - JobNode node = 
jobGraph.getOrCreateNode(jobName, jobId, streamGraph); + JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId, streamGraph); // add sources sourceStreams.forEach(spec -> jobGraph.addSource(spec, node)); @@ -104,9 +107,6 @@ public class ExecutionPlanner { * Figure out the number of partitions of all streams */ /* package private */ void calculatePartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) { - // fetch the external streams partition info - updateExistingPartitions(jobGraph, streamManager); - // calculate the partitions for the input streams of join operators calculateJoinInputPartitions(streamGraph, jobGraph); @@ -167,7 +167,7 @@ public class ExecutionPlanner { Set<OperatorSpec> visited = new HashSet<>(); streamGraph.getInputStreams().entrySet().forEach(entry -> { - StreamEdge streamEdge = jobGraph.getOrCreateEdge(entry.getKey()); + StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(entry.getKey()); // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, outputStreamToJoinSpec, joinQ, visited); http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java index ff5fbdf..35f27ab 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; +import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import 
org.apache.samza.operators.StreamGraphImpl; @@ -65,40 +66,46 @@ import org.slf4j.LoggerFactory; this.config = config; } - /** - * Returns the configs for single stage job, in the order of topologically sort. - * @return list of job configs - */ + @Override public List<JobConfig> getJobConfigs() { - return getJobNodes().stream().map(JobNode::generateConfig).collect(Collectors.toList()); + String json = ""; + try { + json = getPlanAsJson(); + } catch (Exception e) { + log.warn("Failed to generate plan JSON", e); + } + + final String planJson = json; + return getJobNodes().stream().map(n -> n.generateConfig(planJson)).collect(Collectors.toList()); } - /** - * Returns the intermediate streams that need to be created. - * @return intermediate {@link StreamSpec}s - */ + @Override public List<StreamSpec> getIntermediateStreams() { return getIntermediateStreamEdges().stream() .map(streamEdge -> streamEdge.getStreamSpec()) .collect(Collectors.toList()); } - /** - * Returns the JSON representation of the plan for visualization - * @return json string - * @throws Exception - */ + @Override public String getPlanAsJson() throws Exception { return jsonGenerator.toJson(this); } /** + * Returns the config for this application + * @return {@link ApplicationConfig} + */ + public ApplicationConfig getApplicationConfig() { + return new ApplicationConfig(config); + } + + /** * Add a source stream to a {@link JobNode} * @param input source stream * @param node the job node that consumes from the source */ void addSource(StreamSpec input, JobNode node) { - StreamEdge edge = getOrCreateEdge(input); + StreamEdge edge = getOrCreateStreamEdge(input); edge.addTargetNode(node); node.addInEdge(edge); sources.add(edge); @@ -110,7 +117,7 @@ import org.slf4j.LoggerFactory; * @param node the job node that outputs to the sink */ void addSink(StreamSpec output, JobNode node) { - StreamEdge edge = getOrCreateEdge(output); + StreamEdge edge = getOrCreateStreamEdge(output); edge.addSourceNode(node); 
node.addOutEdge(edge); sinks.add(edge); @@ -123,7 +130,7 @@ import org.slf4j.LoggerFactory; * @param to the target node */ void addIntermediateStream(StreamSpec streamSpec, JobNode from, JobNode to) { - StreamEdge edge = getOrCreateEdge(streamSpec); + StreamEdge edge = getOrCreateStreamEdge(streamSpec); edge.addSourceNode(from); edge.addTargetNode(to); from.addOutEdge(edge); @@ -137,7 +144,7 @@ import org.slf4j.LoggerFactory; * @param jobId id of the job * @return */ - JobNode getOrCreateNode(String jobName, String jobId, StreamGraphImpl streamGraph) { + JobNode getOrCreateJobNode(String jobName, String jobId, StreamGraphImpl streamGraph) { String nodeId = JobNode.createId(jobName, jobId); JobNode node = nodes.get(nodeId); if (node == null) { @@ -152,7 +159,7 @@ import org.slf4j.LoggerFactory; * @param streamSpec spec of the StreamEdge * @return stream edge */ - StreamEdge getOrCreateEdge(StreamSpec streamSpec) { + StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec) { String streamId = streamSpec.getId(); StreamEdge edge = edges.get(streamId); if (edge == null) { http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java index 317616c..96c0538 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java @@ -19,95 +19,99 @@ package org.apache.samza.execution; +import com.google.common.base.Joiner; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; import java.io.ByteArrayOutputStream; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import 
java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.samza.config.ApplicationConfig; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.PartialJoinOperatorSpec; -import org.apache.samza.operators.spec.SinkOperatorSpec; +import org.apache.samza.operators.util.OperatorJsonUtils; import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.map.ObjectMapper; + /** * This class generates the JSON representation of the {@link JobGraph}. */ -public class JobGraphJsonGenerator { - - /** - * This class provides the necessary connection of operators for traversal. - */ - static abstract class Traversable { - @JsonProperty("NextOperatorIds") - Set<Integer> nextOperatorIds = new HashSet<>(); - } - - static final class OperatorJson extends Traversable { - @JsonProperty("OpCode") - String opCode; - @JsonProperty("OpId") - int opId; - @JsonProperty("OutputStreamId") - String outputStreamId; - @JsonProperty("PairedOpId") - int pairedOpId = -1; //for join operator, we will have a pair nodes for two partial joins - } +/* package private */ class JobGraphJsonGenerator { static final class StreamSpecJson { - @JsonProperty("Id") + @JsonProperty("id") String id; - @JsonProperty("SystemName") + @JsonProperty("systemName") String systemName; - @JsonProperty("PhysicalName") + @JsonProperty("physicalName") String physicalName; - @JsonProperty("PartitionCount") + @JsonProperty("partitionCount") int partitionCount; } static final class StreamEdgeJson { - @JsonProperty("StreamSpec") + @JsonProperty("streamSpec") StreamSpecJson streamSpec; + @JsonProperty("sourceJobs") + List<String> sourceJobs; + @JsonProperty("targetJobs") + List<String> targetJobs; } static final class OperatorGraphJson { - @JsonProperty("InputStreams") 
- List<InputStreamJson> inputStreams; - @JsonProperty("Operators") - Map<Integer, OperatorJson> operators = new HashMap<>(); + @JsonProperty("inputStreams") + List<StreamJson> inputStreams; + @JsonProperty("outputStreams") + List<StreamJson> outputStreams; + @JsonProperty("operators") + Map<Integer, Map<String, Object>> operators = new HashMap<>(); + @JsonProperty("canonicalOpIds") + Map<Integer, String> canonicalOpIds = new HashMap<>(); } - static final class InputStreamJson extends Traversable { - @JsonProperty("StreamId") + static final class StreamJson { + @JsonProperty("streamId") String streamId; + @JsonProperty("nextOperatorIds") + Set<Integer> nextOperatorIds = new HashSet<>(); } static final class JobNodeJson { - @JsonProperty("JobName") + @JsonProperty("jobName") String jobName; - @JsonProperty("JobId") + @JsonProperty("jobId") String jobId; - @JsonProperty("OperatorGraph") + @JsonProperty("operatorGraph") OperatorGraphJson operatorGraph; } static final class JobGraphJson { - @JsonProperty("Jobs") + @JsonProperty("jobs") List<JobNodeJson> jobs; - @JsonProperty("Streams") - Map<String, StreamEdgeJson> streams; + @JsonProperty("sourceStreams") + Map<String, StreamEdgeJson> sourceStreams; + @JsonProperty("sinkStreams") + Map<String, StreamEdgeJson> sinkStreams; + @JsonProperty("intermediateStreams") + Map<String, StreamEdgeJson> intermediateStreams; + @JsonProperty("applicationName") + String applicationName; + @JsonProperty("applicationId") + String applicationId; } - // Mapping from the output stream to the join spec. Since StreamGraph creates two partial join operators for a join and they - // will have the same output stream, this mapping is used to choose one of them as the unique join spec representing this join - // (who register first in the map wins). - Map<MessageStream, OperatorSpec> outputStreamToJoinSpec = new HashMap<>(); + // Mapping from the output stream to the ids. 
+ // Logically they belong to the same operator, but in code we generate one operator for each input. + // This is to associate the operators that output to the same MessageStream. + Multimap<MessageStream, Integer> outputStreamToOpIds = HashMultimap.create(); /** * Returns the JSON representation of a {@link JobGraph} @@ -119,13 +123,18 @@ public class JobGraphJsonGenerator { JobGraphJson jobGraphJson = new JobGraphJson(); // build StreamEdge JSON - jobGraphJson.streams = new HashMap<>(); - jobGraph.getSources().forEach(e -> getOrCreateStreamEdgeJson(e, jobGraphJson.streams)); - jobGraph.getSinks().forEach(e -> getOrCreateStreamEdgeJson(e, jobGraphJson.streams)); - jobGraph.getIntermediateStreamEdges().forEach(e -> getOrCreateStreamEdgeJson(e, jobGraphJson.streams)); + ApplicationConfig appConfig = jobGraph.getApplicationConfig(); + jobGraphJson.applicationName = appConfig.getAppName(); + jobGraphJson.applicationId = appConfig.getAppId(); + jobGraphJson.sourceStreams = new HashMap<>(); + jobGraphJson.sinkStreams = new HashMap<>(); + jobGraphJson.intermediateStreams = new HashMap<>(); + jobGraph.getSources().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams)); + jobGraph.getSinks().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams)); + jobGraph.getIntermediateStreamEdges().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.intermediateStreams)); jobGraphJson.jobs = jobGraph.getJobNodes().stream() - .map(jobNode -> buildJobNodeJson(jobNode, jobGraphJson.streams)) + .map(jobNode -> buildJobNodeJson(jobNode)) .collect(Collectors.toList()); ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -137,10 +146,9 @@ public class JobGraphJsonGenerator { /** * Create JSON POJO for a {@link JobNode}, including the {@link org.apache.samza.operators.StreamGraph} for this job * @param jobNode job node in the {@link JobGraph} - * @param streamEdges map of {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson} * @return {@link 
org.apache.samza.execution.JobGraphJsonGenerator.JobNodeJson} */ - private JobNodeJson buildJobNodeJson(JobNode jobNode, Map<String, StreamEdgeJson> streamEdges) { + private JobNodeJson buildJobNodeJson(JobNode jobNode) { JobNodeJson job = new JobNodeJson(); job.jobName = jobNode.getJobName(); job.jobId = jobNode.getJobId(); @@ -157,10 +165,27 @@ public class JobGraphJsonGenerator { OperatorGraphJson opGraph = new OperatorGraphJson(); opGraph.inputStreams = new ArrayList<>(); jobNode.getStreamGraph().getInputStreams().forEach((streamSpec, stream) -> { - InputStreamJson inputJson = new InputStreamJson(); - inputJson.streamId = streamSpec.getId(); + StreamJson inputJson = new StreamJson(); opGraph.inputStreams.add(inputJson); - updateOperatorGraphJson((MessageStreamImpl) stream, inputJson, opGraph); + inputJson.streamId = streamSpec.getId(); + Collection<OperatorSpec> specs = ((MessageStreamImpl) stream).getRegisteredOperatorSpecs(); + inputJson.nextOperatorIds = specs.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet()); + + updateOperatorGraphJson((MessageStreamImpl) stream, opGraph); + + for (Map.Entry<MessageStream, Collection<Integer>> entry : outputStreamToOpIds.asMap().entrySet()) { + List<Integer> sortedIds = new ArrayList<>(entry.getValue()); + Collections.sort(sortedIds); + String canonicalId = Joiner.on(',').join(sortedIds); + sortedIds.stream().forEach(id -> opGraph.canonicalOpIds.put(id, canonicalId)); + } + }); + + opGraph.outputStreams = new ArrayList<>(); + jobNode.getStreamGraph().getOutputStreams().keySet().forEach(streamSpec -> { + StreamJson outputJson = new StreamJson(); + outputJson.streamId = streamSpec.getId(); + opGraph.outputStreams.add(outputJson); }); return opGraph; } @@ -168,65 +193,30 @@ public class JobGraphJsonGenerator { /** * Traverse the {@StreamGraph} recursively and update the operator graph JSON POJO. 
* @param messageStream input - * @param parent parent node in the traveral * @param opGraph operator graph to build */ - private void updateOperatorGraphJson(MessageStreamImpl messageStream, Traversable parent, OperatorGraphJson opGraph) { + private void updateOperatorGraphJson(MessageStreamImpl messageStream, OperatorGraphJson opGraph) { Collection<OperatorSpec> specs = messageStream.getRegisteredOperatorSpecs(); specs.forEach(opSpec -> { - parent.nextOperatorIds.add(opSpec.getOpId()); + opGraph.operators.put(opSpec.getOpId(), OperatorJsonUtils.operatorToMap(opSpec)); - OperatorJson opJson = getOrCreateOperatorJson(opSpec, opGraph); - if (opSpec instanceof SinkOperatorSpec) { - opJson.outputStreamId = ((SinkOperatorSpec) opSpec).getOutputStream().getStreamSpec().getId(); - } else if (opSpec.getNextStream() != null) { - updateOperatorGraphJson(opSpec.getNextStream(), opJson, opGraph); + if (opSpec.getOpCode() == OperatorSpec.OpCode.JOIN || opSpec.getOpCode() == OperatorSpec.OpCode.MERGE) { + outputStreamToOpIds.put(opSpec.getNextStream(), opSpec.getOpId()); } - }); - } - - /** - * Get or create the JSON POJO for an operator. 
- * @param opSpec {@link OperatorSpec} - * @param opGraph {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson} - * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorJson} - */ - private OperatorJson getOrCreateOperatorJson(OperatorSpec opSpec, OperatorGraphJson opGraph) { - Map<Integer, OperatorJson> operators = opGraph.operators; - OperatorJson opJson = operators.get(opSpec.getOpId()); - if (opJson == null) { - opJson = new OperatorJson(); - opJson.opCode = opSpec.getOpCode().name(); - opJson.opId = opSpec.getOpId(); - operators.put(opSpec.getOpId(), opJson); - } - if (opSpec instanceof PartialJoinOperatorSpec) { - // every join will have two partial join operators - // we will choose one of them in order to consolidate the inputs - // the first one who registered with the outputStreamToJoinSpec will win - MessageStream output = opSpec.getNextStream(); - OperatorSpec joinSpec = outputStreamToJoinSpec.get(output); - if (joinSpec == null) { - joinSpec = opSpec; - outputStreamToJoinSpec.put(output, joinSpec); - } else if (joinSpec != opSpec) { - OperatorJson joinNode = operators.get(joinSpec.getOpId()); - joinNode.pairedOpId = opJson.opId; - opJson.pairedOpId = joinNode.opId; - } - } - - return opJson; + if (opSpec.getNextStream() != null) { + updateOperatorGraphJson(opSpec.getNextStream(), opGraph); + } + }); } /** * Get or create the JSON POJO for a {@link StreamEdge} * @param edge {@link StreamEdge} * @param streamEdges map of streamId to {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson} - * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson} + * @return JSON representation of the {@link StreamEdge} */ - private StreamEdgeJson getOrCreateStreamEdgeJson(StreamEdge edge, Map<String, StreamEdgeJson> streamEdges) { + private StreamEdgeJson buildStreamEdgeJson(StreamEdge edge, Map<String, StreamEdgeJson> streamEdges) { String streamId = edge.getStreamSpec().getId(); 
StreamEdgeJson edgeJson = streamEdges.get(streamId); if (edgeJson == null) { @@ -237,6 +227,19 @@ public class JobGraphJsonGenerator { streamSpecJson.physicalName = edge.getStreamSpec().getPhysicalName(); streamSpecJson.partitionCount = edge.getPartitionCount(); edgeJson.streamSpec = streamSpecJson; + + List<String> sourceJobs = new ArrayList<>(); + edge.getSourceNodes().forEach(jobNode -> { + sourceJobs.add(jobNode.getJobName()); + }); + edgeJson.sourceJobs = sourceJobs; + + List<String> targetJobs = new ArrayList<>(); + edge.getTargetNodes().forEach(jobNode -> { + targetJobs.add(jobNode.getJobName()); + }); + edgeJson.targetJobs = targetJobs; + streamEdges.put(streamId, edgeJson); } return edgeJson; http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java index e19c9ca..0484cf9 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory; public class JobNode { private static final Logger log = LoggerFactory.getLogger(JobNode.class); private static final String CONFIG_JOB_PREFIX = "jobs.%s."; + private static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan"; private final String jobName; private final String jobId; @@ -92,7 +93,12 @@ public class JobNode { return outEdges; } - public JobConfig generateConfig() { + /** + * Generate the configs for a job + * @param executionPlanJson JSON representation of the execution plan + * @return config of the job + */ + public JobConfig generateConfig(String executionPlanJson) { Map<String, String> configs = new HashMap<>(); configs.put(JobConfig.JOB_NAME(), jobName); @@ -100,6 +106,8 
@@ public class JobNode { configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs)); log.info("Job {} has generated configs {}", jobName, configs); + configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson); + String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName); // TODO: Disallow user specifying job inputs/outputs. This info comes strictly from the pipeline. return new JobConfig(Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix))); http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index 69a41db..b9adeed 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -81,16 +81,16 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public MessageStream<M> filter(FilterFunction<? super M> filterFn) { - OperatorSpec<M> op = OperatorSpecs.createFilterOperatorSpec( - filterFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId()); + OperatorSpec<M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, new MessageStreamImpl<>(this.graph), + this.graph.getNextOpId()); this.registeredOperatorSpecs.add(op); return op.getNextStream(); } @Override public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? 
extends TM> flatMapFn) { - OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec( - flatMapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId()); + OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, new MessageStreamImpl<>(this.graph), + this.graph.getNextOpId()); this.registeredOperatorSpecs.add(op); return op.getNextStream(); } @@ -103,15 +103,15 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public <K, V> void sendTo(OutputStream<K, V, M> outputStream) { - SinkOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec( - (OutputStreamInternal<K, V, M>) outputStream, this.graph.getNextOpId()); + SinkOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec((OutputStreamInternal<K, V, M>) outputStream, + this.graph.getNextOpId()); this.registeredOperatorSpecs.add(op); } @Override public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) { - OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec( - (WindowInternal<M, K, WV>) window, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId()); + OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window, + new MessageStreamImpl<>(this.graph), this.graph.getNextOpId()); this.registeredOperatorSpecs.add(wndOp); return wndOp.getNextStream(); } @@ -175,9 +175,9 @@ public class MessageStreamImpl<M> implements MessageStream<M> { this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec( thisPartialJoinFn, otherPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId())); - ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs - .add(OperatorSpecs.createPartialJoinOperatorSpec( - otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId())); + ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs.add(OperatorSpecs + .createPartialJoinOperatorSpec(otherPartialJoinFn, 
thisPartialJoinFn, ttl.toMillis(), nextStream, + this.graph.getNextOpId())); return nextStream; } @@ -187,8 +187,11 @@ public class MessageStreamImpl<M> implements MessageStream<M> { MessageStreamImpl<M> nextStream = new MessageStreamImpl<>(this.graph); otherStreams.add(this); - otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).registeredOperatorSpecs. - add(OperatorSpecs.createMergeOperatorSpec(nextStream, this.graph.getNextOpId()))); + otherStreams.forEach(other -> { + OperatorSpec mergeOperatorSepc = + OperatorSpecs.createMergeOperatorSpec(nextStream, this.graph.getNextOpId()); + ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(mergeOperatorSepc); + }); return nextStream; } @@ -213,5 +216,4 @@ public class MessageStreamImpl<M> implements MessageStream<M> { public Collection<OperatorSpec> getRegisteredOperatorSpecs() { return Collections.unmodifiableSet(this.registeredOperatorSpecs); } - } http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java index 0f18e97..059b567 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java @@ -62,6 +62,11 @@ public final class RootOperatorImpl<M> extends OperatorImpl<M, M> { public int getOpId() { return -1; } + + @Override + public String getSourceLocation() { + return ""; + } }; } } http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java 
b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index cc3c4ab..3ea52ca 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -63,6 +63,12 @@ public interface OperatorSpec<OM> { int getOpId(); /** + * Return the user source code location that creates the operator + * @return source location + */ + String getSourceLocation(); + + /** * Get the name for this operator based on its opCode and opId. * @return the name for this operator */ http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java index e85626f..92b4170 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java @@ -20,6 +20,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.PartialJoinFunction; +import org.apache.samza.operators.util.OperatorJsonUtils; /** @@ -38,6 +39,7 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> { private final long ttlMs; private final MessageStreamImpl<RM> nextStream; private final int opId; + private final String sourceLocation; /** * Default constructor for a {@link PartialJoinOperatorSpec}. 
@@ -58,6 +60,7 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> { this.ttlMs = ttlMs; this.nextStream = nextStream; this.opId = opId; + this.sourceLocation = OperatorJsonUtils.getSourceLocation(); } @Override @@ -86,4 +89,9 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> { public int getOpId() { return this.opId; } + + @Override + public String getSourceLocation() { + return sourceLocation; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java index 0d135d3..afdd6b9 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java @@ -21,6 +21,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.stream.OutputStreamInternal; +import org.apache.samza.operators.util.OperatorJsonUtils; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemStream; import org.apache.samza.task.MessageCollector; @@ -39,6 +40,7 @@ public class SinkOperatorSpec<M> implements OperatorSpec { private OutputStreamInternal<?, ?, M> outputStream; // may be null private final OperatorSpec.OpCode opCode; private final int opId; + private final String sourceLocation; /** * Constructs a {@link SinkOperatorSpec} with a user defined {@link SinkFunction}. 
@@ -54,6 +56,7 @@ public class SinkOperatorSpec<M> implements OperatorSpec { this.sinkFn = sinkFn; this.opCode = opCode; this.opId = opId; + this.sourceLocation = OperatorJsonUtils.getSourceLocation(); } /** @@ -99,6 +102,11 @@ public class SinkOperatorSpec<M> implements OperatorSpec { return this.opId; } + @Override + public String getSourceLocation() { + return sourceLocation; + } + /** * Creates a {@link SinkFunction} to send messages to the provided {@code output}. * @param outputStream the {@link OutputStreamInternal} to send messages to http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java index 204e566..c53efae 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java @@ -20,6 +20,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.util.OperatorJsonUtils; /** @@ -34,6 +35,7 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> { private final MessageStreamImpl<OM> nextStream; private final OperatorSpec.OpCode opCode; private final int opId; + private final String sourceLocation; /** * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}. 
@@ -49,6 +51,7 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> { this.nextStream = nextStream; this.opCode = opCode; this.opId = opId; + this.sourceLocation = OperatorJsonUtils.getSourceLocation(); } @Override @@ -69,4 +72,9 @@ public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> { public int getOpId() { return this.opId; } + + @Override + public String getSourceLocation() { + return sourceLocation; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 73b17b5..7ea07f6 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -20,6 +20,7 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.MessageStreamImpl; +import org.apache.samza.operators.util.OperatorJsonUtils; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; @@ -36,6 +37,7 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK private final WindowInternal<M, WK, WV> window; private final MessageStreamImpl<WindowPane<WK, WV>> nextStream; private final int opId; + private final String sourceLocation; /** * Constructor for {@link WindowOperatorSpec}. 
@@ -48,6 +50,7 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK this.nextStream = nextStream; this.window = window; this.opId = opId; + this.sourceLocation = OperatorJsonUtils.getSourceLocation(); } @Override @@ -68,4 +71,9 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK public int getOpId() { return this.opId; } + + @Override + public String getSourceLocation() { + return sourceLocation; + } } http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java new file mode 100644 index 0000000..b52fbc3 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.util; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.PartialJoinOperatorSpec; +import org.apache.samza.operators.spec.SinkOperatorSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OperatorJsonUtils { + private static final Logger log = LoggerFactory.getLogger(OperatorJsonUtils.class); + + private static final String OP_CODE = "opCode"; + private static final String OP_ID = "opId"; + private static final String SOURCE_LOCATION = "sourceLocation"; + private static final String NEXT_OPERATOR_IDS = "nextOperatorIds"; + private static final String OUTPUT_STREAM_ID = "outputStreamId"; + private static final String TTL_MS = "ttlMs"; + + /** + * Format the operator properties into a map + * @param spec a {@link OperatorSpec} instance + * @return map of the operator properties + */ + public static Map<String, Object> operatorToMap(OperatorSpec spec) { + Map<String, Object> map = new HashMap<>(); + map.put(OP_CODE, spec.getOpCode().name()); + map.put(OP_ID, spec.getOpId()); + map.put(SOURCE_LOCATION, spec.getSourceLocation()); + + if (spec.getNextStream() != null) { + Collection<OperatorSpec> nextOperators = spec.getNextStream().getRegisteredOperatorSpecs(); + map.put(NEXT_OPERATOR_IDS, nextOperators.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet())); + } else { + map.put(NEXT_OPERATOR_IDS, Collections.emptySet()); + } + + if (spec instanceof SinkOperatorSpec) { + map.put(OUTPUT_STREAM_ID, ((SinkOperatorSpec) spec).getOutputStream().getStreamSpec().getId()); + } + + if (spec instanceof PartialJoinOperatorSpec) { + map.put(TTL_MS, ((PartialJoinOperatorSpec) spec).getTtlMs()); + } + + return map; + } + + /** + * Return the location of source code that creates the operator. 
+ * This function is invoked in the constructor of each operator. + * @return formatted source location including file and line number + */ + public static String getSourceLocation() { + // The stack trace looks like: + // [0] Thread.getStackTrace() + // [1] OperatorJsonUtils.getSourceLocation() + // [2] SomeOperator.<init>() + // [3] OperatorSpecs.createSomeOperator() + // [4] MessageStreamImpl.someOperator() + // [5] User code that calls [4] + // we are only interested in [5] here + StackTraceElement location = Thread.currentThread().getStackTrace()[5]; + return String.format("%s:%s", location.getFileName(), location.getLineNumber()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java index 692dc38..3c7c83d 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java @@ -18,22 +18,28 @@ */ package org.apache.samza.runtime; +import java.io.File; +import java.io.PrintWriter; import java.util.Map; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.config.StreamConfig; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.execution.ExecutionPlanner; import org.apache.samza.execution.StreamManager; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.system.StreamSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Defines common, core behavior for implementations of the {@link 
ApplicationRunner} API */ public abstract class AbstractApplicationRunner extends ApplicationRunner { + private static final Logger log = LoggerFactory.getLogger(AbstractApplicationRunner.class); private final StreamManager streamManager; private final ExecutionPlanner planner; @@ -106,4 +112,25 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner { final StreamManager getStreamManager() { return streamManager; } + + /** + * Write the execution plan JSON to a file + * @param planJson JSON representation of the plan + */ + final void writePlanJsonFile(String planJson) { + try { + String content = "plan='" + planJson + "'"; + String planPath = System.getenv(ShellCommandConfig.EXECUTION_PLAN_DIR()); + if (planPath != null && !planPath.isEmpty()) { + // Write the plan json to plan path + File file = new File(planPath + "/plan.json"); + file.setReadable(true, false); + PrintWriter writer = new PrintWriter(file, "UTF-8"); + writer.println(content); + writer.close(); + } + } catch (Exception e) { + log.warn("Failed to write execution plan json to file", e); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 5e83c3c..bff0f1c 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -110,6 +110,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { try { // 1. initialize and plan ExecutionPlan plan = getExecutionPlan(app); + writePlanJsonFile(plan.getPlanAsJson()); // 2. 
create the necessary streams createStreams(plan.getIntermediateStreams()); http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index 38eb195..d5f6e21 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -50,6 +50,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { try { // 1. initialize and plan ExecutionPlan plan = getExecutionPlan(app); + writePlanJsonFile(plan.getPlanAsJson()); // 2. create the necessary streams getStreamManager().createStreams(plan.getIntermediateStreams()); http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala index 1397ed5..3c0f320 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala @@ -53,6 +53,11 @@ object ShellCommandConfig { */ val ENV_LOGGED_STORE_BASE_DIR = "LOGGED_STORE_BASE_DIR" + /** + * The directory path that contains the execution plan + */ + val EXECUTION_PLAN_DIR = "EXECUTION_PLAN_DIR" + val COMMAND_SHELL_EXECUTE = "task.execute" val TASK_JVM_OPTS = "task.opts" val TASK_JAVA_HOME = "task.java.home" 
http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index b7f952a..5366dc3 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -220,11 +220,11 @@ public class TestExecutionPlanner { JobGraph jobGraph = planner.createJobGraph(streamGraph); ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager); - assertTrue(jobGraph.getOrCreateEdge(input1).getPartitionCount() == 64); - assertTrue(jobGraph.getOrCreateEdge(input2).getPartitionCount() == 16); - assertTrue(jobGraph.getOrCreateEdge(input3).getPartitionCount() == 32); - assertTrue(jobGraph.getOrCreateEdge(output1).getPartitionCount() == 8); - assertTrue(jobGraph.getOrCreateEdge(output2).getPartitionCount() == 16); + assertTrue(jobGraph.getOrCreateStreamEdge(input1).getPartitionCount() == 64); + assertTrue(jobGraph.getOrCreateStreamEdge(input2).getPartitionCount() == 16); + assertTrue(jobGraph.getOrCreateStreamEdge(input3).getPartitionCount() == 32); + assertTrue(jobGraph.getOrCreateStreamEdge(output1).getPartitionCount() == 8); + assertTrue(jobGraph.getOrCreateStreamEdge(output2).getPartitionCount() == 16); jobGraph.getIntermediateStreamEdges().forEach(edge -> { assertTrue(edge.getPartitionCount() == -1); @@ -264,11 +264,10 @@ public class TestExecutionPlanner { } @Test - public void testCalculateIntStreamPartitions() { + public void testCalculateIntStreamPartitions() throws Exception { ExecutionPlanner planner = new ExecutionPlanner(config, streamManager); StreamGraphImpl streamGraph = createSimpleGraph(); - JobGraph jobGraph = planner.createJobGraph(streamGraph); 
- planner.calculatePartitions(streamGraph, jobGraph); + JobGraph jobGraph = (JobGraph) planner.plan(streamGraph); // the partitions should be the same as input1 jobGraph.getIntermediateStreams().forEach(edge -> { http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java index 4a4498c..bf131ce 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java @@ -59,14 +59,14 @@ public class TestJobGraph { private void createGraph1() { graph1 = new JobGraph(null); - JobNode n2 = graph1.getOrCreateNode("2", "1", null); - JobNode n3 = graph1.getOrCreateNode("3", "1", null); - JobNode n5 = graph1.getOrCreateNode("5", "1", null); - JobNode n7 = graph1.getOrCreateNode("7", "1", null); - JobNode n8 = graph1.getOrCreateNode("8", "1", null); - JobNode n9 = graph1.getOrCreateNode("9", "1", null); - JobNode n10 = graph1.getOrCreateNode("10", "1", null); - JobNode n11 = graph1.getOrCreateNode("11", "1", null); + JobNode n2 = graph1.getOrCreateJobNode("2", "1", null); + JobNode n3 = graph1.getOrCreateJobNode("3", "1", null); + JobNode n5 = graph1.getOrCreateJobNode("5", "1", null); + JobNode n7 = graph1.getOrCreateJobNode("7", "1", null); + JobNode n8 = graph1.getOrCreateJobNode("8", "1", null); + JobNode n9 = graph1.getOrCreateJobNode("9", "1", null); + JobNode n10 = graph1.getOrCreateJobNode("10", "1", null); + JobNode n11 = graph1.getOrCreateJobNode("11", "1", null); graph1.addSource(genStream(), n5); graph1.addSource(genStream(), n7); @@ -92,13 +92,13 @@ public class TestJobGraph { private void createGraph2() { graph2 = new JobGraph(null); - JobNode n1 = graph2.getOrCreateNode("1", "1", 
null); - JobNode n2 = graph2.getOrCreateNode("2", "1", null); - JobNode n3 = graph2.getOrCreateNode("3", "1", null); - JobNode n4 = graph2.getOrCreateNode("4", "1", null); - JobNode n5 = graph2.getOrCreateNode("5", "1", null); - JobNode n6 = graph2.getOrCreateNode("6", "1", null); - JobNode n7 = graph2.getOrCreateNode("7", "1", null); + JobNode n1 = graph2.getOrCreateJobNode("1", "1", null); + JobNode n2 = graph2.getOrCreateJobNode("2", "1", null); + JobNode n3 = graph2.getOrCreateJobNode("3", "1", null); + JobNode n4 = graph2.getOrCreateJobNode("4", "1", null); + JobNode n5 = graph2.getOrCreateJobNode("5", "1", null); + JobNode n6 = graph2.getOrCreateJobNode("6", "1", null); + JobNode n7 = graph2.getOrCreateJobNode("7", "1", null); graph2.addSource(genStream(), n1); graph2.addIntermediateStream(genStream(), n1, n2); @@ -119,8 +119,8 @@ public class TestJobGraph { private void createGraph3() { graph3 = new JobGraph(null); - JobNode n1 = graph3.getOrCreateNode("1", "1", null); - JobNode n2 = graph3.getOrCreateNode("2", "1", null); + JobNode n1 = graph3.getOrCreateJobNode("1", "1", null); + JobNode n2 = graph3.getOrCreateJobNode("2", "1", null); graph3.addSource(genStream(), n1); graph3.addIntermediateStream(genStream(), n1, n1); @@ -135,7 +135,7 @@ public class TestJobGraph { private void createGraph4() { graph4 = new JobGraph(null); - JobNode n1 = graph4.getOrCreateNode("1", "1", null); + JobNode n1 = graph4.getOrCreateJobNode("1", "1", null); graph4.addSource(genStream(), n1); graph4.addIntermediateStream(genStream(), n1, n1); @@ -160,9 +160,9 @@ public class TestJobGraph { * s3 -> 2 * |-> 3 */ - JobNode n1 = graph.getOrCreateNode("1", "1", null); - JobNode n2 = graph.getOrCreateNode("2", "1", null); - JobNode n3 = graph.getOrCreateNode("3", "1", null); + JobNode n1 = graph.getOrCreateJobNode("1", "1", null); + JobNode n2 = graph.getOrCreateJobNode("2", "1", null); + JobNode n3 = graph.getOrCreateJobNode("3", "1", null); StreamSpec s1 = genStream(); StreamSpec s2 
= genStream(); StreamSpec s3 = genStream(); @@ -173,16 +173,16 @@ public class TestJobGraph { assertTrue(graph.getSources().size() == 3); - assertTrue(graph.getOrCreateNode("1", "1", null).getInEdges().size() == 2); - assertTrue(graph.getOrCreateNode("2", "1", null).getInEdges().size() == 1); - assertTrue(graph.getOrCreateNode("3", "1", null).getInEdges().size() == 1); + assertTrue(graph.getOrCreateJobNode("1", "1", null).getInEdges().size() == 2); + assertTrue(graph.getOrCreateJobNode("2", "1", null).getInEdges().size() == 1); + assertTrue(graph.getOrCreateJobNode("3", "1", null).getInEdges().size() == 1); - assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 0); - assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 1); - assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 0); - assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 1); - assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 0); - assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 2); + assertTrue(graph.getOrCreateStreamEdge(s1).getSourceNodes().size() == 0); + assertTrue(graph.getOrCreateStreamEdge(s1).getTargetNodes().size() == 1); + assertTrue(graph.getOrCreateStreamEdge(s2).getSourceNodes().size() == 0); + assertTrue(graph.getOrCreateStreamEdge(s2).getTargetNodes().size() == 1); + assertTrue(graph.getOrCreateStreamEdge(s3).getSourceNodes().size() == 0); + assertTrue(graph.getOrCreateStreamEdge(s3).getTargetNodes().size() == 2); } @Test @@ -193,8 +193,8 @@ public class TestJobGraph { * 2 -> s3 */ JobGraph graph = new JobGraph(null); - JobNode n1 = graph.getOrCreateNode("1", "1", null); - JobNode n2 = graph.getOrCreateNode("2", "1", null); + JobNode n1 = graph.getOrCreateJobNode("1", "1", null); + JobNode n2 = graph.getOrCreateJobNode("2", "1", null); StreamSpec s1 = genStream(); StreamSpec s2 = genStream(); StreamSpec s3 = genStream(); @@ -203,15 +203,15 @@ public class TestJobGraph { graph.addSink(s3, n2); 
assertTrue(graph.getSinks().size() == 3); - assertTrue(graph.getOrCreateNode("1", "1", null).getOutEdges().size() == 1); - assertTrue(graph.getOrCreateNode("2", "1", null).getOutEdges().size() == 2); - - assertTrue(graph.getOrCreateEdge(s1).getSourceNodes().size() == 1); - assertTrue(graph.getOrCreateEdge(s1).getTargetNodes().size() == 0); - assertTrue(graph.getOrCreateEdge(s2).getSourceNodes().size() == 1); - assertTrue(graph.getOrCreateEdge(s2).getTargetNodes().size() == 0); - assertTrue(graph.getOrCreateEdge(s3).getSourceNodes().size() == 1); - assertTrue(graph.getOrCreateEdge(s3).getTargetNodes().size() == 0); + assertTrue(graph.getOrCreateJobNode("1", "1", null).getOutEdges().size() == 1); + assertTrue(graph.getOrCreateJobNode("2", "1", null).getOutEdges().size() == 2); + + assertTrue(graph.getOrCreateStreamEdge(s1).getSourceNodes().size() == 1); + assertTrue(graph.getOrCreateStreamEdge(s1).getTargetNodes().size() == 0); + assertTrue(graph.getOrCreateStreamEdge(s2).getSourceNodes().size() == 1); + assertTrue(graph.getOrCreateStreamEdge(s2).getTargetNodes().size() == 0); + assertTrue(graph.getOrCreateStreamEdge(s3).getSourceNodes().size() == 1); + assertTrue(graph.getOrCreateStreamEdge(s3).getTargetNodes().size() == 0); } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java index c4ab922..2681f9c 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java @@ -125,6 +125,8 @@ public class TestJobGraphJsonGenerator { JobGraphJsonGenerator.JobGraphJson nodes = mapper.readValue(json, 
JobGraphJsonGenerator.JobGraphJson.class); assertTrue(nodes.jobs.get(0).operatorGraph.inputStreams.size() == 5); assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 12); - assertTrue(nodes.streams.size() == 7); + assertTrue(nodes.sourceStreams.size() == 3); + assertTrue(nodes.sinkStreams.size() == 2); + assertTrue(nodes.intermediateStreams.size() == 2); } } http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index bd18f0b..99bf854 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -217,6 +217,11 @@ public class TestOperatorImpl { public int getOpId() { return -1; } + + @Override + public String getSourceLocation() { + return ""; + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java index d227206..cccafaf 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java @@ -57,8 +57,8 @@ public class TestOperatorSpecs { @Test public void testCreateStreamOperator() { FlatMapFunction<Object, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { { - this.add(new TestMessageEnvelope(m.toString(), m.toString(), 12345L)); - } }; + this.add(new 
TestMessageEnvelope(m.toString(), m.toString(), 12345L)); + } }; MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class); StreamOperatorSpec<Object, TestMessageEnvelope> streamOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockOutput, 1); @@ -78,7 +78,7 @@ public class TestOperatorSpecs { public void testCreateSinkOperator() { SystemStream testStream = new SystemStream("test-sys", "test-stream"); SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector, - TaskCoordinator taskCoordinator) -> { + TaskCoordinator taskCoordinator) -> { messageCollector.send(new OutgoingMessageEnvelope(testStream, message.getKey(), message.getMessage())); }; SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, 1); http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-shell/src/main/assembly/src.xml ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/assembly/src.xml b/samza-shell/src/main/assembly/src.xml index 5173fdf..cc15420 100644 --- a/samza-shell/src/main/assembly/src.xml +++ b/samza-shell/src/main/assembly/src.xml @@ -27,5 +27,13 @@ <include>*</include> </includes> </fileSet> + <fileSet> + <outputDirectory>visualizer</outputDirectory> + <directory>${basedir}/src/main/visualizer</directory> + <fileMode>0644</fileMode> + <includes> + <include>*</include> + </includes> + </fileSet> </fileSets> </assembly> http://git-wip-us.apache.org/repos/asf/samza/blob/b71b253d/samza-shell/src/main/bash/run-app.sh ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/bash/run-app.sh b/samza-shell/src/main/bash/run-app.sh index 3e43463..3880e3c 100644 --- a/samza-shell/src/main/bash/run-app.sh +++ b/samza-shell/src/main/bash/run-app.sh @@ -16,6 +16,15 @@ # specific language governing permissions and limitations # under the License. 
+home_dir=`pwd` +base_dir=$(dirname $0)/.. +cd $base_dir +base_dir=`pwd` +cd $home_dir + +export EXECUTION_PLAN_DIR="$base_dir/plan" +mkdir -p $EXECUTION_PLAN_DIR + [[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml" exec $(dirname $0)/run-class.sh org.apache.samza.runtime.ApplicationRunnerMain "$@"
