Hi Xiangyu/Dev,
Does anyone have a solution for handling the important note below in StreamingFileSink?
Caused by: java.io.IOException: Cannot clean commit: Staging file does not
exist.
at org.apache.flink.runtime.fs.hdfs.
HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(
HadoopRecoverableFsDataOutputStream.java:250)
Important Note 3: Flink and the StreamingFileSink never overwrites
committed data. Given this, when trying to restore from an old
checkpoint/savepoint which assumes an in-progress file which was committed
by subsequent successful checkpoints, *Flink will refuse to resume and it
will throw an exception as it cannot locate the in-progress file*.
I am currently facing the same issue in our PROD code.
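
For anyone following the thread, here is a minimal sketch of a
commit-by-rename step, to make the failure mode concrete. It is NOT
Flink's code: the class name, method names, file names, and the recovery
variant are all hypothetical, and it uses only java.nio on a local temp
directory. It assumes a commit means "rename the staging (in-progress)
file to its final name", so a second commit of the same file finds no
staging file and fails with the message seen in the trace above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Simplified, hypothetical model of a "commit by rename" step.
 * Not Flink's implementation; all names are invented for illustration.
 */
final class StagingCommitSketch {

    /** Normal commit: the staging file must exist and have the expected length. */
    static void commit(Path staging, Path target, long expectedLength) throws IOException {
        if (!Files.exists(staging)) {
            throw new IOException("Cannot clean commit: Staging file does not exist.");
        }
        if (Files.size(staging) != expectedLength) {
            throw new IOException("Cannot clean commit: unexpected staging file length.");
        }
        // Files.move without REPLACE_EXISTING fails if the target exists,
        // so already committed data is never overwritten.
        Files.move(staging, target);
    }

    /** Assumed recovery variant: a missing staging file is tolerated only if the target exists. */
    static void commitAfterRecovery(Path staging, Path target, long expectedLength)
            throws IOException {
        if (Files.exists(staging)) {
            commit(staging, target, expectedLength);
        } else if (!Files.exists(target)) {
            throw new IOException("Neither staging file nor committed file exists: " + target);
        }
        // Otherwise the staging file is gone but the target exists:
        // an earlier attempt already committed it, so there is nothing to do.
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("commit-sketch");
        Path staging = dir.resolve(".part-0-0.inprogress");
        Path target = dir.resolve("part-0-0");
        Files.write(staging, new byte[] {1, 2, 3});

        commit(staging, target, 3);              // first commit succeeds
        try {
            commit(staging, target, 3);          // staging file is gone now
        } catch (IOException e) {
            System.out.println("second commit failed: " + e.getMessage());
        }
        commitAfterRecovery(staging, target, 3); // tolerated: target already exists
    }
}
```

In this sketch the strict commit path fails as soon as the staging file
has been renamed or removed by anything else, which is why checking what
happened to the staging file on HDFS seems like the first thing to do.
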
Regards,
Nagireddy Y.
On Fri, Aug 4, 2023 at 12:11 PM xiangyu feng <[email protected]> wrote:
> Hi ynagireddy4u,
>
> From the exception info, I think your application hit an HDFS file
> issue during the commit phase of the checkpoint. Can you check why the
> 'Staging file does not exist' in the first place?
>
> Regards,
> Xiangyu
>
> On Fri, Aug 4, 2023 at 12:21, Y SREEKARA BHARGAVA REDDY <[email protected]> wrote:
>
>> Hi Xiangyu/Dev Team,
>>
>> Thanks for the reply.
>>
>> In our Flink job, we increased the *checkpoint timeout to 30 min.*
>> The *checkpoint interval is 10 min.*
>>
>> But while running the job we got the exception below.
>>
>> java.lang.RuntimeException: Error while confirming checkpoint
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .notifyCheckpointComplete(StreamTask.java:952)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:924)
>> at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(
>> FunctionUtils.java:125)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at org.apache.flink.streaming.runtime.tasks.
>> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(
>> StreamTaskActionExecutor.java:87)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail
>> .java:78)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .processMail(MailboxProcessor.java:261)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxLoop(MailboxProcessor.java:186)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .runMailboxLoop(StreamTask.java:487)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:470)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Cannot clean commit: Staging file does
>> not exist.
>> at org.apache.flink.runtime.fs.hdfs.
>> HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(
>> HadoopRecoverableFsDataOutputStream.java:250)
>> at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket
>> .onSuccessfulCompletionOfCheckpoint(Bucket.java:300)
>> at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets
>> .commitUpToCheckpoint(Buckets.java:216)
>> at org.apache.flink.streaming.api.functions.sink.filesystem.
>> StreamingFileSink.notifyCheckpointComplete(StreamingFileSink.java:415)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
>> .notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .lambda$notifyCheckpointComplete$8(StreamTask.java:936)
>> at org.apache.flink.streaming.runtime.tasks.
>> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(
>> StreamTaskActionExecutor.java:101)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .notifyCheckpointComplete(StreamTask.java:930)
>> ... 12 more
>>
>> It would be great if you have any workaround for this.
>>
>> Regards,
>> Nagireddy Y.
>>
>> On Thu, Aug 3, 2023 at 7:24 AM xiangyu feng <[email protected]> wrote:
>>
>>> Hi ynagireddy4u,
>>>
>>> We have seen this exception before. It is usually caused by one of the
>>> following:
>>>
>>> 1) The TaskManager is too busy with other work to send the heartbeat to
>>> the JobMaster, or the TaskManager process has already exited;
>>> 2) There is a network issue between this TaskManager and the JobMaster;
>>> 3) In certain cases, the JobMaster actor is itself too busy to process
>>> the RPC requests from the TaskManager.
>>>
>>> Please check whether your problem fits any of the above situations.
>>>
>>> Best,
>>> Xiangyu
>>>
>>>
>>> On Mon, Jul 31, 2023 at 20:49, Y SREEKARA BHARGAVA REDDY
>>> <[email protected]> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> Has anyone faced the exception below?
>>>> If so, please share the resolution.
>>>>
>>>>
>>>> 2023-07-28 22:04:16
>>>> *java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
>>>> container_e19_1690528962823_0382_01_000005 timed out.*
>>>> at org.apache.flink.runtime.jobmaster.
>>>> JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster
>>>> .java:1147)
>>>> at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(
>>>> HeartbeatMonitorImpl.java:109)
>>>> at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:
>>>> 511)
>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(
>>>> AkkaRpcActor.java:397)
>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(
>>>> AkkaRpcActor.java:190)
>>>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor
>>>> .handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(
>>>> AkkaRpcActor.java:152)
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>> at
>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>> at akka.japi.pf
>>>> .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>> at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>> at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> at
>>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
>>>> .java:1339)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
>>>> .java:107)
>>>>
>>>> If you have any suggestions, please share them with me.
>>>>
>>>> Regards,
>>>> Nagireddy Y
>>>>
>>>