[ 
https://issues.apache.org/jira/browse/DRILL-5420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066933#comment-16066933
 ] 

Kunal Khatua commented on DRILL-5420:
-------------------------------------

The CPU churn is most easily reproduced by cancellation of a running query that 
is using the Async Parquet Reader, because it involves interrupting the query 
during reading of the data. The original approach worked fine as long as the 
query ran to completion, because the reader tasks run to completion with no new 
submissions. 

The cause of the CPU churn is this line in the AsyncPageReader: 
{{r = pageQueue.take();}}
Ref: 
https://github.com/kkhatua/drill/blob/6446e56f292a5905d646462c618c056839ad5198/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java#L301


{code:java}
    //Empty the page queue
    ReadStatus r;
    while (!pageQueue.isEmpty()) {
      r = null;
      try {
        r = pageQueue.take();
        if (r == ReadStatus.EMPTY) {
          break;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        if (r != null && r.pageData != null) {
          r.pageData.release();
        }
      }
    }
{code}
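
As a minimal sketch of why the loop above can spin, the following assumes the calling thread's interrupt flag is already set (as it would be after a cancellation). The class and method names are hypothetical and not part of Drill. With the flag set, {{LinkedBlockingQueue.take()}} throws {{InterruptedException}} before removing anything, and re-asserting the interrupt in the catch block makes the next iteration fail the same way, so the queue never drains.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; it only mirrors the shape of the loop quoted above.
final class InterruptSpinSketch {

  /**
   * Runs a take()-based drain loop with the interrupt flag set and returns how
   * many take() calls failed. maxAttempts bounds the loop so the demo ends.
   */
  static int countFailedTakes(int maxAttempts) {
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    queue.add("page");

    // Simulate a cancelled fragment thread: the interrupt flag is set.
    Thread.currentThread().interrupt();

    int failed = 0;
    while (!queue.isEmpty() && failed < maxAttempts) {
      try {
        queue.take(); // throws immediately because the flag is set
      } catch (InterruptedException e) {
        failed++;
        // Same handling as the quoted loop: the flag is set again,
        // so the next take() is interrupted as well.
        Thread.currentThread().interrupt();
      }
    }

    // Clear the flag so the caller is not left interrupted by the demo.
    Thread.interrupted();
    return failed;
  }
}
```

Without the {{maxAttempts}} bound, this loop would not terminate, which matches the RUNNABLE thread constructing {{InterruptedException}} instances in the reported stack trace.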

The second segment of the clear() method tries to clear the pageQueue, but is 
constantly being interrupted because tasks can still potentially be submitted 
to the pool and continue writing to the queue. The interrupts were being caught 
and re-asserted on the thread (via {{Thread.currentThread().interrupt()}}) 
instead of being ignored, so the next {{pageQueue.take()}} is interrupted again 
and the loop spins. 

The best approach would be to first block any submission of additional tasks as 
early as possible. In this case, we should have the ColumnReader immediately 
set a flag (accessible by the PageReaders) indicating that the readers are 
shutting down. This helps terminate any mid-flight readers gracefully. Once all 
possible points of submission of reader tasks have been blocked, the queue can 
be cleared without an interruptible fetch [ i.e. {{pageQueue.take()}} ] by 
using {{pageQueue.poll()}} to fetch and release each retrieved page.

In addition, any running page reader tasks [ {{AsyncPageReaderTask.call()}} ] 
can explicitly check for termination as well. If a running task is being 
cancelled, this saves the CPU cycles that would otherwise be spent decoding 
those pages. 
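
The proposal above could be sketched roughly as follows. This is not the actual patch: {{PageQueueDrainSketch}}, {{Page}}, {{offerPage()}} and {{isShuttingDown()}} are hypothetical names standing in for the reader, its page data and its submission path. The sketch sets a shutdown flag first, rejects later submissions, and drains with the non-blocking {{poll()}}, which does not throw {{InterruptedException}}.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the page reader; names are illustrative only.
final class PageQueueDrainSketch {

  /** Hypothetical page holder; release() stands in for freeing the buffer. */
  static final class Page {
    volatile boolean released;

    void release() {
      released = true;
    }
  }

  private final LinkedBlockingQueue<Page> pageQueue = new LinkedBlockingQueue<>();

  // Set at the start of clear(); running tasks can poll it to stop early.
  private volatile boolean shuttingDown;

  /** A running reader task could check this before decoding another page. */
  boolean isShuttingDown() {
    return shuttingDown;
  }

  /** Submission point: pages arriving after shutdown are released, not queued. */
  synchronized boolean offerPage(Page page) {
    if (shuttingDown) {
      page.release();
      return false;
    }
    return pageQueue.offer(page);
  }

  /** Blocks new submissions, then drains the queue without an interruptible call. */
  synchronized int clear() {
    shuttingDown = true;
    int drained = 0;
    Page page;
    // poll() returns null on an empty queue instead of blocking, and it does
    // not throw InterruptedException even if the thread's flag is set.
    while ((page = pageQueue.poll()) != null) {
      page.release();
      drained++;
    }
    return drained;
  }
}
```

Synchronizing {{offerPage()}} and {{clear()}} on the same monitor is one simple way to keep a page from being enqueued after the drain has finished; the real reader may need a different mechanism, since its tasks are submitted to a thread pool.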

> all cores at 100% of all servers
> --------------------------------
>
>                 Key: DRILL-5420
>                 URL: https://issues.apache.org/jira/browse/DRILL-5420
>             Project: Apache Drill
>          Issue Type: Bug
>    Affects Versions: 1.10.0
>         Environment: linux, cluster with 5 servers over hdfs/parquet
>            Reporter: Hugo Bellomusto
>            Assignee: Parth Chandra
>         Attachments: 2709a36d-804a-261a-64e5-afa271e782f8.json
>
>
> We have a drill cluster with five servers over hdfs/parquet.
> Each machine has 8 cores. All cores run at 100% utilization.
> Each thread is spinning in the while loop at line 314 of AsyncPageReader.java, 
> inside the clear() method.
> https://github.com/apache/drill/blob/1.10.0/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java#L314
> jstack -l 19255|grep -A 50 $(printf "%x" 29250)
> "271d6262-ff19-ad24-af36-777bfe6c6375:frag:1:4" daemon prio=10 tid=0x00007f5b2adec800 nid=0x7242 runnable [0x00007f5aa33e8000]
>    java.lang.Thread.State: RUNNABLE
>       at java.lang.Throwable.fillInStackTrace(Native Method)
>       at java.lang.Throwable.fillInStackTrace(Throwable.java:783)
>       - locked <0x00000007374bfcb0> (a java.lang.InterruptedException)
>       at java.lang.Throwable.<init>(Throwable.java:250)
>       at java.lang.Exception.<init>(Exception.java:54)
>       at java.lang.InterruptedException.<init>(InterruptedException.java:57)
>       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1219)
>       at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:340)
>       at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
>       at org.apache.drill.exec.store.parquet.columnreaders.AsyncPageReader.clear(AsyncPageReader.java:317)
>       at org.apache.drill.exec.store.parquet.columnreaders.ColumnReader.clear(ColumnReader.java:140)
>       at org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader.close(ParquetRecordReader.java:632)
>       at org.apache.drill.exec.physical.impl.ScanBatch.next(ScanBatch.java:183)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:119)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:109)
>       at org.apache.drill.exec.record.AbstractSingleRecordBatch.innerNext(AbstractSingleRecordBatch.java:51)
>       at org.apache.drill.exec.physical.impl.project.ProjectRecordBatch.innerNext(ProjectRecordBatch.java:135)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:162)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:119)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:109)
>       at org.apache.drill.exec.record.AbstractSingleRecordBatch.innerNext(AbstractSingleRecordBatch.java:51)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:162)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:119)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:109)
>       at org.apache.drill.exec.record.AbstractSingleRecordBatch.innerNext(AbstractSingleRecordBatch.java:51)
>       at org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.innerNext(RemovingRecordBatch.java:93)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:162)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:119)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:109)
>       at org.apache.drill.exec.record.AbstractSingleRecordBatch.innerNext(AbstractSingleRecordBatch.java:51)
>       at org.apache.drill.exec.physical.impl.project.ProjectRecordBatch.innerNext(ProjectRecordBatch.java:135)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:162)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:119)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:109)
>       at org.apache.drill.exec.record.AbstractSingleRecordBatch.innerNext(AbstractSingleRecordBatch.java:51)
>       at org.apache.drill.exec.physical.impl.limit.LimitRecordBatch.innerNext(LimitRecordBatch.java:115)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:162)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:119)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:109)
>       at org.apache.drill.exec.record.AbstractSingleRecordBatch.innerNext(AbstractSingleRecordBatch.java:51)
>       at org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch.innerNext(RemovingRecordBatch.java:93)
>       at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:162)
>       at org.apache.drill.exec.physical.impl.BaseRootExec.next(BaseRootExec.java:104)
>       at org.apache.drill.exec.physical.impl.SingleSenderCreator$SingleSenderRootExec.innerNext(SingleSenderCreator.java:92)
>       at org.apache.drill.exec.physical.impl.BaseRootExec.next(BaseRootExec.java:94)
>       at org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:232)
>       at org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:226)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:415)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
