[
https://issues.apache.org/jira/browse/FLINK-8409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16333922#comment-16333922
]
ASF GitHub Bot commented on FLINK-8409:
---------------------------------------
GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/5329
[FLINK-8409] [kafka] Fix potential NPE in KafkaConsumerThread
## What is the purpose of the change
This PR fixes a race condition that may lead to an NPE in the async
callbacks for Kafka offset committing.
The following lines in `KafkaConsumerThread::setOffsetsToCommit(...)`
suggest a race condition with the asynchronous callback from committing
offsets to Kafka:
```
// record the work to be committed by the main consumer thread and make sure the consumer notices that
if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
    log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
        "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
        "This does not compromise Flink's checkpoint integrity.");
}
this.offsetCommitCallback = commitCallback;
```
The consumer thread's main loop checks `nextOffsetsToCommit` for pending
offsets. If any are present, it starts an asynchronous offset commit. The NPE
occurs when that commit completes and its callback fires before
`this.offsetCommitCallback = commitCallback;` has been reached, so the
callback field is still unassigned at that point.
This PR fixes this by setting `commitCallback` and `nextOffsetsToCommit`
together in a single atomic operation.
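One way to picture the atomic hand-off is the minimal sketch below. It is not the code from this PR: `AtomicCommitSketch`, `CommitRequest`, `CommitCallback`, and `pollCommitRequest` are hypothetical names, and a plain `Map<String, Long>` stands in for the Kafka offset map. The idea is that the offsets and their callback are published as one immutable object through a single `AtomicReference`, so the consumer thread can never observe one without the other.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicCommitSketch {

    /** Hypothetical stand-in for the connector's commit callback type. */
    interface CommitCallback {
        void onSuccess();
    }

    /** Immutable pair: offsets and callback are always published together. */
    static final class CommitRequest {
        final Map<String, Long> offsets;
        final CommitCallback callback;

        CommitRequest(Map<String, Long> offsets, CommitCallback callback) {
            this.offsets = offsets;
            this.callback = callback;
        }
    }

    private final AtomicReference<CommitRequest> nextCommit = new AtomicReference<>();

    /**
     * Called by the checkpointing thread.
     * Returns true if an older, not yet picked up request was replaced.
     */
    boolean setOffsetsToCommit(Map<String, Long> offsets, CommitCallback callback) {
        return nextCommit.getAndSet(new CommitRequest(offsets, callback)) != null;
    }

    /** Called by the consumer thread's loop; returns null if nothing is pending. */
    CommitRequest pollCommitRequest() {
        return nextCommit.getAndSet(null);
    }

    public static void main(String[] args) {
        AtomicCommitSketch sketch = new AtomicCommitSketch();
        boolean[] called = {false};

        sketch.setOffsetsToCommit(
            Collections.singletonMap("topic-0", 42L), () -> called[0] = true);

        // The callback travels with the offsets, so it is non-null here.
        CommitRequest request = sketch.pollCommitRequest();
        request.callback.onSuccess();
        System.out.println(called[0]);
    }
}
```

Because the consumer thread takes the callback from the same object that carried the offsets, there is no window in which a commit can complete before its callback has been set.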
## Verifying this change
No new tests were added.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes /
**no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs /
JavaDocs / not documented)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-8409
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5329.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5329
----
commit c8081acaf51b3407af7d425063572f03a68021ac
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date: 2018-01-13T12:13:20Z
[FLINK-8409] [kafka] Fix offset committing race condition in
KafkaConsumerThread
commit a1bf7af3a80bbbb6871b1177cb58c127c94bd75c
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date: 2018-01-13T12:35:03Z
[hotfix] [kafka] Make AbortedReassignmentException a static class
----
> Race condition in KafkaConsumerThread leads to potential NPE
> ------------------------------------------------------------
>
> Key: FLINK-8409
> URL: https://issues.apache.org/jira/browse/FLINK-8409
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.0, 1.3.2, 1.5.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The following lines in {{KafkaConsumerThread::setOffsetsToCommit(...)}}
> suggest a race condition with the asynchronous callback from committing
> offsets to Kafka:
> {code}
> // record the work to be committed by the main consumer thread and make sure the consumer notices that
> if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
>     log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
>         "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
>         "This does not compromise Flink's checkpoint integrity.");
> }
> this.offsetCommitCallback = commitCallback;
> {code}
> The consumer thread's main loop checks {{nextOffsetsToCommit}} for pending
> offsets. If any are present, it starts an asynchronous offset commit. The
> NPE occurs when that commit completes before
> {{this.offsetCommitCallback = commitCallback;}} has been reached.
> A possible fix is to make setting the next offsets to commit along with the
> callback instance a single atomic operation.