[ https://issues.apache.org/jira/browse/DRILL-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15843720#comment-15843720 ]

ASF GitHub Bot commented on DRILL-5207:
---------------------------------------

Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/723#discussion_r98272441
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java ---
    @@ -282,89 +356,119 @@ public synchronized void setValuesRead(long valuesRead) {
           this.valuesRead = valuesRead;
         }
     
    -    public long getDiskScanTime() {
    +    public synchronized long getDiskScanTime() {
           return diskScanTime;
         }
     
    -    public void setDiskScanTime(long diskScanTime) {
    +    public synchronized void setDiskScanTime(long diskScanTime) {
           this.diskScanTime = diskScanTime;
         }
    -  }
     
    +  }
     
    -  private class AsyncPageReaderTask implements Callable<ReadStatus> {
    +  private class AsyncPageReaderTask implements Callable<Boolean> {
     
         private final AsyncPageReader parent = AsyncPageReader.this;
    +    private final LinkedBlockingQueue<ReadStatus> queue;
    +    private final String name;
     
    -    public AsyncPageReaderTask() {
    +    public AsyncPageReaderTask(String name, LinkedBlockingQueue<ReadStatus> queue) {
    +      this.name = name;
    +      this.queue = queue;
         }
     
    -    @Override public ReadStatus call() throws IOException {
    +    @Override
    +    public Boolean call() throws IOException {
           ReadStatus readStatus = new ReadStatus();
     
    -      String oldname = Thread.currentThread().getName();
    -      String name = parent.parentColumnReader.columnChunkMetaData.toString();
    -      Thread.currentThread().setName(name);
    -
           long bytesRead = 0;
           long valuesRead = 0;
    +      final long totalValuesRead = parent.totalPageValuesRead;
           Stopwatch timer = Stopwatch.createStarted();
     
    +      final long totalValuesCount = parent.parentColumnReader.columnChunkMetaData.getValueCount();
    +
    +      // if we are done, just put a marker object in the queue and we are done.
    +      logger.trace("[{}]: Total Values COUNT {}  Total Values READ {} ", name, totalValuesCount, totalValuesRead);
    +      if (totalValuesRead >= totalValuesCount) {
    +        try {
    +          queue.put(ReadStatus.EMPTY);
    +        } catch (InterruptedException e) {
    +          Thread.currentThread().interrupt();
    +          // Do nothing.
    +        }
    +        return true;
    +      }
    +
           DrillBuf pageData = null;
    +      timer.reset();
           try {
             long s = parent.dataReader.getPos();
             PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
    -        long e = parent.dataReader.getPos();
    -        if (logger.isTraceEnabled()) {
    -          logger.trace("[{}]: Read Page Header : ReadPos = {} : Bytes Read = {} ", name, s, e - s);
    -        }
    +        //long e = parent.dataReader.getPos();
    +        //if (logger.isTraceEnabled()) {
    +        //  logger.trace("[{}]: Read Page Header : ReadPos = {} : Bytes Read = {} ", name, s, e - s);
    +        //}
             int compressedSize = pageHeader.getCompressed_page_size();
             s = parent.dataReader.getPos();
             pageData = parent.dataReader.getNext(compressedSize);
    -        e = parent.dataReader.getPos();
             bytesRead = compressedSize;
    -
    -        if (logger.isTraceEnabled()) {
    -          DrillBuf bufStart = pageData.slice(0, compressedSize>100?100:compressedSize);
    -          int endOffset = compressedSize>100?compressedSize-100:0;
    -          DrillBuf bufEnd = pageData.slice(endOffset, compressedSize-endOffset);
    -          logger
    -              .trace("[{}]: Read Page Data : ReadPos = {} : Bytes Read = {} : Buf Start = {} : Buf End = {} ",
    -                  name, s, e - s, ByteBufUtil.hexDump(bufStart), ByteBufUtil.hexDump(bufEnd));
    -
    -        }
    +        //e = parent.dataReader.getPos();
    +        //if (logger.isTraceEnabled()) {
    +        //  DrillBuf bufStart = pageData.slice(0, compressedSize>100?100:compressedSize);
    +        //  int endOffset = compressedSize>100?compressedSize-100:0;
    +        //  DrillBuf bufEnd = pageData.slice(endOffset, compressedSize-endOffset);
    +        //  logger
    +        //      .trace("[{}]: Read Page Data : ReadPos = {} : Bytes Read = {} : Buf Start = {} : Buf End = {} ",
    +        //          name, s, e - s, ByteBufUtil.hexDump(bufStart), ByteBufUtil.hexDump(bufEnd));
    +        //}
     
             synchronized (parent) {
               if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
                 readStatus.setIsDictionaryPage(true);
                 valuesRead += pageHeader.getDictionary_page_header().getNum_values();
               } else {
                 valuesRead += pageHeader.getData_page_header().getNum_values();
    +            parent.totalPageValuesRead += valuesRead;
               }
               long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
               readStatus.setPageHeader(pageHeader);
               readStatus.setPageData(pageData);
               readStatus.setBytesRead(bytesRead);
               readStatus.setValuesRead(valuesRead);
               readStatus.setDiskScanTime(timeToRead);
    +          assert (totalValuesRead <= totalValuesCount);
             }
    -
    +        synchronized (queue) {
    +          queue.put(readStatus);
    +          // if the queue is not full, schedule another read task immediately. If it is then the consumer
    +          // will schedule a new read task as soon as it removes a page from the queue.
    +          if (queue.remainingCapacity() > 0) {
    +            asyncPageRead.offer(parent.threadPool.submit(new AsyncPageReaderTask(debugName, queue)));
    +          }
    +        }
    +        // Do nothing.
    +      } catch (InterruptedException e) {
    +        if (pageData != null) {
    +          pageData.release();
    +        }
    +        Thread.currentThread().interrupt();
           } catch (Exception e) {
             if (pageData != null) {
               pageData.release();
             }
    -        throw e;
    -      }
    -      Thread.currentThread().setName(oldname);
    -      return readStatus;
    +        parent.handleAndThrowException(e, "Scan interrupted during execution.");
    --- End diff ---
    
    I guess the message is wrong (and misleading). We should be handling the InterruptedException here and rethrowing any other exception upwards.
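
    For illustration, the suggested handling might look like the sketch below.
    This is only a sketch under assumed names (ReadTaskSketch, readPage,
    release are placeholders, not Drill's actual code); the point is the split
    between the two catch blocks.

        import java.util.concurrent.Callable;

        // Sketch: treat interruption as cancellation (clean up, restore the
        // interrupt flag, return normally); wrap and rethrow anything else.
        class ReadTaskSketch implements Callable<Boolean> {
          @Override
          public Boolean call() {
            Object pageData = null;
            try {
              pageData = readPage();               // may block on disk I/O
            } catch (InterruptedException e) {
              release(pageData);                   // free any partially read buffer
              Thread.currentThread().interrupt();  // restore status; not an error
            } catch (Exception e) {
              release(pageData);
              throw new RuntimeException("Error reading page data", e);
            }
            return true;
          }

          private Object readPage() throws InterruptedException { return new Object(); }

          private void release(Object buf) { /* no-op placeholder */ }
        }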


> Improve Parquet scan pipelining
> -------------------------------
>
>                 Key: DRILL-5207
>                 URL: https://issues.apache.org/jira/browse/DRILL-5207
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Storage - Parquet
>    Affects Versions: 1.9.0
>            Reporter: Parth Chandra
>            Assignee: Parth Chandra
>             Fix For: 1.10
>
>
> The Parquet reader's async page reader is not efficiently pipelined. The 
> default size of the disk read buffer is 4MB, while the page reader (and the 
> Parquet decoder) processes roughly 1MB at a time. This means the disk is 
> idle while the data is being processed. Reducing the buffer to 1MB will 
> reduce the time the processing thread waits for the disk read thread.
> Additionally, since the data needed to process a page may be more or less 
> than 1MB, a queue of pages will help: the disk scan will not block waiting 
> for the processing thread until the queue is full.
> Additionally, the BufferedDirectBufInputStream class reads from disk as soon 
> as it is initialized. Since initialization happens at setup time, this 
> increases the setup time for the query, and query execution does not begin 
> until the read completes.
> There are a few other inefficiencies: for example, options are read every 
> time a page reader is created, and reading options can be expensive.
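
For context, the bounded-queue pipelining described above can be sketched as
follows. This is a simplified illustration under assumed names
(PagePipelineSketch, readNextPage, QUEUE_DEPTH), not Drill's actual code: the
read task re-submits itself while the queue has spare capacity, so the disk
stays busy while pages are decoded, and the bounded queue caps memory use.

    import java.util.concurrent.*;

    public class PagePipelineSketch {
      static final int QUEUE_DEPTH = 2;  // small bound keeps memory in check

      final LinkedBlockingQueue<String> pageQueue =
          new LinkedBlockingQueue<>(QUEUE_DEPTH);
      final ExecutorService threadPool = Executors.newSingleThreadExecutor();

      // Producer: read one page, then re-submit while the queue has room.
      class ReadTask implements Callable<Boolean> {
        @Override
        public Boolean call() {
          try {
            pageQueue.put(readNextPage());        // blocks only when queue is full
            if (pageQueue.remainingCapacity() > 0) {
              threadPool.submit(new ReadTask());  // keep the disk busy
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          return true;
        }
      }

      String readNextPage() { return "page"; }    // placeholder for the disk read

      // Consumer: take a page, then schedule a read to refill the queue.
      String nextPage() throws InterruptedException {
        String page = pageQueue.take();
        threadPool.submit(new ReadTask());
        return page;
      }
    }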


