This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f2e265f81927d9871d408dec5d21ed6ebddd7910 Author: Arvid Heise <[email protected]> AuthorDate: Wed Sep 8 18:22:34 2021 +0200 [FLINK-23528][connectors/kinesis] Use proper mock of ExecutorService in KinesisDataFetcherTest. Mockito makes it so much harder to debug tests. Here, we replace Mockito with a small test class that emulates the previously dispersed functionality in a simpler way. --- .../kinesis/internals/KinesisDataFetcherTest.java | 13 ++- .../testutils/FakeKinesisBehavioursFactory.java | 25 ++--- .../testutils/TestableKinesisDataFetcher.java | 108 ++++++++++++++------- 3 files changed, 94 insertions(+), 52 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java index c2d6dde7d96..8be2bb41097 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java @@ -17,6 +17,11 @@ package org.apache.flink.streaming.connectors.kinesis.internals; +import com.amazonaws.services.kinesis.model.HashKeyRange; +import com.amazonaws.services.kinesis.model.SequenceNumberRange; +import com.amazonaws.services.kinesis.model.Shard; +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.commons.lang3.mutable.MutableLong; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.core.testutils.CheckedThread; @@ -44,12 +49,6 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDa import org.apache.flink.streaming.connectors.kinesis.testutils.TestableKinesisDataFetcherForShardConsumerException; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; - -import com.amazonaws.services.kinesis.model.HashKeyRange; -import com.amazonaws.services.kinesis.model.SequenceNumberRange; -import com.amazonaws.services.kinesis.model.Shard; -import org.apache.commons.lang3.mutable.MutableBoolean; -import org.apache.commons.lang3.mutable.MutableLong; import org.junit.Assert; import org.junit.Test; import org.powermock.reflect.Whitebox; @@ -93,7 +92,7 @@ public class KinesisDataFetcherTest extends TestLogger { assertTrue(fetcher.isRunning()); } - @Test(timeout = 10000) + @Test public void testIsRunningFalseAfterShutDown() throws InterruptedException { KinesisDataFetcher<String> fetcher = createTestDataFetcherWithNoShards(10, 2, "test-stream"); diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java index 1e737ac3358..46cc797ff7d 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.streaming.connectors.kinesis.testutils.TestUtils.createDummyStreamShardHandle; @@ -660,17 +661,19 @@ public class FakeKinesisBehavioursFactory { shardIterator); List<Record> records = Collections.emptyList(); try { - String data = queue.take(); - Record record = - new Record() - .withData( - ByteBuffer.wrap( - data.getBytes(ConfigConstants.DEFAULT_CHARSET))) - .withPartitionKey(UUID.randomUUID().toString()) - .withApproximateArrivalTimestamp( - new Date(System.currentTimeMillis())) - .withSequenceNumber(String.valueOf(0)); - records = Collections.singletonList(record); + String data = queue.poll(100, TimeUnit.MILLISECONDS); + if (data != null) { + Record record = + new Record() + .withData( + ByteBuffer.wrap( + data.getBytes(ConfigConstants.DEFAULT_CHARSET))) + .withPartitionKey(UUID.randomUUID().toString()) + .withApproximateArrivalTimestamp( + new Date(System.currentTimeMillis())) + .withSequenceNumber(String.valueOf(0)); + records = Collections.singletonList(record); + } } catch (InterruptedException e) { shardIterator = null; } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java index b4c69eebb8c..4bdb95eb78a 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java @@ -26,23 +26,19 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; -import org.mockito.invocation.InvocationOnMock; - +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Properties; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** Extension of the {@link KinesisDataFetcher} for testing. */ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> { @@ -50,9 +46,6 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> { private final Semaphore discoveryWaiter = new Semaphore(0); private final OneShotLatch shutdownWaiter; - private volatile boolean running; - private volatile boolean executorServiceShutdownNowCalled; - public TestableKinesisDataFetcher( List<String> fakeStreams, SourceFunction.SourceContext<T> sourceContext, @@ -109,8 +102,6 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> { this.runWaiter = new OneShotLatch(); this.shutdownWaiter = new OneShotLatch(); - - this.running = true; } @Override @@ -130,28 +121,7 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> { @Override protected ExecutorService createShardConsumersThreadPool(String subtaskName) { // this is just a dummy fetcher, so no need to create a thread pool for shard consumers - ExecutorService mockExecutorService = mock(ExecutorService.class); - when(mockExecutorService.isTerminated()) - .thenAnswer((InvocationOnMock invocation) -> !running); - when(mockExecutorService.shutdownNow()) - .thenAnswer( - invocationOnMock -> { - executorServiceShutdownNowCalled = true; - return Collections.emptyList(); - }); - try { - when(mockExecutorService.awaitTermination(anyLong(), any())) - .thenAnswer(invocationOnMock -> !running && executorServiceShutdownNowCalled); - } catch (InterruptedException e) { - // We're just trying to stub the method. Must acknowledge the checked exception. - } - return mockExecutorService; - } - - @Override - public void awaitTermination() throws InterruptedException { - this.running = false; - super.awaitTermination(); + return new TestExecutorService(); } @Override @@ -174,4 +144,74 @@ public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> { public void waitUntilDiscovery(int number) throws InterruptedException { discoveryWaiter.acquire(number); } + + private static class TestExecutorService implements ExecutorService { + boolean terminated = false; + + @Override + public void execute(Runnable command) {} + + @Override + public void shutdown() { + terminated = true; + } + + @Override + public List<Runnable> shutdownNow() { + terminated = true; + return Collections.emptyList(); + } + + @Override + public boolean isShutdown() { + return terminated; + } + + @Override + public boolean isTerminated() { + return terminated; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) { + return terminated; + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + return null; + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + return null; + } + + @Override + public Future<?> submit(Runnable task) { + return null; + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { + return null; + } + + @Override + public <T> List<Future<T>> invokeAll( + Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) { + return null; + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) { + return null; + } + + @Override + public <T> T invokeAny( + Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) { + return null; + } + } }
