[ 
https://issues.apache.org/jira/browse/FLINK-8409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354373#comment-16354373
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8409:
--------------------------------------------

Merged.

1.4 - 3655200799929409352945a3f4fce0f3e987b9ad
1.5 - 08163009e443d00379696f9f84b3ccb0af6a25b6

> Race condition in KafkaConsumerThread leads to potential NPE
> ------------------------------------------------------------
>
>                 Key: FLINK-8409
>                 URL: https://issues.apache.org/jira/browse/FLINK-8409
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0, 1.3.2, 1.5.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The following lines in the {{KafkaConsumerThread::setOffsetsToCommit(...)}} 
> suggests a race condition with the asynchronous callback from committing 
> offsets to Kafka:
> {code}
> // record the work to be committed by the main consumer thread and make sure 
> the consumer notices that
> if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
>     log.warn("Committing offsets to Kafka takes longer than the checkpoint 
> interval. " +
>         "Skipping commit of previous offsets because newer complete 
> checkpoint offsets are available. " +
>         "This does not compromise Flink's checkpoint integrity.");
> }
> this.offsetCommitCallback = commitCallback;
> {code}
> In the main consumer thread's main loop, {{nextOffsetsToCommit}} will be 
> checked if there are any offsets to commit. If so, an asynchronous offset 
> commit operation will be performed. The NPE happens in the case when the 
> commit completes, but {{this.offsetCommitCallback = commitCallback;}} is not 
> yet reached.
> A possible fix is to make setting the next offsets to commit along with the 
> callback instance a single atomic operation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to