Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5284#discussion_r161159244
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 ---
    @@ -265,10 +274,8 @@ public void 
testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception {
     
                // 
--------------------------------------------------------------------
     
    -           final OneShotLatch runLatch = new OneShotLatch();
    -           final OneShotLatch stopLatch = new OneShotLatch();
    -           final AbstractFetcher<String, ?> fetcher = 
getRunnableMockFetcher(runLatch, stopLatch);
    -           when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, 
state3);
    +           final MockFetcher<String> fetcher = spy(new MockFetcher<>());
    +           
doReturn(state1).doReturn(state2).doReturn(state3).when(fetcher).snapshotCurrentState();
    --- End diff --
    
    Maybe we could go one small step further?
    
    ```
    private static class MockFetcher<T> ... {
      private final ArrayDeque<HashMap<KafkaTopicPartition, Long>> 
stateSnapshotsToReturn = new ArrayDeque<>();
      
      public MockFetcher(HashMap<KafkaTopicPartition, Long>.. 
stateSnapshotsToReturn) {
        
this.stateSnapshotsToReturn.addAll(Arrays.asList(stateSnapshotsToReturn));
      }
    
      @Override
      public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() {
        checkState(!stateSnapshotsToReturn.isEmpty());
        return stateSnapshotsToReturn.poll();
      }
    }
    ```


---

Reply via email to