[streaming] Streaming runtime collector rework for increased flexibility with 
directed outputs and operator chains


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9b942b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9b942b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9b942b3

Branch: refs/heads/master
Commit: d9b942b38cea01bf78ffab22a164b6622d629bc5
Parents: dc0d81b
Author: Gyula Fora <[email protected]>
Authored: Thu Jan 15 11:14:33 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Wed Jan 21 16:06:33 2015 +0100

----------------------------------------------------------------------
 .../api/collector/CollectorWrapper.java         |  48 ++++++
 .../api/collector/DirectedOutputWrapper.java    | 144 ++++++++++++++++++
 .../api/collector/DirectedStreamCollector.java  | 130 ----------------
 .../api/collector/StreamCollector.java          | 148 -------------------
 .../streaming/api/collector/StreamOutput.java   |  65 ++++++++
 .../api/collector/StreamOutputWrapper.java      | 122 +++++++++++++++
 .../api/streamvertex/InputHandler.java          |   1 -
 .../api/streamvertex/OutputHandler.java         |  26 ++--
 .../flink/streaming/io/StreamRecordWriter.java  |   2 +-
 .../api/collector/StreamCollectorTest.java      |   4 +-
 .../runtime/operators/RegularPactTask.java      |   2 -
 11 files changed, 396 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
new file mode 100644
index 0000000..d782d08
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * A {@link Collector} that forwards every collected record to each of the
+ * outputs registered through {@link #addChainedOutput(Collector)}, in the
+ * order in which they were registered.
+ *
+ * @param <OUT>
+ *            Type of the records collected.
+ */
+public class CollectorWrapper<OUT> implements Collector<OUT> {
+
+       List<Collector<OUT>> outputs;
+
+       /**
+        * Creates a wrapper with no registered outputs.
+        */
+       public CollectorWrapper() {
+               this.outputs = new LinkedList<Collector<OUT>>();
+       }
+
+       /**
+        * Registers an additional output that receives every record passed to
+        * {@link #collect(Object)} from now on.
+        *
+        * @param output
+        *            Collector to forward records to.
+        */
+       public void addChainedOutput(Collector<OUT> output) {
+               outputs.add(output);
+       }
+
+       /**
+        * Forwards the record to all registered outputs.
+        *
+        * @param record
+        *            Record to forward.
+        */
+       @Override
+       public void collect(OUT record) {
+               for (Collector<OUT> output : outputs) {
+                       output.collect(record);
+               }
+       }
+
+       /**
+        * Does nothing; the registered outputs are not closed by this wrapper.
+        */
+       @Override
+       public void close() {
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java
new file mode 100755
index 0000000..6ff03c4
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedOutputWrapper.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A StreamCollector that uses user defined output names and a user defined
+ * output selector to make directed emits.
+ * 
+ * @param <OUT>
+ *            Type of the Tuple collected.
+ */
+public class DirectedOutputWrapper<OUT> extends StreamOutputWrapper<OUT> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DirectedOutputWrapper.class);
+
+       OutputSelector<OUT> outputSelector;
+
+       protected Map<String, List<StreamOutput<OUT>>> outputMap;
+
+       private List<StreamOutput<OUT>> selectAllOutputs;
+       private Set<StreamOutput<OUT>> emitted;
+
+       /**
+        * Creates a new DirectedStreamCollector
+        * 
+        * @param channelID
+        *            Channel ID of the Task
+        * @param serializationDelegate
+        *            Serialization delegate used for serialization
+        * @param outputSelector
+        *            User defined {@link OutputSelector}
+        */
+       public DirectedOutputWrapper(int channelID,
+                       SerializationDelegate<StreamRecord<OUT>> 
serializationDelegate,
+                       OutputSelector<OUT> outputSelector) {
+               super(channelID, serializationDelegate);
+               this.outputSelector = outputSelector;
+               this.emitted = new HashSet<StreamOutput<OUT>>();
+               this.selectAllOutputs = new LinkedList<StreamOutput<OUT>>();
+               this.outputMap = new HashMap<String, List<StreamOutput<OUT>>>();
+
+       }
+
+       @Override
+       public void addOutput(StreamOutput<OUT> output) {
+
+               super.addOutput(output);
+
+               if (output.isSelectAll()) {
+                       selectAllOutputs.add(output);
+               } else {
+                       for (String selectedName : output.getSelectedNames()) {
+                               if (selectedName != null) {
+                                       if 
(!outputMap.containsKey(selectedName)) {
+                                               outputMap.put(selectedName, new 
LinkedList<StreamOutput<OUT>>());
+                                               
outputMap.get(selectedName).add(output);
+                                       } else {
+                                               if 
(!outputMap.get(selectedName).contains(output)) {
+                                                       
outputMap.get(selectedName).add(output);
+                                               }
+                                       }
+
+                               }
+                       }
+               }
+       }
+
+       @Override
+       protected void emit() {
+               Iterable<String> outputNames = 
outputSelector.select(serializationDelegate.getInstance()
+                               .getObject());
+               emitted.clear();
+
+               for (StreamOutput<OUT> output : selectAllOutputs) {
+                       try {
+                               output.collect(serializationDelegate);
+                       } catch (Exception e) {
+                               if (LOG.isErrorEnabled()) {
+                                       LOG.error("Emit to {} failed due to: 
{}", output,
+                                                       
StringUtils.stringifyException(e));
+                               }
+                       }
+               }
+
+               for (String outputName : outputNames) {
+                       List<StreamOutput<OUT>> outputList = 
outputMap.get(outputName);
+                       try {
+                               if (outputList == null) {
+                                       if (LOG.isErrorEnabled()) {
+                                               String format = String.format(
+                                                               "Cannot emit 
because no output is selected with the name: %s",
+                                                               outputName);
+                                               LOG.error(format);
+
+                                       }
+                               } else {
+
+                                       for (StreamOutput<OUT> output : 
outputList) {
+                                               if (!emitted.contains(output)) {
+                                                       
output.collect(serializationDelegate);
+                                                       emitted.add(output);
+                                               }
+                                       }
+
+                               }
+
+                       } catch (Exception e) {
+                               if (LOG.isErrorEnabled()) {
+                                       LOG.error("Emit to {} failed due to: 
{}", outputName,
+                                                       
StringUtils.stringifyException(e));
+                               }
+                       }
+
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
deleted file mode 100755
index f37761f..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A StreamCollector that uses user defined output names and a user defined
- * output selector to make directed emits.
- * 
- * @param <OUT>
- *            Type of the Tuple collected.
- */
-public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(DirectedStreamCollector.class);
-
-       OutputSelector<OUT> outputSelector;
-       private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> 
selectAllOutputs;
-       private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> 
emitted;
-
-       /**
-        * Creates a new DirectedStreamCollector
-        * 
-        * @param channelID
-        *            Channel ID of the Task
-        * @param serializationDelegate
-        *            Serialization delegate used for serialization
-        * @param outputSelector
-        *            User defined {@link OutputSelector}
-        */
-       public DirectedStreamCollector(int channelID,
-                       SerializationDelegate<StreamRecord<OUT>> 
serializationDelegate,
-                       OutputSelector<OUT> outputSelector) {
-               super(channelID, serializationDelegate);
-               this.outputSelector = outputSelector;
-               this.emitted = new 
HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-               this.selectAllOutputs = new 
LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-       }
-
-       @Override
-       public void 
addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
-                       List<String> outputNames, boolean isSelectAllOutput) {
-
-               if (isSelectAllOutput) {
-                       selectAllOutputs.add(output);
-               } else {
-                       addOneOutput(output, outputNames, isSelectAllOutput);
-               }
-       }
-
-       /**
-        * Emits a StreamRecord to the outputs selected by the user defined
-        * OutputSelector
-        *
-        */
-       protected void emitToOutputs() {
-               Iterable<String> outputNames = 
outputSelector.select(streamRecord.getObject());
-               emitted.clear();
-
-               for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> 
output : selectAllOutputs) {
-                       try {
-                               output.emit(serializationDelegate);
-                       } catch (Exception e) {
-                               if (LOG.isErrorEnabled()) {
-                                       LOG.error("Emit to {} failed due to: 
{}", output,
-                                                       
StringUtils.stringifyException(e));
-                               }
-                       }
-               }
-               emitted.addAll(selectAllOutputs);
-
-               for (String outputName : outputNames) {
-                       
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputList = 
outputMap
-                                       .get(outputName);
-                       try {
-                               if (outputList == null) {
-                                       if (LOG.isErrorEnabled()) {
-                                               String format = String.format(
-                                                               "Cannot emit 
because no output is selected with the name: %s",
-                                                               outputName);
-                                               LOG.error(format);
-
-                                       }
-                               } else {
-
-                                       for 
(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : outputList) {
-                                               if (!emitted.contains(output)) {
-                                                       
output.emit(serializationDelegate);
-                                                       emitted.add(output);
-                                               }
-                                       }
-
-                               }
-
-                       } catch (Exception e) {
-                               if (LOG.isErrorEnabled()) {
-                                       LOG.error("Emit to {} failed due to: 
{}", outputName,
-                                                       
StringUtils.stringifyException(e));
-                               }
-                       }
-
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
deleted file mode 100755
index a1b9e66..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Collector for tuples in Apache Flink stream processing. The collected values
- * will be wrapped with ID in a {@link StreamRecord} and then emitted to the
- * outputs.
- * 
- * @param <OUT>
- *            Type of the Tuples/Objects collected.
- */
-public class StreamCollector<OUT> implements Collector<OUT> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(StreamCollector.class);
-
-       protected StreamRecord<OUT> streamRecord;
-       protected int channelID;
-       protected List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> 
outputs;
-       protected Map<String, 
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>> outputMap;
-       protected SerializationDelegate<StreamRecord<OUT>> 
serializationDelegate;
-
-       /**
-        * Creates a new StreamCollector
-        * 
-        * @param channelID
-        *            Channel ID of the Task
-        * @param serializationDelegate
-        *            Serialization delegate used for serialization
-        */
-       public StreamCollector(int channelID,
-                       SerializationDelegate<StreamRecord<OUT>> 
serializationDelegate) {
-               this.serializationDelegate = serializationDelegate;
-
-               if (serializationDelegate != null) {
-                       this.streamRecord = serializationDelegate.getInstance();
-               } else {
-                       this.streamRecord = new StreamRecord<OUT>();
-               }
-               this.channelID = channelID;
-               this.outputs = new 
ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
-               this.outputMap = new HashMap<String, 
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>>();
-       }
-
-       /**
-        * Adds an output with the given user defined name
-        * 
-        * @param output
-        *            The RecordWriter object representing the output.
-        * @param outputNames
-        *            User defined names of the output.
-        * @param isSelectAllOutput
-        *            Marks whether all the outputs are selected.
-        */
-       public void 
addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
-                       List<String> outputNames, boolean isSelectAllOutput) {
-               addOneOutput(output, outputNames, isSelectAllOutput);
-       }
-
-       protected void 
addOneOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
-                       List<String> outputNames, boolean isSelectAllOutput) {
-               outputs.add(output);
-               for (String outputName : outputNames) {
-                       if (outputName != null) {
-                               if (!outputMap.containsKey(outputName)) {
-                                       outputMap
-                                                       .put(outputName,
-                                                                       new 
ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>());
-                                       outputMap.get(outputName).add(output);
-                               } else {
-                                       if 
(!outputMap.get(outputName).contains(output)) {
-                                               
outputMap.get(outputName).add(output);
-                                       }
-                               }
-
-                       }
-               }
-       }
-
-       /**
-        * Collects and emits a tuple/object to the outputs by reusing a
-        * StreamRecord object.
-        * 
-        * @param outputObject
-        *            Object to be collected and emitted.
-        */
-       @Override
-       public void collect(OUT outputObject) {
-               streamRecord.setObject(outputObject);
-               emit(streamRecord);
-       }
-
-       /**
-        * Emits a StreamRecord to the outputs.
-        * 
-        * @param streamRecord
-        *            StreamRecord to emit.
-        */
-       private void emit(StreamRecord<OUT> streamRecord) {
-               streamRecord.newId(channelID);
-               serializationDelegate.setInstance(streamRecord);
-               emitToOutputs();
-       }
-
-       protected void emitToOutputs() {
-               for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> 
output : outputs) {
-                       try {
-                               output.emit(serializationDelegate);
-                       } catch (Exception e) {
-                               if (LOG.isErrorEnabled()) {
-                                       LOG.error("Emit failed due to: {}", 
StringUtils.stringifyException(e));
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public void close() {
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
new file mode 100644
index 0000000..4c21564
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import java.util.List;
+
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+
+public class StreamOutput<OUT> implements 
Collector<SerializationDelegate<StreamRecord<OUT>>> {
+
+       private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
+
+       private List<String> selectedNames;
+       private boolean selectAll = true;
+
+       public 
StreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
+                       List<String> selectedNames) {
+
+               this.output = output;
+
+               if (selectedNames != null) {
+                       this.selectedNames = selectedNames;
+                       selectAll = false;
+               }
+       }
+
+       public void collect(SerializationDelegate<StreamRecord<OUT>> record) {
+               try {
+                       output.emit(record);
+               } catch (Exception e) {
+                       throw new RuntimeException("Could not emit record: " + 
record.getInstance());
+               }
+       }
+
+       @Override
+       public void close() {
+       }
+
+       public boolean isSelectAll() {
+               return selectAll;
+       }
+
+       public List<String> getSelectedNames() {
+               return selectedNames;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
new file mode 100755
index 0000000..fa374b1
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutputWrapper.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Collector for tuples in Apache Flink stream processing. The collected values
+ * will be wrapped with ID in a {@link StreamRecord} and then emitted to the
+ * registered {@link StreamOutput}s.
+ * 
+ * @param <OUT>
+ *            Type of the Tuples/Objects collected.
+ */
+public class StreamOutputWrapper<OUT> implements Collector<OUT> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamOutputWrapper.class);
+
+       protected StreamRecord<OUT> streamRecord;
+       protected int channelID;
+       protected List<StreamOutput<OUT>> outputs;
+       protected SerializationDelegate<StreamRecord<OUT>> 
serializationDelegate;
+
+       /**
+        * Creates a new StreamOutputWrapper. The reused StreamRecord is taken
+        * from the serialization delegate; if the delegate is null a fresh
+        * StreamRecord is created instead.
+        * 
+        * @param channelID
+        *            Channel ID of the Task
+        * @param serializationDelegate
+        *            Serialization delegate used for serialization
+        */
+       public StreamOutputWrapper(int channelID,
+                       SerializationDelegate<StreamRecord<OUT>> 
serializationDelegate) {
+               this.serializationDelegate = serializationDelegate;
+
+               if (serializationDelegate != null) {
+                       this.streamRecord = serializationDelegate.getInstance();
+               } else {
+                       this.streamRecord = new StreamRecord<OUT>();
+               }
+               this.channelID = channelID;
+               this.outputs = new LinkedList<StreamOutput<OUT>>();
+       }
+
+       /**
+        * Adds an output that will receive every emitted record.
+        * 
+        * @param output
+        *            The StreamOutput object representing the output.
+        */
+       public void addOutput(StreamOutput<OUT> output) {
+               outputs.add(output);
+       }
+
+       protected void 
addOneOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
+                       List<String> outputNames, boolean isSelectAllOutput) {
+               // NOTE(review): empty body; looks like a leftover of the removed StreamCollector#addOneOutput - confirm whether it can be dropped
+       }
+
+       /**
+        * Collects and emits a tuple/object to the outputs by reusing a
+        * StreamRecord object: the object is set on the record, the record gets
+        * a new ID for this channel and is handed to the serialization delegate
+        * before emitting.
+        * 
+        * @param outputObject
+        *            Object to be collected and emitted.
+        */
+       @Override
+       public void collect(OUT outputObject) {
+               streamRecord.setObject(outputObject);
+               streamRecord.newId(channelID);
+               serializationDelegate.setInstance(streamRecord);
+
+               emit();
+       }
+
+       /**
+        * Emits the current streamrecord to all outputs. A failure of one
+        * output is logged and does not prevent emitting to the remaining
+        * outputs.
+        */
+       protected void emit() {
+               for (StreamOutput<OUT> output : outputs) {
+                       try {
+                               output.collect(serializationDelegate);
+                       } catch (Exception e) {
+                               if (LOG.isErrorEnabled()) {
+                                       LOG.error("Emit failed due to: {}", 
StringUtils.stringifyException(e));
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void close() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index fdf23f7..ad6d948 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -49,7 +49,6 @@ public class InputHandler<IN> {
 
        }
 
-       @SuppressWarnings("unchecked")
        protected void setConfigInputs() throws StreamVertexException {
                inputSerializer = configuration.getTypeSerializerIn1(streamVertex.userClassLoader);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index f9fe593..61a6eb4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -21,9 +21,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.collector.DirectedStreamCollector;
+import org.apache.flink.streaming.api.collector.DirectedOutputWrapper;
 import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.collector.StreamCollector;
+import org.apache.flink.streaming.api.collector.StreamOutputWrapper;
+import org.apache.flink.streaming.api.collector.StreamOutput;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
@@ -43,7 +44,7 @@ public class OutputHandler<OUT> {
        private StreamConfig configuration;
 
        private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
-       private StreamCollector<OUT> collector;
+       private StreamOutputWrapper<OUT> collector;
        private long bufferTimeout;
 
        TypeInformation<OUT> outTypeInfo = null;
@@ -79,21 +80,21 @@ public class OutputHandler<OUT> {
                }
        }
 
-       private StreamCollector<OUT> setCollector() {
+       private StreamOutputWrapper<OUT> setCollector() {
                if (streamVertex.configuration.getDirectedEmit()) {
                        OutputSelector<OUT> outputSelector = streamVertex.configuration
                                        .getOutputSelector(streamVertex.userClassLoader);
 
-                       collector = new DirectedStreamCollector<OUT>(streamVertex.getInstanceID(),
+                       collector = new DirectedOutputWrapper<OUT>(streamVertex.getInstanceID(),
                                        outSerializationDelegate, outputSelector);
                } else {
-                       collector = new StreamCollector<OUT>(streamVertex.getInstanceID(),
+                       collector = new StreamOutputWrapper<OUT>(streamVertex.getInstanceID(),
                                        outSerializationDelegate);
                }
                return collector;
        }
 
-       public StreamCollector<OUT> getCollector() {
+       public StreamOutputWrapper<OUT> getCollector() {
                return collector;
        }
 
@@ -121,16 +122,16 @@ public class OutputHandler<OUT> {
                RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
 
                if (bufferTimeout >= 0) {
-                       output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex.getEnvironment().getWriter(outputNumber),
-                                       outputPartitioner, bufferTimeout);
+                       output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex
+                                       .getEnvironment().getWriter(outputNumber), outputPartitioner, bufferTimeout);
 
                        if (LOG.isTraceEnabled()) {
                                LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}",
                                                bufferTimeout, streamVertex.getClass().getSimpleName());
                        }
                } else {
-                       output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex.getEnvironment().getWriter(outputNumber),
-                                       outputPartitioner);
+                       output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex
+                                       .getEnvironment().getWriter(outputNumber), outputPartitioner);
 
                        if (LOG.isTraceEnabled()) {
                                LOG.trace("RecordWriter initiated for {}", streamVertex.getClass().getSimpleName());
@@ -142,7 +143,8 @@ public class OutputHandler<OUT> {
                boolean isSelectAllOutput = configuration.getSelectAll(outputNumber);
 
                if (collector != null) {
-                       collector.addOutput(output, outputName, isSelectAllOutput);
+                       collector
+                                       .addOutput(new StreamOutput<OUT>(output, isSelectAllOutput ? null : outputName));
                }
 
                if (LOG.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
index fe9cce3..b7af589 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamRecordWriter.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
 
 import java.io.IOException;
 
-public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter {
+public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
 
        private long timeout;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index 535e109..0bc3d7d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -37,8 +37,8 @@ public class StreamCollectorTest {
                                null);
                sd.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>()));
 
-               StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, sd);
-               collector.addOutput(recWriter, new ArrayList<String>(), false);
+               StreamOutputWrapper<Tuple1<Integer>> collector = new StreamOutputWrapper<Tuple1<Integer>>(2, sd);
+               collector.addOutput(new StreamOutput<Tuple1<Integer>>(recWriter, new ArrayList<String>()));
                collector.collect(new Tuple1<Integer>(3));
                collector.collect(new Tuple1<Integer>(4));
                collector.collect(new Tuple1<Integer>(5));

http://git-wip-us.apache.org/repos/asf/flink/blob/d9b942b3/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 2d36c1e..6b6918d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -706,7 +706,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
         *
         * This method requires that the task configuration, the driver, and the user-code class loader are set.
         */
-       @SuppressWarnings("unchecked")
        protected void initInputReaders() throws Exception {
                final int numInputs = getNumTaskInputs();
                final MutableReader<?>[] inputReaders = new MutableReader[numInputs];
@@ -748,7 +747,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
         *
         * This method requires that the task configuration, the driver, and the user-code class loader are set.
         */
-       @SuppressWarnings("unchecked")
        protected void initBroadcastInputReaders() throws Exception {
                final int numBroadcastInputs = this.config.getNumBroadcastInputs();
                final MutableReader<?>[] broadcastInputReaders = new MutableReader[numBroadcastInputs];

Reply via email to