Repository: flink
Updated Branches:
  refs/heads/master 09ff410a0 -> fd08ad2e7
Revert "[FLINK-4894] [network] Don't request buffer after writing to partition" This reverts commit cbdb784dc24abba50674d054bb21c94dd7a559a5. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd08ad2e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd08ad2e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd08ad2e Branch: refs/heads/master Commit: fd08ad2e715a86d7ca5dfcb3220318b421440ff1 Parents: 09ff410 Author: Stephan Ewen <[email protected]> Authored: Thu Oct 27 20:05:29 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Oct 27 20:05:29 2016 +0200 ---------------------------------------------------------------------- .../serialization/SpanningRecordSerializer.java | 2 +- .../io/network/api/writer/RecordWriter.java | 58 +++--- .../io/network/api/writer/RecordWriterTest.java | 200 +------------------ 3 files changed, 34 insertions(+), 226 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fd08ad2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 65b3d20..7c4d937 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -188,7 +188,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R @Override public boolean hasData() { // either data in current target buffer or intermediate buffers - return 
(this.position > 0 && this.position < this.limit) || (this.lengthBuffer.hasRemaining() || this.dataBuffer.hasRemaining()); + return this.position > 0 || (this.lengthBuffer.hasRemaining() || this.dataBuffer.hasRemaining()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fd08ad2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index fb35843..96eea23 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -47,7 +47,7 @@ import static org.apache.flink.runtime.io.network.api.serialization.RecordSerial */ public class RecordWriter<T extends IOReadableWritable> { - protected final ResultPartitionWriter targetPartition; + protected final ResultPartitionWriter writer; private final ChannelSelector<T> channelSelector; @@ -64,7 +64,7 @@ public class RecordWriter<T extends IOReadableWritable> { @SuppressWarnings("unchecked") public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) { - this.targetPartition = writer; + this.writer = writer; this.channelSelector = channelSelector; this.numChannels = writer.getNumberOfOutputChannels(); @@ -108,25 +108,15 @@ public class RecordWriter<T extends IOReadableWritable> { synchronized (serializer) { SerializationResult result = serializer.addRecord(record); - while (result.isFullBuffer()) { Buffer buffer = serializer.getCurrentBuffer(); if (buffer != null) { - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. 
Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - buffer = targetPartition.getBufferProvider().requestBufferBlocking(); - result = serializer.setNextBuffer(buffer); + writeBuffer(buffer, targetChannel, serializer); } + + buffer = writer.getBufferProvider().requestBufferBlocking(); + result = serializer.setNextBuffer(buffer); } } } @@ -136,14 +126,23 @@ public class RecordWriter<T extends IOReadableWritable> { RecordSerializer<T> serializer = serializers[targetChannel]; synchronized (serializer) { - Buffer buffer = serializer.getCurrentBuffer(); - if (buffer != null) { - writeAndClearBuffer(buffer, targetChannel, serializer); - } else if (serializer.hasData()) { - throw new IllegalStateException("No buffer, but serializer has buffered data."); - } - targetPartition.writeEvent(event, targetChannel); + if (serializer.hasData()) { + Buffer buffer = serializer.getCurrentBuffer(); + if (buffer == null) { + throw new IllegalStateException("Serializer has data but no buffer."); + } + + writeBuffer(buffer, targetChannel, serializer); + + writer.writeEvent(event, targetChannel); + + buffer = writer.getBufferProvider().requestBufferBlocking(); + serializer.setNextBuffer(buffer); + } + else { + writer.writeEvent(event, targetChannel); + } } } } @@ -155,12 +154,15 @@ public class RecordWriter<T extends IOReadableWritable> { synchronized (serializer) { Buffer buffer = serializer.getCurrentBuffer(); if (buffer != null) { - writeAndClearBuffer(buffer, targetChannel, serializer); + writeBuffer(buffer, targetChannel, serializer); + + buffer = writer.getBufferProvider().requestBufferBlocking(); + serializer.setNextBuffer(buffer); } } } - targetPartition.writeEndOfSuperstep(); + writer.writeEndOfSuperstep(); } public void flush() throws IOException { @@ -172,7 +174,7 @@ public 
class RecordWriter<T extends IOReadableWritable> { Buffer buffer = serializer.getCurrentBuffer(); if (buffer != null) { - writeAndClearBuffer(buffer, targetChannel, serializer); + writeBuffer(buffer, targetChannel, serializer); } } finally { serializer.clear(); @@ -222,13 +224,13 @@ public class RecordWriter<T extends IOReadableWritable> { * * <p> The buffer is cleared from the serializer state after a call to this method. */ - private void writeAndClearBuffer( + private void writeBuffer( Buffer buffer, int targetChannel, RecordSerializer<T> serializer) throws IOException { try { - targetPartition.writeBuffer(buffer, targetChannel); + writer.writeBuffer(buffer, targetChannel); } finally { serializer.clearCurrentBuffer(); http://git-wip-us.apache.org/repos/asf/flink/blob/fd08ad2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index 43a93c6..70faf22 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -18,37 +18,25 @@ package org.apache.flink.runtime.io.network.api.writer; -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.event.AbstractEvent; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; -import 
org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.runtime.io.network.util.TestTaskEvent; -import org.apache.flink.runtime.testutils.DiscardingRecycler; import org.apache.flink.types.IntValue; -import org.apache.flink.util.XORShiftRandom; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Queue; -import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -56,6 +44,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import static org.apache.flink.util.Preconditions.checkNotNull; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; @@ -304,160 +293,10 @@ public class RecordWriterTest { recordWriter.flush(); } - /** - * Tests broadcasting events when no records have been emitted yet. 
- */ - @Test - public void testBroadcastEventNoRecords() throws Exception { - int numChannels = 4; - int bufferSize = 32; - - @SuppressWarnings("unchecked") - Queue<BufferOrEvent>[] queues = new Queue[numChannels]; - for (int i = 0; i < numChannels; i++) { - queues[i] = new ArrayDeque<>(); - } - - BufferProvider bufferProvider = createBufferProvider(bufferSize); - - ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider); - RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>()); - CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L); - - // No records emitted yet, broadcast should not request a buffer - writer.broadcastEvent(barrier); - - verify(bufferProvider, times(0)).requestBufferBlocking(); - - for (Queue<BufferOrEvent> queue : queues) { - assertEquals(1, queue.size()); - BufferOrEvent boe = queue.remove(); - assertTrue(boe.isEvent()); - assertEquals(barrier, boe.getEvent()); - } - } - - /** - * Tests broadcasting events when records have been emitted. The emitted - * records cover all three {@link SerializationResult} types. 
- */ - @Test - public void testBroadcastEventMixedRecords() throws Exception { - Random rand = new XORShiftRandom(); - int numChannels = 4; - int bufferSize = 32; - int lenBytes = 4; // serialized length - - @SuppressWarnings("unchecked") - Queue<BufferOrEvent>[] queues = new Queue[numChannels]; - for (int i = 0; i < numChannels; i++) { - queues[i] = new ArrayDeque<>(); - } - - BufferProvider bufferProvider = createBufferProvider(bufferSize); - - ResultPartitionWriter partitionWriter = createCollectingPartitionWriter(queues, bufferProvider); - RecordWriter<ByteArrayIO> writer = new RecordWriter<>(partitionWriter, new RoundRobin<ByteArrayIO>()); - CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L); - - // Emit records on some channels first (requesting buffers), then - // broadcast the event. The record buffers should be emitted first, then - // the event. After the event, no new buffer should be requested. - - // (i) Smaller than the buffer size (single buffer request => 1) - byte[] bytes = new byte[bufferSize / 2]; - rand.nextBytes(bytes); - - writer.emit(new ByteArrayIO(bytes)); - - // (ii) Larger than the buffer size (two buffer requests => 1 + 2) - bytes = new byte[bufferSize + 1]; - rand.nextBytes(bytes); - - writer.emit(new ByteArrayIO(bytes)); - - // (iii) Exactly the buffer size (single buffer request => 1 + 2 + 1) - bytes = new byte[bufferSize - lenBytes]; - rand.nextBytes(bytes); - - writer.emit(new ByteArrayIO(bytes)); - - // (iv) Nothing on the 4th channel (no buffer request => 1 + 2 + 1 + 0 = 4) - - // (v) Broadcast the event - writer.broadcastEvent(barrier); - - verify(bufferProvider, times(4)).requestBufferBlocking(); - - assertEquals(2, queues[0].size()); // 1 buffer + 1 event - assertEquals(3, queues[1].size()); // 2 buffers + 1 event - assertEquals(2, queues[2].size()); // 1 buffer + 1 event - assertEquals(1, queues[3].size()); // 0 buffers + 1 event - } - // 
--------------------------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------------------------- - /** - * Creates a mock partition writer that collects the added buffers/events. - * - * <p>This much mocking should not be necessary with better designed - * interfaces. Refactoring this will take too much time now though, hence - * the mocking. Ideally, we will refactor all of this mess in order to make - * our lives easier and test it better. - */ - private ResultPartitionWriter createCollectingPartitionWriter( - final Queue<BufferOrEvent>[] queues, - BufferProvider bufferProvider) throws IOException { - - int numChannels = queues.length; - - ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class); - when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferProvider)); - when(partitionWriter.getNumberOfOutputChannels()).thenReturn(numChannels); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - Buffer buffer = (Buffer) invocationOnMock.getArguments()[0]; - Integer targetChannel = (Integer) invocationOnMock.getArguments()[1]; - queues[targetChannel].add(new BufferOrEvent(buffer, targetChannel)); - return null; - } - }).when(partitionWriter).writeBuffer(any(Buffer.class), anyInt()); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0]; - Integer targetChannel = (Integer) invocationOnMock.getArguments()[1]; - queues[targetChannel].add(new BufferOrEvent(event, targetChannel)); - return null; - } - }).when(partitionWriter).writeEvent(any(AbstractEvent.class), anyInt()); - - return partitionWriter; - } - - private BufferProvider createBufferProvider(final int bufferSize) - throws IOException, InterruptedException { - - 
BufferProvider bufferProvider = mock(BufferProvider.class); - when(bufferProvider.requestBufferBlocking()).thenAnswer( - new Answer<Buffer>() { - @Override - public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable { - MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize); - Buffer buffer = new Buffer(segment, DiscardingRecycler.INSTANCE); - return buffer; - } - } - ); - - return bufferProvider; - } - private BufferProvider createBufferProvider(Buffer... buffers) throws IOException, InterruptedException { @@ -489,37 +328,4 @@ public class RecordWriterTest { return partitionWriter; } - - private static class ByteArrayIO implements IOReadableWritable { - - private final byte[] bytes; - - public ByteArrayIO(byte[] bytes) { - this.bytes = bytes; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.write(bytes); - } - - @Override - public void read(DataInputView in) throws IOException { - in.read(bytes); - } - } - - /** - * RoundRobin channel selector starting at 0 ({@link RoundRobinChannelSelector} starts at 1). - */ - private static class RoundRobin<T extends IOReadableWritable> implements ChannelSelector<T> { - - private int[] nextChannel = new int[] { -1 }; - - @Override - public int[] selectChannels(final T record, final int numberOfOutputChannels) { - nextChannel[0] = (nextChannel[0] + 1) % numberOfOutputChannels; - return nextChannel; - } - } }
