[ https://issues.apache.org/jira/browse/FLINK-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

vinoyang reassigned FLINK-8638:
-------------------------------

    Assignee: vinoyang

> Job restart when Checkpoint On Barrier failed
> ---------------------------------------------
>
>                 Key: FLINK-8638
>                 URL: https://issues.apache.org/jira/browse/FLINK-8638
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Ran Tao
>            Assignee: vinoyang
>            Priority: Major
>
> The following stack trace comes from a snapshotState call that writes to HDFS. 
> snapshotState failed because of HDFS disk problems, so 
> triggerCheckpointOnBarrier failed and threw an exception that made the 
> application restart. On restart, however, Flink has to recover from 
> the most recent completed checkpoint and catch up on the data, which can lead to 
> significant delays. We think that when StreamTask's 
> triggerCheckpointOnBarrier (including triggerCheckpoint at the source) fails, 
> the application should not restart but should instead keep running, mark the 
> checkpoint as failed, and notify the JobManager that this checkpoint 
> failed. A checkpoint failure alarm would then let developers or users know about 
> the situation so they can take appropriate action, while the Flink job 
> keeps running the whole time.
>  
> {code:java}
> java.lang.Exception: Could not perform checkpoint 45843 for operator TriggerWindow(TumblingEventTimeWindows(60000), ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:552)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:378)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:281)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:183)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:213)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not complete snapshot 45843 for operator TriggerWindow(TumblingEventTimeWindows(60000), ReducingStateDescriptor{serializer=com.op.router.streaming.Launcher$$anon$49$$anon$13@af3ff050, reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@26f7cdf8}, EventTimeTrigger(), WindowedStream.reduce(WindowedStream.java:300)) -> Map (153/459).
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:407)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:543)
> ... 8 more
> Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://hadoop/checkpoint/flink/2406585590309979134/c8ac238c751d7b7b3e3b498bc396550f in order to obtain the stream state handle
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
> at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:105)
> at org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream.closeAndGetHandle(KeyedStateCheckpointOutputStream.java:30)
> at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.closeAndUnregisterStreamToObtainStateHandle(StateSnapshotContextSynchronousImpl.java:131)
> at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:113)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:387)
> ... 13 more
> Caused by: java.io.IOException: All datanodes DatanodeInfoWithStorage are bad. Aborting...
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1109)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:871)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:401)
> {code}
>  
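
The behaviour proposed in the description (decline the failed checkpoint, report it to the JobManager, keep the task running) could look roughly like the sketch below. It is plain Java for illustration only: Snapshotter, CheckpointResponder, TolerantTask and RecordingResponder are hypothetical names, not Flink classes, and the real change would have to live inside StreamTask.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Illustration only: none of the types below are Flink classes. */
public class CheckpointFailureSketch {

    /** Hypothetical stand-in for the operator snapshot that writes to HDFS. */
    interface Snapshotter {
        void snapshot(long checkpointId) throws IOException;
    }

    /** Hypothetical stand-in for the channel that reports back to the JobManager. */
    interface CheckpointResponder {
        void acknowledge(long checkpointId);
        void decline(long checkpointId, Throwable cause);
    }

    /** Hypothetical task that declines a failed checkpoint instead of failing itself. */
    static final class TolerantTask {
        private final Snapshotter snapshotter;
        private final CheckpointResponder responder;

        TolerantTask(Snapshotter snapshotter, CheckpointResponder responder) {
            this.snapshotter = snapshotter;
            this.responder = responder;
        }

        /** Returns true if the checkpoint succeeded; never rethrows the snapshot error. */
        boolean triggerCheckpointOnBarrier(long checkpointId) {
            try {
                snapshotter.snapshot(checkpointId);
                responder.acknowledge(checkpointId);
                return true;
            } catch (Exception e) {
                // Report the failure and return normally so record processing continues.
                responder.decline(checkpointId, e);
                return false;
            }
        }
    }

    /** Records the responses so the outcome can be inspected or used for alerting. */
    static final class RecordingResponder implements CheckpointResponder {
        final List<Long> acknowledged = new ArrayList<>();
        final List<Long> declined = new ArrayList<>();

        @Override
        public void acknowledge(long checkpointId) {
            acknowledged.add(checkpointId);
        }

        @Override
        public void decline(long checkpointId, Throwable cause) {
            declined.add(checkpointId);
        }
    }

    public static void main(String[] args) {
        RecordingResponder responder = new RecordingResponder();
        // Checkpoint 45843 fails as in the trace above; the others succeed.
        Snapshotter flaky = id -> {
            if (id == 45843L) {
                throw new IOException("All datanodes are bad. Aborting...");
            }
        };
        TolerantTask task = new TolerantTask(flaky, responder);
        for (long id = 45842L; id <= 45844L; id++) {
            task.triggerCheckpointOnBarrier(id);
        }
        System.out.println("acknowledged=" + responder.acknowledged
                + " declined=" + responder.declined);
    }
}
```

The list of declined checkpoint IDs is where an alarm could hook in, for example by alerting once several consecutive checkpoints have been declined.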



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
