[FLINK-8178][network] Introduce not-thread-safe, write-only BufferBuilder. When the Buffer class is used in a multithreaded context, it requires synchronisation. Previously this was misleading and unclear, suggesting that RecordSerializer should take into account synchronisation of the Buffer that it is holding. With the NotThreadSafe BufferBuilder there is now a clear separation between single-threaded writing/creating via a BufferBuilder and multithreaded Buffer handling/retaining/recycling.
This increases the throughput of the network stack by a factor of 2, because previously the method getMemorySegment() was called twice per record and it is a synchronized method on recycleLock, while RecordSerializer is the sole owner of the Buffer at this point, so synchronisation is not needed. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6945c2e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6945c2e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6945c2e Branch: refs/heads/master Commit: c6945c2ef48d4c2cad3fc935435c1ab83e834969 Parents: 0888bb6 Author: Piotr Nowojski <[email protected]> Authored: Tue Nov 28 16:49:37 2017 +0100 Committer: Stefan Richter <[email protected]> Committed: Mon Jan 8 11:46:00 2018 +0100 ---------------------------------------------------------------------- .../api/serialization/RecordSerializer.java | 17 +-- .../serialization/SpanningRecordSerializer.java | 57 ++++----- .../io/network/api/writer/RecordWriter.java | 10 +- .../flink/runtime/io/network/buffer/Buffer.java | 7 +- .../io/network/buffer/BufferBuilder.java | 82 +++++++++++++ .../io/network/buffer/BufferProvider.java | 8 ++ .../io/network/buffer/LocalBufferPool.java | 15 ++- .../SpanningRecordSerializationTest.java | 11 +- .../SpanningRecordSerializerTest.java | 26 ++--- .../io/network/api/writer/RecordWriterTest.java | 37 ++++-- .../io/network/buffer/BufferBuilderTest.java | 115 +++++++++++++++++++ .../network/buffer/BufferBuilderTestUtils.java | 32 ++++++ .../IteratorWrappingTestSingleInputGate.java | 9 +- .../network/serialization/LargeRecordsTest.java | 23 ++-- .../network/util/TestPooledBufferProvider.java | 7 ++ .../consumer/StreamTestSingleInputGate.java | 10 +- 16 files changed, 352 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java index 5fe56c4..6a07f31 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java @@ -19,11 +19,12 @@ package org.apache.flink.runtime.io.network.api.serialization; -import java.io.IOException; - import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; + +import java.io.IOException; /** * Interface for turning records into sequences of memory segments. @@ -79,19 +80,19 @@ public interface RecordSerializer<T extends IOReadableWritable> { * Sets a (next) target buffer to use and continues writing remaining data * to it until it is full. * - * @param buffer the new target buffer to use + * @param bufferBuilder the new target buffer to use * @return how much information was written to the target buffer and * whether this buffer is full * @throws IOException */ - SerializationResult setNextBuffer(Buffer buffer) throws IOException; + SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException; /** * Retrieves the current target buffer and sets its size to the actual * number of written bytes. 
* * After calling this method, a new target buffer is required to continue - * writing (see {@link #setNextBuffer(Buffer)}). + * writing (see {@link #setNextBufferBuilder(BufferBuilder)}). * * @return the target buffer that was used */ @@ -102,7 +103,7 @@ public interface RecordSerializer<T extends IOReadableWritable> { * * <p><strong>NOTE:</strong> After calling this method, <strong>a new target * buffer is required to continue writing</strong> (see - * {@link #setNextBuffer(Buffer)}).</p> + * {@link #setNextBufferBuilder(BufferBuilder)}).</p> */ void clearCurrentBuffer(); @@ -112,7 +113,7 @@ public interface RecordSerializer<T extends IOReadableWritable> { * * <p><strong>NOTE:</strong> After calling this method, a <strong>new record * and a new target buffer is required to start writing again</strong> - * (see {@link #setNextBuffer(Buffer)}). If you want to continue + * (see {@link #setNextBufferBuilder(BufferBuilder)}). If you want to continue * with the current record, use {@link #clearCurrentBuffer()} instead.</p> */ void clear(); http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java index 7394f83..efdfaa1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java @@ -18,20 +18,23 @@ package org.apache.flink.runtime.io.network.api.serialization; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - import 
org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; /** * Record serializer which serializes the complete record to an intermediate * data serialization buffer and copies this buffer to target buffers - * one-by-one using {@link #setNextBuffer(Buffer)}. + * one-by-one using {@link #setNextBufferBuilder(BufferBuilder)}. * * @param <T> */ @@ -50,13 +53,8 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R private final ByteBuffer lengthBuffer; /** Current target {@link Buffer} of the serializer */ - private Buffer targetBuffer; - - /** Position in current {@link MemorySegment} of target buffer */ - private int position; - - /** Limit of current {@link MemorySegment} of target buffer */ - private int limit; + @Nullable + private BufferBuilder targetBuffer; public SpanningRecordSerializer() { serializationBuffer = new DataOutputSerializer(128); @@ -64,7 +62,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R lengthBuffer = ByteBuffer.allocate(4); lengthBuffer.order(ByteOrder.BIG_ENDIAN); - // ensure initial state with hasRemaining false (for correct setNextBuffer logic) + // ensure initial state with hasRemaining false (for correct setNextBufferBuilder logic) dataBuffer = serializationBuffer.wrapAsByteBuffer(); lengthBuffer.position(4); } @@ -105,10 +103,8 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R } @Override - public 
SerializationResult setNextBuffer(Buffer buffer) throws IOException { + public SerializationResult setNextBufferBuilder(BufferBuilder buffer) throws IOException { targetBuffer = buffer; - position = 0; - limit = buffer.getSize(); if (lengthBuffer.hasRemaining()) { copyToTargetBufferFrom(lengthBuffer); @@ -140,19 +136,12 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R if (targetBuffer == null) { return; } - - int needed = source.remaining(); - int available = limit - position; - int toCopy = Math.min(needed, available); - - targetBuffer.getMemorySegment().put(position, source, toCopy); - - position += toCopy; + targetBuffer.append(source); } private SerializationResult getSerializationResult() { if (!dataBuffer.hasRemaining() && !lengthBuffer.hasRemaining()) { - return (position < limit) + return !targetBuffer.isFull() ? SerializationResult.FULL_RECORD : SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL; } @@ -165,25 +154,21 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R if (targetBuffer == null) { return null; } - - targetBuffer.setSize(position); - return targetBuffer; + Buffer result = targetBuffer.build(); + targetBuffer = null; + return result; } @Override public void clearCurrentBuffer() { targetBuffer = null; - position = 0; - limit = 0; } @Override public void clear() { targetBuffer = null; - position = 0; - limit = 0; - // ensure clear state with hasRemaining false (for correct setNextBuffer logic) + // ensure clear state with hasRemaining false (for correct setNextBufferBuilder logic) dataBuffer.position(dataBuffer.limit()); lengthBuffer.position(4); } @@ -191,7 +176,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R @Override public boolean hasData() { // either data in current target buffer or intermediate buffers - return position > 0 || (lengthBuffer.hasRemaining() || dataBuffer.hasRemaining()); + return (targetBuffer != null && 
!targetBuffer.isEmpty()) || lengthBuffer.hasRemaining() || dataBuffer.hasRemaining(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 4729800..39dbacc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -21,12 +21,13 @@ package org.apache.flink.runtime.io.network.api.writer; import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.SimpleCounter; -import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; -import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.util.XORShiftRandom; import java.io.IOException; @@ -129,8 +130,9 @@ public class RecordWriter<T extends IOReadableWritable> { break; } } else { - buffer = targetPartition.getBufferProvider().requestBufferBlocking(); - result = serializer.setNextBuffer(buffer); + BufferBuilder bufferBuilder = + targetPartition.getBufferProvider().requestBufferBuilderBlocking(); + result = 
serializer.setNextBufferBuilder(bufferBuilder); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java index d7980d2..33516bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java @@ -54,11 +54,14 @@ public class Buffer { } public Buffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer) { + this(memorySegment, recycler, isBuffer, memorySegment.size()); + } + + public Buffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer, int size) { this.memorySegment = checkNotNull(memorySegment); this.recycler = checkNotNull(recycler); this.isBuffer = isBuffer; - - this.currentSize = memorySegment.size(); + this.currentSize = size; } public boolean isBuffer() { http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java new file mode 100644 index 0000000..08e49b5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.core.memory.MemorySegment; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.nio.ByteBuffer; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Not thread safe class for filling in the initial content of the {@link Buffer}. Once writing to the builder + * is complete, {@link Buffer} instance can be built and shared across multiple threads. 
+ */ +@NotThreadSafe +public class BufferBuilder { + private final MemorySegment memorySegment; + + private final BufferRecycler recycler; + + private int position = 0; + + private boolean built = false; + + public BufferBuilder(MemorySegment memorySegment, BufferRecycler recycler) { + this.memorySegment = checkNotNull(memorySegment); + this.recycler = checkNotNull(recycler); + } + + /** + * @return number of copied bytes + */ + public int append(ByteBuffer source) { + checkState(!built); + + int needed = source.remaining(); + int available = limit() - position; + int toCopy = Math.min(needed, available); + + memorySegment.put(position, source, toCopy); + position += toCopy; + return toCopy; + } + + public boolean isFull() { + checkState(position <= limit()); + return position == limit(); + } + + public Buffer build() { + checkState(!built); + built = true; + return new Buffer(memorySegment, recycler, true, position); + } + + public boolean isEmpty() { + return position == 0; + } + + private int limit() { + return memorySegment.size(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java index 9782584..843a2f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java @@ -44,6 +44,14 @@ public interface BufferProvider { Buffer requestBufferBlocking() throws IOException, InterruptedException; /** + * Returns a {@link BufferBuilder} instance from the buffer provider. 
+ * + * <p>If there is no buffer available, the call will block until one becomes available again or the + * buffer provider has been destroyed. + */ + BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException; + + /** * Adds a buffer availability listener to the buffer provider. * * <p>The operation fails with return value <code>false</code>, when there is a buffer available or http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index a66373c..7403bd3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -179,7 +179,8 @@ class LocalBufferPool implements BufferPool { @Override public Buffer requestBuffer() throws IOException { try { - return requestBuffer(false); + BufferBuilder bufferBuilder = requestBufferBuilder(false); + return bufferBuilder != null ? bufferBuilder.build() : null; } catch (InterruptedException e) { throw new IOException(e); @@ -188,10 +189,16 @@ class LocalBufferPool implements BufferPool { @Override public Buffer requestBufferBlocking() throws IOException, InterruptedException { - return requestBuffer(true); + BufferBuilder bufferBuilder = requestBufferBuilder(true); + return bufferBuilder != null ? 
bufferBuilder.build() : null; } - private Buffer requestBuffer(boolean isBlocking) throws InterruptedException, IOException { + @Override + public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException { + return requestBufferBuilder(true); + } + + private BufferBuilder requestBufferBuilder(boolean isBlocking) throws InterruptedException, IOException { synchronized (availableMemorySegments) { returnExcessMemorySegments(); @@ -226,7 +233,7 @@ class LocalBufferPool implements BufferPool { } } - return new Buffer(availableMemorySegments.poll(), this); + return new BufferBuilder(availableMemorySegments.poll(), this); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java index f988c55..ed0ce6c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java @@ -19,19 +19,16 @@ package org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.testutils.serialization.types.SerializationTestType; import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; import org.apache.flink.testutils.serialization.types.Util; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import 
org.junit.Assert; import org.junit.Test; import java.util.ArrayDeque; -import static org.mockito.Mockito.mock; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; public class SpanningRecordSerializationTest { @@ -129,13 +126,11 @@ public class SpanningRecordSerializationTest { { final int SERIALIZATION_OVERHEAD = 4; // length encoding - final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), mock(BufferRecycler.class)); - final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>(); // ------------------------------------------------------------------------------------------------------------- - serializer.setNextBuffer(buffer); + serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); int numBytes = 0; int numRecords = 0; @@ -164,7 +159,7 @@ public class SpanningRecordSerializationTest { } } - while (serializer.setNextBuffer(buffer).isFullBuffer()) { + while (serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)).isFullBuffer()) { deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize); } http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java index fe9a386..ed6677e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java @@ -21,19 +21,17 @@ package 
org.apache.flink.runtime.io.network.api.serialization; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.testutils.serialization.types.SerializationTestType; import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; import org.apache.flink.testutils.serialization.types.Util; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferRecycler; + import org.junit.Assert; import org.junit.Test; import java.io.IOException; import java.util.Random; -import static org.mockito.Mockito.mock; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; public class SpanningRecordSerializerTest { @@ -42,7 +40,6 @@ public class SpanningRecordSerializerTest { final int SEGMENT_SIZE = 16; final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>(); - final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class)); final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT); Assert.assertFalse(serializer.hasData()); @@ -51,13 +48,13 @@ public class SpanningRecordSerializerTest { serializer.addRecord(randomIntRecord); Assert.assertTrue(serializer.hasData()); - serializer.setNextBuffer(buffer); + serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)); Assert.assertTrue(serializer.hasData()); serializer.clear(); Assert.assertFalse(serializer.hasData()); - serializer.setNextBuffer(buffer); + serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)); serializer.addRecord(randomIntRecord); Assert.assertTrue(serializer.hasData()); @@ -76,10 +73,11 @@ public class SpanningRecordSerializerTest { final int 
SEGMENT_SIZE = 11; final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>(); - final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class)); try { - Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.setNextBuffer(buffer)); + Assert.assertEquals( + RecordSerializer.SerializationResult.FULL_RECORD, + serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE))); } catch (IOException e) { e.printStackTrace(); } @@ -122,7 +120,7 @@ public class SpanningRecordSerializerTest { result = serializer.addRecord(emptyRecord); Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result); - result = serializer.setNextBuffer(buffer); + result = serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)); Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result); } catch (Exception e) { @@ -202,11 +200,10 @@ public class SpanningRecordSerializerTest { final int SERIALIZATION_OVERHEAD = 4; // length encoding final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>(); - final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), mock(BufferRecycler.class)); // ------------------------------------------------------------------------------------------------------------- - serializer.setNextBuffer(buffer); + serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); int numBytes = 0; for (SerializationTestType record : records) { @@ -217,14 +214,15 @@ public class SpanningRecordSerializerTest { Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result); } else if (numBytes == segmentSize) { Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result); - serializer.setNextBuffer(buffer); + 
serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); numBytes = 0; } else { Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result); while (result.isFullBuffer()) { numBytes -= segmentSize; - result = serializer.setNextBuffer(buffer); + + result = serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index 59b98a2..f0eaa94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -31,11 +31,12 @@ import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.io.network.util.TestTaskEvent; import 
org.apache.flink.runtime.operators.testutils.ExpectedTestException; @@ -54,6 +55,8 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; import java.util.Queue; import java.util.Random; import java.util.concurrent.Callable; @@ -70,7 +73,6 @@ import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -97,16 +99,18 @@ public class RecordWriterTest { final CountDownLatch sync = new CountDownLatch(2); - final Buffer buffer = spy(TestBufferFactory.createBuffer(4)); + final TrackingBufferRecycler recycler = new TrackingBufferRecycler(); + + final MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(4); // Return buffer for first request, but block for all following requests. 
- Answer<Buffer> request = new Answer<Buffer>() { + Answer<BufferBuilder> request = new Answer<BufferBuilder>() { @Override - public Buffer answer(InvocationOnMock invocation) throws Throwable { + public BufferBuilder answer(InvocationOnMock invocation) throws Throwable { sync.countDown(); if (sync.getCount() == 1) { - return buffer; + return new BufferBuilder(memorySegment, recycler); } final Object o = new Object(); @@ -119,7 +123,7 @@ public class RecordWriterTest { }; BufferProvider bufferProvider = mock(BufferProvider.class); - when(bufferProvider.requestBufferBlocking()).thenAnswer(request); + when(bufferProvider.requestBufferBuilderBlocking()).thenAnswer(request); ResultPartitionWriter partitionWriter = createResultPartitionWriter(bufferProvider); @@ -156,13 +160,13 @@ public class RecordWriterTest { recordWriter.clearBuffers(); // Verify that buffer have been requested, but only one has been written out. - verify(bufferProvider, times(2)).requestBufferBlocking(); + verify(bufferProvider, times(2)).requestBufferBuilderBlocking(); verify(partitionWriter, times(1)).writeBuffer(any(Buffer.class), anyInt()); // Verify that the written out buffer has only been recycled once // (by the partition writer). 
- assertTrue("Buffer not recycled.", buffer.isRecycled()); - verify(buffer, times(1)).recycle(); + assertEquals(1, recycler.getRecycledMemorySegments().size()); + assertEquals(memorySegment, recycler.getRecycledMemorySegments().get(0)); } finally { if (executor != null) { @@ -566,4 +570,17 @@ public class RecordWriterTest { return nextChannel; } } + + private static class TrackingBufferRecycler implements BufferRecycler { + private final ArrayList<MemorySegment> recycledMemorySegments = new ArrayList<>(); + + @Override + public synchronized void recycle(MemorySegment memorySegment) { + recycledMemorySegments.add(memorySegment); + } + + public synchronized List<MemorySegment> getRecycledMemorySegments() { + return recycledMemorySegments; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java new file mode 100644 index 0000000..3805274 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.testutils.DiscardingRecycler; + +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link BufferBuilder}. + */ +public class BufferBuilderTest { + private static final int BUFFER_SIZE = 10 * Integer.BYTES; + + @Test + public void append() { + BufferBuilder bufferBuilder = createBufferBuilder(); + int[] intsToWrite = new int[] {0, 1, 2, 3, 42}; + ByteBuffer bytesToWrite = toByteBuffer(intsToWrite); + + assertEquals(bytesToWrite.limit(), bufferBuilder.append(bytesToWrite)); + + assertEquals(bytesToWrite.limit(), bytesToWrite.position()); + assertFalse(bufferBuilder.isFull()); + Buffer buffer = bufferBuilder.build(); + assertBufferContent(buffer, intsToWrite); + assertEquals(5 * Integer.BYTES, buffer.getSize()); + assertEquals(DiscardingRecycler.INSTANCE, buffer.getRecycler()); + } + + @Test + public void multipleAppends() { + BufferBuilder bufferBuilder = createBufferBuilder(); + + bufferBuilder.append(toByteBuffer(0, 1)); + bufferBuilder.append(toByteBuffer(2)); + bufferBuilder.append(toByteBuffer(3, 42)); + + Buffer buffer = bufferBuilder.build(); + assertBufferContent(buffer, 0, 1, 2, 3, 42); + assertEquals(5 * Integer.BYTES, buffer.getSize()); + } + + @Test + public void appendOverSize() { + BufferBuilder bufferBuilder = 
createBufferBuilder(); + ByteBuffer bytesToWrite = toByteBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 42); + + assertEquals(BUFFER_SIZE, bufferBuilder.append(bytesToWrite)); + + assertTrue(bufferBuilder.isFull()); + Buffer buffer = bufferBuilder.build(); + assertBufferContent(buffer, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + assertEquals(BUFFER_SIZE, buffer.getSize()); + + bufferBuilder = createBufferBuilder(); + assertEquals(Integer.BYTES, bufferBuilder.append(bytesToWrite)); + + assertFalse(bufferBuilder.isFull()); + buffer = bufferBuilder.build(); + assertBufferContent(buffer, 42); + assertEquals(Integer.BYTES, buffer.getSize()); + } + + @Test + public void buildEmptyBuffer() { + Buffer buffer = createBufferBuilder().build(); + assertEquals(0, buffer.getSize()); + assertBufferContent(buffer); + } + + @Test(expected = IllegalStateException.class) + public void buildingBufferTwice() { + BufferBuilder bufferBuilder = createBufferBuilder(); + bufferBuilder.build(); + bufferBuilder.build(); + } + + private static ByteBuffer toByteBuffer(int... data) { + ByteBuffer byteBuffer = ByteBuffer.allocate(data.length * Integer.BYTES); + byteBuffer.asIntBuffer().put(data); + return byteBuffer; + } + + private static void assertBufferContent(Buffer actualBuffer, int... 
expected) { + assertEquals(toByteBuffer(expected), actualBuffer.getNioBuffer()); + } + + private static BufferBuilder createBufferBuilder() { + return new BufferBuilder(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), DiscardingRecycler.INSTANCE); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java new file mode 100644 index 0000000..1113664 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.buffer; + +import org.apache.flink.core.memory.MemorySegmentFactory; + +/** + * Utility class for creating unpooled {@link BufferBuilder} instances. 
+ */ +public class BufferBuilderTestUtils { + public static BufferBuilder createBufferBuilder(int size) { + return new BufferBuilder( + MemorySegmentFactory.allocateUnpooledSegment(size), + FreeingBufferRecycler.INSTANCE); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java index fa44393..16285b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java @@ -19,22 +19,20 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.MutableObjectIterator; + import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; -import static org.mockito.Mockito.mock; 
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; import static org.mockito.Mockito.when; public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate { @@ -71,8 +69,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e @Override public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable { if (hasData) { - final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), mock(BufferRecycler.class)); - serializer.setNextBuffer(buffer); + serializer.setNextBufferBuilder(createBufferBuilder(bufferSize)); serializer.addRecord(reuse); hasData = inputIterator.next(reuse) != null; http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java index d596863..057b917 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java @@ -18,28 +18,27 @@ package org.apache.flink.runtime.io.network.serialization; -import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import 
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; -import org.apache.flink.testutils.serialization.types.IntType; -import org.apache.flink.testutils.serialization.types.SerializationTestType; +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType; +import org.apache.flink.testutils.serialization.types.IntType; +import org.apache.flink.testutils.serialization.types.SerializationTestType; + import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.Random; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; public class LargeRecordsTest { @@ -52,8 +51,6 @@ public class LargeRecordsTest { final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>(); final RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>(); - final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class)); - List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>((NUM_RECORDS + 1) / 2); List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>((NUM_RECORDS + 1) / 2); @@ -73,7 +70,7 @@ public class LargeRecordsTest { // ------------------------------------------------------------------------------------------------------------- - serializer.setNextBuffer(buffer); + 
serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)); int numRecordsDeserialized = 0; @@ -98,7 +95,7 @@ public class LargeRecordsTest { } // move buffers as long as necessary (for long records) - while (serializer.setNextBuffer(buffer).isFullBuffer()) { + while (serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)).isFullBuffer()) { deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), SEGMENT_SIZE); } @@ -152,8 +149,6 @@ public class LargeRecordsTest { new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>( new String[] { System.getProperty("java.io.tmpdir") } ); - final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class)); - List<SerializationTestType> originalRecords = new ArrayList<>((NUM_RECORDS + 1) / 2); List<SerializationTestType> deserializedRecords = new ArrayList<>((NUM_RECORDS + 1) / 2); @@ -173,7 +168,7 @@ public class LargeRecordsTest { // ------------------------------------------------------------------------------------------------------------- - serializer.setNextBuffer(buffer); + serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)); int numRecordsDeserialized = 0; @@ -198,7 +193,7 @@ public class LargeRecordsTest { } // move buffers as long as necessary (for long records) - while (serializer.setNextBuffer(buffer).isFullBuffer()) { + while (serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)).isFullBuffer()) { deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), SEGMENT_SIZE); } http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java index cc52549..eb80578 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.util; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferListener; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; @@ -79,6 +80,12 @@ public class TestPooledBufferProvider implements BufferProvider { } @Override + public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException { + Buffer buffer = requestBufferBlocking(); + return new BufferBuilder(buffer.getMemorySegment(), buffer.getRecycler()); + } + + @Override public boolean addBufferListener(BufferListener listener) { return bufferRecycler.registerListener(listener); } http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index e14430e..f19d59d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -21,14 +21,11 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.plugable.SerializationDelegate; @@ -41,8 +38,8 @@ import org.mockito.stubbing.Answer; import java.io.IOException; import java.util.concurrent.ConcurrentLinkedQueue; +import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; /** @@ -104,11 +101,8 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { return new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false); } else if (input != null && input.isStreamRecord()) { Object inputElement = input.getStreamRecord(); - final Buffer buffer = new Buffer( - MemorySegmentFactory.allocateUnpooledSegment(bufferSize), - mock(BufferRecycler.class)); - recordSerializer.setNextBuffer(buffer); + recordSerializer.setNextBufferBuilder(createBufferBuilder(bufferSize)); delegate.setInstance(inputElement); 
recordSerializer.addRecord(delegate);
