[ https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16479386#comment-16479386 ]
ASF GitHub Bot commented on FLINK-9349: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6040#discussion_r189031075 --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Flink9349Test.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internal; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Flink9349Test}. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(KafkaConsumerThread.class) +public class Flink9349Test { + @Test + public void testConcurrentPartitionsDiscoveryAndLoopFetching() throws Exception { --- End diff -- I think we should have a similar test, but move it to `Kafka09FetcherTest`. > KafkaConnector Exception while fetching from multiple kafka topics > ------------------------------------------------------------------- > > Key: FLINK-9349 > URL: https://issues.apache.org/jira/browse/FLINK-9349 > Project: Flink > Issue Type: Bug > Components: Kafka Connector > Affects Versions: 1.4.0, 1.5.0 > Reporter: Vishal Santoshi > Assignee: Sergey Nuyanzin > Priority: Critical > Attachments: Flink9349Test.java > > > ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java > > It seems the List subscribedPartitionStates was being modified when > runFetchLoop iterated the List. > This can happen if, e.g., FlinkKafkaConsumer runs the following code > concurrently: > kafkaFetcher.addDiscoveredPartitions(discoveredPartitions); > > {code:java} > java.util.ConcurrentModificationException > at > java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966) > at java.util.LinkedList$ListItr.next(LinkedList.java:888) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)