[ https://issues.apache.org/jira/browse/FLINK-8409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354373#comment-16354373 ]
Tzu-Li (Gordon) Tai commented on FLINK-8409:
--------------------------------------------
Merged.
1.4 - 3655200799929409352945a3f4fce0f3e987b9ad
1.5 - 08163009e443d00379696f9f84b3ccb0af6a25b6
> Race condition in KafkaConsumerThread leads to potential NPE
> ------------------------------------------------------------
>
> Key: FLINK-8409
> URL: https://issues.apache.org/jira/browse/FLINK-8409
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.0, 1.3.2, 1.5.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The following lines in {{KafkaConsumerThread::setOffsetsToCommit(...)}}
> suggest a race condition with the asynchronous callback invoked when
> offsets are committed to Kafka:
> {code}
> // record the work to be committed by the main consumer thread and make sure the consumer notices that
> if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
>     log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
>         "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
>         "This does not compromise Flink's checkpoint integrity.");
> }
> this.offsetCommitCallback = commitCallback;
> {code}
> In the main loop of the consumer thread, {{nextOffsetsToCommit}} is checked
> for pending offsets; if there are any, an asynchronous offset commit is
> issued. The NPE happens when that commit completes before
> {{this.offsetCommitCallback = commitCallback;}} has been reached, so the
> completion handler can dereference {{offsetCommitCallback}} while it is
> still null.
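To make the interleaving concrete, here is a minimal, single-threaded Java model of it. It assumes the asynchronous commit completes immediately (the worst-case timing) and uses hypothetical names (RacyCommitter, CommitCallback, publishOffsets, publishCallback, runMainLoopOnce) and a plain Map<String, Long> in place of the real offset types; it is an illustration of the two-step publish, not the connector's actual code.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the connector's commit callback type.
interface CommitCallback {
    void onSuccess();
}

// Simplified model of the racy two-step publish; not the actual Flink class.
class RacyCommitter {
    final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();
    volatile CommitCallback offsetCommitCallback; // null until first assigned

    // First half of setOffsetsToCommit(...): publish the offsets.
    void publishOffsets(Map<String, Long> offsets) {
        nextOffsetsToCommit.getAndSet(offsets);
    }

    // Second half: publish the callback in a separate, non-atomic write.
    void publishCallback(CommitCallback callback) {
        this.offsetCommitCallback = callback;
    }

    // One iteration of the consumer thread's loop. The asynchronous commit is
    // modelled as completing immediately, the worst-case timing.
    void runMainLoopOnce() {
        Map<String, Long> offsets = nextOffsetsToCommit.getAndSet(null);
        if (offsets != null) {
            offsetCommitCallback.onSuccess(); // NPE if the callback is not yet set
        }
    }

    // Replays the problematic interleaving: the loop runs between the two halves.
    static boolean badInterleavingThrowsNpe() {
        RacyCommitter c = new RacyCommitter();
        c.publishOffsets(Collections.singletonMap("topic-0", 42L));
        try {
            c.runMainLoopOnce();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```

If the callback happens to be published first, the same loop iteration completes normally; the failure depends purely on which thread wins.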
> A possible fix is to set the next offsets to commit and the callback
> instance together, in a single atomic operation.
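The single-atomic-operation fix suggested above could look roughly like the following sketch, which bundles the offsets and the callback into one immutable object held by a single AtomicReference. The names (AtomicCommitter, PendingCommit, OffsetCommitCallback) and the Map<String, Long> offset type are hypothetical, and the commit is again modelled as completing immediately; this is not the patch that was merged.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical callback type, standing in for the connector's commit callback.
interface OffsetCommitCallback {
    void onSuccess();
}

// Immutable pair, so offsets and callback are always published together.
final class PendingCommit {
    final Map<String, Long> offsets;
    final OffsetCommitCallback callback;

    PendingCommit(Map<String, Long> offsets, OffsetCommitCallback callback) {
        this.offsets = offsets;
        this.callback = callback;
    }
}

class AtomicCommitter {
    // One reference holds both halves, so a single getAndSet updates them atomically.
    private final AtomicReference<PendingCommit> nextCommit = new AtomicReference<>();

    // Called from the checkpointing side. Returns true if an older pending
    // commit was replaced (the case the original code logs a warning for).
    boolean setOffsetsToCommit(Map<String, Long> offsets, OffsetCommitCallback callback) {
        return nextCommit.getAndSet(new PendingCommit(offsets, callback)) != null;
    }

    // One iteration of the consumer thread's loop; returns how many offsets were committed.
    int runMainLoopOnce() {
        PendingCommit commit = nextCommit.getAndSet(null);
        if (commit == null) {
            return 0;
        }
        // The callback comes from the same object as the offsets, so it can
        // never be a field that another thread has not assigned yet.
        commit.callback.onSuccess();
        return commit.offsets.size();
    }
}
```

Because the consumer thread takes the callback from the same object as the offsets, it can never observe offsets without their callback; a newer setOffsetsToCommit(...) call simply replaces the pending pair, matching the "Skipping commit of previous offsets" behaviour.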
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)