[FLINK-1345] [streaming] Advanced task chaining added
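The patch replaces the old linear chaining with a recursive chain builder. Under the new isChainable() in JobGraphBuilder below, two operators end up in one task only when the downstream operator has a single input edge and a chainable invokable with ChainingStrategy.ALWAYS, the upstream invokable's strategy is HEAD or ALWAYS and it has no output selector, the connecting edge uses FORWARD partitioning, both sides run with equal parallelism, and chaining is enabled for the job. A rough sketch of the effect (not part of this patch; env, WordSource, Tokenizer and NonEmptyFilter are hypothetical stand-ins for the DataStream API of this era):

    // Hypothetical pipeline: the source, map and filter collapse into one job
    // vertex, assuming every connecting edge is FORWARD-partitioned, parallelism
    // matches, and their invokables use ChainingStrategy.ALWAYS.
    // The groupBy() edge is not FORWARD, so sum() starts a second vertex.
    DataStream<Tuple2<String, Integer>> counts = env
            .addSource(new WordSource())   // chain head (vertex 1)
            .map(new Tokenizer())          // chained into vertex 1
            .filter(new NonEmptyFilter())  // chained into vertex 1
            .groupBy(0)                    // chain boundary: non-FORWARD partitioner
            .sum(1);                       // head of vertex 2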
Project:   http://git-wip-us.apache.org/repos/asf/flink/repo
Commit:    http://git-wip-us.apache.org/repos/asf/flink/commit/7dbb55ec
Tree:      http://git-wip-us.apache.org/repos/asf/flink/tree/7dbb55ec
Diff:      http://git-wip-us.apache.org/repos/asf/flink/diff/7dbb55ec

Branch:    refs/heads/master
Commit:    7dbb55ece0a9d9777c0e3254bc8f9f5cf566d535
Parents:   3e30c6f
Author:    Gyula Fora <[email protected]>
Authored:  Sun Jan 18 18:23:57 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Wed Jan 21 16:06:34 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/JobGraphBuilder.java   | 298 ++++++++-----------
 .../flink/streaming/api/StreamConfig.java      | 180 +++++++----
 .../api/collector/CollectorWrapper.java        |   2 +-
 .../streaming/api/collector/StreamOutput.java  |  15 +
 .../api/collector/StreamOutputWrapper.java     |   6 -
 .../api/streamvertex/OutputHandler.java        | 204 ++++++++-----
 .../api/streamvertex/StreamIterationHead.java  |  23 +-
 .../flink/streaming/api/scala/DataStream.scala |  10 +
 8 files changed, 433 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index 0020d48..6ae97c9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -24,9 +24,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
@@ -85,11 +85,11 @@ public class JobGraphBuilder {
     private Map<String, Long> iterationWaitTime;
     private Map<String, Map<String, OperatorState<?>>> operatorStates;
     private Map<String, InputFormat<String, ?>> inputFormatList;
-    private Map<String, List<String>> chainedVertices;
-    private Map<String, String> lastInChains;
+    private Map<String, Map<String, StreamConfig>> chainedConfigs;
+    private Map<String, StreamConfig> vertexConfigs;
 
     private Set<String> sources;
-    private Set<String> builtVertices;
+    private Set<String> builtNodes;
 
     /**
      * Creates an new {@link JobGraph} with the given name. A JobGraph is a DAG
@@ -129,11 +129,11 @@
         iterationWaitTime = new HashMap<String, Long>();
         operatorStates = new HashMap<String, Map<String, OperatorState<?>>>();
         inputFormatList = new HashMap<String, InputFormat<String, ?>>();
-        chainedVertices = new HashMap<String, List<String>>();
-        lastInChains = new HashMap<String, String>();
+        chainedConfigs = new HashMap<String, Map<String, StreamConfig>>();
+        vertexConfigs = new HashMap<String, StreamConfig>();
 
         sources = new HashSet<String>();
-        builtVertices = new HashSet<String>();
+        builtNodes = new HashSet<String>();
     }
 
     /**
@@ -198,6 +198,8 @@
         addVertex(vertexName, StreamIterationHead.class, null, null, parallelism);
 
+        chaining = false;
+
         iterationIds.put(vertexName, iterationID);
         iterationIDtoHeadName.put(iterationID, vertexName);
 
@@ -297,7 +299,6 @@
         inEdgeList.put(vertexName, new ArrayList<String>());
         outPartitioning.put(vertexName, new ArrayList<StreamPartitioner<?>>());
         iterationTailCount.put(vertexName, 0);
-        lastInChains.put(vertexName, vertexName);
     }
 
     private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1,
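The next hunk replaces createVertex() with the recursive createChain(). Stripped of the config bookkeeping and the builtNodes guard, the traversal it introduces looks like this (editorial sketch against the surrounding JobGraphBuilder fields, not the committed code):

    private List<Tuple2<String, String>> createChainSketch(String startNode, String current) {
        List<Tuple2<String, String>> transitiveOutEdges = new ArrayList<Tuple2<String, String>>();
        for (String outName : outEdgeList.get(current)) {
            if (isChainable(current, outName)) {
                // chainable edge: stay inside the current chain and keep collecting
                transitiveOutEdges.addAll(createChainSketch(startNode, outName));
            } else {
                // real network edge: record it, then start a new chain at the target
                transitiveOutEdges.add(new Tuple2<String, String>(current, outName));
                transitiveOutEdges.addAll(createChainSketch(outName, outName));
            }
        }
        return transitiveOutEdges;
    }

The head of each chain thus learns every transitive network edge leaving its chain, which is exactly the list that connect() later wires up.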
"" : " => " - + StringUtils.join(chainedVertices.get(vertexName), " => "); - AbstractJobVertex vertex = new AbstractJobVertex(vertexName + cname); - - this.jobGraph.addVertex(vertex); - - vertex.setInvokableClass(vertexClass); - vertex.setParallelism(parallelism); - if (LOG.isDebugEnabled()) { - LOG.debug("Parallelism set: {} for {}", parallelism, vertexName); + for (String chainable : chainableOutputs) { + transitiveOutEdges.addAll(createChain(startNode, chainable)); } - // Set vertex config + for (String nonChainable : nonChainableOutputs) { + transitiveOutEdges.add(new Tuple2<String, String>(current, nonChainable)); + transitiveOutEdges.addAll(createChain(nonChainable, nonChainable)); + } - StreamConfig config = new StreamConfig(vertex.getConfiguration()); + StreamConfig config = current.equals(startNode) ? createProcessingVertex(startNode) + : new StreamConfig(new Configuration()); - config.setBufferTimeout(bufferTimeout.get(lastInChain)); + setVertexConfig(current, config, chainableOutputs, nonChainableOutputs); - config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName)); - config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName)); - config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName)); - config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName)); + if (current.equals(startNode)) { - config.setUserInvokable(invokableObject); - config.setOutputSelector(outputSelector); - config.setOperatorStates(state); + config.setChainStart(); + config.setRecordWriterOrder(transitiveOutEdges); - if (vertexClass.equals(StreamIterationHead.class) - || vertexClass.equals(StreamIterationTail.class)) { - config.setIterationId(iterationIds.get(vertexName)); - config.setIterationWaitTime(iterationWaitTime.get(vertexName)); - } + for (Tuple2<String, String> edge : transitiveOutEdges) { + connect(startNode, edge); + } - if (inputFormatList.containsKey(vertexName)) { - vertex.setInputSplitSource(inputFormatList.get(vertexName)); - } + vertexConfigs.get(startNode).setTransitiveChainedTaskConfigs( + chainedConfigs.get(startNode)); + + } else { - config.setNumberofChainedTasks(numChained); + Map<String, StreamConfig> chainedConfs = chainedConfigs.get(startNode); - for (int i = 0; i < numChained; i++) { - config.setChainedInvokable( - (ChainableInvokable<?, ?>) invokableObjects.get(chainedNames.get(i)), i); - config.setChainedSerializer(typeSerializersIn1.get(chainedNames.get(i)), i); + if (chainedConfs == null) { + chainedConfigs.put(startNode, new HashMap<String, StreamConfig>()); + } + chainedConfigs.get(startNode).put(current, config); } - streamVertices.put(vertexName, vertex); - builtVertices.add(vertexName); + return transitiveOutEdges; + } else { + return new ArrayList<Tuple2<String, String>>(); } + } + + private StreamConfig createProcessingVertex(String vertexName) { + + AbstractJobVertex vertex = new AbstractJobVertex(vertexName); + + this.jobGraph.addVertex(vertex); + int parallelism = vertexParallelism.get(vertexName); + + vertex.setInvokableClass(vertexClasses.get(vertexName)); + vertex.setParallelism(parallelism); + if (LOG.isDebugEnabled()) { + LOG.debug("Parallelism set: {} for {}", parallelism, vertexName); + } + + if (inputFormatList.containsKey(vertexName)) { + vertex.setInputSplitSource(inputFormatList.get(vertexName)); + } + + streamVertices.put(vertexName, vertex); + builtNodes.add(vertexName); + + return new StreamConfig(vertex.getConfiguration()); } - private void chainRecursively(String chainStart, String current, String next) { - // We chain the next 
operator to the start of this chain - chainTasks(chainStart, next); - // Now recursively chain the outputs of next (depth first) - for (String output : outEdgeList.get(next)) { - if (isChainable(next, output)) { - // Recursive call - chainRecursively(chainStart, next, output); - } else { - // If not chainable we continue building the jobgraph from there - createVertex(output); - } + private void setVertexConfig(String vertexName, StreamConfig config, + List<String> chainableOutputs, List<String> nonChainableOutputs) { + + StreamInvokable<?, ?> invokableObject = invokableObjects.get(vertexName); + byte[] outputSelector = outputSelectors.get(vertexName); + Class<? extends AbstractInvokable> vertexClass = vertexClasses.get(vertexName); + Map<String, OperatorState<?>> state = operatorStates.get(vertexName); + + config.setVertexName(vertexName); + + config.setBufferTimeout(bufferTimeout.get(vertexName)); + + config.setTypeSerializerIn1(typeSerializersIn1.get(vertexName)); + config.setTypeSerializerIn2(typeSerializersIn2.get(vertexName)); + config.setTypeSerializerOut1(typeSerializersOut1.get(vertexName)); + config.setTypeSerializerOut2(typeSerializersOut2.get(vertexName)); + + config.setUserInvokable(invokableObject); + config.setOutputSelector(outputSelector); + config.setOperatorStates(state); + + config.setNumberOfOutputs(nonChainableOutputs.size()); + config.setOutputs(nonChainableOutputs); + config.setChainedOutputs(chainableOutputs); + + if (vertexClass.equals(StreamIterationHead.class) + || vertexClass.equals(StreamIterationTail.class)) { + config.setIterationId(iterationIds.get(vertexName)); + config.setIterationWaitTime(iterationWaitTime.get(vertexName)); } + + vertexConfigs.put(vertexName, config); } private boolean isChainable(String vertexName, String outName) { - return outEdgeList.get(vertexName).size() == 1 - && inEdgeList.get(outName).size() == 1 + return inEdgeList.get(outName).size() == 1 + && invokableObjects.get(outName) != null && outputSelectors.get(vertexName) == null && invokableObjects.get(outName).getChainingStrategy() == ChainingStrategy.ALWAYS && (invokableObjects.get(vertexName).getChainingStrategy() == ChainingStrategy.HEAD || invokableObjects .get(vertexName).getChainingStrategy() == ChainingStrategy.ALWAYS) - && outPartitioning.get(vertexName).get(0).getStrategy() == PartitioningStrategy.FORWARD + && outPartitioning.get(vertexName) + .get(outEdgeList.get(vertexName).indexOf(outName)).getStrategy() == PartitioningStrategy.FORWARD && vertexParallelism.get(vertexName) == vertexParallelism.get(outName) && chaining; } - private void chainTasks(String first, String second) { + private <T> void connect(String headOfChain, Tuple2<String, String> edge) { - List<String> chained = chainedVertices.get(first); - if (chained == null) { - chained = new ArrayList<String>(); - } - chained.add(second); - chainedVertices.put(first, chained); - lastInChains.put(first, second); - - } + String upStreamVertexName = edge.f0; + String downStreamVertexName = edge.f1; - /** - * Connects two vertices with the given names, partitioning and channel type - * - * @param upStreamVertexName - * Name of the upstream vertex, that will emit the values - * @param downStreamVertexName - * Name of the downstream vertex, that will receive the values - * @param partitionerObject - * The partitioner - */ - private <T> void connect(String upStreamVertexName, String downStreamVertexName, - StreamPartitioner<T> partitionerObject) { + int outputIndex = 
outEdgeList.get(upStreamVertexName).indexOf(downStreamVertexName); - AbstractJobVertex upStreamVertex = streamVertices.get(upStreamVertexName); + AbstractJobVertex headVertex = streamVertices.get(headOfChain); AbstractJobVertex downStreamVertex = streamVertices.get(downStreamVertexName); - StreamConfig config = new StreamConfig(upStreamVertex.getConfiguration()); + StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration()); + StreamConfig upStreamConfig = new StreamConfig(headVertex.getConfiguration()); + + List<Integer> outEdgeIndexList = outEdgeIndex.get(upStreamVertexName); + int numOfInputs = downStreamConfig.getNumberOfInputs(); + + downStreamConfig.setInputIndex(numOfInputs++, outEdgeIndexList.get(outputIndex)); + downStreamConfig.setNumberOfInputs(numOfInputs); + + StreamPartitioner<?> partitionerObject = outPartitioning.get(upStreamVertexName).get( + outputIndex); + + upStreamConfig.setPartitioner(downStreamVertexName, partitionerObject); if (partitionerObject.getStrategy() == PartitioningStrategy.FORWARD) { - downStreamVertex - .connectNewDataSetAsInput(upStreamVertex, DistributionPattern.POINTWISE); + downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE); } else { - downStreamVertex.connectNewDataSetAsInput(upStreamVertex, - DistributionPattern.ALL_TO_ALL); + downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL); } if (LOG.isDebugEnabled()) { LOG.debug("CONNECTED: {} - {} -> {}", partitionerObject.getClass().getSimpleName(), - upStreamVertexName, downStreamVertexName); + headOfChain, downStreamVertexName); } - int outputIndex = upStreamVertex.getNumberOfProducedIntermediateDataSets() - 1; - - config.setOutputName(outputIndex, outEdgeNames.get(lastInChains.get(upStreamVertexName)) + upStreamConfig.setOutputNames(downStreamVertexName, outEdgeNames.get(upStreamVertexName) .get(outputIndex)); - config.setSelectAll(outputIndex, outEdgeSelectAll.get(lastInChains.get(upStreamVertexName)) + upStreamConfig.setSelectAll(downStreamVertexName, outEdgeSelectAll.get(upStreamVertexName) .get(outputIndex)); - config.setPartitioner(outputIndex, partitionerObject); - config.setNumberOfOutputChannels(outputIndex, vertexParallelism.get(downStreamVertexName)); } /** @@ -630,27 +636,6 @@ public class JobGraphBuilder { } /** - * Writes number of inputs into each JobVertex's config - */ - private void setNumberOfJobInputs() { - for (AbstractJobVertex vertex : streamVertices.values()) { - (new StreamConfig(vertex.getConfiguration())).setNumberOfInputs(vertex - .getNumberOfInputs()); - } - } - - /** - * Writes the number of outputs and output channels into each JobVertex's - * config - */ - private void setNumberOfJobOutputs() { - for (AbstractJobVertex vertex : streamVertices.values()) { - (new StreamConfig(vertex.getConfiguration())).setNumberOfOutputs(vertex - .getNumberOfProducedIntermediateDataSets()); - } - } - - /** * Gets the assembled {@link JobGraph} and adds a default name for it. 
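Worked example for the two methods above (vertex names hypothetical): for a graph source -> map -> filter -> sink in which source->map and map->filter are chainable but filter->sink is not, createChain("source", "source") returns the single transitive edge (filter, sink), and the chain head then wires itself to the sink's vertex:

    // connect() picks POINTWISE or ALL_TO_ALL from the edge's partitioner
    for (Tuple2<String, String> edge : transitiveOutEdges) {
        connect("source", edge); // here: edge = ("filter", "sink")
    }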
 
     /**
@@ -630,27 +636,6 @@
     }
 
     /**
-     * Writes number of inputs into each JobVertex's config
-     */
-    private void setNumberOfJobInputs() {
-        for (AbstractJobVertex vertex : streamVertices.values()) {
-            (new StreamConfig(vertex.getConfiguration())).setNumberOfInputs(vertex
-                    .getNumberOfInputs());
-        }
-    }
-
-    /**
-     * Writes the number of outputs and output channels into each JobVertex's
-     * config
-     */
-    private void setNumberOfJobOutputs() {
-        for (AbstractJobVertex vertex : streamVertices.values()) {
-            (new StreamConfig(vertex.getConfiguration())).setNumberOfOutputs(vertex
-                    .getNumberOfProducedIntermediateDataSets());
-        }
-    }
-
-    /**
      * Gets the assembled {@link JobGraph} and adds a default name for it.
      */
     public JobGraph getJobGraph() {
@@ -677,33 +662,10 @@
 
     private void buildJobGraph() {
 
         for (String sourceName : sources) {
-            createVertex(sourceName);
-        }
-
-        for (String upStreamVertexName : builtVertices) {
-            int i = 0;
-
-            List<Integer> outEdgeTypeList = outEdgeIndex.get(lastInChains.get(upStreamVertexName));
-
-            for (String downStreamVertexName : outEdgeList
-                    .get(lastInChains.get(upStreamVertexName))) {
-                StreamConfig downStreamVertexConfig = new StreamConfig(streamVertices.get(
-                        downStreamVertexName).getConfiguration());
-
-                int inputNumber = downStreamVertexConfig.getNumberOfInputs();
-
-                downStreamVertexConfig.setInputIndex(inputNumber++, outEdgeTypeList.get(i));
-                downStreamVertexConfig.setNumberOfInputs(inputNumber);
-
-                connect(upStreamVertexName, downStreamVertexName,
-                        outPartitioning.get(lastInChains.get(upStreamVertexName)).get(i));
-                i++;
-            }
+            createChain(sourceName, sourceName);
         }
 
         setSlotSharing();
-        setNumberOfJobInputs();
-        setNumberOfJobOutputs();
     }
 
     public void setChaining(boolean chaining) {


http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index ada3aae..6fffaa6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -17,16 +17,16 @@
 package org.apache.flink.streaming.api;
 
-import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
@@ -35,17 +35,20 @@ import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.state.OperatorState;
 import org.apache.flink.util.InstantiationUtil;
 
-public class StreamConfig {
+public class StreamConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
     private static final String INPUT_TYPE = "inputType_";
     private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
     private static final String NUMBER_OF_INPUTS = "numberOfInputs";
-    private static final String NUMBER_OF_CHAINED_TASKS = "numOfChained";
-    private static final String CHAINED_IN_SERIALIZER = "chainedSerializer_";
-    private static final String CHAINED_INVOKABLE = "chainedInvokable_";
+    private static final String CHAINED_OUTPUTS = "chainedOutputs";
+    private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
+    private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
     private static final String OUTPUT_NAME = "outputName_";
     private static final String OUTPUT_SELECT_ALL = "outputSelectAll_";
     private static final String PARTITIONER_OBJECT = "partitionerObject_";
-    private static final String NUMBER_OF_OUTPUT_CHANNELS = "numOfOutputs_";
+    private static final String VERTEX_NAME = "vertexName";
     private static final String ITERATION_ID = "iteration-id";
     private static final String OUTPUT_SELECTOR = "outputSelector";
     private static final String DIRECTED_EMIT = "directedEmit";
@@ -58,6 +61,8 @@
     private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
     private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
     private static final String ITERATON_WAIT = "iterationWait";
+    private static final String OUTPUTS = "outVertexNames";
+    private static final String RW_ORDER = "rwOrder";
 
     // DEFAULT VALUES
 
@@ -75,6 +80,14 @@
         return config;
     }
 
+    public void setVertexName(String vertexName) {
+        config.setString(VERTEX_NAME, vertexName);
+    }
+
+    public String getTaskName() {
+        return config.getString(VERTEX_NAME, "Missing");
+    }
+
     public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
         setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
     }
@@ -206,25 +219,21 @@
         return config.getLong(ITERATON_WAIT, 0);
     }
 
-    public void setNumberOfOutputChannels(int outputIndex, Integer numberOfOutputChannels) {
-        config.setInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, numberOfOutputChannels);
-    }
+    public <T> void setPartitioner(String output, StreamPartitioner<T> partitionerObject) {
 
-    public int getNumberOfOutputChannels(int outputIndex) {
-        return config.getInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, 0);
-    }
-
-    public <T> void setPartitioner(int outputIndex, StreamPartitioner<T> partitionerObject) {
-
-        config.setBytes(PARTITIONER_OBJECT + outputIndex,
+        config.setBytes(PARTITIONER_OBJECT + output,
                 SerializationUtils.serialize(partitionerObject));
     }
 
-    public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, int outputIndex)
-            throws ClassNotFoundException, IOException {
-        @SuppressWarnings("unchecked")
-        StreamPartitioner<T> partitioner = (StreamPartitioner<T>) InstantiationUtil
-                .readObjectFromConfig(this.config, PARTITIONER_OBJECT + outputIndex, cl);
+    @SuppressWarnings("unchecked")
+    public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, String output) {
+        StreamPartitioner<T> partitioner = null;
+        try {
+            partitioner = (StreamPartitioner<T>) InstantiationUtil.readObjectFromConfig(
+                    this.config, PARTITIONER_OBJECT + output, cl);
+        } catch (Exception e) {
+            throw new RuntimeException("Partitioner could not be instantiated.");
+        }
         if (partitioner != null) {
             return partitioner;
         } else {
@@ -232,27 +241,27 @@
         }
     }
 
-    public void setSelectAll(int outputIndex, Boolean selectAll) {
+    public void setSelectAll(String output, Boolean selectAll) {
         if (selectAll != null) {
-            config.setBoolean(OUTPUT_SELECT_ALL + outputIndex, selectAll);
+            config.setBoolean(OUTPUT_SELECT_ALL + output, selectAll);
         }
     }
 
-    public boolean isSelectAll(int outputIndex) {
-        return config.getBoolean(OUTPUT_SELECT_ALL + outputIndex, false);
+    public boolean isSelectAll(String output) {
+        return config.getBoolean(OUTPUT_SELECT_ALL + output, true);
     }
 
-    public void setOutputName(int outputIndex, List<String> outputName) {
+    public void setOutputNames(String output, List<String> outputName) {
         if (outputName != null) {
-            config.setBytes(OUTPUT_NAME + outputIndex,
+            config.setBytes(OUTPUT_NAME + output,
                    SerializationUtils.serialize((Serializable) outputName));
         }
     }
 
     @SuppressWarnings("unchecked")
-    public List<String> getOutputNames(int outputIndex) {
-        return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME
-                + outputIndex, null));
+    public List<String> getOutputNames(String output) {
+        return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output,
+                null));
     }
 
     public void setNumberOfInputs(int numberOfInputs) {
@@ -271,6 +280,38 @@
         return config.getInteger(NUMBER_OF_OUTPUTS, 0);
     }
 
+    public void setOutputs(List<String> outputVertexNames) {
+        config.setBytes(OUTPUTS, SerializationUtils.serialize((Serializable) outputVertexNames));
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<String> getOutputs(ClassLoader cl) {
+        try {
+            return (List<String>) InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl);
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate outputs.");
+        }
+    }
+
+    public void setRecordWriterOrder(List<Tuple2<String, String>> outEdgeList) {
+
+        List<String> outVertices = new ArrayList<String>();
+        for (Tuple2<String, String> edge : outEdgeList) {
+            outVertices.add(edge.f1);
+        }
+
+        config.setBytes(RW_ORDER, SerializationUtils.serialize((Serializable) outVertices));
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<String> getRecordWriterOrder(ClassLoader cl) {
+        try {
+            return (List<String>) InstantiationUtil.readObjectFromConfig(this.config, RW_ORDER, cl);
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate outputs.");
+        }
+    }
+
     public void setInputIndex(int inputNumber, Integer inputTypeNumber) {
         config.setInteger(INPUT_TYPE + inputNumber++, inputTypeNumber);
     }
@@ -293,40 +334,77 @@
         }
     }
 
-    public int getNumberofChainedTasks() {
-        return config.getInteger(NUMBER_OF_CHAINED_TASKS, 0);
+    public void setChainedOutputs(List<String> chainedOutputs) {
+        config.setBytes(CHAINED_OUTPUTS,
+                SerializationUtils.serialize((Serializable) chainedOutputs));
     }
 
-    public void setNumberofChainedTasks(int n) {
-        config.setInteger(NUMBER_OF_CHAINED_TASKS, n);
-    }
-
-    public ChainableInvokable<?, ?> getChainedInvokable(int chainedTaskIndex, ClassLoader cl) {
+    @SuppressWarnings("unchecked")
+    public List<String> getChainedOutputs(ClassLoader cl) {
         try {
-            return (ChainableInvokable<?, ?>) InstantiationUtil.readObjectFromConfig(this.config,
-                    CHAINED_INVOKABLE + chainedTaskIndex, cl);
+            return (List<String>) InstantiationUtil.readObjectFromConfig(this.config,
+                    CHAINED_OUTPUTS, cl);
         } catch (Exception e) {
-            throw new RuntimeException("Could not instantiate invokable.");
+            throw new RuntimeException("Could not instantiate chained outputs.");
        }
     }
 
-    public StreamRecordSerializer<?> getChainedInSerializer(int chainedTaskIndex, ClassLoader cl) {
+    public void setTransitiveChainedTaskConfigs(Map<String, StreamConfig> chainedTaskConfigs) {
+        config.setBytes(CHAINED_TASK_CONFIG,
+                SerializationUtils.serialize((Serializable) chainedTaskConfigs));
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<String, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
         try {
-            return (StreamRecordSerializer<?>) InstantiationUtil.readObjectFromConfig(this.config,
-                    CHAINED_IN_SERIALIZER + chainedTaskIndex, cl);
+
+            return (Map<String, StreamConfig>) InstantiationUtil.readObjectFromConfig(this.config,
+                    CHAINED_TASK_CONFIG, cl);
         } catch (Exception e) {
-            throw new RuntimeException("Could not instantiate serializer.");
+            throw new RuntimeException("Could not instantiate configuration.");
        }
     }
 
-    public void setChainedSerializer(StreamRecordSerializer<?> typeWrapper, int chainedTaskIndex) {
-        config.setBytes(CHAINED_IN_SERIALIZER + chainedTaskIndex,
-                SerializationUtils.serialize(typeWrapper));
+    public void setChainStart() {
+        config.setBoolean(IS_CHAINED_VERTEX, true);
     }
 
-    public void setChainedInvokable(ChainableInvokable<?, ?> invokable, int chainedTaskIndex) {
-        config.setBytes(CHAINED_INVOKABLE + chainedTaskIndex,
-                SerializationUtils.serialize(invokable));
+    public boolean isChainStart() {
+        return config.getBoolean(IS_CHAINED_VERTEX, false);
     }
 
+    @Override
+    public String toString() {
+
+        ClassLoader cl = getClass().getClassLoader();
+
+        StringBuilder builder = new StringBuilder();
+        builder.append("\n=======================");
+        builder.append("Stream Config");
+        builder.append("=======================");
+        builder.append("\nTask name: " + getTaskName());
+        builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
+        builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
+        builder.append("\nOutput names: " + getOutputs(cl));
+        builder.append("\nPartitioning:");
+        for (String outputname : getOutputs(cl)) {
+            builder.append("\n\t" + outputname + ": "
+                    + getPartitioner(cl, outputname).getClass().getSimpleName());
+        }
+
+        builder.append("\nChained subtasks: " + getChainedOutputs(cl));
+
+        try {
+            builder.append("\nInvokable: " + getUserInvokable(cl).getClass().getSimpleName());
+        } catch (Exception e) {
+            builder.append("\nInvokable: Missing");
+        }
+        builder.append("\nBuffer timeout: " + getBufferTimeout());
+        if (isChainStart() && getChainedOutputs(cl).size() > 0) {
+            builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
+            builder.append(getTransitiveChainedTaskConfigs(cl)).toString();
+        }
+
+        return builder.toString();
+    }
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
index b7e57e0..a95973b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
@@ -30,7 +30,7 @@ public class CollectorWrapper<OUT> implements Collector<OUT> {
         this.outputs = new LinkedList<Collector<OUT>>();
     }
 
-    public void addChainedOutput(Collector<OUT> output) {
+    public void addCollector(Collector<OUT> output) {
         outputs.add(output);
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
index 4c21564..6fd1b98 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
@@ -17,11 +17,13 @@
 package org.apache.flink.streaming.api.collector;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.io.StreamRecordWriter;
 import org.apache.flink.util.Collector;
 
 public class StreamOutput<OUT> implements Collector<SerializationDelegate<StreamRecord<OUT>>> {
@@ -52,6 +54,15 @@ public class StreamOutput<OUT> implements Collector<SerializationDelegate<Stream
 
     @Override
     public void close() {
+        if (output instanceof StreamRecordWriter) {
+            ((StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>) output).close();
+        } else {
+            try {
+                output.flush();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
     }
 
     public boolean isSelectAll() {
@@ -62,4 +73,8 @@ public class StreamOutput<OUT> implements Collector<SerializationDelegate<Stream
         return selectedNames;
     }
 
+    public RecordWriter<SerializationDelegate<StreamRecord<OUT>>> getRecordWriter() {
+        return output;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
index fa374b1..c3e4c9d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.collector;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
@@ -80,11 +79,6 @@ public class StreamOutputWrapper<OUT> implements Collector<OUT> {
         outputs.add(output);
     }
 
-    protected void addOneOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
-            List<String> outputNames, boolean isSelectAllOutput) {
-
-    }
-
     /**
      * Collects and emits a tuple/object to the outputs by reusing a
      * StreamRecord object.
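The renamed addCollector() makes CollectorWrapper the generic fan-out point of a chain: it forwards each record to every registered collector, whether that is the next chained invokable (an in-JVM call) or the network collector wrapping this task's StreamOutputs. Sketch (chainedFilter and networkCollector are hypothetical Collector<String> instances):

    CollectorWrapper<String> wrapper = new CollectorWrapper<String>();
    wrapper.addCollector(chainedFilter);    // next operator in the same task
    wrapper.addCollector(networkCollector); // StreamOutputWrapper over the RecordWriters
    wrapper.collect("record");              // delivered to both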
http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 60a7b14..99f826d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -19,13 +19,15 @@ package org.apache.flink.streaming.api.streamvertex;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.collector.CollectorWrapper;
 import org.apache.flink.streaming.api.collector.DirectedOutputWrapper;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.collector.StreamOutput;
@@ -45,83 +47,142 @@ public class OutputHandler<OUT> {
 
     private StreamVertex<?, OUT> vertex;
     private StreamConfig configuration;
-
-    private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
+    private ClassLoader cl;
 
     private Collector<OUT> outerCollector;
 
     TypeInformation<OUT> outTypeInfo = null;
     StreamRecordSerializer<OUT> outSerializer = null;
     SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
 
     public List<ChainableInvokable<?, ?>> chainedInvokables;
     private int numberOfChainedTasks;
 
+    private Map<String, StreamOutput<?>> outputMap;
+    private Map<String, StreamConfig> chainedConfigs;
+    private List<String> recordWriterOrder;
 
     public OutputHandler(StreamVertex<?, OUT> vertex) {
+
+        // Initialize some fields
         this.vertex = vertex;
-        this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
         this.configuration = new StreamConfig(vertex.getTaskConfiguration());
         this.chainedInvokables = new ArrayList<ChainableInvokable<?, ?>>();
-        this.numberOfChainedTasks = configuration.getNumberofChainedTasks();
+        this.outputMap = new HashMap<String, StreamOutput<?>>();
+        this.cl = vertex.getUserCodeClassLoader();
+
+        // We read the chained configs, and the order of record writer
+        // registrations by output name
+        this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl);
+        this.recordWriterOrder = configuration.getRecordWriterOrder(cl);
+
+        // For the network outputs of the chain head we create the stream
+        // outputs
+        for (String outName : configuration.getOutputs(cl)) {
+            StreamOutput<?> streamOutput = createStreamOutput(outName, configuration);
+            outputMap.put(outName, streamOutput);
+        }
 
-        this.outerCollector = createChainedCollector(0);
+        // If we have chained tasks we iterate through them and create the
+        // stream outputs for the network outputs
+        if (chainedConfigs != null) {
+            for (StreamConfig config : chainedConfigs.values()) {
+                for (String outName : config.getOutputs(cl)) {
+                    StreamOutput<?> streamOutput = createStreamOutput(outName, config);
+                    outputMap.put(outName, streamOutput);
+                }
+            }
+        }
+
+        // We create the outer collector that will be passed to the first task
+        // in the chain
+        this.outerCollector = createChainedCollector(configuration);
     }
 
-    public List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> getOutputs() {
-        return outputs;
+    public Collection<StreamOutput<?>> getOutputs() {
+        return outputMap.values();
     }
 
-    // We create the outer collector by nesting the chainable invokables into
-    // each other
+    /**
+     * This method builds up a nested collector which encapsulates all the
+     * chained operators and their network output. The result of this recursive
+     * call will be passed as collector to the first invokable in the chain.
+     *
+     * @param chainedTaskConfig
+     *            The configuration of the starting operator of the chain, we
+     *            use this parameter to recursively build the whole chain
+     * @return Returns the collector for the chain starting from the given
+     *         config
+     */
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    private Collector<OUT> createChainedCollector(int chainedTaskIndex) {
+    private Collector<OUT> createChainedCollector(StreamConfig chainedTaskConfig) {
 
-        if (numberOfChainedTasks == chainedTaskIndex) {
-            // At the end of the chain we create the collector that sends data
-            // to the recordwriters
-            return createNetworkCollector();
-        } else {
+        // We create a wrapper that will encapsulate the chained operators and
+        // network outputs
+        CollectorWrapper<OUT> wrapper = new CollectorWrapper<OUT>();
 
-            ChainableInvokable chainableInvokable = configuration.getChainedInvokable(
-                    chainedTaskIndex, vertex.getUserCodeClassLoader());
+        // If the task has network outputs we create a collector for those and
+        // pass
+        // it to the wrapper
+        if (chainedTaskConfig.getNumberOfOutputs() > 0) {
+            wrapper.addCollector((Collector<OUT>) createNetworkCollector(chainedTaskConfig));
+        }
 
-            // The nesting is done by calling this method recursively when
-            // passing the collector to the invokable
-            chainableInvokable.setup(
-                    createChainedCollector(chainedTaskIndex + 1),
-                    configuration.getChainedInSerializer(chainedTaskIndex,
-                            vertex.getUserCodeClassLoader()));
+        // If the task has chained outputs we create a chained collector for
+        // each of them and pass it to the wrapper
+        for (String output : chainedTaskConfig.getChainedOutputs(cl)) {
+            wrapper.addCollector(createChainedCollector(chainedConfigs.get(output)));
+        }
 
-            // We hold a list of the chained invokables for initializaton
-            // afterwards
-            chainedInvokables.add(chainableInvokable);
+        if (chainedTaskConfig.isChainStart()) {
+            // The current task is the first chained task at this vertex so we
+            // return the wrapper
+            return wrapper;
+        } else {
+            // The current task is a part of the chain so we get the chainable
+            // invokable which will be returned and set it up using the wrapper
+            ChainableInvokable chainableInvokable = chainedTaskConfig.getUserInvokable(vertex
+                    .getUserCodeClassLoader());
+            chainableInvokable.setup(wrapper,
+                    chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
 
+            chainedInvokables.add(chainableInvokable);
             return chainableInvokable;
         }
     }
 
-    private Collector<OUT> createNetworkCollector() {
+    /**
+     * We create the collector for the network outputs of the task represented
+     * by the config using the StreamOutputs that we have set up in the
+     * constructor.
+     *
+     * @param config
+     *            The config of the task
+     * @return We return a collector that represents all the network outputs of
+     *         this task
+     */
+    @SuppressWarnings("unchecked")
+    private <T> Collector<T> createNetworkCollector(StreamConfig config) {
+
+        StreamRecordSerializer<T> outSerializer = config
+                .getTypeSerializerOut1(vertex.userClassLoader);
+        SerializationDelegate<StreamRecord<T>> outSerializationDelegate = null;
 
-        createOutSerializer();
+        if (outSerializer != null) {
+            outSerializationDelegate = new SerializationDelegate<StreamRecord<T>>(outSerializer);
+            outSerializationDelegate.setInstance(outSerializer.createInstance());
+        }
 
-        StreamOutputWrapper<OUT> collector;
+        StreamOutputWrapper<T> collector;
 
         if (vertex.configuration.isDirectedEmit()) {
-            OutputSelector<OUT> outputSelector = vertex.configuration
+            OutputSelector<T> outputSelector = vertex.configuration
                     .getOutputSelector(vertex.userClassLoader);
 
-            collector = new DirectedOutputWrapper<OUT>(vertex.getInstanceID(),
+            collector = new DirectedOutputWrapper<T>(vertex.getInstanceID(),
                     outSerializationDelegate, outputSelector);
         } else {
-            collector = new StreamOutputWrapper<OUT>(vertex.getInstanceID(),
-                    outSerializationDelegate);
+            collector = new StreamOutputWrapper<T>(vertex.getInstanceID(), outSerializationDelegate);
        }
 
-        int numberOfOutputs = configuration.getNumberOfOutputs();
-        for (int i = 0; i < numberOfOutputs; i++) {
-            collector = (StreamOutputWrapper<OUT>) addStreamOutput(i, collector);
+        for (String output : config.getOutputs(cl)) {
+            collector.addOutput((StreamOutput<T>) outputMap.get(output));
        }
 
        return collector;
@@ -131,32 +192,35 @@
         return outerCollector;
     }
 
-    void createOutSerializer() {
-        outSerializer = configuration.getTypeSerializerOut1(vertex.userClassLoader);
-        if (outSerializer != null) {
-            outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
-            outSerializationDelegate.setInstance(outSerializer.createInstance());
-        }
-    }
+    /**
+     * We create the StreamOutput for the specific output given by the name, and
+     * the configuration of its source task
+     *
+     * @param name
+     *            Name of the output for which the StreamOutput will be set up
+     * @param configuration
+     *            The config of the upstream task
+     * @return
+     */
+    private <T> StreamOutput<T> createStreamOutput(String name, StreamConfig configuration) {
 
-    Collector<OUT> addStreamOutput(int outputNumber, StreamOutputWrapper<OUT> networkCollector) {
+        int outputNumber = recordWriterOrder.indexOf(name);
 
-        StreamPartitioner<OUT> outputPartitioner;
+        StreamPartitioner<T> outputPartitioner;
 
         try {
-            outputPartitioner = configuration.getPartitioner(vertex.userClassLoader,
-                    outputNumber);
+            outputPartitioner = configuration.getPartitioner(vertex.userClassLoader, name);
        } catch (Exception e) {
            throw new StreamVertexException("Cannot deserialize partitioner for "
-                    + vertex.getName() + " with " + outputNumber + " outputs", e);
+                    + vertex.getName() + " with " + name + " outputs", e);
        }
 
-        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
+        RecordWriter<SerializationDelegate<StreamRecord<T>>> output;
 
        long bufferTimeout = configuration.getBufferTimeout();
 
        if (bufferTimeout >= 0) {
-            output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(vertex
+            output = new StreamRecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex
                    .getEnvironment().getWriter(outputNumber), outputPartitioner, bufferTimeout);
 
            if (LOG.isTraceEnabled()) {
@@ -164,7 +228,7 @@
                        bufferTimeout, vertex.getClass().getSimpleName());
            }
        } else {
-            output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(vertex
+            output = new RecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex
                    .getEnvironment().getWriter(outputNumber), outputPartitioner);
 
            if (LOG.isTraceEnabled()) {
@@ -172,34 +236,28 @@
            }
        }
 
-        outputs.add(output);
-
-        networkCollector.addOutput(new StreamOutput<OUT>(output, configuration
-                .isSelectAll(outputNumber) ? null : configuration.getOutputNames(outputNumber)));
+        StreamOutput<T> streamOutput = new StreamOutput<T>(output,
+                configuration.isSelectAll(name) ? null : configuration.getOutputNames(name));
 
        if (LOG.isTraceEnabled()) {
            LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
                    .getSimpleName(), outputNumber, vertex.getClass().getSimpleName());
        }
 
-        return networkCollector;
+        return streamOutput;
     }
 
     public void flushOutputs() throws IOException, InterruptedException {
-        for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) {
-            if (output instanceof StreamRecordWriter) {
-                ((StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>) output).close();
-            } else {
-                output.flush();
-            }
+        for (StreamOutput<?> streamOutput : getOutputs()) {
+            streamOutput.close();
        }
     }
 
     public void invokeUserFunction(String componentTypeName, StreamInvokable<?, OUT> userInvokable)
            throws IOException, InterruptedException {
        if (LOG.isDebugEnabled()) {
-            LOG.debug("{} {} invoked with instance id {}", componentTypeName,
-                    vertex.getName(), vertex.getInstanceID());
+            LOG.debug("{} {} invoked with instance id {}", componentTypeName, vertex.getName(),
+                    vertex.getInstanceID());
        }
 
        try {
@@ -210,8 +268,8 @@
        }
 
        if (LOG.isDebugEnabled()) {
-            LOG.debug("{} {} invoke finished instance id {}", componentTypeName,
-                    vertex.getName(), vertex.getInstanceID());
+            LOG.debug("{} {} invoke finished instance id {}", componentTypeName, vertex.getName(),
+                    vertex.getInstanceID());
        }
 
        flushOutputs();
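To visualize what createChainedCollector() above builds: for a chained task source -> map -> filter with one network output on the filter (names hypothetical), the nesting is

    // outerCollector handed to the source invokable:
    //   CollectorWrapper                   (chain start, returned as-is)
    //     -> map invokable                 (ChainableInvokable, setup() with its own wrapper)
    //          -> CollectorWrapper
    //               -> filter invokable
    //                    -> CollectorWrapper
    //                         -> network collector over the registered StreamOutput(s)
    //
    // A record emitted by the source traverses the chain as plain method calls
    // and is only serialized at the StreamOutput boundary.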
http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
index 43b455e..cba23b8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
@@ -17,19 +17,22 @@
 package org.apache.flink.streaming.api.streamvertex;
 
+import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.collector.StreamOutput;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.BlockingQueueBroker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT> {
+public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT, OUT> {
 
     private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
 
@@ -72,6 +75,15 @@
         }
 
         StreamRecord<OUT> nextRecord;
+        StreamRecordSerializer<OUT> serializer = configuration
+                .getTypeSerializerOut1(userClassLoader);
+        SerializationDelegate<StreamRecord<OUT>> serializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(
+                serializer);
+
+        List<StreamOutput<OUT>> outputs = new LinkedList<StreamOutput<OUT>>();
+        for (StreamOutput<?> output : outputHandler.getOutputs()) {
+            outputs.add((StreamOutput<OUT>) output);
+        }
 
         while (true) {
             if (shouldWait) {
@@ -82,10 +94,9 @@
             if (nextRecord == null) {
                 break;
             }
-            for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputHandler
-                    .getOutputs()) {
-                outputHandler.outSerializationDelegate.setInstance(nextRecord);
-                output.emit(outputHandler.outSerializationDelegate);
+            for (StreamOutput<OUT> output : outputs) {
+                serializationDelegate.setInstance(nextRecord);
+                output.collect(serializationDelegate);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7dbb55ec/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 53b75a0..698b193 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -46,6 +46,7 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.streaming.scala.ScalaStreamingAggregator
+import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
@@ -78,6 +79,15 @@
       " " + "parallelism.")
   }
+
+  def setChainingStrategy(strategy: ChainingStrategy): DataStream[T] = {
+    javaStream match {
+      case ds: SingleOutputStreamOperator[_, _] => ds.setChainingStrategy(strategy)
+      case _ =>
+        throw new UnsupportedOperationException("Only supported for operators.")
+    }
+    this
+  }
 
   /**
    * Creates a new DataStream by merging DataStream outputs of
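For reference, the new Scala setChainingStrategy() above delegates to the Java SingleOutputStreamOperator method. A hypothetical use (input and Tokenizer are stand-ins): since isChainable() only chains a downstream operator whose strategy is ALWAYS, marking an operator HEAD keeps it from being appended to its input's chain while still letting it head one.

    SingleOutputStreamOperator<Tuple2<String, Integer>, ?> tokens = input.map(new Tokenizer());
    tokens.setChainingStrategy(ChainingStrategy.HEAD); // never chained onto its input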
