Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r190120134
--- Diff:
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
---
@@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
         assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
     }
+    @Test
+    public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
+        // test data
+        final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+
+        final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+        testCommitData.put(testPartition, 11L);
+
+        // ----- create the test fetcher -----
+
+        @SuppressWarnings("unchecked")
+        SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
+        Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+            Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+
+        final TestFetcher<String> fetcher = new TestFetcher<>(
+            sourceContext,
+            partitionsWithInitialOffsets,
+            null, /* periodic assigner */
+            null, /* punctuated assigner */
+            new TestProcessingTimeService(),
+            10);
+
+        // ----- run the fetcher -----
+
+        final AtomicReference<Throwable> error = new AtomicReference<>();
+        int fetchTasks = 5;
+        final CountDownLatch latch = new CountDownLatch(fetchTasks);
+        ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1);
+
+        service.submit(new Thread("fetcher runner") {
+            @Override
+            public void run() {
+                try {
+                    latch.await();
+                    fetcher.runFetchLoop();
--- End diff --
So, IMO, the test should look something like this:
```
final OneShotLatch fetchLoopWaitLatch = new OneShotLatch();
final OneShotLatch stateIterationBlockLatch = new OneShotLatch();

final TestFetcher<String> fetcher = new TestFetcher<>(
    sourceContext,
    partitionsWithInitialOffsets,
    null, /* periodic assigner */
    null, /* punctuated assigner */
    new TestProcessingTimeService(),
    10,
    fetchLoopWaitLatch,
    stateIterationBlockLatch);

// ----- run the fetcher -----

final CheckedThread checkedThread = new CheckedThread() {
    @Override
    public void go() throws Exception {
        fetcher.runFetchLoop();
    }
};
checkedThread.start();

// wait until state iteration begins before adding discovered partitions
fetchLoopWaitLatch.await();
fetcher.addDiscoveredPartitions(Collections.singletonList(testPartition));

stateIterationBlockLatch.trigger();
checkedThread.sync();
```
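To show the handshake outside Flink, here is a minimal, self-contained sketch using only JDK classes. Everything in it is an assumption made for illustration: `CountDownLatch` stands in for `OneShotLatch`, a plain thread stands in for `CheckedThread`, and `iterateWhileAdding` with its `List<String>` is a hypothetical stand-in for the fetch loop and its partition state, not the actual `TestFetcher` code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class LatchHandshakeSketch {

    /**
     * Iterates over the list in a background thread while the caller adds an
     * element mid-iteration. Returns how many elements the loop observed.
     * The list must be non-empty, otherwise the first latch is never released.
     */
    public static int iterateWhileAdding(List<String> partitions) throws Exception {
        // plays the role of fetchLoopWaitLatch: released once iteration has begun
        final CountDownLatch iterationStarted = new CountDownLatch(1);
        // plays the role of stateIterationBlockLatch: holds the loop inside the iteration
        final CountDownLatch mayContinue = new CountDownLatch(1);
        final AtomicReference<Throwable> error = new AtomicReference<>();
        final int[] seen = {0};

        Thread loop = new Thread(() -> {
            try {
                for (String ignored : partitions) {
                    seen[0]++;
                    iterationStarted.countDown();
                    mayContinue.await(); // block while the caller mutates the list
                }
            } catch (Throwable t) {
                error.set(t);
            }
        }, "fetch loop stand-in");
        loop.start();

        // wait until iteration begins before adding the "discovered" element
        iterationStarted.await();
        partitions.add("test-43");

        mayContinue.countDown();
        loop.join(); // join() makes the loop thread's writes visible here

        if (error.get() != null) {
            throw new RuntimeException(error.get());
        }
        return seen[0];
    }

    public static void main(String[] args) throws Exception {
        List<String> partitions = new CopyOnWriteArrayList<>();
        partitions.add("test-42");
        int seen = iterateWhileAdding(partitions);
        System.out.println("seen=" + seen + ", size=" + partitions.size());
    }
}
```

With a `CopyOnWriteArrayList` the iterator works on a snapshot, so the loop sees one element and the list ends with two. Swapping in a plain `ArrayList` should make the same run fail with a `ConcurrentModificationException` every time, which is the point of the two latches: the interleaving is forced instead of left to thread scheduling.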
---