Tzu-Li (Gordon) Tai created FLINK-8409:
------------------------------------------

             Summary: Race condition in KafkaConsumerThread leads to potential NPE
                 Key: FLINK-8409
                 URL: https://issues.apache.org/jira/browse/FLINK-8409
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.3.2, 1.4.0, 1.5.0
            Reporter: Tzu-Li (Gordon) Tai
            Assignee: Tzu-Li (Gordon) Tai
            Priority: Blocker
             Fix For: 1.3.3, 1.5.0, 1.4.1


The following lines in {{KafkaConsumerThread::setOffsetsToCommit(...)}} 
suggest a race condition with the asynchronous callback from committing 
offsets to Kafka:

{code}
// record the work to be committed by the main consumer thread and make sure the consumer notices that
if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
    log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
        "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
        "This does not compromise Flink's checkpoint integrity.");
}
this.offsetCommitCallback = commitCallback;
{code}

The main consumer thread's loop checks {{nextOffsetsToCommit}} for offsets to 
commit. If any are present, it performs an asynchronous offset commit. The NPE 
occurs when that commit completes before 
{{this.offsetCommitCallback = commitCallback;}} has been reached, so the 
callback field is not yet set at the time the commit completion is handled.

A possible fix is to make setting the next offsets to commit along with the 
callback instance a single atomic operation.
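
A minimal sketch of that idea, not the actual patch: the offsets and the 
callback are wrapped in one immutable holder and published through a single 
{{AtomicReference}}, so the consumer thread can never observe offsets without 
their callback. All names here ({{AtomicCommitSketch}}, {{PendingCommit}}, 
{{CommitCallback}}, {{pollPendingCommit}}) and the {{Map<String, Long>}} 
offset type are hypothetical stand-ins, not Flink's real classes.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch; names are illustrative, not Flink's actual classes. */
public class AtomicCommitSketch {

    /** Assumed stand-in for the connector's commit callback type. */
    public interface CommitCallback {
        void onSuccess();
    }

    /** Immutable pair: offsets and their callback are always published together. */
    public static final class PendingCommit {
        public final Map<String, Long> offsets;
        public final CommitCallback callback;

        PendingCommit(Map<String, Long> offsets, CommitCallback callback) {
            this.offsets = offsets;
            this.callback = callback;
        }
    }

    // Single reference replaces the separate offsets reference and callback field.
    private final AtomicReference<PendingCommit> nextCommit = new AtomicReference<>();

    /**
     * Called from the checkpointing side. One getAndSet publishes both values.
     * Returns true if an older, not yet picked up commit was replaced.
     */
    public boolean setOffsetsToCommit(Map<String, Long> offsets, CommitCallback callback) {
        return nextCommit.getAndSet(new PendingCommit(offsets, callback)) != null;
    }

    /** Called from the consumer loop. Takes the pending pair, or returns null if none. */
    public PendingCommit pollPendingCommit() {
        return nextCommit.getAndSet(null);
    }
}
```

With this shape, the consumer loop would take the callback from the polled 
{{PendingCommit}} rather than from a separately assigned field, which removes 
the window between the two writes.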



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
