This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 51f53d8aeb7b9b4db8f3f4dde0c0f6ab6b626d2f
Author: Nico Kruber <[email protected]>
AuthorDate: Thu Sep 13 12:24:37 2018 +0200

    [hotfix][network] minor optimisations and clarifications around BufferBuilder and BufferConsumer
---
 .../runtime/io/network/buffer/BufferBuilder.java   | 25 +++++++++++-----------
 .../runtime/io/network/buffer/BufferConsumer.java  | 23 ++++++++++++++------
 2 files changed, 28 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 305f184..6fb067e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -104,9 +104,9 @@ public class BufferBuilder {
         * @return number of written bytes.
         */
        public int finish() {
-               positionMarker.markFinished();
+               int writtenBytes = positionMarker.markFinished();
                commit();
-               return getWrittenBytes();
+               return writtenBytes;
        }
 
        public boolean isFinished() {
@@ -118,18 +118,10 @@ public class BufferBuilder {
                return positionMarker.getCached() == getMaxCapacity();
        }
 
-       public boolean isEmpty() {
-               return positionMarker.getCached() == 0;
-       }
-
        public int getMaxCapacity() {
                return memorySegment.size();
        }
 
-       private int getWrittenBytes() {
-               return positionMarker.getCached();
-       }
-
        /**
         * Holds a reference to the current writer position. Negative values indicate that writer ({@link BufferBuilder}
         * has finished. Value {@code Integer.MIN_VALUE} represents finished empty buffer.
@@ -156,7 +148,7 @@ public class BufferBuilder {
         * Cached writing implementation of {@link PositionMarker}.
         *
         * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently
-        * of one another - for example the cached values can not accidentally leak from one to another.
+        * of one another - so that the cached values can not accidentally leak from one to another.
         *
         * <p>Remember to commit the {@link SettablePositionMarker} to make the changes visible.
         */
@@ -181,12 +173,19 @@ public class BufferBuilder {
                        return PositionMarker.getAbsolute(cachedPosition);
                }
 
-               public void markFinished() {
-                       int newValue = -getCached();
+               /**
+                * Marks this position as finished and returns the current position.
+                *
+                * @return current position as of {@link #getCached()}
+                */
+               public int markFinished() {
+                       int currentPosition = getCached();
+                       int newValue = -currentPosition;
                        if (newValue == 0) {
                                newValue = FINISHED_EMPTY;
                        }
                        set(newValue);
+                       return currentPosition;
                }
 
                public void move(int offset) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index f368ff0..3117fe5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -42,7 +42,7 @@ public class BufferConsumer implements Closeable {
 
        private final CachedPositionMarker writerPosition;
 
-       private int currentReaderPosition = 0;
+       private int currentReaderPosition;
 
        /**
         * Constructs {@link BufferConsumer} instance with content that can be changed by {@link BufferBuilder}.
@@ -74,6 +74,14 @@ public class BufferConsumer implements Closeable {
                this.currentReaderPosition = currentReaderPosition;
        }
 
+       /**
+        * Checks whether the {@link BufferBuilder} has already been finished.
+        *
+        * <p>BEWARE: this method accesses the cached value of the position marker which is only updated
+        * after calls to {@link #build()}!
+        *
+        * @return <tt>true</tt> if the buffer was finished, <tt>false</tt> otherwise
+        */
        public boolean isFinished() {
                return writerPosition.isFinished();
        }
@@ -84,16 +92,17 @@ public class BufferConsumer implements Closeable {
         */
        public Buffer build() {
                writerPosition.update();
-               Buffer slice = buffer.readOnlySlice(currentReaderPosition, writerPosition.getCached() - currentReaderPosition);
-               currentReaderPosition = writerPosition.getCached();
+               int cachedWriterPosition = writerPosition.getCached();
+               Buffer slice = buffer.readOnlySlice(currentReaderPosition, cachedWriterPosition - currentReaderPosition);
+               currentReaderPosition = cachedWriterPosition;
                return slice.retainBuffer();
        }
 
        /**
         * Returns a retained copy with separate indexes. This allows to read from the same {@link MemorySegment} twice.
         *
-        * <p>WARNING: newly returned {@link BufferConsumer} will have reader index copied from the original buffer. In
-        * other words, data already consumed before copying will not be visible to the returned copies.
+        * <p>WARNING: the newly returned {@link BufferConsumer} will have its reader index copied from the original buffer.
+        * In other words, data already consumed before copying will not be visible to the returned copies.
         *
         * @return a retained copy of self with separate indexes
         */
@@ -124,7 +133,7 @@ public class BufferConsumer implements Closeable {
         * Cached reading wrapper around {@link PositionMarker}.
         *
         * <p>Writer ({@link BufferBuilder}) and reader ({@link BufferConsumer}) caches must be implemented independently
-        * of one another - for example the cached values can not accidentally leak from one to another.
+        * of one another - so that the cached values can not accidentally leak from one to another.
         */
        private static class CachedPositionMarker {
                private final PositionMarker positionMarker;
@@ -134,7 +143,7 @@ public class BufferConsumer implements Closeable {
                 */
                private int cachedPosition;
 
-               public CachedPositionMarker(PositionMarker positionMarker) {
+               CachedPositionMarker(PositionMarker positionMarker) {
                        this.positionMarker = checkNotNull(positionMarker);
                        update();
                }

Reply via email to