[hotfix][network-tests] Simplify TestPooledBufferProvider
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/409ea231
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/409ea231
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/409ea231

Branch: refs/heads/master
Commit: 409ea2314d780436a404b3d8b61768e20af7485a
Parents: 91c72b9
Author: Piotr Nowojski <[email protected]>
Authored: Mon Dec 4 15:08:12 2017 +0100
Committer: Stefan Richter <[email protected]>
Committed: Mon Jan 8 11:46:00 2018 +0100

----------------------------------------------------------------------
 .../io/network/util/TestBufferFactory.java | 35 ++++++++------------
 .../network/util/TestPooledBufferProvider.java | 32 +++++------------
 2 files changed, 22 insertions(+), 45 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/409ea231/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index 9856d22..71ab260 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -18,18 +18,17 @@
 package org.apache.flink.runtime.io.network.util;
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
-import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.concurrent.ThreadSafe;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+@ThreadSafe
 public class TestBufferFactory {
     public static final int BUFFER_SIZE = 32 * 1024;
@@ -40,37 +39,31 @@ public class TestBufferFactory {
     private final BufferRecycler bufferRecycler;
-    private AtomicInteger numberOfCreatedBuffers = new AtomicInteger();
+    private final int poolSize;
-    public TestBufferFactory() {
-        this(BUFFER_SIZE, RECYCLER);
-    }
-
-    public TestBufferFactory(int bufferSize) {
-        this(bufferSize, RECYCLER);
-    }
+    private int numberOfCreatedBuffers = 0;
-    public TestBufferFactory(int bufferSize, BufferRecycler bufferRecycler) {
+    public TestBufferFactory(int poolSize, int bufferSize, BufferRecycler bufferRecycler) {
         checkArgument(bufferSize > 0);
+        this.poolSize = poolSize;
         this.bufferSize = bufferSize;
         this.bufferRecycler = checkNotNull(bufferRecycler);
     }
-    public Buffer create() {
-        numberOfCreatedBuffers.incrementAndGet();
+    public synchronized Buffer create() {
+        if (numberOfCreatedBuffers >= poolSize) {
+            return null;
+        }
+        numberOfCreatedBuffers++;
         return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), bufferRecycler);
     }
-    public Buffer createFrom(MemorySegment segment) {
-        return new Buffer(segment, bufferRecycler);
-    }
-
-    public int getNumberOfCreatedBuffers() {
-        return numberOfCreatedBuffers.get();
+    public synchronized int getNumberOfCreatedBuffers() {
+        return numberOfCreatedBuffers;
     }
-    public int getBufferSize() {
+    public synchronized int getBufferSize() {
         return bufferSize;
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/409ea231/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index 221a535..a88f4ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -36,58 +36,42 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 public class TestPooledBufferProvider implements BufferProvider {
-    private final Object bufferCreationLock = new Object();
-
     private final BlockingQueue<Buffer> buffers = new LinkedBlockingDeque<>();
     private final TestBufferFactory bufferFactory;
     private final PooledBufferProviderRecycler bufferRecycler;
-    private final int poolSize;
-
     public TestPooledBufferProvider(int poolSize) {
         checkArgument(poolSize > 0);
-        this.poolSize = poolSize;
         this.bufferRecycler = new PooledBufferProviderRecycler(buffers);
-        this.bufferFactory = new TestBufferFactory(32 * 1024, bufferRecycler);
+        this.bufferFactory = new TestBufferFactory(poolSize, 32 * 1024, bufferRecycler);
     }
     @Override
     public Buffer requestBuffer() throws IOException {
         final Buffer buffer = buffers.poll();
-
         if (buffer != null) {
             return buffer;
         }
-        else {
-            synchronized (bufferCreationLock) {
-                if (bufferFactory.getNumberOfCreatedBuffers() < poolSize) {
-                    return bufferFactory.create();
-                }
-            }
-            return null;
-        }
+        return bufferFactory.create();
     }
     @Override
     public Buffer requestBufferBlocking() throws IOException, InterruptedException {
-        final Buffer buffer = buffers.poll();
-
+        Buffer buffer = buffers.poll();
         if (buffer != null) {
             return buffer;
         }
-        else {
-            synchronized (bufferCreationLock) {
-                if (bufferFactory.getNumberOfCreatedBuffers() < poolSize) {
-                    return bufferFactory.create();
-                }
-            }
-            return buffers.take();
+        buffer = bufferFactory.create();
+        if (buffer != null) {
+            return buffer;
         }
+
+        return buffers.take();
     }
     @Override
