[
https://issues.apache.org/jira/browse/FLINK-3466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15378350#comment-15378350
]
ASF GitHub Bot commented on FLINK-3466:
---------------------------------------
GitHub user StephanEwen opened a pull request:
https://github.com/apache/flink/pull/2252
[FLINK-3466] [runtime] Cancel state handles on state restore
This pull request fixes the issue that state restore operations can get
stuck when tasks are cancelled during state restore. That happens due to a bug
in HDFS, which deadlocks (or livelocks) when the reading thread is interrupted.
This introduces two things:
1. All state handles and key/value snapshots are now `Closeable`. Closing
does not delete any checkpoint data, but simply closes pending streams and data
fetch handles. Operations concurrently accessing the state handles' state then
fail.
2. The `StreamTask` holds a set of `Closeable`s that it closes upon
cancellation. This is a cleaner way of stopping in-progress work than relying
on `interrupt()` to interrupt that work.
This mechanism should eventually be extended to also cancel operators and
state handles pending asynchronous materialization.
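The close-on-cancel mechanism described above can be sketched roughly as follows. This is a hypothetical illustration, not Flink's actual implementation; the class name `CancellationCloser` and its methods are invented for the example.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch: the task keeps every stream/handle it opens in a
// set and closes them all when it is cancelled, instead of relying on
// interrupt() to break blocked I/O.
class CancellationCloser {
    private final Set<Closeable> toClose =
            Collections.synchronizedSet(new LinkedHashSet<>());
    private volatile boolean cancelled;

    // Register a resource; if cancellation already happened, close it
    // immediately so late registrations cannot leak.
    void register(Closeable c) throws IOException {
        toClose.add(c);
        if (cancelled) {
            c.close();
        }
    }

    void unregister(Closeable c) {
        toClose.remove(c);
    }

    // Called from the cancellation thread: closing the streams makes reads
    // blocked in the task thread fail fast with an IOException.
    void cancel() {
        cancelled = true;
        synchronized (toClose) {
            for (Closeable c : toClose) {
                try {
                    c.close();
                } catch (IOException ignored) {
                    // best effort: keep closing the remaining resources
                }
            }
            toClose.clear();
        }
    }
}
```

The key design point is that closing a stream unblocks a reader even when the reading code swallows interrupts, which is exactly the failure mode seen with the HDFS client.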
The pull request includes a test with an interrupt-sensitive state handle
(mimicking HDFS's deadlock behavior): it stalls without this pull request and
finishes cleanly with the changes in this pull request.
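A test handle of that kind could look roughly like the sketch below. The class `BlockingStateHandle` is invented for illustration; it blocks in its read method and swallows interrupts, as the buggy HDFS client does, so only `close()` can unblock it.

```java
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;

// Hypothetical interrupt-insensitive test handle: getState() blocks and
// ignores interrupts, so only close() unblocks the restoring thread.
class BlockingStateHandle implements Closeable {
    private final CountDownLatch closed = new CountDownLatch(1);

    byte[] getState() {
        boolean interrupted = false;
        for (;;) {
            try {
                // Block until close() is called; keep blocking even when
                // interrupted, mimicking the HDFS client's behavior.
                closed.await();
                break;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        // Once closed, the pending restore fails instead of hanging forever.
        throw new RuntimeException("state handle was closed during restore");
    }

    @Override
    public void close() {
        closed.countDown();
    }
}
```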
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StephanEwen/incubator-flink
state_handle_cancellation
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2252.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2252
----
commit 224503b86c2864f604a7c519ea5f415c57f35ff3
Author: Stephan Ewen <[email protected]>
Date: 2016-07-14T13:14:12Z
[FLINK-3466] [tests] Add serialization validation for state handles
commit c411b379381ab1390e2166356232a33165c1abd9
Author: Stephan Ewen <[email protected]>
Date: 2016-07-13T19:32:40Z
[FLINK-3466] [runtime] Make state handles cancelable.
State handles are cancelable, to make sure that long-running checkpoint restore
operations finish early on cancellation, even if the code does not properly
react to interrupts.
This is especially important since the HDFS client code is so buggy that it
deadlocks when interrupted without being closed.
----
> Job might get stuck in restoreState() from HDFS due to interrupt
> ----------------------------------------------------------------
>
> Key: FLINK-3466
> URL: https://issues.apache.org/jira/browse/FLINK-3466
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.0.0, 0.10.2
> Reporter: Robert Metzger
> Assignee: Stephan Ewen
>
> A user reported the following issue with a failing job:
> {code}
> 10:46:09,223 WARN org.apache.flink.runtime.taskmanager.Task
> - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck
> in method:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1979)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
> java.io.DataInputStream.read(DataInputStream.java:149)
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
> java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
> java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> java.lang.Thread.run(Thread.java:745)
> {code}
> and
> {code}
> 10:46:09,223 WARN org.apache.flink.runtime.taskmanager.Task
> - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck
> in method:
> java.lang.Throwable.fillInStackTrace(Native Method)
> java.lang.Throwable.fillInStackTrace(Throwable.java:783)
> java.lang.Throwable.<init>(Throwable.java:250)
> java.lang.Exception.<init>(Exception.java:54)
> java.lang.InterruptedException.<init>(InterruptedException.java:57)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2038)
> org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
> java.io.DataInputStream.read(DataInputStream.java:149)
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
> java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
> java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> java.lang.Thread.run(Thread.java:745)
> {code}
> The issue is most likely that the HDFS client gets stuck in the
> "org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read()" call when it
> receives an interrupt.
> By putting the read call into a separate thread, the task's interrupt would
> not reach that reading thread.
> The reading thread would stop eventually, either with an error or after the
> read operation has finished.
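The separate-thread workaround suggested in the issue could be sketched as below. This is a hypothetical illustration only (the class `UninterruptibleReader` is invented); the merged fix ultimately took the close-on-cancel approach instead.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: run the blocking HDFS read in a dedicated thread, so
// an interrupt aimed at the task thread never reaches the HDFS client code.
class UninterruptibleReader {
    private final ExecutorService readerThread =
            Executors.newSingleThreadExecutor();

    byte[] readFully(InputStream in, int len, long timeoutMillis)
            throws IOException, TimeoutException, InterruptedException {
        Future<byte[]> result = readerThread.submit(() -> {
            byte[] buf = new byte[len];
            int pos = 0;
            while (pos < len) {
                int n = in.read(buf, pos, len - pos);
                if (n < 0) {
                    throw new IOException("unexpected end of stream");
                }
                pos += n;
            }
            return buf;
        });
        try {
            // The task thread only waits; if it is interrupted or times out,
            // the reader thread keeps running until the read finishes or
            // fails on its own, as the issue describes.
            return result.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new IOException("read failed", e.getCause());
        }
    }

    void shutdown() {
        readerThread.shutdownNow();
    }
}
```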
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)