[
https://issues.apache.org/jira/browse/FLINK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ran Tao closed FLINK-8638.
--------------------------
Resolution: Duplicate
> Job restart when Checkpoint On Barrier failed
> ---------------------------------------------
>
> Key: FLINK-8638
> URL: https://issues.apache.org/jira/browse/FLINK-8638
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Ran Tao
> Priority: Major
>
> The following example comes from the one snapshotState process by using hdfs,
> snapshotState failed due to hdfs disk problems, so that
> triggerCheckpointOnBarrier fails and throws an exception to make the
> application restart. However, when restarting, flink needs to recover from
> the recent completed checkpoint and start chasing the data, which can lead to
> significant delays. We think that when StreamTask's
> triggerCheckpointOnBarrier (including the triggerCheckpoint at source) fails,
> the application should not restart but instead continue running and mark the
> checkpoint failed. Finally, notify the JobManager this checkpoint
> failed. By adding Checkpoint failure alarm let developers or users know this
> situation, and take the appropriate action. During this time, the flink job
> always keeps running.
>
> {code:java}
> java.lang.Exception: Could not perform checkpoint 45843 for operator
> TriggerWindow(TumblingEventTimeWindows(60000),
> ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050,
>
> reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8},
> EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map
> (153/459).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:552)
> at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not complete snapshot 45843 for
> operator TriggerWindow(TumblingEventTimeWindows(60000),
> ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050,
>
> reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8},
> EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map
> (153/459).
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
> ... 8 more
> Caused by: java.io.IOException: Could not flush and close the file system
> output stream to
> hdfs://hadoop/checkpoint/flink/2406585590309979134/c8ac238c751d7b7b3e3b498bc396550f
> in order to obtain the stream state handle
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
> at
> org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
> at
> org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
> at
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:131)
> at
> org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:387)
> ... 13 more
> Caused by: java.io.IOException: All datanodes DatanodeInfoWithStorage are
> bad. Aborting...
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1109)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:871)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:401){code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)