[ https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16479388#comment-16479388 ]
ASF GitHub Bot commented on FLINK-9349:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6040#discussion_r189029077

    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -507,7 +507,7 @@ private void updateMinPunctuatedWatermark(Watermark nextWatermark) {
                SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
                ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {

    -       List<KafkaTopicPartitionState<KPH>> partitionStates = new LinkedList<>();
    +       List<KafkaTopicPartitionState<KPH>> partitionStates = new CopyOnWriteArrayList<>();
    --- End diff --

    Would be nice to have a comment on why we need to use a `CopyOnWriteArrayList`


> KafkaConnector Exception while fetching from multiple kafka topics
> -------------------------------------------------------------------
>
>                 Key: FLINK-9349
>                 URL: https://issues.apache.org/jira/browse/FLINK-9349
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Vishal Santoshi
>            Assignee: Sergey Nuyanzin
>            Priority: Critical
>         Attachments: Flink9349Test.java
>
>
> ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
>
> It seems the List subscribedPartitionStates was being modified when runFetchLoop iterated the List.
> This can happen if, e.g., FlinkKafkaConsumer runs the following code concurrently:
>                 kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
>
> {code:java}
> java.util.ConcurrentModificationException
>       at java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
>       at java.util.LinkedList$ListItr.next(LinkedList.java:888)
>       at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
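
For reference, a minimal sketch of the failure mode described in the issue and of why the proposed `CopyOnWriteArrayList` avoids it. This is not Flink code: the class `PartitionListDemo`, the method `failsWhenModifiedDuringIteration`, and the partition names are hypothetical, and the modification happens on the same thread during iteration (rather than from a discovery thread) only to make the outcome deterministic.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class PartitionListDemo {

    // Walks the list and appends one element mid-iteration, standing in for
    // partition discovery adding to the list while a fetch loop iterates it.
    // Returns true if the iteration failed with ConcurrentModificationException.
    static boolean failsWhenModifiedDuringIteration(List<String> partitions) {
        partitions.add("topic-a-0");
        partitions.add("topic-a-1");
        try {
            for (String partition : partitions) {
                if (partition.equals("topic-a-0")) {
                    // Structural modification while an iterator is open.
                    partitions.add("topic-b-0");
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // LinkedList iterators are fail-fast: the next call to next() detects the add.
        System.out.println(failsWhenModifiedDuringIteration(new LinkedList<>()));
        // CopyOnWriteArrayList iterators work on a snapshot taken when the
        // iterator was created, so the add does not disturb the running loop.
        System.out.println(failsWhenModifiedDuringIteration(new CopyOnWriteArrayList<>()));
    }
}
```

The trade-off is that every write to a `CopyOnWriteArrayList` copies the backing array, which is usually acceptable when writes are rare compared with iterations; the running loop does not see elements added after its iterator was created, only a later iteration does.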