[FLINK-8207][network-tests] Unify TestInfiniteBufferProvider and TestPooledBufferProvider


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91c72b9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91c72b9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91c72b9d

Branch: refs/heads/master
Commit: 91c72b9dc611e73790147256adf859c177afe862
Parents: 1f60a1d
Author: Piotr Nowojski <[email protected]>
Authored: Mon Dec 4 15:13:57 2017 +0100
Committer: Stefan Richter <[email protected]>
Committed: Mon Jan 8 11:46:00 2018 +0100

----------------------------------------------------------------------
 .../io/network/api/writer/RecordWriterTest.java |  5 +-
 .../partition/SpilledSubpartitionViewTest.java  | 22 ++----
 .../util/TestInfiniteBufferProvider.java        | 81 --------------------
 .../network/util/TestPooledBufferProvider.java  | 10 ++-
 4 files changed, 17 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/91c72b9d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 9509013..63540c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
 import org.apache.flink.types.IntValue;
@@ -421,8 +421,7 @@ public class RecordWriterTest {
                        new ArrayDeque[]{new ArrayDeque(), new ArrayDeque()};
 
                ResultPartitionWriter partition =
-                       createCollectingPartitionWriter(queues,
-                               new TestInfiniteBufferProvider());
+                       createCollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE));
                RecordWriter<?> writer = new RecordWriter<>(partition);
 
                writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);

http://git-wip-us.apache.org/repos/asf/flink/blob/91c72b9d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index b748e1c..69d19fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
 
@@ -52,9 +52,6 @@ public class SpilledSubpartitionViewTest {
 
        private static final IOManager IO_MANAGER = new IOManagerAsync();
 
-       private static final TestInfiniteBufferProvider writerBufferPool =
-               new TestInfiniteBufferProvider();
-
        @AfterClass
        public static void shutdown() {
                IO_MANAGER.shutdown();
@@ -66,7 +63,7 @@ public class SpilledSubpartitionViewTest {
                final int numberOfBuffersToWrite = 512;
 
                // Setup
-               final BufferFileWriter writer = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, numberOfBuffersToWrite);
+               final BufferFileWriter writer = createWriterAndWriteBuffers(numberOfBuffersToWrite);
 
                writer.close();
 
@@ -94,7 +91,7 @@ public class SpilledSubpartitionViewTest {
                final int numberOfBuffersToWrite = 512;
 
                // Setup
-               final BufferFileWriter writer = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, numberOfBuffersToWrite);
+               final BufferFileWriter writer = createWriterAndWriteBuffers(numberOfBuffersToWrite);
 
                writer.close();
 
@@ -134,8 +131,8 @@ public class SpilledSubpartitionViewTest {
 
                        // Setup
                        writers = new BufferFileWriter[]{
-                               createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512),
-                               createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512)
+                               createWriterAndWriteBuffers(512),
+                               createWriterAndWriteBuffers(512)
                        };
 
                        readers = new ResultSubpartitionView[writers.length];
@@ -211,15 +208,12 @@ public class SpilledSubpartitionViewTest {
         *
         * <p> Call {@link BufferFileWriter#close()} to ensure that all buffers have been written.
         */
-       static BufferFileWriter createWriterAndWriteBuffers(
-               IOManager ioManager,
-               BufferProvider bufferProvider,
-               int numberOfBuffers) throws IOException {
+       private static BufferFileWriter createWriterAndWriteBuffers(int numberOfBuffers) throws IOException {
 
-               final BufferFileWriter writer = ioManager.createBufferFileWriter(ioManager.createChannel());
+               final BufferFileWriter writer = IO_MANAGER.createBufferFileWriter(IO_MANAGER.createChannel());
 
                for (int i = 0; i < numberOfBuffers; i++) {
-                       writer.writeBlock(bufferProvider.requestBuffer());
+                       writer.writeBlock(TestBufferFactory.createBuffer());
                }
 
                writer.writeBlock(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE));

http://git-wip-us.apache.org/repos/asf/flink/blob/91c72b9d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
deleted file mode 100644
index ad40a54..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.util;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferListener;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-public class TestInfiniteBufferProvider implements BufferProvider {
-
-       private final ConcurrentLinkedQueue<Buffer> buffers = new ConcurrentLinkedQueue<Buffer>();
-
-       private final TestBufferFactory bufferFactory = new TestBufferFactory(
-                       32 * 1024, new InfiniteBufferProviderRecycler(buffers));
-
-       @Override
-       public Buffer requestBuffer() throws IOException {
-               Buffer buffer = buffers.poll();
-
-               if (buffer != null) {
-                       return buffer;
-               }
-
-               return bufferFactory.create();
-       }
-
-       @Override
-       public Buffer requestBufferBlocking() throws IOException, InterruptedException {
-               return requestBuffer();
-       }
-
-       @Override
-       public boolean addBufferListener(BufferListener listener) {
-               return false;
-       }
-
-       @Override
-       public boolean isDestroyed() {
-               return false;
-       }
-
-       @Override
-       public int getMemorySegmentSize() {
-               return bufferFactory.getBufferSize();
-       }
-
-       private static class InfiniteBufferProviderRecycler implements BufferRecycler {
-
-               private final ConcurrentLinkedQueue<Buffer> buffers;
-
-               public InfiniteBufferProviderRecycler(ConcurrentLinkedQueue<Buffer> buffers) {
-                       this.buffers = buffers;
-               }
-
-               @Override
-               public void recycle(MemorySegment segment) {
-                       buffers.add(new Buffer(segment, this));
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91c72b9d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index c354eeb..221a535 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -28,8 +28,9 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Queues;
 
 import java.io.IOException;
 import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -37,7 +38,7 @@ public class TestPooledBufferProvider implements BufferProvider {
 
        private final Object bufferCreationLock = new Object();
 
-       private final ArrayBlockingQueue<Buffer> buffers;
+       private final BlockingQueue<Buffer> buffers = new LinkedBlockingDeque<>();
 
        private final TestBufferFactory bufferFactory;
 
@@ -49,7 +50,6 @@ public class TestPooledBufferProvider implements BufferProvider {
                checkArgument(poolSize > 0);
                this.poolSize = poolSize;
 
-               this.buffers = new ArrayBlockingQueue<Buffer>(poolSize);
                this.bufferRecycler = new PooledBufferProviderRecycler(buffers);
                this.bufferFactory = new TestBufferFactory(32 * 1024, bufferRecycler);
        }
@@ -109,6 +109,10 @@ public class TestPooledBufferProvider implements BufferProvider {
                return buffers.size();
        }
 
+       public int getNumberOfCreatedBuffers() {
+               return bufferFactory.getNumberOfCreatedBuffers();
+       }
+
        private static class PooledBufferProviderRecycler implements BufferRecycler {
 
                private final Object listenerRegistrationLock = new Object();

Reply via email to