http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index 5320198..e38c51c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -19,8 +19,9 @@ package org.apache.drill.exec.store.parquet.columnreaders; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.SchemaChangeException; - import org.apache.drill.exec.vector.BigIntVector; +import org.apache.drill.exec.vector.BitVector; +import org.apache.drill.exec.vector.DateVector; import org.apache.drill.exec.vector.Decimal18Vector; import org.apache.drill.exec.vector.Decimal28SparseVector; import org.apache.drill.exec.vector.Decimal38SparseVector; @@ -28,7 +29,10 @@ import org.apache.drill.exec.vector.Decimal9Vector; import org.apache.drill.exec.vector.Float4Vector; import org.apache.drill.exec.vector.Float8Vector; import org.apache.drill.exec.vector.IntVector; +import org.apache.drill.exec.vector.IntervalVector; import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.NullableBitVector; +import org.apache.drill.exec.vector.NullableDateVector; import org.apache.drill.exec.vector.NullableDecimal18Vector; import org.apache.drill.exec.vector.NullableDecimal28SparseVector; import org.apache.drill.exec.vector.NullableDecimal38SparseVector; @@ -36,6 +40,7 @@ import org.apache.drill.exec.vector.NullableDecimal9Vector; import org.apache.drill.exec.vector.NullableFloat4Vector; import org.apache.drill.exec.vector.NullableFloat8Vector; import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.NullableIntervalVector; import org.apache.drill.exec.vector.NullableTimeStampVector; import org.apache.drill.exec.vector.NullableTimeVector; import org.apache.drill.exec.vector.NullableVarBinaryVector; @@ -63,7 +68,7 @@ public class ColumnReaderFactory { * @return * @throws SchemaChangeException */ - static ColumnReader createFixedColumnReader(ParquetRecordReader recordReader, boolean fixedLength, ColumnDescriptor descriptor, + static ColumnReader<?> createFixedColumnReader(ParquetRecordReader recordReader, boolean fixedLength, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v, SchemaElement schemaElement) throws Exception { @@ -73,24 +78,24 @@ public class ColumnReaderFactory { if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0){ if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){ return new BitReader(recordReader, allocateSize, descriptor, columnChunkMetaData, - fixedLength, v, schemaElement); + fixedLength, (BitVector) v, schemaElement); } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY || columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT96) { if (convertedType == ConvertedType.DECIMAL){ int length = schemaElement.type_length; if (length <= 12) { - return new 
FixedByteAlignedReader.Decimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + return new FixedByteAlignedReader.Decimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal28SparseVector) v, schemaElement); } else if (length <= 16) { - return new FixedByteAlignedReader.Decimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + return new FixedByteAlignedReader.Decimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal38SparseVector) v, schemaElement); } } else if (convertedType == ConvertedType.INTERVAL) { - return new FixedByteAlignedReader.IntervalReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + return new FixedByteAlignedReader.IntervalReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntervalVector) v, schemaElement); } else { return new FixedByteAlignedReader.FixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, (VariableWidthVector) v, schemaElement); } } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){ - return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, schemaElement); } else{ if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { switch (columnChunkMetaData.getType()) { @@ -129,7 +134,7 @@ public class ColumnReaderFactory { } } else { - return new FixedByteAlignedReader(recordReader, allocateSize, descriptor, columnChunkMetaData, + return new FixedByteAlignedReader<>(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); } } @@ -137,20 +142,20 @@ public class ColumnReaderFactory { else { // if the column is nullable if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){ return new NullableBitReader(recordReader, allocateSize, descriptor, columnChunkMetaData, - fixedLength, v, schemaElement); + fixedLength, (NullableBitVector) v, schemaElement); } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){ - return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, schemaElement); } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { if (convertedType == ConvertedType.DECIMAL) { int length = schemaElement.type_length; if (length <= 12) { - return new NullableFixedByteAlignedReaders.NullableDecimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + return new NullableFixedByteAlignedReaders.NullableDecimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal28SparseVector) v, schemaElement); } else if (length <= 16) { - return new NullableFixedByteAlignedReaders.NullableDecimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, 
schemaElement); + return new NullableFixedByteAlignedReaders.NullableDecimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal38SparseVector) v, schemaElement); } } else if (convertedType == ConvertedType.INTERVAL) { return new NullableFixedByteAlignedReaders.NullableIntervalReader(recordReader, allocateSize, descriptor, - columnChunkMetaData, fixedLength, v, schemaElement); + columnChunkMetaData, fixedLength, (NullableIntervalVector) v, schemaElement); } } else { return getNullableColumnReader(recordReader, allocateSize, descriptor, @@ -160,7 +165,7 @@ public class ColumnReaderFactory { throw new Exception("Unexpected parquet metadata configuration."); } - static VarLengthValuesColumn getReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + static VarLengthValuesColumn<?> getReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement ) throws ExecutionSetupException { @@ -202,7 +207,7 @@ public class ColumnReaderFactory { } } - public static NullableColumnReader getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize, + public static NullableColumnReader<?> getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor columnDescriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, @@ -214,7 +219,7 @@ public class ColumnReaderFactory { if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT96) { return new NullableFixedByteAlignedReaders.NullableFixedBinaryReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, true, (NullableVarBinaryVector) valueVec, schemaElement); }else{ - return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement); + return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader<>(parentReader, allocateSize, columnDescriptor, columnChunkMetaData, fixedLength, valueVec, schemaElement); } } else { switch (columnDescriptor.getType()) {
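The pattern applied in this hunk repeats through every file below: ColumnReader and its subclasses gain a type parameter V extends ValueVector, the per-subclass castedVector fields are deleted, and the one remaining downcast moves to the factory boundary where the concrete reader is constructed. A minimal, self-contained sketch of that refactoring follows; all class names are hypothetical stand-ins, not Drill's real classes, and the vector "storage" is stubbed out.

// GenericsSketch.java -- sketch of the cast-elimination refactoring, under the
// assumptions stated above (simplified, hypothetical types; no real storage).
import java.util.ArrayList;
import java.util.List;

interface ValueVector {}

class IntVector implements ValueVector {
  void setInt(int index, int value) { /* write into vector storage (stub) */ }
}

// Before: the shared field is untyped, so every subclass casts at each use site.
abstract class UntypedReader {
  protected final ValueVector valueVec;
  UntypedReader(ValueVector v) { this.valueVec = v; }
}

class UntypedIntReader extends UntypedReader {
  UntypedIntReader(ValueVector v) { super(v); }
  void read(int index) {
    ((IntVector) valueVec).setInt(index, 0); // downcast repeated at every use site
  }
}

// After: the vector type is a parameter of the reader, so the field is already typed.
abstract class TypedColumnReader<V extends ValueVector> {
  protected final V valueVec;
  TypedColumnReader(V v) { this.valueVec = v; }
  abstract void read(int index);
}

class TypedIntReader extends TypedColumnReader<IntVector> {
  TypedIntReader(IntVector v) { super(v); }
  @Override void read(int index) {
    valueVec.setInt(index, 0); // no cast: V is bound to IntVector
  }
}

public class GenericsSketch {
  public static void main(String[] args) {
    // The factory keeps a single cast at the construction boundary, mirroring
    // the "(IntVector) v"-style casts this commit adds in ColumnReaderFactory,
    // and callers hold readers through the wildcard type ColumnReader<?>.
    ValueVector v = new IntVector();
    TypedColumnReader<?> reader = new TypedIntReader((IntVector) v);
    List<TypedColumnReader<?>> readers = new ArrayList<>();
    readers.add(reader);
    readers.get(0).read(0);
  }
}

Binding the vector type once in the superclass lets the compiler check every mutator call, which is why the diffs that follow can delete each subclass's casted field (decimal28Vector, intervalVector, castedVector, and so on) and use the inherited valueVec directly.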
http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java index 76aa073..d4b43d8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java @@ -17,14 +17,11 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; -import io.netty.buffer.DrillBuf; - import java.math.BigDecimal; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; -import org.apache.drill.exec.expr.holders.IntervalHolder; import org.apache.drill.exec.store.ParquetOutputRecordWriter; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.util.DecimalUtility; @@ -34,20 +31,20 @@ import org.apache.drill.exec.vector.Decimal38SparseVector; import org.apache.drill.exec.vector.IntervalVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VariableWidthVector; -import org.joda.time.DateTimeUtils; - import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.SchemaElement; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; -import org.apache.parquet.io.api.Binary; +import org.joda.time.DateTimeUtils; -class FixedByteAlignedReader extends ColumnReader { +import io.netty.buffer.DrillBuf; + +class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> { protected DrillBuf bytebuf; FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); } @@ -71,7 +68,7 @@ class FixedByteAlignedReader extends ColumnReader { vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength); } - public static class FixedBinaryReader extends FixedByteAlignedReader { + public static class FixedBinaryReader extends FixedByteAlignedReader<VariableWidthVector> { // TODO - replace this with fixed binary type in drill VariableWidthVector castedVector; @@ -95,12 +92,12 @@ class FixedByteAlignedReader extends ColumnReader { } - public static abstract class ConvertedReader extends FixedByteAlignedReader { + public static abstract class ConvertedReader<V extends ValueVector> extends FixedByteAlignedReader<V> { protected int dataTypeLengthInBytes; ConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement); } @@ -120,14 +117,11 @@ class FixedByteAlignedReader extends ColumnReader { abstract void addNext(int start, int index); } - public static class DateReader extends ConvertedReader { - - private final DateVector.Mutator mutator; + public static class DateReader extends ConvertedReader<DateVector> { DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - mutator = ((DateVector) v).getMutator(); } @Override @@ -139,19 +133,16 @@ class FixedByteAlignedReader extends ColumnReader { intValue = readIntLittleEndian(bytebuf, start); } - mutator.set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); + valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); } } - public static class Decimal28Reader extends ConvertedReader { - - Decimal28SparseVector decimal28Vector; + public static class Decimal28Reader extends ConvertedReader<Decimal28SparseVector> { Decimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + boolean fixedLength, Decimal28SparseVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - decimal28Vector = (Decimal28SparseVector) v; } @Override @@ -159,19 +150,16 @@ class FixedByteAlignedReader extends ColumnReader { int width = Decimal28SparseHolder.WIDTH; BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); - DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getBuffer(), index * width, schemaElement.getScale(), + DecimalUtility.getSparseFromBigDecimal(intermediate, valueVec.getBuffer(), index * width, schemaElement.getScale(), schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits); } } - public static class Decimal38Reader extends ConvertedReader { - - Decimal38SparseVector decimal38Vector; + public static class Decimal38Reader extends ConvertedReader<Decimal38SparseVector> { Decimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + boolean fixedLength, Decimal38SparseVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - decimal38Vector = (Decimal38SparseVector) v; } @Override @@ -179,30 +167,27 @@ class FixedByteAlignedReader extends ColumnReader { int width = Decimal38SparseHolder.WIDTH; BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); - DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getBuffer(), index * width, schemaElement.getScale(), + 
DecimalUtility.getSparseFromBigDecimal(intermediate, valueVec.getBuffer(), index * width, schemaElement.getScale(), schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits); } } - public static class IntervalReader extends ConvertedReader { - IntervalVector intervalVector; - + public static class IntervalReader extends ConvertedReader<IntervalVector> { IntervalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + boolean fixedLength, IntervalVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - intervalVector = (IntervalVector) v; } @Override void addNext(int start, int index) { if (usingDictionary) { byte[] input = pageReader.dictionaryValueReader.readBytes().getBytes(); - intervalVector.getMutator().setSafe(index * 12, + valueVec.getMutator().setSafe(index * 12, ParquetReaderUtility.getIntFromLEBytes(input, 0), ParquetReaderUtility.getIntFromLEBytes(input, 4), ParquetReaderUtility.getIntFromLEBytes(input, 8)); } - intervalVector.getMutator().setSafe(index, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8)); + valueVec.getMutator().setSafe(index, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8)); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java index 501f5a6..f70c8d5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java @@ -21,17 +21,15 @@ import java.io.IOException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.BaseDataValueVector; -import org.apache.drill.exec.vector.complex.RepeatedValueVector; import org.apache.drill.exec.vector.UInt4Vector; - +import org.apache.drill.exec.vector.complex.RepeatedValueVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.SchemaElement; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; -public class FixedWidthRepeatedReader extends VarLengthColumn { +public class FixedWidthRepeatedReader extends VarLengthColumn<RepeatedValueVector> { - RepeatedValueVector castedRepeatedVector; - ColumnReader dataReader; + ColumnReader<?> dataReader; int dataTypeLengthInBytes; // we can do a vector copy of the data once we figure out how much we need to copy // this tracks the number of values to transfer (the dataReader will translate this to a number @@ -48,9 +46,8 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { boolean notFishedReadingList; byte[] leftOverBytes; - FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, RepeatedValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException { + FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader<?> dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, RepeatedValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement); - this.castedRepeatedVector = valueVector; this.dataTypeLengthInBytes = dataTypeLengthInBytes; this.dataReader = dataReader; this.dataReader.pageReader.clear(); @@ -66,7 +63,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { bytesReadInCurrentPass = 0; valuesReadInCurrentPass = 0; pageReader.valuesReadyToRead = 0; - dataReader.vectorData = BaseDataValueVector.class.cast(castedRepeatedVector.getDataVector()).getBuffer(); + dataReader.vectorData = BaseDataValueVector.class.cast(valueVec.getDataVector()).getBuffer(); dataReader.valuesReadInCurrentPass = 0; repeatedGroupsReadInCurrentPass = 0; } @@ -145,12 +142,10 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { @Override protected boolean readAndStoreValueSizeInformation() { - boolean readingValsAcrossPageBoundary = false; int numLeftoverVals = 0; if (notFishedReadingList) { numLeftoverVals = repeatedValuesInCurrentList; readRecords(numLeftoverVals); - readingValsAcrossPageBoundary = true; notFishedReadingList = false; pageReader.valuesReadyToRead = 0; try { @@ -196,12 +191,8 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { } else { repeatedValuesInCurrentList = 0; } - int currentValueListLength = repeatedValuesInCurrentList; - if (readingValsAcrossPageBoundary) { - currentValueListLength += numLeftoverVals; - } // this should not fail - final UInt4Vector offsets = castedRepeatedVector.getOffsetVector(); + final UInt4Vector offsets = valueVec.getOffsetVector(); offsets.getMutator().setSafe(repeatedGroupsReadInCurrentPass + 1, offsets.getAccessor().get(repeatedGroupsReadInCurrentPass)); // This field is being referenced in the superclass determineSize method, so we need to set it here // again going to make this the length in BYTES to avoid repetitive multiplication/division @@ -219,13 +210,13 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { dataReader.valuesReadInCurrentPass = 0; dataReader.readValues(valuesToRead); valuesReadInCurrentPass += valuesToRead; - castedRepeatedVector.getMutator().setValueCount(repeatedGroupsReadInCurrentPass); - castedRepeatedVector.getDataVector().getMutator().setValueCount(valuesReadInCurrentPass); + valueVec.getMutator().setValueCount(repeatedGroupsReadInCurrentPass); + valueVec.getDataVector().getMutator().setValueCount(valuesReadInCurrentPass); } @Override public int capacity() { - return BaseDataValueVector.class.cast(castedRepeatedVector.getDataVector()).getBuffer().capacity(); + return BaseDataValueVector.class.cast(valueVec.getDataVector()).getBuffer().capacity(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java index bbc45a7..5a58831 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java @@ -19,8 +19,6 @@ package org.apache.drill.exec.store.parquet.columnreaders; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.NullableBitVector; -import org.apache.drill.exec.vector.ValueVector; - import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.SchemaElement; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -33,10 +31,10 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; * because page/batch boundaries that do not land on byte boundaries require shifting of all of the values * in the next batch. */ -final class NullableBitReader extends ColumnReader { +final class NullableBitReader extends ColumnReader<NullableBitVector> { NullableBitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + boolean fixedLength, NullableBitVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); } @@ -50,7 +48,7 @@ final class NullableBitReader extends ColumnReader { defLevel = pageReader.definitionLevels.readInteger(); // if the value is defined if (defLevel == columnDescriptor.getMaxDefinitionLevel()){ - ((NullableBitVector)valueVec).getMutator().setSafe(i + valuesReadInCurrentPass, + valueVec.getMutator().setSafe(i + valuesReadInCurrentPass, pageReader.valueReader.readBoolean() ? 
1 : 0 ); } // otherwise the value is skipped, because the bit vector indicating nullability is zero filled http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java index 5259345..800d422 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; -import io.netty.buffer.DrillBuf; - import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -42,20 +40,21 @@ import org.apache.drill.exec.vector.NullableTimeStampVector; import org.apache.drill.exec.vector.NullableTimeVector; import org.apache.drill.exec.vector.NullableVarBinaryVector; import org.apache.drill.exec.vector.ValueVector; -import org.joda.time.DateTimeUtils; - import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.SchemaElement; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.io.api.Binary; +import org.joda.time.DateTimeUtils; + +import io.netty.buffer.DrillBuf; public class NullableFixedByteAlignedReaders { - static class NullableFixedByteAlignedReader extends NullableColumnReader { + static class NullableFixedByteAlignedReader<V extends ValueVector> extends NullableColumnReader<V> { protected DrillBuf bytebuf; NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); } @@ -74,21 +73,17 @@ public class NullableFixedByteAlignedReaders { * a fixed length binary type, so this is read into a varbinary with the same size recorded for * each value. 
*/ - static class NullableFixedBinaryReader extends NullableFixedByteAlignedReader { - - NullableVarBinaryVector castedVector; - + static class NullableFixedBinaryReader extends NullableFixedByteAlignedReader<NullableVarBinaryVector> { NullableFixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarBinaryVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - castedVector = v; } @Override protected void readField(long recordsToReadInThisPass) { this.bytebuf = pageReader.pageData; if (usingDictionary) { - NullableVarBinaryVector.Mutator mutator = castedVector.getMutator(); + NullableVarBinaryVector.Mutator mutator = valueVec.getMutator(); Binary currDictValToWrite; for (int i = 0; i < recordsReadInThisIteration; i++){ currDictValToWrite = pageReader.dictionaryValueReader.readBytes(); @@ -107,7 +102,7 @@ public class NullableFixedByteAlignedReaders { // for now we need to write the lengths of each value int byteLength = dataTypeLengthInBits / 8; for (int i = 0; i < recordsToReadInThisPass; i++) { - castedVector.getMutator().setValueLengthSafe(valuesReadInCurrentPass + i, byteLength); + valueVec.getMutator().setValueLengthSafe(valuesReadInCurrentPass + i, byteLength); } } } @@ -295,12 +290,12 @@ public class NullableFixedByteAlignedReaders { } } - static abstract class NullableConvertedReader extends NullableFixedByteAlignedReader { + static abstract class NullableConvertedReader<V extends ValueVector> extends NullableFixedByteAlignedReader<V> { protected int dataTypeLengthInBytes; NullableConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); } @@ -318,14 +313,10 @@ public class NullableFixedByteAlignedReaders { abstract void addNext(int start, int index); } - public static class NullableDateReader extends NullableConvertedReader { - - NullableDateVector dateVector; - + public static class NullableDateReader extends NullableConvertedReader<NullableDateVector> { NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + boolean fixedLength, NullableDateVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - dateVector = (NullableDateVector) v; } @Override @@ -337,19 +328,15 @@ public class NullableFixedByteAlignedReaders { intValue = readIntLittleEndian(bytebuf, start); } - dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); + valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); } } - public static class NullableDecimal28Reader extends NullableConvertedReader { - - NullableDecimal28SparseVector decimal28Vector; - + public 
static class NullableDecimal28Reader extends NullableConvertedReader<NullableDecimal28SparseVector> { NullableDecimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + boolean fixedLength, NullableDecimal28SparseVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - decimal28Vector = (NullableDecimal28SparseVector) v; } @Override @@ -357,19 +344,15 @@ public class NullableFixedByteAlignedReaders { int width = NullableDecimal28SparseHolder.WIDTH; BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); - DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getBuffer(), index * width, schemaElement.getScale(), + DecimalUtility.getSparseFromBigDecimal(intermediate, valueVec.getBuffer(), index * width, schemaElement.getScale(), schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits); } } - public static class NullableDecimal38Reader extends NullableConvertedReader { - - NullableDecimal38SparseVector decimal38Vector; - + public static class NullableDecimal38Reader extends NullableConvertedReader<NullableDecimal38SparseVector> { NullableDecimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + boolean fixedLength, NullableDecimal38SparseVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - decimal38Vector = (NullableDecimal38SparseVector) v; } @Override @@ -377,30 +360,27 @@ public class NullableFixedByteAlignedReaders { int width = NullableDecimal38SparseHolder.WIDTH; BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); - DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getBuffer(), index * width, schemaElement.getScale(), + DecimalUtility.getSparseFromBigDecimal(intermediate, valueVec.getBuffer(), index * width, schemaElement.getScale(), schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits); } } - public static class NullableIntervalReader extends NullableConvertedReader { - NullableIntervalVector nullableIntervalVector; - + public static class NullableIntervalReader extends NullableConvertedReader<NullableIntervalVector> { NullableIntervalReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, - boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { + boolean fixedLength, NullableIntervalVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - nullableIntervalVector = (NullableIntervalVector) v; } @Override void addNext(int start, int index) { if (usingDictionary) { byte[] input = pageReader.dictionaryValueReader.readBytes().getBytes(); - nullableIntervalVector.getMutator().setSafe(index * 12, 1, + valueVec.getMutator().setSafe(index * 12, 1, 
ParquetReaderUtility.getIntFromLEBytes(input, 0), ParquetReaderUtility.getIntFromLEBytes(input, 4), ParquetReaderUtility.getIntFromLEBytes(input, 8)); } - nullableIntervalVector.getMutator().set(index, 1, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8)); + valueVec.getMutator().set(index, 1, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8)); } } } http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java index 77a2161..e7b4b6e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -17,10 +17,7 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; -import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics; import static org.apache.parquet.column.Encoding.valueOf; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.DrillBuf; import java.io.IOException; import java.nio.ByteBuffer; @@ -35,13 +32,11 @@ import org.apache.drill.exec.store.parquet.ParquetReaderStats; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Dictionary; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ValuesType; import org.apache.parquet.column.page.DictionaryPage; -import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.dictionary.DictionaryValuesReader; import org.apache.parquet.format.PageHeader; @@ -56,13 +51,16 @@ import org.apache.parquet.schema.PrimitiveType; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; + // class to keep track of the read position of variable length columns final class PageReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PageReader.class); public static final ParquetMetadataConverter METADATA_CONVERTER = ParquetFormatPlugin.parquetMetadataConverter; - private final ColumnReader parentColumnReader; + private final ColumnReader<?> parentColumnReader; private final ColumnDataReader dataReader; // buffer to store bytes of current page @@ -242,9 +240,6 @@ final class PageReader { currentPageCount = pageHeader.data_page_header.num_values; - final int uncompressedPageSize = pageHeader.uncompressed_page_size; - final Statistics<?> stats = fromParquetStatistics(pageHeader.data_page_header.getStatistics(), parentColumnReader - .getColumnDescriptor().getType()); final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding); final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding); http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java index a8e6c2c..00bf5f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java @@ -26,7 +26,6 @@ import org.apache.drill.exec.vector.Float8Vector; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.TimeStampVector; import org.apache.drill.exec.vector.TimeVector; - import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.SchemaElement; @@ -35,15 +34,11 @@ import org.apache.parquet.io.api.Binary; public class ParquetFixedWidthDictionaryReaders { - static class DictionaryIntReader extends FixedByteAlignedReader { - - IntVector castedVector; - + static class DictionaryIntReader extends FixedByteAlignedReader<IntVector> { DictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, IntVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - castedVector = v; } // this method is called by its superclass during a read loop @@ -55,21 +50,17 @@ public class ParquetFixedWidthDictionaryReaders { if (usingDictionary) { for (int i = 0; i < recordsReadInThisIteration; i++){ - castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); } } } } - static class DictionaryFixedBinaryReader extends FixedByteAlignedReader { - - VarBinaryVector castedVector; - + static class DictionaryFixedBinaryReader extends FixedByteAlignedReader<VarBinaryVector> { DictionaryFixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - castedVector = v; } // this method is called by its superclass during a read loop @@ -82,7 +73,7 @@ public class ParquetFixedWidthDictionaryReaders { readLength = (int) Math.ceil(readLengthInBits / 8.0); if (usingDictionary) { - VarBinaryVector.Mutator mutator = castedVector.getMutator(); + VarBinaryVector.Mutator mutator = valueVec.getMutator(); Binary currDictValToWrite = null; for (int i = 0; i < recordsReadInThisIteration; i++){ currDictValToWrite = pageReader.dictionaryValueReader.readBytes(); @@ -92,8 +83,8 @@ public class ParquetFixedWidthDictionaryReaders { // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding // and we will go into the else condition below. The readField method of the parent class requires the // writer index to be set correctly. 
- int writerIndex = castedVector.getBuffer().writerIndex(); - castedVector.getBuffer().setIndex(0, writerIndex + (int)readLength); + int writerIndex = valueVec.getBuffer().writerIndex(); + valueVec.getBuffer().setIndex(0, writerIndex + (int)readLength); } else { super.readField(recordsToReadInThisPass); } @@ -102,20 +93,16 @@ public class ParquetFixedWidthDictionaryReaders { // now we need to write the lengths of each value int byteLength = dataTypeLengthInBits / 8; for (int i = 0; i < recordsToReadInThisPass; i++) { - castedVector.getMutator().setValueLengthSafe(valuesReadInCurrentPass + i, byteLength); + valueVec.getMutator().setValueLengthSafe(valuesReadInCurrentPass + i, byteLength); } } } - static class DictionaryDecimal9Reader extends FixedByteAlignedReader { - - Decimal9Vector castedVector; - + static class DictionaryDecimal9Reader extends FixedByteAlignedReader<Decimal9Vector> { DictionaryDecimal9Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal9Vector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - castedVector = v; } // this method is called by its superclass during a read loop @@ -127,21 +114,17 @@ public class ParquetFixedWidthDictionaryReaders { if (usingDictionary) { for (int i = 0; i < recordsReadInThisIteration; i++){ - castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); } } } } - static class DictionaryTimeReader extends FixedByteAlignedReader { - - TimeVector castedVector; - + static class DictionaryTimeReader extends FixedByteAlignedReader<TimeVector> { DictionaryTimeReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - castedVector = v; } // this method is called by its superclass during a read loop @@ -153,21 +136,17 @@ public class ParquetFixedWidthDictionaryReaders { if (usingDictionary) { for (int i = 0; i < recordsReadInThisIteration; i++){ - castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); } } } } - static class DictionaryBigIntReader extends FixedByteAlignedReader { - - BigIntVector castedVector; - + static class DictionaryBigIntReader extends FixedByteAlignedReader<BigIntVector> { DictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, BigIntVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - castedVector = v; } // this method is called by its superclass during a read loop @@ -179,7 +158,7 @@ public class ParquetFixedWidthDictionaryReaders { for (int i = 0; i < recordsReadInThisIteration; i++){ try { - castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); 
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); } catch ( Exception ex) { throw ex; } @@ -187,15 +166,11 @@ public class ParquetFixedWidthDictionaryReaders { } } - static class DictionaryDecimal18Reader extends FixedByteAlignedReader { - - Decimal18Vector castedVector; - + static class DictionaryDecimal18Reader extends FixedByteAlignedReader<Decimal18Vector> { DictionaryDecimal18Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal18Vector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - castedVector = v; } // this method is called by its superclass during a read loop @@ -207,7 +182,7 @@ public class ParquetFixedWidthDictionaryReaders { for (int i = 0; i < recordsReadInThisIteration; i++){ try { - castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); } catch ( Exception ex) { throw ex; } @@ -215,15 +190,11 @@ public class ParquetFixedWidthDictionaryReaders { } } - static class DictionaryTimeStampReader extends FixedByteAlignedReader { - - TimeStampVector castedVector; - + static class DictionaryTimeStampReader extends FixedByteAlignedReader<TimeStampVector> { DictionaryTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - castedVector = v; } // this method is called by its superclass during a read loop @@ -235,7 +206,7 @@ public class ParquetFixedWidthDictionaryReaders { for (int i = 0; i < recordsReadInThisIteration; i++){ try { - castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); } catch ( Exception ex) { throw ex; } @@ -243,15 +214,11 @@ public class ParquetFixedWidthDictionaryReaders { } } - static class DictionaryFloat4Reader extends FixedByteAlignedReader { - - Float4Vector castedVector; - + static class DictionaryFloat4Reader extends FixedByteAlignedReader<Float4Vector> { DictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - castedVector = v; } // this method is called by its superclass during a read loop @@ -261,20 +228,16 @@ public class ParquetFixedWidthDictionaryReaders { - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); for (int i = 0; i < recordsReadInThisIteration; i++){ - castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); } } } - static class DictionaryFloat8Reader extends FixedByteAlignedReader { - - Float8Vector 
castedVector; - + static class DictionaryFloat8Reader extends FixedByteAlignedReader<Float8Vector> { DictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float8Vector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - castedVector = v; } // this method is called by its superclass during a read loop @@ -284,7 +247,7 @@ public class ParquetFixedWidthDictionaryReaders { - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); for (int i = 0; i < recordsReadInThisIteration; i++){ - castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); } } } http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index da3b067..23c0759 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -42,8 +42,8 @@ import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.parquet.ParquetReaderStats; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableIntVector; -import org.apache.drill.exec.vector.complex.RepeatedValueVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.RepeatedValueVector; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ColumnDescriptor; @@ -278,13 +278,13 @@ public class ParquetRecordReader extends AbstractRecordReader { try { ValueVector vector; SchemaElement schemaElement; - final ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>(); + final ArrayList<VarLengthColumn<? 
extends ValueVector>> varLengthColumns = new ArrayList<>(); // initialize all of the column read status objects boolean fieldFixedLength; // the column chunk meta-data is not guaranteed to be in the same order as the columns in the schema // a map is constructed for fast access to the correct columnChunkMetadata to correspond // to an element in the schema - Map<String, Integer> columnChunkMetadataPositionsInList = new HashMap(); + Map<String, Integer> columnChunkMetadataPositionsInList = new HashMap<>(); BlockMetaData rowGroupMetadata = footer.getBlocks().get(rowGroupIndex); int colChunkIndex = 0; @@ -309,7 +309,7 @@ public class ParquetRecordReader extends AbstractRecordReader { if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { if (column.getMaxRepetitionLevel() > 0) { final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector); - ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, + ColumnReader<?> dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, repeatedVector.getDataVector(), schemaElement); varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader, @@ -470,7 +470,7 @@ public class ParquetRecordReader extends AbstractRecordReader { // limit kills upstream operators once it has enough records, so this assert will fail // assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount(); if (columnStatuses != null) { - for (final ColumnReader column : columnStatuses) { + for (final ColumnReader<?> column : columnStatuses) { column.clear(); } columnStatuses.clear(); http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java index 68a7e2a..6ca0205 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java @@ -23,9 +23,9 @@ import java.util.List; public class VarLenBinaryReader { ParquetRecordReader parentReader; - final List<VarLengthColumn> columns; + final List<VarLengthColumn<?>> columns; - public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns) { + public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn<?>> columns) { this.parentReader = parentReader; this.columns = columns; } @@ -38,20 +38,20 @@ public class VarLenBinaryReader { * @return - the number of fixed length fields that will fit in the batch * @throws IOException */ - public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException { + public long readFields(long recordsToReadInThisPass, ColumnReader<?> firstColumnStatus) throws IOException { long recordsReadInCurrentPass = 0; int lengthVarFieldsInCurrentRecord; long totalVariableLengthData = 0; boolean exitLengthDeterminingLoop = false; // write the first 0 offset - for (VarLengthColumn columnReader : columns) { + for (VarLengthColumn<?> columnReader : columns) { columnReader.reset(); } do { lengthVarFieldsInCurrentRecord = 0; - for (VarLengthColumn 
columnReader : columns) { + for (VarLengthColumn<?> columnReader : columns) { if ( !exitLengthDeterminingLoop ) { exitLengthDeterminingLoop = columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord); } else { @@ -63,7 +63,7 @@ public class VarLenBinaryReader { + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) { break; } - for (VarLengthColumn columnReader : columns ) { + for (VarLengthColumn<?> columnReader : columns ) { columnReader.updateReadyToReadPosition(); columnReader.currDefLevel = -1; } @@ -71,10 +71,10 @@ public class VarLenBinaryReader { totalVariableLengthData += lengthVarFieldsInCurrentRecord; } while (recordsReadInCurrentPass < recordsToReadInThisPass); - for (VarLengthColumn columnReader : columns) { + for (VarLengthColumn<?> columnReader : columns) { columnReader.readRecords(columnReader.pageReader.valuesReadyToRead); } - for (VarLengthColumn columnReader : columns) { + for (VarLengthColumn<?> columnReader : columns) { columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass); } return recordsReadInCurrentPass; http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java index a62e8c5..17f9fc6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java @@ -21,14 +21,13 @@ import java.io.IOException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.ValueVector; - import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.Encoding; import org.apache.parquet.format.SchemaElement; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.io.api.Binary; -public abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader { +public abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader<V> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLengthColumn.class); Binary currDictVal; http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java index 9efcf4a..632cf66 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java @@ -26,18 +26,18 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; -import com.carrotsearch.hppc.cursors.ObjectLongCursor; -import com.google.common.collect.Iterators; -import com.google.common.collect.Maps; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.ExecConstants; import 
http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index 9efcf4a..632cf66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -26,18 +26,18 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;

-import com.carrotsearch.hppc.cursors.ObjectLongCursor;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Maps;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitContext;

+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
-import org.apache.drill.exec.server.DrillbitContext;
+import com.google.common.collect.Maps;

 /**
  * The AssignmentCreator is responsible for assigning a set of work units to the available slices.
@@ -96,7 +96,7 @@ public class AssignmentCreator<T extends CompleteWork> {
     if (useOldAssignmentCode) {
       return OldAssignmentCreator.getMappings(incomingEndpoints, units);
     } else {
-      AssignmentCreator<T> creator = new AssignmentCreator(incomingEndpoints, units);
+      AssignmentCreator<T> creator = new AssignmentCreator<>(incomingEndpoints, units);
       return creator.getMappings();
     }
   }
@@ -185,20 +185,20 @@ public class AssignmentCreator<T extends CompleteWork> {
       for (ObjectLongCursor<DrillbitEndpoint> cursor : work.getByteMap()) {
         final DrillbitEndpoint ep = cursor.key;
         final Long val = cursor.value;
-        Map.Entry<DrillbitEndpoint,Long> entry = new Entry() {
+        Map.Entry<DrillbitEndpoint,Long> entry = new Entry<DrillbitEndpoint, Long>() {

           @Override
-          public Object getKey() {
+          public DrillbitEndpoint getKey() {
             return ep;
           }

           @Override
-          public Object getValue() {
+          public Long getValue() {
             return val;
           }

           @Override
-          public Object setValue(Object value) {
+          public Long setValue(Long value) {
             throw new UnsupportedOperationException();
           }
         };
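The anonymous Entry above gains explicit type arguments, so getKey/getValue/setValue return real types instead of Object. A self-contained sketch of the same idiom; the JDK's AbstractMap.SimpleImmutableEntry is shown at the end as an equivalent shortcut (the method and values here are illustrative, not Drill's):

import java.util.AbstractMap;
import java.util.Map;

public class EntryDemo {
  static Map.Entry<String, Long> entryOf(final String key, final Long value) {
    // Typed anonymous implementation: overrides return String/Long, not Object.
    return new Map.Entry<String, Long>() {
      @Override public String getKey() { return key; }
      @Override public Long getValue() { return value; }
      @Override public Long setValue(Long v) { throw new UnsupportedOperationException(); }
    };
  }

  public static void main(String[] args) {
    Map.Entry<String, Long> e = entryOf("drillbit-1", 42L);
    // Stock alternative with the same semantics (immutable key/value pair):
    Map.Entry<String, Long> e2 = new AbstractMap.SimpleImmutableEntry<>("drillbit-1", 42L);
    System.out.println(e.getKey() + "=" + e.getValue() + ", " + e2);
  }
}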
http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
index 1d73001..7f99fb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java
@@ -98,7 +98,7 @@ public enum SystemTable {
     return distributed;
   }

-  public Class getPojoClass() {
+  public Class<?> getPojoClass() {
     return pojoClass;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
index 4b9d357..9aa33e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
@@ -28,7 +28,7 @@ import com.sun.codemodel.JExpression;
 import com.sun.codemodel.JVar;

 public class CopyUtil {
-  public static void generateCopies(ClassGenerator g, VectorAccessible batch, boolean hyper){
+  public static void generateCopies(ClassGenerator<?> g, VectorAccessible batch, boolean hyper){
     // we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
     int fieldId = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 735aaa2..2118e16 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -17,7 +17,7 @@
  */
 package org.apache.parquet.hadoop;

-import io.netty.buffer.ByteBuf;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;

 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -27,13 +27,11 @@ import java.util.List;
 import java.util.Map;

 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.store.parquet.ColumnDataReader;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.DataPage;
@@ -50,7 +48,7 @@ import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.util.CompatibilityUtil;

-import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
+import io.netty.buffer.ByteBuf;

 public class ColumnChunkIncReadStore implements PageReadStore {
@@ -62,7 +60,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
   private FileSystem fs;
   private Path path;
   private long rowCount;
-  private List<FSDataInputStream> streams = new ArrayList();
+  private List<FSDataInputStream> streams = new ArrayList<>();

   public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator,
       FileSystem fs, Path path) {
@@ -239,7 +237,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     }
   }

-  private Map<ColumnDescriptor, ColumnChunkIncPageReader> columns = new HashMap();
+  private Map<ColumnDescriptor, ColumnChunkIncPageReader> columns = new HashMap<>();

   public void addColumn(ColumnDescriptor descriptor, ColumnChunkMetaData metaData) throws IOException {
     FSDataInputStream in = fs.open(path);
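Several hunks in this commit, including the two just above, swap raw constructor calls (new ArrayList(), new HashMap()) for the diamond operator, so the type arguments are spelled out once on the declaration and inferred at the construction site. A minimal sketch:

// Sketch of the diamond-operator cleanup applied throughout this commit.
import java.util.HashMap;
import java.util.Map;

public class DiamondDemo {
  public static void main(String[] args) {
    Map<String, Integer> positions = new HashMap<>();   // inferred <String, Integer>
    positions.put("colA", 0);
    // The raw form "new HashMap()" compiles too, but only with an unchecked
    // warning, and it silently opts the new object out of generics checking.
    System.out.println(positions);
  }
}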
http://git-wip-us.apache.org/repos/asf/drill/blob/2ffe3117/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index c9f6eae..be017dc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -81,8 +81,6 @@ public class DrillTestWrapper {
   private UserBitShared.QueryType baselineQueryType;
   // should ordering be enforced in the baseline check
   private boolean ordered;
-  // TODO - implement this
-  private boolean approximateEquality;
   private BufferAllocator allocator;
   // queries to run before the baseline or test queries, can be used to set options
   private String baselineOptionSettingQueries;
@@ -96,21 +94,20 @@ public class DrillTestWrapper {
   // if the baseline is a single option test writers can provide the baseline values and columns
   // without creating a file, these are provided to the builder in the baselineValues() and baselineColumns() methods
   // and translated into a map in the builder
-  private List<Map> baselineRecords;
+  private List<Map<String, Object>> baselineRecords;

   private int expectedNumBatches;

   public DrillTestWrapper(TestBuilder testBuilder, BufferAllocator allocator, String query, QueryType queryType,
                           String baselineOptionSettingQueries, String testOptionSettingQueries,
-                          QueryType baselineQueryType, boolean ordered, boolean approximateEquality,
-                          boolean highPerformanceComparison, List<Map> baselineRecords, int expectedNumBatches) {
+                          QueryType baselineQueryType, boolean ordered, boolean highPerformanceComparison,
+                          List<Map<String, Object>> baselineRecords, int expectedNumBatches) {
     this.testBuilder = testBuilder;
     this.allocator = allocator;
     this.query = query;
     this.queryType = queryType;
     this.baselineQueryType = baselineQueryType;
     this.ordered = ordered;
-    this.approximateEquality = approximateEquality;
     this.baselineOptionSettingQueries = baselineOptionSettingQueries;
     this.testOptionSettingQueries = testOptionSettingQueries;
     this.highPerformanceComparison = highPerformanceComparison;
@@ -159,13 +156,13 @@ public class DrillTestWrapper {
     }
   }

-  private void compareMergedVectors(Map<String, List> expectedRecords, Map<String, List> actualRecords) throws Exception {
+  private void compareMergedVectors(Map<String, List<Object>> expectedRecords, Map<String, List<Object>> actualRecords) throws Exception {
     for (String s : actualRecords.keySet()) {
       assertNotNull("Unexpected extra column " + s + " returned by query.", expectedRecords.get(s));
       assertEquals("Incorrect number of rows returned by query.", expectedRecords.get(s).size(), actualRecords.get(s).size());
-      List expectedValues = expectedRecords.get(s);
-      List actualValues = actualRecords.get(s);
+      List<?> expectedValues = expectedRecords.get(s);
+      List<?> actualValues = actualRecords.get(s);
       assertEquals("Different number of records returned", expectedValues.size(), actualValues.size());

       for (int i = 0; i < expectedValues.size(); i++) {
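compareMergedVectors above reads values out of lists whose element type it never needs to know, so List<?> is the right parameter type: reads come back as Object and mutation is rejected at compile time. A small sketch (names are illustrative, not Drill's):

// Why List<?> suffices for read-only comparison code.
import java.util.Arrays;
import java.util.List;

public class ReadOnlyWildcard {
  static boolean sameSize(List<?> expected, List<?> actual) {
    return expected.size() == actual.size();
  }

  public static void main(String[] args) {
    List<?> expected = Arrays.asList(1, 2, 3);
    List<?> actual = Arrays.asList(1, 2, 3);
    System.out.println(sameSize(expected, actual)); // true
    Object first = expected.get(0);                 // reading is fine
    // expected.add(4);                             // would not compile
  }
}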
").append(i).append(" { "); actual.append("Record Number: ").append(i).append(" { "); for (String s : actualRecords.keySet()) { - List actualValues = actualRecords.get(s); + List<?> actualValues = actualRecords.get(s); actual.append(s).append(" : ").append(actualValues.get(i)).append(","); } for (String s : expectedRecords.keySet()) { - List expectedValues = expectedRecords.get(s); + List<?> expectedValues = expectedRecords.get(s); expected.append(s).append(" : ").append(expectedValues.get(i)).append(","); } expected.append(" }\n"); @@ -212,7 +209,7 @@ public class DrillTestWrapper { private Map<String, HyperVectorValueIterator> addToHyperVectorMap(List<QueryDataBatch> records, RecordBatchLoader loader, BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException { // TODO - this does not handle schema changes - Map<String, HyperVectorValueIterator> combinedVectors = new HashMap(); + Map<String, HyperVectorValueIterator> combinedVectors = new HashMap<>(); long totalRecords = 0; QueryDataBatch batch; @@ -223,14 +220,13 @@ public class DrillTestWrapper { loader.load(batch.getHeader().getDef(), batch.getData()); logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords); totalRecords += loader.getRecordCount(); - for (VectorWrapper w : loader) { + for (VectorWrapper<?> w : loader) { String field = SchemaPath.getSimplePath(w.getField().getPath()).toExpr(); if (!combinedVectors.containsKey(field)) { MaterializedField mf = w.getField(); ValueVector[] vvList = (ValueVector[]) Array.newInstance(mf.getValueClass(), 1); vvList[0] = w.getValueVector(); - combinedVectors.put(SchemaPath.getSimplePath(mf.getPath()).toExpr(), new HyperVectorValueIterator(mf, new HyperVectorWrapper(mf, - vvList))); + combinedVectors.put(field, new HyperVectorValueIterator(mf, new HyperVectorWrapper<>(mf, vvList))); } else { combinedVectors.get(field).getHyperVector().addVector(w.getValueVector()); } @@ -256,10 +252,10 @@ public class DrillTestWrapper { * @throws SchemaChangeException * @throws UnsupportedEncodingException */ - private Map<String, List> addToCombinedVectorResults(List<QueryDataBatch> records, RecordBatchLoader loader, + private Map<String, List<Object>> addToCombinedVectorResults(List<QueryDataBatch> records, RecordBatchLoader loader, BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException { // TODO - this does not handle schema changes - Map<String, List> combinedVectors = new HashMap(); + Map<String, List<Object>> combinedVectors = new HashMap<>(); long totalRecords = 0; QueryDataBatch batch; @@ -272,12 +268,12 @@ public class DrillTestWrapper { if (schema == null) { schema = loader.getSchema(); for (MaterializedField mf : schema) { - combinedVectors.put(SchemaPath.getSimplePath(mf.getPath()).toExpr(), new ArrayList()); + combinedVectors.put(SchemaPath.getSimplePath(mf.getPath()).toExpr(), new ArrayList<Object>()); } } logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords); totalRecords += loader.getRecordCount(); - for (VectorWrapper w : loader) { + for (VectorWrapper<?> w : loader) { String field = SchemaPath.getSimplePath(w.getField().getPath()).toExpr(); for (int j = 0; j < loader.getRecordCount(); j++) { Object obj = w.getValueVector().getAccessor().getObject(j); @@ -345,10 +341,10 @@ public class DrillTestWrapper { RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); BatchSchema schema = null; - List<QueryDataBatch> actual = 
@@ -345,10 +341,10 @@ public class DrillTestWrapper {
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
     BatchSchema schema = null;
-    List<QueryDataBatch> actual = Collections.EMPTY_LIST;
-    List<QueryDataBatch> expected = Collections.EMPTY_LIST;
-    List<Map> expectedRecords = new ArrayList<>();
-    List<Map> actualRecords = new ArrayList<>();
+    List<QueryDataBatch> actual = Collections.emptyList();
+    List<QueryDataBatch> expected = Collections.emptyList();
+    List<Map<String, Object>> expectedRecords = new ArrayList<>();
+    List<Map<String, Object>> actualRecords = new ArrayList<>();

     try {
       BaseTestQuery.test(testOptionSettingQueries);
@@ -396,10 +392,10 @@ public class DrillTestWrapper {
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
     BatchSchema schema = null;
-    List<QueryDataBatch> actual = Collections.EMPTY_LIST;;
-    List<QueryDataBatch> expected = Collections.EMPTY_LIST;
-    Map<String, List> actualSuperVectors;
-    Map<String, List> expectedSuperVectors;
+    List<QueryDataBatch> actual = Collections.emptyList();
+    List<QueryDataBatch> expected = Collections.emptyList();
+    Map<String, List<Object>> actualSuperVectors;
+    Map<String, List<Object>> expectedSuperVectors;

     try {
       BaseTestQuery.test(testOptionSettingQueries);
@@ -421,10 +417,10 @@
     } else {
       // data is built in the TestBuilder in a row major format as it is provided by the user
       // translate it here to vectorized, the representation expected by the ordered comparison
-      expectedSuperVectors = new HashMap();
-      expected = new ArrayList();
-      for (String s : ((Map<String, Object>)baselineRecords.get(0)).keySet()) {
-        expectedSuperVectors.put(s, new ArrayList());
+      expectedSuperVectors = new HashMap<>();
+      expected = new ArrayList<>();
+      for (String s : baselineRecords.get(0).keySet()) {
+        expectedSuperVectors.put(s, new ArrayList<>());
       }
       for (Map<String, Object> m : baselineRecords) {
         for (String s : m.keySet()) {
@@ -481,7 +477,7 @@
   }

   private Map<SchemaPath, TypeProtos.MajorType> getTypeMapFromBatch(QueryDataBatch batch) {
-    Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap();
+    Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
     for (int i = 0; i < batch.getHeader().getDef().getFieldCount(); i++) {
       typeMap.put(SchemaPath.getSimplePath(MaterializedField.create(batch.getHeader().getDef().getField(i)).getPath()),
           batch.getHeader().getDef().getField(i).getMajorType());
@@ -489,7 +485,8 @@
     return typeMap;
   }

-  private void cleanupBatches(List<QueryDataBatch>... results) {
+  @SafeVarargs
+  private final void cleanupBatches(List<QueryDataBatch>... results) {
     for (List<QueryDataBatch> resultList : results ) {
       for (QueryDataBatch result : resultList) {
         result.release();
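Two idioms land in the hunks above: Collections.emptyList() replaces the raw Collections.EMPTY_LIST field (the stray double semicolon in the removed line is preserved verbatim from the old code), and @SafeVarargs suppresses the otherwise-unavoidable unchecked warning on a final varargs-of-generics method that never misuses its implicit array. A sketch with stand-in names, not Drill's:

import java.util.Collections;
import java.util.List;

public class VarargsDemo {
  // Safe because the implicit List<String>[] array is only iterated, never
  // exposed or written to; @SafeVarargs is only legal on methods that cannot
  // be overridden (static, final, or, since Java 9, private).
  @SafeVarargs
  private static void releaseAll(List<String>... batches) {
    for (List<String> batch : batches) {
      batch.forEach(System.out::println);
    }
  }

  public static void main(String[] args) {
    List<String> empty = Collections.emptyList(); // typed; EMPTY_LIST is raw
    releaseAll(empty, Collections.singletonList("done"));
  }
}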
@@ -497,7 +494,7 @@
       }
     }

-  protected void addToMaterializedResults(List<Map> materializedRecords, List<QueryDataBatch> records, RecordBatchLoader loader,
+  protected void addToMaterializedResults(List<Map<String, Object>> materializedRecords, List<QueryDataBatch> records, RecordBatchLoader loader,
       BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
     long totalRecords = 0;
     QueryDataBatch batch;
@@ -514,7 +511,7 @@
       totalRecords += loader.getRecordCount();
       for (int j = 0; j < loader.getRecordCount(); j++) {
         HashMap<String, Object> record = new HashMap<>();
-        for (VectorWrapper w : loader) {
+        for (VectorWrapper<?> w : loader) {
           Object obj = w.getValueVector().getAccessor().getObject(j);
           if (obj != null) {
             if (obj instanceof Text) {
@@ -599,7 +596,7 @@
    * @param actualRecords - list of records from test query, WARNING - this list is destroyed in this method
    * @throws Exception
    */
-  private void compareResults(List<Map> expectedRecords, List<Map> actualRecords) throws Exception {
+  private void compareResults(List<Map<String, Object>> expectedRecords, List<Map<String, Object>> actualRecords) throws Exception {
     assertEquals("Different number of records returned", expectedRecords.size(), actualRecords.size());

@@ -662,7 +659,7 @@
     return "Expected column(s) " + missingCols + " not found in result set: " + actual + ".";
   }

-  private String printRecord(Map<String, Object> record) {
+  private String printRecord(Map<String, ?> record) {
     String ret = "";
     for (String s : record.keySet()) {
       ret += s + " : " + record.get(s) + ", ";
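printRecord above widens its parameter from Map<String, Object> to Map<String, ?>: since the method only reads values, callers can now pass a map with any value type. A short sketch of the effect (method body and data are illustrative):

import java.util.LinkedHashMap;
import java.util.Map;

public class PrintRecordDemo {
  // Accepting Map<String, ?> admits Map<String, Object>, Map<String, Long>,
  // and so on, because the method never writes into the map.
  static String printRecord(Map<String, ?> record) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, ?> e : record.entrySet()) {
      sb.append(e.getKey()).append(" : ").append(e.getValue()).append(", ");
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Map<String, Long> row = new LinkedHashMap<>();
    row.put("n_nationkey", 7L);
    System.out.println(printRecord(row)); // n_nationkey : 7,
  }
}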
