[ https://issues.apache.org/jira/browse/DRILL-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15843709#comment-15843709 ]
ASF GitHub Bot commented on DRILL-5207:
---------------------------------------
Github user parthchandra commented on a diff in the pull request:
https://github.com/apache/drill/pull/723#discussion_r98271038
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java ---
@@ -192,45 +235,74 @@ private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
         stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
       }
       pageHeader = readStatus.getPageHeader();
-      // reset this. At the time of calling close, if this is not null then a pending asyncPageRead needs to be consumed
-      asyncPageRead = null;
-    } catch (Exception e) {
-      handleAndThrowException(e, "Error reading page data.");
-    }
       // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
       // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
-      do {
-        if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-          readDictionaryPageData(readStatus, parentColumnReader);
-          // Ugly. Use the Async task to make a synchronous read call.
-          readStatus = new AsyncPageReaderTask().call();
-          pageHeader = readStatus.getPageHeader();
-        }
-      } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
-
-      if (parentColumnReader.totalValuesRead + readStatus.getValuesRead()
-          < parentColumnReader.columnChunkMetaData.getValueCount()) {
-        asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
-      }
+      do {
+        if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
+          readDictionaryPageData(readStatus, parentColumnReader);
+          asyncPageRead.poll().get(); // get the result of execution
+          synchronized (pageQueue) {
+            boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
+            readStatus = pageQueue.take(); // get the data if no exception has been thrown
+            if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
+              break;
+            }
+            // if the queue was full before we took a page out, then there would
+            // have been no new read tasks scheduled. In that case, schedule a new read.
+            if (pageQueueFull) {
+              asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+            }
+          }
+          assert (readStatus.pageData != null);
+          pageHeader = readStatus.getPageHeader();
+        }
+      } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
       pageHeader = readStatus.getPageHeader();
       pageData = getDecompressedPageData(readStatus);
-
+      assert (pageData != null);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      handleAndThrowException(e, "Error reading page data");
+    }
   }
-
   @Override public void clear() {
-    if (asyncPageRead != null) {
+    while (asyncPageRead != null && !asyncPageRead.isEmpty()) {
       try {
-        final ReadStatus readStatus = asyncPageRead.get();
-        readStatus.getPageData().release();
+        Future<Boolean> f = asyncPageRead.poll();
+        if (!f.isDone() && !f.isCancelled()) {
+          f.cancel(true);
+        } else {
+          Boolean b = f.get(1, TimeUnit.MILLISECONDS);
--- End diff ---
OK
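
The clear() change in the diff above replaces a single pending future with a
queue of outstanding read futures that must be drained on shutdown. Below is a
self-contained sketch of that drain pattern; the queue and future types mirror
the diff, but the exception handling and the omission of page-buffer release
are simplifications, not the PR's exact code.

import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncReadShutdown {
  // Drain all outstanding read tasks: cancel the ones still running and
  // briefly wait on the ones that finished, so no in-flight page is leaked.
  public static void drain(Queue<Future<Boolean>> asyncPageRead) {
    while (asyncPageRead != null && !asyncPageRead.isEmpty()) {
      Future<Boolean> f = asyncPageRead.poll();
      try {
        if (!f.isDone() && !f.isCancelled()) {
          f.cancel(true);                  // interrupt a still-running read
        } else {
          f.get(1, TimeUnit.MILLISECONDS); // surface any completed result
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (CancellationException | ExecutionException | TimeoutException e) {
        // a cancelled or failed read leaves nothing to consume here
      }
    }
  }
}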
> Improve Parquet scan pipelining
> -------------------------------
>
> Key: DRILL-5207
> URL: https://issues.apache.org/jira/browse/DRILL-5207
> Project: Apache Drill
> Issue Type: Bug
> Components: Storage - Parquet
> Affects Versions: 1.9.0
> Reporter: Parth Chandra
> Assignee: Parth Chandra
> Fix For: 1.10
>
>
> The Parquet reader's async page reader is not efficiently pipelined.
> The default size of the disk read buffer is 4MB, while the page reader reads
> ~1MB at a time and the Parquet decoder also processes ~1MB at a time. This
> means the disk is idle while the data is being processed. Reducing the buffer
> to 1MB will reduce the time the processing thread waits for the disk read
> thread.
> Additionally, since the data needed to process a page may be more or less
> than 1MB, a queue of pages will help so that the disk scan does not block
> waiting for the processing thread until the queue is full (see the queue
> sketch below).
> Additionally, the BufferedDirectBufInputStream class reads from disk as soon
> as it is initialized. Since initialization happens at query setup time, this
> increases the setup time for the query, and query execution does not begin
> until the read completes (see the lazy initialization sketch below).
> There are a few other inefficiencies: options are read every time a page
> reader is created, and reading options can be expensive (see the option
> caching sketch below).
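
As a sketch of the page queue described above: a bounded BlockingQueue lets
the disk scan thread read ahead of the decoder and blocks it only once the
queue fills. This is a minimal illustration, not Drill's actual
AsyncPageReader; the class name, the 1MB page size, the queue capacity, and
the byte[] page type are all assumptions.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PagePipelineSketch {
  static final int QUEUE_CAPACITY = 2;      // pages buffered ahead of the decoder
  static final byte[] POISON = new byte[0]; // sentinel marking end of stream

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<byte[]> pageQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    ExecutorService threadPool = Executors.newSingleThreadExecutor();

    // Producer: the "disk scan" thread reads ~1MB pages and blocks only when
    // the queue is full, so disk I/O overlaps with decoding.
    threadPool.submit(() -> {
      try {
        for (int i = 0; i < 5; i++) {
          byte[] page = new byte[1 << 20]; // pretend this came off disk
          pageQueue.put(page);             // blocks only when the queue is full
        }
        pageQueue.put(POISON);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    // Consumer: the decoding thread takes pages as they become available.
    byte[] page;
    while ((page = pageQueue.take()) != POISON) {
      System.out.println("decoded a page of " + page.length + " bytes");
    }
    threadPool.shutdown();
  }
}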
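
For the BufferedDirectBufInputStream point, a minimal sketch of lazy
initialization, assuming a plain InputStream wrapper; the class name and
structure are illustrative, not Drill's actual code. The buffer is filled on
the first read() call instead of in the constructor, so constructing the
stream at setup time costs nothing.

import java.io.IOException;
import java.io.InputStream;

public class LazyBufferedStream extends InputStream {
  private final InputStream source;
  private final byte[] buffer;
  private int pos;    // next byte to hand out
  private int limit;  // number of valid bytes in the buffer

  public LazyBufferedStream(InputStream source, int bufferSize) {
    this.source = source;            // note: no disk read happens here
    this.buffer = new byte[bufferSize];
  }

  @Override
  public int read() throws IOException {
    if (pos >= limit) {
      limit = source.read(buffer);   // the first fill happens lazily, on demand
      pos = 0;
      if (limit <= 0) {
        return -1;                   // end of stream
      }
    }
    return buffer[pos++] & 0xFF;
  }
}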
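
For the options point, a sketch of paying the lookup cost once and reusing the
value for every page reader that follows; the lookup function stands in for
whatever options manager is in use and is an assumption, not Drill's API.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

public class CachedOptions {
  private final ConcurrentMap<String, Object> cache = new ConcurrentHashMap<>();
  private final Function<String, Object> expensiveLookup;

  public CachedOptions(Function<String, Object> expensiveLookup) {
    this.expensiveLookup = expensiveLookup;
  }

  @SuppressWarnings("unchecked")
  public <T> T get(String name) {
    // computeIfAbsent pays the lookup cost once; later callers hit the cache
    return (T) cache.computeIfAbsent(name, expensiveLookup);
  }
}

Creating the cache once per scan and handing it to each page reader avoids
repeating the expensive lookup on every page reader construction.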
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)