[ https://issues.apache.org/jira/browse/FLINK-32754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yu Chen updated FLINK-32754: ---------------------------- Description: We registered some metrics in the `enumerator` of the flip-27 source via `SplitEnumerator.metricGroup()`, but found that the task prints NPE logs in JM when restoring, suggesting that `SplitEnumerator. metricGroup()` is null. {*}Meanwhile, the task does not experience failover, and the Checkpoints cannot be successfully created even after the task is in running state{*}. We found that the implementation class of `SplitEnumerator` is `LazyInitializedCoordinatorContext`, however, the metricGroup() is initialized after calling lazyInitialize(). By reviewing the code, we found that at the time of SourceCoordinator.resetToCheckpoint(), lazyInitialize() has not been called yet, so NPE is thrown. *Q: Why does this bug prevent the task from creating the Checkpoint?* `SourceCoordinator.resetToCheckpoint()` throws an NPE which results in the member variable `enumerator` in `SourceCoordinator` being null. Unfortunately, all Checkpoint-related calls in `SourceCoordinator` are called via `runInEventLoop()`. In `runInEventLoop()`, if the enumerator is null, it will return directly. *Q: Why this bug doesn't trigger a task failover?* In `RecreateOnResetOperatorCoordinator.resetAndStart()`, if `internalCoordinator.resetToCheckpoint` throws an exception, then it will catch the exception and call `cleanAndFailJob ` to try to fail the job. However, `globalFailureHandler` is also initialized in `lazyInitialize()`, while `schedulerExecutor.execute` will ignore the NPE triggered by `globalFailureHandler.handleGlobalFailure(e)`. Thus it appears that the task did not failover. !image-2023-08-04-18-28-05-897.png|width=963,height=443! was: We registered some metrics in the `enumerator` of the flip-27 source via `SplitEnumerator.metricGroup()`, but found that the task prints NPE logs in JM when restoring, suggesting that `SplitEnumerator. metricGroup()` is null. Meanwhile, the task does not experience failover, and the Checkpoints cannot be successfully created even after the task is in running state. We found that the implementation class of `SplitEnumerator` is `LazyInitializedCoordinatorContext`, however, the metricGroup() is initialized after calling lazyInitialize(). By reviewing the code, we found that at the time of SourceCoordinator.resetToCheckpoint(), lazyInitialize() has not been called yet, so NPE is thrown. Q: Why does this bug prevent the task from creating the Checkpoint? `SourceCoordinator.resetToCheckpoint()` throws an NPE which results in the member variable `enumerator` in `SourceCoordinator` being null. Unfortunately, all Checkpoint-related calls in `SourceCoordinator` are called via `runInEventLoop()`. In `runInEventLoop()`, if the enumerator is null, it will return directly. Q: Why this bug doesn't trigger a task failover? In `RecreateOnResetOperatorCoordinator.resetAndStart()`, if `internalCoordinator.resetToCheckpoint` throws an exception, then it will catch the exception and call `cleanAndFailJob ` to try to fail the job. However, `globalFailureHandler` is also initialized in `lazyInitialize()`, while `schedulerExecutor.execute` will ignore the NPE triggered by `globalFailureHandler.handleGlobalFailure(e)`. Thus it appears that the task did not failover. !image-2023-08-04-18-28-05-897.png|width=963,height=443! > Using SplitEnumeratorContext.metricGroup() in restoreEnumerator causes NPE > -------------------------------------------------------------------------- > > Key: FLINK-32754 > URL: https://issues.apache.org/jira/browse/FLINK-32754 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing > Affects Versions: 1.17.0, 1.17.1 > Reporter: Yu Chen > Priority: Major > Attachments: image-2023-08-04-18-28-05-897.png > > > We registered some metrics in the `enumerator` of the flip-27 source via > `SplitEnumerator.metricGroup()`, but found that the task prints NPE logs in > JM when restoring, suggesting that `SplitEnumerator. metricGroup()` is null. > {*}Meanwhile, the task does not experience failover, and the Checkpoints > cannot be successfully created even after the task is in running state{*}. > We found that the implementation class of `SplitEnumerator` is > `LazyInitializedCoordinatorContext`, however, the metricGroup() is > initialized after calling lazyInitialize(). By reviewing the code, we found > that at the time of SourceCoordinator.resetToCheckpoint(), lazyInitialize() > has not been called yet, so NPE is thrown. > *Q: Why does this bug prevent the task from creating the Checkpoint?* > `SourceCoordinator.resetToCheckpoint()` throws an NPE which results in the > member variable `enumerator` in `SourceCoordinator` being null. > Unfortunately, all Checkpoint-related calls in `SourceCoordinator` are called > via `runInEventLoop()`. > In `runInEventLoop()`, if the enumerator is null, it will return directly. > *Q: Why this bug doesn't trigger a task failover?* > In `RecreateOnResetOperatorCoordinator.resetAndStart()`, if > `internalCoordinator.resetToCheckpoint` throws an exception, then it will > catch the exception and call `cleanAndFailJob ` to try to fail the job. > However, `globalFailureHandler` is also initialized in `lazyInitialize()`, > while `schedulerExecutor.execute` will ignore the NPE triggered by > `globalFailureHandler.handleGlobalFailure(e)`. > Thus it appears that the task did not failover. > !image-2023-08-04-18-28-05-897.png|width=963,height=443! -- This message was sent by Atlassian Jira (v8.20.10#820010)