[FLINK-8582][runtime] Optimize BufferBuilder writes. By introducing a #commit() method on the critical path, we reduce the number of volatile writes from 2 down to 1. This improves network throughput by 20% and restores the original performance for high-latency cases.
This closes #5423. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc55d7a0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc55d7a0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc55d7a0 Branch: refs/heads/master Commit: bc55d7a084f5bf109c3bc2ff134699c997648552 Parents: 4c38b38 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Thu Feb 8 09:16:30 2018 +0100 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Mon Feb 19 15:05:24 2018 +0100 ---------------------------------------------------------------------- .../serialization/SpanningRecordSerializer.java | 32 +++++++-------- .../io/network/buffer/BufferBuilder.java | 23 +++++++++++ .../buffer/BufferBuilderAndConsumerTest.java | 42 ++++++++++++++------ .../network/buffer/BufferBuilderTestUtils.java | 2 +- .../partition/PipelinedSubpartitionTest.java | 4 +- .../consumer/LocalInputChannelTest.java | 2 +- 6 files changed, 70 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bc55d7a0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index ba8e659..d7befeb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.io.IOReadableWritable; import 
org.apache.flink.core.memory.DataOutputSerializer; -import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; @@ -94,8 +93,11 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R dataBuffer = serializationBuffer.wrapAsByteBuffer(); // Copy from intermediate buffers to current target memory segment - copyToTargetBufferFrom(lengthBuffer); - copyToTargetBufferFrom(dataBuffer); + if (targetBuffer != null) { + targetBuffer.append(lengthBuffer); + targetBuffer.append(dataBuffer); + targetBuffer.commit(); + } return getSerializationResult(); } @@ -104,12 +106,19 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R public SerializationResult setNextBufferBuilder(BufferBuilder buffer) throws IOException { targetBuffer = buffer; + boolean mustCommit = false; if (lengthBuffer.hasRemaining()) { - copyToTargetBufferFrom(lengthBuffer); + targetBuffer.append(lengthBuffer); + mustCommit = true; } if (dataBuffer.hasRemaining()) { - copyToTargetBufferFrom(dataBuffer); + targetBuffer.append(dataBuffer); + mustCommit = true; + } + + if (mustCommit) { + targetBuffer.commit(); } SerializationResult result = getSerializationResult(); @@ -124,19 +133,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R return result; } - /** - * Copies as many bytes as possible from the given {@link ByteBuffer} to the {@link MemorySegment} of the target - * {@link Buffer} and advances the current position by the number of written bytes. 
- * - * @param source the {@link ByteBuffer} to copy data from - */ - private void copyToTargetBufferFrom(ByteBuffer source) { - if (targetBuffer == null) { - return; - } - targetBuffer.append(source); - } - private SerializationResult getSerializationResult() { if (dataBuffer.hasRemaining() || lengthBuffer.hasRemaining()) { return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL; http://git-wip-us.apache.org/repos/asf/flink/blob/bc55d7a0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java index bac141f..63b60d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java @@ -61,6 +61,15 @@ public class BufferBuilder { } /** + * Same as {@link #append(ByteBuffer)} but additionally {@link #commit()} the appending. + */ + public int appendAndCommit(ByteBuffer source) { + int writtenBytes = append(source); + commit(); + return writtenBytes; + } + + /** * Append as many data as possible from {@code source}. Not everything might be copied if there is not enough * space in the underlying {@link MemorySegment} * @@ -79,6 +88,14 @@ public class BufferBuilder { } /** + * Make the change visible to the readers. This is costly operation (volatile access) thus in case of bulk writes + * it's better to commit them all together instead one by one. + */ + public void commit() { + positionMarker.commit(); + } + + /** * Mark this {@link BufferBuilder} and associated {@link BufferConsumer} as finished - no new data writes will be * allowed. 
* @@ -87,6 +104,7 @@ public class BufferBuilder { public int finish() { checkState(!isFinished()); positionMarker.markFinished(); + commit(); return getWrittenBytes(); } @@ -138,6 +156,8 @@ public class BufferBuilder { * * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently * of one another - for example the cached values can not accidentally leak from one to another. + * + * <p>Remember to commit the {@link SettablePositionMarker} to make the changes visible. */ private static class SettablePositionMarker implements PositionMarker { private volatile int position = 0; @@ -174,6 +194,9 @@ public class BufferBuilder { public void set(int value) { cachedPosition = value; + } + + public void commit() { position = cachedPosition; } } http://git-wip-us.apache.org/repos/asf/flink/blob/bc55d7a0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java index a20397d..edf2bfe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java @@ -44,7 +44,7 @@ public class BufferBuilderAndConsumerTest { BufferBuilder bufferBuilder = createBufferBuilder(); BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); - assertEquals(3 * Integer.BYTES, bufferBuilder.append(toByteBuffer(1, 2, 3))); + assertEquals(3 * Integer.BYTES, bufferBuilder.appendAndCommit(toByteBuffer(1, 2, 3))); Buffer buffer = bufferConsumer.build(); assertFalse(buffer.isRecycled()); @@ -61,7 +61,7 @@ public class BufferBuilderAndConsumerTest { 
int[] intsToWrite = new int[] {0, 1, 2, 3, 42}; ByteBuffer bytesToWrite = toByteBuffer(intsToWrite); - assertEquals(bytesToWrite.limit(), bufferBuilder.append(bytesToWrite)); + assertEquals(bytesToWrite.limit(), bufferBuilder.appendAndCommit(bytesToWrite)); assertEquals(bytesToWrite.limit(), bytesToWrite.position()); assertFalse(bufferBuilder.isFull()); @@ -74,10 +74,26 @@ public class BufferBuilderAndConsumerTest { BufferBuilder bufferBuilder = createBufferBuilder(); BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); + bufferBuilder.appendAndCommit(toByteBuffer(0, 1)); + bufferBuilder.appendAndCommit(toByteBuffer(2)); + bufferBuilder.appendAndCommit(toByteBuffer(3, 42)); + + assertContent(bufferConsumer, 0, 1, 2, 3, 42); + } + + @Test + public void multipleNotCommittedAppends() { + BufferBuilder bufferBuilder = createBufferBuilder(); + BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); + bufferBuilder.append(toByteBuffer(0, 1)); bufferBuilder.append(toByteBuffer(2)); bufferBuilder.append(toByteBuffer(3, 42)); + assertContent(bufferConsumer); + + bufferBuilder.commit(); + assertContent(bufferConsumer, 0, 1, 2, 3, 42); } @@ -87,14 +103,14 @@ public class BufferBuilderAndConsumerTest { BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); ByteBuffer bytesToWrite = toByteBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 42); - assertEquals(BUFFER_SIZE, bufferBuilder.append(bytesToWrite)); + assertEquals(BUFFER_SIZE, bufferBuilder.appendAndCommit(bytesToWrite)); assertTrue(bufferBuilder.isFull()); assertContent(bufferConsumer, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9); bufferBuilder = createBufferBuilder(); bufferConsumer = bufferBuilder.createBufferConsumer(); - assertEquals(Integer.BYTES, bufferBuilder.append(bytesToWrite)); + assertEquals(Integer.BYTES, bufferBuilder.appendAndCommit(bytesToWrite)); assertFalse(bufferBuilder.isFull()); assertContent(bufferConsumer, 42); @@ -112,18 +128,18 @@ public class BufferBuilderAndConsumerTest { 
BufferBuilder bufferBuilder = createBufferBuilder(); BufferConsumer bufferConsumer1 = bufferBuilder.createBufferConsumer(); - bufferBuilder.append(toByteBuffer(0, 1)); + bufferBuilder.appendAndCommit(toByteBuffer(0, 1)); BufferConsumer bufferConsumer2 = bufferConsumer1.copy(); - bufferBuilder.append(toByteBuffer(2)); + bufferBuilder.appendAndCommit(toByteBuffer(2)); assertContent(bufferConsumer1, 0, 1, 2); assertContent(bufferConsumer2, 0, 1, 2); BufferConsumer bufferConsumer3 = bufferConsumer1.copy(); - bufferBuilder.append(toByteBuffer(3, 42)); + bufferBuilder.appendAndCommit(toByteBuffer(3, 42)); BufferConsumer bufferConsumer4 = bufferConsumer1.copy(); @@ -144,19 +160,19 @@ public class BufferBuilderAndConsumerTest { public void buildingBufferMultipleTimes() { BufferBuilder bufferBuilder = createBufferBuilder(); try (BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer()) { - bufferBuilder.append(toByteBuffer(0, 1)); - bufferBuilder.append(toByteBuffer(2)); + bufferBuilder.appendAndCommit(toByteBuffer(0, 1)); + bufferBuilder.appendAndCommit(toByteBuffer(2)); assertContent(bufferConsumer, 0, 1, 2); - bufferBuilder.append(toByteBuffer(3, 42)); - bufferBuilder.append(toByteBuffer(44)); + bufferBuilder.appendAndCommit(toByteBuffer(3, 42)); + bufferBuilder.appendAndCommit(toByteBuffer(44)); assertContent(bufferConsumer, 3, 42, 44); ArrayList<Integer> originalValues = new ArrayList<>(); while (!bufferBuilder.isFull()) { - bufferBuilder.append(toByteBuffer(1337)); + bufferBuilder.appendAndCommit(toByteBuffer(1337)); originalValues.add(1337); } @@ -184,7 +200,7 @@ public class BufferBuilderAndConsumerTest { BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); for (int i = 0; i < writes; i++) { - assertEquals(Integer.BYTES, bufferBuilder.append(toByteBuffer(42))); + assertEquals(Integer.BYTES, bufferBuilder.appendAndCommit(toByteBuffer(42))); } assertFalse(bufferBuilder.isFinished()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/bc55d7a0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java index ead42df..a6e9fdc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java @@ -43,7 +43,7 @@ public class BufferBuilderTestUtils { BufferBuilder bufferBuilder = new BufferBuilder( MemorySegmentFactory.allocateUnpooledSegment(size), FreeingBufferRecycler.INSTANCE); - bufferBuilder.append(ByteBuffer.allocate(dataSize)); + bufferBuilder.appendAndCommit(ByteBuffer.allocate(dataSize)); return bufferBuilder; } http://git-wip-us.apache.org/repos/asf/flink/blob/bc55d7a0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index 4f3a5f9..2ca01c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -131,7 +131,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { assertEquals(0, availablityListener.getNumNotifications()); BufferBuilder bufferBuilder = createBufferBuilder(); - bufferBuilder.append(ByteBuffer.allocate(1024)); + 
bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024)); subpartition.add(bufferBuilder.createBufferConsumer()); assertNextBuffer(readView, 1024, false, 1); @@ -360,7 +360,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase { next++; } - checkState(bufferBuilder.append(ByteBuffer.wrap(segment.getArray())) == segmentSize); + checkState(bufferBuilder.appendAndCommit(ByteBuffer.wrap(segment.getArray())) == segmentSize); bufferBuilder.finish(); numberOfBuffers++; http://git-wip-us.apache.org/repos/asf/flink/blob/bc55d7a0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index d5c2492..c78b7b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -449,7 +449,7 @@ public class LocalInputChannelTest { if (channelIndexes.size() > 0) { final int channelIndex = channelIndexes.remove(0); BufferBuilder bufferBuilder = bufferProvider.requestBufferBuilderBlocking(); - bufferBuilder.append(ByteBuffer.wrap(new byte[4])); + bufferBuilder.appendAndCommit(ByteBuffer.wrap(new byte[4])); bufferBuilder.finish(); return new BufferConsumerAndChannel(bufferBuilder.createBufferConsumer(), channelIndex); }