[streaming] Streaming runtime collector rework for increased flexibility with directed outputs and operator chains
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9b942b3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9b942b3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9b942b3 Branch: refs/heads/master Commit: d9b942b38cea01bf78ffab22a164b6622d629bc5 Parents: dc0d81b Author: Gyula Fora <[email protected]> Authored: Thu Jan 15 11:14:33 2015 +0100 Committer: mbalassi <[email protected]> Committed: Wed Jan 21 16:06:33 2015 +0100 ---------------------------------------------------------------------- .../api/collector/CollectorWrapper.java | 48 ++++++ .../api/collector/DirectedOutputWrapper.java | 144 ++++++++++++++++++ .../api/collector/DirectedStreamCollector.java | 130 ---------------- .../api/collector/StreamCollector.java | 148 ------------------- .../streaming/api/collector/StreamOutput.java | 65 ++++++++ .../api/collector/StreamOutputWrapper.java | 122 +++++++++++++++ .../api/streamvertex/InputHandler.java | 1 - .../api/streamvertex/OutputHandler.java | 26 ++-- .../flink/streaming/io/StreamRecordWriter.java | 2 +- .../api/collector/StreamCollectorTest.java | 4 +- .../runtime/operators/RegularPactTask.java | 2 - 11 files changed, 396 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java new file mode 100644 index 0000000..d782d08 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.collector; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.flink.util.Collector; + +public class CollectorWrapper<OUT> implements Collector<OUT> { + + List<Collector<OUT>> outputs; + + public CollectorWrapper() { + this.outputs = new LinkedList<Collector<OUT>>(); + } + + public void addChainedOutput(Collector<OUT> output) { + outputs.add(output); + } + + @Override + public void collect(OUT record) { + for(Collector<OUT> output: outputs){ + output.collect(record);; + } + } + + @Override + public void close() { + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java new file mode 100755 index 0000000..6ff03c4 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.collector; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.streaming.api.streamrecord.StreamRecord; +import org.apache.flink.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A StreamCollector that uses user defined output names and a user defined + * output selector to make directed emits. + * + * @param <OUT> + * Type of the Tuple collected. + */ +public class DirectedOutputWrapper<OUT> extends StreamOutputWrapper<OUT> { + + private static final Logger LOG = LoggerFactory.getLogger(DirectedOutputWrapper.class); + + OutputSelector<OUT> outputSelector; + + protected Map<String, List<StreamOutput<OUT>>> outputMap; + + private List<StreamOutput<OUT>> selectAllOutputs; + private Set<StreamOutput<OUT>> emitted; + + /** + * Creates a new DirectedStreamCollector + * + * @param channelID + * Channel ID of the Task + * @param serializationDelegate + * Serialization delegate used for serialization + * @param outputSelector + * User defined {@link OutputSelector} + */ + public DirectedOutputWrapper(int channelID, + SerializationDelegate<StreamRecord<OUT>> serializationDelegate, + OutputSelector<OUT> outputSelector) { + super(channelID, serializationDelegate); + this.outputSelector = outputSelector; + this.emitted = new HashSet<StreamOutput<OUT>>(); + this.selectAllOutputs = new LinkedList<StreamOutput<OUT>>(); + this.outputMap = new HashMap<String, List<StreamOutput<OUT>>>(); + + } + + @Override + public void addOutput(StreamOutput<OUT> output) { + + super.addOutput(output); + + if (output.isSelectAll()) { + selectAllOutputs.add(output); + } else { + for (String selectedName : output.getSelectedNames()) { + if (selectedName != null) { + if (!outputMap.containsKey(selectedName)) { + outputMap.put(selectedName, new LinkedList<StreamOutput<OUT>>()); + outputMap.get(selectedName).add(output); + } else { + if (!outputMap.get(selectedName).contains(output)) { + outputMap.get(selectedName).add(output); + } + } + + } + } + } + } + + @Override + protected void emit() { + Iterable<String> outputNames = outputSelector.select(serializationDelegate.getInstance() + .getObject()); + emitted.clear(); + + for (StreamOutput<OUT> output : selectAllOutputs) { + try { + output.collect(serializationDelegate); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Emit to {} failed due to: {}", output, + StringUtils.stringifyException(e)); + } + } + } + + for (String outputName : outputNames) { + List<StreamOutput<OUT>> outputList = outputMap.get(outputName); + try { + if (outputList == null) { + if (LOG.isErrorEnabled()) { + String format = String.format( + "Cannot emit because no output is selected with the name: %s", + outputName); + LOG.error(format); + + } + } else { + + for (StreamOutput<OUT> output : outputList) { + if (!emitted.contains(output)) { + output.collect(serializationDelegate); + emitted.add(output); + } + } + + } + + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Emit to {} failed due to: {}", outputName, + StringUtils.stringifyException(e)); + } + } + + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java deleted file mode 100755 index f37761f..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.collector; - -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A StreamCollector that uses user defined output names and a user defined - * output selector to make directed emits. - * - * @param <OUT> - * Type of the Tuple collected. - */ -public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> { - - private static final Logger LOG = LoggerFactory.getLogger(DirectedStreamCollector.class); - - OutputSelector<OUT> outputSelector; - private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> selectAllOutputs; - private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted; - - /** - * Creates a new DirectedStreamCollector - * - * @param channelID - * Channel ID of the Task - * @param serializationDelegate - * Serialization delegate used for serialization - * @param outputSelector - * User defined {@link OutputSelector} - */ - public DirectedStreamCollector(int channelID, - SerializationDelegate<StreamRecord<OUT>> serializationDelegate, - OutputSelector<OUT> outputSelector) { - super(channelID, serializationDelegate); - this.outputSelector = outputSelector; - this.emitted = new HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>(); - this.selectAllOutputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>(); - } - - @Override - public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output, - List<String> outputNames, boolean isSelectAllOutput) { - - if (isSelectAllOutput) { - selectAllOutputs.add(output); - } else { - addOneOutput(output, outputNames, isSelectAllOutput); - } - } - - /** - * Emits a StreamRecord to the outputs selected by the user defined - * OutputSelector - * - */ - protected void emitToOutputs() { - Iterable<String> outputNames = outputSelector.select(streamRecord.getObject()); - emitted.clear(); - - for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) { - try { - output.emit(serializationDelegate); - } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Emit to {} failed due to: {}", output, - StringUtils.stringifyException(e)); - } - } - } - emitted.addAll(selectAllOutputs); - - for (String outputName : outputNames) { - List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputList = outputMap - .get(outputName); - try { - if (outputList == null) { - if (LOG.isErrorEnabled()) { - String format = String.format( - "Cannot emit because no output is selected with the name: %s", - outputName); - LOG.error(format); - - } - } else { - - for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputList) { - if (!emitted.contains(output)) { - output.emit(serializationDelegate); - emitted.add(output); - } - } - - } - - } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Emit to {} failed due to: {}", outputName, - StringUtils.stringifyException(e)); - } - } - - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java deleted file mode 100755 index a1b9e66..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.collector; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.plugable.SerializationDelegate; -import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.util.Collector; -import org.apache.flink.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Collector for tuples in Apache Flink stream processing. The collected values - * will be wrapped with ID in a {@link StreamRecord} and then emitted to the - * outputs. - * - * @param <OUT> - * Type of the Tuples/Objects collected. - */ -public class StreamCollector<OUT> implements Collector<OUT> { - - private static final Logger LOG = LoggerFactory.getLogger(StreamCollector.class); - - protected StreamRecord<OUT> streamRecord; - protected int channelID; - protected List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs; - protected Map<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>> outputMap; - protected SerializationDelegate<StreamRecord<OUT>> serializationDelegate; - - /** - * Creates a new StreamCollector - * - * @param channelID - * Channel ID of the Task - * @param serializationDelegate - * Serialization delegate used for serialization - */ - public StreamCollector(int channelID, - SerializationDelegate<StreamRecord<OUT>> serializationDelegate) { - this.serializationDelegate = serializationDelegate; - - if (serializationDelegate != null) { - this.streamRecord = serializationDelegate.getInstance(); - } else { - this.streamRecord = new StreamRecord<OUT>(); - } - this.channelID = channelID; - this.outputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>(); - this.outputMap = new HashMap<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>>(); - } - - /** - * Adds an output with the given user defined name - * - * @param output - * The RecordWriter object representing the output. - * @param outputNames - * User defined names of the output. - * @param isSelectAllOutput - * Marks whether all the outputs are selected. - */ - public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output, - List<String> outputNames, boolean isSelectAllOutput) { - addOneOutput(output, outputNames, isSelectAllOutput); - } - - protected void addOneOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output, - List<String> outputNames, boolean isSelectAllOutput) { - outputs.add(output); - for (String outputName : outputNames) { - if (outputName != null) { - if (!outputMap.containsKey(outputName)) { - outputMap - .put(outputName, - new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>()); - outputMap.get(outputName).add(output); - } else { - if (!outputMap.get(outputName).contains(output)) { - outputMap.get(outputName).add(output); - } - } - - } - } - } - - /** - * Collects and emits a tuple/object to the outputs by reusing a - * StreamRecord object. - * - * @param outputObject - * Object to be collected and emitted. - */ - @Override - public void collect(OUT outputObject) { - streamRecord.setObject(outputObject); - emit(streamRecord); - } - - /** - * Emits a StreamRecord to the outputs. - * - * @param streamRecord - * StreamRecord to emit. - */ - private void emit(StreamRecord<OUT> streamRecord) { - streamRecord.newId(channelID); - serializationDelegate.setInstance(streamRecord); - emitToOutputs(); - } - - protected void emitToOutputs() { - for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputs) { - try { - output.emit(serializationDelegate); - } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Emit failed due to: {}", StringUtils.stringifyException(e)); - } - } - } - } - - @Override - public void close() { - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java new file mode 100644 index 0000000..4c21564 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.collector; + +import java.util.List; + +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.streaming.api.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; + +public class StreamOutput<OUT> implements Collector<SerializationDelegate<StreamRecord<OUT>>> { + + private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output; + + private List<String> selectedNames; + private boolean selectAll = true; + + public StreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output, + List<String> selectedNames) { + + this.output = output; + + if (selectedNames != null) { + this.selectedNames = selectedNames; + selectAll = false; + } + } + + public void collect(SerializationDelegate<StreamRecord<OUT>> record) { + try { + output.emit(record); + } catch (Exception e) { + throw new RuntimeException("Could not emit record: " + record.getInstance()); + } + } + + @Override + public void close() { + } + + public boolean isSelectAll() { + return selectAll; + } + + public List<String> getSelectedNames() { + return selectedNames; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java new file mode 100755 index 0000000..fa374b1 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.collector; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.streaming.api.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; +import org.apache.flink.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Collector for tuples in Apache Flink stream processing. The collected values + * will be wrapped with ID in a {@link StreamRecord} and then emitted to the + * outputs. + * + * @param <OUT> + * Type of the Tuples/Objects collected. + */ +public class StreamOutputWrapper<OUT> implements Collector<OUT> { + + private static final Logger LOG = LoggerFactory.getLogger(StreamOutputWrapper.class); + + protected StreamRecord<OUT> streamRecord; + protected int channelID; + protected List<StreamOutput<OUT>> outputs; + protected SerializationDelegate<StreamRecord<OUT>> serializationDelegate; + + /** + * Creates a new StreamCollector + * + * @param channelID + * Channel ID of the Task + * @param serializationDelegate + * Serialization delegate used for serialization + */ + public StreamOutputWrapper(int channelID, + SerializationDelegate<StreamRecord<OUT>> serializationDelegate) { + this.serializationDelegate = serializationDelegate; + + if (serializationDelegate != null) { + this.streamRecord = serializationDelegate.getInstance(); + } else { + this.streamRecord = new StreamRecord<OUT>(); + } + this.channelID = channelID; + this.outputs = new LinkedList<StreamOutput<OUT>>(); + } + + /** + * Adds an output with the given user defined name + * + * @param output + * The RecordWriter object representing the output. + * @param outputNames + * User defined names of the output. + * @param isSelectAllOutput + * Marks whether all the outputs are selected. + */ + public void addOutput(StreamOutput<OUT> output) { + outputs.add(output); + } + + protected void addOneOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output, + List<String> outputNames, boolean isSelectAllOutput) { + + } + + /** + * Collects and emits a tuple/object to the outputs by reusing a + * StreamRecord object. + * + * @param outputObject + * Object to be collected and emitted. + */ + @Override + public void collect(OUT outputObject) { + streamRecord.setObject(outputObject); + streamRecord.newId(channelID); + serializationDelegate.setInstance(streamRecord); + + emit(); + } + + /** + * Emits the current streamrecord to the outputs. + */ + protected void emit() { + for (StreamOutput<OUT> output : outputs) { + try { + output.collect(serializationDelegate); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Emit failed due to: {}", StringUtils.stringifyException(e)); + } + } + } + } + + @Override + public void close() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java index fdf23f7..ad6d948 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java @@ -49,7 +49,6 @@ public class InputHandler<IN> { } - @SuppressWarnings("unchecked") protected void setConfigInputs() throws StreamVertexException { inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader); http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java index f9fe593..61a6eb4 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java @@ -21,9 +21,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.StreamConfig; -import org.apache.flink.streaming.api.collector.DirectedStreamCollector; +import org.apache.flink.streaming.api.collector.DirectedOutputWrapper; import org.apache.flink.streaming.api.collector.OutputSelector; -import org.apache.flink.streaming.api.collector.StreamCollector; +import org.apache.flink.streaming.api.collector.StreamOutputWrapper; +import org.apache.flink.streaming.api.collector.StreamOutput; import org.apache.flink.streaming.api.invokable.StreamInvokable; import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer; @@ -43,7 +44,7 @@ public class OutputHandler<OUT> { private StreamConfig configuration; private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs; - private StreamCollector<OUT> collector; + private StreamOutputWrapper<OUT> collector; private long bufferTimeout; TypeInformation<OUT> outTypeInfo = null; @@ -79,21 +80,21 @@ public class OutputHandler<OUT> { } } - private StreamCollector<OUT> setCollector() { + private StreamOutputWrapper<OUT> setCollector() { if (streamVertex.configuration.getDirectedEmit()) { OutputSelector<OUT> outputSelector = streamVertex.configuration .getOutputSelector(streamVertex.userClassLoader); - collector = new DirectedStreamCollector<OUT>(streamVertex.getInstanceID(), + collector = new DirectedOutputWrapper<OUT>(streamVertex.getInstanceID(), outSerializationDelegate, outputSelector); } else { - collector = new StreamCollector<OUT>(streamVertex.getInstanceID(), + collector = new StreamOutputWrapper<OUT>(streamVertex.getInstanceID(), outSerializationDelegate); } return collector; } - public StreamCollector<OUT> getCollector() { + public StreamOutputWrapper<OUT> getCollector() { return collector; } @@ -121,16 +122,16 @@ public class OutputHandler<OUT> { RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output; if (bufferTimeout >= 0) { - output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex.getEnvironment().getWriter(outputNumber), - outputPartitioner, bufferTimeout); + output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex + .getEnvironment().getWriter(outputNumber), outputPartitioner, bufferTimeout); if (LOG.isTraceEnabled()) { LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}", bufferTimeout, streamVertex.getClass().getSimpleName()); } } else { - output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex.getEnvironment().getWriter(outputNumber), - outputPartitioner); + output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex + .getEnvironment().getWriter(outputNumber), outputPartitioner); if (LOG.isTraceEnabled()) { LOG.trace("RecordWriter initiated for {}", streamVertex.getClass().getSimpleName()); @@ -142,7 +143,8 @@ public class OutputHandler<OUT> { boolean isSelectAllOutput = configuration.getSelectAll(outputNumber); if (collector != null) { - collector.addOutput(output, outputName, isSelectAllOutput); + collector + .addOutput(new StreamOutput<OUT>(output, isSelectAllOutput ? null : outputName)); } if (LOG.isTraceEnabled()) { http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java index fe9cce3..b7af589 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java @@ -25,7 +25,7 @@ import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector; import java.io.IOException; -public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter { +public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> { private long timeout; http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java index 535e109..0bc3d7d 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java @@ -37,8 +37,8 @@ public class StreamCollectorTest { null); sd.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>())); - StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, sd); - collector.addOutput(recWriter, new ArrayList<String>(), false); + StreamOutputWrapper<Tuple1<Integer>> collector = new StreamOutputWrapper<Tuple1<Integer>>(2, sd); + collector.addOutput(new StreamOutput<Tuple1<Integer>>(recWriter, new ArrayList<String>())); collector.collect(new Tuple1<Integer>(3)); collector.collect(new Tuple1<Integer>(4)); collector.collect(new Tuple1<Integer>(5)); http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index 2d36c1e..6b6918d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -706,7 +706,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i * * This method requires that the task configuration, the driver, and the user-code class loader are set. */ - @SuppressWarnings("unchecked") protected void initInputReaders() throws Exception { final int numInputs = getNumTaskInputs(); final MutableReader<?>[] inputReaders = new MutableReader[numInputs]; @@ -748,7 +747,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i * * This method requires that the task configuration, the driver, and the user-code class loader are set. */ - @SuppressWarnings("unchecked") protected void initBroadcastInputReaders() throws Exception { final int numBroadcastInputs = this.config.getNumBroadcastInputs(); final MutableReader<?>[] broadcastInputReaders = new MutableReader[numBroadcastInputs];
