[streaming] JobGraphBuilder separated into StreamGraph and StreamingJobGraphGenerator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3b608ce Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3b608ce Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3b608ce Branch: refs/heads/master Commit: e3b608ce20f2c06b6c9ce8f231b6f53ab73f1de5 Parents: 7dbb55e Author: Gyula Fora <[email protected]> Authored: Wed Jan 21 00:34:11 2015 +0100 Committer: mbalassi <[email protected]> Committed: Wed Jan 21 16:06:34 2015 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/JobGraphBuilder.java | 675 ------------------- .../flink/streaming/api/StreamConfig.java | 28 +- .../apache/flink/streaming/api/StreamGraph.java | 545 +++++++++++++++ .../api/StreamingJobGraphGenerator.java | 286 ++++++++ .../api/datastream/ConnectedDataStream.java | 8 +- .../streaming/api/datastream/DataStream.java | 18 +- .../api/datastream/IterativeDataStream.java | 4 +- .../datastream/SingleOutputStreamOperator.java | 20 +- .../temporaloperator/StreamCrossOperator.java | 4 +- .../temporaloperator/StreamJoinOperator.java | 2 +- .../api/environment/LocalStreamEnvironment.java | 4 +- .../environment/RemoteStreamEnvironment.java | 4 +- .../environment/StreamContextEnvironment.java | 4 +- .../environment/StreamExecutionEnvironment.java | 18 +- .../operator/GroupedReduceInvokable.java | 1 - .../operator/GroupedWindowInvokable.java | 3 +- .../api/invokable/operator/WindowInvokable.java | 3 +- .../api/streamvertex/OutputHandler.java | 64 +- .../streaming/util/TestStreamEnvironment.java | 2 +- .../api/scala/StreamCrossOperator.scala | 4 +- .../api/scala/StreamJoinOperator.scala | 2 +- 21 files changed, 914 insertions(+), 785 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java deleted file mode 100644 index 6ae97c9..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java +++ /dev/null @@ -1,675 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.AbstractJobVertex; -import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.streaming.api.collector.OutputSelector; -import org.apache.flink.streaming.api.invokable.StreamInvokable; -import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy; -import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; -import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; -import org.apache.flink.streaming.api.streamvertex.CoStreamVertex; -import org.apache.flink.streaming.api.streamvertex.StreamIterationHead; -import org.apache.flink.streaming.api.streamvertex.StreamIterationTail; -import org.apache.flink.streaming.api.streamvertex.StreamVertex; -import org.apache.flink.streaming.partitioner.StreamPartitioner; -import org.apache.flink.streaming.partitioner.StreamPartitioner.PartitioningStrategy; -import org.apache.flink.streaming.state.OperatorState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Object for building Apache Flink stream processing job graphs - */ -public class JobGraphBuilder { - - private static final Logger LOG = LoggerFactory.getLogger(JobGraphBuilder.class); - private final static String DEAFULT_JOB_NAME = "Streaming Job"; - private JobGraph 
jobGraph; - - private boolean chaining = true; - - // Graph attributes - private Map<String, AbstractJobVertex> streamVertices; - private Map<String, Integer> vertexParallelism; - private Map<String, Long> bufferTimeout; - private Map<String, List<String>> outEdgeList; - private Map<String, List<Integer>> outEdgeIndex; - private Map<String, List<List<String>>> outEdgeNames; - private Map<String, List<Boolean>> outEdgeSelectAll; - private Map<String, List<String>> inEdgeList; - private Map<String, List<StreamPartitioner<?>>> outPartitioning; - private Map<String, String> operatorNames; - private Map<String, StreamInvokable<?, ?>> invokableObjects; - private Map<String, StreamRecordSerializer<?>> typeSerializersIn1; - private Map<String, StreamRecordSerializer<?>> typeSerializersIn2; - private Map<String, StreamRecordSerializer<?>> typeSerializersOut1; - private Map<String, StreamRecordSerializer<?>> typeSerializersOut2; - private Map<String, byte[]> outputSelectors; - private Map<String, Class<? extends AbstractInvokable>> vertexClasses; - private Map<String, Integer> iterationIds; - private Map<Integer, String> iterationIDtoHeadName; - private Map<Integer, String> iterationIDtoTailName; - private Map<String, Integer> iterationTailCount; - private Map<String, Long> iterationWaitTime; - private Map<String, Map<String, OperatorState<?>>> operatorStates; - private Map<String, InputFormat<String, ?>> inputFormatList; - private Map<String, Map<String, StreamConfig>> chainedConfigs; - private Map<String, StreamConfig> vertexConfigs; - - private Set<String> sources; - private Set<String> builtNodes; - - /** - * Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG - * and consists of sources, tasks (intermediate vertices) and sinks. 
- */ - public JobGraphBuilder() { - - initGraph(); - - if (LOG.isDebugEnabled()) { - LOG.debug("JobGraph created"); - } - } - - public void initGraph() { - streamVertices = new HashMap<String, AbstractJobVertex>(); - vertexParallelism = new HashMap<String, Integer>(); - bufferTimeout = new HashMap<String, Long>(); - outEdgeList = new HashMap<String, List<String>>(); - outEdgeIndex = new HashMap<String, List<Integer>>(); - outEdgeNames = new HashMap<String, List<List<String>>>(); - outEdgeSelectAll = new HashMap<String, List<Boolean>>(); - inEdgeList = new HashMap<String, List<String>>(); - outPartitioning = new HashMap<String, List<StreamPartitioner<?>>>(); - operatorNames = new HashMap<String, String>(); - invokableObjects = new HashMap<String, StreamInvokable<?, ?>>(); - typeSerializersIn1 = new HashMap<String, StreamRecordSerializer<?>>(); - typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>(); - typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>(); - typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>(); - outputSelectors = new HashMap<String, byte[]>(); - vertexClasses = new HashMap<String, Class<? 
extends AbstractInvokable>>(); - iterationIds = new HashMap<String, Integer>(); - iterationIDtoHeadName = new HashMap<Integer, String>(); - iterationIDtoTailName = new HashMap<Integer, String>(); - iterationTailCount = new HashMap<String, Integer>(); - iterationWaitTime = new HashMap<String, Long>(); - operatorStates = new HashMap<String, Map<String, OperatorState<?>>>(); - inputFormatList = new HashMap<String, InputFormat<String, ?>>(); - chainedConfigs = new HashMap<String, Map<String, StreamConfig>>(); - vertexConfigs = new HashMap<String, StreamConfig>(); - - sources = new HashSet<String>(); - builtNodes = new HashSet<String>(); - } - - /** - * Adds a vertex to the streaming JobGraph with the given parameters - * - * @param vertexName - * Name of the vertex - * @param invokableObject - * User defined operator - * @param inTypeInfo - * Input type for serialization - * @param outTypeInfo - * Output type for serialization - * @param operatorName - * Operator type - * @param parallelism - * Number of parallel instances created - */ - public <IN, OUT> void addStreamVertex(String vertexName, - StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo, - TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) { - - addVertex(vertexName, StreamVertex.class, invokableObject, operatorName, parallelism); - - StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>( - inTypeInfo) : null; - StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? 
new StreamRecordSerializer<OUT>( - outTypeInfo) : null; - - addTypeSerializers(vertexName, inSerializer, null, outSerializer, null); - - if (LOG.isDebugEnabled()) { - LOG.debug("Vertex: {}", vertexName); - } - } - - public <IN, OUT> void addSourceVertex(String vertexName, - StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo, - TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) { - addStreamVertex(vertexName, invokableObject, inTypeInfo, outTypeInfo, operatorName, - parallelism); - sources.add(vertexName); - } - - /** - * Adds a vertex for the iteration head to the {@link JobGraph}. The - * iterated values will be fed from this vertex back to the graph. - * - * @param vertexName - * Name of the vertex - * @param iterationHead - * Id of the iteration head - * @param iterationID - * ID of iteration for multiple iterations - * @param parallelism - * Number of parallel instances created - * @param waitTime - * Max wait time for next record - */ - public void addIterationHead(String vertexName, String iterationHead, Integer iterationID, - int parallelism, long waitTime) { - - addVertex(vertexName, StreamIterationHead.class, null, null, parallelism); - - chaining = false; - - iterationIds.put(vertexName, iterationID); - iterationIDtoHeadName.put(iterationID, vertexName); - - setSerializersFrom(iterationHead, vertexName); - - setEdge(vertexName, iterationHead, outPartitioning - .get(inEdgeList.get(iterationHead).get(0)).get(0), 0, new ArrayList<String>(), - false); - - iterationWaitTime.put(iterationIDtoHeadName.get(iterationID), waitTime); - - if (LOG.isDebugEnabled()) { - LOG.debug("ITERATION SOURCE: {}", vertexName); - } - - sources.add(vertexName); - } - - /** - * Adds a vertex for the iteration tail to the {@link JobGraph}. The values - * intended to be iterated will be sent to this sink from the iteration - * head. 
- * - * @param vertexName - * Name of the vertex - * @param iterationTail - * Id of the iteration tail - * @param iterationID - * ID of iteration for mulitple iterations - * @param parallelism - * Number of parallel instances created - * @param waitTime - * Max waiting time for next record - */ - public void addIterationTail(String vertexName, String iterationTail, Integer iterationID, - int parallelism, long waitTime) { - - if (bufferTimeout.get(iterationTail) == 0) { - throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported."); - } - - addVertex(vertexName, StreamIterationTail.class, null, null, parallelism); - - iterationIds.put(vertexName, iterationID); - iterationIDtoTailName.put(iterationID, vertexName); - - setSerializersFrom(iterationTail, vertexName); - iterationWaitTime.put(iterationIDtoTailName.get(iterationID), waitTime); - - if (LOG.isDebugEnabled()) { - LOG.debug("ITERATION SINK: {}", vertexName); - } - - } - - public <IN1, IN2, OUT> void addCoTask(String vertexName, - CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo, - TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, - String operatorName, int parallelism) { - - addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism); - - addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo), - new StreamRecordSerializer<IN2>(in2TypeInfo), new StreamRecordSerializer<OUT>( - outTypeInfo), null); - - if (LOG.isDebugEnabled()) { - LOG.debug("CO-TASK: {}", vertexName); - } - } - - /** - * Sets vertex parameters in the JobGraph - * - * @param vertexName - * Name of the vertex - * @param vertexClass - * The class of the vertex - * @param invokableObjectject - * The user defined invokable object - * @param operatorName - * Type of the user defined operator - * @param parallelism - * Number of parallel instances created - */ - private void addVertex(String vertexName, Class<? 
extends AbstractInvokable> vertexClass, - StreamInvokable<?, ?> invokableObject, String operatorName, int parallelism) { - - vertexClasses.put(vertexName, vertexClass); - setParallelism(vertexName, parallelism); - invokableObjects.put(vertexName, invokableObject); - operatorNames.put(vertexName, operatorName); - outEdgeList.put(vertexName, new ArrayList<String>()); - outEdgeIndex.put(vertexName, new ArrayList<Integer>()); - outEdgeNames.put(vertexName, new ArrayList<List<String>>()); - outEdgeSelectAll.put(vertexName, new ArrayList<Boolean>()); - inEdgeList.put(vertexName, new ArrayList<String>()); - outPartitioning.put(vertexName, new ArrayList<StreamPartitioner<?>>()); - iterationTailCount.put(vertexName, 0); - } - - private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1, - StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out1, - StreamRecordSerializer<?> out2) { - typeSerializersIn1.put(vertexName, in1); - typeSerializersIn2.put(vertexName, in2); - typeSerializersOut1.put(vertexName, out1); - typeSerializersOut2.put(vertexName, out2); - } - - private List<Tuple2<String, String>> createChain(String startNode, String current) { - - if (!builtNodes.contains(startNode)) { - - List<Tuple2<String, String>> transitiveOutEdges = new ArrayList<Tuple2<String, String>>(); - List<String> chainableOutputs = new ArrayList<String>(); - List<String> nonChainableOutputs = new ArrayList<String>(); - - for (String outName : outEdgeList.get(current)) { - if (isChainable(current, outName)) { - chainableOutputs.add(outName); - } else { - nonChainableOutputs.add(outName); - } - - } - - for (String chainable : chainableOutputs) { - transitiveOutEdges.addAll(createChain(startNode, chainable)); - } - - for (String nonChainable : nonChainableOutputs) { - transitiveOutEdges.add(new Tuple2<String, String>(current, nonChainable)); - transitiveOutEdges.addAll(createChain(nonChainable, nonChainable)); - } - - StreamConfig config = current.equals(startNode) ? 
createProcessingVertex(startNode) - : new StreamConfig(new Configuration()); - - setVertexConfig(current, config, chainableOutputs, nonChainableOutputs); - - if (current.equals(startNode)) { - - config.setChainStart(); - config.setRecordWriterOrder(transitiveOutEdges); - - for (Tuple2<String, String> edge : transitiveOutEdges) { - connect(startNode, edge); - } - - vertexConfigs.get(startNode).setTransitiveChainedTaskConfigs( - chainedConfigs.get(startNode)); - - } else { - - Map<String, StreamConfig> chainedConfs = chainedConfigs.get(startNode); - - if (chainedConfs == null) { - chainedConfigs.put(startNode, new HashMap<String, StreamConfig>()); - } - chainedConfigs.get(startNode).put(current, config); - } - - return transitiveOutEdges; - - } else { - return new ArrayList<Tuple2<String, String>>(); - } - } - - private StreamConfig createProcessingVertex(String vertexName) { - - AbstractJobVertex vertex = new AbstractJobVertex(vertexName); - - this.jobGraph.addVertex(vertex); - - int parallelism = vertexParallelism.get(vertexName); - - vertex.setInvokableClass(vertexClasses.get(vertexName)); - vertex.setParallelism(parallelism); - if (LOG.isDebugEnabled()) { - LOG.debug("Parallelism set: {} for {}", parallelism, vertexName); - } - - if (inputFormatList.containsKey(vertexName)) { - vertex.setInputSplitSource(inputFormatList.get(vertexName)); - } - - streamVertices.put(vertexName, vertex); - builtNodes.add(vertexName); - - return new StreamConfig(vertex.getConfiguration()); - } - - private void setVertexConfig(String vertexName, StreamConfig config, - List<String> chainableOutputs, List<String> nonChainableOutputs) { - - StreamInvokable<?, ?> invokableObject = invokableObjects.get(vertexName); - byte[] outputSelector = outputSelectors.get(vertexName); - Class<? 
extends AbstractInvokable> vertexClass = vertexClasses.get(vertexName); - Map<String, OperatorState<?>> state = operatorStates.get(vertexName); - - config.setVertexName(vertexName); - - config.setBufferTimeout(bufferTimeout.get(vertexName)); - - config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName)); - config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName)); - config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName)); - config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName)); - - config.setUserInvokable(invokableObject); - config.setOutputSelector(outputSelector); - config.setOperatorStates(state); - - config.setNumberOfOutputs(nonChainableOutputs.size()); - config.setOutputs(nonChainableOutputs); - config.setChainedOutputs(chainableOutputs); - - if (vertexClass.equals(StreamIterationHead.class) - || vertexClass.equals(StreamIterationTail.class)) { - config.setIterationId(iterationIds.get(vertexName)); - config.setIterationWaitTime(iterationWaitTime.get(vertexName)); - } - - vertexConfigs.put(vertexName, config); - } - - private boolean isChainable(String vertexName, String outName) { - return inEdgeList.get(outName).size() == 1 - && invokableObjects.get(outName) != null - && outputSelectors.get(vertexName) == null - && invokableObjects.get(outName).getChainingStrategy() == ChainingStrategy.ALWAYS - && (invokableObjects.get(vertexName).getChainingStrategy() == ChainingStrategy.HEAD || invokableObjects - .get(vertexName).getChainingStrategy() == ChainingStrategy.ALWAYS) - && outPartitioning.get(vertexName) - .get(outEdgeList.get(vertexName).indexOf(outName)).getStrategy() == PartitioningStrategy.FORWARD - && vertexParallelism.get(vertexName) == vertexParallelism.get(outName) && chaining; - } - - private <T> void connect(String headOfChain, Tuple2<String, String> edge) { - - String upStreamVertexName = edge.f0; - String downStreamVertexName = edge.f1; - - int outputIndex = 
outEdgeList.get(upStreamVertexName).indexOf(downStreamVertexName); - - AbstractJobVertex headVertex = streamVertices.get(headOfChain); - AbstractJobVertex downStreamVertex = streamVertices.get(downStreamVertexName); - - StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration()); - StreamConfig upStreamConfig = new StreamConfig(headVertex.getConfiguration()); - - List<Integer> outEdgeIndexList = outEdgeIndex.get(upStreamVertexName); - int numOfInputs = downStreamConfig.getNumberOfInputs(); - - downStreamConfig.setInputIndex(numOfInputs++, outEdgeIndexList.get(outputIndex)); - downStreamConfig.setNumberOfInputs(numOfInputs); - - StreamPartitioner<?> partitionerObject = outPartitioning.get(upStreamVertexName).get( - outputIndex); - - upStreamConfig.setPartitioner(downStreamVertexName, partitionerObject); - - if (partitionerObject.getStrategy() == PartitioningStrategy.FORWARD) { - downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE); - } else { - downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("CONNECTED: {} - {} -> {}", partitionerObject.getClass().getSimpleName(), - headOfChain, downStreamVertexName); - } - - upStreamConfig.setOutputNames(downStreamVertexName, outEdgeNames.get(upStreamVertexName) - .get(outputIndex)); - upStreamConfig.setSelectAll(downStreamVertexName, outEdgeSelectAll.get(upStreamVertexName) - .get(outputIndex)); - } - - /** - * Sets the number of parallel instances created for the given vertex. - * - * @param vertexName - * Name of the vertex - * @param parallelism - * Number of parallel instances created - */ - public void setParallelism(String vertexName, int parallelism) { - vertexParallelism.put(vertexName, parallelism); - } - - /** - * Sets the input format for the given vertex. 
- * - * @param vertexName - * Name of the vertex - * @param inputFormat - * input format of the file source associated with the given - * vertex - */ - public void setInputFormat(String vertexName, InputFormat<String, ?> inputFormat) { - inputFormatList.put(vertexName, inputFormat); - } - - public void setBufferTimeout(String vertexName, long bufferTimeout) { - this.bufferTimeout.put(vertexName, bufferTimeout); - } - - public void addOperatorState(String veretxName, String stateName, OperatorState<?> state) { - Map<String, OperatorState<?>> states = operatorStates.get(veretxName); - if (states == null) { - states = new HashMap<String, OperatorState<?>>(); - states.put(stateName, state); - } else { - if (states.containsKey(stateName)) { - throw new RuntimeException("State has already been registered with this name: " - + stateName); - } else { - states.put(stateName, state); - } - } - operatorStates.put(veretxName, states); - } - - /** - * Connects two vertices in the JobGraph using the selected partitioner - * settings - * - * @param upStreamVertexName - * Name of the upstream(output) vertex - * @param downStreamVertexName - * Name of the downstream(input) vertex - * @param partitionerObject - * Partitioner object - * @param typeNumber - * Number of the type (used at co-functions) - * @param outputNames - * User defined names of the out edge - */ - public void setEdge(String upStreamVertexName, String downStreamVertexName, - StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames, - boolean selectAll) { - outEdgeList.get(upStreamVertexName).add(downStreamVertexName); - outEdgeIndex.get(upStreamVertexName).add(typeNumber); - inEdgeList.get(downStreamVertexName).add(upStreamVertexName); - outPartitioning.get(upStreamVertexName).add(partitionerObject); - outEdgeNames.get(upStreamVertexName).add(outputNames); - outEdgeSelectAll.get(upStreamVertexName).add(selectAll); - } - - /** - * Sets the parallelism and buffertimeout of the iteration head 
of the given - * iteration id to the parallelism given. - * - * @param iterationID - * ID of the iteration - * @param iterationTail - * ID of the iteration tail - */ - public void setIterationSourceSettings(String iterationID, String iterationTail) { - setParallelism(iterationIDtoHeadName.get(iterationID), vertexParallelism.get(iterationTail)); - setBufferTimeout(iterationIDtoHeadName.get(iterationID), bufferTimeout.get(iterationTail)); - } - - /** - * Sets a user defined {@link OutputSelector} for the given vertex. Used for - * directed emits. - * - * @param vertexName - * Name of the vertex for which the output selector will be set - * @param serializedOutputSelector - * Byte array representing the serialized output selector. - */ - public <T> void setOutputSelector(String vertexName, byte[] serializedOutputSelector) { - outputSelectors.put(vertexName, serializedOutputSelector); - - if (LOG.isDebugEnabled()) { - LOG.debug("Outputselector set for {}", vertexName); - } - - } - - public <IN, OUT> void setInvokable(String id, StreamInvokable<IN, OUT> invokableObject) { - invokableObjects.put(id, invokableObject); - } - - public StreamInvokable<?, ?> getInvokable(String id) { - return invokableObjects.get(id); - } - - public <OUT> void setOutType(String id, TypeInformation<OUT> outType) { - StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType); - typeSerializersOut1.put(id, serializer); - } - - /** - * Sets TypeSerializerWrapper from one vertex to another, used with some - * sinks. - * - * @param from - * from - * @param to - * to - */ - public void setSerializersFrom(String from, String to) { - operatorNames.put(to, operatorNames.get(from)); - - typeSerializersIn1.put(to, typeSerializersOut1.get(from)); - typeSerializersIn2.put(to, typeSerializersOut2.get(from)); - typeSerializersOut1.put(to, typeSerializersOut1.get(from)); - typeSerializersOut2.put(to, typeSerializersOut2.get(from)); - } - - /** - * Sets slot sharing for the vertices. 
- */ - private void setSlotSharing() { - SlotSharingGroup shareGroup = new SlotSharingGroup(); - - for (AbstractJobVertex vertex : streamVertices.values()) { - vertex.setSlotSharingGroup(shareGroup); - } - - for (Integer iterID : new HashSet<Integer>(iterationIds.values())) { - CoLocationGroup ccg = new CoLocationGroup(); - AbstractJobVertex tail = streamVertices.get(iterationIDtoTailName.get(iterID)); - AbstractJobVertex head = streamVertices.get(iterationIDtoHeadName.get(iterID)); - - ccg.addVertex(head); - ccg.addVertex(tail); - } - } - - /** - * Gets the assembled {@link JobGraph} and adds a default name for it. - */ - public JobGraph getJobGraph() { - return getJobGraph(DEAFULT_JOB_NAME); - } - - /** - * Gets the assembled {@link JobGraph} and adds a user specified name for - * it. - * - * @param jobGraphName - * name of the jobGraph - */ - public JobGraph getJobGraph(String jobGraphName) { - jobGraph = new JobGraph(jobGraphName); - buildJobGraph(); - return jobGraph; - } - - /** - * Builds the {@link JobGraph} from the vertices with the edges and settings - * provided. 
- */ - private void buildJobGraph() { - - for (String sourceName : sources) { - createChain(sourceName, sourceName); - } - - setSlotSharing(); - } - - public void setChaining(boolean chaining) { - this.chaining = chaining; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index 6fffaa6..213a892 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api; import java.io.Serializable; -import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,7 +62,7 @@ public class StreamConfig implements Serializable { private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2"; private static final String ITERATON_WAIT = "iterationWait"; private static final String OUTPUTS = "outVertexNames"; - private static final String RW_ORDER = "rwOrder"; + private static final String EDGES_IN_ORDER = "rwOrder"; // DEFAULT VALUES @@ -186,10 +186,10 @@ public class StreamConfig implements Serializable { return config.getBoolean(DIRECTED_EMIT, false); } - public void setOutputSelector(byte[] outputSelector) { + public void setOutputSelector(OutputSelector<?> outputSelector) { if (outputSelector != null) { setDirectedEmit(true); - config.setBytes(OUTPUT_SELECTOR, outputSelector); + config.setBytes(OUTPUT_SELECTOR, SerializationUtils.serialize(outputSelector)); 
} } @@ -293,20 +293,16 @@ public class StreamConfig implements Serializable { } } - public void setRecordWriterOrder(List<Tuple2<String, String>> outEdgeList) { + public void setOutEdgesInOrder(List<Tuple2<String, String>> outEdgeList) { - List<String> outVertices = new ArrayList<String>(); - for (Tuple2<String, String> edge : outEdgeList) { - outVertices.add(edge.f1); - } - - config.setBytes(RW_ORDER, SerializationUtils.serialize((Serializable) outVertices)); + config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList)); } @SuppressWarnings("unchecked") - public List<String> getRecordWriterOrder(ClassLoader cl) { + public List<Tuple2<String, String>> getOutEdgesInOrder(ClassLoader cl) { try { - return (List<String>) InstantiationUtil.readObjectFromConfig(this.config, RW_ORDER, cl); + return (List<Tuple2<String, String>>) InstantiationUtil.readObjectFromConfig( + this.config, EDGES_IN_ORDER, cl); } catch (Exception e) { throw new RuntimeException("Could not instantiate outputs."); } @@ -358,8 +354,10 @@ public class StreamConfig implements Serializable { public Map<String, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) { try { - return (Map<String, StreamConfig>) InstantiationUtil.readObjectFromConfig(this.config, - CHAINED_TASK_CONFIG, cl); + Map<String, StreamConfig> confs = (Map<String, StreamConfig>) InstantiationUtil + .readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl); + + return confs == null ? 
new HashMap<String, StreamConfig>() : confs; } catch (Exception e) { throw new RuntimeException("Could not instantiate configuration."); } http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java new file mode 100644 index 0000000..3dd1bdb --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.streaming.api.collector.OutputSelector; +import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; +import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; +import org.apache.flink.streaming.api.streamvertex.CoStreamVertex; +import org.apache.flink.streaming.api.streamvertex.StreamIterationHead; +import org.apache.flink.streaming.api.streamvertex.StreamIterationTail; +import org.apache.flink.streaming.api.streamvertex.StreamVertex; +import org.apache.flink.streaming.partitioner.StreamPartitioner; +import org.apache.flink.streaming.state.OperatorState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Object for building Apache Flink stream processing graphs + */ +public class StreamGraph { + + private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class); + private final static String DEAFULT_JOB_NAME = "Flink Streaming Job"; + + protected boolean chaining = true; + + // Graph attributes + private Map<String, Integer> operatorParallelisms; + private Map<String, Long> bufferTimeouts; + private Map<String, List<String>> outEdgeLists; + private Map<String, List<Integer>> outEdgeTypes; + private Map<String, List<List<String>>> selectedNames; + private Map<String, List<Boolean>> outEdgeSelectAlls; + private Map<String, List<String>> inEdgeLists; + private Map<String, List<StreamPartitioner<?>>> outputPartitioners; + private Map<String, String> 
operatorNames; + private Map<String, StreamInvokable<?, ?>> invokableObjects; + private Map<String, StreamRecordSerializer<?>> typeSerializersIn1; + private Map<String, StreamRecordSerializer<?>> typeSerializersIn2; + private Map<String, StreamRecordSerializer<?>> typeSerializersOut1; + private Map<String, StreamRecordSerializer<?>> typeSerializersOut2; + private Map<String, OutputSelector<?>> outputSelectors; + private Map<String, Class<? extends AbstractInvokable>> jobVertexClasses; + private Map<String, Integer> iterationIds; + private Map<Integer, String> iterationIDtoHeadName; + private Map<Integer, String> iterationIDtoTailName; + private Map<String, Integer> iterationTailCount; + private Map<String, Long> iterationTimeouts; + private Map<String, Map<String, OperatorState<?>>> operatorStates; + private Map<String, InputFormat<String, ?>> inputFormatLists; + + private Set<String> sources; + + public StreamGraph() { + + initGraph(); + + if (LOG.isDebugEnabled()) { + LOG.debug("StreamGraph created"); + } + } + + public void initGraph() { + operatorParallelisms = new HashMap<String, Integer>(); + bufferTimeouts = new HashMap<String, Long>(); + outEdgeLists = new HashMap<String, List<String>>(); + outEdgeTypes = new HashMap<String, List<Integer>>(); + selectedNames = new HashMap<String, List<List<String>>>(); + outEdgeSelectAlls = new HashMap<String, List<Boolean>>(); + inEdgeLists = new HashMap<String, List<String>>(); + outputPartitioners = new HashMap<String, List<StreamPartitioner<?>>>(); + operatorNames = new HashMap<String, String>(); + invokableObjects = new HashMap<String, StreamInvokable<?, ?>>(); + typeSerializersIn1 = new HashMap<String, StreamRecordSerializer<?>>(); + typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>(); + typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>(); + typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>(); + outputSelectors = new HashMap<String, OutputSelector<?>>(); + 
jobVertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>(); + iterationIds = new HashMap<String, Integer>(); + iterationIDtoHeadName = new HashMap<Integer, String>(); + iterationIDtoTailName = new HashMap<Integer, String>(); + iterationTailCount = new HashMap<String, Integer>(); + iterationTimeouts = new HashMap<String, Long>(); + operatorStates = new HashMap<String, Map<String, OperatorState<?>>>(); + inputFormatLists = new HashMap<String, InputFormat<String, ?>>(); + sources = new HashSet<String>(); + } + + /** + * Adds a vertex to the streaming graph with the given parameters + * + * @param vertexName + * Name of the vertex + * @param invokableObject + * User defined operator + * @param inTypeInfo + * Input type for serialization + * @param outTypeInfo + * Output type for serialization + * @param operatorName + * Operator type + * @param parallelism + * Number of parallel instances created + */ + public <IN, OUT> void addStreamVertex(String vertexName, + StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo, + TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) { + + addVertex(vertexName, StreamVertex.class, invokableObject, operatorName, parallelism); + + StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>( + inTypeInfo) : null; + StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? 
new StreamRecordSerializer<OUT>( + outTypeInfo) : null; + + addTypeSerializers(vertexName, inSerializer, null, outSerializer, null); + + if (LOG.isDebugEnabled()) { + LOG.debug("Vertex: {}", vertexName); + } + } + + public <IN, OUT> void addSourceVertex(String vertexName, + StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo, + TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) { + addStreamVertex(vertexName, invokableObject, inTypeInfo, outTypeInfo, operatorName, + parallelism); + sources.add(vertexName); + } + + /** + * Adds a vertex for the iteration head to the {@link JobGraph}. The + * iterated values will be fed from this vertex back to the graph. + * + * @param vertexName + * Name of the vertex + * @param iterationHead + * Id of the iteration head + * @param iterationID + * ID of iteration for multiple iterations + * @param parallelism + * Number of parallel instances created + * @param waitTime + * Max wait time for next record + */ + public void addIterationHead(String vertexName, String iterationHead, Integer iterationID, + int parallelism, long waitTime) { + + addVertex(vertexName, StreamIterationHead.class, null, null, parallelism); + + chaining = false; + + iterationIds.put(vertexName, iterationID); + iterationIDtoHeadName.put(iterationID, vertexName); + + setSerializersFrom(iterationHead, vertexName); + + setEdge(vertexName, iterationHead, + outputPartitioners.get(inEdgeLists.get(iterationHead).get(0)).get(0), 0, + new ArrayList<String>(), false); + + iterationTimeouts.put(iterationIDtoHeadName.get(iterationID), waitTime); + + if (LOG.isDebugEnabled()) { + LOG.debug("ITERATION SOURCE: {}", vertexName); + } + + sources.add(vertexName); + } + + /** + * Adds a vertex for the iteration tail to the {@link JobGraph}. The values + * intended to be iterated will be sent to this sink from the iteration + * head. 
+ * + * @param vertexName + * Name of the vertex + * @param iterationTail + * Id of the iteration tail + * @param iterationID + * ID of iteration for mulitple iterations + * @param parallelism + * Number of parallel instances created + * @param waitTime + * Max waiting time for next record + */ + public void addIterationTail(String vertexName, String iterationTail, Integer iterationID, + int parallelism, long waitTime) { + + if (bufferTimeouts.get(iterationTail) == 0) { + throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported."); + } + + addVertex(vertexName, StreamIterationTail.class, null, null, parallelism); + + iterationIds.put(vertexName, iterationID); + iterationIDtoTailName.put(iterationID, vertexName); + + setSerializersFrom(iterationTail, vertexName); + iterationTimeouts.put(iterationIDtoTailName.get(iterationID), waitTime); + + if (LOG.isDebugEnabled()) { + LOG.debug("ITERATION SINK: {}", vertexName); + } + + } + + public <IN1, IN2, OUT> void addCoTask(String vertexName, + CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo, + TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, + String operatorName, int parallelism) { + + addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism); + + addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo), + new StreamRecordSerializer<IN2>(in2TypeInfo), new StreamRecordSerializer<OUT>( + outTypeInfo), null); + + if (LOG.isDebugEnabled()) { + LOG.debug("CO-TASK: {}", vertexName); + } + } + + /** + * Sets vertex parameters in the JobGraph + * + * @param vertexName + * Name of the vertex + * @param vertexClass + * The class of the vertex + * @param invokableObjectject + * The user defined invokable object + * @param operatorName + * Type of the user defined operator + * @param parallelism + * Number of parallel instances created + */ + private void addVertex(String vertexName, Class<? 
extends AbstractInvokable> vertexClass, + StreamInvokable<?, ?> invokableObject, String operatorName, int parallelism) { + + jobVertexClasses.put(vertexName, vertexClass); + setParallelism(vertexName, parallelism); + invokableObjects.put(vertexName, invokableObject); + operatorNames.put(vertexName, operatorName); + outEdgeLists.put(vertexName, new ArrayList<String>()); + outEdgeTypes.put(vertexName, new ArrayList<Integer>()); + selectedNames.put(vertexName, new ArrayList<List<String>>()); + outEdgeSelectAlls.put(vertexName, new ArrayList<Boolean>()); + inEdgeLists.put(vertexName, new ArrayList<String>()); + outputPartitioners.put(vertexName, new ArrayList<StreamPartitioner<?>>()); + iterationTailCount.put(vertexName, 0); + } + + /** + * Connects two vertices in the JobGraph using the selected partitioner + * settings + * + * @param upStreamVertexName + * Name of the upstream(output) vertex + * @param downStreamVertexName + * Name of the downstream(input) vertex + * @param partitionerObject + * Partitioner object + * @param typeNumber + * Number of the type (used at co-functions) + * @param outputNames + * User defined names of the out edge + */ + public void setEdge(String upStreamVertexName, String downStreamVertexName, + StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames, + boolean selectAll) { + outEdgeLists.get(upStreamVertexName).add(downStreamVertexName); + outEdgeTypes.get(upStreamVertexName).add(typeNumber); + inEdgeLists.get(downStreamVertexName).add(upStreamVertexName); + outputPartitioners.get(upStreamVertexName).add(partitionerObject); + selectedNames.get(upStreamVertexName).add(outputNames); + outEdgeSelectAlls.get(upStreamVertexName).add(selectAll); + } + + private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1, + StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out1, + StreamRecordSerializer<?> out2) { + typeSerializersIn1.put(vertexName, in1); + typeSerializersIn2.put(vertexName, in2); 
+ typeSerializersOut1.put(vertexName, out1); + typeSerializersOut2.put(vertexName, out2); + } + + /** + * Sets the number of parallel instances created for the given vertex. + * + * @param vertexName + * Name of the vertex + * @param parallelism + * Number of parallel instances created + */ + public void setParallelism(String vertexName, int parallelism) { + operatorParallelisms.put(vertexName, parallelism); + } + + public int getParallelism(String vertexName) { + return operatorParallelisms.get(vertexName); + } + + /** + * Sets the input format for the given vertex. + * + * @param vertexName + * Name of the vertex + * @param inputFormat + * input format of the file source associated with the given + * vertex + */ + public void setInputFormat(String vertexName, InputFormat<String, ?> inputFormat) { + inputFormatLists.put(vertexName, inputFormat); + } + + public void setBufferTimeout(String vertexName, long bufferTimeout) { + this.bufferTimeouts.put(vertexName, bufferTimeout); + } + + public long getBufferTimeout(String vertexName) { + return this.bufferTimeouts.get(vertexName); + } + + public void addOperatorState(String veretxName, String stateName, OperatorState<?> state) { + Map<String, OperatorState<?>> states = operatorStates.get(veretxName); + if (states == null) { + states = new HashMap<String, OperatorState<?>>(); + states.put(stateName, state); + } else { + if (states.containsKey(stateName)) { + throw new RuntimeException("State has already been registered with this name: " + + stateName); + } else { + states.put(stateName, state); + } + } + operatorStates.put(veretxName, states); + } + + /** + * Sets the parallelism and buffertimeout of the iteration head of the given + * iteration id to the parallelism given. 
+ * + * @param iterationID + * ID of the iteration + * @param iterationTail + * ID of the iteration tail + */ + public void setIterationSourceSettings(String iterationID, String iterationTail) { + setParallelism(iterationIDtoHeadName.get(iterationID), + operatorParallelisms.get(iterationTail)); + setBufferTimeout(iterationIDtoHeadName.get(iterationID), bufferTimeouts.get(iterationTail)); + } + + /** + * Sets a user defined {@link OutputSelector} for the given operator. Used + * for directed emits. + * + * @param vertexName + * Name of the vertex for which the output selector will be set + * @param outputSelector + * The outputselector object + */ + public void setOutputSelector(String vertexName, OutputSelector<?> outputSelector) { + outputSelectors.put(vertexName, outputSelector); + + if (LOG.isDebugEnabled()) { + LOG.debug("Outputselector set for {}", vertexName); + } + + } + + public <IN, OUT> void setInvokable(String vertexName, StreamInvokable<IN, OUT> invokableObject) { + invokableObjects.put(vertexName, invokableObject); + } + + public <OUT> void setOutType(String id, TypeInformation<OUT> outType) { + StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType); + typeSerializersOut1.put(id, serializer); + } + + public StreamInvokable<?, ?> getInvokable(String vertexName) { + return invokableObjects.get(vertexName); + } + + @SuppressWarnings("unchecked") + public <OUT> StreamRecordSerializer<OUT> getOutSerializer1(String vertexName) { + return (StreamRecordSerializer<OUT>) typeSerializersOut1.get(vertexName); + } + + @SuppressWarnings("unchecked") + public <OUT> StreamRecordSerializer<OUT> getOutSerializer2(String vertexName) { + return (StreamRecordSerializer<OUT>) typeSerializersOut2.get(vertexName); + } + + @SuppressWarnings("unchecked") + public <IN> StreamRecordSerializer<IN> getInSerializer1(String vertexName) { + return (StreamRecordSerializer<IN>) typeSerializersIn1.get(vertexName); + } + + @SuppressWarnings("unchecked") + public 
<IN> StreamRecordSerializer<IN> getInSerializer2(String vertexName) { + return (StreamRecordSerializer<IN>) typeSerializersIn2.get(vertexName); + } + + /** + * Sets TypeSerializerWrapper from one vertex to another, used with some + * sinks. + * + * @param from + * from + * @param to + * to + */ + public void setSerializersFrom(String from, String to) { + operatorNames.put(to, operatorNames.get(from)); + + typeSerializersIn1.put(to, typeSerializersOut1.get(from)); + typeSerializersIn2.put(to, typeSerializersOut2.get(from)); + typeSerializersOut1.put(to, typeSerializersOut1.get(from)); + typeSerializersOut2.put(to, typeSerializersOut2.get(from)); + } + + /** + * Gets the assembled {@link JobGraph} and adds a default name for it. + */ + public JobGraph getJobGraph() { + return getJobGraph(DEAFULT_JOB_NAME); + } + + /** + * Gets the assembled {@link JobGraph} and adds a user specified name for + * it. + * + * @param jobGraphName + * name of the jobGraph + */ + public JobGraph getJobGraph(String jobGraphName) { + + StreamingJobGraphGenerator optimizer = new StreamingJobGraphGenerator(this); + + return optimizer.createJobGraph(jobGraphName); + } + + public void setChaining(boolean chaining) { + this.chaining = chaining; + } + + public Collection<String> getSources() { + return sources; + } + + public List<String> getOutEdges(String vertexName) { + return outEdgeLists.get(vertexName); + } + + public List<String> getInEdges(String vertexName) { + return inEdgeLists.get(vertexName); + } + + public List<Integer> getOutEdgeTypes(String vertexName) { + + return outEdgeTypes.get(vertexName); + } + + public StreamPartitioner<?> getOutPartitioner(String vertexName, int outputIndex) { + return outputPartitioners.get(vertexName).get(outputIndex); + } + + public List<String> getSelectedNames(String vertexName, int outputIndex) { + return selectedNames.get(vertexName).get(outputIndex); + } + + public Boolean isSelectAll(String vertexName, int outputIndex) { + return 
outEdgeSelectAlls.get(vertexName).get(outputIndex); + } + + public Collection<Integer> getIterationIDs() { + return new HashSet<Integer>(iterationIds.values()); + } + + public String getIterationTail(int iterID) { + return iterationIDtoTailName.get(iterID); + } + + public String getIterationHead(int iterID) { + return iterationIDtoHeadName.get(iterID); + } + + public Class<? extends AbstractInvokable> getJobVertexClass(String vertexName) { + return jobVertexClasses.get(vertexName); + } + + public InputFormat<String, ?> getInputFormat(String vertexName) { + return inputFormatLists.get(vertexName); + } + + public OutputSelector<?> getOutputSelector(String vertexName) { + return outputSelectors.get(vertexName); + } + + public Map<String, OperatorState<?>> getState(String vertexName) { + return operatorStates.get(vertexName); + } + + public Integer getIterationID(String vertexName) { + return iterationIds.get(vertexName); + } + + public long getIterationTimeout(String vertexName) { + return iterationTimeouts.get(vertexName); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java new file mode 100644 index 0000000..6c0cc20 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.AbstractJobVertex; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy; +import org.apache.flink.streaming.api.streamvertex.StreamIterationHead; +import org.apache.flink.streaming.api.streamvertex.StreamIterationTail; +import org.apache.flink.streaming.partitioner.StreamPartitioner; +import org.apache.flink.streaming.partitioner.StreamPartitioner.PartitioningStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StreamingJobGraphGenerator { + + 
private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class); + + private StreamGraph streamGraph; + + private Map<String, AbstractJobVertex> streamVertices; + private JobGraph jobGraph; + private Collection<String> builtNodes; + + private Map<String, Map<String, StreamConfig>> chainedConfigs; + private Map<String, StreamConfig> vertexConfigs; + private Map<String, String> chainedNames; + + public StreamingJobGraphGenerator(StreamGraph streamGraph) { + this.streamGraph = streamGraph; + } + + private void init() { + this.streamVertices = new HashMap<String, AbstractJobVertex>(); + this.builtNodes = new HashSet<String>(); + this.chainedConfigs = new HashMap<String, Map<String, StreamConfig>>(); + this.vertexConfigs = new HashMap<String, StreamConfig>(); + this.chainedNames = new HashMap<String, String>(); + } + + public JobGraph createJobGraph(String jobName) { + jobGraph = new JobGraph(jobName); + init(); + + for (String sourceName : streamGraph.getSources()) { + createChain(sourceName, sourceName); + } + + setSlotSharing(); + + return jobGraph; + } + + private List<Tuple2<String, String>> createChain(String startNode, String current) { + + if (!builtNodes.contains(startNode)) { + + List<Tuple2<String, String>> transitiveOutEdges = new ArrayList<Tuple2<String, String>>(); + List<String> chainableOutputs = new ArrayList<String>(); + List<String> nonChainableOutputs = new ArrayList<String>(); + + for (String outName : streamGraph.getOutEdges(current)) { + if (isChainable(current, outName)) { + chainableOutputs.add(outName); + } else { + nonChainableOutputs.add(outName); + } + } + + for (String chainable : chainableOutputs) { + transitiveOutEdges.addAll(createChain(startNode, chainable)); + } + + for (String nonChainable : nonChainableOutputs) { + transitiveOutEdges.add(new Tuple2<String, String>(current, nonChainable)); + createChain(nonChainable, nonChainable); + } + + chainedNames.put(current, createChainedName(current, 
chainableOutputs)); + + StreamConfig config = current.equals(startNode) ? createProcessingVertex(startNode) + : new StreamConfig(new Configuration()); + + setVertexConfig(current, config, chainableOutputs, nonChainableOutputs); + + if (current.equals(startNode)) { + + config.setChainStart(); + config.setOutEdgesInOrder(transitiveOutEdges); + + for (Tuple2<String, String> edge : transitiveOutEdges) { + connect(startNode, edge); + } + + config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNode)); + + } else { + + Map<String, StreamConfig> chainedConfs = chainedConfigs.get(startNode); + + if (chainedConfs == null) { + chainedConfigs.put(startNode, new HashMap<String, StreamConfig>()); + } + chainedConfigs.get(startNode).put(current, config); + } + + return transitiveOutEdges; + + } else { + return new ArrayList<Tuple2<String, String>>(); + } + } + + private String createChainedName(String vertexName, List<String> chainedOutputs) { + if (chainedOutputs.size() > 1) { + List<String> outputChainedNames = new ArrayList<String>(); + for (String chainable : chainedOutputs) { + outputChainedNames.add(chainedNames.get(chainable)); + } + + return vertexName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")"; + } else if (chainedOutputs.size() == 1) { + return vertexName + " -> " + chainedNames.get(chainedOutputs.get(0)); + } else { + return vertexName; + } + + } + + private StreamConfig createProcessingVertex(String vertexName) { + + AbstractJobVertex vertex = new AbstractJobVertex(chainedNames.get(vertexName)); + + vertex.setInvokableClass(streamGraph.getJobVertexClass(vertexName)); + vertex.setParallelism(streamGraph.getParallelism(vertexName)); + if (LOG.isDebugEnabled()) { + LOG.debug("Parallelism set: {} for {}", streamGraph.getParallelism(vertexName), + vertexName); + } + + if (streamGraph.getInputFormat(vertexName) != null) { + vertex.setInputSplitSource(streamGraph.getInputFormat(vertexName)); + } + + streamVertices.put(vertexName, vertex); + 
builtNodes.add(vertexName); + jobGraph.addVertex(vertex); + + return new StreamConfig(vertex.getConfiguration()); + } + + private void setVertexConfig(String vertexName, StreamConfig config, + List<String> chainableOutputs, List<String> nonChainableOutputs) { + + config.setVertexName(vertexName); + config.setBufferTimeout(streamGraph.getBufferTimeout(vertexName)); + + config.setTypeSerializerIn1(streamGraph.getInSerializer1(vertexName)); + config.setTypeSerializerIn2(streamGraph.getInSerializer2(vertexName)); + config.setTypeSerializerOut1(streamGraph.getOutSerializer1(vertexName)); + config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexName)); + + config.setUserInvokable(streamGraph.getInvokable(vertexName)); + config.setOutputSelector(streamGraph.getOutputSelector(vertexName)); + config.setOperatorStates(streamGraph.getState(vertexName)); + + config.setNumberOfOutputs(nonChainableOutputs.size()); + config.setOutputs(nonChainableOutputs); + config.setChainedOutputs(chainableOutputs); + + Class<? 
extends AbstractInvokable> vertexClass = streamGraph.getJobVertexClass(vertexName); + + if (vertexClass.equals(StreamIterationHead.class) + || vertexClass.equals(StreamIterationTail.class)) { + config.setIterationId(streamGraph.getIterationID(vertexName)); + config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexName)); + } + + vertexConfigs.put(vertexName, config); + } + + private <T> void connect(String headOfChain, Tuple2<String, String> edge) { + + String upStreamVertexName = edge.f0; + String downStreamVertexName = edge.f1; + + int outputIndex = streamGraph.getOutEdges(upStreamVertexName).indexOf(downStreamVertexName); + + AbstractJobVertex headVertex = streamVertices.get(headOfChain); + AbstractJobVertex downStreamVertex = streamVertices.get(downStreamVertexName); + + StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration()); + StreamConfig upStreamConfig = new StreamConfig(headVertex.getConfiguration()); + + List<Integer> outEdgeIndexList = streamGraph.getOutEdgeTypes(upStreamVertexName); + int numOfInputs = downStreamConfig.getNumberOfInputs(); + + downStreamConfig.setInputIndex(numOfInputs++, outEdgeIndexList.get(outputIndex)); + downStreamConfig.setNumberOfInputs(numOfInputs); + + StreamPartitioner<?> partitioner = streamGraph.getOutPartitioner(upStreamVertexName, + outputIndex); + + upStreamConfig.setPartitioner(downStreamVertexName, partitioner); + + if (partitioner.getStrategy() == PartitioningStrategy.FORWARD) { + downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE); + } else { + downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(), + headOfChain, downStreamVertexName); + } + + upStreamConfig.setOutputNames(downStreamVertexName, + streamGraph.getSelectedNames(upStreamVertexName, outputIndex)); + 
upStreamConfig.setSelectAll(downStreamVertexName, + streamGraph.isSelectAll(upStreamVertexName, outputIndex)); + } + + private boolean isChainable(String vertexName, String outName) { + + StreamInvokable<?, ?> headInvokable = streamGraph.getInvokable(vertexName); + StreamInvokable<?, ?> outInvokable = streamGraph.getInvokable(outName); + + return streamGraph.getInEdges(outName).size() == 1 + && outInvokable != null + && streamGraph.getOutputSelector(vertexName) == null + && outInvokable.getChainingStrategy() == ChainingStrategy.ALWAYS + && (headInvokable.getChainingStrategy() == ChainingStrategy.HEAD || headInvokable + .getChainingStrategy() == ChainingStrategy.ALWAYS) + && streamGraph.getOutPartitioner(vertexName, + streamGraph.getOutEdges(vertexName).indexOf(outName)).getStrategy() == PartitioningStrategy.FORWARD + && streamGraph.getParallelism(vertexName) == streamGraph.getParallelism(outName) + && streamGraph.chaining; + } + + private void setSlotSharing() { + SlotSharingGroup shareGroup = new SlotSharingGroup(); + + for (AbstractJobVertex vertex : streamVertices.values()) { + vertex.setSlotSharingGroup(shareGroup); + } + + for (Integer iterID : streamGraph.getIterationIDs()) { + CoLocationGroup ccg = new CoLocationGroup(); + AbstractJobVertex tail = streamVertices.get(streamGraph.getIterationTail(iterID)); + AbstractJobVertex head = streamVertices.get(streamGraph.getIterationHead(iterID)); + + ccg.addVertex(head); + ccg.addVertex(tail); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index 80ea970..db8649b 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.JobGraphBuilder; +import org.apache.flink.streaming.api.StreamGraph; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.co.CoFlatMapFunction; import org.apache.flink.streaming.api.function.co.CoMapFunction; @@ -52,7 +52,7 @@ import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; public class ConnectedDataStream<IN1, IN2> { protected StreamExecutionEnvironment environment; - protected JobGraphBuilder jobGraphBuilder; + protected StreamGraph jobGraphBuilder; protected DataStream<IN1> dataStream1; protected DataStream<IN2> dataStream2; @@ -61,7 +61,7 @@ public class ConnectedDataStream<IN1, IN2> { protected KeySelector<IN2, ?> keySelector2; protected ConnectedDataStream(DataStream<IN1> input1, DataStream<IN2> input2) { - this.jobGraphBuilder = input1.jobGraphBuilder; + this.jobGraphBuilder = input1.streamGraph; this.environment = input1.environment; this.dataStream1 = input1.copy(); this.dataStream2 = input2.copy(); @@ -402,7 +402,7 @@ public class ConnectedDataStream<IN1, IN2> { SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator( environment, functionName, outTypeInfo, functionInvokable); - 
dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, + dataStream1.streamGraph.addCoTask(returnStream.getId(), functionInvokable, getInputType1(), getInputType2(), outTypeInfo, functionName, environment.getDegreeOfParallelism()); http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index f68ab68..8e7d823 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -43,7 +43,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; -import org.apache.flink.streaming.api.JobGraphBuilder; +import org.apache.flink.streaming.api.StreamGraph; import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator; import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -102,7 +102,7 @@ public class DataStream<OUT> { protected TypeInformation typeInfo; protected List<DataStream<OUT>> mergedStreams; - protected final JobGraphBuilder jobGraphBuilder; + protected final StreamGraph streamGraph; /** * Create a new {@link DataStream} in the given execution environment with @@ -125,7 +125,7 @@ public class DataStream<OUT> { 
this.id = operatorType + "-" + counter.toString(); this.environment = environment; this.degreeOfParallelism = environment.getDegreeOfParallelism(); - this.jobGraphBuilder = environment.getJobGraphBuilder(); + this.streamGraph = environment.getStreamGraph(); this.userDefinedNames = new ArrayList<String>(); this.selectAll = true; this.partitioner = new DistributePartitioner<OUT>(true); @@ -147,7 +147,7 @@ public class DataStream<OUT> { this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames); this.selectAll = dataStream.selectAll; this.partitioner = dataStream.partitioner; - this.jobGraphBuilder = dataStream.jobGraphBuilder; + this.streamGraph = dataStream.streamGraph; this.typeInfo = dataStream.typeInfo; this.mergedStreams = new ArrayList<DataStream<OUT>>(); this.mergedStreams.add(this); @@ -189,7 +189,7 @@ public class DataStream<OUT> { @SuppressWarnings("unchecked") public <R> DataStream<R> setType(TypeInformation<R> outType) { - jobGraphBuilder.setOutType(id, outType); + streamGraph.setOutType(id, outType); typeInfo = outType; return (DataStream<R>) this; } @@ -1085,7 +1085,7 @@ public class DataStream<OUT> { DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null, null, true); - jobGraphBuilder.addIterationHead(returnStream.getId(), this.getId(), iterationID, + streamGraph.addIterationHead(returnStream.getId(), this.getId(), iterationID, degreeOfParallelism, waitTime); return this.copy(); @@ -1112,7 +1112,7 @@ public class DataStream<OUT> { SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment, operatorName, outTypeInfo, invokable); - jobGraphBuilder.addStreamVertex(returnStream.getId(), invokable, getType(), outTypeInfo, + streamGraph.addStreamVertex(returnStream.getId(), invokable, getType(), outTypeInfo, operatorName, degreeOfParallelism); connectGraph(inputStream, returnStream.getId(), 0); @@ -1157,7 +1157,7 @@ public class DataStream<OUT> { */ protected <X> void 
connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) { for (DataStream<X> stream : inputStream.mergedStreams) { - jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber, + streamGraph.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber, inputStream.userDefinedNames, inputStream.selectAll); } @@ -1179,7 +1179,7 @@ public class DataStream<OUT> { DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType(), sinkInvokable); - jobGraphBuilder.addStreamVertex(returnStream.getId(), sinkInvokable, getType(), null, + streamGraph.addStreamVertex(returnStream.getId(), sinkInvokable, getType(), null, "sink", degreeOfParallelism); this.connectGraph(this.copy(), returnStream.getId(), 0); http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java index c4cd1e1..6f66b2c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java @@ -64,10 +64,10 @@ public class IterativeDataStream<IN> extends DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "iterationSink", null, null); - jobGraphBuilder.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID, + streamGraph.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID, iterationTail.getParallelism(), 
waitTime); - jobGraphBuilder.setIterationSourceSettings(iterationID.toString(), iterationTail.getId()); + streamGraph.setIterationSourceSettings(iterationID.toString(), iterationTail.getId()); connectGraph(iterationTail.forward(), iterationSink.getId(), 0); return iterationTail; } http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index b8ca42c..dbfbc48 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -20,8 +20,6 @@ package org.apache.flink.streaming.api.datastream; import java.util.Map; import java.util.Map.Entry; -import org.apache.commons.lang3.SerializationException; -import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.collector.OutputSelector; @@ -65,7 +63,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato @SuppressWarnings("unchecked") public <R> SingleOutputStreamOperator<R, ?> setType(TypeInformation<R> outType) { - jobGraphBuilder.setOutType(id, outType); + streamGraph.setOutType(id, outType); typeInfo = outType; return (SingleOutputStreamOperator<R, ?>) this; } @@ -84,7 +82,7 @@ public class SingleOutputStreamOperator<OUT, 
O extends SingleOutputStreamOperato } this.degreeOfParallelism = dop; - jobGraphBuilder.setParallelism(id, degreeOfParallelism); + streamGraph.setParallelism(id, degreeOfParallelism); return this; } @@ -98,7 +96,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato * @return The operator with buffer timeout set. */ public SingleOutputStreamOperator<OUT, O> setBufferTimeout(long timeoutMillis) { - jobGraphBuilder.setBufferTimeout(id, timeoutMillis); + streamGraph.setBufferTimeout(id, timeoutMillis); return this; } @@ -115,13 +113,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) { if (!isSplit) { this.isSplit = true; - try { - jobGraphBuilder.setOutputSelector(id, - SerializationUtils.serialize(clean(outputSelector))); - - } catch (SerializationException e) { - throw new RuntimeException("Cannot serialize OutputSelector"); - } + streamGraph.setOutputSelector(id, clean(outputSelector)); return new SplitDataStream<OUT>(this); } else { @@ -144,7 +136,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato * @return The data stream with state registered. 
*/ protected SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?> state) { - jobGraphBuilder.addOperatorState(getId(), name, state); + streamGraph.addOperatorState(getId(), name, state); return this; } @@ -161,7 +153,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato */ protected SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>> states) { for (Entry<String, OperatorState<?>> entry : states.entrySet()) { - jobGraphBuilder.addOperatorState(getId(), entry.getKey(), entry.getValue()); + streamGraph.addOperatorState(getId(), entry.getKey(), entry.getValue()); } return this; http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java index 8422400..03160c2 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java @@ -67,7 +67,7 @@ public class StreamCrossOperator<I1, I2> extends @SuppressWarnings("unchecked") public CrossWindow<I1, I2> every(long length) { - ((CoWindowInvokable<I1, I2, ?>) jobGraphBuilder.getInvokable(id)).setSlideSize(length); + ((CoWindowInvokable<I1, I2, ?>) streamGraph.getInvokable(id)).setSlideSize(length); return this; } @@ -90,7 +90,7 @@ public class StreamCrossOperator<I1, I2> extends 
new CrossWindowFunction<I1, I2, R>(clean(function)), op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2); - jobGraphBuilder.setInvokable(id, invokable); + streamGraph.setInvokable(id, invokable); return setType(outTypeInfo); http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java index 626b9f1..d2b2032 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java @@ -245,7 +245,7 @@ public class StreamJoinOperator<I1, I2> extends getJoinWindowFunction(joinFunction, predicate), predicate.op.windowSize, predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2); - jobGraphBuilder.setInvokable(id, invokable); + streamGraph.setInvokable(id, invokable); return setType(outType); } http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index 3e44012..4824fca 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -29,7 +29,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { */ @Override public void execute() throws Exception { - ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism()); + ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getDegreeOfParallelism()); } /** @@ -41,7 +41,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { */ @Override public void execute(String jobName) throws Exception { - ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(jobName), + ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getDegreeOfParallelism()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index d833c8e..2eb05ad 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -81,14 
+81,14 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { @Override public void execute() { - JobGraph jobGraph = jobGraphBuilder.getJobGraph(); + JobGraph jobGraph = streamGraph.getJobGraph(); executeRemotely(jobGraph); } @Override public void execute(String jobName) { - JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName); + JobGraph jobGraph = streamGraph.getJobGraph(jobName); executeRemotely(jobGraph); } http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index c157435..a9a5bd3 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -54,9 +54,9 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { JobGraph jobGraph; if (jobName == null) { - jobGraph = this.jobGraphBuilder.getJobGraph(); + jobGraph = this.streamGraph.getJobGraph(); } else { - jobGraph = this.jobGraphBuilder.getJobGraph(jobName); + jobGraph = this.streamGraph.getJobGraph(jobName); } for (File file : jars) { http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 4194864..51dc0ae 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -33,7 +33,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.client.program.Client; import org.apache.flink.client.program.ContextEnvironment; import org.apache.flink.core.fs.Path; -import org.apache.flink.streaming.api.JobGraphBuilder; +import org.apache.flink.streaming.api.StreamGraph; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.function.source.FileSourceFunction; @@ -61,7 +61,7 @@ public abstract class StreamExecutionEnvironment { private ExecutionConfig config = new ExecutionConfig(); - protected JobGraphBuilder jobGraphBuilder; + protected StreamGraph streamGraph; // -------------------------------------------------------------------------------------------- // Constructor and Properties @@ -71,7 +71,7 @@ public abstract class StreamExecutionEnvironment { * Constructor for creating StreamExecutionEnvironment */ protected StreamExecutionEnvironment() { - jobGraphBuilder = new JobGraphBuilder(); + streamGraph = new StreamGraph(); } /** @@ -352,7 +352,7 @@ public abstract class StreamExecutionEnvironment { TypeInformation<String> typeInfo) { FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo); DataStreamSource<String> returnStream = addSource(function, null, "fileSource"); - 
jobGraphBuilder.setInputFormat(returnStream.getId(), inputFormat); + streamGraph.setInputFormat(returnStream.getId(), inputFormat); return returnStream; } @@ -437,7 +437,7 @@ public abstract class StreamExecutionEnvironment { DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, sourceName, outTypeInfo, sourceInvokable, isParallel); - jobGraphBuilder.addSourceVertex(returnStream.getId(), sourceInvokable, null, outTypeInfo, + streamGraph.addSourceVertex(returnStream.getId(), sourceInvokable, null, outTypeInfo, sourceName, dop); return returnStream; @@ -584,12 +584,12 @@ public abstract class StreamExecutionEnvironment { public abstract void execute(String jobName) throws Exception; /** - * Getter of the {@link JobGraphBuilder} of the streaming job. + * Getter of the {@link StreamGraph} of the streaming job. * - * @return jobGraphBuilder + * @return The streamgraph representing the transformations */ - public JobGraphBuilder getJobGraphBuilder() { - return jobGraphBuilder; + public StreamGraph getStreamGraph() { + return streamGraph; } } http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java index 01c0545..c2177fa 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java @@ -34,7 +34,6 @@ public class 
GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> { super(reducer); this.keySelector = keySelector; values = new HashMap<Object, IN>(); - setChainingStrategy(ChainingStrategy.NEVER); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java index bbd7b0c..a46fa96 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java @@ -67,7 +67,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; * method. Fake elements created on prenotification will be forwarded to all * groups. The {@link ActiveTriggerCallback} is also implemented in a way, that * it forwards/distributed calls all groups. - * + * * @param <IN> * The type of input elements handled by this operator invokable. */ @@ -154,7 +154,6 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { LinkedList<EvictionPolicy<IN>> centralEvictionPolicies) { super(userFunction); - setChainingStrategy(ChainingStrategy.NEVER); this.keySelector = keySelector;
