[FLINK-1345] [streaming] Chaining refactor + ChainingStrategy exposed through the API for operators
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e30c6f7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e30c6f7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e30c6f7 Branch: refs/heads/master Commit: 3e30c6f73a12e2f50449a4d0dce452031f9a7317 Parents: 26535c4 Author: Gyula Fora <[email protected]> Authored: Fri Jan 16 15:11:38 2015 +0100 Committer: mbalassi <[email protected]> Committed: Wed Jan 21 16:06:34 2015 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/JobGraphBuilder.java | 103 ++++++++--------- .../flink/streaming/api/StreamConfig.java | 21 +--- .../api/datastream/ConnectedDataStream.java | 2 +- .../streaming/api/datastream/DataStream.java | 13 ++- .../api/datastream/DataStreamSink.java | 5 +- .../api/datastream/DataStreamSource.java | 5 +- .../api/datastream/IterativeDataStream.java | 3 +- .../datastream/SingleOutputStreamOperator.java | 12 +- .../environment/StreamExecutionEnvironment.java | 9 +- .../api/invokable/ChainableInvokable.java | 1 + .../api/invokable/StreamInvokable.java | 20 +++- .../operator/GroupedReduceInvokable.java | 6 +- .../operator/GroupedWindowInvokable.java | 1 + .../api/invokable/operator/WindowInvokable.java | 3 +- .../api/streamvertex/CoStreamVertex.java | 2 +- .../api/streamvertex/OutputHandler.java | 110 +++++++++---------- 16 files changed, 159 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java index 35e145e..0020d48 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.streaming.api.collector.OutputSelector; import org.apache.flink.streaming.api.invokable.ChainableInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy; import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.api.streamvertex.CoStreamVertex; @@ -64,7 +65,7 @@ public class JobGraphBuilder { private Map<String, Integer> vertexParallelism; private Map<String, Long> bufferTimeout; private Map<String, List<String>> outEdgeList; - private Map<String, List<Integer>> outEdgeType; + private Map<String, List<Integer>> outEdgeIndex; private Map<String, List<List<String>>> outEdgeNames; private Map<String, List<Boolean>> outEdgeSelectAll; private Map<String, List<String>> inEdgeList; @@ -85,8 +86,7 @@ public class JobGraphBuilder { private Map<String, Map<String, OperatorState<?>>> operatorStates; private Map<String, InputFormat<String, ?>> inputFormatList; private Map<String, List<String>> chainedVertices; - private Map<String, List<ChainableInvokable<?, ?>>> chainedInvokable; - private Map<String, List<StreamRecordSerializer<?>>> chainedSerializer; + private Map<String, String> lastInChains; private Set<String> sources; private Set<String> builtVertices; @@ -97,11 +97,19 @@ public class JobGraphBuilder { */ public JobGraphBuilder() { + initGraph(); + + if (LOG.isDebugEnabled()) { + LOG.debug("JobGraph created"); + } + } + + public void initGraph() { streamVertices = new HashMap<String, AbstractJobVertex>(); vertexParallelism = new HashMap<String, Integer>(); bufferTimeout = new HashMap<String, Long>(); outEdgeList = new HashMap<String, List<String>>(); - outEdgeType = new HashMap<String, List<Integer>>(); + outEdgeIndex = new HashMap<String, List<Integer>>(); outEdgeNames = new HashMap<String, List<List<String>>>(); outEdgeSelectAll = new HashMap<String, List<Boolean>>(); inEdgeList = new HashMap<String, List<String>>(); @@ -121,16 +129,11 @@ public class JobGraphBuilder { iterationWaitTime = new HashMap<String, Long>(); operatorStates = new HashMap<String, Map<String, OperatorState<?>>>(); inputFormatList = new HashMap<String, InputFormat<String, ?>>(); - chainedInvokable = new HashMap<String, List<ChainableInvokable<?, ?>>>(); - chainedSerializer = new HashMap<String, List<StreamRecordSerializer<?>>>(); chainedVertices = new HashMap<String, List<String>>(); + lastInChains = new HashMap<String, String>(); sources = new HashSet<String>(); builtVertices = new HashSet<String>(); - - if (LOG.isDebugEnabled()) { - LOG.debug("JobGraph created"); - } } /** @@ -198,7 +201,7 @@ public class JobGraphBuilder { iterationIds.put(vertexName, iterationID); iterationIDtoHeadName.put(iterationID, vertexName); - setBytesFrom(iterationHead, vertexName); + setSerializersFrom(iterationHead, vertexName); setEdge(vertexName, iterationHead, outPartitioning .get(inEdgeList.get(iterationHead).get(0)).get(0), 0, new ArrayList<String>(), @@ -241,7 +244,7 @@ public class JobGraphBuilder { iterationIds.put(vertexName, iterationID); iterationIDtoTailName.put(iterationID, vertexName); - setBytesFrom(iterationTail, vertexName); + setSerializersFrom(iterationTail, vertexName); iterationWaitTime.put(iterationIDtoTailName.get(iterationID), waitTime); if (LOG.isDebugEnabled()) { @@ -288,12 +291,13 @@ public class JobGraphBuilder { invokableObjects.put(vertexName, invokableObject); operatorNames.put(vertexName, operatorName); outEdgeList.put(vertexName, new ArrayList<String>()); - outEdgeType.put(vertexName, new ArrayList<Integer>()); + outEdgeIndex.put(vertexName, new ArrayList<Integer>()); outEdgeNames.put(vertexName, new ArrayList<List<String>>()); outEdgeSelectAll.put(vertexName, new ArrayList<Boolean>()); inEdgeList.put(vertexName, new ArrayList<String>()); outPartitioning.put(vertexName, new ArrayList<StreamPartitioner<?>>()); iterationTailCount.put(vertexName, 0); + lastInChains.put(vertexName, vertexName); } private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1, @@ -327,11 +331,16 @@ public class JobGraphBuilder { } } + List<String> chainedNames = chainedVertices.get(vertexName); + boolean isChained = chainedNames != null; + int numChained = isChained ? chainedNames.size() : 0; + String lastInChain = lastInChains.get(vertexName); + // Get vertex attributes Class<? extends AbstractInvokable> vertexClass = vertexClasses.get(vertexName); StreamInvokable<?, ?> invokableObject = invokableObjects.get(vertexName); int parallelism = vertexParallelism.get(vertexName); - byte[] outputSelector = outputSelectors.get(vertexName); + byte[] outputSelector = outputSelectors.get(lastInChain); Map<String, OperatorState<?>> state = operatorStates.get(vertexName); // Create vertex object @@ -347,16 +356,17 @@ public class JobGraphBuilder { LOG.debug("Parallelism set: {} for {}", parallelism, vertexName); } + // Set vertex config + StreamConfig config = new StreamConfig(vertex.getConfiguration()); - config.setBufferTimeout(bufferTimeout.get(vertexName)); + config.setBufferTimeout(bufferTimeout.get(lastInChain)); config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName)); config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName)); config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName)); config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName)); - // Set vertex config config.setUserInvokable(invokableObject); config.setOutputSelector(outputSelector); config.setOperatorStates(state); @@ -371,15 +381,12 @@ public class JobGraphBuilder { vertex.setInputSplitSource(inputFormatList.get(vertexName)); } - List<ChainableInvokable<?, ?>> chainedInvokables = chainedInvokable.get(vertexName); - List<StreamRecordSerializer<?>> chainedSerializers = chainedSerializer.get(vertexName); - - int numChained = chainedInvokables == null ? 0 : chainedInvokables.size(); config.setNumberofChainedTasks(numChained); for (int i = 0; i < numChained; i++) { - config.setChainedInvokable(chainedInvokables.get(i), i); - config.setChainedSerializer(chainedSerializers.get(i), i); + config.setChainedInvokable( + (ChainableInvokable<?, ?>) invokableObjects.get(chainedNames.get(i)), i); + config.setChainedSerializer(typeSerializersIn1.get(chainedNames.get(i)), i); } streamVertices.put(vertexName, vertex); @@ -390,12 +397,15 @@ public class JobGraphBuilder { } private void chainRecursively(String chainStart, String current, String next) { + // We chain the next operator to the start of this chain chainTasks(chainStart, next); - // Add multiple chaining here + // Now recursively chain the outputs of next (depth first) for (String output : outEdgeList.get(next)) { if (isChainable(next, output)) { + // Recursive call chainRecursively(chainStart, next, output); } else { + // If not chainable we continue building the jobgraph from there createVertex(output); } } @@ -405,25 +415,14 @@ public class JobGraphBuilder { return outEdgeList.get(vertexName).size() == 1 && inEdgeList.get(outName).size() == 1 && outputSelectors.get(vertexName) == null - && invokableObjects.get(outName).isChainable() + && invokableObjects.get(outName).getChainingStrategy() == ChainingStrategy.ALWAYS + && (invokableObjects.get(vertexName).getChainingStrategy() == ChainingStrategy.HEAD || invokableObjects + .get(vertexName).getChainingStrategy() == ChainingStrategy.ALWAYS) && outPartitioning.get(vertexName).get(0).getStrategy() == PartitioningStrategy.FORWARD && vertexParallelism.get(vertexName) == vertexParallelism.get(outName) && chaining; } private void chainTasks(String first, String second) { - List<ChainableInvokable<?, ?>> chainedInvokables = chainedInvokable.get(first); - if (chainedInvokables == null) { - chainedInvokables = new ArrayList<ChainableInvokable<?, ?>>(); - } - chainedInvokables.add((ChainableInvokable<?, ?>) invokableObjects.get(second)); - chainedInvokable.put(first, chainedInvokables); - - List<StreamRecordSerializer<?>> chainedSerializers = chainedSerializer.get(first); - if (chainedSerializers == null) { - chainedSerializers = new ArrayList<StreamRecordSerializer<?>>(); - } - chainedSerializers.add(typeSerializersIn1.get(second)); - chainedSerializer.put(first, chainedSerializers); List<String> chained = chainedVertices.get(first); if (chained == null) { @@ -431,16 +430,7 @@ public class JobGraphBuilder { } chained.add(second); chainedVertices.put(first, chained); - - outEdgeList.put(first, outEdgeList.get(second)); - typeSerializersOut1.put(first, typeSerializersOut1.get(second)); - outPartitioning.put(first, outPartitioning.get(second)); - outEdgeType.put(first, outEdgeType.get(second)); - outEdgeNames.put(first, outEdgeNames.get(second)); - outEdgeSelectAll.put(first, outEdgeSelectAll.get(second)); - outPartitioning.put(first, outPartitioning.get(second)); - bufferTimeout.put(first, bufferTimeout.get(second)); - outputSelectors.put(first, outputSelectors.get(second)); + lastInChains.put(first, second); } @@ -477,8 +467,10 @@ public class JobGraphBuilder { int outputIndex = upStreamVertex.getNumberOfProducedIntermediateDataSets() - 1; - config.setOutputName(outputIndex, outEdgeNames.get(upStreamVertexName).get(outputIndex)); - config.setSelectAll(outputIndex, outEdgeSelectAll.get(upStreamVertexName).get(outputIndex)); + config.setOutputName(outputIndex, outEdgeNames.get(lastInChains.get(upStreamVertexName)) + .get(outputIndex)); + config.setSelectAll(outputIndex, outEdgeSelectAll.get(lastInChains.get(upStreamVertexName)) + .get(outputIndex)); config.setPartitioner(outputIndex, partitionerObject); config.setNumberOfOutputChannels(outputIndex, vertexParallelism.get(downStreamVertexName)); } @@ -547,7 +539,7 @@ public class JobGraphBuilder { StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames, boolean selectAll) { outEdgeList.get(upStreamVertexName).add(downStreamVertexName); - outEdgeType.get(upStreamVertexName).add(typeNumber); + outEdgeIndex.get(upStreamVertexName).add(typeNumber); inEdgeList.get(downStreamVertexName).add(upStreamVertexName); outPartitioning.get(upStreamVertexName).add(partitionerObject); outEdgeNames.get(upStreamVertexName).add(outputNames); @@ -608,7 +600,7 @@ public class JobGraphBuilder { * @param to * to */ - public void setBytesFrom(String from, String to) { + public void setSerializersFrom(String from, String to) { operatorNames.put(to, operatorNames.get(from)); typeSerializersIn1.put(to, typeSerializersOut1.get(from)); @@ -691,19 +683,20 @@ public class JobGraphBuilder { for (String upStreamVertexName : builtVertices) { int i = 0; - List<Integer> outEdgeTypeList = outEdgeType.get(upStreamVertexName); + List<Integer> outEdgeTypeList = outEdgeIndex.get(lastInChains.get(upStreamVertexName)); - for (String downStreamVertexName : outEdgeList.get(upStreamVertexName)) { + for (String downStreamVertexName : outEdgeList + .get(lastInChains.get(upStreamVertexName))) { StreamConfig downStreamVertexConfig = new StreamConfig(streamVertices.get( downStreamVertexName).getConfiguration()); int inputNumber = downStreamVertexConfig.getNumberOfInputs(); - downStreamVertexConfig.setInputType(inputNumber++, outEdgeTypeList.get(i)); + downStreamVertexConfig.setInputIndex(inputNumber++, outEdgeTypeList.get(i)); downStreamVertexConfig.setNumberOfInputs(inputNumber); connect(upStreamVertexName, downStreamVertexName, - outPartitioning.get(upStreamVertexName).get(i)); + outPartitioning.get(lastInChains.get(upStreamVertexName)).get(i)); i++; } } http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index 22bcf92..ada3aae 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -24,7 +24,6 @@ import java.util.Map; import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SerializationUtils; -import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.collector.OutputSelector; import org.apache.flink.streaming.api.invokable.ChainableInvokable; @@ -170,7 +169,7 @@ public class StreamConfig { config.setBoolean(DIRECTED_EMIT, directedEmit); } - public boolean getDirectedEmit() { + public boolean isDirectedEmit() { return config.getBoolean(DIRECTED_EMIT, false); } @@ -239,7 +238,7 @@ public class StreamConfig { } } - public boolean getSelectAll(int outputIndex) { + public boolean isSelectAll(int outputIndex) { return config.getBoolean(OUTPUT_SELECT_ALL + outputIndex, false); } @@ -272,26 +271,14 @@ public class StreamConfig { return config.getInteger(NUMBER_OF_OUTPUTS, 0); } - public void setInputType(int inputNumber, Integer inputTypeNumber) { + public void setInputIndex(int inputNumber, Integer inputTypeNumber) { config.setInteger(INPUT_TYPE + inputNumber++, inputTypeNumber); } - public int getInputType(int inputNumber) { + public int getInputIndex(int inputNumber) { return config.getInteger(INPUT_TYPE + inputNumber, 0); } - public void setFunctionClass(Class<? extends AbstractRichFunction> functionClass) { - config.setClass("functionClass", functionClass); - } - - public Class<? extends AbstractRichFunction> getFunctionClass(ClassLoader cl) { - try { - return config.getClass("functionClass", null, cl); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Could not load function class", e); - } - } - public void setOperatorStates(Map<String, OperatorState<?>> states) { config.setBytes(OPERATOR_STATES, SerializationUtils.serialize((Serializable) states)); } http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index a0c8ff8..80ea970 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -400,7 +400,7 @@ public class ConnectedDataStream<IN1, IN2> { @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator( - environment, functionName, outTypeInfo); + environment, functionName, outTypeInfo, functionInvokable); dataStream1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable, getInputType1(), getInputType2(), outTypeInfo, functionName, http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index db644e9..f68ab68 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -1083,7 +1083,7 @@ public class DataStream<OUT> { protected <R> DataStream<OUT> addIterationSource(Integer iterationID, long waitTime) { DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null, - true); + null, true); jobGraphBuilder.addIterationHead(returnStream.getId(), this.getId(), iterationID, degreeOfParallelism, waitTime); @@ -1110,7 +1110,7 @@ public class DataStream<OUT> { DataStream<OUT> inputStream = this.copy(); @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment, - operatorName, outTypeInfo); + operatorName, outTypeInfo, invokable); jobGraphBuilder.addStreamVertex(returnStream.getId(), invokable, getType(), outTypeInfo, operatorName, degreeOfParallelism); @@ -1174,10 +1174,13 @@ public class DataStream<OUT> { */ public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) { - DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType()); + StreamInvokable<OUT, OUT> sinkInvokable = new SinkInvokable<OUT>(clean(sinkFunction)); - jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>( - clean(sinkFunction)), getType(), null, "sink", degreeOfParallelism); + DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType(), + sinkInvokable); + + jobGraphBuilder.addStreamVertex(returnStream.getId(), sinkInvokable, getType(), null, + "sink", degreeOfParallelism); this.connectGraph(this.copy(), returnStream.getId(), 0); http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index 61fc557..f064332 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.invokable.StreamInvokable; /** * Represents the end of a DataStream. @@ -29,8 +30,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class DataStreamSink<IN> extends SingleOutputStreamOperator<IN, DataStreamSink<IN>> { protected DataStreamSink(StreamExecutionEnvironment environment, String operatorType, - TypeInformation<IN> outTypeInfo) { - super(environment, operatorType, outTypeInfo); + TypeInformation<IN> outTypeInfo, StreamInvokable<?,?> invokable) { + super(environment, operatorType, outTypeInfo, invokable); } protected DataStreamSink(DataStream<IN> dataStream) { http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java index a649f59..b596cbd 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.invokable.StreamInvokable; /** * The DataStreamSource represents the starting point of a DataStream. @@ -31,8 +32,8 @@ public class DataStreamSource<OUT> extends SingleOutputStreamOperator<OUT, DataS boolean isParallel; public DataStreamSource(StreamExecutionEnvironment environment, String operatorType, - TypeInformation<OUT> outTypeInfo, boolean isParallel) { - super(environment, operatorType, outTypeInfo); + TypeInformation<OUT> outTypeInfo, StreamInvokable<?, ?> invokable, boolean isParallel) { + super(environment, operatorType, outTypeInfo, invokable); this.isParallel = isParallel; } http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java index 88feb5d..c4cd1e1 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java @@ -61,7 +61,8 @@ public class IterativeDataStream<IN> extends * */ public DataStream<IN> closeWith(DataStream<IN> iterationTail) { - DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "iterationSink", null); + DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "iterationSink", null, + null); jobGraphBuilder.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID, iterationTail.getParallelism(), waitTime); http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 7a0abe4..b8ca42c 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -26,6 +26,8 @@ import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.collector.OutputSelector; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.invokable.StreamInvokable; +import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy; import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; import org.apache.flink.streaming.state.OperatorState; @@ -42,12 +44,14 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato DataStream<OUT> { protected boolean isSplit; + protected StreamInvokable<?, ?> invokable; protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, - String operatorType, TypeInformation<OUT> outTypeInfo) { + String operatorType, TypeInformation<OUT> outTypeInfo, StreamInvokable<?, ?> invokable) { super(environment, operatorType, outTypeInfo); setBufferTimeout(environment.getBufferTimeout()); this.isSplit = false; + this.invokable = invokable; } @SuppressWarnings("unchecked") @@ -55,6 +59,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato super(dataStream); if (dataStream instanceof SingleOutputStreamOperator) { this.isSplit = ((SingleOutputStreamOperator<OUT, ?>) dataStream).isSplit; + this.invokable = ((SingleOutputStreamOperator<OUT, ?>) dataStream).invokable; } } @@ -192,4 +197,9 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato return new SingleOutputStreamOperator<OUT, O>(this); } + public SingleOutputStreamOperator<OUT, O> setChainingStrategy(ChainingStrategy strategy) { + this.invokable.setChainingStrategy(strategy); + return this; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 08fd19b..4194864 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction; import org.apache.flink.streaming.api.function.source.SourceFunction; import org.apache.flink.streaming.api.invokable.SourceInvokable; +import org.apache.flink.streaming.api.invokable.StreamInvokable; /** * {@link ExecutionEnvironment} for streaming jobs. An instance of it is @@ -431,11 +432,13 @@ public abstract class StreamExecutionEnvironment { boolean isParallel = function instanceof ParallelSourceFunction; int dop = isParallel ? getDegreeOfParallelism() : 1; + StreamInvokable<OUT, OUT> sourceInvokable = new SourceInvokable<OUT>(function); + DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, sourceName, - outTypeInfo, isParallel); + outTypeInfo, sourceInvokable, isParallel); - jobGraphBuilder.addSourceVertex(returnStream.getId(), new SourceInvokable<OUT>(function), - null, outTypeInfo, sourceName, dop); + jobGraphBuilder.addSourceVertex(returnStream.getId(), sourceInvokable, null, outTypeInfo, + sourceName, dop); return returnStream; } http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java index 373b4e8..24c0319 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/ChainableInvokable.java @@ -28,6 +28,7 @@ public abstract class ChainableInvokable<IN, OUT> extends StreamInvokable<IN, OU public ChainableInvokable(Function userFunction) { super(userFunction); + setChainingStrategy(ChainingStrategy.ALWAYS); } public void setup(Collector<OUT> collector, StreamRecordSerializer<IN> inSerializer) { http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java index 25e9221..614b67f 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java @@ -59,6 +59,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { protected Function userFunction; protected volatile boolean isRunning; + private ChainingStrategy chainingStrategy = ChainingStrategy.HEAD; + public StreamInvokable(Function userFunction) { this.userFunction = userFunction; } @@ -160,7 +162,21 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { return objectSerializer.copy(record); } - public boolean isChainable() { - return this instanceof ChainableInvokable; + public void setChainingStrategy(ChainingStrategy strategy) { + if (strategy == ChainingStrategy.ALWAYS) { + if (!(this instanceof ChainableInvokable)) { + throw new RuntimeException( + "Invokable needs to extend ChainableInvokable to be chained"); + } + } + this.chainingStrategy = strategy; + } + + public ChainingStrategy getChainingStrategy() { + return chainingStrategy; + } + + public static enum ChainingStrategy { + ALWAYS, NEVER, HEAD; } } http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java index 84258d6..01c0545 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokable.java @@ -34,6 +34,7 @@ public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> { super(reducer); this.keySelector = keySelector; values = new HashMap<Object, IN>(); + setChainingStrategy(ChainingStrategy.NEVER); } @Override @@ -56,9 +57,4 @@ public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> { reduced = reducer.reduce(currentValue, nextValue); } - @Override - public boolean isChainable() { - return false; - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java index 33348e4..bbd7b0c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java @@ -154,6 +154,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> { LinkedList<EvictionPolicy<IN>> centralEvictionPolicies) { super(userFunction); + setChainingStrategy(ChainingStrategy.NEVER); this.keySelector = keySelector; http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java index ea891c9..de9c664 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java @@ -61,7 +61,8 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> public WindowInvokable(Function userFunction, LinkedList<TriggerPolicy<IN>> triggerPolicies, LinkedList<EvictionPolicy<IN>> evictionPolicies) { super(userFunction); - + setChainingStrategy(ChainingStrategy.NEVER); + this.triggerPolicies = triggerPolicies; this.evictionPolicies = evictionPolicies; http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java index f065d9c..2b650be 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java @@ -80,7 +80,7 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> { ArrayList<BufferReader> inputList2 = new ArrayList<BufferReader>(); for (int i = 0; i < numberOfInputs; i++) { - int inputType = configuration.getInputType(i); + int inputType = configuration.getInputIndex(i); BufferReader reader = getEnvironment().getReader(i); switch (inputType) { case 1: http://git-wip-us.apache.org/repos/asf/flink/blob/3e30c6f7/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java index 457f3f8..60a7b14 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.StreamConfig; -import org.apache.flink.streaming.api.collector.CollectorWrapper; import org.apache.flink.streaming.api.collector.DirectedOutputWrapper; import org.apache.flink.streaming.api.collector.OutputSelector; import org.apache.flink.streaming.api.collector.StreamOutput; @@ -44,11 +43,11 @@ import org.slf4j.LoggerFactory; public class OutputHandler<OUT> { private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class); - private StreamVertex<?, OUT> streamVertex; + private StreamVertex<?, OUT> vertex; private StreamConfig configuration; private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs; - private Collector<OUT> endCollector; + private Collector<OUT> outerCollector; TypeInformation<OUT> outTypeInfo = null; StreamRecordSerializer<OUT> outSerializer = null; @@ -58,18 +57,14 @@ public class OutputHandler<OUT> { private int numberOfChainedTasks; - public OutputHandler(StreamVertex<?, OUT> streamComponent) { - this.streamVertex = streamComponent; + public OutputHandler(StreamVertex<?, OUT> vertex) { + this.vertex = vertex; this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>(); - this.configuration = new StreamConfig(streamComponent.getTaskConfiguration()); + this.configuration = new StreamConfig(vertex.getTaskConfiguration()); this.chainedInvokables = new ArrayList<ChainableInvokable<?, ?>>(); + this.numberOfChainedTasks = configuration.getNumberofChainedTasks(); - try { - setConfigOutputs(); - } catch (StreamVertexException e) { - throw new StreamVertexException("Cannot register outputs for " - + streamComponent.getClass().getSimpleName(), e); - } + this.outerCollector = createChainedCollector(0); } @@ -77,84 +72,83 @@ public class OutputHandler<OUT> { return outputs; } - private void setConfigOutputs() { - numberOfChainedTasks = configuration.getNumberofChainedTasks(); - endCollector = createChainedOutputs(0); - } - - @SuppressWarnings("unchecked") - private Collector<OUT> createChainedOutputs(int chainedTaskIndex) { + // We create the outer collector by nesting the chainable invokables into + // each other + @SuppressWarnings({ "unchecked", "rawtypes" }) + private Collector<OUT> createChainedCollector(int chainedTaskIndex) { if (numberOfChainedTasks == chainedTaskIndex) { - return createEndCollector(); + // At the end of the chain we create the collector that sends data + // to the recordwriters + return createNetworkCollector(); } else { - CollectorWrapper<OUT> chainedCollector = new CollectorWrapper<OUT>(); - @SuppressWarnings("rawtypes") ChainableInvokable chainableInvokable = configuration.getChainedInvokable( - chainedTaskIndex, streamVertex.getUserCodeClassLoader()); + chainedTaskIndex, vertex.getUserCodeClassLoader()); + // The nesting is done by calling this method recursively when + // passing the collector to the invokable chainableInvokable.setup( - createChainedOutputs(chainedTaskIndex + 1), + createChainedCollector(chainedTaskIndex + 1), configuration.getChainedInSerializer(chainedTaskIndex, - streamVertex.getUserCodeClassLoader())); + vertex.getUserCodeClassLoader())); + // We hold a list of the chained invokables for initializaton + // afterwards chainedInvokables.add(chainableInvokable); - chainedCollector.addChainedOutput((Collector<OUT>) chainableInvokable); - - return chainedCollector; + return chainableInvokable; } } - private Collector<OUT> createEndCollector() { + private Collector<OUT> createNetworkCollector() { - setSerializers(); + createOutSerializer(); StreamOutputWrapper<OUT> collector; - if (streamVertex.configuration.getDirectedEmit()) { - OutputSelector<OUT> outputSelector = streamVertex.configuration - .getOutputSelector(streamVertex.userClassLoader); + if (vertex.configuration.isDirectedEmit()) { + OutputSelector<OUT> outputSelector = vertex.configuration + .getOutputSelector(vertex.userClassLoader); - collector = new DirectedOutputWrapper<OUT>(streamVertex.getInstanceID(), + collector = new DirectedOutputWrapper<OUT>(vertex.getInstanceID(), outSerializationDelegate, outputSelector); } else { - collector = new StreamOutputWrapper<OUT>(streamVertex.getInstanceID(), + collector = new StreamOutputWrapper<OUT>(vertex.getInstanceID(), outSerializationDelegate); } int numberOfOutputs = configuration.getNumberOfOutputs(); for (int i = 0; i < numberOfOutputs; i++) { - collector = (StreamOutputWrapper<OUT>) setPartitioner(i, collector); + collector = (StreamOutputWrapper<OUT>) addStreamOutput(i, collector); } return collector; } public Collector<OUT> getCollector() { - return endCollector; + return outerCollector; } - void setSerializers() { - outSerializer = configuration.getTypeSerializerOut1(streamVertex.userClassLoader); + void createOutSerializer() { + outSerializer = configuration.getTypeSerializerOut1(vertex.userClassLoader); if (outSerializer != null) { outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer); outSerializationDelegate.setInstance(outSerializer.createInstance()); } } - Collector<OUT> setPartitioner(int outputNumber, StreamOutputWrapper<OUT> endCollector) { - StreamPartitioner<OUT> outputPartitioner = null; + Collector<OUT> addStreamOutput(int outputNumber, StreamOutputWrapper<OUT> networkCollector) { + + StreamPartitioner<OUT> outputPartitioner; try { - outputPartitioner = configuration.getPartitioner(streamVertex.userClassLoader, + outputPartitioner = configuration.getPartitioner(vertex.userClassLoader, outputNumber); - } catch (Exception e) { throw new StreamVertexException("Cannot deserialize partitioner for " - + streamVertex.getName() + " with " + outputNumber + " outputs", e); + + vertex.getName() + " with " + outputNumber + " outputs", e); } RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output; @@ -162,37 +156,33 @@ public class OutputHandler<OUT> { long bufferTimeout = configuration.getBufferTimeout(); if (bufferTimeout >= 0) { - output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex + output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(vertex .getEnvironment().getWriter(outputNumber), outputPartitioner, bufferTimeout); if (LOG.isTraceEnabled()) { LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}", - bufferTimeout, streamVertex.getClass().getSimpleName()); + bufferTimeout, vertex.getClass().getSimpleName()); } } else { - output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex + output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(vertex .getEnvironment().getWriter(outputNumber), outputPartitioner); if (LOG.isTraceEnabled()) { - LOG.trace("RecordWriter initiated for {}", streamVertex.getClass().getSimpleName()); + LOG.trace("RecordWriter initiated for {}", vertex.getClass().getSimpleName()); } } outputs.add(output); - List<String> outputNames = configuration.getOutputNames(outputNumber); - boolean isSelectAllOutput = configuration.getSelectAll(outputNumber); - if (endCollector != null) { - endCollector.addOutput(new StreamOutput<OUT>(output, isSelectAllOutput ? null - : outputNames)); - } + networkCollector.addOutput(new StreamOutput<OUT>(output, configuration + .isSelectAll(outputNumber) ? null : configuration.getOutputNames(outputNumber))); if (LOG.isTraceEnabled()) { LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass() - .getSimpleName(), outputNumber, streamVertex.getClass().getSimpleName()); + .getSimpleName(), outputNumber, vertex.getClass().getSimpleName()); } - return endCollector; + return networkCollector; } public void flushOutputs() throws IOException, InterruptedException { @@ -205,17 +195,15 @@ public class OutputHandler<OUT> { } } - long startTime; - public void invokeUserFunction(String componentTypeName, StreamInvokable<?, OUT> userInvokable) throws IOException, InterruptedException { if (LOG.isDebugEnabled()) { LOG.debug("{} {} invoked with instance id {}", componentTypeName, - streamVertex.getName(), streamVertex.getInstanceID()); + vertex.getName(), vertex.getInstanceID()); } try { - streamVertex.invokeUserFunction(userInvokable); + vertex.invokeUserFunction(userInvokable); } catch (Exception e) { flushOutputs(); throw new RuntimeException(e); @@ -223,7 +211,7 @@ public class OutputHandler<OUT> { if (LOG.isDebugEnabled()) { LOG.debug("{} {} invoke finished instance id {}", componentTypeName, - streamVertex.getName(), streamVertex.getInstanceID()); + vertex.getName(), vertex.getInstanceID()); } flushOutputs();
