[FLINK-1345] [streaming] Basic operator chaining added
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26535c48 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26535c48 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26535c48 Branch: refs/heads/master Commit: 26535c487753975556aafc19c2ce4f9c24cc677b Parents: d9b942b Author: Gyula Fora <[email protected]> Authored: Thu Jan 15 22:32:13 2015 +0100 Committer: mbalassi <[email protected]> Committed: Wed Jan 21 16:06:33 2015 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/JobGraphBuilder.java | 222 +++++++++++++------ .../flink/streaming/api/StreamConfig.java | 56 +++-- .../api/collector/CollectorWrapper.java | 2 +- .../api/datastream/IterativeDataStream.java | 2 +- .../datastream/SingleOutputStreamOperator.java | 5 + .../environment/StreamExecutionEnvironment.java | 2 +- .../api/invokable/ChainableInvokable.java | 38 ++++ .../streaming/api/invokable/SinkInvokable.java | 10 +- .../api/invokable/StreamInvokable.java | 23 +- .../invokable/operator/CounterInvokable.java | 10 +- .../api/invokable/operator/FilterInvokable.java | 18 +- .../invokable/operator/FlatMapInvokable.java | 12 +- .../operator/GroupedReduceInvokable.java | 7 +- .../api/invokable/operator/MapInvokable.java | 12 +- .../operator/StreamReduceInvokable.java | 15 +- .../operator/co/CoWindowInvokable.java | 2 +- .../api/streamvertex/OutputHandler.java | 86 +++++-- .../api/streamvertex/StreamVertex.java | 24 +- .../examples/iteration/IterateExample.java | 3 +- 19 files changed, 406 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java index 649072f..35e145e 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java @@ -22,7 +22,9 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; @@ -32,8 +34,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.streaming.api.collector.OutputSelector; -import org.apache.flink.streaming.api.function.source.SourceFunction; -import org.apache.flink.streaming.api.invokable.SourceInvokable; +import org.apache.flink.streaming.api.invokable.ChainableInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; @@ -56,6 +57,8 @@ public class JobGraphBuilder { private final static String DEAFULT_JOB_NAME = "Streaming Job"; private JobGraph jobGraph; + private boolean chaining = true; + // Graph attributes private Map<String, AbstractJobVertex> streamVertices; private Map<String, Integer> vertexParallelism; @@ -65,7 +68,7 @@ public class JobGraphBuilder { private Map<String, List<List<String>>> outEdgeNames; private Map<String, List<Boolean>> outEdgeSelectAll; private Map<String, List<String>> inEdgeList; - private Map<String, List<StreamPartitioner<?>>> connectionTypes; + private Map<String, List<StreamPartitioner<?>>> outPartitioning; private Map<String, String> operatorNames; private Map<String, StreamInvokable<?, ?>> invokableObjects; private Map<String, StreamRecordSerializer<?>> typeSerializersIn1; @@ -81,6 +84,12 @@ public class JobGraphBuilder { private Map<String, Long> iterationWaitTime; private Map<String, Map<String, OperatorState<?>>> operatorStates; private Map<String, InputFormat<String, ?>> inputFormatList; + private Map<String, List<String>> chainedVertices; + private Map<String, List<ChainableInvokable<?, ?>>> chainedInvokable; + private Map<String, List<StreamRecordSerializer<?>>> chainedSerializer; + + private Set<String> sources; + private Set<String> builtVertices; /** * Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG @@ -96,7 +105,7 @@ public class JobGraphBuilder { outEdgeNames = new HashMap<String, List<List<String>>>(); outEdgeSelectAll = new HashMap<String, List<Boolean>>(); inEdgeList = new HashMap<String, List<String>>(); - connectionTypes = new HashMap<String, List<StreamPartitioner<?>>>(); + outPartitioning = new HashMap<String, List<StreamPartitioner<?>>>(); operatorNames = new HashMap<String, String>(); invokableObjects = new HashMap<String, StreamInvokable<?, ?>>(); typeSerializersIn1 = new HashMap<String, StreamRecordSerializer<?>>(); @@ -112,6 +121,12 @@ public class JobGraphBuilder { iterationWaitTime = new HashMap<String, Long>(); operatorStates = new HashMap<String, Map<String, OperatorState<?>>>(); inputFormatList = new HashMap<String, InputFormat<String, ?>>(); + chainedInvokable = new HashMap<String, List<ChainableInvokable<?, ?>>>(); + chainedSerializer = new HashMap<String, List<StreamRecordSerializer<?>>>(); + chainedVertices = new HashMap<String, List<String>>(); + + sources = new HashSet<String>(); + builtVertices = new HashSet<String>(); if (LOG.isDebugEnabled()) { LOG.debug("JobGraph created"); @@ -152,32 +167,12 @@ public class JobGraphBuilder { } } - /** - * Adds a source vertex to the streaming JobGraph with the given parameters - * - * @param vertexName - * Name of the vertex - * @param function - * User defined function - * @param inTypeInfo - * Input type for serialization - * @param outTypeInfo - * Output type for serialization - * @param operatorName - * Operator type - * @param parallelism - * Number of parallel instances created - */ - public <IN, OUT> void addSourceVertex(String vertexName, SourceFunction<OUT> function, - TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName, - byte[] serializedFunction, int parallelism) { - - @SuppressWarnings("unchecked") - StreamInvokable<IN, OUT> invokableObject = (StreamInvokable<IN, OUT>) new SourceInvokable<OUT>( - function); - + public <IN, OUT> void addSourceVertex(String vertexName, + StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo, + TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) { addStreamVertex(vertexName, invokableObject, inTypeInfo, outTypeInfo, operatorName, parallelism); + sources.add(vertexName); } /** @@ -205,7 +200,7 @@ public class JobGraphBuilder { setBytesFrom(iterationHead, vertexName); - setEdge(vertexName, iterationHead, connectionTypes + setEdge(vertexName, iterationHead, outPartitioning .get(inEdgeList.get(iterationHead).get(0)).get(0), 0, new ArrayList<String>(), false); @@ -214,6 +209,8 @@ public class JobGraphBuilder { if (LOG.isDebugEnabled()) { LOG.debug("ITERATION SOURCE: {}", vertexName); } + + sources.add(vertexName); } /** @@ -295,7 +292,7 @@ public class JobGraphBuilder { outEdgeNames.put(vertexName, new ArrayList<List<String>>()); outEdgeSelectAll.put(vertexName, new ArrayList<Boolean>()); inEdgeList.put(vertexName, new ArrayList<String>()); - connectionTypes.put(vertexName, new ArrayList<StreamPartitioner<?>>()); + outPartitioning.put(vertexName, new ArrayList<StreamPartitioner<?>>()); iterationTailCount.put(vertexName, 0); } @@ -317,50 +314,134 @@ public class JobGraphBuilder { */ private void createVertex(String vertexName) { - // Get vertex attributes - Class<? extends AbstractInvokable> vertexClass = vertexClasses.get(vertexName); - StreamInvokable<?, ?> invokableObject = invokableObjects.get(vertexName); - int parallelism = vertexParallelism.get(vertexName); - byte[] outputSelector = outputSelectors.get(vertexName); - Map<String, OperatorState<?>> state = operatorStates.get(vertexName); + if (!builtVertices.contains(vertexName)) { + if (!outEdgeList.get(vertexName).isEmpty()) { - // Create vertex object - AbstractJobVertex vertex = new AbstractJobVertex(vertexName); + for (String outName : outEdgeList.get(vertexName)) { + if (isChainable(vertexName, outName)) { + chainRecursively(vertexName, vertexName, outName); + } else { + createVertex(outName); + } - this.jobGraph.addVertex(vertex); + } + } + + // Get vertex attributes + Class<? extends AbstractInvokable> vertexClass = vertexClasses.get(vertexName); + StreamInvokable<?, ?> invokableObject = invokableObjects.get(vertexName); + int parallelism = vertexParallelism.get(vertexName); + byte[] outputSelector = outputSelectors.get(vertexName); + Map<String, OperatorState<?>> state = operatorStates.get(vertexName); + + // Create vertex object + String cname = chainedVertices.get(vertexName) == null ? "" : " => " + + StringUtils.join(chainedVertices.get(vertexName), " => "); + AbstractJobVertex vertex = new AbstractJobVertex(vertexName + cname); + + this.jobGraph.addVertex(vertex); + + vertex.setInvokableClass(vertexClass); + vertex.setParallelism(parallelism); + if (LOG.isDebugEnabled()) { + LOG.debug("Parallelism set: {} for {}", parallelism, vertexName); + } + + StreamConfig config = new StreamConfig(vertex.getConfiguration()); + + config.setBufferTimeout(bufferTimeout.get(vertexName)); + + config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName)); + config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName)); + config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName)); + config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName)); + + // Set vertex config + config.setUserInvokable(invokableObject); + config.setOutputSelector(outputSelector); + config.setOperatorStates(state); + + if (vertexClass.equals(StreamIterationHead.class) + || vertexClass.equals(StreamIterationTail.class)) { + config.setIterationId(iterationIds.get(vertexName)); + config.setIterationWaitTime(iterationWaitTime.get(vertexName)); + } + + if (inputFormatList.containsKey(vertexName)) { + vertex.setInputSplitSource(inputFormatList.get(vertexName)); + } + + List<ChainableInvokable<?, ?>> chainedInvokables = chainedInvokable.get(vertexName); + List<StreamRecordSerializer<?>> chainedSerializers = chainedSerializer.get(vertexName); + + int numChained = chainedInvokables == null ? 0 : chainedInvokables.size(); + config.setNumberofChainedTasks(numChained); + + for (int i = 0; i < numChained; i++) { + config.setChainedInvokable(chainedInvokables.get(i), i); + config.setChainedSerializer(chainedSerializers.get(i), i); + } + + streamVertices.put(vertexName, vertex); + builtVertices.add(vertexName); - vertex.setInvokableClass(vertexClass); - vertex.setParallelism(parallelism); - if (LOG.isDebugEnabled()) { - LOG.debug("Parallelism set: {} for {}", parallelism, vertexName); } - StreamConfig config = new StreamConfig(vertex.getConfiguration()); + } - config.setBufferTimeout(bufferTimeout.get(vertexName)); + private void chainRecursively(String chainStart, String current, String next) { + chainTasks(chainStart, next); + // Add multiple chaining here + for (String output : outEdgeList.get(next)) { + if (isChainable(next, output)) { + chainRecursively(chainStart, next, output); + } else { + createVertex(output); + } + } + } - config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName)); - config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName)); - config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName)); - config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName)); + private boolean isChainable(String vertexName, String outName) { + return outEdgeList.get(vertexName).size() == 1 + && inEdgeList.get(outName).size() == 1 + && outputSelectors.get(vertexName) == null + && invokableObjects.get(outName).isChainable() + && outPartitioning.get(vertexName).get(0).getStrategy() == PartitioningStrategy.FORWARD + && vertexParallelism.get(vertexName) == vertexParallelism.get(outName) && chaining; + } - // Set vertex config - config.setUserInvokable(invokableObject); - config.setVertexName(vertexName); - config.setOutputSelector(outputSelector); - config.setOperatorStates(state); + private void chainTasks(String first, String second) { + List<ChainableInvokable<?, ?>> chainedInvokables = chainedInvokable.get(first); + if (chainedInvokables == null) { + chainedInvokables = new ArrayList<ChainableInvokable<?, ?>>(); + } + chainedInvokables.add((ChainableInvokable<?, ?>) invokableObjects.get(second)); + chainedInvokable.put(first, chainedInvokables); - if (vertexClass.equals(StreamIterationHead.class) - || vertexClass.equals(StreamIterationTail.class)) { - config.setIterationId(iterationIds.get(vertexName)); - config.setIterationWaitTime(iterationWaitTime.get(vertexName)); + List<StreamRecordSerializer<?>> chainedSerializers = chainedSerializer.get(first); + if (chainedSerializers == null) { + chainedSerializers = new ArrayList<StreamRecordSerializer<?>>(); } + chainedSerializers.add(typeSerializersIn1.get(second)); + chainedSerializer.put(first, chainedSerializers); - if (inputFormatList.containsKey(vertexName)) { - vertex.setInputSplitSource(inputFormatList.get(vertexName)); + List<String> chained = chainedVertices.get(first); + if (chained == null) { + chained = new ArrayList<String>(); } + chained.add(second); + chainedVertices.put(first, chained); + + outEdgeList.put(first, outEdgeList.get(second)); + typeSerializersOut1.put(first, typeSerializersOut1.get(second)); + outPartitioning.put(first, outPartitioning.get(second)); + outEdgeType.put(first, outEdgeType.get(second)); + outEdgeNames.put(first, outEdgeNames.get(second)); + outEdgeSelectAll.put(first, outEdgeSelectAll.get(second)); + outPartitioning.put(first, outPartitioning.get(second)); + bufferTimeout.put(first, bufferTimeout.get(second)); + outputSelectors.put(first, outputSelectors.get(second)); - streamVertices.put(vertexName, vertex); } /** @@ -385,8 +466,8 @@ public class JobGraphBuilder { downStreamVertex .connectNewDataSetAsInput(upStreamVertex, DistributionPattern.POINTWISE); } else { - downStreamVertex - .connectNewDataSetAsInput(upStreamVertex, DistributionPattern.ALL_TO_ALL); + downStreamVertex.connectNewDataSetAsInput(upStreamVertex, + DistributionPattern.ALL_TO_ALL); } if (LOG.isDebugEnabled()) { @@ -468,7 +549,7 @@ public class JobGraphBuilder { outEdgeList.get(upStreamVertexName).add(downStreamVertexName); outEdgeType.get(upStreamVertexName).add(typeNumber); inEdgeList.get(downStreamVertexName).add(upStreamVertexName); - connectionTypes.get(upStreamVertexName).add(partitionerObject); + outPartitioning.get(upStreamVertexName).add(partitionerObject); outEdgeNames.get(upStreamVertexName).add(outputNames); outEdgeSelectAll.get(upStreamVertexName).add(selectAll); } @@ -602,11 +683,12 @@ public class JobGraphBuilder { * provided. */ private void buildJobGraph() { - for (String vertexName : outEdgeList.keySet()) { - createVertex(vertexName); + + for (String sourceName : sources) { + createVertex(sourceName); } - for (String upStreamVertexName : outEdgeList.keySet()) { + for (String upStreamVertexName : builtVertices) { int i = 0; List<Integer> outEdgeTypeList = outEdgeType.get(upStreamVertexName); @@ -621,7 +703,7 @@ public class JobGraphBuilder { downStreamVertexConfig.setNumberOfInputs(inputNumber); connect(upStreamVertexName, downStreamVertexName, - connectionTypes.get(upStreamVertexName).get(i)); + outPartitioning.get(upStreamVertexName).get(i)); i++; } } @@ -631,4 +713,8 @@ public class JobGraphBuilder { setNumberOfJobOutputs(); } + public void setChaining(boolean chaining) { + this.chaining = chaining; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index 8837b85..22bcf92 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -27,6 +27,7 @@ import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.collector.OutputSelector; +import org.apache.flink.streaming.api.invokable.ChainableInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.api.streamvertex.StreamVertexException; @@ -39,6 +40,9 @@ public class StreamConfig { private static final String INPUT_TYPE = "inputType_"; private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs"; private static final String NUMBER_OF_INPUTS = "numberOfInputs"; + private static final String NUMBER_OF_CHAINED_TASKS = "numOfChained"; + private static final String CHAINED_IN_SERIALIZER = "chainedSerializer_"; + private static final String CHAINED_INVOKABLE = "chainedInvokable_"; private static final String OUTPUT_NAME = "outputName_"; private static final String OUTPUT_SELECT_ALL = "outputSelectAll_"; private static final String PARTITIONER_OBJECT = "partitionerObject_"; @@ -46,8 +50,6 @@ public class StreamConfig { private static final String ITERATION_ID = "iteration-id"; private static final String OUTPUT_SELECTOR = "outputSelector"; private static final String DIRECTED_EMIT = "directedEmit"; - private static final String FUNCTION_NAME = "operatorName"; - private static final String VERTEX_NAME = "vertexName"; private static final String SERIALIZEDUDF = "serializedudf"; private static final String USER_FUNCTION = "userfunction"; private static final String BUFFER_TIMEOUT = "bufferTimeout"; @@ -164,18 +166,6 @@ public class StreamConfig { } } - public void setVertexName(String vertexName) { - config.setString(VERTEX_NAME, vertexName); - } - - public String getVertexName() { - return config.getString(VERTEX_NAME, null); - } - - public String getFunctionName() { - return config.getString(FUNCTION_NAME, ""); - } - public void setDirectedEmit(boolean directedEmit) { config.setBoolean(DIRECTED_EMIT, directedEmit); } @@ -261,7 +251,7 @@ public class StreamConfig { } @SuppressWarnings("unchecked") - public List<String> getOutputName(int outputIndex) { + public List<String> getOutputNames(int outputIndex) { return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + outputIndex, null)); } @@ -316,4 +306,40 @@ public class StreamConfig { } } + public int getNumberofChainedTasks() { + return config.getInteger(NUMBER_OF_CHAINED_TASKS, 0); + } + + public void setNumberofChainedTasks(int n) { + config.setInteger(NUMBER_OF_CHAINED_TASKS, n); + } + + public ChainableInvokable<?, ?> getChainedInvokable(int chainedTaskIndex, ClassLoader cl) { + try { + return (ChainableInvokable<?, ?>) InstantiationUtil.readObjectFromConfig(this.config, + CHAINED_INVOKABLE + chainedTaskIndex, cl); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate invokable."); + } + } + + public StreamRecordSerializer<?> getChainedInSerializer(int chainedTaskIndex, ClassLoader cl) { + try { + return (StreamRecordSerializer<?>) InstantiationUtil.readObjectFromConfig(this.config, + CHAINED_IN_SERIALIZER + chainedTaskIndex, cl); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate serializer."); + } + } + + public void setChainedSerializer(StreamRecordSerializer<?> typeWrapper, int chainedTaskIndex) { + config.setBytes(CHAINED_IN_SERIALIZER + chainedTaskIndex, + SerializationUtils.serialize(typeWrapper)); + } + + public void setChainedInvokable(ChainableInvokable<?, ?> invokable, int chainedTaskIndex) { + config.setBytes(CHAINED_INVOKABLE + chainedTaskIndex, + SerializationUtils.serialize(invokable)); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java index d782d08..b7e57e0 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java @@ -37,7 +37,7 @@ public class CollectorWrapper<OUT> implements Collector<OUT> { @Override public void collect(OUT record) { for(Collector<OUT> output: outputs){ - output.collect(record);; + output.collect(record); } } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java index d7467d1..88feb5d 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java @@ -67,7 +67,7 @@ public class IterativeDataStream<IN> extends iterationTail.getParallelism(), waitTime); jobGraphBuilder.setIterationSourceSettings(iterationID.toString(), iterationTail.getId()); - connectGraph(iterationTail, iterationSink.getId(), 0); + connectGraph(iterationTail.forward(), iterationSink.getId(), 0); return iterationTail; } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 5a8261e..7a0abe4 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -182,6 +182,11 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato return (SingleOutputStreamOperator<OUT, O>) super.distribute(); } + @SuppressWarnings("unchecked") + public SingleOutputStreamOperator<OUT, O> global() { + return (SingleOutputStreamOperator<OUT, O>) super.global(); + } + @Override public SingleOutputStreamOperator<OUT, O> copy() { return new SingleOutputStreamOperator<OUT, O>(this); http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 87f2287..08fd19b 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -434,7 +434,7 @@ public abstract class StreamExecutionEnvironment { DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, sourceName, outTypeInfo, isParallel); - jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function), + jobGraphBuilder.addSourceVertex(returnStream.getId(), new SourceInvokable<OUT>(function), null, outTypeInfo, sourceName, dop); return returnStream; http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java new file mode 100644 index 0000000..373b4e8 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; +import org.apache.flink.util.Collector; + +public abstract class ChainableInvokable<IN, OUT> extends StreamInvokable<IN, OUT> implements + Collector<IN> { + + private static final long serialVersionUID = 1L; + + public ChainableInvokable(Function userFunction) { + super(userFunction); + } + + public void setup(Collector<OUT> collector, StreamRecordSerializer<IN> inSerializer) { + this.collector = collector; + this.inSerializer = inSerializer; + this.objectSerializer = inSerializer.getObjectSerializer(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java index 74591a8..13a6ba1 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.invokable; import org.apache.flink.streaming.api.function.sink.SinkFunction; -public class SinkInvokable<IN> extends StreamInvokable<IN, IN> { +public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> { private static final long serialVersionUID = 1L; private SinkFunction<IN> sinkFunction; @@ -38,7 +38,13 @@ public class SinkInvokable<IN> extends StreamInvokable<IN, IN> { @Override protected void callUserFunction() throws Exception { - sinkFunction.invoke((IN) nextRecord.getObject()); + sinkFunction.invoke(nextObject); + } + + @Override + public void collect(IN record) { + nextObject = copy(record); + callUserFunctionAndLogException(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java index 87ad4e0..25e9221 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java @@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory; /** * The StreamInvokable represents the base class for all invokables in the * streaming topology. - * + * * @param <OUT> * The output type of the invokable */ @@ -52,6 +52,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { protected StreamRecordSerializer<IN> inSerializer; protected TypeSerializer<IN> objectSerializer; protected StreamRecord<IN> nextRecord; + protected IN nextObject; protected boolean isMutable; protected Collector<OUT> collector; @@ -92,7 +93,13 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { protected StreamRecord<IN> readNext() { this.nextRecord = inSerializer.createInstance(); try { - return nextRecord = recordIterator.next(nextRecord); + nextRecord = recordIterator.next(nextRecord); + try { + nextObject = nextRecord.getObject(); + } catch (NullPointerException e) { + // end of stream + } + return nextRecord; } catch (IOException e) { throw new RuntimeException("Could not read next record."); } @@ -135,10 +142,14 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { * RichFunction class * */ - public void close() throws Exception { + public void close() { isRunning = false; collector.close(); - FunctionUtils.closeFunction(userFunction); + try { + FunctionUtils.closeFunction(userFunction); + } catch (Exception e) { + throw new RuntimeException("Error when closing the function: " + e.getMessage()); + } } public void setRuntimeContext(RuntimeContext t) { @@ -148,4 +159,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { protected IN copy(IN record) { return objectSerializer.copy(record); } + + public boolean isChainable() { + return this instanceof ChainableInvokable; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java index 0267253..3fc314c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java @@ -17,9 +17,9 @@ package org.apache.flink.streaming.api.invokable.operator; -import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.invokable.ChainableInvokable; -public class CounterInvokable<IN> extends StreamInvokable<IN, Long> { +public class CounterInvokable<IN> extends ChainableInvokable<IN, Long> { private static final long serialVersionUID = 1L; Long count = 0L; @@ -34,4 +34,10 @@ public class CounterInvokable<IN> extends StreamInvokable<IN, Long> { collector.collect(++count); } } + + @Override + public void collect(IN record) { + collector.collect(++count); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java index 48b8ad0..0c8298e 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.api.invokable.operator; import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.invokable.ChainableInvokable; -public class FilterInvokable<IN> extends StreamInvokable<IN, IN> { +public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> { private static final long serialVersionUID = 1L; @@ -36,14 +36,20 @@ public class FilterInvokable<IN> extends StreamInvokable<IN, IN> { public void invoke() throws Exception { while (readNext() != null) { callUserFunctionAndLogException(); - if (collect) { - collector.collect(nextRecord.getObject()); - } } } @Override protected void callUserFunction() throws Exception { - collect = filterFunction.filter(copy(nextRecord.getObject())); + collect = filterFunction.filter(copy(nextObject)); + if (collect) { + collector.collect(nextObject); + } + } + + @Override + public void collect(IN record) { + nextObject = copy(record); + callUserFunctionAndLogException(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java index 8ff78eb..2a4081b 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.api.invokable.operator; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.invokable.ChainableInvokable; -public class FlatMapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { +public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> { private static final long serialVersionUID = 1L; private FlatMapFunction<IN, OUT> flatMapper; @@ -39,7 +39,13 @@ public class FlatMapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { @Override protected void callUserFunction() throws Exception { - flatMapper.flatMap(nextRecord.getObject(), collector); + flatMapper.flatMap(nextObject, collector); + } + + @Override + public void collect(IN record) { + nextObject = copy(record); + callUserFunctionAndLogException(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java index fdcf520..84258d6 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java @@ -40,7 +40,7 @@ public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> { protected void reduce() throws Exception { Object key = nextRecord.getKey(keySelector); currentValue = values.get(key); - nextValue = nextRecord.getObject(); + nextValue = nextObject; if (currentValue != null) { callUserFunctionAndLogException(); values.put(key, reduced); @@ -56,4 +56,9 @@ public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> { reduced = reducer.reduce(currentValue, nextValue); } + @Override + public boolean isChainable() { + return false; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java index 6be96ec..7c8e577 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.api.invokable.operator; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.invokable.ChainableInvokable; -public class MapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { +public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> { private static final long serialVersionUID = 1L; private MapFunction<IN, OUT> mapper; @@ -39,6 +39,12 @@ public class MapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { @Override protected void callUserFunction() throws Exception { - collector.collect(mapper.map(nextRecord.getObject())); + collector.collect(mapper.map(nextObject)); + } + + @Override + public void collect(IN record) { + nextObject = copy(record); + callUserFunctionAndLogException(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java index 5f5cb12..e1a56cc 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.api.invokable.operator; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.invokable.ChainableInvokable; -public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> { +public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> { private static final long serialVersionUID = 1L; protected ReduceFunction<IN> reducer; @@ -41,13 +41,15 @@ public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> { } protected void reduce() throws Exception { - nextValue = nextRecord.getObject(); callUserFunctionAndLogException(); } @Override protected void callUserFunction() throws Exception { + + nextValue = nextObject; + if (currentValue != null) { currentValue = reducer.reduce(currentValue, nextValue); } else { @@ -57,4 +59,11 @@ public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> { collector.collect(currentValue); } + + @Override + public void collect(IN record) { + nextObject = copy(record); + callUserFunctionAndLogException(); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java index 03219b7..93f597f 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java @@ -175,7 +175,7 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> } @Override - public void close() throws Exception { + public void close() { if (!window.miniBatchEnd()) { callUserFunctionAndLogException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java index 61a6eb4..457f3f8 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java @@ -17,26 +17,30 @@ package org.apache.flink.streaming.api.streamvertex; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.StreamConfig; +import org.apache.flink.streaming.api.collector.CollectorWrapper; import org.apache.flink.streaming.api.collector.DirectedOutputWrapper; import org.apache.flink.streaming.api.collector.OutputSelector; -import org.apache.flink.streaming.api.collector.StreamOutputWrapper; import org.apache.flink.streaming.api.collector.StreamOutput; +import org.apache.flink.streaming.api.collector.StreamOutputWrapper; +import org.apache.flink.streaming.api.invokable.ChainableInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.io.StreamRecordWriter; import org.apache.flink.streaming.partitioner.StreamPartitioner; +import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; - public class OutputHandler<OUT> { private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class); @@ -44,17 +48,21 @@ public class OutputHandler<OUT> { private StreamConfig configuration; private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs; - private StreamOutputWrapper<OUT> collector; - private long bufferTimeout; + private Collector<OUT> endCollector; TypeInformation<OUT> outTypeInfo = null; StreamRecordSerializer<OUT> outSerializer = null; SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null; + public List<ChainableInvokable<?, ?>> chainedInvokables; + + private int numberOfChainedTasks; + public OutputHandler(StreamVertex<?, OUT> streamComponent) { this.streamVertex = streamComponent; this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>(); this.configuration = new StreamConfig(streamComponent.getTaskConfiguration()); + this.chainedInvokables = new ArrayList<ChainableInvokable<?, ?>>(); try { setConfigOutputs(); @@ -62,6 +70,7 @@ public class OutputHandler<OUT> { throw new StreamVertexException("Cannot register outputs for " + streamComponent.getClass().getSimpleName(), e); } + } public List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> getOutputs() { @@ -69,18 +78,42 @@ public class OutputHandler<OUT> { } private void setConfigOutputs() { - setSerializers(); - setCollector(); + numberOfChainedTasks = configuration.getNumberofChainedTasks(); + endCollector = createChainedOutputs(0); + } - int numberOfOutputs = configuration.getNumberOfOutputs(); - bufferTimeout = configuration.getBufferTimeout(); + @SuppressWarnings("unchecked") + private Collector<OUT> createChainedOutputs(int chainedTaskIndex) { - for (int i = 0; i < numberOfOutputs; i++) { - setPartitioner(i, outputs); + if (numberOfChainedTasks == chainedTaskIndex) { + return createEndCollector(); + } else { + CollectorWrapper<OUT> chainedCollector = new CollectorWrapper<OUT>(); + + @SuppressWarnings("rawtypes") + ChainableInvokable chainableInvokable = configuration.getChainedInvokable( + chainedTaskIndex, streamVertex.getUserCodeClassLoader()); + + chainableInvokable.setup( + createChainedOutputs(chainedTaskIndex + 1), + configuration.getChainedInSerializer(chainedTaskIndex, + streamVertex.getUserCodeClassLoader())); + + chainedInvokables.add(chainableInvokable); + + chainedCollector.addChainedOutput((Collector<OUT>) chainableInvokable); + + return chainedCollector; } + } - private StreamOutputWrapper<OUT> setCollector() { + private Collector<OUT> createEndCollector() { + + setSerializers(); + + StreamOutputWrapper<OUT> collector; + if (streamVertex.configuration.getDirectedEmit()) { OutputSelector<OUT> outputSelector = streamVertex.configuration .getOutputSelector(streamVertex.userClassLoader); @@ -91,11 +124,17 @@ public class OutputHandler<OUT> { collector = new StreamOutputWrapper<OUT>(streamVertex.getInstanceID(), outSerializationDelegate); } + + int numberOfOutputs = configuration.getNumberOfOutputs(); + for (int i = 0; i < numberOfOutputs; i++) { + collector = (StreamOutputWrapper<OUT>) setPartitioner(i, collector); + } + return collector; } - public StreamOutputWrapper<OUT> getCollector() { - return collector; + public Collector<OUT> getCollector() { + return endCollector; } void setSerializers() { @@ -106,8 +145,7 @@ public class OutputHandler<OUT> { } } - void setPartitioner(int outputNumber, - List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs) { + Collector<OUT> setPartitioner(int outputNumber, StreamOutputWrapper<OUT> endCollector) { StreamPartitioner<OUT> outputPartitioner = null; try { @@ -121,6 +159,8 @@ public class OutputHandler<OUT> { RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output; + long bufferTimeout = configuration.getBufferTimeout(); + if (bufferTimeout >= 0) { output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex .getEnvironment().getWriter(outputNumber), outputPartitioner, bufferTimeout); @@ -139,18 +179,20 @@ public class OutputHandler<OUT> { } outputs.add(output); - List<String> outputName = configuration.getOutputName(outputNumber); + List<String> outputNames = configuration.getOutputNames(outputNumber); boolean isSelectAllOutput = configuration.getSelectAll(outputNumber); - if (collector != null) { - collector - .addOutput(new StreamOutput<OUT>(output, isSelectAllOutput ? null : outputName)); + if (endCollector != null) { + endCollector.addOutput(new StreamOutput<OUT>(output, isSelectAllOutput ? null + : outputNames)); } if (LOG.isTraceEnabled()) { LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass() .getSimpleName(), outputNumber, streamVertex.getClass().getSimpleName()); } + + return endCollector; } public void flushOutputs() throws IOException, InterruptedException { http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java index 93e290f..994b1fa 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.streaming.api.StreamConfig; +import org.apache.flink.streaming.api.invokable.ChainableInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.io.CoReaderIterator; @@ -35,11 +36,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa protected StreamConfig configuration; protected int instanceID; - protected String name; private static int numVertices = 0; - protected String functionName; - private InputHandler<IN> inputHandler; protected OutputHandler<OUT> outputHandler; private StreamInvokable<IN, OUT> userInvokable; @@ -70,17 +68,26 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa protected void initialize() { this.userClassLoader = getUserCodeClassLoader(); this.configuration = new StreamConfig(getTaskConfiguration()); - this.name = configuration.getVertexName(); - this.functionName = configuration.getFunctionName(); this.states = configuration.getOperatorStates(userClassLoader); - this.context = createRuntimeContext(name, this.states); + this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states); } protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception { userInvokable.setRuntimeContext(context); userInvokable.open(getTaskConfiguration()); + + for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) { + invokable.setRuntimeContext(context); + invokable.open(getTaskConfiguration()); + } + userInvokable.invoke(); userInvokable.close(); + + for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) { + invokable.close(); + } + } public void setInputsOutputs() { @@ -94,14 +101,15 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa } public String getName() { - return name; + return getEnvironment().getTaskName(); } public int getInstanceID() { return instanceID; } - public StreamingRuntimeContext createRuntimeContext(String taskName, Map<String, OperatorState<?>> states) { + public StreamingRuntimeContext createRuntimeContext(String taskName, + Map<String, OperatorState<?>> states) { Environment env = getEnvironment(); return new StreamingRuntimeContext(taskName, env, getUserCodeClassLoader(), states); } http://git-wip-us.apache.org/repos/asf/flink/blob/26535c48/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java index a5dc68a..998e818 100644 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java +++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java @@ -76,8 +76,7 @@ public class IterateExample { // apply the step function to add new random value to the tuple and to // increment the counter and split the output with the output selector - SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle() - .split(new MySelector()); + SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).split(new MySelector()); // close the iteration by selecting the tuples that were directed to the // 'iterate' channel in the output selector
