[
https://issues.apache.org/jira/browse/FLINK-28758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616946#comment-17616946
]
Yanfei Lei commented on FLINK-28758:
------------------------------------
Hi [~hjw], from the log, this happened because the source failed when you
posted the stop-with-savepoint request.
"It is successful to use savepoint command alone" is because the job was
running when you sent that request. I think this is expected behavior: the
implementation of stop-with-savepoint behind REST and the CLI is the same.
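To illustrate the point about job state, here is a minimal sketch of how a client could prepare the REST call. It assumes the documented Flink REST endpoints `GET /jobs/:jobid` (which reports a `state` field) and `POST /jobs/:jobid/stop` (which takes `targetDirectory` and `drain`). The class name, the host and port, the savepoint directory, and the "only send when RUNNING" rule are hypothetical choices for this example, not something taken from the issue.

```java
import java.net.URI;

// Hypothetical helper: builds the pieces of a stop-with-savepoint REST call.
// It performs no network I/O, so it can be run without a cluster.
final class StopWithSavepointSketch {

    // URI of the stop-with-savepoint endpoint: POST /jobs/:jobid/stop
    static URI stopUri(String restBase, String jobId) {
        return URI.create(restBase + "/jobs/" + jobId + "/stop");
    }

    // JSON request body; only backslashes and double quotes are escaped,
    // which is assumed to be enough for ordinary savepoint paths.
    static String stopBody(String targetDirectory, boolean drain) {
        String escaped = targetDirectory.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"targetDirectory\":\"" + escaped + "\",\"drain\":" + drain + "}";
    }

    // Assumption for this sketch: only send the request while the job state
    // reported by GET /jobs/:jobid is RUNNING.
    static boolean safeToStop(String jobState) {
        return "RUNNING".equals(jobState);
    }

    public static void main(String[] args) {
        String base = "http://localhost:8081";           // hypothetical REST address
        String jobId = "d6fed247feab1c0bcc1b0dcc2cfb4736"; // job id from the log
        String state = "RESTARTING";                       // value a status query might return

        if (safeToStop(state)) {
            System.out.println("POST " + stopUri(base, jobId));
            System.out.println(stopBody("file:///tmp/savepoints", false));
        } else {
            System.out.println("Job is " + state + "; not sending stop-with-savepoint.");
        }
    }
}
```

A check like this only narrows the window: the job state can still change between the status query and the stop request, so the caller should also handle a failed savepoint.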
> Failed to stop with savepoint
> ------------------------------
>
> Key: FLINK-28758
> URL: https://issues.apache.org/jira/browse/FLINK-28758
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Runtime / Checkpointing, Runtime /
> Task
> Affects Versions: 1.15.0
> Environment: Flink version: 1.15.0
> Deploy mode: K8s application mode. A local mini cluster also has this
> problem.
> Kafka connector: uses the Kafka SourceFunction, not the new API.
> Reporter: hjw
> Priority: Major
>
> I posted a stop-with-savepoint request to the Flink job through the REST API.
> An error happened when the Kafka connector closed.
> The job then entered the restarting state.
> Using the savepoint command alone succeeds.
> {code:java}
> 13:33:42.857 [Kafka Fetcher for Source: nlp-kafka-source -> nlp-clean
> (1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
> clientId=consumer-hjw-3, groupId=hjw] Kafka consumer has been closed
> 13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source -> cpp-clean
> (1/1)#0] INFO org.apache.kafka.common.utils.AppInfoParser - App info
> kafka.consumer for consumer-hjw-4 unregistered
> 13:33:42.857 [Kafka Fetcher for Source: cpp-kafka-source -> cpp-clean
> (1/1)#0] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer
> clientId=consumer-hjw-4, groupId=hjw] Kafka consumer has been closed
> 13:33:42.860 [Source: nlp-kafka-source -> nlp-clean (1/1)#0] DEBUG
> org.apache.flink.streaming.runtime.tasks.StreamTask - Cleanup StreamTask
> (operators closed: false, cancelled: false)
> 13:33:42.860 [jobmanager-io-thread-4] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline
> checkpoint 5 by task eeefcb27475446241861ad8db3f33144 of job
> d6fed247feab1c0bcc1b0dcc2cfb4736 at 79edfa88-ccc3-4140-b0e1-7ce8a7f8669f @
> 127.0.0.1 (dataPort=-1).
> org.apache.flink.util.SerializedThrowable: Task name with subtask : Source:
> nlp-kafka-source -> nlp-clean (1/1)#0 Failure reason: Task has failed.
> at
> org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
> at
> org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
> at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
> Caused by: org.apache.flink.util.SerializedThrowable:
> org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> ... 3 common frames omitted
> Caused by: org.apache.flink.util.SerializedThrowable: null
> at
> org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
> at
> org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
> 13:34:00.925 [flink-akka.actor.default-dispatcher-21] DEBUG
> org.apache.flink.runtime.jobmaster.JobMaster - Trigger heartbeat request.
> 13:34:00.926 [flink-akka.actor.default-dispatcher-22] DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger
> heartbeat request.
> 13:34:00.926 [flink-akka.actor.default-dispatcher-21] DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor - Received heartbeat
> request from 85d44174e17281984d28699c42e3eed6.
> {code}