DRILL-400: Change the parquet reader to place varbinary fields into VarCharVectors, allowing them to be returned by default as UTF-8 strings. Note that this only applies to newer parquet files, produced after converted types were added to the format. This metadata stores the intended interpretation of a column, but was not part of the original format. For older files you will still need to cast binary data to varchar.
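For context, the reader discovers the annotation by converting the Parquet footer back to its Thrift form and recording each schema element's converted type. A condensed sketch of that detection, adapted from the ParquetRecordReader change below and using the imports the patch adds there (the isSetConverted_type() guard is added here for illustration; the patch stores the possibly-null values directly):

    FileMetaData fileMetaData = new ParquetMetadataConverter()
        .toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
    // map from column name to converted type; files written before the
    // annotation existed simply produce no UTF8 entries here
    HashMap<String, ConvertedType> convertedTypes = new HashMap<>();
    for (SchemaElement se : fileMetaData.getSchema()) {
      if (se.isSetConverted_type()) {
        convertedTypes.put(se.getName(), se.getConverted_type());
      }
    }
    // a BINARY column annotated as UTF8 is materialized as VARCHAR
    boolean isUtf8 = convertedTypes.get(column.getPath()[0]) == ConvertedType.UTF8;
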
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f071aca7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f071aca7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f071aca7 Branch: refs/heads/master Commit: f071aca7f8c85eace6f96d931068bceabdb2c419 Parents: a2355d4 Author: Jason Altekruse <[email protected]> Authored: Tue Apr 22 21:31:14 2014 -0500 Committer: Steven Phillips <[email protected]> Committed: Mon May 5 18:51:02 2014 -0700 ---------------------------------------------------------------------- exec/java-exec/pom.xml | 4 +- .../drill/exec/store/parquet/BitReader.java | 9 +- .../drill/exec/store/parquet/ColumnReader.java | 20 ++- .../store/parquet/FixedByteAlignedReader.java | 5 +- .../exec/store/parquet/NullableBitReader.java | 7 +- .../store/parquet/NullableColumnReader.java | 19 +- .../parquet/NullableFixedByteAlignedReader.java | 5 +- .../exec/store/parquet/PageReadStatus.java | 6 +- .../exec/store/parquet/ParquetRecordReader.java | 106 ++++++++--- .../exec/store/parquet/VarLenBinaryReader.java | 176 ++++++++++++++++--- .../store/parquet/ParquetRecordReaderTest.java | 21 ++- .../store/parquet/ParquetResultListener.java | 31 +--- 12 files changed, 305 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/pom.xml ---------------------------------------------------------------------- diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index 196b095..3e26662 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -95,7 +95,7 @@ <dependency> <groupId>com.twitter</groupId> <artifactId>parquet-column</artifactId> - <version>1.2.8</version> + <version>1.4.0</version> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> @@ -110,7 +110,7 @@ <dependency> <groupId>com.twitter</groupId> <artifactId>parquet-hadoop</artifactId> - <version>1.2.8</version> + <version>1.4.0</version> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java index c489d5b..c323222 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java @@ -21,6 +21,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.ValueVector; import parquet.column.ColumnDescriptor; +import parquet.format.ConvertedType; import parquet.hadoop.metadata.ColumnChunkMetaData; final class BitReader extends ColumnReader { @@ -30,8 +31,8 @@ final class BitReader extends ColumnReader { private byte[] bytes; BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v); + boolean fixedLength, ValueVector v, ConvertedType 
convertedType) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); } @Override @@ -47,11 +48,11 @@ final class BitReader extends ColumnReader { bytes = pageReadStatus.pageDataByteArray; // standard read, using memory mapping if (pageReadStatus.bitShift == 0) { - ((BaseDataValueVector) valueVecHolder.getValueVector()).getData().writeBytes(bytes, + ((BaseDataValueVector) valueVec).getData().writeBytes(bytes, (int) readStartInBytes, (int) readLength); } else { // read in individual values, because a bitshift is necessary with where the last page or batch ended - vectorData = ((BaseDataValueVector) valueVecHolder.getValueVector()).getData(); + vectorData = ((BaseDataValueVector) valueVec).getData(); nextByte = bytes[(int) Math.max(0, Math.ceil(pageReadStatus.valuesRead / 8.0) - 1)]; readLengthInBits = recordsReadInThisIteration + pageReadStatus.bitShift; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java index d5c88ef..97ecfb8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java @@ -24,17 +24,18 @@ import org.apache.drill.exec.vector.BaseValueVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.fs.FSDataInputStream; import parquet.column.ColumnDescriptor; +import parquet.format.ConvertedType; import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.schema.PrimitiveType; import java.io.IOException; -abstract class ColumnReader { +abstract class ColumnReader<V extends ValueVector> { final ParquetRecordReader parentReader; // Value Vector for this column - final VectorHolder valueVecHolder; + final V valueVec; // column description from the parquet library final ColumnDescriptor columnDescriptor; // metadata of the column, from the parquet library @@ -42,6 +43,8 @@ abstract class ColumnReader { // status information on the current page final PageReadStatus pageReadStatus; + final ConvertedType convertedType; + // quick reference to see if the field is fixed length (as this requires an instanceof) final boolean isFixedLength; @@ -62,16 +65,17 @@ abstract class ColumnReader { long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0; protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException { + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, ConvertedType convertedType) throws ExecutionSetupException { this.parentReader = parentReader; this.columnDescriptor = descriptor; this.columnChunkMetaData = columnChunkMetaData; this.isFixedLength = fixedLength; + this.convertedType = convertedType; if (allocateSize > 1) { - valueVecHolder = new VectorHolder(allocateSize, v); + valueVec = v; } else { - valueVecHolder = new VectorHolder(5000, v); + valueVec = v; } @@ -88,7 +92,7 @@ abstract class ColumnReader { readLength = 0; readLengthInBits = 0; recordsReadInThisIteration = 0; - vectorData = 
((BaseValueVector) valueVecHolder.getValueVector()).getData(); + vectorData = ((BaseValueVector) valueVec).getData(); do { // if no page has been read, or all of the records have been read out of a page, read the next one if (pageReadStatus.currentPage == null || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) { @@ -108,11 +112,11 @@ abstract class ColumnReader { pageReadStatus.readPosInBytes = readStartInBytes + readLength; } } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null); - valueVecHolder.getValueVector().getMutator().setValueCount(valuesReadInCurrentPass); + valueVec.getMutator().setValueCount(valuesReadInCurrentPass); } public void clear() { - this.valueVecHolder.reset(); + valueVec.clear(); this.pageReadStatus.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java index 4f14f60..0aa18cf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java @@ -21,6 +21,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.ValueVector; import parquet.column.ColumnDescriptor; +import parquet.format.ConvertedType; import parquet.hadoop.metadata.ColumnChunkMetaData; class FixedByteAlignedReader extends ColumnReader { @@ -29,8 +30,8 @@ class FixedByteAlignedReader extends ColumnReader { FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v); + boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); } // this method is called by its superclass during a read loop http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java index 16c2715..22933ba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.vector.BaseValueVector; import org.apache.drill.exec.vector.NullableBitVector; import org.apache.drill.exec.vector.ValueVector; import parquet.column.ColumnDescriptor; +import parquet.format.ConvertedType; import parquet.hadoop.metadata.ColumnChunkMetaData; import java.io.IOException; @@ -38,8 +39,8 @@ import java.io.IOException; final class NullableBitReader extends ColumnReader { 
NullableBitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v); + boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); } @Override @@ -52,7 +53,7 @@ final class NullableBitReader extends ColumnReader { defLevel = pageReadStatus.definitionLevels.readInteger(); // if the value is defined if (defLevel == columnDescriptor.getMaxDefinitionLevel()){ - if (!((NullableBitVector)valueVecHolder.getValueVector()).getMutator().setSafe(i + valuesReadInCurrentPass, + if (!((NullableBitVector)valueVec).getMutator().setSafe(i + valuesReadInCurrentPass, pageReadStatus.valueReader.readBoolean() ? 1 : 0 )) { throw new RuntimeException(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java index b6ae715..8faf756 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java @@ -22,6 +22,7 @@ import org.apache.drill.exec.vector.BaseValueVector; import org.apache.drill.exec.vector.NullableVectorDefinitionSetter; import org.apache.drill.exec.vector.ValueVector; import parquet.column.ColumnDescriptor; +import parquet.format.ConvertedType; import parquet.hadoop.metadata.ColumnChunkMetaData; import java.io.IOException; @@ -35,8 +36,8 @@ abstract class NullableColumnReader extends ColumnReader{ int bitsUsed; NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v); + boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); } public void readAllFixedFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException { @@ -44,7 +45,7 @@ abstract class NullableColumnReader extends ColumnReader{ readLength = 0; readLengthInBits = 0; recordsReadInThisIteration = 0; - vectorData = ((BaseValueVector)valueVecHolder.getValueVector()).getData(); + vectorData = ((BaseValueVector)valueVec).getData(); do { // if no page has been read, or all of the records have been read out of a page, read the next one @@ -72,11 +73,11 @@ abstract class NullableColumnReader extends ColumnReader{ lastValueWasNull = true; nullsFound = 0; if (currentValueIndexInVector - totalValuesRead == recordsToReadInThisPass - || currentValueIndexInVector >= valueVecHolder.getValueVector().getValueCapacity()){ + || currentValueIndexInVector >= valueVec.getValueCapacity()){ break; } while(currentValueIndexInVector - totalValuesRead < recordsToReadInThisPass - && currentValueIndexInVector 
< valueVecHolder.getValueVector().getValueCapacity() + && currentValueIndexInVector < valueVec.getValueCapacity() && pageReadStatus.valuesRead + definitionLevelsRead < pageReadStatus.currentPage.getValueCount()){ currentDefinitionLevel = pageReadStatus.definitionLevels.readInteger(); definitionLevelsRead++; @@ -96,7 +97,7 @@ abstract class NullableColumnReader extends ColumnReader{ lastValueWasNull = false; } runLength++; - ((NullableVectorDefinitionSetter)valueVecHolder.getValueVector().getMutator()).setIndexDefined(currentValueIndexInVector); + ((NullableVectorDefinitionSetter)valueVec.getMutator()).setIndexDefined(currentValueIndexInVector); } currentValueIndexInVector++; } @@ -104,9 +105,9 @@ abstract class NullableColumnReader extends ColumnReader{ recordsReadInThisIteration = runLength; readField( runLength, firstColumnStatus); - int writerIndex = ((BaseValueVector) valueVecHolder.getValueVector()).getData().writerIndex(); + int writerIndex = ((BaseValueVector) valueVec).getData().writerIndex(); if ( dataTypeLengthInBits > 8 || (dataTypeLengthInBits < 8 && totalValuesRead + runLength % 8 == 0)){ - ((BaseValueVector) valueVecHolder.getValueVector()).getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0)); + ((BaseValueVector) valueVec).getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0)); } else if (dataTypeLengthInBits < 8){ rightBitShift += dataTypeLengthInBits * nullsFound; @@ -125,7 +126,7 @@ abstract class NullableColumnReader extends ColumnReader{ } } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null); - valueVecHolder.getValueVector().getMutator().setValueCount( + valueVec.getMutator().setValueCount( valuesReadInCurrentPass); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java index c2fc606..038f2d7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java @@ -21,6 +21,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.ValueVector; import parquet.column.ColumnDescriptor; +import parquet.format.ConvertedType; import parquet.hadoop.metadata.ColumnChunkMetaData; class NullableFixedByteAlignedReader extends NullableColumnReader { @@ -28,8 +29,8 @@ class NullableFixedByteAlignedReader extends NullableColumnReader { private byte[] bytes; NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v); + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, ConvertedType convertedType) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); } // this method is called by its superclass during a read loop 
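The fixed-width readers above only thread the new ConvertedType parameter through to ColumnReader; the annotation does not change how bytes are decoded, just which Drill minor type (and therefore which value vector) is materialized for BINARY columns. A hypothetical helper, not part of this patch, summarizing the mapping that the toMajorType() changes in ParquetRecordReader below implement per data mode:

    // BINARY with a UTF8 annotation -> VARCHAR; plain BINARY -> VARBINARY
    static TypeProtos.MinorType binaryMinorType(ConvertedType convertedType) {
      return convertedType == ConvertedType.UTF8
          ? TypeProtos.MinorType.VARCHAR
          : TypeProtos.MinorType.VARBINARY;
    }
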
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java index 67262f6..fe83159 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java @@ -115,9 +115,9 @@ final class PageReadStatus { if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){ definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); - int endOfDefinitionLevels = definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0); - valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, endOfDefinitionLevels); - readPosInBytes = endOfDefinitionLevels; + definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0); + readPosInBytes = definitionLevels.getNextOffset(); + valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); } return true; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java index 9acb557..463f3ed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import org.apache.drill.common.exceptions.DrillRuntimeException; @@ -38,12 +39,21 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.RecordReader; -import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.*; +import org.apache.drill.exec.vector.NullableVarBinaryVector; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.VarBinaryVector; +import org.apache.drill.exec.vector.VarCharVector; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import parquet.column.ColumnDescriptor; +import parquet.format.ConvertedType; +import parquet.format.FileMetaData; +import parquet.format.SchemaElement; +import parquet.format.converter.ParquetMetadataConverter; import parquet.hadoop.CodecFactoryExposer; +import parquet.hadoop.ParquetFileWriter; import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.ParquetMetadata; import parquet.schema.PrimitiveType; @@ -173,11 +183,23 @@ class ParquetRecordReader implements RecordReader { int columnsToScan = 0; 
MaterializedField field; + ParquetMetadataConverter metaConverter = new ParquetMetadataConverter(); + FileMetaData fileMetaData; + + // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below + // store a map from column name to converted types if they are non-null + HashMap<String, ConvertedType> convertedTypes = new HashMap<>(); + fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer); + for (SchemaElement se : fileMetaData.getSchema()) { + convertedTypes.put(se.getName(), se.getConverted_type()); + } + // loop to add up the length of the fixed width columns and build the schema for (int i = 0; i < columns.size(); ++i) { column = columns.get(i); + logger.debug("name: " + fileMetaData.getSchema().get(i).name); field = MaterializedField.create(toFieldName(column.getPath()), - toMajorType(column.getType(), getDataMode(column))); + toMajorType(column.getType(), getDataMode(column), convertedTypes.get(column.getPath()[0]))); if ( ! fieldSelected(field)){ continue; } @@ -203,9 +225,22 @@ class ParquetRecordReader implements RecordReader { return; } if (allFieldsFixedLength) { - recordsPerBatch = (int) Math.min(batchSize / bitWidthAllFixedFields, footer.getBlocks().get(0).getColumns().get(0).getValueCount()); + recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields, + footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535); + } + else { + recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH; } +// for (SchemaElement se : fileMetaData.getSchema()) { +// if (fieldSelected()) +// System.out.println("convertedtype :" + se.getConverted_type()); +// System.out.println("name:" + se.getName()); +// System.out.println(); +// +// } try { + ValueVector v; + ConvertedType convertedType; ArrayList<VarLenBinaryReader.VarLengthColumn> varLengthColumns = new ArrayList<>(); ArrayList<VarLenBinaryReader.NullableVarLengthColumn> nullableVarLengthColumns = new ArrayList<>(); // initialize all of the column read status objects @@ -213,21 +248,38 @@ class ParquetRecordReader implements RecordReader { for (int i = 0; i < columns.size(); ++i) { column = columns.get(i); columnChunkMetaData = footer.getBlocks().get(0).getColumns().get(i); - MajorType type = toMajorType(column.getType(), getDataMode(column)); + convertedType = convertedTypes.get(column.getPath()[0]); + MajorType type = toMajorType(column.getType(), getDataMode(column), convertedType); field = MaterializedField.create(toFieldName(column.getPath()), type); // the field was not requested to be read if ( ! 
fieldSelected(field)) continue; + //convertedTypes.put() fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY; - ValueVector v = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); + v = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { - createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v); + createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v, + convertedType); } else { if (column.getMaxDefinitionLevel() == 0){// column is required - varLengthColumns.add(new VarLenBinaryReader.VarLengthColumn(this, -1, column, columnChunkMetaData, false, v)); + if (convertedType == ConvertedType.UTF8) { + varLengthColumns.add( + new VarLenBinaryReader.VarCharColumn(this, -1, column, columnChunkMetaData, false, (VarCharVector) v, convertedType)); + } else { + varLengthColumns.add( + new VarLenBinaryReader.VarBinaryColumn(this, -1, column, columnChunkMetaData, false, (VarBinaryVector) v, convertedType)); + } } else{ - nullableVarLengthColumns.add(new VarLenBinaryReader.NullableVarLengthColumn(this, -1, column, columnChunkMetaData, false, v)); + if (convertedType == ConvertedType.UTF8) { + nullableVarLengthColumns.add( + new VarLenBinaryReader.NullableVarCharColumn(this, -1, column, columnChunkMetaData, false, + (NullableVarCharVector) v, convertedType)); + } else { + nullableVarLengthColumns.add( + new VarLenBinaryReader.NullableVarBinaryColumn(this, -1, column, columnChunkMetaData, false, + (NullableVarBinaryVector) v, convertedType)); + } } } } @@ -259,15 +311,15 @@ class ParquetRecordReader implements RecordReader { private void resetBatch() { for (ColumnReader column : columnStatuses) { - column.valueVecHolder.reset(); + AllocationHelper.allocate(column.valueVec, recordsPerBatch, 10, 5); column.valuesReadInCurrentPass = 0; } for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns){ - r.valueVecHolder.reset(); + AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5); r.valuesReadInCurrentPass = 0; } for (VarLenBinaryReader.NullableVarLengthColumn r : varLengthReader.nullableColumns){ - r.valueVecHolder.reset(); + AllocationHelper.allocate(r.valueVec, recordsPerBatch, 10, 5); r.valuesReadInCurrentPass = 0; } } @@ -281,28 +333,29 @@ class ParquetRecordReader implements RecordReader { * @throws SchemaChangeException */ private boolean createFixedColumnReader(boolean fixedLength, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v) + ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v, + ConvertedType convertedType) throws SchemaChangeException, ExecutionSetupException { // if the column is required if (descriptor.getMaxDefinitionLevel() == 0){ if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){ columnStatuses.add(new BitReader(this, allocateSize, descriptor, columnChunkMetaData, - fixedLength, v)); + fixedLength, v, convertedType)); } else{ columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData, - fixedLength, v)); + fixedLength, v, convertedType)); } return true; } else { // if the column is nullable if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){ columnStatuses.add(new NullableBitReader(this, allocateSize, descriptor, columnChunkMetaData, - fixedLength, v)); + 
fixedLength, v, convertedType)); } else{ columnStatuses.add(new NullableFixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData, - fixedLength, v)); + fixedLength, v, convertedType)); } return true; } @@ -363,18 +416,21 @@ class ParquetRecordReader implements RecordReader { } static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, - TypeProtos.DataMode mode) { - return toMajorType(primitiveTypeName, 0, mode); + TypeProtos.DataMode mode, ConvertedType convertedType) { + return toMajorType(primitiveTypeName, 0, mode, convertedType); } static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length, - TypeProtos.DataMode mode) { + TypeProtos.DataMode mode, ConvertedType convertedType) { switch (mode) { case OPTIONAL: switch (primitiveTypeName) { case BINARY: - return Types.optional(TypeProtos.MinorType.VARBINARY); + if (convertedType == ConvertedType.UTF8) + return Types.optional(TypeProtos.MinorType.VARCHAR); + else + return Types.optional(TypeProtos.MinorType.VARBINARY); case INT64: return Types.optional(TypeProtos.MinorType.BIGINT); case INT32: @@ -400,7 +456,10 @@ class ParquetRecordReader implements RecordReader { case REQUIRED: switch (primitiveTypeName) { case BINARY: - return Types.required(TypeProtos.MinorType.VARBINARY); + if (convertedType == ConvertedType.UTF8) + return Types.required(TypeProtos.MinorType.VARCHAR); + else + return Types.required(TypeProtos.MinorType.VARBINARY); case INT64: return Types.required(TypeProtos.MinorType.BIGINT); case INT32: @@ -426,7 +485,10 @@ class ParquetRecordReader implements RecordReader { case REPEATED: switch (primitiveTypeName) { case BINARY: - return Types.repeated(TypeProtos.MinorType.VARBINARY); + if (convertedType == ConvertedType.UTF8) + return Types.required(TypeProtos.MinorType.VARCHAR); + else + return Types.repeated(TypeProtos.MinorType.VARBINARY); case INT64: return Types.repeated(TypeProtos.MinorType.BIGINT); case INT32: http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java index 09d19a8..ae01104 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java @@ -18,11 +18,14 @@ package org.apache.drill.exec.store.parquet; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.vector.*; import org.apache.drill.exec.vector.NullableVarBinaryVector; -import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.VarBinaryVector; +import org.apache.drill.exec.vector.VarCharVector; import parquet.bytes.BytesUtils; import parquet.column.ColumnDescriptor; +import parquet.format.ConvertedType; import parquet.hadoop.metadata.ColumnChunkMetaData; import java.io.IOException; @@ -42,25 +45,156 @@ public class VarLenBinaryReader { this.columns = columns; } - public static class VarLengthColumn extends ColumnReader { + public static abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader { - 
VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v); + VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, + ConvertedType convertedType) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); } @Override protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { throw new UnsupportedOperationException(); } + + public abstract boolean setSafe(int index, byte[] bytes, int start, int length); + + public abstract int capacity(); + + } + + public static abstract class NullableVarLengthColumn<V extends ValueVector> extends ColumnReader { + + int nullsRead; + boolean currentValNull = false; + + NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, + ConvertedType convertedType ) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + } + + public abstract boolean setSafe(int index, byte[] value, int start, int length); + + public abstract int capacity(); + + @Override + protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { + throw new UnsupportedOperationException(); + } + } + + public static class VarCharColumn extends VarLengthColumn <VarCharVector> { + + // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting + protected VarCharVector varCharVector; + + VarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarCharVector v, + ConvertedType convertedType) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + varCharVector = v; + } + + @Override + protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + return varCharVector.getMutator().setSafe(valuesReadInCurrentPass, bytes, + (int) (pageReadStatus.readPosInBytes + 4), dataTypeLengthInBits); + } + + @Override + public int capacity() { + return varCharVector.getData().capacity(); + } } - public static class NullableVarLengthColumn extends ColumnReader { + public static class NullableVarCharColumn extends NullableVarLengthColumn <NullableVarCharVector> { int nullsRead; boolean currentValNull = false; + // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting + protected NullableVarCharVector nullableVarCharVector; + + NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v, + ConvertedType convertedType ) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + nullableVarCharVector = v; + } - 
NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v); + public boolean setSafe(int index, byte[] value, int start, int length) { + return nullableVarCharVector.getMutator().setSafe(index, value, + start, length); + } + + @Override + public int capacity() { + return nullableVarCharVector.getData().capacity(); + } + + @Override + protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { + throw new UnsupportedOperationException(); + } + } + + public static class VarBinaryColumn extends VarLengthColumn <VarBinaryVector> { + + // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting + protected VarBinaryVector varBinaryVector; + + VarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v, + ConvertedType convertedType) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + varBinaryVector = v; + } + + @Override + protected void readField(long recordsToRead, ColumnReader firstColumnStatus) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean setSafe(int index, byte[] bytes, int start, int length) { + return varBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, bytes, + (int) (pageReadStatus.readPosInBytes + 4), dataTypeLengthInBits); + } + + @Override + public int capacity() { + return varBinaryVector.getData().capacity(); + } + } + + public static class NullableVarBinaryColumn extends NullableVarLengthColumn <NullableVarBinaryVector> { + + int nullsRead; + boolean currentValNull = false; + // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting + protected NullableVarBinaryVector nullableVarBinaryVector; + + NullableVarBinaryColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v, + ConvertedType convertedType ) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, convertedType); + nullableVarBinaryVector = v; + } + + public boolean setSafe(int index, byte[] value, int start, int length) { + return nullableVarBinaryVector.getMutator().setSafe(index, value, + start, length); + } + + @Override + public int capacity() { + return nullableVarBinaryVector.getData().capacity(); } @Override @@ -83,8 +217,6 @@ public class VarLenBinaryReader { int lengthVarFieldsInCurrentRecord; boolean rowGroupFinished = false; byte[] bytes; - VarBinaryVector currVec; - NullableVarBinaryVector currNullVec; // write the first 0 offset for (ColumnReader columnReader : columns) { columnReader.bytesReadInCurrentPass = 0; @@ -98,8 +230,8 @@ public class VarLenBinaryReader { } outer: do { lengthVarFieldsInCurrentRecord = 0; - for (ColumnReader columnReader : columns) { - if (recordsReadInCurrentPass == columnReader.valueVecHolder.getValueVector().getValueCapacity()){ + for (VarLengthColumn columnReader : columns) { + if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){ rowGroupFinished = true; break; } @@ -118,7 
+250,7 @@ public class VarLenBinaryReader { (int) columnReader.pageReadStatus.readPosInBytes); lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits; - if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > ((VarBinaryVector) columnReader.valueVecHolder.getValueVector()).getData().capacity()) { + if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) { break outer; } @@ -126,7 +258,7 @@ public class VarLenBinaryReader { for (NullableVarLengthColumn columnReader : nullableColumns) { // check to make sure there is capacity for the next value (for nullables this is a check to see if there is // still space in the nullability recording vector) - if (recordsReadInCurrentPass == columnReader.valueVecHolder.getValueVector().getValueCapacity()){ + if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){ rowGroupFinished = true; break; } @@ -151,7 +283,7 @@ public class VarLenBinaryReader { (int) columnReader.pageReadStatus.readPosInBytes); lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits; - if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > ((NullableVarBinaryVector) columnReader.valueVecHolder.getValueVector()).getData().capacity()) { + if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) { break outer; } } @@ -160,12 +292,11 @@ public class VarLenBinaryReader { > parentReader.getBatchSize()){ break outer; } - for (ColumnReader columnReader : columns) { + for (VarLengthColumn columnReader : columns) { bytes = columnReader.pageReadStatus.pageDataByteArray; - currVec = (VarBinaryVector) columnReader.valueVecHolder.getValueVector(); // again, I am re-purposing the unused field here, it is a length n BYTES, not bits - boolean success = currVec.getMutator().setSafe(columnReader.valuesReadInCurrentPass, bytes, - (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits); + boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes, + (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits); assert success; columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4; columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4; @@ -174,11 +305,10 @@ public class VarLenBinaryReader { } for (NullableVarLengthColumn columnReader : nullableColumns) { bytes = columnReader.pageReadStatus.pageDataByteArray; - currNullVec = (NullableVarBinaryVector) columnReader.valueVecHolder.getValueVector(); // again, I am re-purposing the unused field here, it is a length n BYTES, not bits if (!columnReader.currentValNull && columnReader.dataTypeLengthInBits > 0){ - boolean success = currNullVec.getMutator().setSafe(columnReader.valuesReadInCurrentPass, bytes, - (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits); + boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes, + (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits); assert success; } columnReader.currentValNull = false; @@ -195,10 +325,10 @@ public class VarLenBinaryReader { recordsReadInCurrentPass++; } while (recordsReadInCurrentPass < recordsToReadInThisPass); for (VarLengthColumn columnReader : columns) { - columnReader.valueVecHolder.getValueVector().getMutator().setValueCount((int) recordsReadInCurrentPass); + 
columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass); } for (NullableVarLengthColumn columnReader : nullableColumns) { - columnReader.valueVecHolder.getValueVector().getMutator().setValueCount((int) recordsReadInCurrentPass); + columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass); } return recordsReadInCurrentPass; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 9ba94fa..67b5394 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -103,6 +103,11 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ testParquetFullEngineLocalPath(planName, fileName, 2, numberRowGroups, recordsPerRowGroup); } + public String getPlanForFile(String pathFileName, String parquetFileName) throws IOException { + return Files.toString(FileUtils.getResourceAsFile(pathFileName), Charsets.UTF_8) + .replaceFirst("&REPLACED_IN_PARQUET_TEST&", parquetFileName); + } + @Test public void testMultipleRowGroupsAndReads2() throws Exception { String readEntries; @@ -273,15 +278,27 @@ public class ParquetRecordReaderTest extends BaseTestQuery{ @Ignore @Test + /** + * Tests the reading of nullable var length columns, runs the tests twice, once on a file that has + * a converted type of UTF-8 to make sure it can be read + */ public void testNullableColumnsVarLen() throws Exception { HashMap<String, FieldInfo> fields = new HashMap<>(); ParquetTestProperties props = new ParquetTestProperties(1, 300000, DEFAULT_BYTES_PER_PAGE, fields); byte[] val = {'b'}; byte[] val2 = {'b', '2'}; - byte[] val3 = { 'l','o','n','g','e','r',' ','s','t','r','i','n','g'}; - Object[] boolVals = { val, val2, val3}; + byte[] val3 = {'b', '3'}; + byte[] val4 = { 'l','o','n','g','e','r',' ','s','t','r','i','n','g'}; + Object[] boolVals = { val, val2, val4}; props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals, TypeProtos.MinorType.BIT, props)); + // testParquetFullEngineEventBased(false, "/parquet/parquet_nullable_varlen.json", "/tmp/nullable_varlen.parquet", 1, props); + fields.clear(); + // pass strings instead of byte arrays + Object[] boolVals2 = { "b", "b2", "b3"}; + props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals2, TypeProtos.MinorType.BIT, props)); + testParquetFullEngineEventBased(false, "/parquet/parquet_scan_screen_read_entry_replace.json", + "\"/tmp/varLen.parquet/a\"", "unused", 1, props); } @Test http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f071aca7/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java index 73af98c..257a49e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java @@ -78,7 +78,10 @@ public class ParquetResultListener implements UserResultsListener { T val = (T) valueVector.getAccessor().getObject(index); if (val instanceof byte[]) { - assertEquals(true, Arrays.equals((byte[]) value, (byte[]) val)); + assert(Arrays.equals((byte[]) value, (byte[]) val)); + } + else if (val instanceof String) { + assert(val.equals(value)); } else { assertEquals(value, val); } @@ -120,16 +123,7 @@ public class ParquetResultListener implements UserResultsListener { } for (int j = 0; j < vv.getAccessor().getValueCount(); j++) { if (ParquetRecordReaderTest.VERBOSE_DEBUG){ - if (vv.getAccessor().getObject(j) instanceof byte[]){ - System.out.print("[len:" + ((byte[]) vv.getAccessor().getObject(j)).length + " - ("); - for (int k = 0; k < ((byte[]) vv.getAccessor().getObject(j)).length; k++){ - System.out.print((char)((byte[])vv.getAccessor().getObject(j))[k] + ","); - } - System.out.print(") ]"); - } - else{ - System.out.print(Strings.padStart(vv.getAccessor().getObject(j) + "", 20, ' ') + " "); - } + System.out.print(Strings.padStart(vv.getAccessor().getObject(j) + "", 20, ' ') + " "); System.out.print(", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : "")); } if (testValues){ @@ -161,20 +155,9 @@ public class ParquetResultListener implements UserResultsListener { for (VectorWrapper vw : batchLoader) { ValueVector v = vw.getValueVector(); - if (v.getAccessor().getObject(i) instanceof byte[]){ - System.out.print("[len:" + ((byte[]) v.getAccessor().getObject(i)).length + " - ("); - for (int j = 0; j < ((byte[]) v.getAccessor().getObject(i)).length; j++){ - System.out.print(((byte[])v.getAccessor().getObject(i))[j] + ","); - } - System.out.print(") ]"); - } - else{ - System.out.print(Strings.padStart(v.getAccessor().getObject(i) + "", 20, ' ') + " "); - } + System.out.print(Strings.padStart(v.getAccessor().getObject(i) + "", 20, ' ') + " "); } - System.out.println( - - ); + System.out.println(); } } batchCounter++;
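
On the consumer side, the net effect is visible through the vector accessors: a UTF-8 annotated column now comes back as a Java String rather than a byte[], which is what the simplified assertions in ParquetResultListener above rely on. A minimal sketch of a caller that tolerates both old and new files (handleValue() is an illustrative placeholder):

    Object val = valueVector.getAccessor().getObject(index);
    if (val instanceof String) {
      // newer file: BINARY annotated as UTF8, read into a VarCharVector
      handleValue((String) val);
    } else if (val instanceof byte[]) {
      // older file without converted-type metadata: still VARBINARY;
      // decode explicitly here, or cast to varchar in the query itself
      handleValue(new String((byte[]) val, java.nio.charset.StandardCharsets.UTF_8));
    }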
