Repository: spark
Updated Branches:
  refs/heads/master 238fb485b -> 54794113a


[SPARK-13989] [SQL] Remove non-vectorized/unsafe-row parquet record reader

## What changes were proposed in this pull request?

This PR cleans up the new parquet record reader with the following changes:

1. Removes the non-vectorized parquet reader code from 
`UnsafeRowParquetRecordReader`.
2. Removes the non-vectorized column reader code from `ColumnReader`.
3. Renames `UnsafeRowParquetRecordReader` to `VectorizedParquetRecordReader` 
and `ColumnReader` to `VectorizedColumnReader`.
4. Deprecates `PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED`

## How was this patch tested?

Refactoring only; existing tests should reveal any problems.

Author: Sameer Agarwal <[email protected]>

Closes #11799 from sameeragarwal/vectorized-parquet.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54794113
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54794113
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54794113

Branch: refs/heads/master
Commit: 54794113a6a906b0f9c6bfb9da322e18e007214c
Parents: 238fb48
Author: Sameer Agarwal <[email protected]>
Authored: Fri Mar 18 14:04:42 2016 -0700
Committer: Davies Liu <[email protected]>
Committed: Fri Mar 18 14:04:42 2016 -0700

----------------------------------------------------------------------
 .../parquet/UnsafeRowParquetRecordReader.java   | 946 -------------------
 .../parquet/VectorizedParquetRecordReader.java  | 701 ++++++++++++++
 .../spark/sql/execution/command/commands.scala  |   9 +
 .../execution/datasources/SqlNewHadoopRDD.scala |  20 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   6 +-
 .../parquet/ParquetEncodingSuite.scala          |   4 +-
 .../parquet/ParquetFilterSuite.scala            |   6 +-
 .../datasources/parquet/ParquetIOSuite.scala    |  14 +-
 .../parquet/ParquetReadBenchmark.scala          |  61 +-
 9 files changed, 739 insertions(+), 1028 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
deleted file mode 100644
index 7234726..0000000
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ /dev/null
@@ -1,946 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.parquet;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.parquet.Preconditions;
-import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Dictionary;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.*;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
-import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
-import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.unsafe.types.UTF8String;
-
-import static org.apache.parquet.column.ValuesType.*;
-
-/**
- * A specialized RecordReader that reads into UnsafeRows directly using the 
Parquet column APIs.
- *
- * This is somewhat based on parquet-mr's ColumnReader.
- *
- * TODO: handle complex types, decimal requiring more than 8 bytes, INT96. 
Schema mismatch.
- * All of these can be handled efficiently and easily with codegen.
- *
- * This class can either return InternalRows or ColumnarBatches. With whole 
stage codegen
- * enabled, this class returns ColumnarBatches which offers significant 
performance gains.
- * TODO: make this always return ColumnarBatches.
- */
-public class UnsafeRowParquetRecordReader extends 
SpecificParquetRecordReaderBase<Object> {
-  /**
-   * Batch of unsafe rows that we assemble and the current index we've 
returned. Every time this
-   * batch is used up (batchIdx == numBatched), we populated the batch.
-   */
-  private UnsafeRow[] rows = new UnsafeRow[64];
-  private int batchIdx = 0;
-  private int numBatched = 0;
-
-  /**
-   * Used to write variable length columns. Same length as `rows`.
-   */
-  private UnsafeRowWriter[] rowWriters = null;
-  /**
-   * True if the row contains variable length fields.
-   */
-  private boolean containsVarLenFields;
-
-  /**
-   * For each request column, the reader to read this column.
-   * columnsReaders[i] populated the UnsafeRow's attribute at i.
-   */
-  private ColumnReader[] columnReaders;
-
-  /**
-   * The number of rows that have been returned.
-   */
-  private long rowsReturned;
-
-  /**
-   * The number of rows that have been reading, including the current in 
flight row group.
-   */
-  private long totalCountLoadedSoFar = 0;
-
-  /**
-   * For each column, the annotated original type.
-   */
-  private OriginalType[] originalTypes;
-
-  /**
-   * The default size for varlen columns. The row grows as necessary to 
accommodate the
-   * largest column.
-   */
-  private static final int DEFAULT_VAR_LEN_SIZE = 32;
-
-  /**
-   * columnBatch object that is used for batch decoding. This is created on 
first use and triggers
-   * batched decoding. It is not valid to interleave calls to the batched 
interface with the row
-   * by row RecordReader APIs.
-   * This is only enabled with additional flags for development. This is still 
a work in progress
-   * and currently unsupported cases will fail with potentially difficult to 
diagnose errors.
-   * This should be only turned on for development to work on this feature.
-   *
-   * When this is set, the code will branch early on in the RecordReader APIs. 
There is no shared
-   * code between the path that uses the MR decoders and the vectorized ones.
-   *
-   * TODOs:
-   *  - Implement v2 page formats (just make sure we create the correct 
decoders).
-   */
-  private ColumnarBatch columnarBatch;
-
-  /**
-   * If true, this class returns batches instead of rows.
-   */
-  private boolean returnColumnarBatch;
-
-  /**
-   * The default config on whether columnarBatch should be offheap.
-   */
-  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
-
-  /**
-   * Tries to initialize the reader for this split. Returns true if this 
reader supports reading
-   * this split and false otherwise.
-   */
-  public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext) {
-    try {
-      initialize(inputSplit, taskAttemptContext);
-      return true;
-    } catch (Exception e) {
-      return false;
-    }
-  }
-
-  /**
-   * Implementation of RecordReader API.
-   */
-  @Override
-  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
-      throws IOException, InterruptedException {
-    super.initialize(inputSplit, taskAttemptContext);
-    initializeInternal();
-  }
-
-  /**
-   * Utility API that will read all the data in path. This circumvents the 
need to create Hadoop
-   * objects to use this class. `columns` can contain the list of columns to 
project.
-   */
-  @Override
-  public void initialize(String path, List<String> columns) throws IOException 
{
-    super.initialize(path, columns);
-    initializeInternal();
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (columnarBatch != null) {
-      columnarBatch.close();
-      columnarBatch = null;
-    }
-    super.close();
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (returnColumnarBatch) return nextBatch();
-
-    if (batchIdx >= numBatched) {
-      if (vectorizedDecode()) {
-        if (!nextBatch()) return false;
-      } else {
-        if (!loadBatch()) return false;
-      }
-    }
-    ++batchIdx;
-    return true;
-  }
-
-  @Override
-  public Object getCurrentValue() throws IOException, InterruptedException {
-    if (returnColumnarBatch) return columnarBatch;
-
-    if (vectorizedDecode()) {
-      return columnarBatch.getRow(batchIdx - 1);
-    } else {
-      return rows[batchIdx - 1];
-    }
-  }
-
-  @Override
-  public float getProgress() throws IOException, InterruptedException {
-    return (float) rowsReturned / totalRowCount;
-  }
-
-  /**
-   * Returns the ColumnarBatch object that will be used for all rows returned 
by this reader.
-   * This object is reused. Calling this enables the vectorized reader. This 
should be called
-   * before any calls to nextKeyValue/nextBatch.
-   */
-  public ColumnarBatch resultBatch() {
-    return resultBatch(DEFAULT_MEMORY_MODE);
-  }
-
-  public ColumnarBatch resultBatch(MemoryMode memMode) {
-    if (columnarBatch == null) {
-      columnarBatch = ColumnarBatch.allocate(sparkSchema, memMode);
-    }
-    return columnarBatch;
-  }
-
-  /**
-   * Can be called before any rows are returned to enable returning columnar 
batches directly.
-   */
-  public void enableReturningBatches() {
-    assert(vectorizedDecode());
-    returnColumnarBatch = true;
-  }
-
-  /**
-   * Advances to the next batch of rows. Returns false if there are no more.
-   */
-  public boolean nextBatch() throws IOException {
-    assert(vectorizedDecode());
-    columnarBatch.reset();
-    if (rowsReturned >= totalRowCount) return false;
-    checkEndOfRowGroup();
-
-    int num = (int)Math.min((long) columnarBatch.capacity(), 
totalCountLoadedSoFar - rowsReturned);
-    for (int i = 0; i < columnReaders.length; ++i) {
-      columnReaders[i].readBatch(num, columnarBatch.column(i));
-    }
-    rowsReturned += num;
-    columnarBatch.setNumRows(num);
-    numBatched = num;
-    batchIdx = 0;
-    return true;
-  }
-
-  /**
-   * Returns true if we are doing a vectorized decode.
-   */
-  private boolean vectorizedDecode() { return columnarBatch != null; }
-
-  private void initializeInternal() throws IOException {
-    /**
-     * Check that the requested schema is supported.
-     */
-    int numVarLenFields = 0;
-    originalTypes = new OriginalType[requestedSchema.getFieldCount()];
-    for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
-      Type t = requestedSchema.getFields().get(i);
-      if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
-        throw new IOException("Complex types not supported.");
-      }
-      PrimitiveType primitiveType = t.asPrimitiveType();
-
-      originalTypes[i] = t.getOriginalType();
-
-      // TODO: Be extremely cautious in what is supported. Expand this.
-      if (originalTypes[i] != null && originalTypes[i] != OriginalType.DECIMAL 
&&
-          originalTypes[i] != OriginalType.UTF8 && originalTypes[i] != 
OriginalType.DATE &&
-          originalTypes[i] != OriginalType.INT_8 && originalTypes[i] != 
OriginalType.INT_16) {
-        throw new IOException("Unsupported type: " + t);
-      }
-      if (originalTypes[i] == OriginalType.DECIMAL &&
-          primitiveType.getDecimalMetadata().getPrecision() > 
Decimal.MAX_LONG_DIGITS()) {
-        throw new IOException("Decimal with high precision is not supported.");
-      }
-      if (primitiveType.getPrimitiveTypeName() == 
PrimitiveType.PrimitiveTypeName.INT96) {
-        throw new IOException("Int96 not supported.");
-      }
-      ColumnDescriptor fd = 
fileSchema.getColumnDescription(requestedSchema.getPaths().get(i));
-      if (!fd.equals(requestedSchema.getColumns().get(i))) {
-        throw new IOException("Schema evolution not supported.");
-      }
-
-      if (primitiveType.getPrimitiveTypeName() == 
PrimitiveType.PrimitiveTypeName.BINARY) {
-        ++numVarLenFields;
-      }
-    }
-
-    /**
-     * Initialize rows and rowWriters. These objects are reused across all 
rows in the relation.
-     */
-    containsVarLenFields = numVarLenFields > 0;
-    rowWriters = new UnsafeRowWriter[rows.length];
-
-    for (int i = 0; i < rows.length; ++i) {
-      rows[i] = new UnsafeRow(requestedSchema.getFieldCount());
-      BufferHolder holder = new BufferHolder(rows[i], numVarLenFields * 
DEFAULT_VAR_LEN_SIZE);
-      rowWriters[i] = new UnsafeRowWriter(holder, 
requestedSchema.getFieldCount());
-    }
-  }
-
-  /**
-   * Decodes a batch of values into `rows`. This function is the hot path.
-   */
-  private boolean loadBatch() throws IOException {
-    // no more records left
-    if (rowsReturned >= totalRowCount) { return false; }
-    checkEndOfRowGroup();
-
-    int num = (int)Math.min(rows.length, totalCountLoadedSoFar - rowsReturned);
-    rowsReturned += num;
-
-    if (containsVarLenFields) {
-      for (int i = 0; i < rowWriters.length; ++i) {
-        rowWriters[i].holder().reset();
-      }
-    }
-
-    for (int i = 0; i < columnReaders.length; ++i) {
-      switch (columnReaders[i].descriptor.getType()) {
-        case BOOLEAN:
-          decodeBooleanBatch(i, num);
-          break;
-        case INT32:
-          if (originalTypes[i] == OriginalType.DECIMAL) {
-            decodeIntAsDecimalBatch(i, num);
-          } else {
-            decodeIntBatch(i, num);
-          }
-          break;
-        case INT64:
-          Preconditions.checkState(originalTypes[i] == null
-              || originalTypes[i] == OriginalType.DECIMAL,
-              "Unexpected original type: " + originalTypes[i]);
-          decodeLongBatch(i, num);
-          break;
-        case FLOAT:
-          decodeFloatBatch(i, num);
-          break;
-        case DOUBLE:
-          decodeDoubleBatch(i, num);
-          break;
-        case BINARY:
-          decodeBinaryBatch(i, num);
-          break;
-        case FIXED_LEN_BYTE_ARRAY:
-          Preconditions.checkState(originalTypes[i] == OriginalType.DECIMAL,
-              "Unexpected original type: " + originalTypes[i]);
-          decodeFixedLenArrayAsDecimalBatch(i, num);
-          break;
-        case INT96:
-          throw new IOException("Unsupported " + 
columnReaders[i].descriptor.getType());
-      }
-    }
-
-    numBatched = num;
-    batchIdx = 0;
-
-    // Update the total row lengths if the schema contained variable length. 
We did not maintain
-    // this as we populated the columns.
-    if (containsVarLenFields) {
-      for (int i = 0; i < numBatched; ++i) {
-        rows[i].setTotalSize(rowWriters[i].holder().totalSize());
-      }
-    }
-
-    return true;
-  }
-
-  private void decodeBooleanBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        rows[n].setBoolean(col, columnReaders[col].nextBoolean());
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeIntBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        rows[n].setInt(col, columnReaders[col].nextInt());
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeIntAsDecimalBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        // Since this is stored as an INT, it is always a compact decimal. 
Just set it as a long.
-        rows[n].setLong(col, columnReaders[col].nextInt());
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeLongBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        rows[n].setLong(col, columnReaders[col].nextLong());
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeFloatBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        rows[n].setFloat(col, columnReaders[col].nextFloat());
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeDoubleBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        rows[n].setDouble(col, columnReaders[col].nextDouble());
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeBinaryBatch(int col, int num) throws IOException {
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        ByteBuffer bytes = columnReaders[col].nextBinary().toByteBuffer();
-        int len = bytes.remaining();
-        if (originalTypes[col] == OriginalType.UTF8) {
-          UTF8String str =
-              UTF8String.fromBytes(bytes.array(), bytes.arrayOffset() + 
bytes.position(), len);
-          rowWriters[n].write(col, str);
-        } else {
-          rowWriters[n].write(col, bytes.array(), bytes.arrayOffset() + 
bytes.position(), len);
-        }
-        rows[n].setNotNullAt(col);
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  private void decodeFixedLenArrayAsDecimalBatch(int col, int num) throws 
IOException {
-    PrimitiveType type = 
requestedSchema.getFields().get(col).asPrimitiveType();
-    int precision = type.getDecimalMetadata().getPrecision();
-    int scale = type.getDecimalMetadata().getScale();
-    Preconditions.checkState(precision <= Decimal.MAX_LONG_DIGITS(),
-        "Unsupported precision.");
-
-    for (int n = 0; n < num; ++n) {
-      if (columnReaders[col].next()) {
-        Binary v = columnReaders[col].nextBinary();
-        // Constructs a `Decimal` with an unscaled `Long` value if possible.
-        long unscaled = CatalystRowConverter.binaryToUnscaledLong(v);
-        rows[n].setDecimal(col, Decimal.apply(unscaled, precision, scale), 
precision);
-      } else {
-        rows[n].setNullAt(col);
-      }
-    }
-  }
-
-  /**
-   *
-   * Decoder to return values from a single column.
-   */
-  private final class ColumnReader {
-    /**
-     * Total number of values read.
-     */
-    private long valuesRead;
-
-    /**
-     * value that indicates the end of the current page. That is,
-     * if valuesRead == endOfPageValueCount, we are at the end of the page.
-     */
-    private long endOfPageValueCount;
-
-    /**
-     * The dictionary, if this column has dictionary encoding.
-     */
-    private final Dictionary dictionary;
-
-    /**
-     * If true, the current page is dictionary encoded.
-     */
-    private boolean useDictionary;
-
-    /**
-     * Maximum definition level for this column.
-     */
-    private final int maxDefLevel;
-
-    /**
-     * Repetition/Definition/Value readers.
-     */
-    private IntIterator repetitionLevelColumn;
-    private IntIterator definitionLevelColumn;
-    private ValuesReader dataColumn;
-
-    // Only set if vectorized decoding is true. This is used instead of the 
row by row decoding
-    // with `definitionLevelColumn`.
-    private VectorizedRleValuesReader defColumn;
-
-    /**
-     * Total number of values in this column (in this row group).
-     */
-    private final long totalValueCount;
-
-    /**
-     * Total values in the current page.
-     */
-    private int pageValueCount;
-
-    private final PageReader pageReader;
-    private final ColumnDescriptor descriptor;
-
-    public ColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
-        throws IOException {
-      this.descriptor = descriptor;
-      this.pageReader = pageReader;
-      this.maxDefLevel = descriptor.getMaxDefinitionLevel();
-
-      DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
-      if (dictionaryPage != null) {
-        try {
-          this.dictionary = 
dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
-          this.useDictionary = true;
-        } catch (IOException e) {
-          throw new IOException("could not decode the dictionary for " + 
descriptor, e);
-        }
-      } else {
-        this.dictionary = null;
-        this.useDictionary = false;
-      }
-      this.totalValueCount = pageReader.getTotalValueCount();
-      if (totalValueCount == 0) {
-        throw new IOException("totalValueCount == 0");
-      }
-    }
-
-    /**
-     * TODO: Hoist the useDictionary branch to decode*Batch and make the batch 
page aligned.
-     */
-    public boolean nextBoolean() {
-      if (!useDictionary) {
-        return dataColumn.readBoolean();
-      } else {
-        return dictionary.decodeToBoolean(dataColumn.readValueDictionaryId());
-      }
-    }
-
-    public int nextInt() {
-      if (!useDictionary) {
-        return dataColumn.readInteger();
-      } else {
-        return dictionary.decodeToInt(dataColumn.readValueDictionaryId());
-      }
-    }
-
-    public long nextLong() {
-      if (!useDictionary) {
-        return dataColumn.readLong();
-      } else {
-        return dictionary.decodeToLong(dataColumn.readValueDictionaryId());
-      }
-    }
-
-    public float nextFloat() {
-      if (!useDictionary) {
-        return dataColumn.readFloat();
-      } else {
-        return dictionary.decodeToFloat(dataColumn.readValueDictionaryId());
-      }
-    }
-
-    public double nextDouble() {
-      if (!useDictionary) {
-        return dataColumn.readDouble();
-      } else {
-        return dictionary.decodeToDouble(dataColumn.readValueDictionaryId());
-      }
-    }
-
-    public Binary nextBinary() {
-      if (!useDictionary) {
-        return dataColumn.readBytes();
-      } else {
-        return dictionary.decodeToBinary(dataColumn.readValueDictionaryId());
-      }
-    }
-
-    /**
-     * Advances to the next value. Returns true if the value is non-null.
-     */
-    private boolean next() throws IOException {
-      if (valuesRead >= endOfPageValueCount) {
-        if (valuesRead >= totalValueCount) {
-          // How do we get here? Throw end of stream exception?
-          return false;
-        }
-        readPage();
-      }
-      ++valuesRead;
-      // TODO: Don't read for flat schemas
-      //repetitionLevel = repetitionLevelColumn.nextInt();
-      return definitionLevelColumn.nextInt() == maxDefLevel;
-    }
-
-    /**
-     * Reads `total` values from this columnReader into column.
-     */
-    private void readBatch(int total, ColumnVector column) throws IOException {
-      int rowId = 0;
-      while (total > 0) {
-        // Compute the number of values we want to read in this page.
-        int leftInPage = (int)(endOfPageValueCount - valuesRead);
-        if (leftInPage == 0) {
-          readPage();
-          leftInPage = (int)(endOfPageValueCount - valuesRead);
-        }
-        int num = Math.min(total, leftInPage);
-        if (useDictionary) {
-          // Read and decode dictionary ids.
-          ColumnVector dictionaryIds = column.reserveDictionaryIds(total);;
-          defColumn.readIntegers(
-              num, dictionaryIds, column, rowId, maxDefLevel, 
(VectorizedValuesReader) dataColumn);
-          decodeDictionaryIds(rowId, num, column, dictionaryIds);
-        } else {
-          column.setDictionary(null);
-          switch (descriptor.getType()) {
-            case BOOLEAN:
-              readBooleanBatch(rowId, num, column);
-              break;
-            case INT32:
-              readIntBatch(rowId, num, column);
-              break;
-            case INT64:
-              readLongBatch(rowId, num, column);
-              break;
-            case FLOAT:
-              readFloatBatch(rowId, num, column);
-              break;
-            case DOUBLE:
-              readDoubleBatch(rowId, num, column);
-              break;
-            case BINARY:
-              readBinaryBatch(rowId, num, column);
-              break;
-            case FIXED_LEN_BYTE_ARRAY:
-              readFixedLenByteArrayBatch(rowId, num, column, 
descriptor.getTypeLength());
-              break;
-            default:
-              throw new IOException("Unsupported type: " + 
descriptor.getType());
-          }
-        }
-
-        valuesRead += num;
-        rowId += num;
-        total -= num;
-      }
-    }
-
-    /**
-     * Reads `num` values into column, decoding the values from 
`dictionaryIds` and `dictionary`.
-     */
-    private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
-                                     ColumnVector dictionaryIds) {
-      switch (descriptor.getType()) {
-        case INT32:
-        case INT64:
-        case FLOAT:
-        case DOUBLE:
-        case BINARY:
-          column.setDictionary(dictionary);
-          break;
-
-        case FIXED_LEN_BYTE_ARRAY:
-          // DecimalType written in the legacy mode
-          if (DecimalType.is32BitDecimalType(column.dataType())) {
-            for (int i = rowId; i < rowId + num; ++i) {
-              Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-              column.putInt(i, (int) 
CatalystRowConverter.binaryToUnscaledLong(v));
-            }
-          } else if (DecimalType.is64BitDecimalType(column.dataType())) {
-            for (int i = rowId; i < rowId + num; ++i) {
-              Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-              column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v));
-            }
-          } else {
-            throw new NotImplementedException();
-          }
-          break;
-
-        default:
-          throw new NotImplementedException("Unsupported type: " + 
descriptor.getType());
-      }
-    }
-
-    /**
-     * For all the read*Batch functions, reads `num` values from this 
columnReader into column. It
-     * is guaranteed that num is smaller than the number of values left in the 
current page.
-     */
-
-    private void readBooleanBatch(int rowId, int num, ColumnVector column) 
throws IOException {
-      assert(column.dataType() == DataTypes.BooleanType);
-      defColumn.readBooleans(
-          num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
-    }
-
-    private void readIntBatch(int rowId, int num, ColumnVector column) throws 
IOException {
-      // This is where we implement support for the valid type conversions.
-      // TODO: implement remaining type conversions
-      if (column.dataType() == DataTypes.IntegerType || column.dataType() == 
DataTypes.DateType ||
-        DecimalType.is32BitDecimalType(column.dataType())) {
-        defColumn.readIntegers(
-            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
-      } else if (column.dataType() == DataTypes.ByteType) {
-        defColumn.readBytes(
-            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
-      } else if (column.dataType() == DataTypes.ShortType) {
-        defColumn.readShorts(
-            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
-      } else {
-        throw new NotImplementedException("Unimplemented type: " + 
column.dataType());
-      }
-    }
-
-    private void readLongBatch(int rowId, int num, ColumnVector column) throws 
IOException {
-      // This is where we implement support for the valid type conversions.
-      if (column.dataType() == DataTypes.LongType ||
-          DecimalType.is64BitDecimalType(column.dataType())) {
-        defColumn.readLongs(
-            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
-      } else {
-        throw new UnsupportedOperationException("Unsupported conversion to: " 
+ column.dataType());
-      }
-    }
-
-    private void readFloatBatch(int rowId, int num, ColumnVector column) 
throws IOException {
-      // This is where we implement support for the valid type conversions.
-      // TODO: support implicit cast to double?
-      if (column.dataType() == DataTypes.FloatType) {
-        defColumn.readFloats(
-            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
-      } else {
-        throw new UnsupportedOperationException("Unsupported conversion to: " 
+ column.dataType());
-      }
-    }
-
-    private void readDoubleBatch(int rowId, int num, ColumnVector column) 
throws IOException {
-      // This is where we implement support for the valid type conversions.
-      // TODO: implement remaining type conversions
-      if (column.dataType() == DataTypes.DoubleType) {
-        defColumn.readDoubles(
-            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
-      } else {
-        throw new NotImplementedException("Unimplemented type: " + 
column.dataType());
-      }
-    }
-
-    private void readBinaryBatch(int rowId, int num, ColumnVector column) 
throws IOException {
-      // This is where we implement support for the valid type conversions.
-      // TODO: implement remaining type conversions
-      if (column.isArray()) {
-        defColumn.readBinarys(
-            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
-      } else {
-        throw new NotImplementedException("Unimplemented type: " + 
column.dataType());
-      }
-    }
-
-    private void readFixedLenByteArrayBatch(int rowId, int num,
-                                            ColumnVector column, int arrayLen) 
throws IOException {
-      VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
-      // This is where we implement support for the valid type conversions.
-      // TODO: implement remaining type conversions
-      if (DecimalType.is32BitDecimalType(column.dataType())) {
-        for (int i = 0; i < num; i++) {
-          if (defColumn.readInteger() == maxDefLevel) {
-            column.putInt(rowId + i,
-              (int) 
CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
-          } else {
-            column.putNull(rowId + i);
-          }
-        }
-      } else if (DecimalType.is64BitDecimalType(column.dataType())) {
-        for (int i = 0; i < num; i++) {
-          if (defColumn.readInteger() == maxDefLevel) {
-            column.putLong(rowId + i,
-                
CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
-          } else {
-            column.putNull(rowId + i);
-          }
-        }
-      } else {
-        throw new NotImplementedException("Unimplemented type: " + 
column.dataType());
-      }
-    }
-
-    private void readPage() throws IOException {
-      DataPage page = pageReader.readPage();
-      // TODO: Why is this a visitor?
-      page.accept(new DataPage.Visitor<Void>() {
-        @Override
-        public Void visit(DataPageV1 dataPageV1) {
-          try {
-            readPageV1(dataPageV1);
-            return null;
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        }
-
-        @Override
-        public Void visit(DataPageV2 dataPageV2) {
-          try {
-            readPageV2(dataPageV2);
-            return null;
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        }
-      });
-    }
-
-    private void initDataReader(Encoding dataEncoding, byte[] bytes, int 
offset)throws IOException {
-      this.endOfPageValueCount = valuesRead + pageValueCount;
-      if (dataEncoding.usesDictionary()) {
-        this.dataColumn = null;
-        if (dictionary == null) {
-          throw new IOException(
-              "could not read page in col " + descriptor +
-                  " as the dictionary was missing for encoding " + 
dataEncoding);
-        }
-        if (vectorizedDecode()) {
-          @SuppressWarnings("deprecation")
-          Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow 
warning suppression
-          if (dataEncoding != plainDict && dataEncoding != 
Encoding.RLE_DICTIONARY) {
-            throw new NotImplementedException("Unsupported encoding: " + 
dataEncoding);
-          }
-          this.dataColumn = new VectorizedRleValuesReader();
-        } else {
-          this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(
-              descriptor, VALUES, dictionary);
-        }
-        this.useDictionary = true;
-      } else {
-        if (vectorizedDecode()) {
-          if (dataEncoding != Encoding.PLAIN) {
-            throw new NotImplementedException("Unsupported encoding: " + 
dataEncoding);
-          }
-          this.dataColumn = new VectorizedPlainValuesReader();
-        } else {
-          this.dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
-        }
-        this.useDictionary = false;
-      }
-
-      try {
-        dataColumn.initFromPage(pageValueCount, bytes, offset);
-      } catch (IOException e) {
-        throw new IOException("could not read page in col " + descriptor, e);
-      }
-    }
-
-    private void readPageV1(DataPageV1 page) throws IOException {
-      this.pageValueCount = page.getValueCount();
-      ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, 
REPETITION_LEVEL);
-      ValuesReader dlReader;
-
-      // Initialize the decoders.
-      if (vectorizedDecode()) {
-        if (page.getDlEncoding() != Encoding.RLE && 
descriptor.getMaxDefinitionLevel() != 0) {
-          throw new NotImplementedException("Unsupported encoding: " + 
page.getDlEncoding());
-        }
-        int bitWidth = 
BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
-        this.defColumn = new VectorizedRleValuesReader(bitWidth);
-        dlReader = this.defColumn;
-      } else {
-        dlReader = page.getDlEncoding().getValuesReader(descriptor, 
DEFINITION_LEVEL);
-      }
-      this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
-      this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
-      try {
-        byte[] bytes = page.getBytes().toByteArray();
-        rlReader.initFromPage(pageValueCount, bytes, 0);
-        int next = rlReader.getNextOffset();
-        dlReader.initFromPage(pageValueCount, bytes, next);
-        next = dlReader.getNextOffset();
-        initDataReader(page.getValueEncoding(), bytes, next);
-      } catch (IOException e) {
-        throw new IOException("could not read page " + page + " in col " + 
descriptor, e);
-      }
-    }
-
-    private void readPageV2(DataPageV2 page) throws IOException {
-      this.pageValueCount = page.getValueCount();
-      this.repetitionLevelColumn = 
createRLEIterator(descriptor.getMaxRepetitionLevel(),
-          page.getRepetitionLevels(), descriptor);
-
-      if (vectorizedDecode()) {
-        int bitWidth = 
BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
-        this.defColumn = new VectorizedRleValuesReader(bitWidth);
-        this.definitionLevelColumn = new 
ValuesReaderIntIterator(this.defColumn);
-        this.defColumn.initFromBuffer(
-            this.pageValueCount, page.getDefinitionLevels().toByteArray());
-      } else {
-        this.definitionLevelColumn = 
createRLEIterator(descriptor.getMaxDefinitionLevel(),
-            page.getDefinitionLevels(), descriptor);
-      }
-      try {
-        initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 
0);
-      } catch (IOException e) {
-        throw new IOException("could not read page " + page + " in col " + 
descriptor, e);
-      }
-    }
-  }
-
-  private void checkEndOfRowGroup() throws IOException {
-    if (rowsReturned != totalCountLoadedSoFar) return;
-    PageReadStore pages = reader.readNextRowGroup();
-    if (pages == null) {
-      throw new IOException("expecting more rows but reached last block. Read "
-          + rowsReturned + " out of " + totalRowCount);
-    }
-    List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    columnReaders = new ColumnReader[columns.size()];
-    for (int i = 0; i < columns.size(); ++i) {
-      columnReaders[i] = new ColumnReader(columns.get(i), 
pages.getPageReader(columns.get(i)));
-    }
-    totalCountLoadedSoFar += pages.getRowCount();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
new file mode 100644
index 0000000..0f00f56
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -0,0 +1,701 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.*;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+
+/**
+ * A specialized RecordReader that reads into InternalRows or ColumnarBatches 
directly using the
+ * Parquet column APIs. This is somewhat based on parquet-mr's ColumnReader.
+ *
+ * TODO: handle complex types, decimal requiring more than 8 bytes, INT96. 
Schema mismatch.
+ * All of these can be handled efficiently and easily with codegen.
+ *
+ * This class can either return InternalRows or ColumnarBatches. With whole 
stage codegen
+ * enabled, this class returns ColumnarBatches which offers significant 
performance gains.
+ * TODO: make this always return ColumnarBatches.
+ */
+public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBase<Object> {
+  /**
+   * Batch of rows that we assemble and the current index we've returned. 
Every time this
+   * batch is used up (batchIdx == numBatched), we populate the batch.
+   */
+  private int batchIdx = 0;
+  private int numBatched = 0;
+
+  /**
+   * For each requested column, the reader to read this column.
+   */
+  private VectorizedColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been read, including the current in 
flight row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * columnBatch object that is used for batch decoding. This is created on 
first use and triggers
+   * batched decoding. It is not valid to interleave calls to the batched 
interface with the row
+   * by row RecordReader APIs.
+   * This is only enabled with additional flags for development. This is still 
a work in progress
+   * and currently unsupported cases will fail with potentially difficult to 
diagnose errors.
+   * This should be only turned on for development to work on this feature.
+   *
+   * When this is set, the code will branch early on in the RecordReader APIs. 
There is no shared
+   * code between the path that uses the MR decoders and the vectorized ones.
+   *
+   * TODOs:
+   *  - Implement v2 page formats (just make sure we create the correct 
decoders).
+   */
+  private ColumnarBatch columnarBatch;
+
+  /**
+   * If true, this class returns batches instead of rows.
+   */
+  private boolean returnColumnarBatch;
+
+  /**
+   * The default config on whether columnarBatch should be offheap.
+   */
+  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+
+  /**
+   * Tries to initialize the reader for this split. Returns true if this 
reader supports reading
+   * this split and false otherwise.
+   */
+  public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext) {
+    try {
+      initialize(inputSplit, taskAttemptContext);
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
+      throws IOException, InterruptedException {
+    super.initialize(inputSplit, taskAttemptContext);
+    initializeInternal();
+  }
+
+  /**
+   * Utility API that will read all the data in path. This circumvents the 
need to create Hadoop
+   * objects to use this class. `columns` can contain the list of columns to 
project.
+   */
+  @Override
+  public void initialize(String path, List<String> columns) throws IOException 
{
+    super.initialize(path, columns);
+    initializeInternal();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (columnarBatch != null) {
+      columnarBatch.close();
+      columnarBatch = null;
+    }
+    super.close();
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    resultBatch();
+
+    if (returnColumnarBatch) return nextBatch();
+
+    if (batchIdx >= numBatched) {
+      if (!nextBatch()) return false;
+    }
+    ++batchIdx;
+    return true;
+  }
+
+  @Override
+  public Object getCurrentValue() throws IOException, InterruptedException {
+    if (returnColumnarBatch) return columnarBatch;
+    return columnarBatch.getRow(batchIdx - 1);
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return (float) rowsReturned / totalRowCount;
+  }
+
+  /**
+   * Returns the ColumnarBatch object that will be used for all rows returned 
by this reader.
+   * This object is reused. Calling this enables the vectorized reader. This 
should be called
+   * before any calls to nextKeyValue/nextBatch.
+   */
+  public ColumnarBatch resultBatch() {
+    return resultBatch(DEFAULT_MEMORY_MODE);
+  }
+
+  public ColumnarBatch resultBatch(MemoryMode memMode) {
+    if (columnarBatch == null) {
+      columnarBatch = ColumnarBatch.allocate(sparkSchema, memMode);
+    }
+    return columnarBatch;
+  }
+
+  /**
+   * Can be called before any rows are returned to enable returning columnar 
batches directly.
+   */
+  public void enableReturningBatches() {
+    returnColumnarBatch = true;
+  }
+
+  /**
+   * Advances to the next batch of rows. Returns false if there are no more.
+   */
+  public boolean nextBatch() throws IOException {
+    columnarBatch.reset();
+    if (rowsReturned >= totalRowCount) return false;
+    checkEndOfRowGroup();
+
+    int num = (int) Math.min((long) columnarBatch.capacity(), 
totalCountLoadedSoFar - rowsReturned);
+    for (int i = 0; i < columnReaders.length; ++i) {
+      columnReaders[i].readBatch(num, columnarBatch.column(i));
+    }
+    rowsReturned += num;
+    columnarBatch.setNumRows(num);
+    numBatched = num;
+    batchIdx = 0;
+    return true;
+  }
+
+  private void initializeInternal() throws IOException {
+    /**
+     * Check that the requested schema is supported.
+     */
+    OriginalType[] originalTypes = new 
OriginalType[requestedSchema.getFieldCount()];
+    for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
+      Type t = requestedSchema.getFields().get(i);
+      if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
+        throw new IOException("Complex types not supported.");
+      }
+      PrimitiveType primitiveType = t.asPrimitiveType();
+
+      originalTypes[i] = t.getOriginalType();
+
+      // TODO: Be extremely cautious in what is supported. Expand this.
+      if (originalTypes[i] != null && originalTypes[i] != OriginalType.DECIMAL 
&&
+          originalTypes[i] != OriginalType.UTF8 && originalTypes[i] != 
OriginalType.DATE &&
+          originalTypes[i] != OriginalType.INT_8 && originalTypes[i] != 
OriginalType.INT_16) {
+        throw new IOException("Unsupported type: " + t);
+      }
+      if (originalTypes[i] == OriginalType.DECIMAL &&
+          primitiveType.getDecimalMetadata().getPrecision() > 
Decimal.MAX_LONG_DIGITS()) {
+        throw new IOException("Decimal with high precision is not supported.");
+      }
+      if (primitiveType.getPrimitiveTypeName() == 
PrimitiveType.PrimitiveTypeName.INT96) {
+        throw new IOException("Int96 not supported.");
+      }
+      ColumnDescriptor fd = 
fileSchema.getColumnDescription(requestedSchema.getPaths().get(i));
+      if (!fd.equals(requestedSchema.getColumns().get(i))) {
+        throw new IOException("Schema evolution not supported.");
+      }
+    }
+  }
+
+  /**
+   * Decoder to return values from a single column.
+   */
+  private class VectorizedColumnReader {
+    /**
+     * Total number of values read.
+     */
+    private long valuesRead;
+
+    /**
+     * value that indicates the end of the current page. That is,
+     * if valuesRead == endOfPageValueCount, we are at the end of the page.
+     */
+    private long endOfPageValueCount;
+
+    /**
+     * The dictionary, if this column has dictionary encoding.
+     */
+    private final Dictionary dictionary;
+
+    /**
+     * If true, the current page is dictionary encoded.
+     */
+    private boolean useDictionary;
+
+    /**
+     * Maximum definition level for this column.
+     */
+    private final int maxDefLevel;
+
+    /**
+     * Repetition/Definition/Value readers.
+     */
+    private IntIterator repetitionLevelColumn;
+    private IntIterator definitionLevelColumn;
+    private ValuesReader dataColumn;
+
+    // Only set if vectorized decoding is true. This is used instead of the 
row by row decoding
+    // with `definitionLevelColumn`.
+    private VectorizedRleValuesReader defColumn;
+
+    /**
+     * Total number of values in this column (in this row group).
+     */
+    private final long totalValueCount;
+
+    /**
+     * Total values in the current page.
+     */
+    private int pageValueCount;
+
+    private final PageReader pageReader;
+    private final ColumnDescriptor descriptor;
+
+    public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader 
pageReader)
+        throws IOException {
+      this.descriptor = descriptor;
+      this.pageReader = pageReader;
+      this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+
+      DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+      if (dictionaryPage != null) {
+        try {
+          this.dictionary = 
dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
+          this.useDictionary = true;
+        } catch (IOException e) {
+          throw new IOException("could not decode the dictionary for " + 
descriptor, e);
+        }
+      } else {
+        this.dictionary = null;
+        this.useDictionary = false;
+      }
+      this.totalValueCount = pageReader.getTotalValueCount();
+      if (totalValueCount == 0) {
+        throw new IOException("totalValueCount == 0");
+      }
+    }
+
+    /**
+     * TODO: Hoist the useDictionary branch to decode*Batch and make the batch 
page aligned.
+     */
+    public boolean nextBoolean() {
+      if (!useDictionary) {
+        return dataColumn.readBoolean();
+      } else {
+        return dictionary.decodeToBoolean(dataColumn.readValueDictionaryId());
+      }
+    }
+
+    public int nextInt() {
+      if (!useDictionary) {
+        return dataColumn.readInteger();
+      } else {
+        return dictionary.decodeToInt(dataColumn.readValueDictionaryId());
+      }
+    }
+
+    public long nextLong() {
+      if (!useDictionary) {
+        return dataColumn.readLong();
+      } else {
+        return dictionary.decodeToLong(dataColumn.readValueDictionaryId());
+      }
+    }
+
+    public float nextFloat() {
+      if (!useDictionary) {
+        return dataColumn.readFloat();
+      } else {
+        return dictionary.decodeToFloat(dataColumn.readValueDictionaryId());
+      }
+    }
+
+    public double nextDouble() {
+      if (!useDictionary) {
+        return dataColumn.readDouble();
+      } else {
+        return dictionary.decodeToDouble(dataColumn.readValueDictionaryId());
+      }
+    }
+
+    public Binary nextBinary() {
+      if (!useDictionary) {
+        return dataColumn.readBytes();
+      } else {
+        return dictionary.decodeToBinary(dataColumn.readValueDictionaryId());
+      }
+    }
+
+    /**
+     * Advances to the next value. Returns true if the value is non-null.
+     */
+    private boolean next() throws IOException {
+      if (valuesRead >= endOfPageValueCount) {
+        if (valuesRead >= totalValueCount) {
+          // How do we get here? Throw end of stream exception?
+          return false;
+        }
+        readPage();
+      }
+      ++valuesRead;
+      // TODO: Don't read for flat schemas
+      //repetitionLevel = repetitionLevelColumn.nextInt();
+      return definitionLevelColumn.nextInt() == maxDefLevel;
+    }
+
+    /**
+     * Reads `total` values from this columnReader into column.
+     */
+    private void readBatch(int total, ColumnVector column) throws IOException {
+      int rowId = 0;
+      while (total > 0) {
+        // Compute the number of values we want to read in this page.
+        int leftInPage = (int)(endOfPageValueCount - valuesRead);
+        if (leftInPage == 0) {
+          readPage();
+          leftInPage = (int)(endOfPageValueCount - valuesRead);
+        }
+        int num = Math.min(total, leftInPage);
+        if (useDictionary) {
+          // Read and decode dictionary ids.
+          ColumnVector dictionaryIds = column.reserveDictionaryIds(total);
+          defColumn.readIntegers(
+              num, dictionaryIds, column, rowId, maxDefLevel, 
(VectorizedValuesReader) dataColumn);
+          decodeDictionaryIds(rowId, num, column, dictionaryIds);
+        } else {
+          column.setDictionary(null);
+          switch (descriptor.getType()) {
+            case BOOLEAN:
+              readBooleanBatch(rowId, num, column);
+              break;
+            case INT32:
+              readIntBatch(rowId, num, column);
+              break;
+            case INT64:
+              readLongBatch(rowId, num, column);
+              break;
+            case FLOAT:
+              readFloatBatch(rowId, num, column);
+              break;
+            case DOUBLE:
+              readDoubleBatch(rowId, num, column);
+              break;
+            case BINARY:
+              readBinaryBatch(rowId, num, column);
+              break;
+            case FIXED_LEN_BYTE_ARRAY:
+              readFixedLenByteArrayBatch(rowId, num, column, 
descriptor.getTypeLength());
+              break;
+            default:
+              throw new IOException("Unsupported type: " + 
descriptor.getType());
+          }
+        }
+
+        valuesRead += num;
+        rowId += num;
+        total -= num;
+      }
+    }
+
+    /**
+     * Reads `num` values into column, decoding the values from 
`dictionaryIds` and `dictionary`.
+     */
+    private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
+                                     ColumnVector dictionaryIds) {
+      switch (descriptor.getType()) {
+        case INT32:
+        case INT64:
+        case FLOAT:
+        case DOUBLE:
+        case BINARY:
+          column.setDictionary(dictionary);
+          break;
+
+        case FIXED_LEN_BYTE_ARRAY:
+          // DecimalType written in the legacy mode
+          if (DecimalType.is32BitDecimalType(column.dataType())) {
+            for (int i = rowId; i < rowId + num; ++i) {
+              Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+              column.putInt(i, (int) 
CatalystRowConverter.binaryToUnscaledLong(v));
+            }
+          } else if (DecimalType.is64BitDecimalType(column.dataType())) {
+            for (int i = rowId; i < rowId + num; ++i) {
+              Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+              column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v));
+            }
+          } else {
+            throw new NotImplementedException();
+          }
+          break;
+
+        default:
+          throw new NotImplementedException("Unsupported type: " + 
descriptor.getType());
+      }
+    }
+
+    /**
+     * For all the read*Batch functions, reads `num` values from this 
columnReader into column. It
+     * is guaranteed that num is smaller than the number of values left in the 
current page.
+     */
+
+    private void readBooleanBatch(int rowId, int num, ColumnVector column) 
throws IOException {
+      assert(column.dataType() == DataTypes.BooleanType);
+      defColumn.readBooleans(
+          num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
+    }
+
+    private void readIntBatch(int rowId, int num, ColumnVector column) throws 
IOException {
+      // This is where we implement support for the valid type conversions.
+      // TODO: implement remaining type conversions
+      if (column.dataType() == DataTypes.IntegerType || column.dataType() == 
DataTypes.DateType ||
+        DecimalType.is32BitDecimalType(column.dataType())) {
+        defColumn.readIntegers(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
+      } else if (column.dataType() == DataTypes.ByteType) {
+        defColumn.readBytes(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
+      } else if (column.dataType() == DataTypes.ShortType) {
+        defColumn.readShorts(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
+      } else {
+        throw new NotImplementedException("Unimplemented type: " + 
column.dataType());
+      }
+    }
+
+    private void readLongBatch(int rowId, int num, ColumnVector column) throws 
IOException {
+      // This is where we implement support for the valid type conversions.
+      if (column.dataType() == DataTypes.LongType ||
+          DecimalType.is64BitDecimalType(column.dataType())) {
+        defColumn.readLongs(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
+      } else {
+        throw new UnsupportedOperationException("Unsupported conversion to: " 
+ column.dataType());
+      }
+    }
+
+    private void readFloatBatch(int rowId, int num, ColumnVector column) 
throws IOException {
+      // This is where we implement support for the valid type conversions.
+      // TODO: support implicit cast to double?
+      if (column.dataType() == DataTypes.FloatType) {
+        defColumn.readFloats(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
+      } else {
+        throw new UnsupportedOperationException("Unsupported conversion to: " 
+ column.dataType());
+      }
+    }
+
+    private void readDoubleBatch(int rowId, int num, ColumnVector column) 
throws IOException {
+      // This is where we implement support for the valid type conversions.
+      // TODO: implement remaining type conversions
+      if (column.dataType() == DataTypes.DoubleType) {
+        defColumn.readDoubles(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
+      } else {
+        throw new NotImplementedException("Unimplemented type: " + 
column.dataType());
+      }
+    }
+
+    private void readBinaryBatch(int rowId, int num, ColumnVector column) 
throws IOException {
+      // This is where we implement support for the valid type conversions.
+      // TODO: implement remaining type conversions
+      if (column.isArray()) {
+        defColumn.readBinarys(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader) 
dataColumn);
+      } else {
+        throw new NotImplementedException("Unimplemented type: " + 
column.dataType());
+      }
+    }
+
+    private void readFixedLenByteArrayBatch(int rowId, int num,
+                                            ColumnVector column, int arrayLen) 
throws IOException {
+      VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
+      // This is where we implement support for the valid type conversions.
+      // TODO: implement remaining type conversions
+      if (DecimalType.is32BitDecimalType(column.dataType())) {
+        for (int i = 0; i < num; i++) {
+          if (defColumn.readInteger() == maxDefLevel) {
+            column.putInt(rowId + i,
+              (int) 
CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
+          } else {
+            column.putNull(rowId + i);
+          }
+        }
+      } else if (DecimalType.is64BitDecimalType(column.dataType())) {
+        for (int i = 0; i < num; i++) {
+          if (defColumn.readInteger() == maxDefLevel) {
+            column.putLong(rowId + i,
+                
CatalystRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
+          } else {
+            column.putNull(rowId + i);
+          }
+        }
+      } else {
+        throw new NotImplementedException("Unimplemented type: " + 
column.dataType());
+      }
+    }
+
+    private void readPage() throws IOException {
+      DataPage page = pageReader.readPage();
+      // TODO: Why is this a visitor?
+      page.accept(new DataPage.Visitor<Void>() {
+        @Override
+        public Void visit(DataPageV1 dataPageV1) {
+          try {
+            readPageV1(dataPageV1);
+            return null;
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+        @Override
+        public Void visit(DataPageV2 dataPageV2) {
+          try {
+            readPageV2(dataPageV2);
+            return null;
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+
+    private void initDataReader(Encoding dataEncoding, byte[] bytes, int 
offset) throws IOException {
+      this.endOfPageValueCount = valuesRead + pageValueCount;
+      if (dataEncoding.usesDictionary()) {
+        this.dataColumn = null;
+        if (dictionary == null) {
+          throw new IOException(
+              "could not read page in col " + descriptor +
+                  " as the dictionary was missing for encoding " + 
dataEncoding);
+        }
+        @SuppressWarnings("deprecation")
+        Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow 
warning suppression
+        if (dataEncoding != plainDict && dataEncoding != 
Encoding.RLE_DICTIONARY) {
+          throw new NotImplementedException("Unsupported encoding: " + 
dataEncoding);
+        }
+        this.dataColumn = new VectorizedRleValuesReader();
+        this.useDictionary = true;
+      } else {
+        if (dataEncoding != Encoding.PLAIN) {
+          throw new NotImplementedException("Unsupported encoding: " + 
dataEncoding);
+        }
+        this.dataColumn = new VectorizedPlainValuesReader();
+        this.useDictionary = false;
+      }
+
+      try {
+        dataColumn.initFromPage(pageValueCount, bytes, offset);
+      } catch (IOException e) {
+        throw new IOException("could not read page in col " + descriptor, e);
+      }
+    }
+
+    private void readPageV1(DataPageV1 page) throws IOException {
+      this.pageValueCount = page.getValueCount();
+      ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, 
REPETITION_LEVEL);
+      ValuesReader dlReader;
+
+      // Initialize the decoders.
+      if (page.getDlEncoding() != Encoding.RLE && 
descriptor.getMaxDefinitionLevel() != 0) {
+        throw new NotImplementedException("Unsupported encoding: " + 
page.getDlEncoding());
+      }
+      int bitWidth = 
BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+      this.defColumn = new VectorizedRleValuesReader(bitWidth);
+      dlReader = this.defColumn;
+      this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+      this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+      try {
+        byte[] bytes = page.getBytes().toByteArray();
+        rlReader.initFromPage(pageValueCount, bytes, 0);
+        int next = rlReader.getNextOffset();
+        dlReader.initFromPage(pageValueCount, bytes, next);
+        next = dlReader.getNextOffset();
+        initDataReader(page.getValueEncoding(), bytes, next);
+      } catch (IOException e) {
+        throw new IOException("could not read page " + page + " in col " + 
descriptor, e);
+      }
+    }
+
+    private void readPageV2(DataPageV2 page) throws IOException {
+      this.pageValueCount = page.getValueCount();
+      this.repetitionLevelColumn = 
createRLEIterator(descriptor.getMaxRepetitionLevel(),
+          page.getRepetitionLevels(), descriptor);
+
+      int bitWidth = 
BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+      this.defColumn = new VectorizedRleValuesReader(bitWidth);
+      this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
+      this.defColumn.initFromBuffer(
+          this.pageValueCount, page.getDefinitionLevels().toByteArray());
+      try {
+        initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 
0);
+      } catch (IOException e) {
+        throw new IOException("could not read page " + page + " in col " + 
descriptor, e);
+      }
+    }
+  }
+
+  private void checkEndOfRowGroup() throws IOException {
+    if (rowsReturned != totalCountLoadedSoFar) return;
+    PageReadStore pages = reader.readNextRowGroup();
+    if (pages == null) {
+      throw new IOException("expecting more rows but reached last block. Read "
+          + rowsReturned + " out of " + totalRowCount);
+    }
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    columnReaders = new VectorizedColumnReader[columns.size()];
+    for (int i = 0; i < columns.size(); ++i) {
+      columnReaders[i] = new VectorizedColumnReader(columns.get(i),
+          pages.getPageReader(columns.get(i)));
+    }
+    totalCountLoadedSoFar += pages.getRowCount();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 2abfd14..cd769d0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -160,6 +160,15 @@ case class SetCommand(kv: Option[(String, 
Option[String])]) extends RunnableComm
       }
       (keyValueOutput, runFunc)
 
+    case Some((SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED, 
Some(value))) =>
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property 
${SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED} is " +
+            s"deprecated and will be ignored. Vectorized parquet reader will 
be used instead.")
+        Seq(Row(SQLConf.PARQUET_VECTORIZED_READER_ENABLED, "true"))
+      }
+      (keyValueOutput, runFunc)
+
     // Configures a single property.
     case Some((key, Some(value))) =>
       val runFunc = (sqlContext: SQLContext) => {

http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index e848f42..f3514cd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -33,9 +33,9 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.SQLContext
-import 
org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader
+import 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
 
@@ -99,8 +99,6 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
 
   // If true, enable using the custom RecordReader for parquet. This only 
works for
   // a subset of the types (no complex types).
-  protected val enableUnsafeRowParquetReader: Boolean =
-    
sqlContext.getConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key).toBoolean
   protected val enableVectorizedParquetReader: Boolean =
     sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
   protected val enableWholestageCodegen: Boolean =
@@ -174,19 +172,17 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
         * fails (for example, unsupported schema), try with the normal reader.
         * TODO: plumb this through a different way?
         */
-      if (enableUnsafeRowParquetReader &&
+      if (enableVectorizedParquetReader &&
         format.getClass.getName == 
"org.apache.parquet.hadoop.ParquetInputFormat") {
-        val parquetReader: UnsafeRowParquetRecordReader = new 
UnsafeRowParquetRecordReader()
+        val parquetReader: VectorizedParquetRecordReader = new 
VectorizedParquetRecordReader()
         if (!parquetReader.tryInitialize(
             split.serializableHadoopSplit.value, hadoopAttemptContext)) {
           parquetReader.close()
         } else {
           reader = parquetReader.asInstanceOf[RecordReader[Void, V]]
-          if (enableVectorizedParquetReader) {
-            parquetReader.resultBatch()
-            // Whole stage codegen (PhysicalRDD) is able to deal with batches 
directly
-            if (enableWholestageCodegen) 
parquetReader.enableReturningBatches();
-          }
+          parquetReader.resultBatch()
+          // Whole stage codegen (PhysicalRDD) is able to deal with batches 
directly
+          if (enableWholestageCodegen) parquetReader.enableReturningBatches()
         }
       }
 
@@ -203,7 +199,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
       private[this] var finished = false
 
       override def hasNext: Boolean = {
-        if (context.isInterrupted) {
+        if (context.isInterrupted()) {
           throw new TaskKilledException
         }
         if (!finished && !havePair) {

http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c308161..473cde5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -345,11 +345,6 @@ object SQLConf {
       "option must be set in Hadoop Configuration.  2. This option overrides " 
+
       "\"spark.sql.sources.outputCommitterClass\".")
 
-  val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = booleanConf(
-    key = "spark.sql.parquet.enableUnsafeRowRecordReader",
-    defaultValue = Some(true),
-    doc = "Enables using the custom ParquetUnsafeRowRecordReader.")
-
   val PARQUET_VECTORIZED_READER_ENABLED = booleanConf(
     key = "spark.sql.parquet.enableVectorizedReader",
     defaultValue = Some(true),
@@ -527,6 +522,7 @@ object SQLConf {
     val CODEGEN_ENABLED = "spark.sql.codegen"
     val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
     val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin"
+    val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = 
"spark.sql.parquet.enableUnsafeRowRecordReader"
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index 29318d8..f42c754 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -36,7 +36,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest 
with SharedSQLContex
         
List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
         val file = 
SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
 
-        val reader = new UnsafeRowParquetRecordReader
+        val reader = new VectorizedParquetRecordReader
         reader.initialize(file.asInstanceOf[String], null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())
@@ -61,7 +61,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest 
with SharedSQLContex
         data.repartition(1).write.parquet(dir.getCanonicalPath)
         val file = 
SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
 
-        val reader = new UnsafeRowParquetRecordReader
+        val reader = new VectorizedParquetRecordReader
         reader.initialize(file.asInstanceOf[String], null)
         val batch = reader.resultBatch()
         assert(reader.nextBatch())

http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index b394ffb..51183e9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -57,7 +57,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest 
with SharedSQLContex
     val output = predicate.collect { case a: Attribute => a }.distinct
 
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> 
"false") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         val query = df
           .select(output.map(e => Column(e)): _*)
           .where(Column(predicate))
@@ -446,7 +446,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest 
with SharedSQLContex
   test("SPARK-11661 Still pushdown filters returned by unhandledFilters") {
     import testImplicits._
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> 
"false") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         withTempPath { dir =>
           val path = s"${dir.getCanonicalPath}/part=1"
           (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
@@ -520,7 +520,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest 
with SharedSQLContex
   test("SPARK-11164: test the parquet filter in") {
     import testImplicits._
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> 
"false") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         withTempPath { dir =>
           val path = s"${dir.getCanonicalPath}/table1"
           (1 to 5).map(i => (i.toFloat, i%3)).toDF("a", 
"b").write.parquet(path)

http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 4d9a8d7..ebdb105 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -656,7 +656,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
       var hash1: Int = 0
       var hash2: Int = 0
       (false :: true :: Nil).foreach { v =>
-        withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> 
v.toString) {
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
v.toString) {
           val df = sqlContext.read.parquet(dir.getCanonicalPath)
           val rows = df.queryExecution.toRdd.map(_.copy()).collect()
           val unsafeRows = rows.map(_.asInstanceOf[UnsafeRow])
@@ -672,13 +672,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
     }
   }
 
-  test("UnsafeRowParquetRecordReader - direct path read") {
-    val data = (0 to 10).map(i => (i, ((i + 'a').toChar.toString)))
+  test("VectorizedParquetRecordReader - direct path read") {
+    val data = (0 to 10).map(i => (i, (i + 'a').toChar.toString))
     withTempPath { dir =>
       
sqlContext.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
       val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
       {
-        val reader = new UnsafeRowParquetRecordReader
+        val reader = new VectorizedParquetRecordReader
         try {
           reader.initialize(file, null)
           val result = mutable.ArrayBuffer.empty[(Int, String)]
@@ -695,7 +695,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
 
       // Project just one column
       {
-        val reader = new UnsafeRowParquetRecordReader
+        val reader = new VectorizedParquetRecordReader
         try {
           reader.initialize(file, ("_2" :: Nil).asJava)
           val result = mutable.ArrayBuffer.empty[(String)]
@@ -711,7 +711,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
 
       // Project columns in opposite order
       {
-        val reader = new UnsafeRowParquetRecordReader
+        val reader = new VectorizedParquetRecordReader
         try {
           reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
           val result = mutable.ArrayBuffer.empty[(String, Int)]
@@ -728,7 +728,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
 
       // Empty projection
       {
-        val reader = new UnsafeRowParquetRecordReader
+        val reader = new VectorizedParquetRecordReader
         try {
           reader.initialize(file, List[String]().asJava)
           var result = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/54794113/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index 15bf00e..070c400 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -82,38 +82,17 @@ object ParquetReadBenchmark {
         }
 
         sqlBenchmark.addCase("SQL Parquet MR") { iter =>
-          withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> 
"false") {
-            sqlContext.sql("select sum(id) from tempTable").collect()
-          }
-        }
-
-        sqlBenchmark.addCase("SQL Parquet Non-Vectorized") { iter =>
           withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
"false") {
             sqlContext.sql("select sum(id) from tempTable").collect()
           }
         }
 
         val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
-        // Driving the parquet reader directly without Spark.
-        parquetReaderBenchmark.addCase("ParquetReader Non-Vectorized") { num =>
-          var sum = 0L
-          files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new UnsafeRowParquetRecordReader
-            reader.initialize(p, ("id" :: Nil).asJava)
-
-            while (reader.nextKeyValue()) {
-              val record = reader.getCurrentValue.asInstanceOf[InternalRow]
-              if (!record.isNullAt(0)) sum += record.getInt(0)
-            }
-            reader.close()
-          }
-        }
-
         // Driving the parquet reader in batch mode directly.
         parquetReaderBenchmark.addCase("ParquetReader Vectorized") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new UnsafeRowParquetRecordReader
+            val reader = new VectorizedParquetRecordReader
             try {
               reader.initialize(p, ("id" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -136,7 +115,7 @@ object ParquetReadBenchmark {
         parquetReaderBenchmark.addCase("ParquetReader Vectorized -> Row") { 
num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new UnsafeRowParquetRecordReader
+            val reader = new VectorizedParquetRecordReader
             try {
               reader.initialize(p, ("id" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -159,7 +138,6 @@ object ParquetReadBenchmark {
         
-------------------------------------------------------------------------------------------
         SQL Parquet Vectorized                    215 /  262         73.0      
    13.7       1.0X
         SQL Parquet MR                           1946 / 2083          8.1      
   123.7       0.1X
-        SQL Parquet Non-Vectorized               1079 / 1213         14.6      
    68.6       0.2X
         */
         sqlBenchmark.run()
 
@@ -167,9 +145,8 @@ object ParquetReadBenchmark {
         Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
         Parquet Reader Single Int Column    Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
         
-------------------------------------------------------------------------------------------
-        ParquetReader Non-Vectorized              610 /  737         25.8      
    38.8       1.0X
-        ParquetReader Vectorized                  123 /  152        127.8      
     7.8       5.0X
-        ParquetReader Vectorized -> Row           165 /  180         95.2      
    10.5       3.7X
+        ParquetReader Vectorized                  123 /  152        127.8      
     7.8       1.0X
+        ParquetReader Vectorized -> Row           165 /  180         95.2      
    10.5       0.7X
         */
         parquetReaderBenchmark.run()
       }
@@ -191,32 +168,12 @@ object ParquetReadBenchmark {
         }
 
         benchmark.addCase("SQL Parquet MR") { iter =>
-          withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> 
"false") {
-            sqlContext.sql("select sum(c1), sum(length(c2)) from 
tempTable").collect
-          }
-        }
-
-        benchmark.addCase("SQL Parquet Non-vectorized") { iter =>
           withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
"false") {
             sqlContext.sql("select sum(c1), sum(length(c2)) from 
tempTable").collect
           }
         }
 
         val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
-        benchmark.addCase("ParquetReader Non-vectorized") { num =>
-          var sum1 = 0L
-          var sum2 = 0L
-          files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new UnsafeRowParquetRecordReader
-            reader.initialize(p, null)
-            while (reader.nextKeyValue()) {
-              val record = reader.getCurrentValue.asInstanceOf[InternalRow]
-              if (!record.isNullAt(0)) sum1 += record.getInt(0)
-              if (!record.isNullAt(1)) sum2 += 
record.getUTF8String(1).numBytes()
-            }
-            reader.close()
-          }
-        }
 
         /*
         Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
@@ -224,8 +181,6 @@ object ParquetReadBenchmark {
         
-------------------------------------------------------------------------------------------
         SQL Parquet Vectorized                    628 /  720         16.7      
    59.9       1.0X
         SQL Parquet MR                           1905 / 2239          5.5      
   181.7       0.3X
-        SQL Parquet Non-vectorized               1429 / 1732          7.3      
   136.3       0.4X
-        ParquetReader Non-vectorized              989 / 1357         10.6      
    94.3       0.6X
         */
         benchmark.run()
       }
@@ -247,7 +202,7 @@ object ParquetReadBenchmark {
         }
 
         benchmark.addCase("SQL Parquet MR") { iter =>
-          withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> 
"false") {
+          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> 
"false") {
             sqlContext.sql("select sum(length(c1)) from tempTable").collect
           }
         }
@@ -293,7 +248,7 @@ object ParquetReadBenchmark {
         Read data column                          191 /  250         82.1      
    12.2       1.0X
         Read partition column                      82 /   86        192.4      
     5.2       2.3X
         Read both columns                         220 /  248         71.5      
    14.0       0.9X
-         */
+        */
         benchmark.run()
       }
     }
@@ -319,7 +274,7 @@ object ParquetReadBenchmark {
         benchmark.addCase("PR Vectorized") { num =>
           var sum = 0
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new UnsafeRowParquetRecordReader
+            val reader = new VectorizedParquetRecordReader
             try {
               reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
               val batch = reader.resultBatch()
@@ -340,7 +295,7 @@ object ParquetReadBenchmark {
         benchmark.addCase("PR Vectorized (Null Filtering)") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
-            val reader = new UnsafeRowParquetRecordReader
+            val reader = new VectorizedParquetRecordReader
             try {
               reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
               val batch = reader.resultBatch()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to