This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 51f53d8aeb7b9b4db8f3f4dde0c0f6ab6b626d2f Author: Nico Kruber <[email protected]> AuthorDate: Thu Sep 13 12:24:37 2018 +0200 [hotfix][network] minor optimisations and clarifications around BufferBuilder and BufferConsumer --- .../runtime/io/network/buffer/BufferBuilder.java | 25 +++++++++++----------- .../runtime/io/network/buffer/BufferConsumer.java | 23 ++++++++++++++------ 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java index 305f184..6fb067e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java @@ -104,9 +104,9 @@ public class BufferBuilder { * @return number of written bytes. */ public int finish() { - positionMarker.markFinished(); + int writtenBytes = positionMarker.markFinished(); commit(); - return getWrittenBytes(); + return writtenBytes; } public boolean isFinished() { @@ -118,18 +118,10 @@ public class BufferBuilder { return positionMarker.getCached() == getMaxCapacity(); } - public boolean isEmpty() { - return positionMarker.getCached() == 0; - } - public int getMaxCapacity() { return memorySegment.size(); } - private int getWrittenBytes() { - return positionMarker.getCached(); - } - /** * Holds a reference to the current writer position. Negative values indicate that writer ({@link BufferBuilder} * has finished. Value {@code Integer.MIN_VALUE} represents finished empty buffer. @@ -156,7 +148,7 @@ public class BufferBuilder { * Cached writing implementation of {@link PositionMarker}. * * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently - * of one another - for example the cached values can not accidentally leak from one to another. + * of one another - so that the cached values can not accidentally leak from one to another. * * <p>Remember to commit the {@link SettablePositionMarker} to make the changes visible. */ @@ -181,12 +173,19 @@ public class BufferBuilder { return PositionMarker.getAbsolute(cachedPosition); } - public void markFinished() { - int newValue = -getCached(); + /** + * Marks this position as finished and returns the current position. + * + * @return current position as of {@link #getCached()} + */ + public int markFinished() { + int currentPosition = getCached(); + int newValue = -currentPosition; if (newValue == 0) { newValue = FINISHED_EMPTY; } set(newValue); + return currentPosition; } public void move(int offset) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java index f368ff0..3117fe5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java @@ -42,7 +42,7 @@ public class BufferConsumer implements Closeable { private final CachedPositionMarker writerPosition; - private int currentReaderPosition = 0; + private int currentReaderPosition; /** * Constructs {@link BufferConsumer} instance with content that can be changed by {@link BufferBuilder}. @@ -74,6 +74,14 @@ public class BufferConsumer implements Closeable { this.currentReaderPosition = currentReaderPosition; } + /** + * Checks whether the {@link BufferBuilder} has already been finished. + * + * <p>BEWARE: this method accesses the cached value of the position marker which is only updated + * after calls to {@link #build()}! + * + * @return <tt>true</tt> if the buffer was finished, <tt>false</tt> otherwise + */ public boolean isFinished() { return writerPosition.isFinished(); } @@ -84,16 +92,17 @@ public class BufferConsumer implements Closeable { */ public Buffer build() { writerPosition.update(); - Buffer slice = buffer.readOnlySlice(currentReaderPosition, writerPosition.getCached() - currentReaderPosition); - currentReaderPosition = writerPosition.getCached(); + int cachedWriterPosition = writerPosition.getCached(); + Buffer slice = buffer.readOnlySlice(currentReaderPosition, cachedWriterPosition - currentReaderPosition); + currentReaderPosition = cachedWriterPosition; return slice.retainBuffer(); } /** * Returns a retained copy with separate indexes. This allows to read from the same {@link MemorySegment} twice. * - * <p>WARNING: newly returned {@link BufferConsumer} will have reader index copied from the original buffer. In - * other words, data already consumed before copying will not be visible to the returned copies. + * <p>WARNING: the newly returned {@link BufferConsumer} will have its reader index copied from the original buffer. + * In other words, data already consumed before copying will not be visible to the returned copies. * * @return a retained copy of self with separate indexes */ @@ -124,7 +133,7 @@ public class BufferConsumer implements Closeable { * Cached reading wrapper around {@link PositionMarker}. * * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently - * of one another - for example the cached values can not accidentally leak from one to another. + * of one another - so that the cached values can not accidentally leak from one to another. */ private static class CachedPositionMarker { private final PositionMarker positionMarker; @@ -134,7 +143,7 @@ public class BufferConsumer implements Closeable { */ private int cachedPosition; - public CachedPositionMarker(PositionMarker positionMarker) { + CachedPositionMarker(PositionMarker positionMarker) { this.positionMarker = checkNotNull(positionMarker); update(); }
