[
https://issues.apache.org/jira/browse/DRILL-6147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16527058#comment-16527058
]
ASF GitHub Bot commented on DRILL-6147:
---------------------------------------
sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198938982
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
##########
@@ -90,22 +91,161 @@ public long readFields(long recordsToReadInThisPass) throws IOException {
       recordsReadInCurrentPass = readRecordsInBulk((int) recordsToReadInThisPass);
     }
+    // Publish this information
+    parentReader.readState.setValuesReadInCurrentPass((int) recordsReadInCurrentPass);
+
+    // Update the stats
     parentReader.parquetReaderStats.timeVarColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
     return recordsReadInCurrentPass;
   }

   private int readRecordsInBulk(int recordsToReadInThisPass) throws IOException {
-    int recordsReadInCurrentPass = -1;
+    int batchNumRecords = recordsToReadInThisPass;
+    List<VarLenColumnBatchStats> columnStats = new ArrayList<VarLenColumnBatchStats>(columns.size());
+    int prevReadColumns = -1;
+    boolean overflowCondition = false;
+
+    for (VLColumnContainer columnContainer : orderedColumns) {
+      VarLengthColumn<?> columnReader = columnContainer.column;
+
+      // Read the column data
+      int readColumns = columnReader.readRecordsInBulk(batchNumRecords);
+      assert readColumns <= batchNumRecords : "Reader cannot return more values than requested.";
+
+      if (!overflowCondition) {
+        if (prevReadColumns >= 0 && prevReadColumns != readColumns) {
+          overflowCondition = true;
+        } else {
+          prevReadColumns = readColumns;
+        }
+      }
+
+      // Enqueue this column's entry to handle overflow conditions; we will not know
+      // whether an overflow happened till all variable length columns have been processed
+      columnStats.add(new VarLenColumnBatchStats(columnReader.valueVec, readColumns));
+
+      // Decrease the number of records to read when a column returns fewer records (minimize overflow)
+      if (batchNumRecords > readColumns) {
+        batchNumRecords = readColumns;
+        // it seems this column caused an overflow (higher layer will not ask for more values than remaining)
+        ++columnContainer.numCausedOverflows;
+      }
+    }
+
+    // Set the value-count for each column
     for (VarLengthColumn<?> columnReader : columns) {
-      int readColumns = columnReader.readRecordsInBulk(recordsToReadInThisPass);
-      assert (readColumns >= 0 && recordsReadInCurrentPass == readColumns || recordsReadInCurrentPass == -1);
+      columnReader.valuesReadInCurrentPass = batchNumRecords;
+    }
+
+    // Publish this batch's statistics
+    publishBatchStats(columnStats, batchNumRecords);
-      recordsReadInCurrentPass = readColumns;
+    // Handle column(s) overflow if any
+    if (overflowCondition) {
+      handleColumnOverflow(columnStats, batchNumRecords);
     }
-    return recordsReadInCurrentPass;
+    return batchNumRecords;
+  }
+
+  private void handleColumnOverflow(List<VarLenColumnBatchStats> columnStats, int batchNumRecords) {
+    // Overflow happens when a column returns more values than "batchNumRecords"; this can occur
+    // when a column Ci is read first and returns num-values-i, and another column Cj is then read
+    // which returns fewer values than num-values-i.
+    RecordBatchOverflow.Builder builder = null;
+
+    // We need to collect all columns which are subject to an overflow (except for the ones which are
+    // already returning values from a previous batch overflow)
+    for (VarLenColumnBatchStats columnStat : columnStats) {
+      if (columnStat.numValuesRead > batchNumRecords) {
+        // We need to figure out whether this column was already returning values from a previous batch
+        // overflow; if it is, then this is a NOOP (as the overflow data is still available to be replayed)
+        if (fieldHasAlreadyOverflowData(columnStat.vector.getField().getName())) {
+          continue;
+        }
+
+        // We need to set the value-count as otherwise some size-related vector APIs won't work
+        columnStat.vector.getMutator().setValueCount(batchNumRecords);
Review comment:
Correct; for this reason the design (a) optimizes overflow handling and (b) attempts to
minimize how often overflow occurs.
NOTE - The same behavior is observed when the DrillBuf exponentially expands
its buffer size.
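To make the overflow scenario concrete, here is a minimal, self-contained sketch (the class and field names below are hypothetical, not Drill's actual classes) of the situation the change above guards against: overflow is only detectable after every variable-length column has been bulk-read, and the batch must then be truncated to the smallest per-column value count, with each column's excess saved for replay in the next batch.

import java.util.ArrayList;
import java.util.List;

public class OverflowSketch {

  // Per-column result of a bulk read: how many values the column actually produced.
  static final class ColumnReadResult {
    final String name;
    final int valuesRead;

    ColumnReadResult(String name, int valuesRead) {
      this.name = name;
      this.valuesRead = valuesRead;
    }
  }

  public static void main(String[] args) {
    final int requested = 100;
    List<ColumnReadResult> results = new ArrayList<>();

    // Column "a" is read first and fits all 100 requested values in its buffer.
    results.add(new ColumnReadResult("a", 100));
    // Column "b" runs out of buffer space after 80 values.
    results.add(new ColumnReadResult("b", 80));

    // The batch can only carry the smallest per-column count; we only know
    // that count after all columns have been processed.
    int batchNumRecords = requested;
    for (ColumnReadResult result : results) {
      batchNumRecords = Math.min(batchNumRecords, result.valuesRead);
    }

    // Any column that produced more values than the final count holds overflow
    // data that must be saved and replayed at the start of the next batch.
    for (ColumnReadResult result : results) {
      if (result.valuesRead > batchNumRecords) {
        System.out.printf("column %s overflowed by %d values%n",
            result.name, result.valuesRead - batchNumRecords);
      }
    }
    System.out.println("batch record count = " + batchNumRecords);
  }
}

Running the sketch reports that column "a" overflowed by 20 values and the batch is truncated to 80 records, which is why the patch above both tracks numCausedOverflows per column and shrinks batchNumRecords as soon as a column falls short.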
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Limit batch size for Flat Parquet Reader
> ----------------------------------------
>
> Key: DRILL-6147
> URL: https://issues.apache.org/jira/browse/DRILL-6147
> Project: Apache Drill
> Issue Type: Improvement
> Components: Storage - Parquet
> Reporter: salim achouche
> Assignee: salim achouche
> Priority: Major
> Fix For: 1.14.0
>
>
> The Parquet reader currently uses a hard-coded batch size limit (32k rows)
> when creating scan batches; there is neither a parameter nor any logic for
> controlling the amount of memory used. This enhancement will allow Drill to
> take an extra input parameter to control direct memory usage.
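
The intent of the enhancement can be sketched as follows (a hedged illustration only: the class, method, parameter names, and row-width estimate below are hypothetical, not the actual patch): derive the per-batch row count from a configurable direct-memory budget instead of relying solely on the fixed 32k-row constant.

public class BatchSizeSketch {

  // Current behavior: a fixed upper bound on rows per scan batch.
  static final int HARD_CODED_MAX_ROWS = 32 * 1024;

  // Proposed behavior: additionally cap the batch by a direct-memory budget.
  // memoryBudgetBytes would come from the new input parameter; the average
  // row width would be estimated from the Parquet column metadata.
  static int maxRowsPerBatch(long memoryBudgetBytes, long estimatedRowBytes) {
    long byBudget = memoryBudgetBytes / Math.max(1L, estimatedRowBytes);
    return (int) Math.min(HARD_CODED_MAX_ROWS, Math.max(1L, byBudget));
  }

  public static void main(String[] args) {
    // e.g. a 16 MB budget with ~1 KB rows caps the batch at 16384 rows
    System.out.println(maxRowsPerBatch(16L * 1024 * 1024, 1024));
  }
}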
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)