[ https://issues.apache.org/jira/browse/DRILL-5207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15843720#comment-15843720 ]
ASF GitHub Bot commented on DRILL-5207:
---------------------------------------
Github user parthchandra commented on a diff in the pull request:
https://github.com/apache/drill/pull/723#discussion_r98272441
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java ---
@@ -282,89 +356,119 @@ public synchronized void setValuesRead(long valuesRead) {
      this.valuesRead = valuesRead;
    }
-    public long getDiskScanTime() {
+    public synchronized long getDiskScanTime() {
      return diskScanTime;
    }
-    public void setDiskScanTime(long diskScanTime) {
+    public synchronized void setDiskScanTime(long diskScanTime) {
      this.diskScanTime = diskScanTime;
    }
-  }
+  }
-  private class AsyncPageReaderTask implements Callable<ReadStatus> {
+  private class AsyncPageReaderTask implements Callable<Boolean> {
    private final AsyncPageReader parent = AsyncPageReader.this;
+    private final LinkedBlockingQueue<ReadStatus> queue;
+    private final String name;
-    public AsyncPageReaderTask() {
+    public AsyncPageReaderTask(String name, LinkedBlockingQueue<ReadStatus> queue) {
+      this.name = name;
+      this.queue = queue;
    }
-    @Override public ReadStatus call() throws IOException {
+    @Override
+    public Boolean call() throws IOException {
      ReadStatus readStatus = new ReadStatus();
-      String oldname = Thread.currentThread().getName();
-      String name = parent.parentColumnReader.columnChunkMetaData.toString();
-      Thread.currentThread().setName(name);
-
      long bytesRead = 0;
      long valuesRead = 0;
+      final long totalValuesRead = parent.totalPageValuesRead;
      Stopwatch timer = Stopwatch.createStarted();
+      final long totalValuesCount = parent.parentColumnReader.columnChunkMetaData.getValueCount();
+
+      // if we are done, just put a marker object in the queue and we are done.
+      logger.trace("[{}]: Total Values COUNT {} Total Values READ {} ", name, totalValuesCount, totalValuesRead);
+      if (totalValuesRead >= totalValuesCount) {
+        try {
+          queue.put(ReadStatus.EMPTY);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          // Do nothing.
+        }
+        return true;
+      }
+
      DrillBuf pageData = null;
+      timer.reset();
      try {
        long s = parent.dataReader.getPos();
        PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
-        long e = parent.dataReader.getPos();
-        if (logger.isTraceEnabled()) {
-          logger.trace("[{}]: Read Page Header : ReadPos = {} : Bytes Read = {} ", name, s, e - s);
-        }
+        //long e = parent.dataReader.getPos();
+        //if (logger.isTraceEnabled()) {
+        //  logger.trace("[{}]: Read Page Header : ReadPos = {} : Bytes Read = {} ", name, s, e - s);
+        //}
        int compressedSize = pageHeader.getCompressed_page_size();
        s = parent.dataReader.getPos();
        pageData = parent.dataReader.getNext(compressedSize);
-        e = parent.dataReader.getPos();
        bytesRead = compressedSize;
-
-        if (logger.isTraceEnabled()) {
-          DrillBuf bufStart = pageData.slice(0, compressedSize>100?100:compressedSize);
-          int endOffset = compressedSize>100?compressedSize-100:0;
-          DrillBuf bufEnd = pageData.slice(endOffset, compressedSize-endOffset);
-          logger
-            .trace("[{}]: Read Page Data : ReadPos = {} : Bytes Read = {} : Buf Start = {} : Buf End = {} ",
-              name, s, e - s, ByteBufUtil.hexDump(bufStart), ByteBufUtil.hexDump(bufEnd));
-
-        }
+        //e = parent.dataReader.getPos();
+        //if (logger.isTraceEnabled()) {
+        //  DrillBuf bufStart = pageData.slice(0, compressedSize>100?100:compressedSize);
+        //  int endOffset = compressedSize>100?compressedSize-100:0;
+        //  DrillBuf bufEnd = pageData.slice(endOffset, compressedSize-endOffset);
+        //  logger
+        //    .trace("[{}]: Read Page Data : ReadPos = {} : Bytes Read = {} : Buf Start = {} : Buf End = {} ",
+        //      name, s, e - s, ByteBufUtil.hexDump(bufStart), ByteBufUtil.hexDump(bufEnd));
+        //}
        synchronized (parent) {
          if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
            readStatus.setIsDictionaryPage(true);
            valuesRead += pageHeader.getDictionary_page_header().getNum_values();
          } else {
            valuesRead += pageHeader.getData_page_header().getNum_values();
+            parent.totalPageValuesRead += valuesRead;
          }
          long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
          readStatus.setPageHeader(pageHeader);
          readStatus.setPageData(pageData);
          readStatus.setBytesRead(bytesRead);
          readStatus.setValuesRead(valuesRead);
          readStatus.setDiskScanTime(timeToRead);
+          assert (totalValuesRead <= totalValuesCount);
        }
-
+        synchronized (queue) {
+          queue.put(readStatus);
+          // if the queue is not full, schedule another read task immediately. If it is then the consumer
+          // will schedule a new read task as soon as it removes a page from the queue.
+          if (queue.remainingCapacity() > 0) {
+            asyncPageRead.offer(parent.threadPool.submit(new AsyncPageReaderTask(debugName, queue)));
+          }
+        }
+        // Do nothing.
+      } catch (InterruptedException e) {
+        if (pageData != null) {
+          pageData.release();
+        }
+        Thread.currentThread().interrupt();
      } catch (Exception e) {
        if (pageData != null) {
          pageData.release();
        }
-        throw e;
-      }
-      Thread.currentThread().setName(oldname);
-      return readStatus;
+        parent.handleAndThrowException(e, "Scan interrupted during execution.");
--- End diff ---
I guess the message is wrong (and misleading). We should be handling the
interrupted exception here and throwing any other exception upwards.
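For reference, here is a rough sketch (not the actual Drill change) of the handling suggested above: treat the interrupt as a shutdown signal and handle it locally, and let every other exception propagate with a message that describes a read failure rather than an interruption. The class and helper names below are made up for illustration; only the catch-block structure mirrors the AsyncPageReaderTask diff.

import java.util.concurrent.Callable;

// Hypothetical sketch of the suggested exception handling.
class PageReadTaskSketch implements Callable<Boolean> {
  @Override
  public Boolean call() throws Exception {
    try {
      readOnePage();                          // hypothetical blocking page read
      return true;
    } catch (InterruptedException e) {
      // interruption is a shutdown signal, not an error: restore the flag and stop quietly
      Thread.currentThread().interrupt();
      return false;
    } catch (Exception e) {
      // a genuine failure: rethrow upwards with an accurate message
      throw new RuntimeException("Error reading page data", e);
    }
  }

  private void readOnePage() throws Exception {
    // placeholder for reading the page header and page data
  }
}

With that split, an interrupt during shutdown is not reported as a scan failure, while real read errors still reach the operator.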
> Improve Parquet scan pipelining
> -------------------------------
>
> Key: DRILL-5207
> URL: https://issues.apache.org/jira/browse/DRILL-5207
> Project: Apache Drill
> Issue Type: Bug
> Components: Storage - Parquet
> Affects Versions: 1.9.0
> Reporter: Parth Chandra
> Assignee: Parth Chandra
> Fix For: 1.10
>
>
> The Parquet reader's async page reader is not efficiently pipelined.
> The default size of the disk read buffer is 4 MB, while the page reader and the
> Parquet decoder each process roughly 1 MB at a time. As a result, the disk is
> idle while the data is being processed. Reducing the buffer to 1 MB will reduce
> the time the processing thread waits for the disk read thread.
> Additionally, since the data needed to process a page may be more or less than
> 1 MB, a queue of pages will help, so that the disk scan does not block waiting
> for the processing thread until the queue is full.
> Also, the BufferedDirectBufInputStream class reads from disk as soon as it is
> initialized. Since this happens at setup time, it increases the setup time of
> the query, and query execution does not begin until the read completes.
> There are a few other inefficiencies as well: for example, options are read
> every time a page reader is created, and reading options can be expensive.
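Below is a rough, self-contained sketch of the queue-based pipelining described above; it is not Drill code, and the page size, queue depth, and end-of-data marker are made up for illustration. A disk-read thread keeps a small bounded queue of pages full while the processing thread drains it, so either side blocks only when the queue is full or empty.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical producer/consumer sketch of a pipelined page scan.
public class PagePipelineSketch {
  private static final int PAGE_SIZE = 1 << 20;   // ~1 MB pages, as in the description
  private static final int QUEUE_DEPTH = 2;       // small bounded queue of pages
  private static final int PAGE_COUNT = 8;        // made-up number of pages in a column chunk

  public static void main(String[] args) throws Exception {
    BlockingQueue<byte[]> pages = new ArrayBlockingQueue<>(QUEUE_DEPTH);
    ExecutorService pool = Executors.newSingleThreadExecutor();

    // Producer: simulated disk scan. put() blocks only when the queue is full.
    pool.submit(() -> {
      try {
        for (int i = 0; i < PAGE_COUNT; i++) {
          pages.put(new byte[PAGE_SIZE]);        // stand-in for reading one page from disk
        }
        pages.put(new byte[0]);                  // empty page marks the end of the column chunk
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return null;
    });

    // Consumer: simulated decode. take() blocks only when the queue is empty.
    long processed = 0;
    while (true) {
      byte[] page = pages.take();
      if (page.length == 0) {
        break;                                   // end-of-data marker
      }
      processed += page.length;                  // stand-in for decompress/decode work
    }
    pool.shutdown();
    System.out.println("Processed " + processed + " bytes in " + PAGE_COUNT + " pages");
  }
}

The bounded capacity is the back-pressure mechanism: with a depth of 2, at most two pages are buffered ahead of the decoder, which keeps memory use small while still letting the disk read run ahead of the processing thread.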