[ https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16486693#comment-16486693 ]
ASF GitHub Bot commented on FLINK-9349: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r190111239 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTest.java --- @@ -390,6 +398,102 @@ public void testPeriodicWatermarksWithNoSubscribedPartitionsShouldYieldNoWaterma assertEquals(100, sourceContext.getLatestWatermark().getTimestamp()); } + @Test + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + + final Map<KafkaTopicPartition, Long> testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // ----- create the test fetcher ----- + + @SuppressWarnings("unchecked") + SourceContext<String> sourceContext = PowerMockito.mock(SourceContext.class); + Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets = + Collections.singletonMap(testPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET); + + final TestFetcher<String> fetcher = new TestFetcher<>( + sourceContext, + partitionsWithInitialOffsets, + null, /* periodic assigner */ + null, /* punctuated assigner */ + new TestProcessingTimeService(), + 10); + + // ----- run the fetcher ----- + + final AtomicReference<Throwable> error = new AtomicReference<>(); + int fetchTasks = 5; + final CountDownLatch latch = new CountDownLatch(fetchTasks); + ExecutorService service = Executors.newFixedThreadPool(fetchTasks + 1); + + service.submit(new Thread("fetcher runner") { + @Override + public void run() { + try { + latch.await(); + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }); + + for (int i = 0; i < fetchTasks; i++) { + service.submit(new Thread("add partitions " + i) { + @Override + public void run() { + try { + List<KafkaTopicPartition> newPartitions = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + newPartitions.add(testPartition); + } + fetcher.addDiscoveredPartitions(newPartitions); + latch.countDown(); + for (int i = 0; i < 100; i++) { + fetcher.addDiscoveredPartitions(newPartitions); + Thread.sleep(1L); + } + } catch (Throwable t) { + error.set(t); + } + } + }); + } + + service.awaitTermination(1L, TimeUnit.SECONDS); + + // ----- trigger the offset commit ----- --- End diff -- We should be able to ignore offset commit triggering in this test > KafkaConnector Exception while fetching from multiple kafka topics > ------------------------------------------------------------------- > > Key: FLINK-9349 > URL: https://issues.apache.org/jira/browse/FLINK-9349 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Affects Versions: 1.4.0, 1.5.0 > Reporter: Vishal Santoshi > Assignee: Sergey Nuyanzin > Priority: Critical > Attachments: Flink9349Test.java > > > ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java > > It seems the List subscribedPartitionStates was being modified when > runFetchLoop iterated the List. > This can happen if, e.g., FlinkKafkaConsumer runs the following code > concurrently: > kafkaFetcher.addDiscoveredPartitions(discoveredPartitions); > > {code:java} > java.util.ConcurrentModificationException > at > java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966) > at java.util.LinkedList$ListItr.next(LinkedList.java:888) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)