Github user adeneche commented on a diff in the pull request:
https://github.com/apache/drill/pull/219#discussion_r43432757
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
---
@@ -29,117 +29,152 @@
import parquet.hadoop.metadata.ColumnChunkMetaData;
abstract class NullableColumnReader<V extends ValueVector> extends
ColumnReader<V>{
-
- int nullsFound;
- // used to skip nulls found
- int rightBitShift;
- // used when copying less than a byte worth of data at a time, to
indicate the number of used bits in the current byte
- int bitsUsed;
- BaseDataValueVector castedBaseVector;
- NullableVectorDefinitionSetter castedVectorMutator;
- long definitionLevelsRead;
- long totalDefinitionLevelsRead;
+ private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(NullableColumnReader.class);
+ protected BaseDataValueVector castedBaseVector;
+ protected NullableVectorDefinitionSetter castedVectorMutator;
+ private long definitionLevelsRead = 0;
NullableColumnReader(ParquetRecordReader parentReader, int allocateSize,
ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, V v, SchemaElement schemaElement)
throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData,
fixedLength, v, schemaElement);
castedBaseVector = (BaseDataValueVector) v;
castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator();
- totalDefinitionLevelsRead = 0;
}
-
- @Override
- public void processPages(long recordsToReadInThisPass) throws
IOException {
- int indexInOutputVector = 0;
+ @Override public void processPages(long recordsToReadInThisPass)
+ throws IOException {
readStartInBytes = 0;
readLength = 0;
readLengthInBits = 0;
recordsReadInThisIteration = 0;
vectorData = castedBaseVector.getBuffer();
- // values need to be spaced out where nulls appear in the column
- // leaving blank space for nulls allows for random access to values
- // to optimize copying data out of the buffered disk stream, runs of
defined values
- // are located and copied together, rather than copying individual
values
-
- long runStart = pageReader.readPosInBytes;
- int runLength;
- int currentDefinitionLevel;
- boolean lastValueWasNull;
- boolean lastRunBrokenByNull = false;
- while (indexInOutputVector < recordsToReadInThisPass &&
indexInOutputVector < valueVec.getValueCapacity()){
- // read a page if needed
+ // values need to be spaced out where nulls appear in the column
+ // leaving blank space for nulls allows for random access to values
+ // to optimize copying data out of the buffered disk stream, runs of
defined values
+ // are located and copied together, rather than copying individual
values
+
+ int runLength = -1; // number of non-null records in this pass.
+ int nullRunLength = -1; // number of consecutive null records that we
read.
+ int currentDefinitionLevel = -1;
+ int readCount = 0; // the record number we last read.
+ int writeCount = 0; // the record number we last wrote to the value
vector.
+ // This was previously the indexInOutputVector
variable
+ boolean haveMoreData; // true if we have more data and have not filled
the vector
+
+ while (readCount < recordsToReadInThisPass && writeCount <
valueVec.getValueCapacity()) {
+ // read a page if needed
if (!pageReader.hasPage()
- || ((readStartInBytes + readLength >= pageReader.byteLength &&
bitsUsed == 0) &&
- definitionLevelsRead >= pageReader.currentPageCount)) {
- if (!pageReader.next()) {
- break;
- }
- definitionLevelsRead = 0;
+ || (definitionLevelsRead >= pageReader.currentPageCount)) {
+ if (!pageReader.next()) {
+ break;
}
- lastValueWasNull = true;
- runLength = 0;
- if (lastRunBrokenByNull ) {
- nullsFound = 1;
- lastRunBrokenByNull = false;
- } else {
- nullsFound = 0;
- }
- // loop to find the longest run of defined values available, can
be preceded by several nulls
- while(indexInOutputVector < recordsToReadInThisPass
- && indexInOutputVector < valueVec.getValueCapacity()
- && definitionLevelsRead < pageReader.currentPageCount) {
+ //New page. Reset the definition level.
+ currentDefinitionLevel = -1;
+ definitionLevelsRead = 0;
+ recordsReadInThisIteration = 0;
+ }
+
+ nullRunLength = 0;
+ runLength = 0;
+
+ //
+ // Let's skip the next run of nulls if any ...
+ //
+
+ // If we are reentering this loop, the currentDefinitionLevel has
already been read
+ if (currentDefinitionLevel < 0) {
+ currentDefinitionLevel = pageReader.definitionLevels.readInteger();
+ }
+ haveMoreData = readCount < recordsToReadInThisPass
+ && writeCount + nullRunLength < valueVec.getValueCapacity()
+ && definitionLevelsRead < pageReader.currentPageCount;
+ while (haveMoreData && currentDefinitionLevel < columnDescriptor
+ .getMaxDefinitionLevel()) {
+ readCount++;
+ nullRunLength++;
+ definitionLevelsRead++;
+ haveMoreData = readCount < recordsToReadInThisPass
+ && writeCount + nullRunLength < valueVec.getValueCapacity()
+ && definitionLevelsRead < pageReader.currentPageCount;
+ if (haveMoreData) {
currentDefinitionLevel =
pageReader.definitionLevels.readInteger();
- definitionLevelsRead++;
- indexInOutputVector++;
- totalDefinitionLevelsRead++;
- if ( currentDefinitionLevel <
columnDescriptor.getMaxDefinitionLevel()){
- // a run of non-null values was found, break out of this loop
to do a read in the outer loop
- if ( ! lastValueWasNull ){
- lastRunBrokenByNull = true;
- break;
- }
- nullsFound++;
- lastValueWasNull = true;
- }
- else{
- if (lastValueWasNull){
- runLength = 0;
- lastValueWasNull = false;
- }
- runLength++;
- castedVectorMutator.setIndexDefined(indexInOutputVector - 1);
- }
}
- valuesReadInCurrentPass += nullsFound;
+ }
+ //
+ // Write the nulls if any
+ //
+ if (nullRunLength > 0) {
+ int writerIndex =
+ ((BaseDataValueVector) valueVec).getBuffer().writerIndex();
+ castedBaseVector.getBuffer().setIndex(0, writerIndex + (int) Math
--- End diff --
The previous code seemed to handle `dataTypeLengthInBits <= 8` separately. Is
that special case no longer needed?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---