[FLINK-1345] [streaming] Added support for chaining operators with directed outputs
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92947f0d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92947f0d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92947f0d Branch: refs/heads/master Commit: 92947f0d8308afa74b666e11d5ec3af08f0198c1 Parents: e3b608c Author: Gyula Fora <[email protected]> Authored: Wed Jan 21 13:41:44 2015 +0100 Committer: mbalassi <[email protected]> Committed: Wed Jan 21 16:06:34 2015 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/StreamConfig.java | 23 ++- .../apache/flink/streaming/api/StreamGraph.java | 22 ++- .../api/StreamingJobGraphGenerator.java | 18 +-- .../api/collector/CollectorWrapper.java | 7 +- .../api/collector/DirectedCollectorWrapper.java | 128 +++++++++++++++++ .../api/collector/DirectedOutputWrapper.java | 144 ------------------- .../streaming/api/collector/StreamOutput.java | 56 ++++---- .../api/collector/StreamOutputWrapper.java | 116 --------------- .../streaming/api/datastream/DataStream.java | 5 +- .../api/datastream/SplitDataStream.java | 7 +- .../environment/StreamExecutionEnvironment.java | 2 +- .../api/streamvertex/OutputHandler.java | 95 +++++------- .../api/streamvertex/StreamIterationHead.java | 12 +- .../api/streamvertex/StreamIterationTail.java | 3 +- .../api/collector/StreamCollectorTest.java | 96 ++++++------- 15 files changed, 283 insertions(+), 451 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index 213a892..94349d7 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.api; import java.io.Serializable; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -46,7 +47,6 @@ public class StreamConfig implements Serializable { private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_"; private static final String IS_CHAINED_VERTEX = "isChainedSubtask"; private static final String OUTPUT_NAME = "outputName_"; - private static final String OUTPUT_SELECT_ALL = "outputSelectAll_"; private static final String PARTITIONER_OBJECT = "partitionerObject_"; private static final String VERTEX_NAME = "vertexName"; private static final String ITERATION_ID = "iteration-id"; @@ -241,25 +241,18 @@ public class StreamConfig implements Serializable { } } - public void setSelectAll(String output, Boolean selectAll) { - if (selectAll != null) { - config.setBoolean(OUTPUT_SELECT_ALL + output, selectAll); - } - } - - public boolean isSelectAll(String output) { - return config.getBoolean(OUTPUT_SELECT_ALL + output, true); - } - - public void setOutputNames(String output, List<String> outputName) { - if (outputName != null) { + public void setSelectedNames(String output, List<String> selected) { + if (selected != null) { + config.setBytes(OUTPUT_NAME + output, + SerializationUtils.serialize((Serializable) selected)); + } else { config.setBytes(OUTPUT_NAME + output, - SerializationUtils.serialize((Serializable) outputName)); + SerializationUtils.serialize((Serializable) new ArrayList<String>())); } } @SuppressWarnings("unchecked") - public List<String> getOutputNames(String output) { + public List<String> getSelectedNames(String output) { return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output, null)); } http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index 3dd1bdb..c9ecd55 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -58,7 +58,6 @@ public class StreamGraph { private Map<String, List<String>> outEdgeLists; private Map<String, List<Integer>> outEdgeTypes; private Map<String, List<List<String>>> selectedNames; - private Map<String, List<Boolean>> outEdgeSelectAlls; private Map<String, List<String>> inEdgeLists; private Map<String, List<StreamPartitioner<?>>> outputPartitioners; private Map<String, String> operatorNames; @@ -94,7 +93,6 @@ public class StreamGraph { outEdgeLists = new HashMap<String, List<String>>(); outEdgeTypes = new HashMap<String, List<Integer>>(); selectedNames = new HashMap<String, List<List<String>>>(); - outEdgeSelectAlls = new HashMap<String, List<Boolean>>(); inEdgeLists = new HashMap<String, List<String>>(); outputPartitioners = new HashMap<String, List<StreamPartitioner<?>>>(); operatorNames = new HashMap<String, String>(); @@ -186,7 +184,7 @@ public class StreamGraph { setEdge(vertexName, iterationHead, outputPartitioners.get(inEdgeLists.get(iterationHead).get(0)).get(0), 0, - new ArrayList<String>(), false); + new ArrayList<String>()); iterationTimeouts.put(iterationIDtoHeadName.get(iterationID), waitTime); @@ -274,7 +272,6 @@ public class StreamGraph { outEdgeLists.put(vertexName, new ArrayList<String>()); outEdgeTypes.put(vertexName, new ArrayList<Integer>()); selectedNames.put(vertexName, new ArrayList<List<String>>()); - outEdgeSelectAlls.put(vertexName, new ArrayList<Boolean>()); inEdgeLists.put(vertexName, new ArrayList<String>()); outputPartitioners.put(vertexName, new ArrayList<StreamPartitioner<?>>()); iterationTailCount.put(vertexName, 0); @@ -296,14 +293,12 @@ public class StreamGraph { * User defined names of the out edge */ public void setEdge(String upStreamVertexName, String downStreamVertexName, - StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames, - boolean selectAll) { + StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) { outEdgeLists.get(upStreamVertexName).add(downStreamVertexName); outEdgeTypes.get(upStreamVertexName).add(typeNumber); inEdgeLists.get(downStreamVertexName).add(upStreamVertexName); outputPartitioners.get(upStreamVertexName).add(partitionerObject); selectedNames.get(upStreamVertexName).add(outputNames); - outEdgeSelectAlls.get(upStreamVertexName).add(selectAll); } private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1, @@ -494,16 +489,15 @@ public class StreamGraph { return outEdgeTypes.get(vertexName); } - public StreamPartitioner<?> getOutPartitioner(String vertexName, int outputIndex) { - return outputPartitioners.get(vertexName).get(outputIndex); + public StreamPartitioner<?> getOutPartitioner(String upStreamVertex, String downStreamVertex) { + return outputPartitioners.get(upStreamVertex).get( + outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex)); } - public List<String> getSelectedNames(String vertexName, int outputIndex) { - return selectedNames.get(vertexName).get(outputIndex); - } + public List<String> getSelectedNames(String upStreamVertex, String downStreamVertex) { - public Boolean isSelectAll(String vertexName, int outputIndex) { - return outEdgeSelectAlls.get(vertexName).get(outputIndex); + return selectedNames.get(upStreamVertex).get( + outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex)); } public Collection<Integer> getIterationIDs() { http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java index 6c0cc20..3b2d135 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java @@ -206,6 +206,13 @@ public class StreamingJobGraphGenerator { config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexName)); } + List<String> allOutputs = new ArrayList<String>(chainableOutputs); + allOutputs.addAll(nonChainableOutputs); + + for (String output : allOutputs) { + config.setSelectedNames(output, streamGraph.getSelectedNames(vertexName, output)); + } + vertexConfigs.put(vertexName, config); } @@ -229,7 +236,7 @@ public class StreamingJobGraphGenerator { downStreamConfig.setNumberOfInputs(numOfInputs); StreamPartitioner<?> partitioner = streamGraph.getOutPartitioner(upStreamVertexName, - outputIndex); + downStreamVertexName); upStreamConfig.setPartitioner(downStreamVertexName, partitioner); @@ -243,11 +250,6 @@ public class StreamingJobGraphGenerator { LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(), headOfChain, downStreamVertexName); } - - upStreamConfig.setOutputNames(downStreamVertexName, - streamGraph.getSelectedNames(upStreamVertexName, outputIndex)); - upStreamConfig.setSelectAll(downStreamVertexName, - streamGraph.isSelectAll(upStreamVertexName, outputIndex)); } private boolean isChainable(String vertexName, String outName) { @@ -257,12 +259,10 @@ public class StreamingJobGraphGenerator { return streamGraph.getInEdges(outName).size() == 1 && outInvokable != null - && streamGraph.getOutputSelector(vertexName) == null && outInvokable.getChainingStrategy() == ChainingStrategy.ALWAYS && (headInvokable.getChainingStrategy() == ChainingStrategy.HEAD || headInvokable .getChainingStrategy() == ChainingStrategy.ALWAYS) - && streamGraph.getOutPartitioner(vertexName, - streamGraph.getOutEdges(vertexName).indexOf(outName)).getStrategy() == PartitioningStrategy.FORWARD + && streamGraph.getOutPartitioner(vertexName, outName).getStrategy() == PartitioningStrategy.FORWARD && streamGraph.getParallelism(vertexName) == streamGraph.getParallelism(outName) && streamGraph.chaining; } http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java index a95973b..1281bf0 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java @@ -24,14 +24,15 @@ import org.apache.flink.util.Collector; public class CollectorWrapper<OUT> implements Collector<OUT> { - List<Collector<OUT>> outputs; + private List<Collector<OUT>> outputs; public CollectorWrapper() { this.outputs = new LinkedList<Collector<OUT>>(); } - public void addCollector(Collector<OUT> output) { - outputs.add(output); + @SuppressWarnings("unchecked") + public void addCollector(Collector<?> output) { + outputs.add((Collector<OUT>) output); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java new file mode 100755 index 0000000..66fb667 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.collector; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A StreamCollector that uses user defined output names and a user defined + * output selector to make directed emits. + * + * @param <OUT> + * Type of the Tuple collected. + */ +public class DirectedCollectorWrapper<OUT> extends CollectorWrapper<OUT> { + + private static final Logger LOG = LoggerFactory.getLogger(DirectedCollectorWrapper.class); + + OutputSelector<OUT> outputSelector; + + protected Map<String, List<Collector<OUT>>> outputMap; + + private List<Collector<OUT>> selectAllOutputs; + private Set<Collector<OUT>> emitted; + + /** + * Creates a new DirectedStreamCollector + * + * @param outputSelector + * User defined {@link OutputSelector} + */ + public DirectedCollectorWrapper(OutputSelector<OUT> outputSelector) { + this.outputSelector = outputSelector; + this.emitted = new HashSet<Collector<OUT>>(); + this.selectAllOutputs = new LinkedList<Collector<OUT>>(); + this.outputMap = new HashMap<String, List<Collector<OUT>>>(); + + } + + @Override + public void addCollector(Collector<?> output) { + addCollector(output, new ArrayList<String>()); + } + + @SuppressWarnings("unchecked") + public void addCollector(Collector<?> output, List<String> selectedNames) { + + if (selectedNames.isEmpty()) { + selectAllOutputs.add((Collector<OUT>) output); + } else { + for (String selectedName : selectedNames) { + + if (!outputMap.containsKey(selectedName)) { + outputMap.put(selectedName, new LinkedList<Collector<OUT>>()); + outputMap.get(selectedName).add((Collector<OUT>) output); + } else { + if (!outputMap.get(selectedName).contains(output)) { + outputMap.get(selectedName).add((Collector<OUT>) output); + } + } + + } + } + } + + @Override + public void collect(OUT record) { + emitted.clear(); + + Iterable<String> outputNames = outputSelector.select(record); + + for (Collector<OUT> output : selectAllOutputs) { + output.collect(record); + emitted.add(output); + } + + for (String outputName : outputNames) { + List<Collector<OUT>> outputList = outputMap.get(outputName); + if (outputList == null) { + if (LOG.isErrorEnabled()) { + String format = String.format( + "Cannot emit because no output is selected with the name: %s", + outputName); + LOG.error(format); + + } + } else { + for (Collector<OUT> output : outputList) { + if (!emitted.contains(output)) { + output.collect(record); + emitted.add(output); + } + } + + } + + } + } + + @Override + public void close() { + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java deleted file mode 100755 index 6ff03c4..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.collector; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A StreamCollector that uses user defined output names and a user defined - * output selector to make directed emits. - * - * @param <OUT> - * Type of the Tuple collected. - */ -public class DirectedOutputWrapper<OUT> extends StreamOutputWrapper<OUT> { - - private static final Logger LOG = LoggerFactory.getLogger(DirectedOutputWrapper.class); - - OutputSelector<OUT> outputSelector; - - protected Map<String, List<StreamOutput<OUT>>> outputMap; - - private List<StreamOutput<OUT>> selectAllOutputs; - private Set<StreamOutput<OUT>> emitted; - - /** - * Creates a new DirectedStreamCollector - * - * @param channelID - * Channel ID of the Task - * @param serializationDelegate - * Serialization delegate used for serialization - * @param outputSelector - * User defined {@link OutputSelector} - */ - public DirectedOutputWrapper(int channelID, - SerializationDelegate<StreamRecord<OUT>> serializationDelegate, - OutputSelector<OUT> outputSelector) { - super(channelID, serializationDelegate); - this.outputSelector = outputSelector; - this.emitted = new HashSet<StreamOutput<OUT>>(); - this.selectAllOutputs = new LinkedList<StreamOutput<OUT>>(); - this.outputMap = new HashMap<String, List<StreamOutput<OUT>>>(); - - } - - @Override - public void addOutput(StreamOutput<OUT> output) { - - super.addOutput(output); - - if (output.isSelectAll()) { - selectAllOutputs.add(output); - } else { - for (String selectedName : output.getSelectedNames()) { - if (selectedName != null) { - if (!outputMap.containsKey(selectedName)) { - outputMap.put(selectedName, new LinkedList<StreamOutput<OUT>>()); - outputMap.get(selectedName).add(output); - } else { - if (!outputMap.get(selectedName).contains(output)) { - outputMap.get(selectedName).add(output); - } - } - - } - } - } - } - - @Override - protected void emit() { - Iterable<String> outputNames = outputSelector.select(serializationDelegate.getInstance() - .getObject()); - emitted.clear(); - - for (StreamOutput<OUT> output : selectAllOutputs) { - try { - output.collect(serializationDelegate); - } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Emit to {} failed due to: {}", output, - StringUtils.stringifyException(e)); - } - } - } - - for (String outputName : outputNames) { - List<StreamOutput<OUT>> outputList = outputMap.get(outputName); - try { - if (outputList == null) { - if (LOG.isErrorEnabled()) { - String format = String.format( - "Cannot emit because no output is selected with the name: %s", - outputName); - LOG.error(format); - - } - } else { - - for (StreamOutput<OUT> output : outputList) { - if (!emitted.contains(output)) { - output.collect(serializationDelegate); - emitted.add(output); - } - } - - } - - } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Emit to {} failed due to: {}", outputName, - StringUtils.stringifyException(e)); - } - } - - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java index 6fd1b98..4551c5a 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java @@ -18,37 +18,55 @@ package org.apache.flink.streaming.api.collector; import java.io.IOException; -import java.util.List; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.io.StreamRecordWriter; import org.apache.flink.util.Collector; +import org.apache.flink.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class StreamOutput<OUT> implements Collector<SerializationDelegate<StreamRecord<OUT>>> { +public class StreamOutput<OUT> implements Collector<OUT> { - private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output; + private static final Logger LOG = LoggerFactory.getLogger(StreamOutput.class); - private List<String> selectedNames; - private boolean selectAll = true; + private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output; + private SerializationDelegate<StreamRecord<OUT>> serializationDelegate; + private StreamRecord<OUT> streamRecord; + private int channelID; public StreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output, - List<String> selectedNames) { + int channelID, SerializationDelegate<StreamRecord<OUT>> serializationDelegate) { - this.output = output; + this.serializationDelegate = serializationDelegate; - if (selectedNames != null) { - this.selectedNames = selectedNames; - selectAll = false; + if (serializationDelegate != null) { + this.streamRecord = serializationDelegate.getInstance(); + } else { + throw new RuntimeException("Serializer cannot be null"); } + this.channelID = channelID; + this.output = output; } - public void collect(SerializationDelegate<StreamRecord<OUT>> record) { + public RecordWriter<SerializationDelegate<StreamRecord<OUT>>> getRecordWriter() { + return output; + } + + @Override + public void collect(OUT record) { + streamRecord.setObject(record); + streamRecord.newId(channelID); + serializationDelegate.setInstance(streamRecord); + try { - output.emit(record); + output.emit(serializationDelegate); } catch (Exception e) { - throw new RuntimeException("Could not emit record: " + record.getInstance()); + if (LOG.isErrorEnabled()) { + LOG.error("Emit failed due to: {}", StringUtils.stringifyException(e)); + } } } @@ -65,16 +83,4 @@ public class StreamOutput<OUT> implements Collector<SerializationDelegate<Stream } } - public boolean isSelectAll() { - return selectAll; - } - - public List<String> getSelectedNames() { - return selectedNames; - } - - public RecordWriter<SerializationDelegate<StreamRecord<OUT>>> getRecordWriter() { - return output; - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java deleted file mode 100755 index c3e4c9d..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.collector; - -import java.util.LinkedList; -import java.util.List; - -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.util.Collector; -import org.apache.flink.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Collector for tuples in Apache Flink stream processing. The collected values - * will be wrapped with ID in a {@link StreamRecord} and then emitted to the - * outputs. - * - * @param <OUT> - * Type of the Tuples/Objects collected. - */ -public class StreamOutputWrapper<OUT> implements Collector<OUT> { - - private static final Logger LOG = LoggerFactory.getLogger(StreamOutputWrapper.class); - - protected StreamRecord<OUT> streamRecord; - protected int channelID; - protected List<StreamOutput<OUT>> outputs; - protected SerializationDelegate<StreamRecord<OUT>> serializationDelegate; - - /** - * Creates a new StreamCollector - * - * @param channelID - * Channel ID of the Task - * @param serializationDelegate - * Serialization delegate used for serialization - */ - public StreamOutputWrapper(int channelID, - SerializationDelegate<StreamRecord<OUT>> serializationDelegate) { - this.serializationDelegate = serializationDelegate; - - if (serializationDelegate != null) { - this.streamRecord = serializationDelegate.getInstance(); - } else { - this.streamRecord = new StreamRecord<OUT>(); - } - this.channelID = channelID; - this.outputs = new LinkedList<StreamOutput<OUT>>(); - } - - /** - * Adds an output with the given user defined name - * - * @param output - * The RecordWriter object representing the output. - * @param outputNames - * User defined names of the output. - * @param isSelectAllOutput - * Marks whether all the outputs are selected. - */ - public void addOutput(StreamOutput<OUT> output) { - outputs.add(output); - } - - /** - * Collects and emits a tuple/object to the outputs by reusing a - * StreamRecord object. - * - * @param outputObject - * Object to be collected and emitted. - */ - @Override - public void collect(OUT outputObject) { - streamRecord.setObject(outputObject); - streamRecord.newId(channelID); - serializationDelegate.setInstance(streamRecord); - - emit(); - } - - /** - * Emits the current streamrecord to the outputs. - */ - protected void emit() { - for (StreamOutput<OUT> output : outputs) { - try { - output.collect(serializationDelegate); - } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Emit failed due to: {}", StringUtils.stringifyException(e)); - } - } - } - } - - @Override - public void close() { - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 8e7d823..0f633ec 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -96,7 +96,6 @@ public class DataStream<OUT> { protected final String id; protected int degreeOfParallelism; protected List<String> userDefinedNames; - protected boolean selectAll; protected StreamPartitioner<OUT> partitioner; @SuppressWarnings("rawtypes") protected TypeInformation typeInfo; @@ -127,7 +126,6 @@ public class DataStream<OUT> { this.degreeOfParallelism = environment.getDegreeOfParallelism(); this.streamGraph = environment.getStreamGraph(); this.userDefinedNames = new ArrayList<String>(); - this.selectAll = true; this.partitioner = new DistributePartitioner<OUT>(true); this.typeInfo = typeInfo; this.mergedStreams = new ArrayList<DataStream<OUT>>(); @@ -145,7 +143,6 @@ public class DataStream<OUT> { this.id = dataStream.id; this.degreeOfParallelism = dataStream.degreeOfParallelism; this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames); - this.selectAll = dataStream.selectAll; this.partitioner = dataStream.partitioner; this.streamGraph = dataStream.streamGraph; this.typeInfo = dataStream.typeInfo; @@ -1158,7 +1155,7 @@ public class DataStream<OUT> { protected <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) { for (DataStream<X> stream : inputStream.mergedStreams) { streamGraph.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber, - inputStream.userDefinedNames, inputStream.selectAll); + inputStream.userDefinedNames); } } http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java index 1bf4f9c..2b5b7c5 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java @@ -49,8 +49,13 @@ public class SplitDataStream<OUT> extends DataStream<OUT> { } private DataStream<OUT> selectOutput(String[] outputNames) { + for (String outName : outputNames) { + if (outName == null) { + throw new RuntimeException("Selected names must not be null"); + } + } + DataStream<OUT> returnStream = copy(); - returnStream.selectAll = false; returnStream.userDefinedNames = Arrays.asList(outputNames); return returnStream; } http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 51dc0ae..8a9be93 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -313,7 +313,7 @@ public abstract class StreamExecutionEnvironment { */ public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) { return addSource(new SocketTextStreamFunction(hostname, port, delimiter), null, - "socketStrean"); + "socketStream"); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java index 135f742..0d60939 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java @@ -29,10 +29,8 @@ import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.StreamConfig; import org.apache.flink.streaming.api.collector.CollectorWrapper; -import org.apache.flink.streaming.api.collector.DirectedOutputWrapper; -import org.apache.flink.streaming.api.collector.OutputSelector; +import org.apache.flink.streaming.api.collector.DirectedCollectorWrapper; import org.apache.flink.streaming.api.collector.StreamOutput; -import org.apache.flink.streaming.api.collector.StreamOutputWrapper; import org.apache.flink.streaming.api.invokable.ChainableInvokable; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecord; @@ -70,7 +68,7 @@ public class OutputHandler<OUT> { // registrations by outputname this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl); this.chainedConfigs.put(configuration.getTaskName(), configuration); - + this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl); // We iterate through all the out edges from this job vertex and create @@ -105,21 +103,35 @@ public class OutputHandler<OUT> { @SuppressWarnings({ "unchecked", "rawtypes" }) private Collector<OUT> createChainedCollector(StreamConfig chainedTaskConfig) { + boolean isDirectEmit = chainedTaskConfig.isDirectedEmit(); + // We create a wrapper that will encapsulate the chained operators and // network outputs - CollectorWrapper<OUT> wrapper = new CollectorWrapper<OUT>(); + CollectorWrapper<OUT> wrapper = isDirectEmit ? new DirectedCollectorWrapper( + chainedTaskConfig.getOutputSelector(cl)) : new CollectorWrapper<OUT>(); + + // Create collectors for the network outputs + for (String output : chainedTaskConfig.getOutputs(cl)) { - // If the task has network outputs we create a collector for those and - // pass - // it to the wrapper - if (chainedTaskConfig.getNumberOfOutputs() > 0) { - wrapper.addCollector((Collector<OUT>) createNetworkCollector(chainedTaskConfig)); + Collector<?> outCollector = outputMap.get(output); + + if (isDirectEmit) { + ((DirectedCollectorWrapper<OUT>) wrapper).addCollector(outCollector, + chainedTaskConfig.getSelectedNames(output)); + } else { + wrapper.addCollector(outCollector); + } } - // If the task has chained outputs we create a chained collector for - // each of them and pass it to the wrapper + // Create collectors for the chained outputs for (String output : chainedTaskConfig.getChainedOutputs(cl)) { - wrapper.addCollector(createChainedCollector(chainedConfigs.get(output))); + Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output)); + if (isDirectEmit) { + ((DirectedCollectorWrapper<OUT>) wrapper).addCollector(outCollector, + chainedTaskConfig.getSelectedNames(output)); + } else { + wrapper.addCollector(outCollector); + } } if (chainedTaskConfig.isChainStart()) { @@ -140,47 +152,6 @@ public class OutputHandler<OUT> { } - /** - * We create the collector for the network outputs of the task represented - * by the config using the StreamOutputs that we have set up in the - * constructor. - * - * @param config - * The config of the task - * @return We return a collector that represents all the network outputs of - * this task - */ - @SuppressWarnings("unchecked") - private <T> Collector<T> createNetworkCollector(StreamConfig config) { - - StreamRecordSerializer<T> outSerializer = config - .getTypeSerializerOut1(vertex.userClassLoader); - SerializationDelegate<StreamRecord<T>> outSerializationDelegate = null; - - if (outSerializer != null) { - outSerializationDelegate = new SerializationDelegate<StreamRecord<T>>(outSerializer); - outSerializationDelegate.setInstance(outSerializer.createInstance()); - } - - StreamOutputWrapper<T> collector; - - if (vertex.configuration.isDirectedEmit()) { - OutputSelector<T> outputSelector = vertex.configuration - .getOutputSelector(vertex.userClassLoader); - - collector = new DirectedOutputWrapper<T>(vertex.getInstanceID(), - outSerializationDelegate, outputSelector); - } else { - collector = new StreamOutputWrapper<T>(vertex.getInstanceID(), outSerializationDelegate); - } - - for (String output : config.getOutputs(cl)) { - collector.addOutput((StreamOutput<T>) outputMap.get(output)); - } - - return collector; - } - public Collector<OUT> getCollector() { return outerCollector; } @@ -197,7 +168,16 @@ public class OutputHandler<OUT> { */ private <T> StreamOutput<T> createStreamOutput(String outputVertex, StreamConfig configuration, int outputIndex) { - + + StreamRecordSerializer<T> outSerializer = configuration + .getTypeSerializerOut1(vertex.userClassLoader); + SerializationDelegate<StreamRecord<T>> outSerializationDelegate = null; + + if (outSerializer != null) { + outSerializationDelegate = new SerializationDelegate<StreamRecord<T>>(outSerializer); + outSerializationDelegate.setInstance(outSerializer.createInstance()); + } + StreamPartitioner<T> outputPartitioner = configuration.getPartitioner(cl, outputVertex); RecordWriter<SerializationDelegate<StreamRecord<T>>> output; @@ -221,9 +201,8 @@ public class OutputHandler<OUT> { } } - StreamOutput<T> streamOutput = new StreamOutput<T>(output, - configuration.isSelectAll(outputVertex) ? null - : configuration.getOutputNames(outputVertex)); + StreamOutput<T> streamOutput = new StreamOutput<T>(output, vertex.instanceID, + outSerializationDelegate); if (LOG.isTraceEnabled()) { LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass() http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java index cba23b8..bc89241 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java @@ -23,16 +23,13 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.collector.StreamOutput; import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; import org.apache.flink.streaming.io.BlockingQueueBroker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT, OUT> { +public class StreamIterationHead<OUT> extends StreamVertex<OUT, OUT> { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class); @@ -75,10 +72,6 @@ public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT, OU } StreamRecord<OUT> nextRecord; - StreamRecordSerializer<OUT> serializer = configuration - .getTypeSerializerOut1(userClassLoader); - SerializationDelegate<StreamRecord<OUT>> serializationDelegate = new SerializationDelegate<StreamRecord<OUT>>( - serializer); List<StreamOutput<OUT>> outputs = new LinkedList<StreamOutput<OUT>>(); for (StreamOutput<?> output : outputHandler.getOutputs()) { @@ -95,8 +88,7 @@ public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT, OU break; } for (StreamOutput<OUT> output : outputs) { - serializationDelegate.setInstance(nextRecord); - output.collect(serializationDelegate); + output.collect(nextRecord.getObject()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java index 1883c06..b3ecdf8 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java @@ -20,14 +20,13 @@ package org.apache.flink.streaming.api.streamvertex; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.io.BlockingQueueBroker; import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StreamIterationTail<IN extends Tuple> extends StreamVertex<IN, IN> { +public class StreamIterationTail<IN> extends StreamVertex<IN, IN> { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class); http://git-wip-us.apache.org/repos/asf/flink/blob/92947f0d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java index 0bc3d7d..49b3bf8 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java @@ -1,54 +1,52 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and * limitations under the License. - */ - -package org.apache.flink.streaming.api.collector; - -import static org.junit.Assert.assertArrayEquals; + */ -import java.util.ArrayList; +package org.apache.flink.streaming.api.collector; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; +import static org.junit.Assert.assertArrayEquals; + +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.api.streamvertex.MockRecordWriter; -import org.apache.flink.streaming.util.MockRecordWriterFactory; -import org.junit.Test; - -public class StreamCollectorTest { - - @Test - public void testCollect() { - MockRecordWriter recWriter = MockRecordWriterFactory.create(); - SerializationDelegate<StreamRecord<Tuple1<Integer>>> sd = new SerializationDelegate<StreamRecord<Tuple1<Integer>>>( - null); - sd.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>())); - - StreamOutputWrapper<Tuple1<Integer>> collector = new StreamOutputWrapper<Tuple1<Integer>>(2, sd); - collector.addOutput(new StreamOutput<Tuple1<Integer>>(recWriter, new ArrayList<String>())); - collector.collect(new Tuple1<Integer>(3)); - collector.collect(new Tuple1<Integer>(4)); - collector.collect(new Tuple1<Integer>(5)); - collector.collect(new Tuple1<Integer>(6)); - - assertArrayEquals(new Integer[] { 3, 4, 5, 6 }, recWriter.emittedRecords.toArray()); - } - - @Test - public void testClose() { - } - -} +import org.apache.flink.streaming.util.MockRecordWriterFactory; +import org.apache.flink.util.Collector; +import org.junit.Test; + +public class StreamCollectorTest { + + @Test + public void testCollect() { + MockRecordWriter recWriter = MockRecordWriterFactory.create(); + SerializationDelegate<StreamRecord<Tuple1<Integer>>> sd = new SerializationDelegate<StreamRecord<Tuple1<Integer>>>( + null); + sd.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>())); + + Collector<Tuple1<Integer>> collector = new StreamOutput<Tuple1<Integer>>(recWriter, 2, sd); + collector.collect(new Tuple1<Integer>(3)); + collector.collect(new Tuple1<Integer>(4)); + collector.collect(new Tuple1<Integer>(5)); + collector.collect(new Tuple1<Integer>(6)); + + assertArrayEquals(new Integer[] { 3, 4, 5, 6 }, recWriter.emittedRecords.toArray()); + } + + @Test + public void testClose() { + } + +}
