http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
index de9c664..ea891c9 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -61,8 +61,7 @@ public abstract class WindowInvokable<IN, OUT> extends 
StreamInvokable<IN, OUT>
        public WindowInvokable(Function userFunction, 
LinkedList<TriggerPolicy<IN>> triggerPolicies,
                        LinkedList<EvictionPolicy<IN>> evictionPolicies) {
                super(userFunction);
-               setChainingStrategy(ChainingStrategy.NEVER);
-               
+
                this.triggerPolicies = triggerPolicies;
                this.evictionPolicies = evictionPolicies;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 99f826d..135f742 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
@@ -54,7 +55,7 @@ public class OutputHandler<OUT> {
 
        private Map<String, StreamOutput<?>> outputMap;
        private Map<String, StreamConfig> chainedConfigs;
-       private List<String> recordWriterOrder;
+       private List<Tuple2<String, String>> outEdgesInOrder;
 
        public OutputHandler(StreamVertex<?, OUT> vertex) {
 
@@ -68,24 +69,16 @@ public class OutputHandler<OUT> {
                // We read the chained configs, and the order of record writer
                // registrations by outputname
                this.chainedConfigs = 
configuration.getTransitiveChainedTaskConfigs(cl);
-               this.recordWriterOrder = configuration.getRecordWriterOrder(cl);
-
-               // For the network outputs of the chain head we create the 
stream
-               // outputs
-               for (String outName : configuration.getOutputs(cl)) {
-                       StreamOutput<?> streamOutput = 
createStreamOutput(outName, configuration);
-                       outputMap.put(outName, streamOutput);
-               }
-
-               // If we have chained tasks we iterate through them and create 
the
-               // stream outputs for the network outputs
-               if (chainedConfigs != null) {
-                       for (StreamConfig config : chainedConfigs.values()) {
-                               for (String outName : config.getOutputs(cl)) {
-                                       StreamOutput<?> streamOutput = 
createStreamOutput(outName, config);
-                                       outputMap.put(outName, streamOutput);
-                               }
-                       }
+               this.chainedConfigs.put(configuration.getTaskName(), 
configuration);
+                               
+               this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
+
+               // We iterate through all the out edges from this job vertex 
and create
+               // a stream output
+               for (Tuple2<String, String> outEdge : outEdgesInOrder) {
+                       StreamOutput<?> streamOutput = 
createStreamOutput(outEdge.f1,
+                                       chainedConfigs.get(outEdge.f0), 
outEdgesInOrder.indexOf(outEdge));
+                       outputMap.put(outEdge.f1, streamOutput);
                }
 
                // We create the outer collector that will be passed to the 
first task
@@ -196,40 +189,32 @@ public class OutputHandler<OUT> {
         * We create the StreamOutput for the specific output given by the 
name, and
         * the configuration of its source task
         * 
-        * @param name
+        * @param outputVertex
         *            Name of the output to which the streamoutput will be set 
up
         * @param configuration
         *            The config of upStream task
         * @return
         */
-       private <T> StreamOutput<T> createStreamOutput(String name, 
StreamConfig configuration) {
-
-               int outputNumber = recordWriterOrder.indexOf(name);
-
-               StreamPartitioner<T> outputPartitioner;
-
-               try {
-                       outputPartitioner = 
configuration.getPartitioner(vertex.userClassLoader, name);
-               } catch (Exception e) {
-                       throw new StreamVertexException("Cannot deserialize 
partitioner for "
-                                       + vertex.getName() + " with " + name + 
" outputs", e);
-               }
+       private <T> StreamOutput<T> createStreamOutput(String outputVertex, 
StreamConfig configuration,
+                       int outputIndex) {
+               
+               StreamPartitioner<T> outputPartitioner = 
configuration.getPartitioner(cl, outputVertex);
 
                RecordWriter<SerializationDelegate<StreamRecord<T>>> output;
 
-               long bufferTimeout = configuration.getBufferTimeout();
+               if (configuration.getBufferTimeout() >= 0) {
 
-               if (bufferTimeout >= 0) {
                        output = new 
StreamRecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex
-                                       
.getEnvironment().getWriter(outputNumber), outputPartitioner, bufferTimeout);
+                                       
.getEnvironment().getWriter(outputIndex), outputPartitioner,
+                                       configuration.getBufferTimeout());
 
                        if (LOG.isTraceEnabled()) {
                                LOG.trace("StreamRecordWriter initiated with {} 
bufferTimeout for {}",
-                                               bufferTimeout, 
vertex.getClass().getSimpleName());
+                                               
configuration.getBufferTimeout(), vertex.getClass().getSimpleName());
                        }
                } else {
                        output = new 
RecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex
-                                       
.getEnvironment().getWriter(outputNumber), outputPartitioner);
+                                       
.getEnvironment().getWriter(outputIndex), outputPartitioner);
 
                        if (LOG.isTraceEnabled()) {
                                LOG.trace("RecordWriter initiated for {}", 
vertex.getClass().getSimpleName());
@@ -237,11 +222,12 @@ public class OutputHandler<OUT> {
                }
 
                StreamOutput<T> streamOutput = new StreamOutput<T>(output,
-                               configuration.isSelectAll(name) ? null : 
configuration.getOutputNames(name));
+                               configuration.isSelectAll(outputVertex) ? null
+                                               : 
configuration.getOutputNames(outputVertex));
 
                if (LOG.isTraceEnabled()) {
                        LOG.trace("Partitioner set: {} with {} outputs for {}", 
outputPartitioner.getClass()
-                                       .getSimpleName(), outputNumber, 
vertex.getClass().getSimpleName());
+                                       .getSimpleName(), outputIndex, 
vertex.getClass().getSimpleName());
                }
 
                return streamOutput;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index e7f15b6..95a5d9b 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -46,7 +46,7 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
 
        @Override
        public void execute(String jobName) throws Exception {
-               JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName);
+               JobGraph jobGraph = streamGraph.getJobGraph(jobName);
 
                Configuration configuration = jobGraph.getJobConfiguration();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index a69454c..a408ec0 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -83,7 +83,7 @@ object StreamCrossOperator {
         clean(getCrossWindowFunction(op, fun)), op.windowSize, 
op.slideInterval, op.timeStamp1,
         op.timeStamp2)
 
-      
javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
+      
javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
         invokable)
 
       javaStream.setType(implicitly[TypeInformation[R]])
@@ -94,7 +94,7 @@ object StreamCrossOperator {
     }
 
     override def every(length: Long): CrossWindow[I1, I2] = {
-      val builder = javaStream.getExecutionEnvironment().getJobGraphBuilder()
+      val builder = javaStream.getExecutionEnvironment().getStreamGraph()
       val invokable = builder.getInvokable(javaStream.getId())
       invokable.asInstanceOf[CoWindowInvokable[_,_,_]].setSlideSize(length)
       this

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index 7ecd79a..1bd1bfb 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -186,7 +186,7 @@ object StreamJoinOperator {
         clean(getJoinWindowFunction(jp, fun)), op.windowSize, 
op.slideInterval, op.timeStamp1,
         op.timeStamp2)
 
-      
javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
+      
javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
         invokable)
 
       javaStream.setType(implicitly[TypeInformation[R]])

Reply via email to