Github user parthchandra commented on a diff in the pull request:
https://github.com/apache/drill/pull/219#discussion_r43447604
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
---
@@ -29,117 +29,152 @@
import parquet.hadoop.metadata.ColumnChunkMetaData;
abstract class NullableColumnReader<V extends ValueVector> extends
ColumnReader<V>{
-
- int nullsFound;
- // used to skip nulls found
- int rightBitShift;
- // used when copying less than a byte worth of data at a time, to
indicate the number of used bits in the current byte
- int bitsUsed;
- BaseDataValueVector castedBaseVector;
- NullableVectorDefinitionSetter castedVectorMutator;
- long definitionLevelsRead;
- long totalDefinitionLevelsRead;
+ private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(NullableColumnReader.class);
+ protected BaseDataValueVector castedBaseVector;
+ protected NullableVectorDefinitionSetter castedVectorMutator;
+ private long definitionLevelsRead = 0;
NullableColumnReader(ParquetRecordReader parentReader, int allocateSize,
ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, V v, SchemaElement schemaElement)
throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData,
fixedLength, v, schemaElement);
castedBaseVector = (BaseDataValueVector) v;
castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator();
- totalDefinitionLevelsRead = 0;
}
-
- @Override
- public void processPages(long recordsToReadInThisPass) throws
IOException {
- int indexInOutputVector = 0;
+ @Override public void processPages(long recordsToReadInThisPass)
+ throws IOException {
readStartInBytes = 0;
readLength = 0;
readLengthInBits = 0;
recordsReadInThisIteration = 0;
vectorData = castedBaseVector.getBuffer();
- // values need to be spaced out where nulls appear in the column
- // leaving blank space for nulls allows for random access to values
- // to optimize copying data out of the buffered disk stream, runs of
defined values
- // are located and copied together, rather than copying individual
values
-
- long runStart = pageReader.readPosInBytes;
- int runLength;
- int currentDefinitionLevel;
- boolean lastValueWasNull;
- boolean lastRunBrokenByNull = false;
- while (indexInOutputVector < recordsToReadInThisPass &&
indexInOutputVector < valueVec.getValueCapacity()){
- // read a page if needed
+ // values need to be spaced out where nulls appear in the column
+ // leaving blank space for nulls allows for random access to values
+ // to optimize copying data out of the buffered disk stream, runs of
defined values
+ // are located and copied together, rather than copying individual
values
+
+ int runLength = -1; // number of non-null records in this pass.
+ int nullRunLength = -1; // number of consecutive null records that we
read.
+ int currentDefinitionLevel = -1;
+ int readCount = 0; // the record number we last read.
+ int writeCount = 0; // the record number we last wrote to the value
vector.
+ // This was previously the indexInOutputVector
variable
+ boolean haveMoreData; // true if we have more data and have not filled
the vector
+
+ while (readCount < recordsToReadInThisPass && writeCount <
valueVec.getValueCapacity()) {
+ // read a page if needed
if (!pageReader.hasPage()
- || ((readStartInBytes + readLength >= pageReader.byteLength &&
bitsUsed == 0) &&
--- End diff --
The condition
readStartInBytes + readLength >= pageReader.byteLength
and the condition
definitionLevelsRead >= pageReader.currentPageCount
both were checking for the end of a page. I'm not sure if in the previous
code one condition could be true but not the other. In the new while loop, if
one is true, so is the other. So we need only one of the two.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---