Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6040#discussion_r190110928
--- Diff:
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java
---
@@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma
         assertEquals(100, sourceContext.getLatestWatermark().getTimestamp());
     }
+    @Test
+    public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception {
+        // test data
+        final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42);
+
+        final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>();
+        testCommitData.put(testPartition, 11L);
+
+        // ----- create the test fetcher -----
+
+        @SuppressWarnings("unchecked")
+        SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class);
--- End diff --
Using PowerMockito here is unnecessary. A dummy implementation of
`SourceContext` would be better.
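
For illustration, a rough sketch of what such a dummy could look like. The `Watermark` and `SourceContext` types below are simplified stand-ins so the snippet compiles without Flink on the classpath; in the actual test the class would implement Flink's `SourceFunction.SourceContext` directly. The name `DummySourceContext` and its getters are hypothetical, not something already in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's Watermark (assumption: only the timestamp matters here).
final class Watermark {
    private final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    long getTimestamp() {
        return timestamp;
    }
}

// Stand-in mirroring the methods of Flink's SourceFunction.SourceContext,
// declared locally only so this sketch is self-contained.
interface SourceContext<T> {
    void collect(T element);
    void collectWithTimestamp(T element, long timestamp);
    void emitWatermark(Watermark mark);
    void markAsTemporarilyIdle();
    Object getCheckpointLock();
    void close();
}

// Hypothetical dummy: records what the fetcher emits instead of mocking calls.
final class DummySourceContext<T> implements SourceContext<T> {
    private final Object checkpointLock = new Object();
    private final List<T> elements = new ArrayList<>();
    private Watermark latestWatermark;

    @Override
    public void collect(T element) {
        synchronized (checkpointLock) {
            elements.add(element);
        }
    }

    @Override
    public void collectWithTimestamp(T element, long timestamp) {
        // The timestamp is ignored in this sketch; only the element is recorded.
        collect(element);
    }

    @Override
    public void emitWatermark(Watermark mark) {
        synchronized (checkpointLock) {
            latestWatermark = mark;
        }
    }

    @Override
    public void markAsTemporarilyIdle() {
        // No-op: idleness is not tracked by this dummy.
    }

    @Override
    public Object getCheckpointLock() {
        return checkpointLock;
    }

    @Override
    public void close() {
        // No-op: nothing to release.
    }

    // Returns a snapshot copy so callers cannot mutate the recorded elements.
    List<T> getElements() {
        synchronized (checkpointLock) {
            return new ArrayList<>(elements);
        }
    }

    Watermark getLatestWatermark() {
        synchronized (checkpointLock) {
            return latestWatermark;
        }
    }
}

final class DummySourceContextDemo {
    public static void main(String[] args) {
        DummySourceContext<String> ctx = new DummySourceContext<>();
        ctx.collectWithTimestamp("record", 1L);
        ctx.emitWatermark(new Watermark(100L));
        System.out.println(ctx.getElements() + " @ " + ctx.getLatestWatermark().getTimestamp());
    }
}
```

With something along these lines the test would only need `new DummySourceContext<>()` in place of `PowerMockito.mock(SourceContext.class)`, and the `@SuppressWarnings("unchecked")` could go away as well. The surrounding tests already call `sourceContext.getLatestWatermark()`, so an existing helper in this test class may well cover this case.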
---