[ https://issues.apache.org/jira/browse/FLINK-8409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354373#comment-16354373 ]
Tzu-Li (Gordon) Tai commented on FLINK-8409:
--------------------------------------------

Merged.

1.4 - 3655200799929409352945a3f4fce0f3e987b9ad
1.5 - 08163009e443d00379696f9f84b3ccb0af6a25b6

> Race condition in KafkaConsumerThread leads to potential NPE
> ------------------------------------------------------------
>
>                 Key: FLINK-8409
>                 URL: https://issues.apache.org/jira/browse/FLINK-8409
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0, 1.3.2, 1.5.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The following lines in {{KafkaConsumerThread::setOffsetsToCommit(...)}}
> suggest a race condition with the asynchronous callback for committing
> offsets to Kafka:
> {code}
> // record the work to be committed by the main consumer thread and make sure the consumer notices that
> if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
>     log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
>             "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
>             "This does not compromise Flink's checkpoint integrity.");
> }
> this.offsetCommitCallback = commitCallback;
> {code}
> In its main loop, the consumer thread checks {{nextOffsetsToCommit}} for
> pending offsets and, if there are any, performs an asynchronous offset
> commit. The NPE happens when that commit completes before
> {{this.offsetCommitCallback = commitCallback;}} has executed, so the
> callback field may still be {{null}} when the completion handler uses it.
> A possible fix is to set the next offsets to commit and the callback
> instance together, as a single atomic operation.
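A minimal Java sketch of the suggested fix, assuming a simplified model of the consumer thread: instead of publishing the offsets through one atomic field and the callback through a second plain field, both are bundled into one immutable object and swapped through a single AtomicReference. The class AtomicCommitSketch, the CommitCallback and PendingCommit types, and the Map<String, Long> stand-in for partition offsets are hypothetical simplifications for illustration; they are not Flink's actual API, and this is not the code that was merged for this issue.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicCommitSketch {

    /** Hypothetical stand-in for the connector's commit callback type. */
    public interface CommitCallback {
        void onSuccess();

        void onException(Throwable cause);
    }

    /** Immutable pair: a set of offsets plus the callback that belongs to exactly those offsets. */
    public static final class PendingCommit {
        final Map<String, Long> offsets;
        final CommitCallback callback;

        PendingCommit(Map<String, Long> offsets, CommitCallback callback) {
            this.offsets = offsets;
            this.callback = callback;
        }
    }

    // One atomic slot instead of two separate fields: offsets and callback are published together.
    private final AtomicReference<PendingCommit> nextCommit = new AtomicReference<>();

    /**
     * Called by the checkpointing thread. Returns true if an older, not yet committed
     * set of offsets was replaced (the case the original code logs a warning for).
     */
    public boolean setOffsetsToCommit(Map<String, Long> offsets, CommitCallback callback) {
        Objects.requireNonNull(callback, "callback");
        return nextCommit.getAndSet(new PendingCommit(offsets, callback)) != null;
    }

    /**
     * Called from the consumer thread's main loop. Offsets and callback are taken in one
     * step, so the callback can never be missing or belong to a different set of offsets.
     */
    public boolean commitPendingOffsets() {
        PendingCommit commit = nextCommit.getAndSet(null);
        if (commit == null) {
            return false; // nothing pending
        }
        // Simulated commit completion. A real consumer thread would hand commit.offsets to
        // KafkaConsumer#commitAsync and notify commit.callback from Kafka's OffsetCommitCallback.
        commit.callback.onSuccess();
        return true;
    }
}
```

Because both values travel in one immutable object that is published and consumed with a single getAndSet, the thread that completes a commit can only observe the callback registered together with those offsets, which closes the window between updating the offsets and assigning the callback.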