SkylerLin created FLINK-40052:
---------------------------------

             Summary: NullPointerException in MySqlHybridSplitAssigner.createBinlogSplit when snapshot split reader fails after assigner status has reached FINISHED
                 Key: FLINK-40052
                 URL: https://issues.apache.org/jira/browse/FLINK-40052
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: 3.0.0
         Environment: Flink CDC 3.6
            Reporter: SkylerLin
             Fix For: 3.0.0


h3. Problem

A MySQL CDC pipeline job (snapshot + binlog hybrid mode) fails with a repeated 
NullPointerException and cannot recover even after restart. The error occurs in 
{{MySqlHybridSplitAssigner.createBinlogSplit()}} at line 220.

The exception from the JobManager log:
{code:java}
java.lang.NullPointerException: null
    at org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.createBinlogSplit(MySqlHybridSplitAssigner.java:220)
    at org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getNext(MySqlHybridSplitAssigner.java:124)
    at org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.assignSplits(MySqlSourceEnumerator.java:226)
    at org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSplitRequest(MySqlSourceEnumerator.java:118)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:634)
{code}
The job keeps failing and restarting (50 attempts configured), each time 
hitting the same NPE immediately after restoring from the last checkpoint.

h3. Root Cause

The bug is triggered by the following sequence when {{parallelism = 1}}:
 # *All snapshot splits finish.* When {{currentParallelism == 1}}, {{onFinishedSplits()}} immediately advances the assigner status from {{INITIAL_ASSIGNING}} to {{INITIAL_ASSIGNING_FINISHED}}, skipping the checkpoint completion wait.
 # *A snapshot split reader fails.* Flink calls {{addSplits()}} to return the failed split. {{MySqlSnapshotSplitAssigner.addSplits()}} removes the split from {{assignedSplits}} and {{splitFinishedOffsets}}, and puts it back into {{remainingSplits}}. However, it does *not* roll back {{assignerStatus}}: the status remains {{INITIAL_ASSIGNING_FINISHED}}.
 # *The split is re-assigned.* {{getNext()}} picks up the split from {{remainingSplits}} and puts it back into {{assignedSplits}}, but {{splitFinishedOffsets}} has no entry for it yet. At this point {{assignedSplits}} and {{splitFinishedOffsets}} are inconsistent.
 # *Binlog split creation is triggered prematurely.* Once {{remainingSplits}} is empty again, {{noMoreSplits()}} returns true. {{MySqlHybridSplitAssigner.getNext()}} checks {{isInitialAssigningFinished(assignerStatus)}}, which is still true because the status was never rolled back, and calls {{createBinlogSplit()}}.
 # *NPE.* {{createBinlogSplit()}} iterates {{assignedSplits}} and calls {{splitFinishedOffsets.get(split.splitId())}} for each entry. The re-assigned split has no entry in {{splitFinishedOffsets}}, so this returns {{null}}. The subsequent call {{binlogOffset.isBefore(minBinlogOffset)}} throws a {{NullPointerException}}.

The inconsistent state ({{assignedSplits}} has an entry with no 
corresponding entry in {{splitFinishedOffsets}}) gets persisted into the 
next checkpoint. Restarting from this checkpoint reproduces the NPE every time, 
making the job unrecoverable.
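
The sequence described in this section can be imitated outside Flink with a small toy model. This is only a sketch and not the Flink CDC code: the class {{AssignerNpeSketch}}, its {{Status}} enum, the string split IDs, and the use of {{Long}} in place of the binlog offset type are assumptions made for illustration. Only the bookkeeping of {{remainingSplits}}, {{assignedSplits}}, and {{splitFinishedOffsets}} follows the description.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the assigner bookkeeping; NOT the real Flink CDC classes.
class AssignerNpeSketch {
    enum Status { INITIAL_ASSIGNING, INITIAL_ASSIGNING_FINISHED }

    final Deque<String> remainingSplits = new ArrayDeque<>();
    final Map<String, String> assignedSplits = new LinkedHashMap<>();
    // Long stands in for the real binlog offset type (assumption).
    final Map<String, Long> splitFinishedOffsets = new HashMap<>();
    Status status = Status.INITIAL_ASSIGNING;

    // Hand out the next remaining split and record it as assigned.
    String getNext() {
        String splitId = remainingSplits.poll();
        if (splitId != null) {
            assignedSplits.put(splitId, "snapshot-split");
        }
        return splitId;
    }

    // Record a finished split; with parallelism 1 the status advances at once.
    void onFinishedSplit(String splitId, long offset) {
        splitFinishedOffsets.put(splitId, offset);
        if (remainingSplits.isEmpty()
                && splitFinishedOffsets.keySet().containsAll(assignedSplits.keySet())) {
            status = Status.INITIAL_ASSIGNING_FINISHED;
        }
    }

    // Take back a failed split WITHOUT rolling back the status (the reported bug).
    void addSplit(String splitId) {
        assignedSplits.remove(splitId);
        splitFinishedOffsets.remove(splitId);
        remainingSplits.add(splitId);
    }

    // Compute the minimum finished offset over all assigned splits.
    long createBinlogSplit() {
        long min = Long.MAX_VALUE;
        for (String splitId : assignedSplits.keySet()) {
            Long offset = splitFinishedOffsets.get(splitId); // null for a re-assigned split
            if (offset.compareTo(min) < 0) {                 // NPE when offset is null
                min = offset;
            }
        }
        return min;
    }

    // Replays steps 1 to 5 and reports whether the NPE occurred.
    static boolean reproducesNpe() {
        AssignerNpeSketch a = new AssignerNpeSketch();
        a.remainingSplits.add("t:0");
        a.getNext();                   // initial assignment
        a.onFinishedSplit("t:0", 42L); // step 1: status becomes FINISHED
        a.addSplit("t:0");             // step 2: split returned, status not rolled back
        a.getNext();                   // step 3: re-assigned, no finished offset yet
        try {
            // step 4: no remaining splits and status still FINISHED
            if (a.remainingSplits.isEmpty()
                    && a.status == Status.INITIAL_ASSIGNING_FINISHED) {
                a.createBinlogSplit(); // step 5
            }
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE reproduced: " + reproducesNpe());
    }
}
```

In this model the stale {{INITIAL_ASSIGNING_FINISHED}} status is the only thing that lets {{createBinlogSplit()}} run while an assigned split still lacks a finished offset.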

h3. Proposed Fix

Roll back the assigner status in {{MySqlSnapshotSplitAssigner.addSplits()}} 
when failed splits are re-added:
{code:java}
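// Failed splits were handed back: the snapshot phase is no longer complete.
if (!splits.isEmpty()) {
    if (assignerStatus == AssignerStatus.INITIAL_ASSIGNING_FINISHED) {
        // Return to the assigning state until the re-added splits finish.
        assignerStatus = AssignerStatus.INITIAL_ASSIGNING;
        // Drop the pending checkpoint ID so it cannot advance the status.
        checkpointIdToFinish = null;
    } else if (assignerStatus == AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED) {
        // Same rollback for the newly-added-tables path.
        assignerStatus = AssignerStatus.NEWLY_ADDED_ASSIGNING;
        checkpointIdToFinish = null;
    }
}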
{code}
This ensures:
 * The status returns to the assigning state, so {{isInitialAssigningFinished()}} returns false and {{createBinlogSplit()}} is not called prematurely.
 * The status can only advance to FINISHED again after all re-added splits are finished and {{splitFinishedOffsets}} is properly populated (via {{onFinishedSplits()}} → {{allSnapshotSplitsFinished()}} → {{onFinish()}}).
 * {{checkpointIdToFinish}} is reset to null so that 
{{notifyCheckpointComplete()}} does not advance the status based on a stale 
checkpoint ID.
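
A possible regression check for this behaviour, written against a toy stand-in and not the real assigner, is sketched below. The class {{StatusRollbackSketch}}, its constructor, the string split IDs, and the helper {{readyForBinlogSplit()}} are hypothetical; only the status names and the rollback logic are taken from the proposed fix.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;

// Toy stand-in for the snapshot split assigner; NOT the real Flink CDC class.
class StatusRollbackSketch {
    enum AssignerStatus {
        INITIAL_ASSIGNING,
        INITIAL_ASSIGNING_FINISHED,
        NEWLY_ADDED_ASSIGNING,
        NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED
    }

    final Deque<String> remainingSplits = new ArrayDeque<>();
    AssignerStatus assignerStatus;
    Long checkpointIdToFinish;

    StatusRollbackSketch(AssignerStatus assignerStatus, Long checkpointIdToFinish) {
        this.assignerStatus = assignerStatus;
        this.checkpointIdToFinish = checkpointIdToFinish;
    }

    // Re-add failed splits and roll the status back, as in the proposed fix.
    void addSplits(Collection<String> splits) {
        remainingSplits.addAll(splits);
        if (splits.isEmpty()) {
            return; // nothing was re-added, so the status stays as it is
        }
        if (assignerStatus == AssignerStatus.INITIAL_ASSIGNING_FINISHED) {
            assignerStatus = AssignerStatus.INITIAL_ASSIGNING;
            checkpointIdToFinish = null;
        } else if (assignerStatus == AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED) {
            assignerStatus = AssignerStatus.NEWLY_ADDED_ASSIGNING;
            checkpointIdToFinish = null;
        }
    }

    // Hypothetical gate mirroring the check made before creating the binlog split.
    boolean readyForBinlogSplit() {
        return remainingSplits.isEmpty()
                && assignerStatus == AssignerStatus.INITIAL_ASSIGNING_FINISHED;
    }

    public static void main(String[] args) {
        StatusRollbackSketch a =
                new StatusRollbackSketch(AssignerStatus.INITIAL_ASSIGNING_FINISHED, 7L);
        a.addSplits(Arrays.asList("t:0")); // a failed split comes back
        a.remainingSplits.poll();          // and is re-assigned
        System.out.println(a.assignerStatus + " " + a.readyForBinlogSplit());
    }
}
```

With the rollback in place, the gate stays closed after the split is re-assigned, so the binlog split would only be created once the status has advanced again through the normal finish path.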



