[FLINK-9087] [runtime] change the method signature of 
RecordWriter.broadcastEvent() from BufferConsumer to void

This closes #5802.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2787a1b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2787a1b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2787a1b8

Branch: refs/heads/release-1.5
Commit: 2787a1b818d15efdb809b96ce5bd7839a02a97a5
Parents: 0d198c2
Author: triones.deng <triones.d...@vipshop.com>
Authored: Tue Apr 3 10:28:52 2018 +0800
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 11 14:21:38 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/network/api/writer/RecordWriter.java   | 3 +--
 .../runtime/io/network/api/writer/RecordWriterTest.java     | 9 +++++++--
 2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2787a1b8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index c35c7f3..e3a8e49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -150,7 +150,7 @@ public class RecordWriter<T extends IOReadableWritable> {
                }
        }
 
-       public BufferConsumer broadcastEvent(AbstractEvent event) throws 
IOException {
+       public void broadcastEvent(AbstractEvent event) throws IOException {
                try (BufferConsumer eventBufferConsumer = 
EventSerializer.toBufferConsumer(event)) {
                        for (int targetChannel = 0; targetChannel < 
numChannels; targetChannel++) {
                                RecordSerializer<T> serializer = 
serializers[targetChannel];
@@ -164,7 +164,6 @@ public class RecordWriter<T extends IOReadableWritable> {
                        if (flushAlways) {
                                flushAll();
                        }
-                       return eventBufferConsumer;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2787a1b8/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ec0dfe2..0b0a236 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -299,18 +299,23 @@ public class RecordWriterTest {
                        new CollectingPartitionWriter(queues, new 
TestPooledBufferProvider(Integer.MAX_VALUE));
                RecordWriter<?> writer = new RecordWriter<>(partition);
 
-               BufferConsumer bufferConsumer = 
writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
+               writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
 
                // Verify added to all queues
                assertEquals(1, queues[0].size());
                assertEquals(1, queues[1].size());
 
+               // get references to buffer consumers (copies from the original 
event buffer consumer)
+               BufferConsumer bufferConsumer1 = queues[0].getFirst();
+               BufferConsumer bufferConsumer2 = queues[1].getFirst();
+
                // process all collected events (recycles the buffer)
                for (int i = 0; i < queues.length; i++) {
                        assertTrue(parseBuffer(queues[i].remove(), 
i).isEvent());
                }
 
-               assertTrue(bufferConsumer.isRecycled());
+               assertTrue(bufferConsumer1.isRecycled());
+               assertTrue(bufferConsumer2.isRecycled());
        }
 
        /**

Reply via email to