[ https://issues.apache.org/jira/browse/FLINK-3466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15375397#comment-15375397 ]

Stephan Ewen commented on FLINK-3466:
-------------------------------------

Here is a unit test that minimally reproduces a task getting stuck during
cancellation in an interrupt-sensitive state handle (like those reading from HDFS):

{code}
public class InterruptSensitiveRestoreTest {

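        /** Signals that the task thread has entered the state restore call. */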
        private static final OneShotLatch IN_RESTORE_LATCH = new OneShotLatch();

        @Test
        public void testRestoreWithInterrupt() throws Exception {

                Configuration taskConfig = new Configuration();
                StreamConfig cfg = new StreamConfig(taskConfig);
                cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);

                TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
                                taskConfig, new InterruptLockingStateHandle());
                Task task = createTask(tdd);

                // start the task and wait until it is in "restore"
                task.startTaskThread();
                IN_RESTORE_LATCH.await();

                // trigger cancellation; the interrupt should take the task out of the blocking restore
                task.cancelExecution();

                task.getExecutingThread().join(30000);

                if (task.getExecutionState() == ExecutionState.CANCELING) {
                        fail("Task is stuck and not canceling");
                }

                assertEquals(ExecutionState.CANCELED, task.getExecutionState());
                assertNull(task.getFailureCause());
        }

        // ------------------------------------------------------------------------
        //  Utilities
        // ------------------------------------------------------------------------

        private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
                        Configuration taskConfig,
                        StateHandle<?> state) throws IOException {
                return new TaskDeploymentDescriptor(
                                new JobID(),
                                "test job name",
                                new JobVertexID(),
                                new ExecutionAttemptID(),
                                new SerializedValue<>(new ExecutionConfig()),
                                "test task name",
                                0, 1, 0,
                                new Configuration(),
                                taskConfig,
                                SourceStreamTask.class.getName(),
                                Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                Collections.<InputGateDeploymentDescriptor>emptyList(),
                                Collections.<BlobKey>emptyList(),
                                Collections.<URL>emptyList(),
                                0,
                                new SerializedValue<StateHandle<?>>(state));
        }
        
        private static Task createTask(TaskDeploymentDescriptor tdd) throws IOException {
                return new Task(
                                tdd,
                                mock(MemoryManager.class),
                                mock(IOManager.class),
                                mock(NetworkEnvironment.class),
                                mock(BroadcastVariableManager.class),
                                mock(ActorGateway.class),
                                mock(ActorGateway.class),
                                new FiniteDuration(10, TimeUnit.SECONDS),
                                new FallbackLibraryCacheManager(),
                                new FileCache(new Configuration()),
                                new TaskManagerRuntimeInfo(
                                                "localhost",
                                                new Configuration(),
                                                EnvironmentInformation.getTemporaryFileDirectory()),
                                mock(TaskMetricGroup.class));
        }

        @SuppressWarnings("serial")
        private static class InterruptLockingStateHandle extends StreamTaskStateList {

                public InterruptLockingStateHandle() throws Exception {
                        super(new StreamTaskState[0]);
                }

                @Override
                public StreamTaskState[] getState(ClassLoader userCodeClassLoader) {
                        IN_RESTORE_LATCH.trigger();

                        // this mimics what happens in the HDFS client code:
                        // an interrupt on a waiting object leads to an infinite loop
                        try {
                                synchronized (this) {
                                        wait();
                                }
                        }
                        catch (InterruptedException e) {
                                while (true) {
                                        try {
                                                synchronized (this) {
                                                        wait();
                                                }
                                        } catch (InterruptedException ignored) {}
                                }
                        }
                        
                        return super.getState(userCodeClassLoader);
                }
        }
}
{code}

> Job might get stuck in restoreState() from HDFS due to interrupt
> ----------------------------------------------------------------
>
>                 Key: FLINK-3466
>                 URL: https://issues.apache.org/jira/browse/FLINK-3466
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.0.0, 0.10.2
>            Reporter: Robert Metzger
>            Assignee: Stephan Ewen
>
> A user reported the following issue with a failing job:
> {code}
> 10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck in method:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1979)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
> java.io.DataInputStream.read(DataInputStream.java:149)
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
> java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
> java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> java.lang.Thread.run(Thread.java:745)
> {code}
> and 
> {code}
> 10:46:09,223 WARN  org.apache.flink.runtime.taskmanager.Task - Task 'XXX -> YYY (3/5)' did not react to cancelling signal, but is stuck in method:
> java.lang.Throwable.fillInStackTrace(Native Method)
> java.lang.Throwable.fillInStackTrace(Throwable.java:783)
> java.lang.Throwable.<init>(Throwable.java:250)
> java.lang.Exception.<init>(Exception.java:54)
> java.lang.InterruptedException.<init>(InterruptedException.java:57)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2038)
> org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266)
> org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1016)
> org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:477)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:783)
> org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:717)
> org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:421)
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:332)
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:576)
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:800)
> org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:848)
> java.io.DataInputStream.read(DataInputStream.java:149)
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:69)
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2310)
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2323)
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2794)
> java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:801)
> java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:55)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:52)
> org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:162)
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:440)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> java.lang.Thread.run(Thread.java:745)
> {code}
> The issue is most likely that the HDFS client gets stuck in the
> "org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read()" call when it
> receives an interrupt.
> By moving that call into a separate HadoopReadThread, the interrupt sent to
> the task thread during cancellation would not break the HadoopReadThread.
> The HadoopReadThread would stop eventually, either with an error or after the
> read operation has finished.
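
A minimal sketch of that proposed workaround (a hypothetical helper, not existing Flink code; the names IsolatedHdfsRead, runIsolated and HadoopReadThread are illustrative only): the interrupt-sensitive read runs in a dedicated thread, so a cancellation interrupt aimed at the task thread aborts only the wait and never reaches the HDFS client.

{code}
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;

public final class IsolatedHdfsRead {

        /**
         * Runs the given read in a dedicated thread. An interrupt on the calling
         * (task) thread surfaces here as an InterruptedException from get(), while
         * the HadoopReadThread never sees the interrupt and finishes or fails on
         * its own.
         */
        public static <T> T runIsolated(Callable<T> read) throws Exception {
                ExecutorService reader = Executors.newSingleThreadExecutor(new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                                Thread t = new Thread(r, "HadoopReadThread");
                                // daemon, so a reader stuck after cancellation cannot block JVM shutdown
                                t.setDaemon(true);
                                return t;
                        }
                });
                try {
                        Future<T> result = reader.submit(read);
                        // the task thread blocks here; a cancellation interrupt hits
                        // this get(), not the HDFS client code in the reader thread
                        return result.get();
                }
                catch (ExecutionException e) {
                        // unwrap failures of the read itself
                        if (e.getCause() instanceof Exception) {
                                throw (Exception) e.getCause();
                        }
                        throw e;
                }
                finally {
                        // orderly shutdown only; this never interrupts the reader thread
                        reader.shutdown();
                }
        }
}
{code}

Restoring state would then wrap the blocking call, e.g. the {{stateHandle.getState(classLoader)}} invocation from the stack traces above, in {{runIsolated(...)}}: cancellation interrupts only the waiting task thread, and the HadoopReadThread stops eventually with an error or after the read has finished.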



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
