This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6a9d8f7694141c636e2628abb1c50e300c59f65c Author: Piotr Nowojski <[email protected]> AuthorDate: Fri May 10 10:41:06 2019 +0200 [hotfix][test] Drop mockito usage from TestSingleInputGate --- .../partition/consumer/TestSingleInputGate.java | 3 +- .../consumer/StreamTestSingleInputGate.java | 84 +++++++++++++++++++++- 2 files changed, 83 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java index 8404663..f60cdb9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java @@ -22,7 +22,6 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate; import static org.apache.flink.util.Preconditions.checkArgument; -import static org.mockito.Mockito.spy; /** * A test input gate to mock reading data. @@ -36,7 +35,7 @@ public class TestSingleInputGate { public TestSingleInputGate(int numberOfInputChannels, boolean initialize) { checkArgument(numberOfInputChannels >= 1); - inputGate = spy(createSingleInputGate(numberOfInputChannels)); + inputGate = createSingleInputGate(numberOfInputChannels); inputChannels = new TestInputChannel[numberOfInputChannels]; if (initialize) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index dbb81ab..b9fe84f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -21,12 +21,16 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferListener; +import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability; import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; @@ -40,7 +44,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; -import static org.mockito.Mockito.doReturn; /** * Test {@link InputGate} that allows setting multiple channels. Use @@ -76,7 +79,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { inputQueues = new ConcurrentLinkedQueue[numInputChannels]; setupInputChannels(); - doReturn(bufferSize).when(inputGate).getPageSize(); + inputGate.setBufferPool(new NoOpBufferPool(bufferSize)); } @SuppressWarnings("unchecked") @@ -219,4 +222,81 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate { return isEvent; } } + + private static class NoOpBufferPool implements BufferPool { + private int bufferSize; + + public NoOpBufferPool(int bufferSize) { + this.bufferSize = bufferSize; + } + + @Override + public void lazyDestroy() { + } + + @Override + public int getMemorySegmentSize() { + return bufferSize; + } + + @Override + public Buffer requestBuffer() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public Buffer requestBufferBlocking() throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addBufferListener(BufferListener listener) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDestroyed() { + throw new UnsupportedOperationException(); + } + + @Override + public int getNumberOfRequiredMemorySegments() { + throw new UnsupportedOperationException(); + } + + @Override + public int getMaxNumberOfMemorySegments() { + throw new UnsupportedOperationException(); + } + + @Override + public int getNumBuffers() { + throw new UnsupportedOperationException(); + } + + @Override + public void setNumBuffers(int numBuffers) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int getNumberOfAvailableMemorySegments() { + throw new UnsupportedOperationException(); + } + + @Override + public int bestEffortGetNumOfUsedBuffers() { + throw new UnsupportedOperationException(); + } + + @Override + public void recycle(MemorySegment memorySegment) { + throw new UnsupportedOperationException(); + } + } }
