Github user sachouche commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1060#discussion_r161032779

    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java ---
    @@ -165,17 +181,133 @@
                   + "Run Length: {} \t Null Run Length: {} \t readCount: {} \t writeCount: {} \t "
                   + "recordsReadInThisIteration: {} \t valuesReadInCurrentPass: {} \t "
                   + "totalValuesRead: {} \t readStartInBytes: {} \t readLength: {} \t pageReader.byteLength: {} \t "
    -              + "definitionLevelsRead: {} \t pageReader.currentPageCount: {}",
    +              + "currPageValuesProcessed: {} \t pageReader.currentPageCount: {}",
               recordsToReadInThisPass, runLength, nullRunLength, readCount, writeCount,
               recordsReadInThisIteration, valuesReadInCurrentPass, totalValuesRead, readStartInBytes, readLength,
               pageReader.byteLength,
    -          definitionLevelsRead, pageReader.currentPageCount);
    +          currPageValuesProcessed, pageReader.currentPageCount);
         }

         valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
       }

    +  private final void processPagesBulk(long recordsToReadInThisPass) throws IOException {
    +    readStartInBytes = 0;
    +    readLength = 0;
    +    readLengthInBits = 0;
    +    recordsReadInThisIteration = 0;
    +    vectorData = castedBaseVector.getBuffer();
    +
    +    // Values need to be spaced out where nulls appear in the column;
    +    // leaving blank space for nulls allows for random access to values.
    +    // To optimize copying data out of the buffered disk stream, runs of defined
    +    // values are located and copied together, rather than copying individual values.
    +
    +    int valueCount = 0;
    +    final int maxValuesToProcess = Math.min((int) recordsToReadInThisPass, valueVec.getValueCapacity());
    +
    +    // Handle the case where the page has already been loaded
    +    if (pageReader.definitionLevels != null && currPageValuesProcessed == 0) {
    +      definitionLevelWrapper.set(pageReader.definitionLevels, pageReader.currentPageCount);
    +    }
    +
    +    while (valueCount < maxValuesToProcess) {
    +
    +      // Read a page if needed
    +      if (!pageReader.hasPage() || (currPageValuesProcessed == pageReader.currentPageCount)) {
    +        if (!pageReader.next()) {
    +          break;
    +        }
    +
    +        // New page. Reset the definition level.
    +        currPageValuesProcessed = 0;
    +        recordsReadInThisIteration = 0;
    +        readStartInBytes = 0;
    +
    +        // Update the definition-level reader
    +        definitionLevelWrapper.set(pageReader.definitionLevels, pageReader.currentPageCount);
    +      }
    +
    +      definitionLevelWrapper.readFirstIntegerIfNeeded();
    +
    +      int numNullValues = 0;
    +      int numNonNullValues = 0;
    +      final int remaining = maxValuesToProcess - valueCount;
    +      int currBatchSz = Math.min(remaining, (pageReader.currentPageCount - currPageValuesProcessed));
    +      assert currBatchSz > 0;
    +
    +      // Skip the next run of nulls, if any ...
    +      int idx;
    +      for (idx = 0; idx < currBatchSz; ++idx) {
    +        if (definitionLevelWrapper.readCurrInteger() == 1) {
    +          break; // non-null value encountered
    +        }
    +        definitionLevelWrapper.nextIntegerIfNotEOF();
    +      }
    +      numNullValues += idx;
    +
    +      // Write the nulls if any
    --- End diff --

    This mirrors the original logic for processing fixed-precision columns, just more efficiently: the intent is to determine how many null values there are before processing the non-null ones. The for-loop could have been avoided if we controlled the Parquet ValuesReader API; I was tempted to do that but decided to prioritize my tasks. The good news is that I have recorded all such improvement opportunities and will try to tackle them at some point.
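    As a minimal, self-contained sketch of the null-run detection described above (the DefLevelReader interface, its peek/advance methods, and the hard-coded maximum definition level of 1 are hypothetical simplifications, not Drill's actual DefinitionLevelWrapper or Parquet's ValuesReader API):

        final class NullRunSketch {

          /** Simplified stand-in for a definition-level reader over one page. */
          interface DefLevelReader {
            boolean hasNext();
            int peek();      // current definition level, without consuming it
            void advance();  // move to the next definition level
          }

          /**
           * Counts the leading run of nulls (definition level 0) in the current
           * batch, mirroring the for-loop in the diff: stop as soon as a defined
           * value (level 1) is seen, so the caller can bulk-copy the run of
           * non-null values that follows instead of copying them one at a time.
           */
          static int countLeadingNulls(DefLevelReader levels, int batchSize) {
            int nulls = 0;
            while (nulls < batchSize && levels.hasNext()) {
              if (levels.peek() == 1) {
                break; // non-null value encountered; the null run ends here
              }
              levels.advance();
              ++nulls;
            }
            return nulls;
          }
        }

    If the ValuesReader API exposed the length of the current run directly, this per-value loop would collapse into a single call, which is the improvement opportunity mentioned above.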
---