[ https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16478886#comment-16478886 ]
Sergey Nuyanzin edited comment on FLINK-9349 at 5/17/18 10:52 AM:
------------------------------------------------------------------

Hello,

I was able to write a test (based on an existing one) that reproduces this issue and one more related issue. The second one is:

{noformat}
Caused by: java.lang.NullPointerException
	at java.util.LinkedList$ListItr.next(LinkedList.java:893)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.doCommitInternalOffsetsToKafka(Kafka09Fetcher.java:228)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.commitInternalOffsetsToKafka(AbstractFetcher.java:293)
{noformat}

It is caused by concurrent access to subscribedPartitionStates, which is a LinkedList and therefore not thread-safe.

From my point of view, there are two possible solutions:
# add synchronization, as you mentioned
# use a thread-safe collection, e.g. CopyOnWriteArrayList instead of LinkedList, for subscribedPartitionStates

Both options pass the test. The code snippet for the test is attached: [^Flink9349Test.java]

The second option requires no synchronized blocks, and it may be preferable if partitionStates are not modified frequently (CopyOnWriteArrayList copies its backing array on every write); otherwise, synchronization makes more sense.

[~yuzhih...@gmail.com] could you please point out which approach is more appropriate?

> KafkaConnector Exception while fetching from multiple kafka topics
> -------------------------------------------------------------------
>
>                 Key: FLINK-9349
>                 URL: https://issues.apache.org/jira/browse/FLINK-9349
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Vishal Santoshi
>            Priority: Major
>         Attachments: Flink9349Test.java
>
>
> ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
>
> It seems the List subscribedPartitionStates was being modified while
> runFetchLoop was iterating over it.
> This can happen if, e.g., FlinkKafkaConsumer runs the following code
> concurrently:
>                 kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
>
> {code:java}
> java.util.ConcurrentModificationException
> 	at java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
> 	at java.util.LinkedList$ListItr.next(LinkedList.java:888)
> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
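The first option proposed in the comment (adding synchronization) can be sketched in plain Java. This is a simplified model, not Flink's code: each partition state is represented by a String, and `SyncPartitionStates`, `copyForCommit` and `SyncDemo` are hypothetical names chosen for illustration. The LinkedList is kept, but every access (the discovery thread's add and the fetch/commit thread's iteration) goes through the same lock. The commit side copies the list under the lock and does its per-partition work on the copy, so the discovery thread is blocked only briefly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

// Hypothetical stand-in for subscribedPartitionStates (not Flink's actual class).
// Each String represents one partition state, e.g. "topicA-0".
class SyncPartitionStates {
    private final List<String> states = new LinkedList<>(); // not thread-safe on its own
    private final Object lock = new Object();                // guards every access to states

    // Discovery thread (cf. kafkaFetcher.addDiscoveredPartitions(...)).
    void addDiscoveredPartitions(List<String> discovered) {
        synchronized (lock) {
            states.addAll(discovered);
        }
    }

    // Fetch/commit thread: copy under the lock, then iterate the copy without
    // holding the lock, so an add() can never interleave with this iteration.
    List<String> copyForCommit() {
        synchronized (lock) {
            return new ArrayList<>(states);
        }
    }

    int size() {
        synchronized (lock) {
            return states.size();
        }
    }
}

class SyncDemo {
    // Adds `batches` partitions from a second thread while the calling thread
    // keeps iterating; returns the final number of partition states.
    static int run(int batches) {
        SyncPartitionStates states = new SyncPartitionStates();
        states.addDiscoveredPartitions(Collections.singletonList("topicA-0"));
        Thread discoverer = new Thread(() -> {
            for (int i = 0; i < batches; i++) {
                states.addDiscoveredPartitions(Collections.singletonList("topicB-" + i));
            }
        });
        discoverer.start();
        long work = 0;
        while (discoverer.isAlive()) {
            for (String s : states.copyForCommit()) {
                work += s.length(); // stand-in for per-partition commit work
            }
        }
        try {
            discoverer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return states.size();
    }
}
```

With this model, `SyncDemo.run(1000)` should return 1001 without a ConcurrentModificationException, because no iterator ever walks the LinkedList while another thread modifies it.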
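The second option from the comment (replacing the LinkedList with a CopyOnWriteArrayList) can be sketched under the same simplified model: partition states are Strings, and `CowPartitionStates`, `commitPass` and `CowDemo` are hypothetical names, not part of Flink. No explicit lock is needed, because each iterator walks an immutable snapshot of the backing array; the price is that every add copies the whole array.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for subscribedPartitionStates backed by a
// CopyOnWriteArrayList (not Flink's actual class).
class CowPartitionStates {
    // Every write copies the backing array; iterators see a fixed snapshot.
    private final List<String> states = new CopyOnWriteArrayList<>();

    // Discovery thread: no synchronized block required.
    void addDiscoveredPartitions(List<String> discovered) {
        states.addAll(discovered);
    }

    // Fetch/commit thread: iterating directly is safe. The iterator never throws
    // ConcurrentModificationException, but it does not see partitions added after
    // the iteration started; those are picked up on the next pass.
    long commitPass() {
        long work = 0;
        for (String s : states) {
            work += s.length(); // stand-in for per-partition commit work
        }
        return work;
    }

    int size() {
        return states.size();
    }
}

class CowDemo {
    // Adds `batches` partitions from a second thread while the calling thread
    // keeps iterating; returns the final number of partition states.
    static int run(int batches) {
        CowPartitionStates states = new CowPartitionStates();
        states.addDiscoveredPartitions(Collections.singletonList("topicA-0"));
        Thread discoverer = new Thread(() -> {
            for (int i = 0; i < batches; i++) {
                states.addDiscoveredPartitions(Collections.singletonList("topicB-" + i));
            }
        });
        discoverer.start();
        while (discoverer.isAlive()) {
            states.commitPass();
        }
        try {
            discoverer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return states.size();
    }
}
```

Because each add is O(n) in the current number of partitions, this variant fits the case described in the comment where partition states change rarely compared with how often they are iterated; with frequent changes, the lock-based variant avoids the repeated copying.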