Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5268#discussion_r164401125
--- Diff:
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
---
@@ -18,123 +18,110 @@
package org.apache.flink.streaming.connectors.kinesis.testutils;
import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
import
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
-import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
-import
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Extension of the {@link KinesisDataFetcher} for testing.
*/
-public class TestableKinesisDataFetcher extends KinesisDataFetcher<String>
{
-
- private static final Object fakeCheckpointLock = new Object();
-
- private long numElementsCollected;
+public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
private OneShotLatch runWaiter;
+ private OneShotLatch initialDiscoveryWaiter;
+
+ private volatile boolean running;
public TestableKinesisDataFetcher(
List<String> fakeStreams,
+ SourceFunction.SourceContext<T> sourceContext,
Properties fakeConfiguration,
+ KinesisDeserializationSchema<T> deserializationSchema,
int fakeTotalCountOfSubtasks,
int fakeIndexOfThisSubtask,
AtomicReference<Throwable> thrownErrorUnderTest,
LinkedList<KinesisStreamShardState>
subscribedShardsStateUnderTest,
HashMap<String, String>
subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
KinesisProxyInterface fakeKinesis) {
- super(fakeStreams,
- getMockedSourceContext(),
- fakeCheckpointLock,
+ super(
+ fakeStreams,
+ sourceContext,
+ sourceContext.getCheckpointLock(),
getMockedRuntimeContext(fakeTotalCountOfSubtasks,
fakeIndexOfThisSubtask),
fakeConfiguration,
- new KinesisDeserializationSchemaWrapper<>(new
SimpleStringSchema()),
+ deserializationSchema,
thrownErrorUnderTest,
subscribedShardsStateUnderTest,
subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
fakeKinesis);
- this.numElementsCollected = 0;
this.runWaiter = new OneShotLatch();
- }
+ this.initialDiscoveryWaiter = new OneShotLatch();
- public long getNumOfElementsCollected() {
- return numElementsCollected;
+ this.running = true;
}
@Override
- protected KinesisDeserializationSchema<String>
getClonedDeserializationSchema() {
- return new KinesisDeserializationSchemaWrapper<>(new
SimpleStringSchema());
+ public void runFetcher() throws Exception {
+ runWaiter.trigger();
+ super.runFetcher();
+ }
+
+ public void waitUntilRun() throws Exception {
+ runWaiter.await();
}
@Override
- protected void emitRecordAndUpdateState(String record, long
recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
- synchronized (fakeCheckpointLock) {
- this.numElementsCollected++;
- updateState(shardStateIndex, lastSequenceNumber);
- }
+ protected ExecutorService createShardConsumersThreadPool(String
subtaskName) {
+ // this is just a dummy fetcher, so no need to create a thread
pool for shard consumers
+ ExecutorService mockExecutor = mock(ExecutorService.class);
+ when(mockExecutor.isTerminated()).thenAnswer((InvocationOnMock
invocation) -> !running);
+ return mockExecutor;
}
@Override
- public void runFetcher() throws Exception {
- runWaiter.trigger();
- super.runFetcher();
+ public void awaitTermination() throws InterruptedException {
+ this.running = false;
+ super.awaitTermination();
}
- public void waitUntilRun() throws Exception {
- runWaiter.await();
+ @Override
+ public List<StreamShardHandle> discoverNewShardsToSubscribe() throws
InterruptedException {
+ initialDiscoveryWaiter.trigger();
--- End diff --
shouldn't we trigger this after `super.discoverNewShardstoSubscribe()`?
As in
```
List<StreamShardHandle> newShards = super.discoverNewShardsToSubscribe();
initialDiscoveryWaiter.trigger();
return newShards;
```
---