[ https://issues.apache.org/jira/browse/FLINK-28206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17557762#comment-17557762 ]
Chenya Zhang commented on FLINK-28206:
--------------------------------------
+1, we are seeing similar exceptions today with Flink 1.14 when reading/deserializing
checkpointed state:
{code:java}
org.apache.flink.util.FlinkRuntimeException: Unexpected list element deserialization failure
	at org.apache.flink.runtime.state.ListDelimitedSerializer.deserializeNextElement(ListDelimitedSerializer.java:89)
	at org.apache.flink.runtime.state.ListDelimitedSerializer.deserializeList(ListDelimitedSerializer.java:51)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:120)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
	at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
	at org.apache.flink.runtime.state.metrics.LatencyTrackingListState.get(LatencyTrackingListState.java:63)
	at org.apache.flink.runtime.state.metrics.LatencyTrackingListState.get(LatencyTrackingListState.java:34)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:475)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:603)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239)
	at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200)
	at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.EOFException
	at org.apache.flink.core.memory.DataInputDeserializer.readFully(DataInputDeserializer.java:172)
	at org.apache.flink.formats.avro.utils.DataInputDecoder.readBytes(DataInputDecoder.java:95)
	at org.apache.avro.io.ResolvingDecoder.readBytes(ResolvingDecoder.java:243)
	at org.apache.avro.generic.GenericDatumReader.readBytes(GenericDatumReader.java:543)
	at org.apache.avro.generic.GenericDatumReader.readBytes(GenericDatumReader.java:534)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:193)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	... 25 more
{code}
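For readers comparing this with the JobManager-side failure quoted below: this trace comes from a running window operator reading RocksDB list state, not from the JobManager reading checkpoint metadata. RocksDB list state keeps all elements for a key in one value, serialized back to back with a single ',' delimiter byte between them, and `ListDelimitedSerializer` decodes them one element at a time. When the element serializer (Avro, through `DataInputDecoder`) expects more bytes than remain in the value, `DataInputDeserializer.readFully` throws `EOFException`, which surfaces as "Unexpected list element deserialization failure". A reader schema that does not match the writer schema, or truncated bytes, could both produce this; the trace alone does not say which. The following is a simplified model of that failure mode, not Flink's code: `DelimitedListModel` and its `[int length][UTF-8 bytes]` element format are hypothetical (Avro actually encodes lengths as zig-zag varints), and a plain `RuntimeException` stands in for `FlinkRuntimeException`.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a delimiter-separated list value; not Flink code.
public class DelimitedListModel {

    static final byte DELIMITER = ',';

    // Each element is written as [int length][UTF-8 bytes]; elements are separated by DELIMITER.
    public static byte[] serialize(List<String> elements) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            for (int i = 0; i < elements.size(); i++) {
                if (i > 0) {
                    out.writeByte(DELIMITER);
                }
                byte[] bytes = elements.get(i).getBytes(StandardCharsets.UTF_8);
                out.writeInt(bytes.length);
                out.write(bytes);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Reads elements one at a time. readInt/readFully throw EOFException when an
    // element's declared length runs past the end of the value.
    public static List<String> deserialize(byte[] value) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(value));
        List<String> result = new ArrayList<>();
        try {
            while (in.available() > 0) {
                byte[] bytes = new byte[in.readInt()];
                in.readFully(bytes);
                result.add(new String(bytes, StandardCharsets.UTF_8));
                if (in.available() > 0 && in.readByte() != DELIMITER) {
                    throw new IOException("Unexpected element delimiter");
                }
            }
        } catch (IOException e) {
            // Stand-in for FlinkRuntimeException; the EOFException becomes the cause.
            throw new RuntimeException("Unexpected list element deserialization failure", e);
        }
        return result;
    }
}
{code}

In this model, dropping the last byte of a two-element value makes the second element's declared length overrun the buffer, so decoding fails with a wrapped `EOFException`, which has the same shape as the trace above.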
> EOFException on Checkpoint Recovery
> -----------------------------------
>
> Key: FLINK-28206
> URL: https://issues.apache.org/jira/browse/FLINK-28206
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.4
> Reporter: uharaqo
> Priority: Critical
>
>
> We have only one Job Manager in Kubernetes, and it was suddenly killed without
> any logs. A new Job Manager process could not recover from a checkpoint due
> to an EOFException.
> The Task Managers shut themselves down because they could not find a Job Manager.
> Apart from that, there were no error logs on the Task Manager side.
> It looks to me like the checkpoint is corrupted. Is there a way to identify
> the cause? What would you recommend we do to mitigate this problem?
> Here are the logs from the recovery phase. (I removed the stack trace; please
> find it at the bottom.)
> {noformat}
> {"timestamp":"2022-06-22T17:21:25.870Z","level":"INFO","logger":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils","message":"Recovering checkpoints from KubernetesStateHandleStore{configMapName='univex-flink-record-collector-46071c6a64e47d1ce828dfe032f943a6-jobmanager-leader'}."}
> {"timestamp":"2022-06-22T17:21:25.875Z","level":"INFO","logger":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils","message":"Found 1 checkpoints in KubernetesStateHandleStore{configMapName='univex-flink-record-collector-46071c6a64e47d1ce828dfe032f943a6-jobmanager-leader'}."}
> {"timestamp":"2022-06-22T17:21:25.876Z","level":"INFO","logger":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils","message":"Trying to fetch 1 checkpoints from storage."}
> {"timestamp":"2022-06-22T17:21:25.876Z","level":"INFO","logger":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils","message":"Trying to retrieve checkpoint 58130."}
> {"timestamp":"2022-06-22T17:21:25.901Z","level":"ERROR","logger":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","message":"Fatal error occurred in the cluster entrypoint.","level":"INFO","logger":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","message":"Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally.."}
> {"timestamp":"2022-06-22T17:21:25.921Z","level":"INFO","logger":"org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint","message":"Shutting down rest endpoint."}
> {"timestamp":"2022-06-22T17:21:25.922Z","level":"INFO","logger":"org.apache.flink.runtime.blob.BlobServer","message":"Stopped BLOB server at 0.0.0.0:6124"}
> {noformat}
> The stacktrace of the ERROR:
> {noformat}
> org.apache.flink.util.FlinkException: JobMaster for job 46071c6a64e47d1ce828dfe032f943a6 failed.
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:913)
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:473)
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:450)
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:427)
> 	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
> 	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
> 	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
> 	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> 	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> 	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> 	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> 	at akka.actor.Actor.aroundReceive(Actor.scala:537)
> 	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> 	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> 	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> 	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> 	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> 	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> 	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
> 	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
> 	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> 	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> 	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> 	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 	at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Failed to initialize high-availability completed checkpoint store
> 	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
> 	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
> 	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
> 	... 3 more
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Failed to initialize high-availability completed checkpoint store
> 	at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
> 	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
> 	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
> 	... 3 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to initialize high-availability completed checkpoint store
> 	at org.apache.flink.runtime.scheduler.SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(SchedulerUtils.java:57)
> 	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:180)
> 	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140)
> 	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:134)
> 	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
> 	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:346)
> 	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:323)
> 	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106)
> 	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94)
> 	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
> 	... 4 more
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve checkpoint 58130 from state handle under checkpointID-0000000000000058130. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
> 	at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStoreUtils.java:111)
> 	at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(DefaultCompletedCheckpointStoreUtils.java:89)
> 	at org.apache.flink.kubernetes.utils.KubernetesUtils.createCompletedCheckpointStore(KubernetesUtils.java:314)
> 	at org.apache.flink.kubernetes.highavailability.KubernetesCheckpointRecoveryFactory.createRecoveredCompletedCheckpointStore(KubernetesCheckpointRecoveryFactory.java:78)
> 	at org.apache.flink.runtime.scheduler.SchedulerUtils.createCompletedCheckpointStore(SchedulerUtils.java:91)
> 	at org.apache.flink.runtime.scheduler.SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(SchedulerUtils.java:54)
> 	... 13 more
> Caused by: java.io.EOFException
> 	at java.base/java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2872)
> 	at java.base/java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3367)
> 	at java.base/java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:936)
> 	at java.base/java.io.ObjectInputStream.<init>(ObjectInputStream.java:379)
> 	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:68)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:612)
> 	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:595)
> 	at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:59)
> 	at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStoreUtils.java:102)
> 	... 18 more
> {noformat}
>
>
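On the question of identifying the cause: the innermost frames of the quoted trace show that the `EOFException` is thrown inside the `ObjectInputStream` constructor (`readStreamHeader`), before any checkpoint metadata is read. That constructor first reads the 4-byte Java serialization stream header (magic `0xACED`, then version `5`), so an `EOFException` at that point means the state handle file behind the `checkpointID-0000000000000058130` ConfigMap entry held fewer than 4 bytes, i.e. it was most likely empty or truncated rather than subtly corrupted. Below is a minimal diagnostic sketch, assuming you have already located that file (with Kubernetes HA it lives under the configured HA storage directory) and copied it to local disk. The class `StateHandleHeaderCheck` and its command-line usage are hypothetical; only JDK APIs are used.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamConstants;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical diagnostic helper for a local copy of a serialized state handle file.
public class StateHandleHeaderCheck {

    public enum Verdict { EMPTY_OR_TRUNCATED, NOT_JAVA_SERIALIZATION, HEADER_OK }

    // Checks the same 4-byte header (magic 0xACED, version 5) that the
    // ObjectInputStream constructor reads in readStreamHeader().
    public static Verdict inspect(byte[] data) {
        if (data.length < 4) {
            return Verdict.EMPTY_OR_TRUNCATED;
        }
        int magic = ((data[0] & 0xFF) << 8) | (data[1] & 0xFF);
        int version = ((data[2] & 0xFF) << 8) | (data[3] & 0xFF);
        boolean ok = magic == (ObjectStreamConstants.STREAM_MAGIC & 0xFFFF)
                && version == ObjectStreamConstants.STREAM_VERSION;
        return ok ? Verdict.HEADER_OK : Verdict.NOT_JAVA_SERIALIZATION;
    }

    // Returns true if merely constructing an ObjectInputStream fails with EOFException,
    // which is the innermost failure in the stack trace above.
    public static boolean constructorThrowsEof(byte[] data) {
        try (ObjectInputStream ignored = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            // e.g. StreamCorruptedException when the magic number is wrong
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java StateHandleHeaderCheck <local-copy-of-state-handle-file>");
            System.exit(2);
        }
        Path path = Paths.get(args[0]);
        byte[] data = Files.readAllBytes(path);
        System.out.println(path + ": " + data.length + " bytes, header check: " + inspect(data));
    }
}
{code}

An `EMPTY_OR_TRUNCATED` verdict is consistent with the quoted `EOFException`; `HEADER_OK` only rules out that case and says nothing about whether the rest of the file is intact.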
--
This message was sent by Atlassian Jira
(v8.20.7#820007)