SkylerLin created FLINK-40052:
---------------------------------
Summary: NullPointerException in
MySqlHybridSplitAssigner.createBinlogSplit when snapshot split reader fails
after assigner status has reached FINISHED
Key: FLINK-40052
URL: https://issues.apache.org/jira/browse/FLINK-40052
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: 3.0.0
Environment: Flink CDC 3.6
Reporter: SkylerLin
Fix For: 3.0.0
h3. Problem
A MySQL CDC pipeline job (snapshot + binlog hybrid mode) fails with a repeated
NullPointerException and cannot recover even after restart. The error occurs in
{{MySqlHybridSplitAssigner.createBinlogSplit()}} at line 220.
The exception from the JobManager log:
{code:java}
java.lang.NullPointerException: null
    at org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.createBinlogSplit(MySqlHybridSplitAssigner.java:220)
    at org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getNext(MySqlHybridSplitAssigner.java:124)
    at org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.assignSplits(MySqlSourceEnumerator.java:226)
    at org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSplitRequest(MySqlSourceEnumerator.java:118)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:634)
{code}
The job keeps failing and restarting (50 attempts configured), each time
hitting the same NPE immediately after restoring from the last checkpoint.
h3. Root Cause
The bug is triggered by the following sequence when {{parallelism = 1}}:
# *All snapshot splits finish.* When {{currentParallelism == 1}},
{{onFinishedSplits()}} immediately advances the assigner status from
{{INITIAL_ASSIGNING}} to {{INITIAL_ASSIGNING_FINISHED}}, skipping the
checkpoint completion wait.
# *A snapshot split reader fails.* Flink calls {{addSplits()}} to return the
failed split. {{MySqlSnapshotSplitAssigner.addSplits()}} removes the split from
{{assignedSplits}} and {{splitFinishedOffsets}}, and puts it back into
{{remainingSplits}}. However, it does *not* roll back {{assignerStatus}}, so
the status remains {{INITIAL_ASSIGNING_FINISHED}}.
# *The split is re-assigned.* {{getNext()}} picks up the split from
{{remainingSplits}} and puts it back into {{assignedSplits}}, but
{{splitFinishedOffsets}} has no entry for it yet. At this point
{{assignedSplits}} and {{splitFinishedOffsets}} are inconsistent.
# *Binlog split creation is triggered prematurely.* Once {{remainingSplits}}
is empty again, {{noMoreSplits()}} returns true.
{{MySqlHybridSplitAssigner.getNext()}} checks
{{isInitialAssigningFinished(assignerStatus)}}, which is still true because the
status was never rolled back, and calls {{createBinlogSplit()}}.
# *NPE.* {{createBinlogSplit()}} iterates {{assignedSplits}} and calls
{{splitFinishedOffsets.get(split.splitId())}} for each entry. The re-assigned
split has no entry in {{splitFinishedOffsets}}, so this returns
{{null}}. The subsequent call {{binlogOffset.isBefore(minBinlogOffset)}}
throws a NullPointerException.
The inconsistent state ({{assignedSplits}} has an entry with no
corresponding entry in {{splitFinishedOffsets}}) gets persisted into the
next checkpoint. Restarting from this checkpoint reproduces the NPE every time,
making the job unrecoverable.
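The sequence above can be replayed in isolation with a toy model. This is only a sketch and not the connector code: {{AssignerNpeSketch}}, the string split IDs and the {{Long}} offsets are hypothetical stand-ins, and only the field names ({{remainingSplits}}, {{assignedSplits}}, {{splitFinishedOffsets}}, {{assignerStatus}}) are taken from this report. It assumes parallelism 1, so the status advances as soon as every assigned split has reported an offset.
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of the assigner state (hypothetical class, not the Flink CDC implementation). */
class AssignerNpeSketch {
    enum AssignerStatus { INITIAL_ASSIGNING, INITIAL_ASSIGNING_FINISHED }

    private final Deque<String> remainingSplits = new ArrayDeque<>();
    private final Map<String, String> assignedSplits = new LinkedHashMap<>();
    private final Map<String, Long> splitFinishedOffsets = new HashMap<>();
    private AssignerStatus assignerStatus = AssignerStatus.INITIAL_ASSIGNING;

    AssignerNpeSketch(String... splitIds) {
        for (String id : splitIds) {
            remainingSplits.add(id);
        }
    }

    /** Returns the next snapshot split, or a "binlog split" once the status says FINISHED. */
    String getNext() {
        if (!remainingSplits.isEmpty()) {
            String id = remainingSplits.poll();
            assignedSplits.put(id, id);
            return id;
        }
        if (assignerStatus == AssignerStatus.INITIAL_ASSIGNING_FINISHED) {
            return "binlog-split@" + minFinishedOffset();
        }
        return null;
    }

    /** Step 1: with parallelism 1 the status advances without waiting for a checkpoint. */
    void onFinishedSplit(String id, long offset) {
        splitFinishedOffsets.put(id, offset);
        if (remainingSplits.isEmpty()
                && splitFinishedOffsets.keySet().containsAll(assignedSplits.keySet())) {
            assignerStatus = AssignerStatus.INITIAL_ASSIGNING_FINISHED;
        }
    }

    /** Step 2: the failed split is returned, but the status is left untouched (the bug). */
    void addSplits(String id) {
        assignedSplits.remove(id);
        splitFinishedOffsets.remove(id);
        remainingSplits.add(id);
    }

    /** Step 5: a split without a finished offset yields null, and the comparison throws. */
    private long minFinishedOffset() {
        Long min = null;
        for (String id : assignedSplits.keySet()) {
            Long offset = splitFinishedOffsets.get(id);
            if (min == null || offset.compareTo(min) < 0) {
                min = offset;
            }
        }
        return min;
    }

    /** Replays steps 1 to 5 with two splits and reports whether the NPE occurred. */
    static boolean reproducesNpe() {
        AssignerNpeSketch assigner = new AssignerNpeSketch("split-0", "split-1");
        assigner.getNext();                          // split-0 assigned
        assigner.getNext();                          // split-1 assigned
        assigner.onFinishedSplit("split-0", 100L);
        assigner.onFinishedSplit("split-1", 120L);   // status advances to FINISHED
        assigner.addSplits("split-1");               // reader failed, split returned
        assigner.getNext();                          // split-1 re-assigned, no offset yet
        try {
            assigner.getNext();                      // attempts to build the binlog split
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE reproduced: " + reproducesNpe());
    }
}
{code}
Two splits are used so that the failure surfaces at the comparison itself, which corresponds to the {{isBefore}} call in the stack trace; the first split seeds the minimum and the re-assigned split supplies the {{null}}.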
h3. Proposed Fix
Roll back the assigner status in {{MySqlSnapshotSplitAssigner.addSplits()}}
when failed splits are re-added:
{code:java}
if (!splits.isEmpty()) {
    if (assignerStatus == AssignerStatus.INITIAL_ASSIGNING_FINISHED) {
        assignerStatus = AssignerStatus.INITIAL_ASSIGNING;
        checkpointIdToFinish = null;
    } else if (assignerStatus == AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED) {
        assignerStatus = AssignerStatus.NEWLY_ADDED_ASSIGNING;
        checkpointIdToFinish = null;
    }
}
{code}
This ensures:
* The status returns to the assigning state, so
{{isInitialAssigningFinished()}} returns false and {{createBinlogSplit()}} is
not called prematurely.
* The status can only advance to FINISHED again after all re-added splits are
finished and {{splitFinishedOffsets}} is properly populated (via
{{onFinishedSplits()}} → {{allSnapshotSplitsFinished()}} → {{onFinish()}}).
* {{checkpointIdToFinish}} is reset to null so that
{{notifyCheckpointComplete()}} does not advance the status based on a stale
checkpoint ID.
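The effect of the rollback on the two pieces of state can be checked with a small standalone sketch. {{AssignerRollbackSketch}}, {{onSplitsReAdded()}} and {{mayCreateBinlogSplit()}} are hypothetical names introduced for illustration; the enum constants and {{checkpointIdToFinish}} follow this report, and the gate is a simplification of the {{isInitialAssigningFinished()}} check rather than the real method.
{code:java}
/** Toy model of the proposed status rollback (hypothetical class, not the connector code). */
class AssignerRollbackSketch {
    enum AssignerStatus {
        INITIAL_ASSIGNING,
        INITIAL_ASSIGNING_FINISHED,
        NEWLY_ADDED_ASSIGNING,
        NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED
    }

    AssignerStatus assignerStatus;
    Long checkpointIdToFinish; // null means no checkpoint is being waited for

    AssignerRollbackSketch(AssignerStatus assignerStatus, Long checkpointIdToFinish) {
        this.assignerStatus = assignerStatus;
        this.checkpointIdToFinish = checkpointIdToFinish;
    }

    /** Mirrors the proposed change: re-added splits reopen the matching assigning phase. */
    void onSplitsReAdded(int reAddedSplitCount) {
        if (reAddedSplitCount == 0) {
            return; // nothing was returned, so the status stays as it is
        }
        if (assignerStatus == AssignerStatus.INITIAL_ASSIGNING_FINISHED) {
            assignerStatus = AssignerStatus.INITIAL_ASSIGNING;
            checkpointIdToFinish = null;
        } else if (assignerStatus == AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED) {
            assignerStatus = AssignerStatus.NEWLY_ADDED_ASSIGNING;
            checkpointIdToFinish = null;
        }
    }

    /** Simplified gate: the binlog split may only be built once initial assigning is finished. */
    boolean mayCreateBinlogSplit() {
        return assignerStatus == AssignerStatus.INITIAL_ASSIGNING_FINISHED;
    }

    public static void main(String[] args) {
        AssignerRollbackSketch assigner =
                new AssignerRollbackSketch(AssignerStatus.INITIAL_ASSIGNING_FINISHED, 42L);
        assigner.onSplitsReAdded(1);
        System.out.println(assigner.assignerStatus
                + ", binlog split allowed: " + assigner.mayCreateBinlogSplit());
    }
}
{code}
Statuses that are already in an assigning phase are left unchanged, so calling the rollback for a split that fails before the snapshot phase has finished is a no-op.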