Repository: samza Updated Branches: refs/heads/samza-fluent-api-v1 86eb10f2f -> a83c69a25
Support topic partition generation of partitionBy and join Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a83c69a2 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a83c69a2 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a83c69a2 Branch: refs/heads/samza-fluent-api-v1 Commit: a83c69a25d3b1fa67cac2a9113c57e52d86d4f1d Parents: 86eb10f Author: Xinyu Liu <[email protected]> Authored: Thu Mar 2 18:42:23 2017 -0800 Committer: Xinyu Liu <[email protected]> Committed: Fri Mar 3 10:03:30 2017 -0800 ---------------------------------------------------------------------- .../samza/operators/MessageStreamImpl.java | 6 +- .../apache/samza/operators/StreamGraphImpl.java | 30 +- .../samza/operators/spec/OperatorSpecs.java | 4 +- .../samza/processorgraph/ExecutionPlanner.java | 213 ++++++++++---- .../samza/processorgraph/ProcessorGraph.java | 61 ++-- .../samza/processorgraph/ProcessorNode.java | 36 +-- .../apache/samza/processorgraph/StreamEdge.java | 15 - .../system/AbstractExecutionEnvironment.java | 5 +- .../system/RemoteExecutionEnvironment.java | 2 +- .../system/StandaloneExecutionEnvironment.java | 2 +- .../apache/samza/task/StreamOperatorTask.java | 6 +- .../apache/samza/util/ConfigInheritence.java | 6 +- .../org/apache/samza/config/JobConfig.scala | 1 + .../processorgraph/TestExecutionPlanner.java | 281 +++++++++++++++++++ 14 files changed, 525 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index 830e4a5..77cb3fc 100644 --- 
a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -163,10 +163,12 @@ public class MessageStreamImpl<M> implements MessageStream<M> { @Override public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) { - MessageStreamImpl<M> intStream = this.graph.createIntStream(parKeyExtractor); + int opId = graph.getNextOpId(); + String streamId = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name(), opId); + MessageStreamImpl<M> intStream = this.graph.createIntStream(streamId, parKeyExtractor); OutputStream<M> outputStream = this.graph.getOutputStream(intStream); this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(), - this.graph, outputStream)); + this.graph, outputStream, opId)); return intStream; } /** http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index b965d6a..f3fd176 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -18,20 +18,20 @@ */ package org.apache.samza.operators; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.function.Function; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.serializers.Serde; +import org.apache.samza.system.ExecutionEnvironment; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; import 
org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - /** * The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to * create system input/output/intermediate streams. @@ -129,9 +129,14 @@ public class StreamGraphImpl implements StreamGraph { */ private final Map<String, MessageStream> inStreams = new HashMap<>(); private final Map<String, OutputStream> outStreams = new HashMap<>(); + private final ExecutionEnvironment executionEnvironment; private ContextManager contextManager = new ContextManager() { }; + public StreamGraphImpl(ExecutionEnvironment executionEnvironment) { + this.executionEnvironment = executionEnvironment; + } + @Override public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) { if (!this.inStreams.containsKey(streamSpec.getId())) { @@ -182,7 +187,12 @@ public class StreamGraphImpl implements StreamGraph { @Override public Map<StreamSpec, OutputStream> getOutStreams() { Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>(); - this.outStreams.forEach((ss, entry) -> outStreamMap.put(((OutputStreamImpl) entry).getSpec(), entry)); + this.outStreams.forEach((ss, entry) -> { + StreamSpec streamSpec = (entry instanceof IntermediateStreamImpl) ? 
+ ((IntermediateStreamImpl) entry).getSpec() : + ((OutputStreamImpl) entry).getSpec(); + outStreamMap.put(streamSpec, entry); + }); return Collections.unmodifiableMap(outStreamMap); } @@ -231,9 +241,9 @@ public class StreamGraphImpl implements StreamGraph { * @param <M> the type of input message * @return the {@link OutputStream} object for the re-partitioned stream */ - <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) { + <PK, M> MessageStreamImpl<M> createIntStream(String streamId, Function<M, PK> parKeyFn) { // TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec} - StreamSpec streamSpec = this.createIntStreamSpec(); + StreamSpec streamSpec = executionEnvironment.streamFromConfig(streamId); if (!this.inStreams.containsKey(streamSpec.getId())) { this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn)); @@ -244,10 +254,4 @@ public class StreamGraphImpl implements StreamGraph { } return intStream; } - - private StreamSpec createIntStreamSpec() { - // TODO: placeholder to generate the intermediate stream's {@link StreamSpec} automatically - return null; - } - } http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java index d626852..7b14b9c 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java @@ -152,8 +152,8 @@ public class OperatorSpecs { * @param <M> type of input message * @return the {@link SinkOperatorSpec} */ - public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, 
OutputStream<M> stream) { - return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream); + public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream, int opId) { + return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, opId, stream); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java index ddf2ca7..757034e 100644 --- a/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ExecutionPlanner.java @@ -24,15 +24,20 @@ import com.google.common.collect.Multimap; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.List; +import java.util.LinkedList; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JavaSystemConfig; import org.apache.samza.config.JobConfig; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.spec.PartialJoinOperatorSpec; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemFactory; @@ -43,6 +48,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * The ExecutionPlanner creates the physical execution graph for the StreamGraph, 
and + * the intermediate topics needed for the execution. + */ public class ExecutionPlanner { private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class); @@ -56,27 +65,28 @@ public class ExecutionPlanner { Map<String, SystemAdmin> sysAdmins = getSystemAdmins(config); // create physical processors based on stream graph - ProcessorGraph processorGraph = splitStages(streamGraph); + ProcessorGraph processorGraph = createProcessorGraph(streamGraph); if (!processorGraph.getInternalStreams().isEmpty()) { - // figure out the partition for internal streams - Multimap<String, StreamSpec> streams = calculatePartitions(streamGraph, processorGraph, sysAdmins); + // figure out the partitions for internal streams + calculatePartitions(streamGraph, processorGraph, sysAdmins); // create the streams - createStreams(streams, sysAdmins); + createStreams(processorGraph, sysAdmins); } return processorGraph; } - public ProcessorGraph splitStages(StreamGraph streamGraph) throws Exception { - String pipelineId = String.format("%s-%s", config.get(JobConfig.JOB_NAME()), config.getOrDefault(JobConfig.JOB_ID(), "1")); - // For this phase, we are going to create a processor with the whole dag - String processorId = pipelineId; // only one processor, name it the same as pipeline itself + /** + * Create the physical graph from StreamGraph + * Package private for testing + */ + ProcessorGraph createProcessorGraph(StreamGraph streamGraph) { + // For this phase, we are going to create a processor for the whole dag + String processorId = config.get(JobConfig.JOB_NAME()); // only one processor, use the job name ProcessorGraph processorGraph = new ProcessorGraph(config); - - // TODO: remote the casting once we have the correct types in StreamGraph Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet()); Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet()); Set<StreamSpec> intStreams = new HashSet<>(sourceStreams); @@ -98,49 
+108,32 @@ public class ExecutionPlanner { return processorGraph; } - private Multimap<String, StreamSpec> calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) { + /** + * Figure out the number of partitions of intermediate streams + * Package private for testing + */ + void calculatePartitions(StreamGraph streamGraph, ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) { // fetch the external streams partition info - getExistingStreamPartitions(processorGraph, sysAdmins); - - // use BFS to figure out the join partition count - - - // TODO this algorithm assumes only one processor, and it does not consider join - Multimap<String, StreamSpec> streamsGroupedBySystem = HashMultimap.create(); - List<ProcessorNode> processors = processorGraph.topologicalSort(); - processors.forEach(processor -> { - Set<StreamEdge> outStreams = new HashSet<>(processor.getOutEdges()); - outStreams.retainAll(processorGraph.getInternalStreams()); - if (!outStreams.isEmpty()) { - int maxInPartition = maxPartition(processor.getInEdges()); - int maxOutPartition = maxPartition(processor.getOutEdges()); - int partition = Math.max(maxInPartition, maxOutPartition); - - outStreams.forEach(streamEdge -> { - if (streamEdge.getPartitions() == -1) { - streamEdge.setPartitions(partition); - StreamSpec streamSpec = createStreamSpec(streamEdge); - streamsGroupedBySystem.put(streamEdge.getSystemStream().getSystem(), streamSpec); - } - }); - } - }); + fetchExistingStreamPartitions(processorGraph, sysAdmins); - return streamsGroupedBySystem; + // calculate the partitions for the input streams of join operators + calculateJoinInputPartitions(streamGraph, processorGraph); + + // calculate the partitions for the rest of intermediate streams + calculateIntStreamPartitions(processorGraph, config); } - private void getExistingStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) { - Set<StreamEdge> allStreams = 
new HashSet<>(); - allStreams.addAll(processorGraph.getSources()); - allStreams.addAll(processorGraph.getSinks()); - allStreams.addAll(processorGraph.getInternalStreams()); + static void fetchExistingStreamPartitions(ProcessorGraph processorGraph, Map<String, SystemAdmin> sysAdmins) { + Set<StreamEdge> existingStreams = new HashSet<>(); + existingStreams.addAll(processorGraph.getSources()); + existingStreams.addAll(processorGraph.getSinks()); - Multimap<String, StreamEdge> externalStreamsMap = HashMultimap.create(); - allStreams.forEach(streamEdge -> { + Multimap<String, StreamEdge> existingStreamsMap = HashMultimap.create(); + existingStreams.forEach(streamEdge -> { SystemStream systemStream = streamEdge.getSystemStream(); - externalStreamsMap.put(systemStream.getSystem(), streamEdge); + existingStreamsMap.put(systemStream.getSystem(), streamEdge); }); - for (Map.Entry<String, Collection<StreamEdge>> entry : externalStreamsMap.asMap().entrySet()) { + for (Map.Entry<String, Collection<StreamEdge>> entry : existingStreamsMap.asMap().entrySet()) { String systemName = entry.getKey(); Collection<StreamEdge> streamEdges = entry.getValue(); Map<String, StreamEdge> streamToEdge = new HashMap<>(); @@ -150,18 +143,136 @@ public class ExecutionPlanner { metadata.forEach((stream, data) -> { int partitions = data.getSystemStreamPartitionMetadata().size(); streamToEdge.get(stream).setPartitions(partitions); - log.info("Partition count is {} for stream {}", partitions, stream); + log.debug("Partition count is {} for stream {}", partitions, stream); }); } } - private void createStreams(Multimap<String, StreamSpec> streams, Map<String, SystemAdmin> sysAdmins) { - for (Map.Entry<String, Collection<StreamSpec>> entry : streams.asMap().entrySet()) { + /** + * Calculate the partitions for the input streams of join operators + * Package private for testing + */ + static void calculateJoinInputPartitions(StreamGraph streamGraph, ProcessorGraph processorGraph) { + // get join operators 
with input streams + Multimap<OperatorSpec, StreamEdge> joinToStreamMap = HashMultimap.create(); + Multimap<StreamEdge, OperatorSpec> streamToJoinMap = HashMultimap.create(); + Map<MessageStream, OperatorSpec> outputToJoinMap = new HashMap<>(); + Queue<OperatorSpec> joinQ = new LinkedList<>(); // a queue of joins with known input partitions + Set<OperatorSpec> visited = new HashSet<>(); + streamGraph.getInStreams().entrySet().forEach(entry -> { + StreamEdge streamEdge = processorGraph.getEdge(entry.getKey()); + getJoins(entry.getValue(), streamEdge, joinToStreamMap, streamToJoinMap, outputToJoinMap, joinQ, visited); + }); + // calculate join input partition count + while (!joinQ.isEmpty()) { + OperatorSpec join = joinQ.poll(); + int partitions = -1; + // loop through the input streams to the join and find the partition count + for (StreamEdge edge : joinToStreamMap.get(join)) { + int edgePartitions = edge.getPartitions(); + if (edgePartitions != -1) { + if (partitions == -1) { + //if the partition is not assigned + partitions = edgePartitions; + } else if (partitions != edgePartitions) { + throw new SamzaException(String.format("Unable to resolve input partitions of stream %s for join", + edge.getSystemStream().toString())); + } + } + } + // assign the partition count + for (StreamEdge edge : joinToStreamMap.get(join)) { + if (edge.getPartitions() <= 0) { + edge.setPartitions(partitions); + + // find other joins whose input partitions can now be inferred from this edge + for (OperatorSpec op : streamToJoinMap.get(edge)) { + if (!visited.contains(op)) { + joinQ.add(op); + visited.add(op); + } + } + } + } + } + } + + /** + * Recursively walks the operators registered downstream of the given stream and records every join it reaches. + * @param messageStream the stream whose registered operators are traversed + * @param streamEdge the input stream edge from which the traversal started + * @param joinToStreamMap collects the input stream edges of each join + * @param streamToJoinMap collects the joins reached from each input stream edge + * @param outputToJoinMap maps a join's output stream to the partial join operator chosen to represent that join + * @param joinQ queue of joins that have at least one input with a known partition count + * @param visited joins that have already been added to joinQ + */ + static void getJoins(MessageStream messageStream, + StreamEdge streamEdge, + Multimap<OperatorSpec, StreamEdge> joinToStreamMap, + Multimap<StreamEdge, OperatorSpec> streamToJoinMap, +
Map<MessageStream, OperatorSpec> outputToJoinMap, + Queue<OperatorSpec> joinQ, + Set<OperatorSpec> visited) { + Collection<OperatorSpec> specs = ((MessageStreamImpl) messageStream).getRegisteredOperatorSpecs(); + for (OperatorSpec spec : specs) { + if (spec instanceof PartialJoinOperatorSpec) { + // every join will have two partial join operators + // we will choose one of them in order to consolidate the inputs + // the first one who registered with the outputToJoinMap will win + MessageStream output = spec.getNextStream(); + OperatorSpec joinSpec = outputToJoinMap.get(output); + if (joinSpec == null) { + joinSpec = spec; + outputToJoinMap.put(output, joinSpec); + } + + joinToStreamMap.put(joinSpec, streamEdge); + streamToJoinMap.put(streamEdge, joinSpec); + + if (!visited.contains(joinSpec) && streamEdge.getPartitions() > 0) { + // put the joins with known input partitions into the queue + joinQ.add(joinSpec); + visited.add(joinSpec); + } + } + + if (spec.getNextStream() != null) { + getJoins(spec.getNextStream(), streamEdge, joinToStreamMap, streamToJoinMap, outputToJoinMap, joinQ, visited); + } + } + } + + static void calculateIntStreamPartitions(ProcessorGraph processorGraph, Config config) { + int partitions = config.getInt(JobConfig.JOB_DEFAULT_PARTITIONS(), -1); + if (partitions < 0) { + // use the following simple algo to figure out the partitions + // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions)) + int maxInPartitions = maxPartition(processorGraph.getSources()); + int maxOutPartitions = maxPartition(processorGraph.getSinks()); + partitions = Math.max(maxInPartitions, maxOutPartitions); + } + for (StreamEdge edge : processorGraph.getInternalStreams()) { + if (edge.getPartitions() <= 0) { + edge.setPartitions(partitions); + } + } + } + + private static void createStreams(ProcessorGraph graph, Map<String, SystemAdmin> sysAdmins) { + Multimap<String, StreamSpec> streamsToCreate = HashMultimap.create(); + 
graph.getInternalStreams().forEach(edge -> { + StreamSpec streamSpec = createStreamSpec(edge); + streamsToCreate.put(edge.getSystemStream().getSystem(), streamSpec); + }); + + for (Map.Entry<String, Collection<StreamSpec>> entry : streamsToCreate.asMap().entrySet()) { String systemName = entry.getKey(); SystemAdmin systemAdmin = sysAdmins.get(systemName); for (StreamSpec stream : entry.getValue()) { - log.info("Creating stream {} on system {}", stream.getPhysicalName(), systemName); + log.info("Creating stream {} with partitions {} on system {}", + new Object[]{stream.getPhysicalName(), stream.getPartitionCount(), systemName}); systemAdmin.createStream(stream); } } http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java index d4ad84b..5ee4d29 100644 --- a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java +++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorGraph.java @@ -37,12 +37,9 @@ import org.slf4j.LoggerFactory; /** - * The ProcessorGraph represents the multi-stage Samza processors of a pipeline on the physical execution layer. - * High level APIs are transformed into ProcessorGraph for future plan, validation and execution. - * - * <p>The ProcessorGraph is a graph of source/sink/intermediate streams and processors are connected together. Each - * ProcessorNode contains the config which is required to run the processor. - * + * The ProcessorGraph is the physical execution graph for a multi-stage Samza application. + * It contains the topology of execution processors connected by source/sink/intermediate streams. 
+ * High level APIs are transformed into ProcessorGraph for planning, validation and execution. */ public class ProcessorGraph { private static final Logger log = LoggerFactory.getLogger(ProcessorGraph.class); @@ -64,8 +61,6 @@ public class ProcessorGraph { edge.addTargetNode(node); node.addInEdge(edge); sources.add(edge); - - log.info(edge.toString()); } void addSink(StreamSpec output, String sourceProcessorId) { @@ -74,8 +69,6 @@ public class ProcessorGraph { edge.addSourceNode(node); node.addOutEdge(edge); sinks.add(edge); - - log.info(edge.toString()); } void addEdge(StreamSpec streamSpec, String sourceProcessorId, String targetProcessorId) { @@ -87,8 +80,6 @@ public class ProcessorGraph { sourceNode.addOutEdge(edge); targetNode.addInEdge(edge); internalStreams.add(edge); - - log.info(edge.toString()); } ProcessorNode getNode(String processorId) { @@ -242,18 +233,22 @@ public class ProcessorGraph { Collection<ProcessorNode> pnodes = nodes.values(); Queue<ProcessorNode> q = new ArrayDeque<>(); Map<String, Long> indegree = new HashMap<>(); + Set<ProcessorNode> visited = new HashSet<>(); pnodes.forEach(node -> { String nid = node.getId(); + // only count in-edges from intermediate streams; source streams do not contribute to the in-degree long degree = node.getInEdges().stream().filter(e -> !sources.contains(e)).count(); indegree.put(nid, degree); if (degree == 0L) { + // start from the nodes that only consume from sources q.add(node); + visited.add(node); } }); List<ProcessorNode> sortedNodes = new ArrayList<>(); - Set<ProcessorNode> visited = new HashSet<>(); + Set<ProcessorNode> reachable = new HashSet<>(); while (sortedNodes.size() < pnodes.size()) { while (!q.isEmpty()) { ProcessorNode node = q.poll(); @@ -264,31 +259,41 @@ public class ProcessorGraph { indegree.put(nid, degree); if (degree == 0L && !visited.contains(n)) { q.add(n); + visited.add(n); } - visited.add(n); + reachable.add(n); }); } if (sortedNodes.size() < pnodes.size()) { // The remaining nodes have circles - // use
the following simple approach to break the circles - // start from the node that have been seen - visited.removeAll(sortedNodes); - //find out the nodes with minimal input edge - long min = Long.MAX_VALUE; - ProcessorNode minNode = null; - for (ProcessorNode node : visited) { - Long degree = indegree.get(node.getId()); - if (degree < min) { - min = degree; - minNode = node; + // use the following approach to break the cycles + // start from the nodes that were reached in the previous traversal but are not yet sorted + reachable.removeAll(sortedNodes); + if (!reachable.isEmpty()) { + // find the node with the fewest remaining input edges + long min = Long.MAX_VALUE; + ProcessorNode minNode = null; + for (ProcessorNode node : reachable) { + Long degree = indegree.get(node.getId()); + if (degree < min) { + min = degree; + minNode = node; + } } + // restart the traversal from the node with the fewest remaining input edges + q.add(minNode); + } else { + // all the remaining nodes should be reachable from sources + // start from sources again to find the next node that hasn't been visited + ProcessorNode nextNode = sources.stream().flatMap(source -> source.getTargetNodes().stream()) + .filter(node -> !visited.contains(node)) + .findAny().get(); + q.add(nextNode); } - // start from the node with minimal input edge again - q.add(minNode); } } return sortedNodes; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java index 0b02377..08d94b4 100644 --- a/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java +++ b/samza-core/src/main/java/org/apache/samza/processorgraph/ProcessorNode.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; import
org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.TaskConfig; import org.apache.samza.util.ConfigInheritence; import org.apache.samza.util.Util; import org.slf4j.Logger; @@ -35,8 +36,9 @@ import org.slf4j.LoggerFactory; /** - * The ProcessorNode represents a Samza processor. - * It contains the input/output, and the config to run the processor. + * A ProcessorNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted + * to remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution. + * A ProcessorNode contains the input/output, and the configs for physical execution. */ public class ProcessorNode { private static final Logger log = LoggerFactory.getLogger(ProcessorNode.class); @@ -73,31 +75,15 @@ public class ProcessorNode { } public Config generateConfig() { - String configPrefix = String.format(CONFIG_PROCESSOR_PREFIX, id); - // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline. - return Util.rewriteConfig(ConfigInheritence.extractScopedConfig(config, generateProcessorConfig(), configPrefix)); - } - - private Config generateProcessorConfig() { Map<String, String> configs = new HashMap<>(); - List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList()); - - // TODO temp logs for debugging - log.info("Processor {} has formatted inputs {}", id, inputs); - - // TODO hack alert: hard coded string literals! - configs.put("task.inputs", Joiner.on(',').join(inputs)); - - // TODO: DISCUSS how does the processor know it's output names? 
- outEdges.forEach(edge -> { - if (!edge.getName().isEmpty()) { - configs.put(String.format("task.outputs.%s.stream", edge.getName()), edge.getFormattedSystemStream()); - } - }); - configs.put(JobConfig.JOB_NAME(), id); + List<String> inputs = inEdges.stream().map(edge -> edge.getFormattedSystemStream()).collect(Collectors.toList()); + configs.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputs)); log.info("Processor {} has generated configs {}", id, configs); - return new MapConfig(configs); + + String configPrefix = String.format(CONFIG_PROCESSOR_PREFIX, id); + // TODO: Disallow user specifying processor inputs/outputs. This info comes strictly from the pipeline. + return Util.rewriteConfig(ConfigInheritence.extractScopedConfig(config, new MapConfig(configs), configPrefix)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java index 879d705..664a458 100644 --- a/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java +++ b/samza-core/src/main/java/org/apache/samza/processorgraph/StreamEdge.java @@ -19,10 +19,8 @@ package org.apache.samza.processorgraph; -import com.google.common.base.Joiner; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; import org.apache.samza.config.Config; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStream; @@ -92,17 +90,4 @@ public class StreamEdge { void setName(String name) { this.name = name; } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder("StreamEdge "); - builder.append(getSystemStream().toString()).append(": ("); - List<String> sourceIds = 
sourceNodes.stream().map(node -> node.getId()).collect(Collectors.toList()); - String sources = Joiner.on(',').join(sourceIds); - builder.append(sources).append(") -> ("); - List<String> targetIds = targetNodes.stream().map(node -> node.getId()).collect(Collectors.toList()); - String targets = Joiner.on(',').join(targetIds); - builder.append(targets).append(")"); - return builder.toString(); - } } http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java index c066bdd..dabe651 100644 --- a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java +++ b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java @@ -20,12 +20,14 @@ package org.apache.samza.system; import java.util.Map; import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; import org.apache.samza.config.StreamConfig; public abstract class AbstractExecutionEnvironment implements ExecutionEnvironment { private final Config config; + private final String streamPrefix; public AbstractExecutionEnvironment(Config config) { if (config == null) { @@ -33,6 +35,7 @@ public abstract class AbstractExecutionEnvironment implements ExecutionEnvironme } this.config = config; + this.streamPrefix = String.format("%s-%s", config.get(JobConfig.JOB_NAME()), config.get(JobConfig.JOB_ID(), "1")); } @Override @@ -40,7 +43,7 @@ public abstract class AbstractExecutionEnvironment implements ExecutionEnvironme StreamConfig streamConfig = new StreamConfig(config); String system = streamConfig.getSystem(streamId); - String physicalName = streamConfig.getPhysicalName(streamId, streamId); + String physicalName = 
streamConfig.getPhysicalName(streamId, String.format("%s-%s", streamPrefix, streamId)); Map<String, String> properties = streamConfig.getStreamProperties(streamId); return new StreamSpec(streamId, physicalName, system, properties); http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java index ce129aa..3288c5c 100644 --- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java +++ b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java @@ -46,7 +46,7 @@ public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment { // TODO: actually instantiate the tasks and run the job, i.e. try { // 1. build stream graph - StreamGraph streamGraph = new StreamGraphImpl(); + StreamGraph streamGraph = new StreamGraphImpl(this); app.init(streamGraph, config); // 2. 
create the physical execution plan http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java index 71d60ef..b88e356 100644 --- a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java +++ b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java @@ -36,7 +36,7 @@ public class StandaloneExecutionEnvironment extends AbstractExecutionEnvironment // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment} StreamGraph createGraph(StreamGraphBuilder app, Config config) { - StreamGraphImpl graph = new StreamGraphImpl(); + StreamGraphImpl graph = new StreamGraphImpl(this); app.init(graph, config); return graph; } http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index b007e3c..6032f4d 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -25,6 +25,7 @@ import org.apache.samza.operators.StreamGraphBuilder; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.data.InputMessageEnvelope; import org.apache.samza.operators.impl.OperatorGraph; +import org.apache.samza.system.ExecutionEnvironment; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStream; @@ -77,8 +78,11 @@ 
public final class StreamOperatorTask implements StreamTask, InitableTask, Windo @Override public final void init(Config config, TaskContext context) throws Exception { + // for now, we need to create the execution env again + // in the future if we decide to serialize the dag, this can be clean up + ExecutionEnvironment executionEnvironment = ExecutionEnvironment.fromConfig(config); // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task - StreamGraphImpl streams = new StreamGraphImpl(); + StreamGraphImpl streams = new StreamGraphImpl(executionEnvironment); this.graphBuilder.init(streams, config); // get the context manager of the {@link StreamGraph} and initialize the task-specific context this.contextManager = streams.getContextManager(); http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java index 2eba59b..e4fb32f 100644 --- a/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java +++ b/samza-core/src/main/java/org/apache/samza/util/ConfigInheritence.java @@ -32,8 +32,8 @@ public class ConfigInheritence { public static Config extractScopedConfig(Config fullConfig, Config generatedConfig, String configPrefix) { Config scopedConfig = fullConfig.subset(configPrefix); - log.info("Prefix '{}' has extracted config {}", configPrefix, scopedConfig); - log.info("Prefix '{}' has generated config {}", configPrefix, generatedConfig); + log.debug("Prefix '{}' has extracted config {}", configPrefix, scopedConfig); + log.debug("Prefix '{}' has generated config {}", configPrefix, generatedConfig); Config[] configPrecedence; if (INHERIT_ROOT_CONFIGS) { @@ -53,7 +53,7 @@ public class ConfigInheritence { } } scopedConfig = new 
MapConfig(mergedConfig); - log.info("Prefix '{}' has merged config {}", configPrefix, scopedConfig); + log.debug("Prefix '{}' has merged config {}", configPrefix, scopedConfig); return scopedConfig; } http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 1c58293..fbfe90f 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -45,6 +45,7 @@ object JobConfig { val JOB_COORDINATOR_SYSTEM = "job.coordinator.system" val JOB_METADATA_DEFAULT_SYSTEM = "job.metadata.system" val JOB_DEFAULT_SYSTEM = "job.default.system" + val JOB_DEFAULT_PARTITIONS = "job.default.partitions" val JOB_CONTAINER_COUNT = "job.container.count" val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size" val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode" http://git-wip-us.apache.org/repos/asf/samza/blob/a83c69a2/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java new file mode 100644 index 0000000..68a2142 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/processorgraph/TestExecutionPlanner.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.processorgraph; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.StreamGraphBuilder; +import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.functions.JoinFunction; +import org.apache.samza.operators.functions.SinkFunction; +import org.apache.samza.system.AbstractExecutionEnvironment; +import org.apache.samza.system.ExecutionEnvironment; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemStreamMetadata; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + + +public class TestExecutionPlanner { + + private Config config; + + private ExecutionEnvironment env; + + private static final String DEFAULT_SYSTEM = "test-system"; + private static final int DEFAULT_PARTITIONS = 10; + + private StreamSpec input1; + private StreamSpec input2; + private 
StreamSpec input3; + private StreamSpec output1; + private StreamSpec output2; + + private Map<String, SystemAdmin> systemAdmins; + + private JoinFunction createJoin() { + return new JoinFunction() { + @Override + public Object apply(Object message, Object otherMessage) { + return null; + } + + @Override + public Object getFirstKey(Object message) { + return null; + } + + @Override + public Object getSecondKey(Object message) { + return null; + } + }; + } + + private SinkFunction createSink() { + return new SinkFunction() { + @Override + public void apply(Object message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) { + } + }; + } + + private SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) { + + return new SystemAdmin() { + @Override + public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) { + return null; + } + + @Override + public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) { + Map<String, SystemStreamMetadata> map = new HashMap<>(); + for (String stream : streamNames) { + Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> m = new HashMap<>(); + for (int i = 0; i < streamToPartitions.get(stream); i++) { + m.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata("", "", "")); + } + map.put(stream, new SystemStreamMetadata(stream, m)); + } + return map; + } + + @Override + public void createChangelogStream(String streamName, int numOfPartitions) { + + } + + @Override + public void validateChangelogStream(String streamName, int numOfPartitions) { + + } + + @Override + public void createCoordinatorStream(String streamName) { + + } + + @Override + public Integer offsetComparator(String offset1, String offset2) { + return null; + } + }; + } + + private StreamGraph createSimpleGraph() { + /** + * a simple graph of partitionBy and map + * + * input1 -> partitionBy -> map -> output1 + * + */ + StreamGraph 
streamGraph = new StreamGraphImpl(env); + streamGraph.createInStream(input1, null, null).partitionBy(m -> "yes!!!").map(m -> m).sendTo(streamGraph.createOutStream(output1, null, null)); + return streamGraph; + } + + private StreamGraph createStreamGraphWithJoin() { + + /** the graph looks like the following + * + * input1 -> map -> join -> output1 + * | + * input2 -> partitionBy -> filter -| + * | + * input3 -> filter -> partitionBy -> map -> join -> output2 + * + */ + + StreamGraph streamGraph = new StreamGraphImpl(env); + MessageStream m1 = streamGraph.createInStream(input1, null, null).map(m -> m); + MessageStream m2 = streamGraph.createInStream(input2, null, null).partitionBy(m -> "haha").filter(m -> true); + MessageStream m3 = streamGraph.createInStream(input3, null, null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m); + + m1.join(m2, createJoin()).sendTo(streamGraph.createOutStream(output1, null, null)); + m3.join(m2, createJoin()).sendTo(streamGraph.createOutStream(output2, null, null)); + + return streamGraph; + } + + @Before + public void setup() { + Map<String, String> configMap = new HashMap<>(); + configMap.put(JobConfig.JOB_NAME(), "test-app"); + configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), DEFAULT_SYSTEM); + + config = new MapConfig(configMap); + + env = new AbstractExecutionEnvironment(config) { + @Override + public void run(StreamGraphBuilder graphBuilder, Config config) { + } + }; + + input1 = new StreamSpec("input1", "input1", "system1"); + input2 = new StreamSpec("input2", "input2", "system2"); + input3 = new StreamSpec("input3", "input3", "system2"); + + output1 = new StreamSpec("output1", "output1", "system1"); + output2 = new StreamSpec("output2", "output2", "system2"); + + // set up external partition count + Map<String, Integer> system1Map = new HashMap<>(); + system1Map.put("input1", 64); + system1Map.put("output1", 8); + Map<String, Integer> system2Map = new HashMap<>(); + system2Map.put("input2", 16); + system2Map.put("input3", 
32); + system2Map.put("output2", 16); + + SystemAdmin systemAdmin1 = createSystemAdmin(system1Map); + SystemAdmin systemAdmin2 = createSystemAdmin(system2Map); + systemAdmins = new HashMap<>(); + systemAdmins.put("system1", systemAdmin1); + systemAdmins.put("system2", systemAdmin2); + } + + @Test + public void testCreateProcessorGraph() { + ExecutionPlanner planner = new ExecutionPlanner(config); + StreamGraph streamGraph = createStreamGraphWithJoin(); + + ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph); + assertTrue(processorGraph.getSources().size() == 3); + assertTrue(processorGraph.getSinks().size() == 2); + assertTrue(processorGraph.getInternalStreams().size() == 2); // two streams generated by partitionBy + } + + @Test + public void testFetchExistingStreamPartitions() { + ExecutionPlanner planner = new ExecutionPlanner(config); + StreamGraph streamGraph = createStreamGraphWithJoin(); + ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph); + + ExecutionPlanner.fetchExistingStreamPartitions(processorGraph, systemAdmins); + assertTrue(processorGraph.getEdge(input1).getPartitions() == 64); + assertTrue(processorGraph.getEdge(input2).getPartitions() == 16); + assertTrue(processorGraph.getEdge(input3).getPartitions() == 32); + assertTrue(processorGraph.getEdge(output1).getPartitions() == 8); + assertTrue(processorGraph.getEdge(output2).getPartitions() == 16); + + processorGraph.getInternalStreams().forEach(edge -> { + assertTrue(edge.getPartitions() == -1); + }); + } + + @Test + public void testCalculateJoinInputPartitions() { + ExecutionPlanner planner = new ExecutionPlanner(config); + StreamGraph streamGraph = createStreamGraphWithJoin(); + ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph); + + ExecutionPlanner.fetchExistingStreamPartitions(processorGraph, systemAdmins); + ExecutionPlanner.calculateJoinInputPartitions(streamGraph, processorGraph); + + // the partitions should be the same as 
input1 + processorGraph.getInternalStreams().forEach(edge -> { + assertTrue(edge.getPartitions() == 64); + }); + } + + @Test + public void testDefaultPartitions() { + Map<String, String> map = new HashMap<>(config); + map.put(JobConfig.JOB_DEFAULT_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS)); + Config cfg = new MapConfig(map); + + ExecutionPlanner planner = new ExecutionPlanner(cfg); + StreamGraph streamGraph = createSimpleGraph(); + ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph); + planner.calculatePartitions(streamGraph, processorGraph, systemAdmins); + + // the partitions should be the same as input1 + processorGraph.getInternalStreams().forEach(edge -> { + assertTrue(edge.getPartitions() == DEFAULT_PARTITIONS); + }); + } + + @Test + public void testCalculateIntStreamPartitions() { + ExecutionPlanner planner = new ExecutionPlanner(config); + StreamGraph streamGraph = createSimpleGraph(); + ProcessorGraph processorGraph = planner.createProcessorGraph(streamGraph); + planner.calculatePartitions(streamGraph, processorGraph, systemAdmins); + + // the partitions should be the same as input1 + processorGraph.getInternalStreams().forEach(edge -> { + assertTrue(edge.getPartitions() == 64); // max of input1 and output1 + }); + } +}
