Ran Tao created FLINK-8638:
------------------------------
Summary: Job restart when Checkpoint On Barrier failed
Key: FLINK-8638
URL: https://issues.apache.org/jira/browse/FLINK-8638
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 1.3.2, 1.4.0
Reporter: Ran Tao
The stack trace below comes from a snapshotState call that writes to HDFS.
snapshotState failed because of HDFS disk problems, so
triggerCheckpointOnBarrier failed and threw an exception that caused the
application to restart. On restart, Flink has to recover from the most recent
completed checkpoint and catch up on the data processed since then, which can
lead to significant delays.

We think that when StreamTask's triggerCheckpointOnBarrier (or
triggerCheckpoint at the source) fails, the application should not restart.
Instead, it should keep running, mark the checkpoint as failed, and notify the
JobManager that this checkpoint failed. A checkpoint failure alarm would then
let developers or users know about the situation and take appropriate action,
while the Flink job keeps running the whole time.
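The proposed behavior can be sketched roughly as follows. This is a minimal, self-contained illustration and not Flink's actual code: CheckpointFailureSketch, Snapshotter, CheckpointResponder, RecordingResponder and Task are hypothetical stand-ins, and only the method name triggerCheckpointOnBarrier is borrowed from the report. It assumes the task can report a failed checkpoint to the JobManager through some "decline" callback instead of rethrowing the exception.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these types are Flink classes.
final class CheckpointFailureSketch {

    /** Stand-in for an operator's snapshot logic (for example, writing state to HDFS). */
    interface Snapshotter {
        void snapshot(long checkpointId) throws Exception;
    }

    /** Stand-in for the channel a task uses to report checkpoint results to the JobManager. */
    interface CheckpointResponder {
        void acknowledge(long checkpointId);

        void decline(long checkpointId, Throwable cause);
    }

    /** Responder that only records what it was told, so the behavior can be inspected. */
    static final class RecordingResponder implements CheckpointResponder {
        final List<Long> acknowledged = new ArrayList<>();
        final List<Long> declined = new ArrayList<>();

        @Override
        public void acknowledge(long checkpointId) {
            acknowledged.add(checkpointId);
        }

        @Override
        public void decline(long checkpointId, Throwable cause) {
            declined.add(checkpointId);
        }
    }

    /** Stand-in for a stream task that tolerates checkpoint failures. */
    static final class Task {
        private final Snapshotter snapshotter;
        private final CheckpointResponder responder;

        Task(Snapshotter snapshotter, CheckpointResponder responder) {
            this.snapshotter = snapshotter;
            this.responder = responder;
        }

        /**
         * Returns true if the checkpoint succeeded. On failure the exception is
         * reported through the responder and not rethrown, so the caller can
         * keep processing records.
         */
        boolean triggerCheckpointOnBarrier(long checkpointId) {
            try {
                snapshotter.snapshot(checkpointId);
                responder.acknowledge(checkpointId);
                return true;
            } catch (Exception e) {
                responder.decline(checkpointId, e);
                return false;
            }
        }
    }

    public static void main(String[] args) {
        RecordingResponder responder = new RecordingResponder();
        // Simulate a snapshot that fails for exactly one checkpoint.
        Snapshotter flaky = id -> {
            if (id == 45843L) {
                throw new IOException("simulated HDFS failure");
            }
        };
        Task task = new Task(flaky, responder);
        task.triggerCheckpointOnBarrier(45842L);
        task.triggerCheckpointOnBarrier(45843L);
        task.triggerCheckpointOnBarrier(45844L);
        System.out.println("acknowledged=" + responder.acknowledged
                + " declined=" + responder.declined);
    }
}
```

In this sketch the failed checkpoint is declined and the following one still runs, which is the behavior requested above. An alarm could be driven from the decline callback.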
{code:java}
java.lang.Exception: Could not perform checkpoint 45843 for operator
TriggerWindow(TumblingEventTimeWindows(60000),
ReducingStateDescriptor{serializer=com.didi.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050,
reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8},
EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map
(153/459).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:552)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not complete snapshot 45843 for operator
TriggerWindow(TumblingEventTimeWindows(60000),
ReducingStateDescriptor{serializer=com.didi.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050,
reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8},
EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map
(153/459).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
    ... 8 more
Caused by: java.io.IOException: Could not flush and close the file system
output stream to
hdfs://hadoop/checkpoint/flink/2406585590309979134/c8ac238c750d7b7b3e3b498bc396570f/chk-45843/5905b7a2-fc8c-4500-898a-7b87fa5470ee
in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
    at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
    at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
    at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:131)
    at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:387)
    ... 13 more
Caused by: java.io.IOException: All datanodes DatanodeInfoWithStorage are bad.
Aborting...
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1109)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:871)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:401)
{code}