sachouche commented on a change in pull request #1330: DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
URL: https://github.com/apache/drill/pull/1330#discussion_r198938982
##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
##########
@@ -90,22 +91,161 @@ public long readFields(long recordsToReadInThisPass) throws IOException {
       recordsReadInCurrentPass = readRecordsInBulk((int) recordsToReadInThisPass);
     }
+    // Publish this information
+    parentReader.readState.setValuesReadInCurrentPass((int) recordsReadInCurrentPass);
+
+    // Update the stats
     parentReader.parquetReaderStats.timeVarColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
     return recordsReadInCurrentPass;
   }
   private int readRecordsInBulk(int recordsToReadInThisPass) throws IOException {
-    int recordsReadInCurrentPass = -1;
+    int batchNumRecords = recordsToReadInThisPass;
+    List<VarLenColumnBatchStats> columnStats = new ArrayList<VarLenColumnBatchStats>(columns.size());
+    int prevReadColumns = -1;
+    boolean overflowCondition = false;
+
+    for (VLColumnContainer columnContainer : orderedColumns) {
+      VarLengthColumn<?> columnReader = columnContainer.column;
+
+      // Read the column data
+      int readColumns = columnReader.readRecordsInBulk(batchNumRecords);
+      assert readColumns <= batchNumRecords : "Reader cannot return more values than requested..";
+
+      if (!overflowCondition) {
+        if (prevReadColumns >= 0 && prevReadColumns != readColumns) {
+          overflowCondition = true;
+        } else {
+          prevReadColumns = readColumns;
+        }
+      }
+
+      // Enqueue this column entry information to handle overflow conditions; we will not know
+      // whether an overflow happened till all variable length columns have been processed
+      columnStats.add(new VarLenColumnBatchStats(columnReader.valueVec, readColumns));
+      // Decrease the number of records to read when a column returns less records (minimize overflow)
+      if (batchNumRecords > readColumns) {
+        batchNumRecords = readColumns;
+        // it seems this column caused an overflow (higher layer will not ask for more values than remaining)
+        ++columnContainer.numCausedOverflows;
+      }
+    }
+
+    // Set the value-count for each column
     for (VarLengthColumn<?> columnReader : columns) {
-      int readColumns = columnReader.readRecordsInBulk(recordsToReadInThisPass);
-      assert (readColumns >= 0 && recordsReadInCurrentPass == readColumns || recordsReadInCurrentPass == -1);
+      columnReader.valuesReadInCurrentPass = batchNumRecords;
+    }
+
+    // Publish this batch statistics
+    publishBatchStats(columnStats, batchNumRecords);
-      recordsReadInCurrentPass = readColumns;
+    // Handle column(s) overflow if any
+    if (overflowCondition) {
+      handleColumnOverflow(columnStats, batchNumRecords);
     }
-    return recordsReadInCurrentPass;
+    return batchNumRecords;
+  }
+
+  private void handleColumnOverflow(List<VarLenColumnBatchStats> columnStats, int batchNumRecords) {
+    // Overflow would happen if a column returned more values than "batchValueCount"; this can happen
+    // when a column Ci is called first, returns num-values-i, and then another column cj is called which
+    // returns less values than num-values-i.
+    RecordBatchOverflow.Builder builder = null;
+
+    // We need to collect all columns which are subject to an overflow (except for the ones which are already
+    // returning values from previous batch overflow)
+    for (VarLenColumnBatchStats columnStat : columnStats) {
+      if (columnStat.numValuesRead > batchNumRecords) {
+        // We need to figure out whether this column was already returning values from a previous batch
+        // overflow; if it is, then this is a NOOP (as the overflow data is still available to be replayed)
+        if (fieldHasAlreadyOverflowData(columnStat.vector.getField().getName())) {
+          continue;
+        }
+
+        // We need to set the value-count as otherwise some size related vector APIs won't work
+        columnStat.vector.getMutator().setValueCount(batchNumRecords);
Review comment:
Correct; this is exactly why the design a) keeps the overflow-handling path cheap and b) tries to minimize how often an overflow occurs in the first place.
NOTE - the same behavior is observed when DrillBuf expands the buffer size exponentially.
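
To make the minimization point concrete, here is a minimal, self-contained sketch of the idea; the class and member names (`FakeColumn`, `capacityRecords`, `readBatch`) are invented for illustration and are not part of the patch. It models how shrinking the requested record count as soon as one column under-delivers keeps the columns visited afterwards from over-reading, so only the columns visited earlier can overflow.

```java
import java.util.Arrays;
import java.util.List;

public class OverflowMinimizationSketch {

  /** Simulated variable-length column: it can deliver at most 'capacityRecords' values this pass. */
  static final class FakeColumn {
    final String name;
    final int capacityRecords;
    int valuesRead;

    FakeColumn(String name, int capacityRecords) {
      this.name = name;
      this.capacityRecords = capacityRecords;
    }

    int readRecordsInBulk(int requested) {
      // A real reader stops early when its value buffer would otherwise overflow.
      valuesRead = Math.min(requested, capacityRecords);
      return valuesRead;
    }
  }

  /** Mirrors the shrinking loop: later columns are never asked for more than the minimum seen so far. */
  static int readBatch(List<FakeColumn> columns, int requested) {
    int batchNumRecords = requested;
    for (FakeColumn column : columns) {
      int read = column.readRecordsInBulk(batchNumRecords);
      if (read < batchNumRecords) {
        batchNumRecords = read; // columns visited after this point cannot overflow
      }
    }
    return batchNumRecords;
  }

  public static void main(String[] args) {
    List<FakeColumn> columns = Arrays.asList(
        new FakeColumn("small_varchar", 1000),  // cheap column, always delivers
        new FakeColumn("large_varchar", 300));  // tight buffer, habitually under-delivers

    int batch = readBatch(columns, 1000);
    System.out.println("final batch record count = " + batch);  // prints 300

    // small_varchar read 1000 values but only 300 fit the batch; the remaining 700
    // are exactly the values the real code would have to carry over as overflow.
    for (FakeColumn column : columns) {
      System.out.println(column.name + " overflow = " + Math.max(0, column.valuesRead - batch));
    }
  }
}
```

In this toy run, `small_varchar` ends up with 700 overflow values only because it was read before the column that under-delivered. If the habitual under-deliverer were visited first (presumably what tracking `numCausedOverflows` on the entries of `orderedColumns` is meant to enable), the request would already be capped at 300 and no overflow copy would be needed.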