[
https://issues.apache.org/jira/browse/FLINK-32754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751437#comment-17751437
]
Hang Ruan commented on FLINK-32754:
-----------------------------------
[~Yu Chen] [~yunta], thanks for the issue.
I have opened a duplicate issue, FLINK-31268, about it, and I have raised a
[PR|https://github.com/apache/flink/pull/22048] to fix it.
> Using SplitEnumeratorContext.metricGroup() in restoreEnumerator causes NPE
> --------------------------------------------------------------------------
>
> Key: FLINK-32754
> URL: https://issues.apache.org/jira/browse/FLINK-32754
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.17.0, 1.17.1
> Reporter: Yu Chen
> Priority: Major
> Attachments: image-2023-08-04-18-28-05-897.png
>
>
> We registered some metrics in the `enumerator` of the FLIP-27 source via
> `SplitEnumeratorContext.metricGroup()`, but found that the task prints NPE logs in
> the JM when restoring, suggesting that `SplitEnumeratorContext.metricGroup()` returns null.
> {*}Meanwhile, the task does not experience failover, and the Checkpoints
> cannot be successfully created even after the task is in running state{*}.
> We found that the implementation class behind `SplitEnumeratorContext` is
> `LazyInitializedCoordinatorContext`; however, its `metricGroup()` is only
> initialized by `lazyInitialize()`. By reviewing the code, we found
> that at the time of `SourceCoordinator.resetToCheckpoint()`, `lazyInitialize()`
> has not been called yet, so an NPE is thrown.
> *Q: Why does this bug prevent the task from creating the Checkpoint?*
> `SourceCoordinator.resetToCheckpoint()` throws an NPE which results in the
> member variable `enumerator` in `SourceCoordinator` being null.
> Unfortunately, all Checkpoint-related calls in `SourceCoordinator` are called
> via `runInEventLoop()`.
> In `runInEventLoop()`, if the enumerator is null, it will return directly.
> *Q: Why doesn't this bug trigger a task failover?*
> In `RecreateOnResetOperatorCoordinator.resetAndStart()`, if
> `internalCoordinator.resetToCheckpoint` throws an exception, it will
> catch the exception and call `cleanAndFailJob` to try to fail the job.
> However, `globalFailureHandler` is also initialized in `lazyInitialize()`,
> and `schedulerExecutor.execute` ignores the NPE triggered by
> `globalFailureHandler.handleGlobalFailure(e)`.
> Thus it appears that the task did not fail over.
> !image-2023-08-04-18-28-05-897.png|width=963,height=443!
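
For readers following the analysis quoted above, the failure chain can be sketched in plain Java. This is only an illustrative model, not Flink code: `LazyContext`, `Coordinator`, and `simulate` are hypothetical names, and the sketch assumes (as the description states) that both the metric group and the failure handler are set in the same lazy-initialization step and that the second NPE is silently ignored.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

public class LazyContextSketch {

    /** Hypothetical context: both fields stay null until lazyInitialize() runs. */
    static final class LazyContext {
        private String metricGroup;
        private Runnable globalFailureHandler;

        void lazyInitialize(Runnable failureHandler) {
            this.metricGroup = "enumerator-metrics";
            this.globalFailureHandler = failureHandler;
        }

        String metricGroup() {
            return metricGroup;
        }

        void failJob() {
            // Throws NullPointerException if lazyInitialize() has not run yet.
            globalFailureHandler.run();
        }
    }

    /** Hypothetical coordinator that restores an enumerator and takes checkpoints. */
    static final class Coordinator {
        private final LazyContext context;
        private String enumerator; // remains null when the restore fails

        Coordinator(LazyContext context) {
            this.context = context;
        }

        void resetToCheckpoint() {
            try {
                // The restored enumerator asks for the metric group immediately.
                String group = Objects.requireNonNull(context.metricGroup(), "metricGroup");
                enumerator = "enumerator@" + group;
            } catch (NullPointerException restoreFailure) {
                try {
                    context.failJob();
                } catch (NullPointerException ignored) {
                    // Assumption: models the executor ignoring the second NPE.
                }
            }
        }

        boolean checkpoint() {
            // Checkpoint calls return early when no enumerator exists.
            return enumerator != null;
        }
    }

    /** Runs one restore plus one checkpoint attempt and reports the outcome. */
    public static String simulate(boolean initializeBeforeRestore) {
        AtomicBoolean jobFailed = new AtomicBoolean(false);
        LazyContext context = new LazyContext();
        if (initializeBeforeRestore) {
            context.lazyInitialize(() -> jobFailed.set(true));
        }
        Coordinator coordinator = new Coordinator(context);
        coordinator.resetToCheckpoint();
        return "checkpointed=" + coordinator.checkpoint() + ", jobFailed=" + jobFailed.get();
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // restore before lazy initialization
        System.out.println(simulate(true));  // lazy initialization first
    }
}
```

In this model, restoring before initialization leaves the coordinator unable to checkpoint while the job is never marked as failed, which matches the symptoms reported in the issue. Initializing the context first avoids both problems.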