[ https://issues.apache.org/jira/browse/FLINK-36340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17883234#comment-17883234 ]

chenyuzhi edited comment on FLINK-36340 at 9/20/24 12:23 PM:
-------------------------------------------------------------

I think the error is caused by
https://issues.apache.org/jira/browse/FLINK-28758, and I think the latest
version of the operator still has the same problem.

Should the operator use the cancelWithSavepoint API when stopping Flink
1.15/1.16 (v15/v16) applications?

[~gyfora]
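A minimal sketch of the version gate the question proposes. It assumes the decision can be made from the Flink version string alone. `SuspendModeSelector`, `SuspendMode` and `choose` are hypothetical names, not Flink Kubernetes Operator APIs, and the affected range (1.15.x and 1.16.x) comes from this report, not from a confirmed fix version.

```java
public class SuspendModeSelector {

    /** The two savepoint-based ways to shut a job down. */
    enum SuspendMode { STOP_WITH_SAVEPOINT, CANCEL_WITH_SAVEPOINT }

    /**
     * Hypothetical rule: fall back to cancel-with-savepoint only on the
     * versions named in this report (1.15.x, 1.16.x) and keep
     * stop-with-savepoint everywhere else.
     */
    static SuspendMode choose(String flinkVersion) {
        String[] parts = flinkVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected Flink version: " + flinkVersion);
        }
        // Only major and minor matter; patch and vendor suffixes are ignored.
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        boolean affected = major == 1 && (minor == 15 || minor == 16);
        return affected ? SuspendMode.CANCEL_WITH_SAVEPOINT : SuspendMode.STOP_WITH_SAVEPOINT;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"1.14.0", "1.15.2-GDC1.0.1", "1.16.0", "1.17.0"}) {
            System.out.println(v + " -> " + choose(v));
        }
    }
}
```

Because only the major and minor components are parsed, vendor builds such as `1.15.2-GDC1.0.1` (the distribution in the trace) still land in the affected bucket. A version-only rule is coarse: it would also switch jobs that do not use the legacy Kafka source. A real patch might therefore put this behind a configuration option.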


was (Author: stupid_pig):
I think the error is caused by
https://issues.apache.org/jira/browse/FLINK-28758, and I think the latest
version of the operator still has the same problem.

I think the operator should use the cancelWithSavepoint API when stopping
Flink 1.15/1.16 (v15/v16) applications. If the community agrees with this
approach, I can provide a patch.
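For reference, the two shutdown paths map onto different Flink REST endpoints. Stop-with-savepoint is `POST /jobs/:jobid/stop`. Cancel-with-savepoint is a savepoint trigger (`POST /jobs/:jobid/savepoints`) with `cancel-job` set to `true`. The sketch below only builds the requests with `java.net.http` (Java 11+) and sends nothing. Check the JSON field names against the REST API docs for the Flink version in use. The REST address and target directory are placeholders.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class SavepointRequests {

    /** Stop-with-savepoint: the path that fails in this report. */
    static HttpRequest stopWithSavepoint(String restBase, String jobId, String targetDir) {
        // Assumes targetDir needs no JSON escaping (a plain path or URI).
        String body = "{\"targetDirectory\": \"" + targetDir + "\", \"drain\": false}";
        return post(restBase + "/jobs/" + jobId + "/stop", body);
    }

    /** Cancel-with-savepoint: trigger a savepoint, then cancel the job. */
    static HttpRequest cancelWithSavepoint(String restBase, String jobId, String targetDir) {
        String body = "{\"target-directory\": \"" + targetDir + "\", \"cancel-job\": true}";
        return post(restBase + "/jobs/" + jobId + "/savepoints", body);
    }

    private static HttpRequest post(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        String base = "http://localhost:8081";           // placeholder JobManager REST address
        String job = "f4d029099ad6e6249aebdcc6b4541cc9"; // job ID from the log in this issue
        String dir = "file:///tmp/savepoints";           // placeholder target directory
        HttpRequest stop = stopWithSavepoint(base, job, dir);
        HttpRequest cancel = cancelWithSavepoint(base, job, dir);
        System.out.println(stop.method() + " " + stop.uri());
        System.out.println(cancel.method() + " " + cancel.uri());
    }
}
```

The Flink CLI marks cancelling with a savepoint as deprecated in favor of `stop`. Limiting the fallback to the affected versions therefore seems safer than using it everywhere.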

> Can not stop with savepoint
> ---------------------------
>
>                 Key: FLINK-36340
>                 URL: https://issues.apache.org/jira/browse/FLINK-36340
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: 1.10.0
>         Environment: flink version: 1.15.2/1.16.0
> operator version: 1.6.0 
> config: 
> kubernetes.operator.job.savepoint-on-deletion=true
>            Reporter: chenyuzhi
>            Priority: Major
>
> When stopping a Flink application that reads from Kafka with the legacy
> FlinkKafkaConsumer through the Kubernetes operator, the JobManager reports the following error:
> {code:java}
> 2024-09-20 17:49:49,856 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Triggering stop-with-savepoint for job f4d029099ad6e6249aebdcc6b4541cc9.
> 2024-09-20 17:49:49,864 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 97 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1726825789860 for job f4d029099ad6e6249aebdcc6b4541cc9.
> 2024-09-20 17:49:49,916 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Decline checkpoint 97 by task ac38e2c33036a3ceea880f4c1754aa3e of job f4d029099ad6e6249aebdcc6b4541cc9 at test-k8s-delete-15-mogra2-taskmanager-1-1 @ 7.48.188.31 (dataPort=35957).
> org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: Custom Source -> Flat Map (1/1)#0 Failure reason: Task has failed.
>       at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_202]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
> Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
>       at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_202]
>       ... 3 more
> Caused by: org.apache.flink.util.SerializedThrowable
>       at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177) ~[?:?]
>       at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164) ~[?:?]
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945) ~[?:?]
>       at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
> 2024-09-20 17:49:49,929 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Flat Map (1/1) (ac38e2c33036a3ceea880f4c1754aa3e) switched from RUNNING to FAILED on test-k8s-delete-15-mogra2-taskmanager-1-1 @ 7.48.188.31 (dataPort=35957).
> org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: null
>       at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177) ~[?:?]
>       at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164) ~[?:?]
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945) ~[?:?]
>       at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_202]
> 2024-09-20 17:49:49,932 WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 97 for job f4d029099ad6e6249aebdcc6b4541cc9. (0 consecutive failed attempts so far)
> org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
>       at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1013) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
>       at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
> Caused by: org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: Custom Source -> Flat Map (1/1)#0 Failure reason: Task has failed.
>       at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_202]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
> Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
>       at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_202]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
> Caused by: org.apache.flink.util.SerializedThrowable
>       at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177) ~[?:?]
>       at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164) ~[?:?]
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945) ~[?:?]
>       at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       ... 1 more
> 2024-09-20 17:49:49,936 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 90155cf7d25cd7b8435ec43e305342bf_0.
> 2024-09-20 17:49:49,938 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 2 tasks should be restarted to recover the failed task 90155cf7d25cd7b8435ec43e305342bf_0.
> 2024-09-20 17:49:49,940 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job fds (f4d029099ad6e6249aebdcc6b4541cc9) switched from state RUNNING to RESTARTING.
> 2024-09-20 17:49:50,012 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Keyed Reduce -> Map -> Sink: Unnamed (1/1) (8305e9e8cbe89f3488bbb9de8fc5f67f) switched from RUNNING to CANCELING.
> 2024-09-20 17:49:50,019 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
> org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
>       at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1013) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_202]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_202]
>       at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_202]
> Caused by: org.apache.flink.util.SerializedThrowable: Task name with subtask : Source: Custom Source -> Flat Map (1/1)#0 Failure reason: Task has failed.
>       at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_202]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
> Caused by: org.apache.flink.util.SerializedThrowable: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
>       at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_202]
>       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_202]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
> Caused by: org.apache.flink.util.SerializedThrowable
>       at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177) ~[?:?]
>       at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164) ~[?:?]
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945) ~[?:?]
>       at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.2-GDC1.0.1.jar:1.15.2-GDC1.0.1]
>       ... 1 more
> 2024-09-20 17:49:50,022 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.FailureRateRestartBackoffTimeStrategy [] - Skipping counting failure caused by class org.apache.flink.runtime.checkpoint.CheckpointException as the strategy is backing off
> 2024-09-20 17:49:50,025 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Keyed Reduce -> Map -> Sink: Unnamed (1/1) (8305e9e8cbe89f3488bbb9de8fc5f67f) switched from CANCELING to CANCELED.
> 2024-09-20 17:49:50,028 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job f4d029099ad6e6249aebdcc6b4541cc9
> {code}
> Expected: the job stops successfully with a savepoint.
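The traces above show the failure chain. `SourceStreamTask.stopOperatorForStopWithSavepoint` calls `StreamSource.stop`, which cancels the legacy `FlinkKafkaConsumerBase`. `KafkaFetcher.cancel` then closes the `Handover`, so the legacy source thread ends with `Handover$ClosedException` instead of finishing normally. As a result, the task declines checkpoint 97 with "Task has failed". Below is a simplified, self-contained model of that chain, not Flink's code. `Handover`, `ClosedException` and `runSource` are stand-ins written for illustration, and no records flow through the model.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class HandoverCloseModel {

    /** Stand-in for Handover$ClosedException (illustrative, not Flink's class). */
    static final class ClosedException extends RuntimeException {
        ClosedException() {
            super("handover closed");
        }
    }

    /** Stand-in for the Kafka connector's Handover; carries no records here. */
    static final class Handover {
        private final Object lock = new Object();
        private ClosedException error;

        /** Blocks like a fetch loop waiting for data; rethrows once closed. */
        void pollNext() throws InterruptedException {
            synchronized (lock) {
                while (error == null) {
                    lock.wait();
                }
                throw error;
            }
        }

        /** Creates the exception here, which is why the trace points at Handover.close. */
        void close() {
            synchronized (lock) {
                if (error == null) {
                    error = new ClosedException();
                }
                lock.notifyAll();
            }
        }
    }

    /** Models the legacy source thread: the future completes when run() ends. */
    static CompletableFuture<Void> runSource(Handover handover) {
        CompletableFuture<Void> completion = new CompletableFuture<>();
        Thread sourceThread = new Thread(() -> {
            try {
                while (true) {
                    handover.pollNext();
                }
            } catch (Throwable t) {
                // The source never returns normally, so the future fails.
                completion.completeExceptionally(t);
            }
        }, "legacy-source-thread");
        sourceThread.start();
        return completion;
    }

    public static void main(String[] args) {
        Handover handover = new Handover();
        CompletableFuture<Void> source = runSource(handover);
        // Stop path from the trace: StreamSource.stop -> consumer cancel -> Handover.close
        handover.close();
        try {
            source.join();
            System.out.println("source finished cleanly");
        } catch (CompletionException e) {
            // Prints "source failed: ClosedException"
            System.out.println("source failed: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

In this model, using `close()` as the stop signal always leaves the source future completed exceptionally. That mirrors why the synchronous savepoint is declined rather than completed.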



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
