[ https://issues.apache.org/jira/browse/HADOOP-14214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mingliang Liu updated HADOOP-14214:
-----------------------------------
    Description: 

Our Hive team found a TPC-DS job whose queries, running on LLAP, appeared to be 
stuck. Dozens of threads were waiting on the {{DfsClientShmManager::lock}}, as 
shown in the following jstack:
{code}
Thread 251 (IO-Elevator-Thread-5):
  State: WAITING
  Blocked count: 3871
  Waited count: 4565
  Waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@16ead198
  Stack:
    sun.misc.Unsafe.park(Native Method)
    java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUninterruptibly(AbstractQueuedSynchronizer.java:1976)
    org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:255)
    org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
    org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1017)
    org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:476)
    org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:784)
    org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:718)
    org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:422)
    org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:333)
    org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1181)
    org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1118)
    org.apache.hadoop.hdfs.DFSInputStream.pread(DFSInputStream.java:1478)
    org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1441)
    org.apache.hadoop.fs.FSInputStream.readFully(FSInputStream.java:121)
    org.apache.hadoop.fs.FSDataInputStream.readFully(FSDataInputStream.java:111)
    org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readStripeFooter(RecordReaderUtils.java:166)
    org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata.<init>(OrcStripeMetadata.java:64)
    org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader.readStripesMetadata(OrcEncodedDataReader.java:622)
{code}

The thread that is expected to signal those waiting threads is itself calling 
the {{DomainSocketWatcher::add()}} method, but it is stuck there handling 
InterruptedException indefinitely. Its jstack looks like:
{code}
Thread 44417 (TezTR-257387_2840_12_10_52_0):
  State: RUNNABLE
  Blocked count: 3
  Waited count: 5
  Stack:
    java.lang.Throwable.fillInStackTrace(Native Method)
    java.lang.Throwable.fillInStackTrace(Throwable.java:783)
    java.lang.Throwable.<init>(Throwable.java:250)
    java.lang.Exception.<init>(Exception.java:54)
    java.lang.InterruptedException.<init>(InterruptedException.java:57)
    java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2034)
    org.apache.hadoop.net.unix.DomainSocketWatcher.add(DomainSocketWatcher.java:325)
    org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager$EndpointShmManager.allocSlot(DfsClientShmManager.java:266)
    org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.allocSlot(DfsClientShmManager.java:434)
    org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.allocShmSlot(ShortCircuitCache.java:1017)
    org.apache.hadoop.hdfs.BlockReaderFactory.createShortCircuitReplicaInfo(BlockReaderFactory.java:476)
    org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.create(ShortCircuitCache.java:784)
    org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.fetchOrCreate(ShortCircuitCache.java:718)
    org.apache.hadoop.hdfs.BlockReaderFactory.getBlockReaderLocal(BlockReaderFactory.java:422)
    org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:333)
    org.apache.hadoop.hdfs.DFSInputStream.actualGetFromOneDataNode(DFSInputStream.java:1181)
    org.apache.hadoop.hdfs.DFSInputStream.fetchBlockByteRange(DFSInputStream.java:1118)
    org.apache.hadoop.hdfs.DFSInputStream.pread(DFSInputStream.java:1478)
    org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:1441)
    org.apache.hadoop.fs.FSInputStream.readFully(FSInputStream.java:121)
{code}
The whole job makes no progress because of this.

The thread in {{DomainSocketWatcher::add()}} is expected to eventually break 
out of the while loop, where it waits for the newly added entry to be removed 
by another thread. However, if this thread is ever interrupted, it can end up 
holding the lock forever, spinning in the loop because 
{{!toAdd.contains(entry)}} never becomes true.
{code:title=DomainSocketWatcher::add()}
  public void add(DomainSocket sock, Handler handler) {
    lock.lock();
    try {
      ......
      toAdd.add(entry);
      kick();
      while (true) {
        try {
          processedCond.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
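          // BUG: with the interrupt status re-asserted, the next await()
          // throws right away, so this loop spins without releasing the lock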
        }
        if (!toAdd.contains(entry)) {
          break;
        }
      }
    } finally {
      lock.unlock();
    }
  }
{code}

The reason is that this method catches the InterruptedException and 
self-interrupts while looping on await(). The await() call is implemented by 
{{AbstractQueuedSynchronizer::await()}}, which throws a new 
InterruptedException right away, before releasing the lock, if the thread's 
interrupt status is set:
{code:title=AbstractQueuedSynchronizer::await()}
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            ...
{code}

Our code in {{DomainSocketWatcher::add()}} catches this exception (again) and 
self-interrupts (again). Note that throughout this process the lock is never 
released, since await() throws before it parks; as a result, the watcher 
thread that is supposed to remove the entry and make 
{{!toAdd.contains(entry)}} true remains blocked on the lock.
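
This cycle is easy to reproduce outside Hadoop. Below is a minimal, 
self-contained sketch (the class and names are made up for illustration; this 
is not Hadoop code) showing that self-interrupting inside the catch block 
turns {{Condition::await()}} into a busy spin that never releases the lock:
{code:title=SelfInterruptSpin.java (standalone illustration)}
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Once the interrupt status is set, every Condition::await() call throws
// InterruptedException up front -- before the lock would be released -- so
// re-interrupting in the catch block produces a busy spin with the lock held.
public class SelfInterruptSpin {
  public static void main(String[] args) throws InterruptedException {
    final ReentrantLock lock = new ReentrantLock();
    final Condition cond = lock.newCondition();

    Thread spinner = new Thread(() -> {
      lock.lock();
      try {
        Thread.currentThread().interrupt();      // simulate an external interrupt
        for (int spin = 1; spin <= 5; spin++) {  // bounded here; unbounded in the bug
          try {
            cond.await();                        // throws immediately, lock still held
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // the problematic self-interrupt
            System.out.println("spin " + spin + ": await() threw without waiting");
          }
        }
      } finally {
        lock.unlock();
      }
    });
    spinner.start();
    spinner.join();
  }
}
{code}
Running this prints all five "spin" lines instantly, with the lock held the 
whole time; in {{add()}} the loop is unbounded, so the spin never ends.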

{{DomainSocketWatcher::delete()}} has similar logic and should suffer from the 
same problem.
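
One way to break the cycle, sketched below for illustration (this is an 
assumption about the shape of the fix, not necessarily the committed patch), 
is to remember the interrupt while looping and restore the thread's interrupt 
status only after the wait completes; the same treatment would apply to 
{{delete()}}:
{code:title=Possible fix for DomainSocketWatcher::add() (sketch)}
  public void add(DomainSocket sock, Handler handler) {
    lock.lock();
    boolean interrupted = false;
    try {
      ......
      toAdd.add(entry);
      kick();
      while (true) {
        try {
          processedCond.await();
        } catch (InterruptedException e) {
          interrupted = true;  // defer the self-interrupt instead of re-asserting it
        }
        if (!toAdd.contains(entry)) {
          break;
        }
      }
    } finally {
      lock.unlock();
      if (interrupted) {
        Thread.currentThread().interrupt();  // restore interrupt status once done
      }
    }
  }
{code}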

Thanks [~jdere] for testing and reporting this.


> DomainSocketWatcher::add()/delete() should not self interrupt while looping await()
> ------------------------------------------------------------------------------------
>
>                 Key: HADOOP-14214
>                 URL: https://issues.apache.org/jira/browse/HADOOP-14214
>             Project: Hadoop Common
>          Issue Type: Bug
>            Reporter: Mingliang Liu
>            Assignee: Mingliang Liu
>            Priority: Critical
>


