[ https://issues.apache.org/jira/browse/DRILL-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15843709#comment-15843709 ]

ASF GitHub Bot commented on DRILL-5207:
---------------------------------------

Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/723#discussion_r98271038
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java ---
    @@ -192,45 +235,74 @@ private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
            stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
           }
           pageHeader = readStatus.getPageHeader();
    -      // reset this. At the time of calling close, if this is not null then a pending asyncPageRead needs to be consumed
    -      asyncPageRead = null;
    -    } catch (Exception e) {
    -      handleAndThrowException(e, "Error reading page data.");
    -    }
     
        // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
        // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
     
    -    do {
    -      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
    -        readDictionaryPageData(readStatus, parentColumnReader);
    -        // Ugly. Use the Async task to make a synchronous read call.
    -        readStatus = new AsyncPageReaderTask().call();
    -        pageHeader = readStatus.getPageHeader();
    -      }
    -    } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
    -
    -    if (parentColumnReader.totalValuesRead + readStatus.getValuesRead()
    -        < parentColumnReader.columnChunkMetaData.getValueCount()) {
    -      asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
    -    }
    +      do {
    +        if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
    +          readDictionaryPageData(readStatus, parentColumnReader);
    +          asyncPageRead.poll().get(); // get the result of execution
    +          synchronized (pageQueue) {
    +            boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
    +            readStatus = pageQueue.take(); // get the data if no exception has been thrown
    +            if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
    +              break;
    +            }
    +            // if the queue was full before we took a page out, then there would
    +            // have been no new read tasks scheduled. In that case, schedule a new read.
    +            if (pageQueueFull) {
    +              asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
    +            }
    +          }
    +          assert (readStatus.pageData != null);
    +          pageHeader = readStatus.getPageHeader();
    +        }
    +      } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
     
         pageHeader = readStatus.getPageHeader();
         pageData = getDecompressedPageData(readStatus);
    -
    +    assert(pageData != null);
    +    } catch (InterruptedException e) {
    +      Thread.currentThread().interrupt();
    +    } catch (Exception e){
    +      handleAndThrowException(e, "Error reading page data");
    +    }
     
       }
     
    -
       @Override public void clear() {
    -    if (asyncPageRead != null) {
    +    while (asyncPageRead != null && !asyncPageRead.isEmpty()) {
           try {
    -        final ReadStatus readStatus = asyncPageRead.get();
    -        readStatus.getPageData().release();
    +        Future<Boolean> f = asyncPageRead.poll();
    +        if(!f.isDone() && !f.isCancelled()){
    +          f.cancel(true);
    +        } else {
    +          Boolean b = f.get(1, TimeUnit.MILLISECONDS);
    --- End diff --
    
    OK
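
    The take-then-refill pattern in the hunk above can be sketched in isolation. This is a minimal illustration of the technique, not Drill's actual code: `String` stands in for `ReadStatus`, and the method and variable names are made up for the example. The point is that once the bounded page queue fills, the reader task stops rescheduling itself, so the consumer must check `remainingCapacity()` before taking a page and schedule a new read if it just freed the only slot.

    ```java
    import java.util.concurrent.*;

    public class QueueRefillSketch {

        // Take one page; if the queue was full beforehand, schedule a refill task.
        // Mirrors the pattern in the diff, with String standing in for ReadStatus.
        static String takeAndMaybeRefill(BlockingQueue<String> pageQueue,
                                         BlockingQueue<Future<Boolean>> pending,
                                         ExecutorService threadPool) throws Exception {
            synchronized (pageQueue) {
                boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
                String page = pageQueue.take();
                if (pageQueueFull) {
                    // The producer stops rescheduling itself once the queue fills,
                    // so the consumer restarts it after freeing a slot.
                    pending.offer(threadPool.submit(() -> pageQueue.offer("refill")));
                }
                return page;
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            BlockingQueue<String> pages = new ArrayBlockingQueue<>(2);
            BlockingQueue<Future<Boolean>> pending = new LinkedBlockingQueue<>();
            pages.put("page-1");
            pages.put("page-2");            // queue is now full
            String got = takeAndMaybeRefill(pages, pending, pool);
            pending.take().get();           // wait for the refill task to finish
            System.out.println(got + " / queue size " + pages.size());
            pool.shutdown();
        }
    }
    ```

    The `synchronized` block matters: the full-check and the `take()` must be atomic with respect to other consumers, or two threads could both see a full queue and schedule duplicate reads.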


> Improve Parquet scan pipelining
> -------------------------------
>
>                 Key: DRILL-5207
>                 URL: https://issues.apache.org/jira/browse/DRILL-5207
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Storage - Parquet
>    Affects Versions: 1.9.0
>            Reporter: Parth Chandra
>            Assignee: Parth Chandra
>             Fix For: 1.10
>
>
> The Parquet reader's async page reader is not efficiently pipelined. The 
> default size of the disk read buffer is 4MB, while the page reader reads 
> ~1MB at a time and the Parquet decode also processes 1MB at a time. This 
> means the disk is idle while the data is being processed. Reducing the 
> buffer to 1MB will reduce the time the processing thread waits for the 
> disk read thread.
> Additionally, since the data needed to process a page may be more or less 
> than 1MB, a queue of pages will help: the disk scan does not block waiting 
> for the processing thread until the queue is full.
> Additionally, the BufferedDirectBufInputStream class reads from disk as 
> soon as it is initialized. Since this is called at setup time, it increases 
> the setup time for the query, and query execution does not begin until the 
> read completes.
> There are a few other inefficiencies: options are read every time a page 
> reader is created, and reading options can be expensive.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
