[ https://issues.apache.org/jira/browse/FLINK-3466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15375397#comment-15375397 ]
Stephan Ewen commented on FLINK-3466:
-------------------------------------

Here is a unit test that minimally reproduces getting stuck in interrupt-sensitive state handles (like those reading from HDFS):

{code}
public class InterruptSensitiveRestoreTest {

    private static final OneShotLatch IN_RESTORE_LATCH = new OneShotLatch();

    @Test
    public void testRestoreWithInterrupt() throws Exception {
        Configuration taskConfig = new Configuration();
        StreamConfig cfg = new StreamConfig(taskConfig);
        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(taskConfig, new InterruptLockingStateHandle());
        Task task = createTask(tdd);

        // start the task and wait until it is in "restore"
        task.startTaskThread();
        IN_RESTORE_LATCH.await();

        // trigger cancellation and signal to continue
        task.cancelExecution();

        task.getExecutingThread().join(30000);

        if (task.getExecutionState() == ExecutionState.CANCELING) {
            fail("Task is stuck and not canceling");
        }

        assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        assertNull(task.getFailureCause());
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
            Configuration taskConfig,
            StateHandle<?> state) throws IOException {

        return new TaskDeploymentDescriptor(
                new JobID(),
                "test job name",
                new JobVertexID(),
                new ExecutionAttemptID(),
                new SerializedValue<>(new ExecutionConfig()),
                "test task name",
                0, 1, 0,
                new Configuration(),
                taskConfig,
                SourceStreamTask.class.getName(),
                Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                Collections.<InputGateDeploymentDescriptor>emptyList(),
                Collections.<BlobKey>emptyList(),
                Collections.<URL>emptyList(),
                0,
                new SerializedValue<StateHandle<?>>(state));
    }

    private static Task createTask(TaskDeploymentDescriptor tdd) throws IOException {
        return new Task(
                tdd,
                mock(MemoryManager.class),
                mock(IOManager.class),
                mock(NetworkEnvironment.class),
                mock(BroadcastVariableManager.class),
                mock(ActorGateway.class),
                mock(ActorGateway.class),
                new FiniteDuration(10, TimeUnit.SECONDS),
                new FallbackLibraryCacheManager(),
                new FileCache(new Configuration()),
                new TaskManagerRuntimeInfo(
                        "localhost", new Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
                mock(TaskMetricGroup.class));
    }

    @SuppressWarnings("serial")
    private static class InterruptLockingStateHandle extends StreamTaskStateList {

        public InterruptLockingStateHandle() throws Exception {
            super(new StreamTaskState[0]);
        }

        @Override
        public StreamTaskState[] getState(ClassLoader userCodeClassLoader) {
            IN_RESTORE_LATCH.trigger();

            // this mimics what happens in the HDFS client code.
            // an interrupt on a waiting object leads to an infinite loop
            try {
                synchronized (this) {
                    wait();
                }
            }
            catch (InterruptedException e) {
                while (true) {
                    try {
                        synchronized (this) {
                            wait();
                        }
                    } catch (InterruptedException ignored) {}
                }
            }

            return super.getState(userCodeClassLoader);
        }
    }
}
{code}

> Job might get stuck in restoreState() from HDFS due to interrupt
> ----------------------------------------------------------------
>
>                 Key: FLINK-3466
>                 URL: https://issues.apache.org/jira/browse/FLINK-3466
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.0.0, 0.10.2
>            Reporter: Robert Metzger
>            Assignee: Stephan Ewen
>
> A user reported the following issue with a failing job:
> {code}
> 10:46:09,223 WARN org.apache.flink.runtime.taskmanager.Task - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck in method:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1979)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
> java.io.DataInputStream.read(DataInputStream.java:149)
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
> java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
> java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> java.lang.Thread.run(Thread.java:745)
> {code}
> and
> {code}
> 10:46:09,223 WARN org.apache.flink.runtime.taskmanager.Task - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck in method:
> java.lang.Throwable.fillInStackTrace(Native Method)
> java.lang.Throwable.fillInStackTrace(Throwable.java:783)
> java.lang.Throwable.<init>(Throwable.java:250)
> java.lang.Exception.<init>(Exception.java:54)
> java.lang.InterruptedException.<init>(InterruptedException.java:57)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2038)
> org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
> java.io.DataInputStream.read(DataInputStream.java:149)
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
> java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
> java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> java.lang.Thread.run(Thread.java:745)
> {code}
> The issue is most likely that the HDFS client gets stuck in the "org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read()" call when it receives an interrupt.
> By putting the call into a separate thread, the TaskInterrupt would not break the HadoopReadThread.
> The HadoopReadThread would stop eventually with an error or after the read operation has finished.
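
The separate-thread idea from the issue description could look roughly like the sketch below. It is only an illustration under stated assumptions, not the change that was made in Flink: the class name InterruptShieldedRead and the method readShielded are hypothetical, and only the thread name "HadoopReadThread" is taken from the description. The blocking read runs on a helper thread, so an interrupt of the task thread only aborts the join() and never reaches the read call itself.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not part of Flink or Hadoop.
public class InterruptShieldedRead {

    /**
     * Runs in.read(buf) on a helper thread and waits for it to finish.
     * If the calling thread is interrupted, join() throws InterruptedException
     * while the helper thread keeps running until the read returns or fails.
     */
    static int readShielded(final InputStream in, final byte[] buf)
            throws IOException, InterruptedException {

        final AtomicReference<Throwable> error = new AtomicReference<>();
        final int[] bytesRead = new int[1];

        Thread reader = new Thread(() -> {
            try {
                bytesRead[0] = in.read(buf);
            } catch (Throwable t) {
                error.set(t);
            }
        }, "HadoopReadThread");

        // daemon thread, so a read that never returns cannot keep the JVM alive
        reader.setDaemon(true);
        reader.start();

        // only this wait is exposed to interrupts of the calling (task) thread
        reader.join();

        Throwable t = error.get();
        if (t != null) {
            throw new IOException("Read failed in helper thread", t);
        }
        return bytesRead[0];
    }

    public static void main(String[] args) throws Exception {
        byte[] buf = new byte[8];
        int n = readShielded(new ByteArrayInputStream(new byte[] {1, 2, 3}), buf);
        System.out.println("bytes read: " + n);
    }
}
```

One thread per read is used here only to keep the sketch short; a long-lived reader thread per stream would avoid the thread creation cost. In either variant the caller has to decide what to do with a helper thread that is still blocked after the task has been canceled.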