[
https://issues.apache.org/jira/browse/HADOOP-14214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101756#comment-16101756
]
Zhe Zhang commented on HADOOP-14214:
------------------------------------
Thanks [~liuml07] for the fix. Indeed a major bug. I just backported to
branch-2.7.
> DomainSocketWatcher::add()/delete() should not self interrupt while looping
> await()
> -----------------------------------------------------------------------------------
>
> Key: HADOOP-14214
> URL: https://issues.apache.org/jira/browse/HADOOP-14214
> Project: Hadoop Common
> Issue Type: Bug
> Components: hdfs-client
> Reporter: Mingliang Liu
> Assignee: Mingliang Liu
> Priority: Critical
> Fix For: 2.9.0, 2.7.4, 3.0.0-alpha4, 2.8.2
>
> Attachments: HADOOP-14214.000.patch
>
>
> Our Hive team found a TPCDS job whose queries running on LLAP seemed to be
> getting stuck. Dozens of threads were waiting for
> {{DfsClientShmManager::lock}}, as shown in the following jstack:
> {code}
> Thread 251 (IO-Elevator-Thread-5):
>   State: WAITING
>   Blocked count: 3871
>   Waited count: 4565
>   Waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@16ead198
>   Stack:
>     sun.misc.Unsafe.park(Native Method)
>     java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1976)
>     org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
>     org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
>     org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1017)
>     org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:476)
>     org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:784)
>     org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:718)
>     org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:422)
>     org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:333)
>     org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1181)
>     org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1118)
>     org.apache.hadoop.hdfs.DFSInputStream.pread(DFSInputStream.java:1478)
>     org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1441)
>     org.apache.hadoop.fs.FSInputStream.readFully(FSInputStream.java:121)
>     org.apache.hadoop.fs.FSDataInputStream.readFully(FSDataInputStream.java:111)
>     org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readStripeFooter(RecordReaderUtils.java:166)
>     org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata.<init>(OrcStripeMetadata.java:64)
>     org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.readStripesMetadata(OrcEncodedDataReader.java:622)
> {code}
> The thread that is expected to signal those waiters is in the
> {{DomainSocketWatcher::add()}} method, but it is stuck there handling
> InterruptedException in an endless loop. Its jstack looks like:
> {code}
> Thread 44417 (TezTR-257387_2840_12_10_52_0):
>   State: RUNNABLE
>   Blocked count: 3
>   Waited count: 5
>   Stack:
>     java.lang.Throwable.fillInStackTrace(Native Method)
>     java.lang.Throwable.fillInStackTrace(Throwable.java:783)
>     java.lang.Throwable.<init>(Throwable.java:250)
>     java.lang.Exception.<init>(Exception.java:54)
>     java.lang.InterruptedException.<init>(InterruptedException.java:57)
>     java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2034)
>     org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325)
>     org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266)
>     org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
>     org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1017)
>     org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:476)
>     org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:784)
>     org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:718)
>     org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:422)
>     org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:333)
>     org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1181)
>     org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1118)
>     org.apache.hadoop.hdfs.DFSInputStream.pread(DFSInputStream.java:1478)
>     org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1441)
>     org.apache.hadoop.fs.FSInputStream.readFully(FSInputStream.java:121)
> {code}
> The whole job makes no progress because of this.
> The thread in {{DomainSocketWatcher::add()}} is expected to eventually break
> out of the while loop, where it waits for the newly added entry to be deleted
> by another thread. However, if this thread is ever interrupted, chances are
> that it will hold the lock forever, so {{if (!toAdd.contains(entry))}} will
> always be false.
> {code:title=DomainSocketWatcher::add()}
> public void add(DomainSocket sock, Handler handler) {
>   lock.lock();
>   try {
>     ......
>     toAdd.add(entry);
>     kick();
>     while (true) {
>       try {
>         processedCond.await();
>       } catch (InterruptedException e) {
>         Thread.currentThread().interrupt();
>       }
>       if (!toAdd.contains(entry)) {
>         break;
>       }
>     }
>   } finally {
>     lock.unlock();
>   }
> }
> {code}
> The reason is that this method catches the InterruptedException and
> self-interrupts while looping on await(). The await() call is implemented by
> {{AbstractQueuedSynchronizer::await()}}, which throws a new
> InterruptedException immediately if the thread's interrupt flag is set:
> {code:title=AbstractQueuedSynchronizer::await()}
> public final void await() throws InterruptedException {
>   if (Thread.interrupted())
>     throw new InterruptedException();
>   Node node = addConditionWaiter();
>   ...
> {code}
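This two-step feedback loop can be reproduced in isolation. The sketch below is an illustration, not Hadoop code (the class name is made up), and it bounds the loop at five iterations, but it shows the same mechanism: because the catch block re-sets the interrupt flag, every subsequent await() throws immediately instead of parking, and the lock is never released.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class SelfInterruptLoopDemo {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        lock.lock();
        try {
            // Simulate a thread whose interrupt flag gets set while it is
            // in the wait loop, as happened to the TezTR thread above.
            Thread.currentThread().interrupt();
            int throwCount = 0;
            while (throwCount < 5) {  // bounded here; unbounded in the bug
                try {
                    cond.await();     // throws immediately: the flag is set
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // re-sets the flag
                    throwCount++;
                }
            }
            // await() never parked and the lock was held the whole time, so
            // no other thread could have made the loop's exit condition true.
            System.out.println("await() threw " + throwCount + " times");
        } finally {
            lock.unlock();
            Thread.interrupted();  // clear the flag before exiting
        }
    }
}
```

In the real bug the loop is unbounded, so the thread spins forever while holding the lock, which matches the RUNNABLE state and the {{fillInStackTrace}} frames in the jstack above.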
> Our code in {{DomainSocketWatcher::add()}} catches this exception (again) and
> self-interrupts (again). Note that throughout this cycle the associated lock
> is never released, so the other thread that is supposed to make
> {{if (!toAdd.contains(entry))}} true is still pending on the lock.
> {{DomainSocketWatcher::delete()}} has similar logic and should suffer from
> the same problem.
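A common remedy for this pattern (a sketch of the general technique, not necessarily the exact HADOOP-14214.000.patch; the class and method names are made up) is to remember the interrupt in a local flag, keep waiting, and restore the thread's interrupt status only after the wait loop is done:

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

public class DeferredInterruptAwaitDemo {
    // Waits on cond until done.getAsBoolean() is true, deferring any
    // interrupt until the wait completes. Caller must hold the lock
    // associated with cond.
    static void awaitDeferringInterrupt(Condition cond, BooleanSupplier done) {
        boolean interrupted = false;
        while (!done.getAsBoolean()) {
            try {
                cond.await();
            } catch (InterruptedException e) {
                interrupted = true;  // remember it, but keep waiting
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();  // restore for callers
        }
    }

    public static void main(String[] args) throws Exception {
        ReentrantLock lock = new ReentrantLock();
        Condition cond = lock.newCondition();
        boolean[] done = {false};

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                awaitDeferringInterrupt(cond, () -> done[0]);
            } finally {
                lock.unlock();
            }
            System.out.println("waiter finished; interrupted="
                + Thread.currentThread().isInterrupted());
        });
        waiter.start();
        waiter.interrupt();   // interrupt it while it is (about to be) waiting
        Thread.sleep(200);
        lock.lock();
        try {
            done[0] = true;   // the condition the waiter is looping on
            cond.signalAll(); // now the other thread can wake it up
        } finally {
            lock.unlock();
        }
        waiter.join();
    }
}
```

Because the flag is only restored after the loop exits, await() can actually park. The lock is then released while waiting, so the thread that makes the exit condition true can acquire it and signal.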
> Thanks [~jdere] for testing and reporting this.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]