[FLINK-8207][network-tests] Unify TestInfiniteBufferProvider and TestPooledBufferProvider
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91c72b9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91c72b9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91c72b9d
Branch: refs/heads/master
Commit: 91c72b9dc611e73790147256adf859c177afe862
Parents: 1f60a1d
Author: Piotr Nowojski <[email protected]>
Authored: Mon Dec 4 15:13:57 2017 +0100
Committer: Stefan Richter <[email protected]>
Committed: Mon Jan 8 11:46:00 2018 +0100
----------------------------------------------------------------------
 .../io/network/api/writer/RecordWriterTest.java | 5 +-
 .../partition/SpilledSubpartitionViewTest.java | 22 ++----
 .../util/TestInfiniteBufferProvider.java | 81 --------------------
 .../network/util/TestPooledBufferProvider.java | 10 ++-
 4 files changed, 17 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/91c72b9d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 9509013..63540c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
-import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.io.network.util.TestTaskEvent; import org.apache.flink.runtime.testutils.DiscardingRecycler; import org.apache.flink.types.IntValue; @@ -421,8 +421,7 @@ public class RecordWriterTest { new ArrayDeque[]{new ArrayDeque(), new ArrayDeque()}; ResultPartitionWriter partition = - createCollectingPartitionWriter(queues, - new TestInfiniteBufferProvider()); + createCollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE)); RecordWriter<?> writer = new RecordWriter<>(partition); writer.broadcastEvent(EndOfPartitionEvent.INSTANCE); http://git-wip-us.apache.org/repos/asf/flink/blob/91c72b9d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java index b748e1c..69d19fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java @@ -24,8 +24,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.runtime.io.network.util.TestConsumerCallback; -import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider; import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import 
org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer; @@ -52,9 +52,6 @@ public class SpilledSubpartitionViewTest { private static final IOManager IO_MANAGER = new IOManagerAsync(); - private static final TestInfiniteBufferProvider writerBufferPool = - new TestInfiniteBufferProvider(); - @AfterClass public static void shutdown() { IO_MANAGER.shutdown(); @@ -66,7 +63,7 @@ public class SpilledSubpartitionViewTest { final int numberOfBuffersToWrite = 512; // Setup - final BufferFileWriter writer = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, numberOfBuffersToWrite); + final BufferFileWriter writer = createWriterAndWriteBuffers(numberOfBuffersToWrite); writer.close(); @@ -94,7 +91,7 @@ public class SpilledSubpartitionViewTest { final int numberOfBuffersToWrite = 512; // Setup - final BufferFileWriter writer = createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, numberOfBuffersToWrite); + final BufferFileWriter writer = createWriterAndWriteBuffers(numberOfBuffersToWrite); writer.close(); @@ -134,8 +131,8 @@ public class SpilledSubpartitionViewTest { // Setup writers = new BufferFileWriter[]{ - createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512), - createWriterAndWriteBuffers(IO_MANAGER, writerBufferPool, 512) + createWriterAndWriteBuffers(512), + createWriterAndWriteBuffers(512) }; readers = new ResultSubpartitionView[writers.length]; @@ -211,15 +208,12 @@ public class SpilledSubpartitionViewTest { * * <p> Call {@link BufferFileWriter#close()} to ensure that all buffers have been written. 
*/ - static BufferFileWriter createWriterAndWriteBuffers( - IOManager ioManager, - BufferProvider bufferProvider, - int numberOfBuffers) throws IOException { + private static BufferFileWriter createWriterAndWriteBuffers(int numberOfBuffers) throws IOException { - final BufferFileWriter writer = ioManager.createBufferFileWriter(ioManager.createChannel()); + final BufferFileWriter writer = IO_MANAGER.createBufferFileWriter(IO_MANAGER.createChannel()); for (int i = 0; i < numberOfBuffers; i++) { - writer.writeBlock(bufferProvider.requestBuffer()); + writer.writeBlock(TestBufferFactory.createBuffer()); } writer.writeBlock(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE)); http://git-wip-us.apache.org/repos/asf/flink/blob/91c72b9d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java deleted file mode 100644 index ad40a54..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.util; - -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferListener; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; -import org.apache.flink.runtime.io.network.buffer.BufferRecycler; - -import java.io.IOException; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class TestInfiniteBufferProvider implements BufferProvider { - - private final ConcurrentLinkedQueue<Buffer> buffers = new ConcurrentLinkedQueue<Buffer>(); - - private final TestBufferFactory bufferFactory = new TestBufferFactory( - 32 * 1024, new InfiniteBufferProviderRecycler(buffers)); - - @Override - public Buffer requestBuffer() throws IOException { - Buffer buffer = buffers.poll(); - - if (buffer != null) { - return buffer; - } - - return bufferFactory.create(); - } - - @Override - public Buffer requestBufferBlocking() throws IOException, InterruptedException { - return requestBuffer(); - } - - @Override - public boolean addBufferListener(BufferListener listener) { - return false; - } - - @Override - public boolean isDestroyed() { - return false; - } - - @Override - public int getMemorySegmentSize() { - return bufferFactory.getBufferSize(); - } - - private static class InfiniteBufferProviderRecycler implements BufferRecycler { - - private final ConcurrentLinkedQueue<Buffer> buffers; - - public InfiniteBufferProviderRecycler(ConcurrentLinkedQueue<Buffer> buffers) { - this.buffers = buffers; 
- } - - @Override - public void recycle(MemorySegment segment) { - buffers.add(new Buffer(segment, this)); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/91c72b9d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java index c354eeb..221a535 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java @@ -28,8 +28,9 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Queues; import java.io.IOException; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingDeque; import static org.apache.flink.util.Preconditions.checkArgument; @@ -37,7 +38,7 @@ public class TestPooledBufferProvider implements BufferProvider { private final Object bufferCreationLock = new Object(); - private final ArrayBlockingQueue<Buffer> buffers; + private final BlockingQueue<Buffer> buffers = new LinkedBlockingDeque<>(); private final TestBufferFactory bufferFactory; @@ -49,7 +50,6 @@ public class TestPooledBufferProvider implements BufferProvider { checkArgument(poolSize > 0); this.poolSize = poolSize; - this.buffers = new ArrayBlockingQueue<Buffer>(poolSize); this.bufferRecycler = new PooledBufferProviderRecycler(buffers); this.bufferFactory = new TestBufferFactory(32 * 1024, bufferRecycler); } @@ -109,6 +109,10 @@ public class TestPooledBufferProvider implements BufferProvider { return buffers.size(); } + public int 
getNumberOfCreatedBuffers() { + return bufferFactory.getNumberOfCreatedBuffers(); + } + private static class PooledBufferProviderRecycler implements BufferRecycler { private final Object listenerRegistrationLock = new Object();
