[FLINK-9087] [runtime] change the method signature of RecordWriter.broadcastEvent() from BufferConsumer to void
This closes #5802. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a5a64ab Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a5a64ab Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a5a64ab Branch: refs/heads/master Commit: 0a5a64ab746107c36a207a7cc62a75a88eabf1ae Parents: 4742c34 Author: triones.deng <triones.d...@vipshop.com> Authored: Tue Apr 3 10:28:52 2018 +0800 Committer: zentol <ches...@apache.org> Committed: Wed Apr 11 14:21:36 2018 +0200 ---------------------------------------------------------------------- .../flink/runtime/io/network/api/writer/RecordWriter.java | 3 +-- .../runtime/io/network/api/writer/RecordWriterTest.java | 9 +++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0a5a64ab/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index c35c7f3..e3a8e49 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -150,7 +150,7 @@ public class RecordWriter<T extends IOReadableWritable> { } } - public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException { + public void broadcastEvent(AbstractEvent event) throws IOException { try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) { for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) { RecordSerializer<T> serializer = serializers[targetChannel]; @@ -164,7 +164,6 @@ public class RecordWriter<T extends IOReadableWritable> { if (flushAlways) { flushAll(); } - return eventBufferConsumer; } } http://git-wip-us.apache.org/repos/asf/flink/blob/0a5a64ab/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index ec0dfe2..0b0a236 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -299,18 +299,23 @@ public class RecordWriterTest { new CollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE)); RecordWriter<?> writer = new RecordWriter<>(partition); - BufferConsumer bufferConsumer = writer.broadcastEvent(EndOfPartitionEvent.INSTANCE); + writer.broadcastEvent(EndOfPartitionEvent.INSTANCE); // Verify added to all queues assertEquals(1, queues[0].size()); assertEquals(1, queues[1].size()); + // get references to buffer consumers (copies from the original event buffer consumer) + BufferConsumer bufferConsumer1 = queues[0].getFirst(); + BufferConsumer bufferConsumer2 = queues[1].getFirst(); + // process all collected events (recycles the buffer) for (int i = 0; i < queues.length; i++) { assertTrue(parseBuffer(queues[i].remove(), i).isEvent()); } - assertTrue(bufferConsumer.isRecycled()); + assertTrue(bufferConsumer1.isRecycled()); + assertTrue(bufferConsumer2.isRecycled()); } /**