Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5268#discussion_r164401125
  
    --- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestableKinesisDataFetcher.java
 ---
    @@ -18,123 +18,110 @@
     package org.apache.flink.streaming.connectors.kinesis.testutils;
     
     import org.apache.flink.api.common.functions.RuntimeContext;
    -import org.apache.flink.api.common.serialization.SimpleStringSchema;
     import org.apache.flink.core.testutils.OneShotLatch;
     import org.apache.flink.streaming.api.functions.source.SourceFunction;
     import 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher;
     import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
    -import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
    +import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
     import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
     import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
    -import 
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper;
     
     import org.mockito.Mockito;
     import org.mockito.invocation.InvocationOnMock;
    -import org.mockito.stubbing.Answer;
     
     import java.util.HashMap;
     import java.util.LinkedList;
     import java.util.List;
     import java.util.Properties;
    +import java.util.concurrent.ExecutorService;
     import java.util.concurrent.atomic.AtomicReference;
     
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
     /**
      * Extension of the {@link KinesisDataFetcher} for testing.
      */
    -public class TestableKinesisDataFetcher extends KinesisDataFetcher<String> 
{
    -
    -   private static final Object fakeCheckpointLock = new Object();
    -
    -   private long numElementsCollected;
    +public class TestableKinesisDataFetcher<T> extends KinesisDataFetcher<T> {
     
        private OneShotLatch runWaiter;
    +   private OneShotLatch initialDiscoveryWaiter;
    +
    +   private volatile boolean running;
     
        public TestableKinesisDataFetcher(
                        List<String> fakeStreams,
    +                   SourceFunction.SourceContext<T> sourceContext,
                        Properties fakeConfiguration,
    +                   KinesisDeserializationSchema<T> deserializationSchema,
                        int fakeTotalCountOfSubtasks,
                        int fakeIndexOfThisSubtask,
                        AtomicReference<Throwable> thrownErrorUnderTest,
                        LinkedList<KinesisStreamShardState> 
subscribedShardsStateUnderTest,
                        HashMap<String, String> 
subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
                        KinesisProxyInterface fakeKinesis) {
    -           super(fakeStreams,
    -                   getMockedSourceContext(),
    -                   fakeCheckpointLock,
    +           super(
    +                   fakeStreams,
    +                   sourceContext,
    +                   sourceContext.getCheckpointLock(),
                        getMockedRuntimeContext(fakeTotalCountOfSubtasks, 
fakeIndexOfThisSubtask),
                        fakeConfiguration,
    -                   new KinesisDeserializationSchemaWrapper<>(new 
SimpleStringSchema()),
    +                   deserializationSchema,
                        thrownErrorUnderTest,
                        subscribedShardsStateUnderTest,
                        subscribedStreamsToLastDiscoveredShardIdsStateUnderTest,
                        fakeKinesis);
     
    -           this.numElementsCollected = 0;
                this.runWaiter = new OneShotLatch();
    -   }
    +           this.initialDiscoveryWaiter = new OneShotLatch();
     
    -   public long getNumOfElementsCollected() {
    -           return numElementsCollected;
    +           this.running = true;
        }
     
        @Override
    -   protected KinesisDeserializationSchema<String> 
getClonedDeserializationSchema() {
    -           return new KinesisDeserializationSchemaWrapper<>(new 
SimpleStringSchema());
    +   public void runFetcher() throws Exception {
    +           runWaiter.trigger();
    +           super.runFetcher();
    +   }
    +
    +   public void waitUntilRun() throws Exception {
    +           runWaiter.await();
        }
     
        @Override
    -   protected void emitRecordAndUpdateState(String record, long 
recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
    -           synchronized (fakeCheckpointLock) {
    -                   this.numElementsCollected++;
    -                   updateState(shardStateIndex, lastSequenceNumber);
    -           }
    +   protected ExecutorService createShardConsumersThreadPool(String 
subtaskName) {
    +           // this is just a dummy fetcher, so no need to create a thread 
pool for shard consumers
    +           ExecutorService mockExecutor = mock(ExecutorService.class);
    +           when(mockExecutor.isTerminated()).thenAnswer((InvocationOnMock 
invocation) -> !running);
    +           return mockExecutor;
        }
     
        @Override
    -   public void runFetcher() throws Exception {
    -           runWaiter.trigger();
    -           super.runFetcher();
    +   public void awaitTermination() throws InterruptedException {
    +           this.running = false;
    +           super.awaitTermination();
        }
     
    -   public void waitUntilRun() throws Exception {
    -           runWaiter.await();
    +   @Override
    +   public List<StreamShardHandle> discoverNewShardsToSubscribe() throws 
InterruptedException {
    +           initialDiscoveryWaiter.trigger();
    --- End diff --
    
    Shouldn't we trigger this after `super.discoverNewShardsToSubscribe()`?
    
    As in
    ```
    List<StreamShardHandle> newShards = super.discoverNewShardsToSubscribe();
    initialDiscoveryWaiter.trigger();
    return newShards;
    ```



---

Reply via email to