Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5284#discussion_r161159244 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java --- @@ -265,10 +274,8 @@ public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception { // -------------------------------------------------------------------- - final OneShotLatch runLatch = new OneShotLatch(); - final OneShotLatch stopLatch = new OneShotLatch(); - final AbstractFetcher<String, ?> fetcher = getRunnableMockFetcher(runLatch, stopLatch); - when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3); + final MockFetcher<String> fetcher = spy(new MockFetcher<>()); + doReturn(state1).doReturn(state2).doReturn(state3).when(fetcher).snapshotCurrentState(); --- End diff -- Maybe we could go one small step further? ``` private static class MockFetcher<T> ... { private final ArrayDeque<HashMap<KafkaTopicPartition, Long>> stateSnapshotsToReturn = new ArrayDeque<>(); public MockFetcher(HashMap<KafkaTopicPartition, Long>.. stateSnapshotsToReturn) { this.stateSnapshotsToReturn.addAll(Arrays.asList(stateSnapshotsToReturn)); } @Override public HashMap<KafkaTopicPartition, Long> snapshotCurrentState() { checkState(!stateSnapshotsToReturn.isEmpty()); return stateSnapshotsToReturn.poll(); } } ```
---