Repository: spark Updated Branches: refs/heads/master 238fb485b -> 54794113a
[SPARK-13989] [SQL] Remove non-vectorized/unsafe-row parquet record reader ## What changes were proposed in this pull request? This PR cleans up the new parquet record reader with the following changes: 1. Removes the non-vectorized parquet reader code from `UnsafeRowParquetRecordReader`. 2. Removes the non-vectorized column reader code from `ColumnReader`. 3. Renames `UnsafeRowParquetRecordReader` to `VectorizedParquetRecordReader` and `ColumnReader` to `VectorizedColumnReader`. 4. Deprecates `PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED`. ## How was this patch tested? Refactoring only; existing tests should reveal any problems. Author: Sameer Agarwal <[email protected]> Closes #11799 from sameeragarwal/vectorized-parquet. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54794113 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54794113 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54794113 Branch: refs/heads/master Commit: 54794113a6a906b0f9c6bfb9da322e18e007214c Parents: 238fb48 Author: Sameer Agarwal <[email protected]> Authored: Fri Mar 18 14:04:42 2016 -0700 Committer: Davies Liu <[email protected]> Committed: Fri Mar 18 14:04:42 2016 -0700 ---------------------------------------------------------------------- .../parquet/UnsafeRowParquetRecordReader.java | 946 ------------------- .../parquet/VectorizedParquetRecordReader.java | 701 ++++++++++++++ .../spark/sql/execution/command/commands.scala | 9 + .../execution/datasources/SqlNewHadoopRDD.scala | 20 +- .../org/apache/spark/sql/internal/SQLConf.scala | 6 +- .../parquet/ParquetEncodingSuite.scala | 4 +- .../parquet/ParquetFilterSuite.scala | 6 +- .../datasources/parquet/ParquetIOSuite.scala | 14 +- .../parquet/ParquetReadBenchmark.scala | 61 +- 9 files changed, 739 insertions(+), 1028 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java deleted file mode 100644 index 7234726..0000000 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java +++ /dev/null @@ -1,946 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.datasources.parquet; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.parquet.Preconditions; -import org.apache.parquet.bytes.BytesUtils; -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.Dictionary; -import org.apache.parquet.column.Encoding; -import org.apache.parquet.column.page.*; -import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; - -import org.apache.spark.memory.MemoryMode; -import org.apache.spark.sql.catalyst.expressions.UnsafeRow; -import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; -import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; -import org.apache.spark.sql.execution.vectorized.ColumnVector; -import org.apache.spark.sql.execution.vectorized.ColumnarBatch; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.types.DecimalType; -import org.apache.spark.unsafe.types.UTF8String; - -import static org.apache.parquet.column.ValuesType.*; - -/** - * A specialized RecordReader that reads into UnsafeRows directly using the Parquet column APIs. - * - * This is somewhat based on parquet-mr's ColumnReader. - * - * TODO: handle complex types, decimal requiring more than 8 bytes, INT96. Schema mismatch. - * All of these can be handled efficiently and easily with codegen. - * - * This class can either return InternalRows or ColumnarBatches. With whole stage codegen - * enabled, this class returns ColumnarBatches which offers significant performance gains. 
- * TODO: make this always return ColumnarBatches. - */ -public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBase<Object> { - /** - * Batch of unsafe rows that we assemble and the current index we've returned. Every time this - * batch is used up (batchIdx == numBatched), we populated the batch. - */ - private UnsafeRow[] rows = new UnsafeRow[64]; - private int batchIdx = 0; - private int numBatched = 0; - - /** - * Used to write variable length columns. Same length as `rows`. - */ - private UnsafeRowWriter[] rowWriters = null; - /** - * True if the row contains variable length fields. - */ - private boolean containsVarLenFields; - - /** - * For each request column, the reader to read this column. - * columnsReaders[i] populated the UnsafeRow's attribute at i. - */ - private ColumnReader[] columnReaders; - - /** - * The number of rows that have been returned. - */ - private long rowsReturned; - - /** - * The number of rows that have been reading, including the current in flight row group. - */ - private long totalCountLoadedSoFar = 0; - - /** - * For each column, the annotated original type. - */ - private OriginalType[] originalTypes; - - /** - * The default size for varlen columns. The row grows as necessary to accommodate the - * largest column. - */ - private static final int DEFAULT_VAR_LEN_SIZE = 32; - - /** - * columnBatch object that is used for batch decoding. This is created on first use and triggers - * batched decoding. It is not valid to interleave calls to the batched interface with the row - * by row RecordReader APIs. - * This is only enabled with additional flags for development. This is still a work in progress - * and currently unsupported cases will fail with potentially difficult to diagnose errors. - * This should be only turned on for development to work on this feature. - * - * When this is set, the code will branch early on in the RecordReader APIs. 
There is no shared - * code between the path that uses the MR decoders and the vectorized ones. - * - * TODOs: - * - Implement v2 page formats (just make sure we create the correct decoders). - */ - private ColumnarBatch columnarBatch; - - /** - * If true, this class returns batches instead of rows. - */ - private boolean returnColumnarBatch; - - /** - * The default config on whether columnarBatch should be offheap. - */ - private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP; - - /** - * Tries to initialize the reader for this split. Returns true if this reader supports reading - * this split and false otherwise. - */ - public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { - try { - initialize(inputSplit, taskAttemptContext); - return true; - } catch (Exception e) { - return false; - } - } - - /** - * Implementation of RecordReader API. - */ - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - super.initialize(inputSplit, taskAttemptContext); - initializeInternal(); - } - - /** - * Utility API that will read all the data in path. This circumvents the need to create Hadoop - * objects to use this class. `columns` can contain the list of columns to project. 
- */ - @Override - public void initialize(String path, List<String> columns) throws IOException { - super.initialize(path, columns); - initializeInternal(); - } - - @Override - public void close() throws IOException { - if (columnarBatch != null) { - columnarBatch.close(); - columnarBatch = null; - } - super.close(); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if (returnColumnarBatch) return nextBatch(); - - if (batchIdx >= numBatched) { - if (vectorizedDecode()) { - if (!nextBatch()) return false; - } else { - if (!loadBatch()) return false; - } - } - ++batchIdx; - return true; - } - - @Override - public Object getCurrentValue() throws IOException, InterruptedException { - if (returnColumnarBatch) return columnarBatch; - - if (vectorizedDecode()) { - return columnarBatch.getRow(batchIdx - 1); - } else { - return rows[batchIdx - 1]; - } - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return (float) rowsReturned / totalRowCount; - } - - /** - * Returns the ColumnarBatch object that will be used for all rows returned by this reader. - * This object is reused. Calling this enables the vectorized reader. This should be called - * before any calls to nextKeyValue/nextBatch. - */ - public ColumnarBatch resultBatch() { - return resultBatch(DEFAULT_MEMORY_MODE); - } - - public ColumnarBatch resultBatch(MemoryMode memMode) { - if (columnarBatch == null) { - columnarBatch = ColumnarBatch.allocate(sparkSchema, memMode); - } - return columnarBatch; - } - - /** - * Can be called before any rows are returned to enable returning columnar batches directly. - */ - public void enableReturningBatches() { - assert(vectorizedDecode()); - returnColumnarBatch = true; - } - - /** - * Advances to the next batch of rows. Returns false if there are no more. 
- */ - public boolean nextBatch() throws IOException { - assert(vectorizedDecode()); - columnarBatch.reset(); - if (rowsReturned >= totalRowCount) return false; - checkEndOfRowGroup(); - - int num = (int)Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned); - for (int i = 0; i < columnReaders.length; ++i) { - columnReaders[i].readBatch(num, columnarBatch.column(i)); - } - rowsReturned += num; - columnarBatch.setNumRows(num); - numBatched = num; - batchIdx = 0; - return true; - } - - /** - * Returns true if we are doing a vectorized decode. - */ - private boolean vectorizedDecode() { return columnarBatch != null; } - - private void initializeInternal() throws IOException { - /** - * Check that the requested schema is supported. - */ - int numVarLenFields = 0; - originalTypes = new OriginalType[requestedSchema.getFieldCount()]; - for (int i = 0; i < requestedSchema.getFieldCount(); ++i) { - Type t = requestedSchema.getFields().get(i); - if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) { - throw new IOException("Complex types not supported."); - } - PrimitiveType primitiveType = t.asPrimitiveType(); - - originalTypes[i] = t.getOriginalType(); - - // TODO: Be extremely cautious in what is supported. Expand this. 
- if (originalTypes[i] != null && originalTypes[i] != OriginalType.DECIMAL && - originalTypes[i] != OriginalType.UTF8 && originalTypes[i] != OriginalType.DATE && - originalTypes[i] != OriginalType.INT_8 && originalTypes[i] != OriginalType.INT_16) { - throw new IOException("Unsupported type: " + t); - } - if (originalTypes[i] == OriginalType.DECIMAL && - primitiveType.getDecimalMetadata().getPrecision() > Decimal.MAX_LONG_DIGITS()) { - throw new IOException("Decimal with high precision is not supported."); - } - if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { - throw new IOException("Int96 not supported."); - } - ColumnDescriptor fd = fileSchema.getColumnDescription(requestedSchema.getPaths().get(i)); - if (!fd.equals(requestedSchema.getColumns().get(i))) { - throw new IOException("Schema evolution not supported."); - } - - if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY) { - ++numVarLenFields; - } - } - - /** - * Initialize rows and rowWriters. These objects are reused across all rows in the relation. - */ - containsVarLenFields = numVarLenFields > 0; - rowWriters = new UnsafeRowWriter[rows.length]; - - for (int i = 0; i < rows.length; ++i) { - rows[i] = new UnsafeRow(requestedSchema.getFieldCount()); - BufferHolder holder = new BufferHolder(rows[i], numVarLenFields * DEFAULT_VAR_LEN_SIZE); - rowWriters[i] = new UnsafeRowWriter(holder, requestedSchema.getFieldCount()); - } - } - - /** - * Decodes a batch of values into `rows`. This function is the hot path. 
- */ - private boolean loadBatch() throws IOException { - // no more records left - if (rowsReturned >= totalRowCount) { return false; } - checkEndOfRowGroup(); - - int num = (int)Math.min(rows.length, totalCountLoadedSoFar - rowsReturned); - rowsReturned += num; - - if (containsVarLenFields) { - for (int i = 0; i < rowWriters.length; ++i) { - rowWriters[i].holder().reset(); - } - } - - for (int i = 0; i < columnReaders.length; ++i) { - switch (columnReaders[i].descriptor.getType()) { - case BOOLEAN: - decodeBooleanBatch(i, num); - break; - case INT32: - if (originalTypes[i] == OriginalType.DECIMAL) { - decodeIntAsDecimalBatch(i, num); - } else { - decodeIntBatch(i, num); - } - break; - case INT64: - Preconditions.checkState(originalTypes[i] == null - || originalTypes[i] == OriginalType.DECIMAL, - "Unexpected original type: " + originalTypes[i]); - decodeLongBatch(i, num); - break; - case FLOAT: - decodeFloatBatch(i, num); - break; - case DOUBLE: - decodeDoubleBatch(i, num); - break; - case BINARY: - decodeBinaryBatch(i, num); - break; - case FIXED_LEN_BYTE_ARRAY: - Preconditions.checkState(originalTypes[i] == OriginalType.DECIMAL, - "Unexpected original type: " + originalTypes[i]); - decodeFixedLenArrayAsDecimalBatch(i, num); - break; - case INT96: - throw new IOException("Unsupported " + columnReaders[i].descriptor.getType()); - } - } - - numBatched = num; - batchIdx = 0; - - // Update the total row lengths if the schema contained variable length. We did not maintain - // this as we populated the columns. 
- if (containsVarLenFields) { - for (int i = 0; i < numBatched; ++i) { - rows[i].setTotalSize(rowWriters[i].holder().totalSize()); - } - } - - return true; - } - - private void decodeBooleanBatch(int col, int num) throws IOException { - for (int n = 0; n < num; ++n) { - if (columnReaders[col].next()) { - rows[n].setBoolean(col, columnReaders[col].nextBoolean()); - } else { - rows[n].setNullAt(col); - } - } - } - - private void decodeIntBatch(int col, int num) throws IOException { - for (int n = 0; n < num; ++n) { - if (columnReaders[col].next()) { - rows[n].setInt(col, columnReaders[col].nextInt()); - } else { - rows[n].setNullAt(col); - } - } - } - - private void decodeIntAsDecimalBatch(int col, int num) throws IOException { - for (int n = 0; n < num; ++n) { - if (columnReaders[col].next()) { - // Since this is stored as an INT, it is always a compact decimal. Just set it as a long. - rows[n].setLong(col, columnReaders[col].nextInt()); - } else { - rows[n].setNullAt(col); - } - } - } - - private void decodeLongBatch(int col, int num) throws IOException { - for (int n = 0; n < num; ++n) { - if (columnReaders[col].next()) { - rows[n].setLong(col, columnReaders[col].nextLong()); - } else { - rows[n].setNullAt(col); - } - } - } - - private void decodeFloatBatch(int col, int num) throws IOException { - for (int n = 0; n < num; ++n) { - if (columnReaders[col].next()) { - rows[n].setFloat(col, columnReaders[col].nextFloat()); - } else { - rows[n].setNullAt(col); - } - } - } - - private void decodeDoubleBatch(int col, int num) throws IOException { - for (int n = 0; n < num; ++n) { - if (columnReaders[col].next()) { - rows[n].setDouble(col, columnReaders[col].nextDouble()); - } else { - rows[n].setNullAt(col); - } - } - } - - private void decodeBinaryBatch(int col, int num) throws IOException { - for (int n = 0; n < num; ++n) { - if (columnReaders[col].next()) { - ByteBuffer bytes = columnReaders[col].nextBinary().toByteBuffer(); - int len = bytes.remaining(); - if 
(originalTypes[col] == OriginalType.UTF8) { - UTF8String str = - UTF8String.fromBytes(bytes.array(), bytes.arrayOffset() + bytes.position(), len); - rowWriters[n].write(col, str); - } else { - rowWriters[n].write(col, bytes.array(), bytes.arrayOffset() + bytes.position(), len); - } - rows[n].setNotNullAt(col); - } else { - rows[n].setNullAt(col); - } - } - } - - private void decodeFixedLenArrayAsDecimalBatch(int col, int num) throws IOException { - PrimitiveType type = requestedSchema.getFields().get(col).asPrimitiveType(); - int precision = type.getDecimalMetadata().getPrecision(); - int scale = type.getDecimalMetadata().getScale(); - Preconditions.checkState(precision <= Decimal.MAX_LONG_DIGITS(), - "Unsupported precision."); - - for (int n = 0; n < num; ++n) { - if (columnReaders[col].next()) { - Binary v = columnReaders[col].nextBinary(); - // Constructs a `Decimal` with an unscaled `Long` value if possible. - long unscaled = CatalystRowConverter.binaryToUnscaledLong(v); - rows[n].setDecimal(col, Decimal.apply(unscaled, precision, scale), precision); - } else { - rows[n].setNullAt(col); - } - } - } - - /** - * - * Decoder to return values from a single column. - */ - private final class ColumnReader { - /** - * Total number of values read. - */ - private long valuesRead; - - /** - * value that indicates the end of the current page. That is, - * if valuesRead == endOfPageValueCount, we are at the end of the page. - */ - private long endOfPageValueCount; - - /** - * The dictionary, if this column has dictionary encoding. - */ - private final Dictionary dictionary; - - /** - * If true, the current page is dictionary encoded. - */ - private boolean useDictionary; - - /** - * Maximum definition level for this column. - */ - private final int maxDefLevel; - - /** - * Repetition/Definition/Value readers. 
- */ - private IntIterator repetitionLevelColumn; - private IntIterator definitionLevelColumn; - private ValuesReader dataColumn; - - // Only set if vectorized decoding is true. This is used instead of the row by row decoding - // with `definitionLevelColumn`. - private VectorizedRleValuesReader defColumn; - - /** - * Total number of values in this column (in this row group). - */ - private final long totalValueCount; - - /** - * Total values in the current page. - */ - private int pageValueCount; - - private final PageReader pageReader; - private final ColumnDescriptor descriptor; - - public ColumnReader(ColumnDescriptor descriptor, PageReader pageReader) - throws IOException { - this.descriptor = descriptor; - this.pageReader = pageReader; - this.maxDefLevel = descriptor.getMaxDefinitionLevel(); - - DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); - if (dictionaryPage != null) { - try { - this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage); - this.useDictionary = true; - } catch (IOException e) { - throw new IOException("could not decode the dictionary for " + descriptor, e); - } - } else { - this.dictionary = null; - this.useDictionary = false; - } - this.totalValueCount = pageReader.getTotalValueCount(); - if (totalValueCount == 0) { - throw new IOException("totalValueCount == 0"); - } - } - - /** - * TODO: Hoist the useDictionary branch to decode*Batch and make the batch page aligned. 
- */ - public boolean nextBoolean() { - if (!useDictionary) { - return dataColumn.readBoolean(); - } else { - return dictionary.decodeToBoolean(dataColumn.readValueDictionaryId()); - } - } - - public int nextInt() { - if (!useDictionary) { - return dataColumn.readInteger(); - } else { - return dictionary.decodeToInt(dataColumn.readValueDictionaryId()); - } - } - - public long nextLong() { - if (!useDictionary) { - return dataColumn.readLong(); - } else { - return dictionary.decodeToLong(dataColumn.readValueDictionaryId()); - } - } - - public float nextFloat() { - if (!useDictionary) { - return dataColumn.readFloat(); - } else { - return dictionary.decodeToFloat(dataColumn.readValueDictionaryId()); - } - } - - public double nextDouble() { - if (!useDictionary) { - return dataColumn.readDouble(); - } else { - return dictionary.decodeToDouble(dataColumn.readValueDictionaryId()); - } - } - - public Binary nextBinary() { - if (!useDictionary) { - return dataColumn.readBytes(); - } else { - return dictionary.decodeToBinary(dataColumn.readValueDictionaryId()); - } - } - - /** - * Advances to the next value. Returns true if the value is non-null. - */ - private boolean next() throws IOException { - if (valuesRead >= endOfPageValueCount) { - if (valuesRead >= totalValueCount) { - // How do we get here? Throw end of stream exception? - return false; - } - readPage(); - } - ++valuesRead; - // TODO: Don't read for flat schemas - //repetitionLevel = repetitionLevelColumn.nextInt(); - return definitionLevelColumn.nextInt() == maxDefLevel; - } - - /** - * Reads `total` values from this columnReader into column. - */ - private void readBatch(int total, ColumnVector column) throws IOException { - int rowId = 0; - while (total > 0) { - // Compute the number of values we want to read in this page. 
- int leftInPage = (int)(endOfPageValueCount - valuesRead); - if (leftInPage == 0) { - readPage(); - leftInPage = (int)(endOfPageValueCount - valuesRead); - } - int num = Math.min(total, leftInPage); - if (useDictionary) { - // Read and decode dictionary ids. - ColumnVector dictionaryIds = column.reserveDictionaryIds(total);; - defColumn.readIntegers( - num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - decodeDictionaryIds(rowId, num, column, dictionaryIds); - } else { - column.setDictionary(null); - switch (descriptor.getType()) { - case BOOLEAN: - readBooleanBatch(rowId, num, column); - break; - case INT32: - readIntBatch(rowId, num, column); - break; - case INT64: - readLongBatch(rowId, num, column); - break; - case FLOAT: - readFloatBatch(rowId, num, column); - break; - case DOUBLE: - readDoubleBatch(rowId, num, column); - break; - case BINARY: - readBinaryBatch(rowId, num, column); - break; - case FIXED_LEN_BYTE_ARRAY: - readFixedLenByteArrayBatch(rowId, num, column, descriptor.getTypeLength()); - break; - default: - throw new IOException("Unsupported type: " + descriptor.getType()); - } - } - - valuesRead += num; - rowId += num; - total -= num; - } - } - - /** - * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`. 
- */ - private void decodeDictionaryIds(int rowId, int num, ColumnVector column, - ColumnVector dictionaryIds) { - switch (descriptor.getType()) { - case INT32: - case INT64: - case FLOAT: - case DOUBLE: - case BINARY: - column.setDictionary(dictionary); - break; - - case FIXED_LEN_BYTE_ARRAY: - // DecimalType written in the legacy mode - if (DecimalType.is32BitDecimalType(column.dataType())) { - for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); - column.putInt(i, (int) CatalystRowConverter.binaryToUnscaledLong(v)); - } - } else if (DecimalType.is64BitDecimalType(column.dataType())) { - for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); - column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v)); - } - } else { - throw new NotImplementedException(); - } - break; - - default: - throw new NotImplementedException("Unsupported type: " + descriptor.getType()); - } - } - - /** - * For all the read*Batch functions, reads `num` values from this columnReader into column. It - * is guaranteed that num is smaller than the number of values left in the current page. - */ - - private void readBooleanBatch(int rowId, int num, ColumnVector column) throws IOException { - assert(column.dataType() == DataTypes.BooleanType); - defColumn.readBooleans( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } - - private void readIntBatch(int rowId, int num, ColumnVector column) throws IOException { - // This is where we implement support for the valid type conversions. 
- // TODO: implement remaining type conversions - if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType || - DecimalType.is32BitDecimalType(column.dataType())) { - defColumn.readIntegers( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else if (column.dataType() == DataTypes.ByteType) { - defColumn.readBytes( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else if (column.dataType() == DataTypes.ShortType) { - defColumn.readShorts( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else { - throw new NotImplementedException("Unimplemented type: " + column.dataType()); - } - } - - private void readLongBatch(int rowId, int num, ColumnVector column) throws IOException { - // This is where we implement support for the valid type conversions. - if (column.dataType() == DataTypes.LongType || - DecimalType.is64BitDecimalType(column.dataType())) { - defColumn.readLongs( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else { - throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType()); - } - } - - private void readFloatBatch(int rowId, int num, ColumnVector column) throws IOException { - // This is where we implement support for the valid type conversions. - // TODO: support implicit cast to double? - if (column.dataType() == DataTypes.FloatType) { - defColumn.readFloats( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else { - throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType()); - } - } - - private void readDoubleBatch(int rowId, int num, ColumnVector column) throws IOException { - // This is where we implement support for the valid type conversions. 
- // TODO: implement remaining type conversions - if (column.dataType() == DataTypes.DoubleType) { - defColumn.readDoubles( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else { - throw new NotImplementedException("Unimplemented type: " + column.dataType()); - } - } - - private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOException { - // This is where we implement support for the valid type conversions. - // TODO: implement remaining type conversions - if (column.isArray()) { - defColumn.readBinarys( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else { - throw new NotImplementedException("Unimplemented type: " + column.dataType()); - } - } - - private void readFixedLenByteArrayBatch(int rowId, int num, - ColumnVector column, int arrayLen) throws IOException { - VectorizedValuesReader data = (VectorizedValuesReader) dataColumn; - // This is where we implement support for the valid type conversions. - // TODO: implement remaining type conversions - if (DecimalType.is32BitDecimalType(column.dataType())) { - for (int i = 0; i < num; i++) { - if (defColumn.readInteger() == maxDefLevel) { - column.putInt(rowId + i, - (int) CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen))); - } else { - column.putNull(rowId + i); - } - } - } else if (DecimalType.is64BitDecimalType(column.dataType())) { - for (int i = 0; i < num; i++) { - if (defColumn.readInteger() == maxDefLevel) { - column.putLong(rowId + i, - CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen))); - } else { - column.putNull(rowId + i); - } - } - } else { - throw new NotImplementedException("Unimplemented type: " + column.dataType()); - } - } - - private void readPage() throws IOException { - DataPage page = pageReader.readPage(); - // TODO: Why is this a visitor? 
- page.accept(new DataPage.Visitor<Void>() { - @Override - public Void visit(DataPageV1 dataPageV1) { - try { - readPageV1(dataPageV1); - return null; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public Void visit(DataPageV2 dataPageV2) { - try { - readPageV2(dataPageV2); - return null; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - }); - } - - private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset)throws IOException { - this.endOfPageValueCount = valuesRead + pageValueCount; - if (dataEncoding.usesDictionary()) { - this.dataColumn = null; - if (dictionary == null) { - throw new IOException( - "could not read page in col " + descriptor + - " as the dictionary was missing for encoding " + dataEncoding); - } - if (vectorizedDecode()) { - @SuppressWarnings("deprecation") - Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression - if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) { - throw new NotImplementedException("Unsupported encoding: " + dataEncoding); - } - this.dataColumn = new VectorizedRleValuesReader(); - } else { - this.dataColumn = dataEncoding.getDictionaryBasedValuesReader( - descriptor, VALUES, dictionary); - } - this.useDictionary = true; - } else { - if (vectorizedDecode()) { - if (dataEncoding != Encoding.PLAIN) { - throw new NotImplementedException("Unsupported encoding: " + dataEncoding); - } - this.dataColumn = new VectorizedPlainValuesReader(); - } else { - this.dataColumn = dataEncoding.getValuesReader(descriptor, VALUES); - } - this.useDictionary = false; - } - - try { - dataColumn.initFromPage(pageValueCount, bytes, offset); - } catch (IOException e) { - throw new IOException("could not read page in col " + descriptor, e); - } - } - - private void readPageV1(DataPageV1 page) throws IOException { - this.pageValueCount = page.getValueCount(); - ValuesReader rlReader = 
page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); - ValuesReader dlReader; - - // Initialize the decoders. - if (vectorizedDecode()) { - if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) { - throw new NotImplementedException("Unsupported encoding: " + page.getDlEncoding()); - } - int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel()); - this.defColumn = new VectorizedRleValuesReader(bitWidth); - dlReader = this.defColumn; - } else { - dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL); - } - this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); - this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader); - try { - byte[] bytes = page.getBytes().toByteArray(); - rlReader.initFromPage(pageValueCount, bytes, 0); - int next = rlReader.getNextOffset(); - dlReader.initFromPage(pageValueCount, bytes, next); - next = dlReader.getNextOffset(); - initDataReader(page.getValueEncoding(), bytes, next); - } catch (IOException e) { - throw new IOException("could not read page " + page + " in col " + descriptor, e); - } - } - - private void readPageV2(DataPageV2 page) throws IOException { - this.pageValueCount = page.getValueCount(); - this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(), - page.getRepetitionLevels(), descriptor); - - if (vectorizedDecode()) { - int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel()); - this.defColumn = new VectorizedRleValuesReader(bitWidth); - this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn); - this.defColumn.initFromBuffer( - this.pageValueCount, page.getDefinitionLevels().toByteArray()); - } else { - this.definitionLevelColumn = createRLEIterator(descriptor.getMaxDefinitionLevel(), - page.getDefinitionLevels(), descriptor); - } - try { - initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0); - } catch (IOException e) { - 
throw new IOException("could not read page " + page + " in col " + descriptor, e); - } - } - } - - private void checkEndOfRowGroup() throws IOException { - if (rowsReturned != totalCountLoadedSoFar) return; - PageReadStore pages = reader.readNextRowGroup(); - if (pages == null) { - throw new IOException("expecting more rows but reached last block. Read " - + rowsReturned + " out of " + totalRowCount); - } - List<ColumnDescriptor> columns = requestedSchema.getColumns(); - columnReaders = new ColumnReader[columns.size()]; - for (int i = 0; i < columns.size(); ++i) { - columnReaders[i] = new ColumnReader(columns.get(i), pages.getPageReader(columns.get(i))); - } - totalCountLoadedSoFar += pages.getRowCount(); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java new file mode 100644 index 0000000..0f00f56 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -0,0 +1,701 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.*; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; + +import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; + +/** + * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the + * Parquet column APIs. This is somewhat based on parquet-mr's ColumnReader. + * + * TODO: handle complex types, decimal requiring more than 8 bytes, INT96. Schema mismatch. + * All of these can be handled efficiently and easily with codegen. + * + * This class can either return InternalRows or ColumnarBatches. 
With whole stage codegen + * enabled, this class returns ColumnarBatches which offers significant performance gains. + * TODO: make this always return ColumnarBatches. + */ +public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBase<Object> { + /** + * Batch of rows that we assemble and the current index we've returned. Every time this + * batch is used up (batchIdx == numBatched), we populated the batch. + */ + private int batchIdx = 0; + private int numBatched = 0; + + /** + * For each request column, the reader to read this column. + */ + private VectorizedColumnReader[] columnReaders; + + /** + * The number of rows that have been returned. + */ + private long rowsReturned; + + /** + * The number of rows that have been reading, including the current in flight row group. + */ + private long totalCountLoadedSoFar = 0; + + /** + * columnBatch object that is used for batch decoding. This is created on first use and triggers + * batched decoding. It is not valid to interleave calls to the batched interface with the row + * by row RecordReader APIs. + * This is only enabled with additional flags for development. This is still a work in progress + * and currently unsupported cases will fail with potentially difficult to diagnose errors. + * This should be only turned on for development to work on this feature. + * + * When this is set, the code will branch early on in the RecordReader APIs. There is no shared + * code between the path that uses the MR decoders and the vectorized ones. + * + * TODOs: + * - Implement v2 page formats (just make sure we create the correct decoders). + */ + private ColumnarBatch columnarBatch; + + /** + * If true, this class returns batches instead of rows. + */ + private boolean returnColumnarBatch; + + /** + * The default config on whether columnarBatch should be offheap. + */ + private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP; + + /** + * Tries to initialize the reader for this split. 
Returns true if this reader supports reading + * this split and false otherwise. + */ + public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { + try { + initialize(inputSplit, taskAttemptContext); + return true; + } catch (Exception e) { + return false; + } + } + + /** + * Implementation of RecordReader API. + */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { + super.initialize(inputSplit, taskAttemptContext); + initializeInternal(); + } + + /** + * Utility API that will read all the data in path. This circumvents the need to create Hadoop + * objects to use this class. `columns` can contain the list of columns to project. + */ + @Override + public void initialize(String path, List<String> columns) throws IOException { + super.initialize(path, columns); + initializeInternal(); + } + + @Override + public void close() throws IOException { + if (columnarBatch != null) { + columnarBatch.close(); + columnarBatch = null; + } + super.close(); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + resultBatch(); + + if (returnColumnarBatch) return nextBatch(); + + if (batchIdx >= numBatched) { + if (!nextBatch()) return false; + } + ++batchIdx; + return true; + } + + @Override + public Object getCurrentValue() throws IOException, InterruptedException { + if (returnColumnarBatch) return columnarBatch; + return columnarBatch.getRow(batchIdx - 1); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return (float) rowsReturned / totalRowCount; + } + + /** + * Returns the ColumnarBatch object that will be used for all rows returned by this reader. + * This object is reused. Calling this enables the vectorized reader. This should be called + * before any calls to nextKeyValue/nextBatch. 
+ */ + public ColumnarBatch resultBatch() { + return resultBatch(DEFAULT_MEMORY_MODE); + } + + public ColumnarBatch resultBatch(MemoryMode memMode) { + if (columnarBatch == null) { + columnarBatch = ColumnarBatch.allocate(sparkSchema, memMode); + } + return columnarBatch; + } + + /** + * Can be called before any rows are returned to enable returning columnar batches directly. + */ + public void enableReturningBatches() { + returnColumnarBatch = true; + } + + /** + * Advances to the next batch of rows. Returns false if there are no more. + */ + public boolean nextBatch() throws IOException { + columnarBatch.reset(); + if (rowsReturned >= totalRowCount) return false; + checkEndOfRowGroup(); + + int num = (int) Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned); + for (int i = 0; i < columnReaders.length; ++i) { + columnReaders[i].readBatch(num, columnarBatch.column(i)); + } + rowsReturned += num; + columnarBatch.setNumRows(num); + numBatched = num; + batchIdx = 0; + return true; + } + + private void initializeInternal() throws IOException { + /** + * Check that the requested schema is supported. + */ + OriginalType[] originalTypes = new OriginalType[requestedSchema.getFieldCount()]; + for (int i = 0; i < requestedSchema.getFieldCount(); ++i) { + Type t = requestedSchema.getFields().get(i); + if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) { + throw new IOException("Complex types not supported."); + } + PrimitiveType primitiveType = t.asPrimitiveType(); + + originalTypes[i] = t.getOriginalType(); + + // TODO: Be extremely cautious in what is supported. Expand this. 
+ if (originalTypes[i] != null && originalTypes[i] != OriginalType.DECIMAL && + originalTypes[i] != OriginalType.UTF8 && originalTypes[i] != OriginalType.DATE && + originalTypes[i] != OriginalType.INT_8 && originalTypes[i] != OriginalType.INT_16) { + throw new IOException("Unsupported type: " + t); + } + if (originalTypes[i] == OriginalType.DECIMAL && + primitiveType.getDecimalMetadata().getPrecision() > Decimal.MAX_LONG_DIGITS()) { + throw new IOException("Decimal with high precision is not supported."); + } + if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { + throw new IOException("Int96 not supported."); + } + ColumnDescriptor fd = fileSchema.getColumnDescription(requestedSchema.getPaths().get(i)); + if (!fd.equals(requestedSchema.getColumns().get(i))) { + throw new IOException("Schema evolution not supported."); + } + } + } + + /** + * Decoder to return values from a single column. + */ + private class VectorizedColumnReader { + /** + * Total number of values read. + */ + private long valuesRead; + + /** + * value that indicates the end of the current page. That is, + * if valuesRead == endOfPageValueCount, we are at the end of the page. + */ + private long endOfPageValueCount; + + /** + * The dictionary, if this column has dictionary encoding. + */ + private final Dictionary dictionary; + + /** + * If true, the current page is dictionary encoded. + */ + private boolean useDictionary; + + /** + * Maximum definition level for this column. + */ + private final int maxDefLevel; + + /** + * Repetition/Definition/Value readers. + */ + private IntIterator repetitionLevelColumn; + private IntIterator definitionLevelColumn; + private ValuesReader dataColumn; + + // Only set if vectorized decoding is true. This is used instead of the row by row decoding + // with `definitionLevelColumn`. + private VectorizedRleValuesReader defColumn; + + /** + * Total number of values in this column (in this row group). 
+ */ + private final long totalValueCount; + + /** + * Total values in the current page. + */ + private int pageValueCount; + + private final PageReader pageReader; + private final ColumnDescriptor descriptor; + + public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader) + throws IOException { + this.descriptor = descriptor; + this.pageReader = pageReader; + this.maxDefLevel = descriptor.getMaxDefinitionLevel(); + + DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); + if (dictionaryPage != null) { + try { + this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage); + this.useDictionary = true; + } catch (IOException e) { + throw new IOException("could not decode the dictionary for " + descriptor, e); + } + } else { + this.dictionary = null; + this.useDictionary = false; + } + this.totalValueCount = pageReader.getTotalValueCount(); + if (totalValueCount == 0) { + throw new IOException("totalValueCount == 0"); + } + } + + /** + * TODO: Hoist the useDictionary branch to decode*Batch and make the batch page aligned. 
+ */ + public boolean nextBoolean() { + if (!useDictionary) { + return dataColumn.readBoolean(); + } else { + return dictionary.decodeToBoolean(dataColumn.readValueDictionaryId()); + } + } + + public int nextInt() { + if (!useDictionary) { + return dataColumn.readInteger(); + } else { + return dictionary.decodeToInt(dataColumn.readValueDictionaryId()); + } + } + + public long nextLong() { + if (!useDictionary) { + return dataColumn.readLong(); + } else { + return dictionary.decodeToLong(dataColumn.readValueDictionaryId()); + } + } + + public float nextFloat() { + if (!useDictionary) { + return dataColumn.readFloat(); + } else { + return dictionary.decodeToFloat(dataColumn.readValueDictionaryId()); + } + } + + public double nextDouble() { + if (!useDictionary) { + return dataColumn.readDouble(); + } else { + return dictionary.decodeToDouble(dataColumn.readValueDictionaryId()); + } + } + + public Binary nextBinary() { + if (!useDictionary) { + return dataColumn.readBytes(); + } else { + return dictionary.decodeToBinary(dataColumn.readValueDictionaryId()); + } + } + + /** + * Advances to the next value. Returns true if the value is non-null. + */ + private boolean next() throws IOException { + if (valuesRead >= endOfPageValueCount) { + if (valuesRead >= totalValueCount) { + // How do we get here? Throw end of stream exception? + return false; + } + readPage(); + } + ++valuesRead; + // TODO: Don't read for flat schemas + //repetitionLevel = repetitionLevelColumn.nextInt(); + return definitionLevelColumn.nextInt() == maxDefLevel; + } + + /** + * Reads `total` values from this columnReader into column. + */ + private void readBatch(int total, ColumnVector column) throws IOException { + int rowId = 0; + while (total > 0) { + // Compute the number of values we want to read in this page. 
+ int leftInPage = (int)(endOfPageValueCount - valuesRead); + if (leftInPage == 0) { + readPage(); + leftInPage = (int)(endOfPageValueCount - valuesRead); + } + int num = Math.min(total, leftInPage); + if (useDictionary) { + // Read and decode dictionary ids. + ColumnVector dictionaryIds = column.reserveDictionaryIds(total); + defColumn.readIntegers( + num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + decodeDictionaryIds(rowId, num, column, dictionaryIds); + } else { + column.setDictionary(null); + switch (descriptor.getType()) { + case BOOLEAN: + readBooleanBatch(rowId, num, column); + break; + case INT32: + readIntBatch(rowId, num, column); + break; + case INT64: + readLongBatch(rowId, num, column); + break; + case FLOAT: + readFloatBatch(rowId, num, column); + break; + case DOUBLE: + readDoubleBatch(rowId, num, column); + break; + case BINARY: + readBinaryBatch(rowId, num, column); + break; + case FIXED_LEN_BYTE_ARRAY: + readFixedLenByteArrayBatch(rowId, num, column, descriptor.getTypeLength()); + break; + default: + throw new IOException("Unsupported type: " + descriptor.getType()); + } + } + + valuesRead += num; + rowId += num; + total -= num; + } + } + + /** + * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`. 
+ */ + private void decodeDictionaryIds(int rowId, int num, ColumnVector column, + ColumnVector dictionaryIds) { + switch (descriptor.getType()) { + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + case BINARY: + column.setDictionary(dictionary); + break; + + case FIXED_LEN_BYTE_ARRAY: + // DecimalType written in the legacy mode + if (DecimalType.is32BitDecimalType(column.dataType())) { + for (int i = rowId; i < rowId + num; ++i) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); + column.putInt(i, (int) CatalystRowConverter.binaryToUnscaledLong(v)); + } + } else if (DecimalType.is64BitDecimalType(column.dataType())) { + for (int i = rowId; i < rowId + num; ++i) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i)); + column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v)); + } + } else { + throw new NotImplementedException(); + } + break; + + default: + throw new NotImplementedException("Unsupported type: " + descriptor.getType()); + } + } + + /** + * For all the read*Batch functions, reads `num` values from this columnReader into column. It + * is guaranteed that num is smaller than the number of values left in the current page. + */ + + private void readBooleanBatch(int rowId, int num, ColumnVector column) throws IOException { + assert(column.dataType() == DataTypes.BooleanType); + defColumn.readBooleans( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } + + private void readIntBatch(int rowId, int num, ColumnVector column) throws IOException { + // This is where we implement support for the valid type conversions. 
+ // TODO: implement remaining type conversions + if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType || + DecimalType.is32BitDecimalType(column.dataType())) { + defColumn.readIntegers( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else if (column.dataType() == DataTypes.ByteType) { + defColumn.readBytes( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else if (column.dataType() == DataTypes.ShortType) { + defColumn.readShorts( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else { + throw new NotImplementedException("Unimplemented type: " + column.dataType()); + } + } + + private void readLongBatch(int rowId, int num, ColumnVector column) throws IOException { + // This is where we implement support for the valid type conversions. + if (column.dataType() == DataTypes.LongType || + DecimalType.is64BitDecimalType(column.dataType())) { + defColumn.readLongs( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else { + throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType()); + } + } + + private void readFloatBatch(int rowId, int num, ColumnVector column) throws IOException { + // This is where we implement support for the valid type conversions. + // TODO: support implicit cast to double? + if (column.dataType() == DataTypes.FloatType) { + defColumn.readFloats( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else { + throw new UnsupportedOperationException("Unsupported conversion to: " + column.dataType()); + } + } + + private void readDoubleBatch(int rowId, int num, ColumnVector column) throws IOException { + // This is where we implement support for the valid type conversions. 
+ // TODO: implement remaining type conversions + if (column.dataType() == DataTypes.DoubleType) { + defColumn.readDoubles( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else { + throw new NotImplementedException("Unimplemented type: " + column.dataType()); + } + } + + private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOException { + // This is where we implement support for the valid type conversions. + // TODO: implement remaining type conversions + if (column.isArray()) { + defColumn.readBinarys( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else { + throw new NotImplementedException("Unimplemented type: " + column.dataType()); + } + } + + private void readFixedLenByteArrayBatch(int rowId, int num, + ColumnVector column, int arrayLen) throws IOException { + VectorizedValuesReader data = (VectorizedValuesReader) dataColumn; + // This is where we implement support for the valid type conversions. + // TODO: implement remaining type conversions + if (DecimalType.is32BitDecimalType(column.dataType())) { + for (int i = 0; i < num; i++) { + if (defColumn.readInteger() == maxDefLevel) { + column.putInt(rowId + i, + (int) CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen))); + } else { + column.putNull(rowId + i); + } + } + } else if (DecimalType.is64BitDecimalType(column.dataType())) { + for (int i = 0; i < num; i++) { + if (defColumn.readInteger() == maxDefLevel) { + column.putLong(rowId + i, + CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen))); + } else { + column.putNull(rowId + i); + } + } + } else { + throw new NotImplementedException("Unimplemented type: " + column.dataType()); + } + } + + private void readPage() throws IOException { + DataPage page = pageReader.readPage(); + // TODO: Why is this a visitor? 
+ page.accept(new DataPage.Visitor<Void>() { + @Override + public Void visit(DataPageV1 dataPageV1) { + try { + readPageV1(dataPageV1); + return null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Void visit(DataPageV2 dataPageV2) { + try { + readPageV2(dataPageV2); + return null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + } + + private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException { + this.endOfPageValueCount = valuesRead + pageValueCount; + if (dataEncoding.usesDictionary()) { + this.dataColumn = null; + if (dictionary == null) { + throw new IOException( + "could not read page in col " + descriptor + + " as the dictionary was missing for encoding " + dataEncoding); + } + @SuppressWarnings("deprecation") + Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression + if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) { + throw new NotImplementedException("Unsupported encoding: " + dataEncoding); + } + this.dataColumn = new VectorizedRleValuesReader(); + this.useDictionary = true; + } else { + if (dataEncoding != Encoding.PLAIN) { + throw new NotImplementedException("Unsupported encoding: " + dataEncoding); + } + this.dataColumn = new VectorizedPlainValuesReader(); + this.useDictionary = false; + } + + try { + dataColumn.initFromPage(pageValueCount, bytes, offset); + } catch (IOException e) { + throw new IOException("could not read page in col " + descriptor, e); + } + } + + private void readPageV1(DataPageV1 page) throws IOException { + this.pageValueCount = page.getValueCount(); + ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); + ValuesReader dlReader; + + // Initialize the decoders. 
+ if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) { + throw new NotImplementedException("Unsupported encoding: " + page.getDlEncoding()); + } + int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel()); + this.defColumn = new VectorizedRleValuesReader(bitWidth); + dlReader = this.defColumn; + this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); + this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader); + try { + byte[] bytes = page.getBytes().toByteArray(); + rlReader.initFromPage(pageValueCount, bytes, 0); + int next = rlReader.getNextOffset(); + dlReader.initFromPage(pageValueCount, bytes, next); + next = dlReader.getNextOffset(); + initDataReader(page.getValueEncoding(), bytes, next); + } catch (IOException e) { + throw new IOException("could not read page " + page + " in col " + descriptor, e); + } + } + + private void readPageV2(DataPageV2 page) throws IOException { + this.pageValueCount = page.getValueCount(); + this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(), + page.getRepetitionLevels(), descriptor); + + int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel()); + this.defColumn = new VectorizedRleValuesReader(bitWidth); + this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn); + this.defColumn.initFromBuffer( + this.pageValueCount, page.getDefinitionLevels().toByteArray()); + try { + initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0); + } catch (IOException e) { + throw new IOException("could not read page " + page + " in col " + descriptor, e); + } + } + } + + private void checkEndOfRowGroup() throws IOException { + if (rowsReturned != totalCountLoadedSoFar) return; + PageReadStore pages = reader.readNextRowGroup(); + if (pages == null) { + throw new IOException("expecting more rows but reached last block. 
Read " + + rowsReturned + " out of " + totalRowCount); + } + List<ColumnDescriptor> columns = requestedSchema.getColumns(); + columnReaders = new VectorizedColumnReader[columns.size()]; + for (int i = 0; i < columns.size(); ++i) { + columnReaders[i] = new VectorizedColumnReader(columns.get(i), + pages.getPageReader(columns.get(i))); + } + totalCountLoadedSoFar += pages.getRowCount(); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 2abfd14..cd769d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -160,6 +160,15 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm } (keyValueOutput, runFunc) + case Some((SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED, Some(value))) => + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED} is " + + s"deprecated and will be ignored. Vectorized parquet reader will be used instead.") + Seq(Row(SQLConf.PARQUET_VECTORIZED_READER_ENABLED, "true")) + } + (keyValueOutput, runFunc) + // Configures a single property. 
case Some((key, Some(value))) => val runFunc = (sqlContext: SQLContext) => { http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala index e848f42..f3514cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala @@ -33,9 +33,9 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.DataReadMethod import org.apache.spark.internal.Logging -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader +import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.storage.StorageLevel import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager} @@ -99,8 +99,6 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( // If true, enable using the custom RecordReader for parquet. This only works for // a subset of the types (no complex types). - protected val enableUnsafeRowParquetReader: Boolean = - sqlContext.getConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key).toBoolean protected val enableVectorizedParquetReader: Boolean = sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean protected val enableWholestageCodegen: Boolean = @@ -174,19 +172,17 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( * fails (for example, unsupported schema), try with the normal reader. 
* TODO: plumb this through a different way? */ - if (enableUnsafeRowParquetReader && + if (enableVectorizedParquetReader && format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") { - val parquetReader: UnsafeRowParquetRecordReader = new UnsafeRowParquetRecordReader() + val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader() if (!parquetReader.tryInitialize( split.serializableHadoopSplit.value, hadoopAttemptContext)) { parquetReader.close() } else { reader = parquetReader.asInstanceOf[RecordReader[Void, V]] - if (enableVectorizedParquetReader) { - parquetReader.resultBatch() - // Whole stage codegen (PhysicalRDD) is able to deal with batches directly - if (enableWholestageCodegen) parquetReader.enableReturningBatches(); - } + parquetReader.resultBatch() + // Whole stage codegen (PhysicalRDD) is able to deal with batches directly + if (enableWholestageCodegen) parquetReader.enableReturningBatches() } } @@ -203,7 +199,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( private[this] var finished = false override def hasNext: Boolean = { - if (context.isInterrupted) { + if (context.isInterrupted()) { throw new TaskKilledException } if (!finished && !havePair) { http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c308161..473cde5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -345,11 +345,6 @@ object SQLConf { "option must be set in Hadoop Configuration. 2. 
This option overrides " + "\"spark.sql.sources.outputCommitterClass\".") - val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = booleanConf( - key = "spark.sql.parquet.enableUnsafeRowRecordReader", - defaultValue = Some(true), - doc = "Enables using the custom ParquetUnsafeRowRecordReader.") - val PARQUET_VECTORIZED_READER_ENABLED = booleanConf( key = "spark.sql.parquet.enableVectorizedReader", defaultValue = Some(true), @@ -527,6 +522,7 @@ object SQLConf { val CODEGEN_ENABLED = "spark.sql.codegen" val UNSAFE_ENABLED = "spark.sql.unsafe.enabled" val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin" + val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = "spark.sql.parquet.enableUnsafeRowRecordReader" } } http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala index 29318d8..f42c754 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala @@ -36,7 +36,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath) val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head - val reader = new UnsafeRowParquetRecordReader + val reader = new VectorizedParquetRecordReader reader.initialize(file.asInstanceOf[String], null) val batch = reader.resultBatch() assert(reader.nextBatch()) @@ -61,7 +61,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex 
data.repartition(1).write.parquet(dir.getCanonicalPath) val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head - val reader = new UnsafeRowParquetRecordReader + val reader = new VectorizedParquetRecordReader reader.initialize(file.asInstanceOf[String], null) val batch = reader.resultBatch() assert(reader.nextBatch()) http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index b394ffb..51183e9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -57,7 +57,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex val output = predicate.collect { case a: Attribute => a }.distinct withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { - withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { val query = df .select(output.map(e => Column(e)): _*) .where(Column(predicate)) @@ -446,7 +446,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex test("SPARK-11661 Still pushdown filters returned by unhandledFilters") { import testImplicits._ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { - withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { withTempPath { dir => val path = s"${dir.getCanonicalPath}/part=1" (1 to 
3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path) @@ -520,7 +520,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex test("SPARK-11164: test the parquet filter in") { import testImplicits._ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { - withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { withTempPath { dir => val path = s"${dir.getCanonicalPath}/table1" (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path) http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 4d9a8d7..ebdb105 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -656,7 +656,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { var hash1: Int = 0 var hash2: Int = 0 (false :: true :: Nil).foreach { v => - withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> v.toString) { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> v.toString) { val df = sqlContext.read.parquet(dir.getCanonicalPath) val rows = df.queryExecution.toRdd.map(_.copy()).collect() val unsafeRows = rows.map(_.asInstanceOf[UnsafeRow]) @@ -672,13 +672,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } - test("UnsafeRowParquetRecordReader - direct path read") { - val data = (0 to 10).map(i => (i, ((i + 
'a').toChar.toString))) + test("VectorizedParquetRecordReader - direct path read") { + val data = (0 to 10).map(i => (i, (i + 'a').toChar.toString)) withTempPath { dir => sqlContext.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath) val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0); { - val reader = new UnsafeRowParquetRecordReader + val reader = new VectorizedParquetRecordReader try { reader.initialize(file, null) val result = mutable.ArrayBuffer.empty[(Int, String)] @@ -695,7 +695,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { // Project just one column { - val reader = new UnsafeRowParquetRecordReader + val reader = new VectorizedParquetRecordReader try { reader.initialize(file, ("_2" :: Nil).asJava) val result = mutable.ArrayBuffer.empty[(String)] @@ -711,7 +711,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { // Project columns in opposite order { - val reader = new UnsafeRowParquetRecordReader + val reader = new VectorizedParquetRecordReader try { reader.initialize(file, ("_2" :: "_1" :: Nil).asJava) val result = mutable.ArrayBuffer.empty[(String, Int)] @@ -728,7 +728,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { // Empty projection { - val reader = new UnsafeRowParquetRecordReader + val reader = new VectorizedParquetRecordReader try { reader.initialize(file, List[String]().asJava) var result = 0 http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala index 15bf00e..070c400 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala @@ -82,38 +82,17 @@ object ParquetReadBenchmark { } sqlBenchmark.addCase("SQL Parquet MR") { iter => - withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") { - sqlContext.sql("select sum(id) from tempTable").collect() - } - } - - sqlBenchmark.addCase("SQL Parquet Non-Vectorized") { iter => withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { sqlContext.sql("select sum(id) from tempTable").collect() } } val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray - // Driving the parquet reader directly without Spark. - parquetReaderBenchmark.addCase("ParquetReader Non-Vectorized") { num => - var sum = 0L - files.map(_.asInstanceOf[String]).foreach { p => - val reader = new UnsafeRowParquetRecordReader - reader.initialize(p, ("id" :: Nil).asJava) - - while (reader.nextKeyValue()) { - val record = reader.getCurrentValue.asInstanceOf[InternalRow] - if (!record.isNullAt(0)) sum += record.getInt(0) - } - reader.close() - } - } - // Driving the parquet reader in batch mode directly. 
parquetReaderBenchmark.addCase("ParquetReader Vectorized") { num => var sum = 0L files.map(_.asInstanceOf[String]).foreach { p => - val reader = new UnsafeRowParquetRecordReader + val reader = new VectorizedParquetRecordReader try { reader.initialize(p, ("id" :: Nil).asJava) val batch = reader.resultBatch() @@ -136,7 +115,7 @@ object ParquetReadBenchmark { parquetReaderBenchmark.addCase("ParquetReader Vectorized -> Row") { num => var sum = 0L files.map(_.asInstanceOf[String]).foreach { p => - val reader = new UnsafeRowParquetRecordReader + val reader = new VectorizedParquetRecordReader try { reader.initialize(p, ("id" :: Nil).asJava) val batch = reader.resultBatch() @@ -159,7 +138,6 @@ object ParquetReadBenchmark { ------------------------------------------------------------------------------------------- SQL Parquet Vectorized 215 / 262 73.0 13.7 1.0X SQL Parquet MR 1946 / 2083 8.1 123.7 0.1X - SQL Parquet Non-Vectorized 1079 / 1213 14.6 68.6 0.2X */ sqlBenchmark.run() @@ -167,9 +145,8 @@ object ParquetReadBenchmark { Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz Parquet Reader Single Int Column Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - ParquetReader Non-Vectorized 610 / 737 25.8 38.8 1.0X - ParquetReader Vectorized 123 / 152 127.8 7.8 5.0X - ParquetReader Vectorized -> Row 165 / 180 95.2 10.5 3.7X + ParquetReader Vectorized 123 / 152 127.8 7.8 1.0X + ParquetReader Vectorized -> Row 165 / 180 95.2 10.5 0.7X */ parquetReaderBenchmark.run() } @@ -191,32 +168,12 @@ object ParquetReadBenchmark { } benchmark.addCase("SQL Parquet MR") { iter => - withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") { - sqlContext.sql("select sum(c1), sum(length(c2)) from tempTable").collect - } - } - - benchmark.addCase("SQL Parquet Non-vectorized") { iter => withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { sqlContext.sql("select sum(c1), 
sum(length(c2)) from tempTable").collect } } val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray - benchmark.addCase("ParquetReader Non-vectorized") { num => - var sum1 = 0L - var sum2 = 0L - files.map(_.asInstanceOf[String]).foreach { p => - val reader = new UnsafeRowParquetRecordReader - reader.initialize(p, null) - while (reader.nextKeyValue()) { - val record = reader.getCurrentValue.asInstanceOf[InternalRow] - if (!record.isNullAt(0)) sum1 += record.getInt(0) - if (!record.isNullAt(1)) sum2 += record.getUTF8String(1).numBytes() - } - reader.close() - } - } /* Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz @@ -224,8 +181,6 @@ object ParquetReadBenchmark { ------------------------------------------------------------------------------------------- SQL Parquet Vectorized 628 / 720 16.7 59.9 1.0X SQL Parquet MR 1905 / 2239 5.5 181.7 0.3X - SQL Parquet Non-vectorized 1429 / 1732 7.3 136.3 0.4X - ParquetReader Non-vectorized 989 / 1357 10.6 94.3 0.6X */ benchmark.run() } @@ -247,7 +202,7 @@ object ParquetReadBenchmark { } benchmark.addCase("SQL Parquet MR") { iter => - withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { sqlContext.sql("select sum(length(c1)) from tempTable").collect } } @@ -293,7 +248,7 @@ object ParquetReadBenchmark { Read data column 191 / 250 82.1 12.2 1.0X Read partition column 82 / 86 192.4 5.2 2.3X Read both columns 220 / 248 71.5 14.0 0.9X - */ + */ benchmark.run() } } @@ -319,7 +274,7 @@ object ParquetReadBenchmark { benchmark.addCase("PR Vectorized") { num => var sum = 0 files.map(_.asInstanceOf[String]).foreach { p => - val reader = new UnsafeRowParquetRecordReader + val reader = new VectorizedParquetRecordReader try { reader.initialize(p, ("c1" :: "c2" :: Nil).asJava) val batch = reader.resultBatch() @@ -340,7 +295,7 @@ object ParquetReadBenchmark { benchmark.addCase("PR Vectorized (Null Filtering)") { num => var sum = 0L 
files.map(_.asInstanceOf[String]).foreach { p => - val reader = new UnsafeRowParquetRecordReader + val reader = new VectorizedParquetRecordReader try { reader.initialize(p, ("c1" :: "c2" :: Nil).asJava) val batch = reader.resultBatch() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
