[FLINK-8178][network] Introduce not threadsafe write only BufferBuilder

While the Buffer class is used in a multithreaded context, it requires
synchronisation. Previously this was misleading and unclear, suggesting that
RecordSerializer should take into account synchronisation of the Buffer that
it is holding. With the NotThreadSafe BufferBuilder there is now a clear
separation between single-threaded writing/creating of a BufferBuilder and
multithreaded Buffer handling/retaining/recycling.

This increases the throughput of the network stack by a factor of 2, because
previously the method getMemorySegment() was called twice per record, and it is
a method synchronized on recycleLock, while the RecordSerializer is the sole
owner of the Buffer at this point, so synchronisation is not needed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6945c2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6945c2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6945c2e

Branch: refs/heads/master
Commit: c6945c2ef48d4c2cad3fc935435c1ab83e834969
Parents: 0888bb6
Author: Piotr Nowojski <[email protected]>
Authored: Tue Nov 28 16:49:37 2017 +0100
Committer: Stefan Richter <[email protected]>
Committed: Mon Jan 8 11:46:00 2018 +0100

----------------------------------------------------------------------
 .../api/serialization/RecordSerializer.java     |  17 +--
 .../serialization/SpanningRecordSerializer.java |  57 ++++-----
 .../io/network/api/writer/RecordWriter.java     |  10 +-
 .../flink/runtime/io/network/buffer/Buffer.java |   7 +-
 .../io/network/buffer/BufferBuilder.java        |  82 +++++++++++++
 .../io/network/buffer/BufferProvider.java       |   8 ++
 .../io/network/buffer/LocalBufferPool.java      |  15 ++-
 .../SpanningRecordSerializationTest.java        |  11 +-
 .../SpanningRecordSerializerTest.java           |  26 ++---
 .../io/network/api/writer/RecordWriterTest.java |  37 ++++--
 .../io/network/buffer/BufferBuilderTest.java    | 115 +++++++++++++++++++
 .../network/buffer/BufferBuilderTestUtils.java  |  32 ++++++
 .../IteratorWrappingTestSingleInputGate.java    |   9 +-
 .../network/serialization/LargeRecordsTest.java |  23 ++--
 .../network/util/TestPooledBufferProvider.java  |   7 ++
 .../consumer/StreamTestSingleInputGate.java     |  10 +-
 16 files changed, 352 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 5fe56c4..6a07f31 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -19,11 +19,12 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
-import java.io.IOException;
-
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+
+import java.io.IOException;
 
 /**
  * Interface for turning records into sequences of memory segments.
@@ -79,19 +80,19 @@ public interface RecordSerializer<T extends 
IOReadableWritable> {
         * Sets a (next) target buffer to use and continues writing remaining 
data
         * to it until it is full.
         *
-        * @param buffer the new target buffer to use
+        * @param bufferBuilder the new target buffer to use
         * @return how much information was written to the target buffer and
         *         whether this buffer is full
         * @throws IOException
         */
-       SerializationResult setNextBuffer(Buffer buffer) throws IOException;
+       SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) 
throws IOException;
 
        /**
         * Retrieves the current target buffer and sets its size to the actual
         * number of written bytes.
         *
         * After calling this method, a new target buffer is required to 
continue
-        * writing (see {@link #setNextBuffer(Buffer)}).
+        * writing (see {@link #setNextBufferBuilder(BufferBuilder)}).
         *
         * @return the target buffer that was used
         */
@@ -102,7 +103,7 @@ public interface RecordSerializer<T extends 
IOReadableWritable> {
         *
         * <p><strong>NOTE:</strong> After calling this method, <strong>a new 
target
         * buffer is required to continue writing</strong> (see
-        * {@link #setNextBuffer(Buffer)}).</p>
+        * {@link #setNextBufferBuilder(BufferBuilder)}).</p>
         */
        void clearCurrentBuffer();
 
@@ -112,7 +113,7 @@ public interface RecordSerializer<T extends 
IOReadableWritable> {
         *
         * <p><strong>NOTE:</strong> After calling this method, a <strong>new 
record
         * and a new target buffer is required to start writing again</strong>
-        * (see {@link #setNextBuffer(Buffer)}). If you want to continue
+        * (see {@link #setNextBufferBuilder(BufferBuilder)}). If you want to 
continue
         * with the current record, use {@link #clearCurrentBuffer()} 
instead.</p>
         */
        void clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 7394f83..efdfaa1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -18,20 +18,23 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 /**
  * Record serializer which serializes the complete record to an intermediate
  * data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link #setNextBuffer(Buffer)}.
+ * one-by-one using {@link #setNextBufferBuilder(BufferBuilder)}.
  *
  * @param <T>
  */
@@ -50,13 +53,8 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        private final ByteBuffer lengthBuffer;
 
        /** Current target {@link Buffer} of the serializer */
-       private Buffer targetBuffer;
-
-       /** Position in current {@link MemorySegment} of target buffer */
-       private int position;
-
-       /** Limit of current {@link MemorySegment} of target buffer */
-       private int limit;
+       @Nullable
+       private BufferBuilder targetBuffer;
 
        public SpanningRecordSerializer() {
                serializationBuffer = new DataOutputSerializer(128);
@@ -64,7 +62,7 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
                lengthBuffer = ByteBuffer.allocate(4);
                lengthBuffer.order(ByteOrder.BIG_ENDIAN);
 
-               // ensure initial state with hasRemaining false (for correct 
setNextBuffer logic)
+               // ensure initial state with hasRemaining false (for correct 
setNextBufferBuilder logic)
                dataBuffer = serializationBuffer.wrapAsByteBuffer();
                lengthBuffer.position(4);
        }
@@ -105,10 +103,8 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        }
 
        @Override
-       public SerializationResult setNextBuffer(Buffer buffer) throws 
IOException {
+       public SerializationResult setNextBufferBuilder(BufferBuilder buffer) 
throws IOException {
                targetBuffer = buffer;
-               position = 0;
-               limit = buffer.getSize();
 
                if (lengthBuffer.hasRemaining()) {
                        copyToTargetBufferFrom(lengthBuffer);
@@ -140,19 +136,12 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
                if (targetBuffer == null) {
                        return;
                }
-
-               int needed = source.remaining();
-               int available = limit - position;
-               int toCopy = Math.min(needed, available);
-
-               targetBuffer.getMemorySegment().put(position, source, toCopy);
-
-               position += toCopy;
+               targetBuffer.append(source);
        }
 
        private SerializationResult getSerializationResult() {
                if (!dataBuffer.hasRemaining() && !lengthBuffer.hasRemaining()) 
{
-                       return (position < limit)
+                       return !targetBuffer.isFull()
                                        ? SerializationResult.FULL_RECORD
                                        : 
SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
                }
@@ -165,25 +154,21 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
                if (targetBuffer == null) {
                        return null;
                }
-
-               targetBuffer.setSize(position);
-               return targetBuffer;
+               Buffer result = targetBuffer.build();
+               targetBuffer = null;
+               return result;
        }
 
        @Override
        public void clearCurrentBuffer() {
                targetBuffer = null;
-               position = 0;
-               limit = 0;
        }
 
        @Override
        public void clear() {
                targetBuffer = null;
-               position = 0;
-               limit = 0;
 
-               // ensure clear state with hasRemaining false (for correct 
setNextBuffer logic)
+               // ensure clear state with hasRemaining false (for correct 
setNextBufferBuilder logic)
                dataBuffer.position(dataBuffer.limit());
                lengthBuffer.position(4);
        }
@@ -191,7 +176,7 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        @Override
        public boolean hasData() {
                // either data in current target buffer or intermediate buffers
-               return position > 0 || (lengthBuffer.hasRemaining() || 
dataBuffer.hasRemaining());
+               return (targetBuffer != null && !targetBuffer.isEmpty()) || 
lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 4729800..39dbacc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -21,12 +21,13 @@ package org.apache.flink.runtime.io.network.api.writer;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.XORShiftRandom;
 
 import java.io.IOException;
@@ -129,8 +130,9 @@ public class RecordWriter<T extends IOReadableWritable> {
                                                break;
                                        }
                                } else {
-                                       buffer = 
targetPartition.getBufferProvider().requestBufferBlocking();
-                                       result = 
serializer.setNextBuffer(buffer);
+                                       BufferBuilder bufferBuilder =
+                                               
targetPartition.getBufferProvider().requestBufferBuilderBlocking();
+                                       result = 
serializer.setNextBufferBuilder(bufferBuilder);
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index d7980d2..33516bf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -54,11 +54,14 @@ public class Buffer {
        }
 
        public Buffer(MemorySegment memorySegment, BufferRecycler recycler, 
boolean isBuffer) {
+               this(memorySegment, recycler, isBuffer, memorySegment.size());
+       }
+
+       public Buffer(MemorySegment memorySegment, BufferRecycler recycler, 
boolean isBuffer, int size) {
                this.memorySegment = checkNotNull(memorySegment);
                this.recycler = checkNotNull(recycler);
                this.isBuffer = isBuffer;
-
-               this.currentSize = memorySegment.size();
+               this.currentSize = size;
        }
 
        public boolean isBuffer() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
new file mode 100644
index 0000000..08e49b5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Not thread safe class for filling in the initial content of the {@link 
Buffer}. Once writing to the builder
+ * is complete, {@link Buffer} instance can be built and shared across 
multiple threads.
+ */
+@NotThreadSafe
+public class BufferBuilder {
+       private final MemorySegment memorySegment;
+
+       private final BufferRecycler recycler;
+
+       private int position = 0;
+
+       private boolean built = false;
+
+       public BufferBuilder(MemorySegment memorySegment, BufferRecycler 
recycler) {
+               this.memorySegment = checkNotNull(memorySegment);
+               this.recycler = checkNotNull(recycler);
+       }
+
+       /**
+        * @return number of copied bytes
+        */
+       public int append(ByteBuffer source) {
+               checkState(!built);
+
+               int needed = source.remaining();
+               int available = limit() - position;
+               int toCopy = Math.min(needed, available);
+
+               memorySegment.put(position, source, toCopy);
+               position += toCopy;
+               return toCopy;
+       }
+
+       public boolean isFull() {
+               checkState(position <= limit());
+               return position == limit();
+       }
+
+       public Buffer build() {
+               checkState(!built);
+               built = true;
+               return new Buffer(memorySegment, recycler, true, position);
+       }
+
+       public boolean isEmpty() {
+               return position == 0;
+       }
+
+       private int limit() {
+               return memorySegment.size();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
index 9782584..843a2f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
@@ -44,6 +44,14 @@ public interface BufferProvider {
        Buffer requestBufferBlocking() throws IOException, InterruptedException;
 
        /**
+        * Returns a {@link BufferBuilder} instance from the buffer provider.
+        *
+        * <p>If there is no buffer available, the call will block until one 
becomes available again or the
+        * buffer provider has been destroyed.
+        */
+       BufferBuilder requestBufferBuilderBlocking() throws IOException, 
InterruptedException;
+
+       /**
         * Adds a buffer availability listener to the buffer provider.
         *
         * <p>The operation fails with return value <code>false</code>, when 
there is a buffer available or

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index a66373c..7403bd3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -179,7 +179,8 @@ class LocalBufferPool implements BufferPool {
        @Override
        public Buffer requestBuffer() throws IOException {
                try {
-                       return requestBuffer(false);
+                       BufferBuilder bufferBuilder = 
requestBufferBuilder(false);
+                       return bufferBuilder != null ? bufferBuilder.build() : 
null;
                }
                catch (InterruptedException e) {
                        throw new IOException(e);
@@ -188,10 +189,16 @@ class LocalBufferPool implements BufferPool {
 
        @Override
        public Buffer requestBufferBlocking() throws IOException, 
InterruptedException {
-               return requestBuffer(true);
+               BufferBuilder bufferBuilder = requestBufferBuilder(true);
+               return bufferBuilder != null ? bufferBuilder.build() : null;
        }
 
-       private Buffer requestBuffer(boolean isBlocking) throws 
InterruptedException, IOException {
+       @Override
+       public BufferBuilder requestBufferBuilderBlocking() throws IOException, 
InterruptedException {
+               return requestBufferBuilder(true);
+       }
+
+       private BufferBuilder requestBufferBuilder(boolean isBlocking) throws 
InterruptedException, IOException {
                synchronized (availableMemorySegments) {
                        returnExcessMemorySegments();
 
@@ -226,7 +233,7 @@ class LocalBufferPool implements BufferPool {
                                }
                        }
 
-                       return new Buffer(availableMemorySegments.poll(), this);
+                       return new 
BufferBuilder(availableMemorySegments.poll(), this);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index f988c55..ed0ce6c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -19,19 +19,16 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import 
org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayDeque;
 
-import static org.mockito.Mockito.mock;
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
 public class SpanningRecordSerializationTest {
 
@@ -129,13 +126,11 @@ public class SpanningRecordSerializationTest {
        {
                final int SERIALIZATION_OVERHEAD = 4; // length encoding
 
-               final Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), 
mock(BufferRecycler.class));
-
                final ArrayDeque<SerializationTestType> serializedRecords = new 
ArrayDeque<SerializationTestType>();
 
                // 
-------------------------------------------------------------------------------------------------------------
 
-               serializer.setNextBuffer(buffer);
+               
serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 
                int numBytes = 0;
                int numRecords = 0;
@@ -164,7 +159,7 @@ public class SpanningRecordSerializationTest {
                                        }
                                }
 
-                               while 
(serializer.setNextBuffer(buffer).isFullBuffer()) {
+                               while 
(serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)).isFullBuffer())
 {
                                        
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 segmentSize);
                                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
index fe9a386..ed6677e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
@@ -21,19 +21,17 @@ package 
org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import 
org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Random;
 
-import static org.mockito.Mockito.mock;
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
 public class SpanningRecordSerializerTest {
 
@@ -42,7 +40,6 @@ public class SpanningRecordSerializerTest {
                final int SEGMENT_SIZE = 16;
 
                final SpanningRecordSerializer<SerializationTestType> 
serializer = new SpanningRecordSerializer<SerializationTestType>();
-               final Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), 
mock(BufferRecycler.class));
                final SerializationTestType randomIntRecord = 
Util.randomRecord(SerializationTestTypeFactory.INT);
 
                Assert.assertFalse(serializer.hasData());
@@ -51,13 +48,13 @@ public class SpanningRecordSerializerTest {
                        serializer.addRecord(randomIntRecord);
                        Assert.assertTrue(serializer.hasData());
 
-                       serializer.setNextBuffer(buffer);
+                       
serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
                        Assert.assertTrue(serializer.hasData());
 
                        serializer.clear();
                        Assert.assertFalse(serializer.hasData());
 
-                       serializer.setNextBuffer(buffer);
+                       
serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
 
                        serializer.addRecord(randomIntRecord);
                        Assert.assertTrue(serializer.hasData());
@@ -76,10 +73,11 @@ public class SpanningRecordSerializerTest {
                final int SEGMENT_SIZE = 11;
 
                final SpanningRecordSerializer<SerializationTestType> 
serializer = new SpanningRecordSerializer<SerializationTestType>();
-               final Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), 
mock(BufferRecycler.class));
 
                try {
-                       
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, 
serializer.setNextBuffer(buffer));
+                       Assert.assertEquals(
+                               
RecordSerializer.SerializationResult.FULL_RECORD,
+                               
serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)));
                } catch (IOException e) {
                        e.printStackTrace();
                }
@@ -122,7 +120,7 @@ public class SpanningRecordSerializerTest {
                        result = serializer.addRecord(emptyRecord);
                        
Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
 result);
 
-                       result = serializer.setNextBuffer(buffer);
+                       result = 
serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
                        
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
                }
                catch (Exception e) {
@@ -202,11 +200,10 @@ public class SpanningRecordSerializerTest {
                final int SERIALIZATION_OVERHEAD = 4; // length encoding
 
                final SpanningRecordSerializer<SerializationTestType> 
serializer = new SpanningRecordSerializer<SerializationTestType>();
-               final Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), 
mock(BufferRecycler.class));
 
                // 
-------------------------------------------------------------------------------------------------------------
 
-               serializer.setNextBuffer(buffer);
+               
serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 
                int numBytes = 0;
                for (SerializationTestType record : records) {
@@ -217,14 +214,15 @@ public class SpanningRecordSerializerTest {
                                
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
                        } else if (numBytes == segmentSize) {
                                
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL,
 result);
-                               serializer.setNextBuffer(buffer);
+                               
serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
                                numBytes = 0;
                        } else {
                                
Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
 result);
 
                                while (result.isFullBuffer()) {
                                        numBytes -= segmentSize;
-                                       result = 
serializer.setNextBuffer(buffer);
+
+                                       result = 
serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 59b98a2..f0eaa94 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -31,11 +31,12 @@ import 
org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
@@ -54,6 +55,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -70,7 +73,6 @@ import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -97,16 +99,18 @@ public class RecordWriterTest {
 
                        final CountDownLatch sync = new CountDownLatch(2);
 
-                       final Buffer buffer = 
spy(TestBufferFactory.createBuffer(4));
+                       final TrackingBufferRecycler recycler = new 
TrackingBufferRecycler();
+
+                       final MemorySegment memorySegment = 
MemorySegmentFactory.allocateUnpooledSegment(4);
 
                        // Return buffer for first request, but block for all 
following requests.
-                       Answer<Buffer> request = new Answer<Buffer>() {
+                       Answer<BufferBuilder> request = new 
Answer<BufferBuilder>() {
                                @Override
-                               public Buffer answer(InvocationOnMock 
invocation) throws Throwable {
+                               public BufferBuilder answer(InvocationOnMock 
invocation) throws Throwable {
                                        sync.countDown();
 
                                        if (sync.getCount() == 1) {
-                                               return buffer;
+                                               return new 
BufferBuilder(memorySegment, recycler);
                                        }
 
                                        final Object o = new Object();
@@ -119,7 +123,7 @@ public class RecordWriterTest {
                        };
 
                        BufferProvider bufferProvider = 
mock(BufferProvider.class);
-                       
when(bufferProvider.requestBufferBlocking()).thenAnswer(request);
+                       
when(bufferProvider.requestBufferBuilderBlocking()).thenAnswer(request);
 
                        ResultPartitionWriter partitionWriter = 
createResultPartitionWriter(bufferProvider);
 
@@ -156,13 +160,13 @@ public class RecordWriterTest {
                        recordWriter.clearBuffers();
 
                        // Verify that buffer have been requested, but only one 
has been written out.
-                       verify(bufferProvider, 
times(2)).requestBufferBlocking();
+                       verify(bufferProvider, 
times(2)).requestBufferBuilderBlocking();
                        verify(partitionWriter, 
times(1)).writeBuffer(any(Buffer.class), anyInt());
 
                        // Verify that the written out buffer has only been 
recycled once
                        // (by the partition writer).
-                       assertTrue("Buffer not recycled.", buffer.isRecycled());
-                       verify(buffer, times(1)).recycle();
+                       assertEquals(1, 
recycler.getRecycledMemorySegments().size());
+                       assertEquals(memorySegment, 
recycler.getRecycledMemorySegments().get(0));
                }
                finally {
                        if (executor != null) {
@@ -566,4 +570,17 @@ public class RecordWriterTest {
                        return nextChannel;
                }
        }
+
+       private static class TrackingBufferRecycler implements BufferRecycler {
+               private final ArrayList<MemorySegment> recycledMemorySegments = new ArrayList<>();
+
+               @Override
+               public synchronized void recycle(MemorySegment memorySegment) {
+                       recycledMemorySegments.add(memorySegment);
+               }
+
+               public synchronized List<MemorySegment> getRecycledMemorySegments() {
+                       return recycledMemorySegments;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
new file mode 100644
index 0000000..3805274
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.testutils.DiscardingRecycler;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link BufferBuilder}.
+ */
+public class BufferBuilderTest {
+       private static final int BUFFER_SIZE = 10 * Integer.BYTES;
+
+       @Test
+       public void append() {
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               int[] intsToWrite = new int[] {0, 1, 2, 3, 42};
+               ByteBuffer bytesToWrite = toByteBuffer(intsToWrite);
+
+               assertEquals(bytesToWrite.limit(), bufferBuilder.append(bytesToWrite));
+
+               assertEquals(bytesToWrite.limit(), bytesToWrite.position());
+               assertFalse(bufferBuilder.isFull());
+               Buffer buffer = bufferBuilder.build();
+               assertBufferContent(buffer, intsToWrite);
+               assertEquals(5 * Integer.BYTES, buffer.getSize());
+               assertEquals(DiscardingRecycler.INSTANCE, buffer.getRecycler());
+       }
+
+       @Test
+       public void multipleAppends() {
+               BufferBuilder bufferBuilder = createBufferBuilder();
+
+               bufferBuilder.append(toByteBuffer(0, 1));
+               bufferBuilder.append(toByteBuffer(2));
+               bufferBuilder.append(toByteBuffer(3, 42));
+
+               Buffer buffer = bufferBuilder.build();
+               assertBufferContent(buffer, 0, 1, 2, 3, 42);
+               assertEquals(5 * Integer.BYTES, buffer.getSize());
+       }
+
+       @Test
+       public void appendOverSize() {
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               ByteBuffer bytesToWrite = toByteBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 42);
+
+               assertEquals(BUFFER_SIZE, bufferBuilder.append(bytesToWrite));
+
+               assertTrue(bufferBuilder.isFull());
+               Buffer buffer = bufferBuilder.build();
+               assertBufferContent(buffer, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+               assertEquals(BUFFER_SIZE, buffer.getSize());
+
+               bufferBuilder = createBufferBuilder();
+               assertEquals(Integer.BYTES, bufferBuilder.append(bytesToWrite));
+
+               assertFalse(bufferBuilder.isFull());
+               buffer = bufferBuilder.build();
+               assertBufferContent(buffer, 42);
+               assertEquals(Integer.BYTES, buffer.getSize());
+       }
+
+       @Test
+       public void buildEmptyBuffer() {
+               Buffer buffer = createBufferBuilder().build();
+               assertEquals(0, buffer.getSize());
+               assertBufferContent(buffer);
+       }
+
+       @Test(expected = IllegalStateException.class)
+       public void buildingBufferTwice() {
+               BufferBuilder bufferBuilder = createBufferBuilder();
+               bufferBuilder.build();
+               bufferBuilder.build();
+       }
+
+       private static ByteBuffer toByteBuffer(int... data) {
+               ByteBuffer byteBuffer = ByteBuffer.allocate(data.length * Integer.BYTES);
+               byteBuffer.asIntBuffer().put(data);
+               return byteBuffer;
+       }
+
+       private static void assertBufferContent(Buffer actualBuffer, int... expected) {
+               assertEquals(toByteBuffer(expected), actualBuffer.getNioBuffer());
+       }
+
+       private static BufferBuilder createBufferBuilder() {
+               return new BufferBuilder(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), DiscardingRecycler.INSTANCE);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
new file mode 100644
index 0000000..1113664
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+/**
+ * Utility class for creating non-pooled {@link BufferBuilder} instances.
+ */
+public class BufferBuilderTestUtils {
+       public static BufferBuilder createBufferBuilder(int size) {
+               return new BufferBuilder(
+                       MemorySegmentFactory.allocateUnpooledSegment(size),
+                       FreeingBufferRecycler.INSTANCE);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index fa44393..16285b7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -19,22 +19,20 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 
-import static org.mockito.Mockito.mock;
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.mockito.Mockito.when;
 
 public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> 
extends TestSingleInputGate {
@@ -71,8 +69,7 @@ public class IteratorWrappingTestSingleInputGate<T extends 
IOReadableWritable> e
                        @Override
                        public InputChannel.BufferAndAvailability 
answer(InvocationOnMock invocationOnMock) throws Throwable {
                                if (hasData) {
-                                       final Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), 
mock(BufferRecycler.class));
-                                       serializer.setNextBuffer(buffer);
+                                       
serializer.setNextBufferBuilder(createBufferBuilder(bufferSize));
                                        serializer.addRecord(reuse);
 
                                        hasData = inputIterator.next(reuse) != 
null;

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
index d596863..057b917 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
@@ -18,28 +18,27 @@
 
 package org.apache.flink.runtime.io.network.serialization;
 
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.testutils.serialization.types.IntType;
-import org.apache.flink.testutils.serialization.types.SerializationTestType;
+import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
+import org.apache.flink.testutils.serialization.types.IntType;
+import org.apache.flink.testutils.serialization.types.SerializationTestType;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
 
 public class LargeRecordsTest {
 
@@ -52,8 +51,6 @@ public class LargeRecordsTest {
                        final RecordSerializer<SerializationTestType> 
serializer = new SpanningRecordSerializer<SerializationTestType>();
                        final RecordDeserializer<SerializationTestType> 
deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>();
 
-                       final Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), 
mock(BufferRecycler.class));
-
                        List<SerializationTestType> originalRecords = new 
ArrayList<SerializationTestType>((NUM_RECORDS + 1) / 2);
                        List<SerializationTestType> deserializedRecords = new 
ArrayList<SerializationTestType>((NUM_RECORDS + 1) / 2);
                        
@@ -73,7 +70,7 @@ public class LargeRecordsTest {
 
                        // 
-------------------------------------------------------------------------------------------------------------
 
-                       serializer.setNextBuffer(buffer);
+                       
serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
                        
                        int numRecordsDeserialized = 0;
                        
@@ -98,7 +95,7 @@ public class LargeRecordsTest {
                                        }
 
                                        // move buffers as long as necessary 
(for long records)
-                                       while 
(serializer.setNextBuffer(buffer).isFullBuffer()) {
+                                       while 
(serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)).isFullBuffer())
 {
                                                
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 SEGMENT_SIZE);
                                        }
                                        
@@ -152,8 +149,6 @@ public class LargeRecordsTest {
                                        new 
SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(
                                                        new String[] { 
System.getProperty("java.io.tmpdir") } );
 
-                       final Buffer buffer = new 
Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), 
mock(BufferRecycler.class));
-
                        List<SerializationTestType> originalRecords = new 
ArrayList<>((NUM_RECORDS + 1) / 2);
                        List<SerializationTestType> deserializedRecords = new 
ArrayList<>((NUM_RECORDS + 1) / 2);
                        
@@ -173,7 +168,7 @@ public class LargeRecordsTest {
 
                        // 
-------------------------------------------------------------------------------------------------------------
 
-                       serializer.setNextBuffer(buffer);
+                       
serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
                        
                        int numRecordsDeserialized = 0;
                        
@@ -198,7 +193,7 @@ public class LargeRecordsTest {
                                        }
 
                                        // move buffers as long as necessary 
(for long records)
-                                       while 
(serializer.setNextBuffer(buffer).isFullBuffer()) {
+                                       while 
(serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)).isFullBuffer())
 {
                                                
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 SEGMENT_SIZE);
                                        }
                                        

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index cc52549..eb80578 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.util;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
@@ -79,6 +80,12 @@ public class TestPooledBufferProvider implements 
BufferProvider {
        }
 
        @Override
+       public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
+               Buffer buffer = requestBufferBlocking();
+               return new BufferBuilder(buffer.getMemorySegment(), buffer.getRecycler());
+       }
+
+       @Override
        public boolean addBufferListener(BufferListener listener) {
                return bufferRecycler.registerListener(listener);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index e14430e..f19d59d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -21,14 +21,11 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -41,8 +38,8 @@ import org.mockito.stubbing.Answer;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
@@ -104,11 +101,8 @@ public class StreamTestSingleInputGate<T> extends 
TestSingleInputGate {
                                                return new 
BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), 
false);
                                        } else if (input != null && 
input.isStreamRecord()) {
                                                Object inputElement = 
input.getStreamRecord();
-                                               final Buffer buffer = new 
Buffer(
-                                                       
MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
-                                                       
mock(BufferRecycler.class));
 
-                                               
recordSerializer.setNextBuffer(buffer);
+                                               
recordSerializer.setNextBufferBuilder(createBufferBuilder(bufferSize));
                                                
delegate.setInstance(inputElement);
                                                
recordSerializer.addRecord(delegate);
 

Reply via email to