[hotfix][network-tests] Simplify TestPooledBufferProvider

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/409ea231
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/409ea231
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/409ea231

Branch: refs/heads/master
Commit: 409ea2314d780436a404b3d8b61768e20af7485a
Parents: 91c72b9
Author: Piotr Nowojski <[email protected]>
Authored: Mon Dec 4 15:08:12 2017 +0100
Committer: Stefan Richter <[email protected]>
Committed: Mon Jan 8 11:46:00 2018 +0100

----------------------------------------------------------------------
 .../io/network/util/TestBufferFactory.java      | 35 ++++++++------------
 .../network/util/TestPooledBufferProvider.java  | 32 +++++-------------
 2 files changed, 22 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/409ea231/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index 9856d22..71ab260 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -18,18 +18,17 @@
 
 package org.apache.flink.runtime.io.network.util;
 
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
 
-import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.concurrent.ThreadSafe;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
+@ThreadSafe
 public class TestBufferFactory {
 
        public static final int BUFFER_SIZE = 32 * 1024;
@@ -40,37 +39,31 @@ public class TestBufferFactory {
 
        private final BufferRecycler bufferRecycler;
 
-       private AtomicInteger numberOfCreatedBuffers = new AtomicInteger();
+       private final int poolSize;
 
-       public TestBufferFactory() {
-               this(BUFFER_SIZE, RECYCLER);
-       }
-
-       public TestBufferFactory(int bufferSize) {
-               this(bufferSize, RECYCLER);
-       }
+       private int numberOfCreatedBuffers = 0;
 
-       public TestBufferFactory(int bufferSize, BufferRecycler bufferRecycler) {
+       public TestBufferFactory(int poolSize, int bufferSize, BufferRecycler bufferRecycler) {
                checkArgument(bufferSize > 0);
+               this.poolSize = poolSize;
                this.bufferSize = bufferSize;
                this.bufferRecycler = checkNotNull(bufferRecycler);
        }
 
-       public Buffer create() {
-               numberOfCreatedBuffers.incrementAndGet();
+       public synchronized Buffer create() {
+               if (numberOfCreatedBuffers >= poolSize) {
+                       return null;
+               }
 
+               numberOfCreatedBuffers++;
                return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), bufferRecycler);
        }
 
-       public Buffer createFrom(MemorySegment segment) {
-               return new Buffer(segment, bufferRecycler);
-       }
-
-       public int getNumberOfCreatedBuffers() {
-               return numberOfCreatedBuffers.get();
+       public synchronized int getNumberOfCreatedBuffers() {
+               return numberOfCreatedBuffers;
        }
 
-       public int getBufferSize() {
+       public synchronized int getBufferSize() {
                return bufferSize;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/409ea231/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index 221a535..a88f4ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -36,58 +36,42 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 
 public class TestPooledBufferProvider implements BufferProvider {
 
-       private final Object bufferCreationLock = new Object();
-
        private final BlockingQueue<Buffer> buffers = new LinkedBlockingDeque<>();
 
        private final TestBufferFactory bufferFactory;
 
        private final PooledBufferProviderRecycler bufferRecycler;
 
-       private final int poolSize;
-
        public TestPooledBufferProvider(int poolSize) {
                checkArgument(poolSize > 0);
-               this.poolSize = poolSize;
 
                this.bufferRecycler = new PooledBufferProviderRecycler(buffers);
-               this.bufferFactory = new TestBufferFactory(32 * 1024, bufferRecycler);
+               this.bufferFactory = new TestBufferFactory(poolSize, 32 * 1024, bufferRecycler);
        }
 
        @Override
        public Buffer requestBuffer() throws IOException {
                final Buffer buffer = buffers.poll();
-
                if (buffer != null) {
                        return buffer;
                }
-               else {
-                       synchronized (bufferCreationLock) {
-                               if (bufferFactory.getNumberOfCreatedBuffers() < poolSize) {
-                                       return bufferFactory.create();
-                               }
-                       }
 
-                       return null;
-               }
+               return bufferFactory.create();
        }
 
        @Override
        public Buffer requestBufferBlocking() throws IOException, InterruptedException {
-               final Buffer buffer = buffers.poll();
-
+               Buffer buffer = buffers.poll();
                if (buffer != null) {
                        return buffer;
                }
-               else {
-                       synchronized (bufferCreationLock) {
-                               if (bufferFactory.getNumberOfCreatedBuffers() < poolSize) {
-                                       return bufferFactory.create();
-                               }
-                       }
 
-                       return buffers.take();
+               buffer = bufferFactory.create();
+               if (buffer != null) {
+                       return buffer;
                }
+
+               return buffers.take();
        }
 
        @Override

Reply via email to