[ https://issues.apache.org/jira/browse/FLINK-8409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354356#comment-16354356 ]
ASF GitHub Bot commented on FLINK-8409:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5329


> Race condition in KafkaConsumerThread leads to potential NPE
> ------------------------------------------------------------
>
>                 Key: FLINK-8409
>                 URL: https://issues.apache.org/jira/browse/FLINK-8409
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0, 1.3.2, 1.5.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The following lines in {{KafkaConsumerThread::setOffsetsToCommit(...)}}
> suggest a race condition with the asynchronous callback from committing
> offsets to Kafka:
> {code}
> // record the work to be committed by the main consumer thread and make sure the consumer notices that
> if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
>     log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
>         "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
>         "This does not compromise Flink's checkpoint integrity.");
> }
> this.offsetCommitCallback = commitCallback;
> {code}
> In the main loop of the consumer thread, {{nextOffsetsToCommit}} is
> checked for offsets to commit. If there are any, an asynchronous offset
> commit is issued. The NPE occurs when that commit completes before
> {{this.offsetCommitCallback = commitCallback;}} has been reached, so the
> completion handler dereferences a callback field that may still be null.
> A possible fix is to set the next offsets to commit and the callback
> instance in a single atomic operation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
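The suggested fix (publishing the offsets and their callback in one atomic step) can be sketched in Java roughly as follows. This is a minimal, self-contained sketch under stated assumptions, not the actual Flink patch: `CommitCallback`, `OffsetsAndCallback`, `ConsumerThreadSketch`, `setOffsetsAndCommitCallback` and `commitPendingOffsets` are hypothetical names, and offsets are simplified to a `Map<Integer, Long>` (partition to offset) instead of Kafka's `TopicPartition`/`OffsetAndMetadata` types. Because both values travel inside one immutable object behind a single `AtomicReference`, the consumer loop cannot pick up new offsets without also picking up the callback that belongs to them.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Usage demo: two checkpoints complete before the consumer loop gets to commit.
class OffsetCommitSketch {
    public static void main(String[] args) {
        ConsumerThreadSketch consumer = new ConsumerThreadSketch();
        CommitCallback callback = new CommitCallback() {
            @Override
            public void onSuccess() {
                System.out.println("Offset commit succeeded.");
            }

            @Override
            public void onException(Throwable cause) {
                System.out.println("Offset commit failed: " + cause);
            }
        };

        Map<Integer, Long> first = new HashMap<>();
        first.put(0, 42L);
        consumer.setOffsetsAndCommitCallback(first, callback);
        Map<Integer, Long> second = new HashMap<>();
        second.put(0, 43L);
        consumer.setOffsetsAndCommitCallback(second, callback);

        // Only the newest offsets are committed, and the callback stored with them is notified.
        System.out.println("Committed: " + consumer.commitPendingOffsets());
    }
}

// Hypothetical stand-in for the connector's commit callback type.
interface CommitCallback {
    void onSuccess();

    void onException(Throwable cause);
}

// Offsets to commit plus the callback to notify, published together as one immutable value.
final class OffsetsAndCallback {
    final Map<Integer, Long> offsets; // simplified: partition -> offset
    final CommitCallback callback;

    OffsetsAndCallback(Map<Integer, Long> offsets, CommitCallback callback) {
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
        this.callback = Objects.requireNonNull(callback, "callback");
    }
}

class ConsumerThreadSketch {
    // A single reference replaces "offsets reference + separate plain callback field".
    private final AtomicReference<OffsetsAndCallback> nextOffsetsToCommit = new AtomicReference<>();

    // Called from the checkpointing thread: one atomic write publishes both values.
    void setOffsetsAndCommitCallback(Map<Integer, Long> offsets, CommitCallback callback) {
        OffsetsAndCallback previous =
                nextOffsetsToCommit.getAndSet(new OffsetsAndCallback(offsets, callback));
        if (previous != null) {
            System.out.println("Skipping commit of previous offsets; newer checkpoint offsets are available.");
        }
    }

    // One iteration of the consumer thread's main loop.
    // Returns the committed offsets, or null if nothing was pending.
    Map<Integer, Long> commitPendingOffsets() {
        OffsetsAndCallback pending = nextOffsetsToCommit.getAndSet(null);
        if (pending == null) {
            return null;
        }
        // A real consumer would issue an asynchronous commit here and notify the callback from
        // its completion handler; this sketch treats the commit as completing immediately.
        // Either way, the callback comes from the same object as the offsets, so it is present.
        pending.callback.onSuccess();
        return pending.offsets;
    }
}
```

Bundling the two values avoids a lock: each thread performs a single `getAndSet`, and when a newer checkpoint supersedes a pending commit, the stale callback is dropped together with its stale offsets instead of being paired with the wrong ones.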