http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java index de9c664..ea891c9 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java @@ -61,8 +61,7 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> public WindowInvokable(Function userFunction, LinkedList<TriggerPolicy<IN>> triggerPolicies, LinkedList<EvictionPolicy<IN>> evictionPolicies) { super(userFunction); - setChainingStrategy(ChainingStrategy.NEVER); - + this.triggerPolicies = triggerPolicies; this.evictionPolicies = evictionPolicies;
http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java index 99f826d..135f742 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.StreamConfig; @@ -54,7 +55,7 @@ public class OutputHandler<OUT> { private Map<String, StreamOutput<?>> outputMap; private Map<String, StreamConfig> chainedConfigs; - private List<String> recordWriterOrder; + private List<Tuple2<String, String>> outEdgesInOrder; public OutputHandler(StreamVertex<?, OUT> vertex) { @@ -68,24 +69,16 @@ public class OutputHandler<OUT> { // We read the chained configs, and the order of record writer // registrations by outputname this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl); - this.recordWriterOrder = configuration.getRecordWriterOrder(cl); - - // For the network outputs of the chain head we create the stream - // outputs - for (String outName : configuration.getOutputs(cl)) { - StreamOutput<?> streamOutput = createStreamOutput(outName, configuration); - outputMap.put(outName, streamOutput); - } - - // If we have chained tasks we iterate through them and create the - // stream outputs for the network outputs - if (chainedConfigs != null) { - for (StreamConfig config : chainedConfigs.values()) { - for (String outName : config.getOutputs(cl)) { - StreamOutput<?> streamOutput = createStreamOutput(outName, config); - outputMap.put(outName, streamOutput); - } - } + this.chainedConfigs.put(configuration.getTaskName(), configuration); + + this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl); + + // We iterate through all the out edges from this job vertex and create + // a stream output + for (Tuple2<String, String> outEdge : outEdgesInOrder) { + StreamOutput<?> streamOutput = createStreamOutput(outEdge.f1, + chainedConfigs.get(outEdge.f0), outEdgesInOrder.indexOf(outEdge)); + outputMap.put(outEdge.f1, streamOutput); } // We create the outer collector that will be passed to the first task @@ -196,40 +189,32 @@ public class OutputHandler<OUT> { * We create the StreamOutput for the specific output given by the name, and * the configuration of its source task * - * @param name + * @param outputVertex * Name of the output to which the streamoutput will be set up * @param configuration * The config of upStream task * @return */ - private <T> StreamOutput<T> createStreamOutput(String name, StreamConfig configuration) { - - int outputNumber = recordWriterOrder.indexOf(name); - - StreamPartitioner<T> outputPartitioner; - - try { - outputPartitioner = configuration.getPartitioner(vertex.userClassLoader, name); - } catch (Exception e) { - throw new StreamVertexException("Cannot deserialize partitioner for " - + vertex.getName() + " with " + name + " outputs", e); - } + private <T> StreamOutput<T> createStreamOutput(String outputVertex, StreamConfig configuration, + int outputIndex) { + + StreamPartitioner<T> outputPartitioner = configuration.getPartitioner(cl, outputVertex); RecordWriter<SerializationDelegate<StreamRecord<T>>> output; - long bufferTimeout = configuration.getBufferTimeout(); + if (configuration.getBufferTimeout() >= 0) { - if (bufferTimeout >= 0) { output = new StreamRecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex - .getEnvironment().getWriter(outputNumber), outputPartitioner, bufferTimeout); + .getEnvironment().getWriter(outputIndex), outputPartitioner, + configuration.getBufferTimeout()); if (LOG.isTraceEnabled()) { LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}", - bufferTimeout, vertex.getClass().getSimpleName()); + configuration.getBufferTimeout(), vertex.getClass().getSimpleName()); } } else { output = new RecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex - .getEnvironment().getWriter(outputNumber), outputPartitioner); + .getEnvironment().getWriter(outputIndex), outputPartitioner); if (LOG.isTraceEnabled()) { LOG.trace("RecordWriter initiated for {}", vertex.getClass().getSimpleName()); @@ -237,11 +222,12 @@ public class OutputHandler<OUT> { } StreamOutput<T> streamOutput = new StreamOutput<T>(output, - configuration.isSelectAll(name) ? null : configuration.getOutputNames(name)); + configuration.isSelectAll(outputVertex) ? null + : configuration.getOutputNames(outputVertex)); if (LOG.isTraceEnabled()) { LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass() - .getSimpleName(), outputNumber, vertex.getClass().getSimpleName()); + .getSimpleName(), outputIndex, vertex.getClass().getSimpleName()); } return streamOutput; http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index e7f15b6..95a5d9b 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -46,7 +46,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { @Override public void execute(String jobName) throws Exception { - JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName); + JobGraph jobGraph = streamGraph.getJobGraph(jobName); Configuration configuration = jobGraph.getJobConfiguration(); http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala index a69454c..a408ec0 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala @@ -83,7 +83,7 @@ object StreamCrossOperator { clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2) - javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), + javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(), invokable) javaStream.setType(implicitly[TypeInformation[R]]) @@ -94,7 +94,7 @@ object StreamCrossOperator { } override def every(length: Long): CrossWindow[I1, I2] = { - val builder = javaStream.getExecutionEnvironment().getJobGraphBuilder() + val builder = javaStream.getExecutionEnvironment().getStreamGraph() val invokable = builder.getInvokable(javaStream.getId()) invokable.asInstanceOf[CoWindowInvokable[_,_,_]].setSlideSize(length) this http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala index 7ecd79a..1bd1bfb 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala @@ -186,7 +186,7 @@ object StreamJoinOperator { clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2) - javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), + javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(), invokable) javaStream.setType(implicitly[TypeInformation[R]])
