[FLINK-8590][runtime] Drop writeBufferToAllSubpartitions method

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eb96d5d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eb96d5d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eb96d5d2

Branch: refs/heads/master
Commit: eb96d5d2205eb1e807b2dc56bf0fd1e33b13c760
Parents: 0af22bf
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Tue Feb 6 10:03:03 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:25 2018 +0100

----------------------------------------------------------------------
 .../api/writer/ResultPartitionWriter.java       | 25 -------
 .../iterative/task/IterationHeadTask.java       | 15 ++--
 .../network/partition/ResultPartitionTest.java  | 32 --------
 .../flink/streaming/api/graph/StreamConfig.java |  7 ++
 .../streaming/runtime/tasks/OperatorChain.java  | 43 +++--------
 .../streaming/runtime/tasks/StreamTask.java     | 79 +++++++++++++++++---
 .../operators/StreamOperatorChainingTest.java   | 13 +++-
 .../runtime/tasks/StreamTaskTestHarness.java    |  3 +-
 8 files changed, 105 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eb96d5d2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index a0a1dff..7b8e485 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -47,29 +47,4 @@ public interface ResultPartitionWriter {
         * resources.
         */
        void writeBuffer(Buffer buffer, int subpartitionIndex) throws IOException;
-
-       /**
-        * Writes the given buffer to all available target subpartitions.
-        *
-        * <p>The buffer is taken over and used for each of the channels.
-        * It will be recycled afterwards.
-        *
-        * <p>This method takes the ownership of the passed {@code buffer} and thus is responsible for releasing it's
-        * resources.
-        *
-        * @param buffer the buffer to write
-        */
-       default void writeBufferToAllSubpartitions(final Buffer buffer) throws IOException {
-               try {
-                       for (int subpartition = 0; subpartition < getNumberOfSubpartitions(); subpartition++) {
-                               // retain the buffer so that it can be recycled by each channel of targetPartition
-                               buffer.retainBuffer();
-                               writeBuffer(buffer, subpartition);
-                       }
-               } finally {
-                       // we do not need to further retain the eventBuffer
-                       // (it will be recycled after the last channel stops using it)
-                       buffer.recycleBuffer();
-               }
-       }
 }
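
For context on the buffer lifecycle the removed default method managed:
every subpartition that receives the buffer holds its own reference, and
the buffer only returns to its pool once the last reference is released.
A self-contained toy model of that retain/recycle contract (illustrative
only; this is not Flink's actual Buffer class):

    import java.util.ArrayList;
    import java.util.List;

    final class RefCountedBuffer {
        private int refCount = 1; // the creator holds the initial reference

        synchronized void retain() {
            if (refCount <= 0) {
                throw new IllegalStateException("buffer already recycled");
            }
            refCount++;
        }

        synchronized void recycle() {
            if (--refCount == 0) {
                System.out.println("buffer returned to the pool");
            }
        }

        synchronized boolean isRecycled() {
            return refCount == 0;
        }
    }

    public class BroadcastRefCountDemo {
        public static void main(String[] args) {
            int numSubpartitions = 3;
            List<RefCountedBuffer> consumers = new ArrayList<>();
            RefCountedBuffer buffer = new RefCountedBuffer();
            try {
                for (int i = 0; i < numSubpartitions; i++) {
                    buffer.retain();       // one reference per subpartition
                    consumers.add(buffer); // stands in for writeBuffer(buffer, i)
                }
            } finally {
                buffer.recycle();          // drop the creator's reference
            }
            consumers.forEach(RefCountedBuffer::recycle); // consumers finish
            System.out.println("recycled: " + buffer.isRecycled()); // true
        }
    }

This is exactly why the removed loop calls retainBuffer() once per
subpartition and recycleBuffer() in the finally block.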

http://git-wip-us.apache.org/repos/asf/flink/blob/eb96d5d2/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 1dd3da4..9f9af36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -24,15 +24,14 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
 import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
@@ -95,7 +94,9 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
 
        private TypeSerializerFactory<X> solutionTypeSerializer;
 
-       private ResultPartitionWriter toSync;
+       private RecordWriter<IOReadableWritable> toSync;
+
+       private ResultPartitionID toSyncPartitionId;
 
        private int feedbackDataInput; // workset or bulk partial solution
 
@@ -141,7 +142,8 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
                        throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
                }
                // now, we can instantiate the sync gate
-               this.toSync = getEnvironment().getWriter(syncGateIndex);
+               this.toSync = new RecordWriter<IOReadableWritable>(getEnvironment().getWriter(syncGateIndex));
+               this.toSyncPartitionId = getEnvironment().getWriter(syncGateIndex).getPartitionId();
        }
 
        /**
@@ -238,7 +240,7 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
        private SuperstepBarrier initSuperstepBarrier() {
                SuperstepBarrier barrier = new SuperstepBarrier(getUserCodeClassLoader());
                TaskEventDispatcher taskEventDispatcher = getEnvironment().getTaskEventDispatcher();
-               ResultPartitionID partitionId = toSync.getPartitionId();
+               ResultPartitionID partitionId = toSyncPartitionId;
                taskEventDispatcher.subscribeToEvent(partitionId, barrier, AllWorkersDoneEvent.class);
                taskEventDispatcher.subscribeToEvent(partitionId, barrier, TerminationEvent.class);
                return barrier;
@@ -452,7 +454,6 @@ public class IterationHeadTask<X, Y, S extends Function, OT> extends AbstractIte
                if (log.isInfoEnabled()) {
                        log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync"));
                }
-
-               this.toSync.writeBufferToAllSubpartitions(EventSerializer.toBuffer(event));
+               this.toSync.broadcastEvent(event);
        }
 }
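
The net effect at the call site: event serialization and per-channel buffer
management move out of the task and behind the record writer. Side by side
(both lines as they appear in the diffs above):

    // Before: the call site serializes the event and hands buffer ownership around.
    this.toSync.writeBufferToAllSubpartitions(EventSerializer.toBuffer(event));

    // After: the RecordWriter takes care of serialization and the buffer lifecycle.
    this.toSync.broadcastEvent(event);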

http://git-wip-us.apache.org/repos/asf/flink/blob/eb96d5d2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 907939a..82c6fd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -21,8 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -32,7 +30,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -168,35 +165,6 @@ public class ResultPartitionTest {
                }
        }
 
-       /**
-        * Tests that event buffers are properly added and recycled when broadcasting events
-        * to multiple channels.
-        */
-       @Test
-       public void testWriteBufferToAllSubpartitionsReferenceCounting() throws Exception {
-               Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-
-               ResultPartition partition = new ResultPartition(
-                       "TestTask",
-                       mock(TaskActions.class),
-                       new JobID(),
-                       new ResultPartitionID(),
-                       ResultPartitionType.PIPELINED,
-                       2,
-                       2,
-                       mock(ResultPartitionManager.class),
-                       mock(ResultPartitionConsumableNotifier.class),
-                       mock(IOManager.class),
-                       false);
-
-               partition.writeBufferToAllSubpartitions(buffer);
-
-               // release the buffers in the partition
-               partition.release();
-
-               assertTrue(buffer.isRecycled());
-       }
-
        @Test
        public void testAddOnPipelinedPartition() throws Exception {
                testAddOnPartition(ResultPartitionType.PIPELINED);

http://git-wip-us.apache.org/repos/asf/flink/blob/eb96d5d2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index ccf6baf..c290c67 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -413,6 +413,13 @@ public class StreamConfig implements Serializable {
                }
        }
 
+       public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigsWithSelf(ClassLoader cl) {
+               //TODO: could this logic be moved to the user of #setTransitiveChainedTaskConfigs() ?
+               Map<Integer, StreamConfig> chainedTaskConfigs = getTransitiveChainedTaskConfigs(cl);
+               chainedTaskConfigs.put(getVertexID(), this);
+               return chainedTaskConfigs;
+       }
+
        public void setOperatorID(OperatorID operatorID) {
                this.config.setBytes(OPERATOR_ID, operatorID.getBytes());
        }
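
The new helper folds a two-step pattern that callers previously repeated by
hand; the OperatorChain change below uses it exactly this way. A small
before/after sketch (assuming a StreamConfig named configuration and a
ClassLoader named cl in scope):

    // Before: each caller merged its own config into the transitive map.
    Map<Integer, StreamConfig> before = configuration.getTransitiveChainedTaskConfigs(cl);
    before.put(configuration.getVertexID(), configuration);

    // After: one call returns the chained configs including the calling vertex.
    Map<Integer, StreamConfig> after = configuration.getTransitiveChainedTaskConfigsWithSelf(cl);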

http://git-wip-us.apache.org/repos/asf/flink/blob/eb96d5d2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index f3c7293..4807c77 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
@@ -44,8 +43,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
-import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
@@ -92,7 +89,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
         */
        private StreamStatus streamStatus = StreamStatus.ACTIVE;
 
-       public OperatorChain(StreamTask<OUT, OP> containingTask) {
+       public OperatorChain(
+                       StreamTask<OUT, OP> containingTask,
+                       List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters) {
 
                final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
                final StreamConfig configuration = containingTask.getConfiguration();
@@ -100,8 +99,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                headOperator = configuration.getStreamOperator(userCodeClassloader);
 
                // we read the chained configs, and the order of record writer registrations by output name
-               Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
-               chainedConfigs.put(configuration.getVertexID(), configuration);
+               Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);
 
                // create the final output stream writers
                // we iterate through all the out edges from this job vertex and create a stream output
@@ -116,11 +114,10 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                                StreamEdge outEdge = outEdgesInOrder.get(i);
 
                                RecordWriterOutput<?> streamOutput = createStreamOutput(
+                                       streamRecordWriters.get(i),
                                        outEdge,
                                        chainedConfigs.get(outEdge.getSourceId()),
-                                       i,
-                                       containingTask.getEnvironment(),
-                                       containingTask.getName());
+                                       containingTask.getEnvironment());
 
                                this.streamOutputs[i] = streamOutput;
                                streamOutputMap.put(outEdge, streamOutput);
@@ -380,12 +377,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                return currentOperatorOutput;
        }
 
-       private <T> RecordWriterOutput<T> createStreamOutput(
+       private RecordWriterOutput<OUT> createStreamOutput(
+                       StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter,
                        StreamEdge edge,
                        StreamConfig upStreamConfig,
-                       int outputIndex,
-                       Environment taskEnvironment,
-                       String taskName) {
+                       Environment taskEnvironment) {
                OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
 
                TypeSerializer outSerializer = null;
@@ -399,26 +395,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
                        outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
                }
 
-               @SuppressWarnings("unchecked")
-               StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
-
-               LOG.debug("Using partitioner {} for output {} of task ", outputPartitioner, outputIndex, taskName);
-
-               ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
-
-               // we initialize the partitioner here with the number of key groups (aka max. parallelism)
-               if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
-                       int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
-                       if (0 < numKeyGroups) {
-                               ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
-                       }
-               }
-
-               StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
-                               new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
-               output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
-
-               return new RecordWriterOutput<>(output, outSerializer, sideOutputTag, this);
+               return new RecordWriterOutput<>(streamRecordWriter, outSerializer, sideOutputTag, this);
        }
 
        // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/eb96d5d2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d6fb6c0..03c23a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -31,10 +32,10 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.StateBackend;
@@ -44,12 +45,16 @@ import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
+import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import org.apache.flink.util.ExceptionUtils;
@@ -62,7 +67,9 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -136,7 +143,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        protected OperatorChain<OUT, OP> operatorChain;
 
        /** The configuration of this streaming task. */
-       protected StreamConfig configuration;
+       protected final StreamConfig configuration;
 
        /** Our state backend. We use this to create checkpoint streams and a keyed state backend. */
        protected StateBackend stateBackend;
 
@@ -175,6 +182,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        /** Wrapper for synchronousCheckpointExceptionHandler to deal with rethrown exceptions. Used in the async part. */
        private AsyncCheckpointExceptionHandler asynchronousCheckpointExceptionHandler;
 
+       private final List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters;
+
        // ------------------------------------------------------------------------
 
        /**
@@ -193,17 +202,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
         * null is passes for the time provider) a {@link SystemProcessingTimeService DefaultTimerService}
         * will be used.
         *
-        * @param env The task environment for this task.
+        * @param environment The task environment for this task.
         * @param timeProvider Optionally, a specific time provider to use.
         */
        protected StreamTask(
-                       Environment env,
+                       Environment environment,
                        @Nullable ProcessingTimeService timeProvider) {
 
-               super(env);
+               super(environment);
 
-               // assign a possibly injected timer service
                this.timerService = timeProvider;
+               this.configuration = new StreamConfig(getTaskConfiguration());
+               this.streamRecordWriters = createStreamRecordWriters(
+                       configuration,
+                       environment);
        }
 
        // ------------------------------------------------------------------------
@@ -238,7 +250,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                        LOG.debug("Initializing {}.", getName());
 
                        asyncOperationsThreadPool = Executors.newCachedThreadPool();
-                       configuration = new StreamConfig(getTaskConfiguration());
 
                        CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();
 
@@ -261,7 +272,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
                        }
 
-                       operatorChain = new OperatorChain<>(this);
+                       operatorChain = new OperatorChain<>(this, streamRecordWriters);
                        headOperator = operatorChain.getHeadOperator();
 
                        // task specific initialization
@@ -648,9 +659,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                                Exception exception = null;
 
-                               for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
+                               for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
                                        try {
-                                               output.writeBufferToAllSubpartitions(EventSerializer.toBuffer(message));
+                                               streamRecordWriter.broadcastEvent(message);
                                        } catch (Exception e) {
                                                exception = ExceptionUtils.firstOrSuppressed(
                                                        new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
@@ -1086,4 +1097,52 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                        }
                }
        }
+
+       @VisibleForTesting
+       public static <OUT> List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createStreamRecordWriters(
+                       StreamConfig configuration,
+                       Environment environment) {
+               List<StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>> streamRecordWriters = new ArrayList<>();
+               List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
+               Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());
+
+               for (int i = 0; i < outEdgesInOrder.size(); i++) {
+                       StreamEdge edge = outEdgesInOrder.get(i);
+                       streamRecordWriters.add(
+                               createStreamRecordWriter(
+                                       edge,
+                                       i,
+                                       environment,
+                                       environment.getTaskInfo().getTaskName(),
+                                       chainedConfigs.get(edge.getSourceId()).getBufferTimeout()));
+               }
+               return streamRecordWriters;
+       }
+
+       private static <OUT> StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> createStreamRecordWriter(
+                       StreamEdge edge,
+                       int outputIndex,
+                       Environment environment,
+                       String taskName,
+                       long bufferTimeout) {
+               @SuppressWarnings("unchecked")
+               StreamPartitioner<OUT> outputPartitioner = (StreamPartitioner<OUT>) edge.getPartitioner();
+
+               LOG.debug("Using partitioner {} for output {} of task ", outputPartitioner, outputIndex, taskName);
+
+               ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
+
+               // we initialize the partitioner here with the number of key groups (aka max. parallelism)
+               if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
+                       int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
+                       if (0 < numKeyGroups) {
+                               ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
+                       }
+               }
+
+               StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> output =
+                       new StreamRecordWriter<>(bufferWriter, outputPartitioner, bufferTimeout);
+               output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
+               return output;
+       }
 }
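
Because the writers are now built by a static factory marked
@VisibleForTesting, tests can assemble an OperatorChain without ever
invoking the task. A minimal sketch of the call, mirroring the chaining
test below (task, streamConfig and environment are assumed to come from
the surrounding test fixture):

    OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain =
            new OperatorChain<>(task, StreamTask.createStreamRecordWriters(streamConfig, environment));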

http://git-wip-us.apache.org/repos/asf/flink/blob/eb96d5d2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
index 3cf5248..e980ab7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java
@@ -150,8 +150,7 @@ public class StreamOperatorChainingTest {
 
                try (MockEnvironment environment = createMockEnvironment(chainedVertex.getName())) {
                        StreamTask<Integer, StreamMap<Integer, Integer>> mockTask = createMockTask(streamConfig, environment);
-
-                       OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>(mockTask);
+                       OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = createOperatorChain(streamConfig, environment, mockTask);
 
                        headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());
 
@@ -300,8 +299,7 @@ public class StreamOperatorChainingTest {
 
                try (MockEnvironment environment = createMockEnvironment(chainedVertex.getName())) {
                        StreamTask<Integer, StreamMap<Integer, Integer>> mockTask = createMockTask(streamConfig, environment);
-
-                       OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>(mockTask);
+                       OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = createOperatorChain(streamConfig, environment, mockTask);
 
                        headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());
 
@@ -321,6 +319,13 @@ public class StreamOperatorChainingTest {
                }
        }
 
+       private <IN, OT extends StreamOperator<IN>> OperatorChain<IN, OT> createOperatorChain(
+                       StreamConfig streamConfig,
+                       Environment environment,
+                       StreamTask<IN, OT> task) {
+               return new OperatorChain<>(task, StreamTask.createStreamRecordWriters(streamConfig, environment));
+       }
+
        private <IN, OT extends StreamOperator<IN>> StreamTask<IN, OT> createMockTask(
                        StreamConfig streamConfig,
                        Environment environment) {

http://git-wip-us.apache.org/repos/asf/flink/blob/eb96d5d2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index e535bed..bcb833e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -214,11 +214,12 @@ public class StreamTaskTestHarness<OUT> {
         */
        public void invoke(StreamMockEnvironment mockEnv) throws Exception {
                this.mockEnv = checkNotNull(mockEnv);
-               this.task = taskFactory.apply(mockEnv);
 
                initializeInputs();
                initializeOutput();
 
+               this.task = taskFactory.apply(mockEnv);
+
                taskThread = new TaskThread(task);
                taskThread.start();
        }
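
This reorder follows from the StreamTask change above: the task constructor
now calls createStreamRecordWriters, which resolves output writers through
the environment, so the mock outputs must exist before the task is built.
The resulting ordering constraint (method names from the diff; the inline
comments are an interpretation of why the order matters):

    initializeInputs();
    initializeOutput();                      // mock output writers must exist first
    this.task = taskFactory.apply(mockEnv);  // ctor calls createStreamRecordWriters,
                                             // which uses environment.getWriter(i)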
