yuxiqian opened a new pull request, #3469:
URL: https://github.com/apache/flink-cdc/pull/3469

   This closes FLINK-35813.
   
   Currently, the transform schema operator clears its `state` field after its
   `finish` method is called. However, checkpoints / savepoints can still be
   triggered between `finish` and `close`, and any checkpoint request that
   arrives in that window causes an NPE in the `snapshotState` method.
   
   For example:
   
   ```yaml
   source:
     type: values
   sink:
     type: values
   transform:
      ...
   ```
   
   would cause this:
   
   ```
   Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Source: Flink CDC Event Source: values -> Transform:Schema (1/1)#0. Failure reason: Checkpoint was declined.
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:281)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:185)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:347)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:720)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:352)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1242)
        ... 14 more
   Caused by: java.lang.NullPointerException
        at org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator.snapshotState(TransformSchemaOperator.java:137)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:234)
        ... 25 more
   ```
   
   Postponing the clearing of `state` until `close()` should resolve this problem.
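
The ordering problem can be illustrated with a minimal, dependency-free sketch. It does not use Flink's real operator API: `LifecycleOperator`, `ClearsInFinish`, `ClearsInClose`, and `LifecycleSketch` are hypothetical names invented for this illustration, and the string list only stands in for the operator's `state` field. The sketch models just the sequence `finish` -> `snapshotState` -> `close` described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an operator lifecycle; NOT Flink's real API. */
interface LifecycleOperator {
    void finish();
    List<String> snapshotState();
    void close();
}

/** Models the old behavior: state is dropped as soon as finish() runs. */
class ClearsInFinish implements LifecycleOperator {
    private List<String> state = new ArrayList<>(List.of("schema-a"));

    public void finish() { state = null; }

    // Copying a null list throws NullPointerException.
    public List<String> snapshotState() { return new ArrayList<>(state); }

    public void close() { }
}

/** Models the proposed behavior: state stays readable until close(). */
class ClearsInClose implements LifecycleOperator {
    private List<String> state = new ArrayList<>(List.of("schema-a"));

    public void finish() { /* keep state: a snapshot may still arrive */ }

    public List<String> snapshotState() { return new ArrayList<>(state); }

    public void close() { state = null; }
}

public class LifecycleSketch {
    /** Runs finish -> snapshotState -> close; returns true if the snapshot succeeded. */
    static boolean snapshotAfterFinish(LifecycleOperator op) {
        op.finish();
        try {
            op.snapshotState();
            return true;
        } catch (NullPointerException e) {
            return false;
        } finally {
            op.close();
        }
    }

    public static void main(String[] args) {
        System.out.println("clear in finish(): snapshot ok = "
                + snapshotAfterFinish(new ClearsInFinish())); // false
        System.out.println("clear in close():  snapshot ok = "
                + snapshotAfterFinish(new ClearsInClose()));  // true
    }
}
```

In this model, only the variant that defers clearing to `close()` survives a snapshot taken after `finish()`, which mirrors the change proposed here.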


