[FLINK-7749][network] Refactor ResultPartitionWriter into an interface

This closes #5127.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94123cec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94123cec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94123cec

Branch: refs/heads/master
Commit: 94123cec718a3edbf1199fd703627f0081e39065
Parents: 175e1b3
Author: Nico Kruber <[email protected]>
Authored: Wed Dec 6 10:25:26 2017 +0100
Committer: zentol <[email protected]>
Committed: Tue Dec 12 19:09:07 2017 +0100

----------------------------------------------------------------------
 .../runtime/io/network/NetworkEnvironment.java  | 24 +-----
 .../io/network/api/writer/RecordWriter.java     | 10 +--
 .../api/writer/ResultPartitionWriter.java       | 65 ++++++----------
 .../io/network/partition/ResultPartition.java   | 15 ++--
 .../iterative/task/IterationHeadTask.java       |  2 +-
 .../apache/flink/runtime/taskmanager/Task.java  | 12 +--
 .../io/network/NetworkEnvironmentTest.java      |  6 --
 .../io/network/api/writer/RecordWriterTest.java |  6 +-
 .../api/writer/ResultPartitionWriterTest.java   | 79 --------------------
 .../network/partition/ResultPartitionTest.java  | 51 +++++++++++--
 .../io/network/util/TestPartitionProducer.java  |  4 +-
 .../operators/testutils/MockEnvironment.java    |  2 +-
 .../streaming/runtime/tasks/StreamTask.java     |  2 +-
 .../runtime/io/StreamRecordWriterTest.java      |  2 +-
 .../runtime/tasks/StreamMockEnvironment.java    |  4 +-
 15 files changed, 92 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index f2619e8..6c01350 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -174,11 +173,6 @@ public class NetworkEnvironment {
 
        public void registerTask(Task task) throws IOException {
                final ResultPartition[] producedPartitions = 
task.getProducedPartitions();
-               final ResultPartitionWriter[] writers = task.getAllWriters();
-
-               if (writers.length != producedPartitions.length) {
-                       throw new IllegalStateException("Unequal number of 
writers and partitions.");
-               }
 
                synchronized (lock) {
                        if (isShutdown) {
@@ -187,7 +181,6 @@ public class NetworkEnvironment {
 
                        for (int i = 0; i < producedPartitions.length; i++) {
                                final ResultPartition partition = 
producedPartitions[i];
-                               final ResultPartitionWriter writer = writers[i];
 
                                // Buffer pool for the partition
                                BufferPool bufferPool = null;
@@ -214,7 +207,7 @@ public class NetworkEnvironment {
                                }
 
                                // Register writer with task event dispatcher
-                               
taskEventDispatcher.registerPartition(writer.getPartitionId());
+                               
taskEventDispatcher.registerPartition(partition.getPartitionId());
                        }
 
                        // Setup the buffer pool for each buffer reader
@@ -263,18 +256,9 @@ public class NetworkEnvironment {
                                
resultPartitionManager.releasePartitionsProducedBy(executionId, 
task.getFailureCause());
                        }
 
-                       ResultPartitionWriter[] writers = task.getAllWriters();
-                       if (writers != null) {
-                               for (ResultPartitionWriter writer : writers) {
-                                       
taskEventDispatcher.unregisterPartition(writer.getPartitionId());
-                               }
-                       }
-
-                       ResultPartition[] partitions = 
task.getProducedPartitions();
-                       if (partitions != null) {
-                               for (ResultPartition partition : partitions) {
-                                       partition.destroyBufferPool();
-                               }
+                       for (ResultPartition partition : 
task.getProducedPartitions()) {
+                               
taskEventDispatcher.unregisterPartition(partition.getPartitionId());
+                               partition.destroyBufferPool();
                        }
 
                        final SingleInputGate[] inputGates = 
task.getAllInputGates();

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 623dc62..4729800 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -36,11 +36,11 @@ import static 
org.apache.flink.runtime.io.network.api.serialization.RecordSerial
 
 /**
  * A record-oriented runtime result writer.
- * <p>
- * The RecordWriter wraps the runtime's {@link ResultPartitionWriter} and 
takes care of
+ *
+ * <p>The RecordWriter wraps the runtime's {@link ResultPartitionWriter} and 
takes care of
  * serializing records into buffers.
- * <p>
- * <strong>Important</strong>: it is necessary to call {@link #flush()} after
+ *
+ * <p><strong>Important</strong>: it is necessary to call {@link #flush()} 
after
  * all records have been written with {@link #emit(IOReadableWritable)}. This
  * ensures that all produced records are written to the output stream (incl.
  * partially filled ones).
@@ -71,7 +71,7 @@ public class RecordWriter<T extends IOReadableWritable> {
                this.targetPartition = writer;
                this.channelSelector = channelSelector;
 
-               this.numChannels = writer.getNumberOfOutputChannels();
+               this.numChannels = writer.getNumberOfSubpartitions();
 
                /**
                 * The runtime exposes a channel abstraction for the produced 
results

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 777c7ad..3a66e53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -20,73 +20,50 @@ package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import java.io.IOException;
 
 /**
- * A buffer-oriented runtime result writer.
- * <p>
- * The {@link ResultPartitionWriter} is the runtime API for producing results. 
It
- * supports two kinds of data to be sent: buffers and events.
+ * A buffer-oriented runtime result writer API for producing results.
  */
-public class ResultPartitionWriter {
+public interface ResultPartitionWriter {
 
-       private final ResultPartition partition;
+       BufferProvider getBufferProvider();
 
-       public ResultPartitionWriter(ResultPartition partition) {
-               this.partition = partition;
-       }
-
-       // 
------------------------------------------------------------------------
-       // Attributes
-       // 
------------------------------------------------------------------------
+       ResultPartitionID getPartitionId();
 
-       public ResultPartitionID getPartitionId() {
-               return partition.getPartitionId();
-       }
+       int getNumberOfSubpartitions();
 
-       public BufferProvider getBufferProvider() {
-               return partition.getBufferProvider();
-       }
+       int getNumTargetKeyGroups();
 
-       public int getNumberOfOutputChannels() {
-               return partition.getNumberOfSubpartitions();
-       }
-
-       public int getNumTargetKeyGroups() {
-               return partition.getNumTargetKeyGroups();
-       }
-
-       // 
------------------------------------------------------------------------
-       // Data processing
-       // 
------------------------------------------------------------------------
-
-       public void writeBuffer(Buffer buffer, int targetChannel) throws 
IOException {
-               partition.add(buffer, targetChannel);
-       }
+       /**
+        * Adds a buffer to the subpartition with the given index.
+        *
+        * <p>For PIPELINED {@link 
org.apache.flink.runtime.io.network.partition.ResultPartitionType}s,
+        * this will trigger the deployment of consuming tasks after the first 
buffer has been added.
+        */
+       void writeBuffer(Buffer buffer, int subpartitionIndex) throws 
IOException;
 
        /**
-        * Writes the given buffer to all available target channels.
+        * Writes the given buffer to all available target subpartitions.
         *
-        * The buffer is taken over and used for each of the channels.
+        * <p>The buffer is taken over and used for each of the channels.
         * It will be recycled afterwards.
         *
-        * @param eventBuffer the buffer to write
-        * @throws IOException
+        * @param buffer the buffer to write
         */
-       public void writeBufferToAllChannels(final Buffer eventBuffer) throws 
IOException {
+       default void writeBufferToAllSubpartitions(final Buffer buffer) throws 
IOException {
                try {
-                       for (int targetChannel = 0; targetChannel < 
partition.getNumberOfSubpartitions(); targetChannel++) {
+                       for (int subpartition = 0; subpartition < 
getNumberOfSubpartitions(); subpartition++) {
                                // retain the buffer so that it can be recycled 
by each channel of targetPartition
-                               eventBuffer.retain();
-                               writeBuffer(eventBuffer, targetChannel);
+                               buffer.retain();
+                               writeBuffer(buffer, subpartition);
                        }
                } finally {
                        // we do not need to further retain the eventBuffer
                        // (it will be recycled after the last channel stops 
using it)
-                       eventBuffer.recycle();
+                       buffer.recycle();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 9b02e4d..be050b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
@@ -74,7 +75,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  *
  * <h2>State management</h2>
  */
-public class ResultPartition implements BufferPoolOwner {
+public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner 
{
 
        private static final Logger LOG = 
LoggerFactory.getLogger(ResultPartition.class);
        
@@ -209,10 +210,12 @@ public class ResultPartition implements BufferPoolOwner {
                return partitionId;
        }
 
+       @Override
        public int getNumberOfSubpartitions() {
                return subpartitions.length;
        }
 
+       @Override
        public BufferProvider getBufferProvider() {
                return bufferPool;
        }
@@ -260,13 +263,8 @@ public class ResultPartition implements BufferPoolOwner {
 
        // 
------------------------------------------------------------------------
 
-       /**
-        * Adds a buffer to the subpartition with the given index.
-        *
-        * <p> For PIPELINED results, this will trigger the deployment of 
consuming tasks after the
-        * first buffer has been added.
-        */
-       public void add(Buffer buffer, int subpartitionIndex) throws 
IOException {
+       @Override
+       public void writeBuffer(Buffer buffer, int subpartitionIndex) throws 
IOException {
                boolean success = false;
 
                try {
@@ -381,6 +379,7 @@ public class ResultPartition implements BufferPoolOwner {
                return cause;
        }
 
+       @Override
        public int getNumTargetKeyGroups() {
                return numTargetKeyGroups;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
index 65dd8ac..2a95a65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -441,6 +441,6 @@ public class IterationHeadTask<X, Y, S extends Function, 
OT> extends AbstractIte
                        log.info(formatLogString("sending " + 
WorkerDoneEvent.class.getSimpleName() + " to sync"));
                }
 
-               
this.toSync.writeBufferToAllChannels(EventSerializer.toBuffer(event));
+               
this.toSync.writeBufferToAllSubpartitions(EventSerializer.toBuffer(event));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e54adb9..a049063 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -51,7 +51,6 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
@@ -187,8 +186,6 @@ public class Task implements Runnable, TaskActions {
 
        private final ResultPartition[] producedPartitions;
 
-       private final ResultPartitionWriter[] writers;
-
        private final SingleInputGate[] inputGates;
 
        private final Map<IntermediateDataSetID, SingleInputGate> 
inputGatesById;
@@ -360,7 +357,6 @@ public class Task implements Runnable, TaskActions {
 
                // Produced intermediate result partitions
                this.producedPartitions = new 
ResultPartition[resultPartitionDeploymentDescriptors.size()];
-               this.writers = new 
ResultPartitionWriter[resultPartitionDeploymentDescriptors.size()];
 
                int counter = 0;
 
@@ -380,8 +376,6 @@ public class Task implements Runnable, TaskActions {
                                ioManager,
                                desc.sendScheduleOrUpdateConsumersMessage());
 
-                       writers[counter] = new 
ResultPartitionWriter(producedPartitions[counter]);
-
                        ++counter;
                }
 
@@ -445,10 +439,6 @@ public class Task implements Runnable, TaskActions {
                return this.taskConfiguration;
        }
 
-       public ResultPartitionWriter[] getAllWriters() {
-               return writers;
-       }
-
        public SingleInputGate[] getAllInputGates() {
                return inputGates;
        }
@@ -682,7 +672,7 @@ public class Task implements Runnable, TaskActions {
                                kvStateRegistry,
                                inputSplitProvider,
                                distributedCacheEntries,
-                               writers,
+                               producedPartitions,
                                inputGates,
                                network.getTaskEventDispatcher(),
                                checkpointResponder,

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 123082f..ba92bdf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -37,8 +37,6 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
-
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -82,9 +80,6 @@ public class NetworkEnvironmentTest {
                ResultPartition rp3 = 
createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
                ResultPartition rp4 = 
createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 8);
                final ResultPartition[] resultPartitions = new 
ResultPartition[] {rp1, rp2, rp3, rp4};
-               final ResultPartitionWriter[] resultPartitionWriters = new 
ResultPartitionWriter[] {
-                       new ResultPartitionWriter(rp1), new 
ResultPartitionWriter(rp2),
-                       new ResultPartitionWriter(rp3), new 
ResultPartitionWriter(rp4)};
 
                // input gates
                SingleInputGate ig1 = 
createSingleInputGateMock(ResultPartitionType.PIPELINED, 2);
@@ -96,7 +91,6 @@ public class NetworkEnvironmentTest {
                // overall task to register
                Task task = mock(Task.class);
                when(task.getProducedPartitions()).thenReturn(resultPartitions);
-               when(task.getAllWriters()).thenReturn(resultPartitionWriters);
                when(task.getAllInputGates()).thenReturn(inputGates);
 
                network.registerTask(task);

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ff001c2..9509013 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -181,7 +181,7 @@ public class RecordWriterTest {
 
                        ResultPartitionWriter partitionWriter = 
mock(ResultPartitionWriter.class);
                        
when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool));
-                       
when(partitionWriter.getNumberOfOutputChannels()).thenReturn(1);
+                       
when(partitionWriter.getNumberOfSubpartitions()).thenReturn(1);
 
                        // Recycle buffer and throw Exception
                        doAnswer(new Answer<Void>() {
@@ -454,7 +454,7 @@ public class RecordWriterTest {
 
                ResultPartitionWriter partitionWriter = 
mock(ResultPartitionWriter.class);
                
when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferProvider));
-               
when(partitionWriter.getNumberOfOutputChannels()).thenReturn(numChannels);
+               
when(partitionWriter.getNumberOfSubpartitions()).thenReturn(numChannels);
 
                doAnswer(new Answer<Void>() {
                        @Override
@@ -512,7 +512,7 @@ public class RecordWriterTest {
 
                ResultPartitionWriter partitionWriter = 
mock(ResultPartitionWriter.class);
                
when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferProvider));
-               when(partitionWriter.getNumberOfOutputChannels()).thenReturn(1);
+               when(partitionWriter.getNumberOfSubpartitions()).thenReturn(1);
 
                // Recycle each written buffer.
                doAnswer(new Answer<Void>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
deleted file mode 100644
index 3b54247..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriterTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.writer;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class ResultPartitionWriterTest {
-
-       // 
---------------------------------------------------------------------------------------------
-       // Resource release tests
-       // 
---------------------------------------------------------------------------------------------
-
-       /**
-        * Tests that event buffers are properly recycled when broadcasting 
events
-        * to multiple channels.
-        *
-        * @throws Exception
-        */
-       @Test
-       public void testWriteBufferToAllChannelsReferenceCounting() throws 
Exception {
-               Buffer buffer = 
EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
-
-               ResultPartition partition = new ResultPartition(
-                       "TestTask",
-                       mock(TaskActions.class),
-                       new JobID(),
-                       new ResultPartitionID(),
-                       ResultPartitionType.PIPELINED,
-                       2,
-                       2,
-                       mock(ResultPartitionManager.class),
-                       mock(ResultPartitionConsumableNotifier.class),
-                       mock(IOManager.class),
-                       false);
-               ResultPartitionWriter partitionWriter =
-                       new ResultPartitionWriter(
-                               partition);
-
-               partitionWriter.writeBufferToAllChannels(buffer);
-
-               // Verify added to all queues, i.e. two buffers in total
-               assertEquals(2, partition.getTotalNumberOfBuffers());
-               // release the buffers in the partition
-               partition.release();
-
-               assertTrue(buffer.isRecycled());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 0cd3591..9fb7fd3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -20,12 +20,16 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -44,7 +48,7 @@ public class ResultPartitionTest {
                        // Pipelined, send message => notify
                        ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
                        ResultPartition partition = createPartition(notifier, 
ResultPartitionType.PIPELINED, true);
-                       partition.add(TestBufferFactory.createBuffer(), 0);
+                       partition.writeBuffer(TestBufferFactory.createBuffer(), 
0);
                        verify(notifier, 
times(1)).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));
                }
 
@@ -52,7 +56,7 @@ public class ResultPartitionTest {
                        // Pipelined, don't send message => don't notify
                        ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
                        ResultPartition partition = createPartition(notifier, 
ResultPartitionType.PIPELINED, false);
-                       partition.add(TestBufferFactory.createBuffer(), 0);
+                       partition.writeBuffer(TestBufferFactory.createBuffer(), 
0);
                        verify(notifier, 
never()).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));
                }
 
@@ -60,7 +64,7 @@ public class ResultPartitionTest {
                        // Blocking, send message => don't notify
                        ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
                        ResultPartition partition = createPartition(notifier, 
ResultPartitionType.BLOCKING, true);
-                       partition.add(TestBufferFactory.createBuffer(), 0);
+                       partition.writeBuffer(TestBufferFactory.createBuffer(), 
0);
                        verify(notifier, 
never()).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));
                }
 
@@ -68,7 +72,7 @@ public class ResultPartitionTest {
                        // Blocking, don't send message => don't notify
                        ResultPartitionConsumableNotifier notifier = 
mock(ResultPartitionConsumableNotifier.class);
                        ResultPartition partition = createPartition(notifier, 
ResultPartitionType.BLOCKING, false);
-                       partition.add(TestBufferFactory.createBuffer(), 0);
+                       partition.writeBuffer(TestBufferFactory.createBuffer(), 
0);
                        verify(notifier, 
never()).notifyPartitionConsumable(any(JobID.class), 
any(ResultPartitionID.class), any(TaskActions.class));
                }
        }
@@ -84,7 +88,7 @@ public class ResultPartitionTest {
        }
 
        /**
-        * Tests {@link ResultPartition#add} on a partition which has already 
finished.
+        * Tests {@link ResultPartition#writeBuffer} on a partition which has 
already finished.
         *
         * @param pipelined the result partition type to set up
         */
@@ -97,7 +101,7 @@ public class ResultPartitionTest {
                        partition.finish();
                        reset(notifier);
                        // partition.add() should fail
-                       partition.add(buffer, 0);
+                       partition.writeBuffer(buffer, 0);
                        Assert.fail("exception expected");
                } catch (IllegalStateException e) {
                        // expected => ignored
@@ -122,7 +126,7 @@ public class ResultPartitionTest {
        }
 
        /**
-        * Tests {@link ResultPartition#add} on a partition which has already 
been released.
+        * Tests {@link ResultPartition#writeBuffer} on a partition which has 
already been released.
         *
         * @param pipelined the result partition type to set up
         */
@@ -134,7 +138,7 @@ public class ResultPartitionTest {
                        ResultPartition partition = createPartition(notifier, 
pipelined, true);
                        partition.release();
                        // partition.add() silently drops the buffer but 
recycles it
-                       partition.add(buffer, 0);
+                       partition.writeBuffer(buffer, 0);
                } finally {
                        if (!buffer.isRecycled()) {
                                Assert.fail("buffer not recycled");
@@ -145,6 +149,37 @@ public class ResultPartitionTest {
                }
        }
 
+       /**
+        * Tests that a broadcast event buffer is added to every subpartition and
+        * is recycled once the partition holding it has been released.
+        */
+       @Test
+       public void testWriteBufferToAllSubpartitionsReferenceCounting() throws 
Exception {
+               Buffer buffer = 
EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+
+               ResultPartition partition = new ResultPartition(
+                       "TestTask",
+                       mock(TaskActions.class),
+                       new JobID(),
+                       new ResultPartitionID(),
+                       ResultPartitionType.PIPELINED,
+                       2,
+                       2,
+                       mock(ResultPartitionManager.class),
+                       mock(ResultPartitionConsumableNotifier.class),
+                       mock(IOManager.class),
+                       false);
+
+               partition.writeBufferToAllSubpartitions(buffer);
+
+               // verify the buffer was added to all queues, i.e. two buffers in total
+               assertEquals(2, partition.getTotalNumberOfBuffers());
+               // releasing the partition is expected to recycle the buffers it holds
+               partition.release();
+
+               assertTrue(buffer.isRecycled());
+       }
+
        // 
------------------------------------------------------------------------
 
        private static ResultPartition createPartition(

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
index e694dfe..53f95c4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
@@ -77,12 +77,12 @@ public class TestPartitionProducer implements 
Callable<Boolean> {
                                int targetChannelIndex = 
bufferOrEvent.getChannelIndex();
 
                                if (bufferOrEvent.isBuffer()) {
-                                       
partition.add(bufferOrEvent.getBuffer(), targetChannelIndex);
+                                       
partition.writeBuffer(bufferOrEvent.getBuffer(), targetChannelIndex);
                                }
                                else if (bufferOrEvent.isEvent()) {
                                        final Buffer buffer = 
EventSerializer.toBuffer(bufferOrEvent.getEvent());
 
-                                       partition.add(buffer, 
targetChannelIndex);
+                                       partition.writeBuffer(buffer, 
targetChannelIndex);
                                }
                                else {
                                        throw new 
IllegalStateException("BufferOrEvent instance w/o buffer nor event.");

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index c8ca654..f655b12 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -204,7 +204,7 @@ public class MockEnvironment implements Environment {
                        });
 
                        ResultPartitionWriter mockWriter = 
mock(ResultPartitionWriter.class);
-                       
when(mockWriter.getNumberOfOutputChannels()).thenReturn(1);
+                       
when(mockWriter.getNumberOfSubpartitions()).thenReturn(1);
                        
when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
 
                        final Record record = new Record();

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index eff8a29..a3b16e6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -632,7 +632,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                                for (ResultPartitionWriter output : 
getEnvironment().getAllWriters()) {
                                        try {
-                                               
output.writeBufferToAllChannels(EventSerializer.toBuffer(message));
+                                               
output.writeBufferToAllSubpartitions(EventSerializer.toBuffer(message));
                                        } catch (Exception e) {
                                                exception = 
ExceptionUtils.firstOrSuppressed(
                                                        new Exception("Could 
not send cancel checkpoint marker to downstream tasks.", e),

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index b1b86b1..78d4303 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -98,7 +98,7 @@ public class StreamRecordWriterTest {
 
                ResultPartitionWriter mockWriter = 
mock(ResultPartitionWriter.class);
                when(mockWriter.getBufferProvider()).thenReturn(mockProvider);
-               
when(mockWriter.getNumberOfOutputChannels()).thenReturn(numPartitions);
+               
when(mockWriter.getNumberOfSubpartitions()).thenReturn(numPartitions);
 
                return mockWriter;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/94123cec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 6b6506a..277ca51 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -161,7 +161,7 @@ public class StreamMockEnvironment implements Environment {
                        });
 
                        ResultPartitionWriter mockWriter = 
mock(ResultPartitionWriter.class);
-                       
when(mockWriter.getNumberOfOutputChannels()).thenReturn(1);
+                       
when(mockWriter.getNumberOfSubpartitions()).thenReturn(1);
                        
when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
 
                        final RecordDeserializer<DeserializationDelegate<T>> 
recordDeserializer = new 
AdaptiveSpanningRecordDeserializer<DeserializationDelegate<T>>();
@@ -186,7 +186,7 @@ public class StreamMockEnvironment implements Environment {
                                        
addBufferToOutputList(recordDeserializer, delegate, buffer, outputList);
                                        return null;
                                }
-                       
}).when(mockWriter).writeBufferToAllChannels(any(Buffer.class));
+                       
}).when(mockWriter).writeBufferToAllSubpartitions(any(Buffer.class));
 
                        outputs.add(mockWriter);
                }

Reply via email to