Repository: spark
Updated Branches:
  refs/heads/branch-2.3 162c5becc -> ecc24ec7f


[SPARK-16060][SQL][FOLLOW-UP] add a wrapper solution for vectorized orc reader

## What changes were proposed in this pull request?

This is mostly from https://github.com/apache/spark/pull/13775

The wrapper solution works well for string/binary types: the ORC column 
vector doesn't keep its bytes in a contiguous memory region, so copying that 
data into a Spark columnar batch has significant overhead. For other types, 
the wrapper solution performs about the same as the current solution.

I think we can treat the wrapper solution as the baseline and keep improving 
the copy-to-Spark solution.
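
For anyone comparing the two paths, the new internal flag can be toggled per 
session. A minimal sketch, assuming an existing `SparkSession` named `spark` 
and a placeholder ORC path with an `id` column:

```scala
// The wrapper path is the default. Setting the internal flag to "true"
// switches the vectorized ORC reader back to copying each ORC batch into
// Spark's own column vectors. Path and column name are placeholders.
spark.conf.set("spark.sql.orc.copyBatchToSpark", "true")
spark.read.orc("/path/to/data").selectExpr("sum(id)").show()

spark.conf.set("spark.sql.orc.copyBatchToSpark", "false") // default wrapper path
spark.read.orc("/path/to/data").selectExpr("sum(id)").show()
```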

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes #20205 from cloud-fan/orc.

(cherry picked from commit eaac60a1e20e29084b7151ffca964cfaa5ba99d1)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecc24ec7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecc24ec7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecc24ec7

Branch: refs/heads/branch-2.3
Commit: ecc24ec7fcf1225e5030a1c1224dca9a99138830
Parents: 162c5be
Author: Wenchen Fan <[email protected]>
Authored: Wed Jan 10 15:16:27 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Wed Jan 10 15:16:53 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |   7 +
 .../datasources/orc/OrcColumnVector.java        | 251 +++++++++++++++++++
 .../datasources/orc/OrcColumnarBatchReader.java | 106 +++++---
 .../datasources/orc/OrcFileFormat.scala         |   6 +-
 .../spark/sql/hive/orc/OrcReadBenchmark.scala   | 236 ++++++++++-------
 5 files changed, 490 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ecc24ec7/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 74949db..36e802a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -391,6 +391,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ORC_COPY_BATCH_TO_SPARK = buildConf("spark.sql.orc.copyBatchToSpark")
+    .doc("Whether or not to copy the ORC columnar batch to Spark columnar 
batch in the " +
+      "vectorized ORC reader.")
+    .internal()
+    .booleanConf
+    .createWithDefault(false)
+
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
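
Downstream code reads the flag through `SQLConf` like any other internal conf. 
A one-line sketch of the lookup (mirroring the `OrcFileFormat` change further 
down in this diff; note `sessionState` is `private[sql]`, so this only 
compiles inside Spark's own `org.apache.spark.sql` packages):

```scala
import org.apache.spark.sql.internal.SQLConf

// Look up the internal flag on the active session; it defaults to false.
val copyToSpark = sparkSession.sessionState.conf.getConf(SQLConf.ORC_COPY_BATCH_TO_SPARK)
```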

http://git-wip-us.apache.org/repos/asf/spark/blob/ecc24ec7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
new file mode 100644
index 0000000..f94c55d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import java.math.BigDecimal;
+
+import org.apache.orc.storage.ql.exec.vector.*;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A column vector class wrapping Hive's ColumnVector. Because Spark ColumnarBatch only accepts
+ * Spark's vectorized.ColumnVector, this column vector is used to adapt Hive ColumnVector with
+ * Spark ColumnarVector.
+ */
+public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
+  private ColumnVector baseData;
+  private LongColumnVector longData;
+  private DoubleColumnVector doubleData;
+  private BytesColumnVector bytesData;
+  private DecimalColumnVector decimalData;
+  private TimestampColumnVector timestampData;
+  final private boolean isTimestamp;
+
+  private int batchSize;
+
+  OrcColumnVector(DataType type, ColumnVector vector) {
+    super(type);
+
+    if (type instanceof TimestampType) {
+      isTimestamp = true;
+    } else {
+      isTimestamp = false;
+    }
+
+    baseData = vector;
+    if (vector instanceof LongColumnVector) {
+      longData = (LongColumnVector) vector;
+    } else if (vector instanceof DoubleColumnVector) {
+      doubleData = (DoubleColumnVector) vector;
+    } else if (vector instanceof BytesColumnVector) {
+      bytesData = (BytesColumnVector) vector;
+    } else if (vector instanceof DecimalColumnVector) {
+      decimalData = (DecimalColumnVector) vector;
+    } else if (vector instanceof TimestampColumnVector) {
+      timestampData = (TimestampColumnVector) vector;
+    } else {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public int numNulls() {
+    if (baseData.isRepeating) {
+      if (baseData.isNull[0]) {
+        return batchSize;
+      } else {
+        return 0;
+      }
+    } else if (baseData.noNulls) {
+      return 0;
+    } else {
+      int count = 0;
+      for (int i = 0; i < batchSize; i++) {
+        if (baseData.isNull[i]) count++;
+      }
+      return count;
+    }
+  }
+
+  /* A helper method to get the row index in a column. */
+  private int getRowIndex(int rowId) {
+    return baseData.isRepeating ? 0 : rowId;
+  }
+
+  @Override
+  public boolean isNullAt(int rowId) {
+    return baseData.isNull[getRowIndex(rowId)];
+  }
+
+  @Override
+  public boolean getBoolean(int rowId) {
+    return longData.vector[getRowIndex(rowId)] == 1;
+  }
+
+  @Override
+  public boolean[] getBooleans(int rowId, int count) {
+    boolean[] res = new boolean[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getBoolean(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    return (byte) longData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public byte[] getBytes(int rowId, int count) {
+    byte[] res = new byte[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getByte(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    return (short) longData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public short[] getShorts(int rowId, int count) {
+    short[] res = new short[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getShort(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    return (int) longData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public int[] getInts(int rowId, int count) {
+    int[] res = new int[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getInt(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    int index = getRowIndex(rowId);
+    if (isTimestamp) {
+      return timestampData.time[index] * 1000 + timestampData.nanos[index] / 1000;
+    } else {
+      return longData.vector[index];
+    }
+  }
+
+  @Override
+  public long[] getLongs(int rowId, int count) {
+    long[] res = new long[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getLong(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    return (float) doubleData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public float[] getFloats(int rowId, int count) {
+    float[] res = new float[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getFloat(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    return doubleData.vector[getRowIndex(rowId)];
+  }
+
+  @Override
+  public double[] getDoubles(int rowId, int count) {
+    double[] res = new double[count];
+    for (int i = 0; i < count; i++) {
+      res[i] = getDouble(rowId + i);
+    }
+    return res;
+  }
+
+  @Override
+  public int getArrayLength(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getArrayOffset(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    BigDecimal data = decimalData.vector[getRowIndex(rowId)].getHiveDecimal().bigDecimalValue();
+    return Decimal.apply(data, precision, scale);
+  }
+
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    int index = getRowIndex(rowId);
+    BytesColumnVector col = bytesData;
+    return UTF8String.fromBytes(col.vector[index], col.start[index], col.length[index]);
+  }
+
+  @Override
+  public byte[] getBinary(int rowId) {
+    int index = getRowIndex(rowId);
+    byte[] binary = new byte[bytesData.length[index]];
+    System.arraycopy(bytesData.vector[index], bytesData.start[index], binary, 0, binary.length);
+    return binary;
+  }
+
+  @Override
+  public org.apache.spark.sql.vectorized.ColumnVector arrayData() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public org.apache.spark.sql.vectorized.ColumnVector getChildColumn(int ordinal) {
+    throw new UnsupportedOperationException();
+  }
+}
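
The `getRowIndex` indirection above is what makes ORC's run-length "repeating" 
encoding cheap to expose: when `isRepeating` is set, one physical slot backs 
every logical row. A minimal, hypothetical sketch (it would need to live in 
the same `org.apache.spark.sql.execution.datasources.orc` package, since the 
constructor is package-private):

```scala
package org.apache.spark.sql.execution.datasources.orc

import org.apache.orc.storage.ql.exec.vector.LongColumnVector
import org.apache.spark.sql.types.LongType

object OrcColumnVectorDemo {
  def main(args: Array[String]): Unit = {
    val longs = new LongColumnVector(1024)
    longs.vector(0) = 42L     // one physical value...
    longs.isRepeating = true  // ...logically repeated across the whole batch
    longs.noNulls = true

    val wrapper = new OrcColumnVector(LongType, longs)
    wrapper.setBatchSize(1024)

    println(wrapper.getLong(0))    // 42
    println(wrapper.getLong(1023)) // 42: getRowIndex maps every row to 0
    println(wrapper.numNulls())    // 0
  }
}
```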

http://git-wip-us.apache.org/repos/asf/spark/blob/ecc24ec7/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
index 5c28d0e..36fdf2b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
@@ -51,13 +51,13 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
 
   /**
-   * The default size of batch. We use this value for both ORC and Spark consistently
-   * because they have different default values like the following.
+   * The default size of batch. We use this value for ORC reader to make it consistent with Spark's
+   * columnar batch, because their default batch sizes are different like the following:
    *
    * - ORC's VectorizedRowBatch.DEFAULT_SIZE = 1024
    * - Spark's ColumnarBatch.DEFAULT_BATCH_SIZE = 4 * 1024
    */
-  public static final int DEFAULT_SIZE = 4 * 1024;
+  private static final int DEFAULT_SIZE = 4 * 1024;
 
   // ORC File Reader
   private Reader reader;
@@ -82,13 +82,18 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
   // Writable column vectors of the result columnar batch.
   private WritableColumnVector[] columnVectors;
 
-  /**
-   * The memory mode of the columnarBatch
-   */
+  // The wrapped ORC column vectors. It should be null if `copyToSpark` is true.
+  private org.apache.spark.sql.vectorized.ColumnVector[] orcVectorWrappers;
+
+  // The memory mode of the columnarBatch
   private final MemoryMode MEMORY_MODE;
 
-  public OrcColumnarBatchReader(boolean useOffHeap) {
+  // Whether or not to copy the ORC columnar batch to Spark columnar batch.
+  private final boolean copyToSpark;
+
+  public OrcColumnarBatchReader(boolean useOffHeap, boolean copyToSpark) {
     MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
+    this.copyToSpark = copyToSpark;
   }
 
 
@@ -167,27 +172,61 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
     }
 
     int capacity = DEFAULT_SIZE;
-    if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
-      columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
-    } else {
-      columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
-    }
-    columnarBatch = new ColumnarBatch(resultSchema, columnVectors, capacity);
 
-    if (partitionValues.numFields() > 0) {
-      int partitionIdx = requiredFields.length;
-      for (int i = 0; i < partitionValues.numFields(); i++) {
-        ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
-        columnVectors[i + partitionIdx].setIsConstant();
+    if (copyToSpark) {
+      if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
+        columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
+      } else {
+        columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
       }
-    }
 
-    // Initialize the missing columns once.
-    for (int i = 0; i < requiredFields.length; i++) {
-      if (requestedColIds[i] == -1) {
-        columnVectors[i].putNulls(0, columnarBatch.capacity());
-        columnVectors[i].setIsConstant();
+      // Initialize the missing columns once.
+      for (int i = 0; i < requiredFields.length; i++) {
+        if (requestedColIds[i] == -1) {
+          columnVectors[i].putNulls(0, capacity);
+          columnVectors[i].setIsConstant();
+        }
+      }
+
+      if (partitionValues.numFields() > 0) {
+        int partitionIdx = requiredFields.length;
+        for (int i = 0; i < partitionValues.numFields(); i++) {
+          ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
+          columnVectors[i + partitionIdx].setIsConstant();
+        }
+      }
+
+      columnarBatch = new ColumnarBatch(resultSchema, columnVectors, capacity);
+    } else {
+      // Just wrap the ORC column vector instead of copying it to Spark column vector.
+      orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
+
+      for (int i = 0; i < requiredFields.length; i++) {
+        DataType dt = requiredFields[i].dataType();
+        int colId = requestedColIds[i];
+        // Initialize the missing columns once.
+        if (colId == -1) {
+          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
+          missingCol.putNulls(0, capacity);
+          missingCol.setIsConstant();
+          orcVectorWrappers[i] = missingCol;
+        } else {
+          orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
+        }
       }
+
+      if (partitionValues.numFields() > 0) {
+        int partitionIdx = requiredFields.length;
+        for (int i = 0; i < partitionValues.numFields(); i++) {
+          DataType dt = partitionSchema.fields()[i].dataType();
+          OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
+          ColumnVectorUtils.populate(partitionCol, partitionValues, i);
+          partitionCol.setIsConstant();
+          orcVectorWrappers[partitionIdx + i] = partitionCol;
+        }
+      }
+
+      columnarBatch = new ColumnarBatch(resultSchema, orcVectorWrappers, capacity);
     }
   }
 
@@ -196,17 +235,26 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
    * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns.
    */
   private boolean nextBatch() throws IOException {
-    for (WritableColumnVector vector : columnVectors) {
-      vector.reset();
-    }
-    columnarBatch.setNumRows(0);
-
     recordReader.nextBatch(batch);
     int batchSize = batch.size;
     if (batchSize == 0) {
       return false;
     }
     columnarBatch.setNumRows(batchSize);
+
+    if (!copyToSpark) {
+      for (int i = 0; i < requiredFields.length; i++) {
+        if (requestedColIds[i] != -1) {
+          ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
+        }
+      }
+      return true;
+    }
+
+    for (WritableColumnVector vector : columnVectors) {
+      vector.reset();
+    }
+
     for (int i = 0; i < requiredFields.length; i++) {
       StructField field = requiredFields[i];
       WritableColumnVector toColumn = columnVectors[i];
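
Condensing the two code paths above: with `copyToSpark` enabled the reader 
allocates writable Spark vectors and pays a per-batch copy; with the default 
wrapper path it reads ORC's memory in place and only tracks the batch size. 
A self-contained, hypothetical sketch of that trade-off (simplified stand-in 
types, not the real Spark API):

```scala
// Hypothetical stand-ins: WritableVector plays the role of On/OffHeapColumnVector,
// OrcWrapper the role of OrcColumnVector. Only the control flow mirrors initBatch.
object CopyVsWrap {
  trait ColVector { def getLong(row: Int): Long }

  final class WritableVector(capacity: Int) extends ColVector {
    private val data = new Array[Long](capacity)
    def put(row: Int, v: Long): Unit = data(row) = v
    def getLong(row: Int): Long = data(row)
  }

  final class OrcWrapper(orcData: Array[Long]) extends ColVector {
    def getLong(row: Int): Long = orcData(row) // zero-copy: reads ORC memory in place
  }

  def initBatch(copyToSpark: Boolean, orcData: Array[Long]): ColVector =
    if (copyToSpark) {
      val v = new WritableVector(orcData.length)
      orcData.indices.foreach(i => v.put(i, orcData(i))) // per-batch copy cost
      v
    } else {
      new OrcWrapper(orcData) // wrapper path: no copy at all
    }
}
```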

http://git-wip-us.apache.org/repos/asf/spark/blob/ecc24ec7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index b8bacfa..2dd314d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -150,6 +151,7 @@ class OrcFileFormat
     val sqlConf = sparkSession.sessionState.conf
     val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
     val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
+    val copyToSpark = sparkSession.sessionState.conf.getConf(SQLConf.ORC_COPY_BATCH_TO_SPARK)
 
     val broadcastedConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -183,8 +185,8 @@ class OrcFileFormat
 
         val taskContext = Option(TaskContext.get())
         if (enableVectorizedReader) {
-          val batchReader =
-            new OrcColumnarBatchReader(enableOffHeapColumnVector && taskContext.isDefined)
+          val batchReader = new OrcColumnarBatchReader(
+            enableOffHeapColumnVector && taskContext.isDefined, copyToSpark)
           batchReader.initialize(fileSplit, taskAttemptContext)
           batchReader.initBatch(
             reader.getSchema,

http://git-wip-us.apache.org/repos/asf/spark/blob/ecc24ec7/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
index 37ed846..bf6efa7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
@@ -86,7 +86,7 @@ object OrcReadBenchmark {
   }
 
   def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
-    val sqlBenchmark = new Benchmark(s"SQL Single ${dataType.sql} Column 
Scan", values)
+    val benchmark = new Benchmark(s"SQL Single ${dataType.sql} Column Scan", 
values)
 
     withTempPath { dir =>
       withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
@@ -95,61 +95,73 @@ object OrcReadBenchmark {
 
         prepareTable(dir, spark.sql(s"SELECT CAST(value as ${dataType.sql}) id 
FROM t1"))
 
-        sqlBenchmark.addCase("Native ORC MR") { _ =>
+        benchmark.addCase("Native ORC MR") { _ =>
           withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
             spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
           }
         }
 
-        sqlBenchmark.addCase("Native ORC Vectorized") { _ =>
+        benchmark.addCase("Native ORC Vectorized") { _ =>
           spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
         }
 
-        sqlBenchmark.addCase("Hive built-in ORC") { _ =>
+        benchmark.addCase("Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Hive built-in ORC") { _ =>
           spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
         }
 
         /*
-        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
-        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.13.1
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 
        SQL Single TINYINT Column Scan:          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1192 / 1221         13.2          75.8       1.0X
-        Native ORC Vectorized                          161 /  170         97.5          10.3       7.4X
-        Hive built-in ORC                             1399 / 1413         11.2          89.0       0.9X
+        Native ORC MR                                 1135 / 1171         13.9          72.2       1.0X
+        Native ORC Vectorized                          152 /  163        103.4           9.7       7.5X
+        Native ORC Vectorized with copy                149 /  162        105.4           9.5       7.6X
+        Hive built-in ORC                             1380 / 1384         11.4          87.7       0.8X

        SQL Single SMALLINT Column Scan:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1287 / 1333         12.2          81.8       1.0X
-        Native ORC Vectorized                          164 /  172         95.6          10.5       7.8X
-        Hive built-in ORC                             1629 / 1650          9.7         103.6       0.8X
+        Native ORC MR                                 1182 / 1244         13.3          75.2       1.0X
+        Native ORC Vectorized                          145 /  156        108.7           9.2       8.2X
+        Native ORC Vectorized with copy                148 /  158        106.4           9.4       8.0X
+        Hive built-in ORC                             1591 / 1636          9.9         101.2       0.7X

        SQL Single INT Column Scan:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1304 / 1388         12.1          82.9       1.0X
-        Native ORC Vectorized                          227 /  240         69.3          14.4       5.7X
-        Hive built-in ORC                             1866 / 1867          8.4         118.6       0.7X
+        Native ORC MR                                 1271 / 1271         12.4          80.8       1.0X
+        Native ORC Vectorized                          206 /  212         76.3          13.1       6.2X
+        Native ORC Vectorized with copy                200 /  213         78.8          12.7       6.4X
+        Hive built-in ORC                             1776 / 1787          8.9         112.9       0.7X

        SQL Single BIGINT Column Scan:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1331 / 1357         11.8          84.6       1.0X
-        Native ORC Vectorized                          289 /  297         54.4          18.4       4.6X
-        Hive built-in ORC                             1922 / 1929          8.2         122.2       0.7X
+        Native ORC MR                                 1344 / 1355         11.7          85.4       1.0X
+        Native ORC Vectorized                          258 /  268         61.0          16.4       5.2X
+        Native ORC Vectorized with copy                252 /  257         62.4          16.0       5.3X
+        Hive built-in ORC                             1818 / 1823          8.7         115.6       0.7X

        SQL Single FLOAT Column Scan:            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1410 / 1428         11.2          89.7       1.0X
-        Native ORC Vectorized                          328 /  335         48.0          20.8       4.3X
-        Hive built-in ORC                             1929 / 2012          8.2         122.6       0.7X
+        Native ORC MR                                 1333 / 1352         11.8          84.8       1.0X
+        Native ORC Vectorized                          310 /  324         50.7          19.7       4.3X
+        Native ORC Vectorized with copy                312 /  320         50.4          19.9       4.3X
+        Hive built-in ORC                             1904 / 1918          8.3         121.0       0.7X

        SQL Single DOUBLE Column Scan:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1467 / 1485         10.7          93.3       1.0X
-        Native ORC Vectorized                          402 /  411         39.1          25.6       3.6X
-        Hive built-in ORC                             2023 / 2042          7.8         128.6       0.7X
+        Native ORC MR                                 1408 / 1585         11.2          89.5       1.0X
+        Native ORC Vectorized                          359 /  368         43.8          22.8       3.9X
+        Native ORC Vectorized with copy                364 /  371         43.2          23.2       3.9X
+        Hive built-in ORC                             1881 / 1954          8.4         119.6       0.7X
         */
-        sqlBenchmark.run()
+        benchmark.run()
       }
     }
   }
@@ -176,19 +188,26 @@ object OrcReadBenchmark {
           spark.sql("SELECT sum(c1), sum(length(c2)) FROM 
nativeOrcTable").collect()
         }
 
+        benchmark.addCase("Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT sum(c1), sum(length(c2)) FROM 
nativeOrcTable").collect()
+          }
+        }
+
         benchmark.addCase("Hive built-in ORC") { _ =>
           spark.sql("SELECT sum(c1), sum(length(c2)) FROM 
hiveOrcTable").collect()
         }
 
         /*
-        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
-        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.13.1
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 
        Int and String Scan:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 2729 / 2744          3.8         260.2       1.0X
-        Native ORC Vectorized                         1318 / 1344          8.0         125.7       2.1X
-        Hive built-in ORC                             3731 / 3782          2.8         355.8       0.7X
+        Native ORC MR                                 2566 / 2592          4.1         244.7       1.0X
+        Native ORC Vectorized                         1098 / 1113          9.6         104.7       2.3X
+        Native ORC Vectorized with copy               1527 / 1593          6.9         145.6       1.7X
+        Hive built-in ORC                             3561 / 3705          2.9         339.6       0.7X
         */
         benchmark.run()
       }
@@ -205,63 +224,84 @@ object OrcReadBenchmark {
 
         prepareTable(dir, spark.sql("SELECT value % 2 AS p, value AS id FROM 
t1"), Some("p"))
 
-        benchmark.addCase("Read data column - Native ORC MR") { _ =>
+        benchmark.addCase("Data column - Native ORC MR") { _ =>
           withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
             spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
           }
         }
 
-        benchmark.addCase("Read data column - Native ORC Vectorized") { _ =>
+        benchmark.addCase("Data column - Native ORC Vectorized") { _ =>
           spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
         }
 
-        benchmark.addCase("Read data column - Hive built-in ORC") { _ =>
+        benchmark.addCase("Data column - Native ORC Vectorized with copy") { _ 
=>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Data column - Hive built-in ORC") { _ =>
           spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
         }
 
-        benchmark.addCase("Read partition column - Native ORC MR") { _ =>
+        benchmark.addCase("Partition column - Native ORC MR") { _ =>
           withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
             spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
           }
         }
 
-        benchmark.addCase("Read partition column - Native ORC Vectorized") { _ 
=>
+        benchmark.addCase("Partition column - Native ORC Vectorized") { _ =>
           spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
         }
 
-        benchmark.addCase("Read partition column - Hive built-in ORC") { _ =>
+        benchmark.addCase("Partition column - Native ORC Vectorized with 
copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT sum(p) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Partition column - Hive built-in ORC") { _ =>
           spark.sql("SELECT sum(p) FROM hiveOrcTable").collect()
         }
 
-        benchmark.addCase("Read both columns - Native ORC MR") { _ =>
+        benchmark.addCase("Both columns - Native ORC MR") { _ =>
           withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
             spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
           }
         }
 
-        benchmark.addCase("Read both columns - Native ORC Vectorized") { _ =>
+        benchmark.addCase("Both columns - Native ORC Vectorized") { _ =>
           spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
         }
 
-        benchmark.addCase("Read both columns - Hive built-in ORC") { _ =>
+        benchmark.addCase("Both column - Native ORC Vectorized with copy") { _ 
=>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT sum(p), sum(id) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Both columns - Hive built-in ORC") { _ =>
           spark.sql("SELECT sum(p), sum(id) FROM hiveOrcTable").collect()
         }
 
         /*
-        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
-        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.13.1
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 
        Partitioned Table:                       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Read data column - Native ORC MR               1531 / 1536         10.3          97.4       1.0X
-        Read data column - Native ORC Vectorized        295 /  298         53.3          18.8       5.2X
-        Read data column - Hive built-in ORC           2125 / 2126          7.4         135.1       0.7X
-        Read partition column - Native ORC MR          1049 / 1062         15.0          66.7       1.5X
-        Read partition column - Native ORC Vectorized    54 /   57        290.1           3.4      28.2X
-        Read partition column - Hive built-in ORC      1282 / 1291         12.3          81.5       1.2X
-        Read both columns - Native ORC MR              1594 / 1598          9.9         101.3       1.0X
-        Read both columns - Native ORC Vectorized       332 /  336         47.4          21.1       4.6X
-        Read both columns - Hive built-in ORC          2145 / 2187          7.3         136.4       0.7X
+        Data only - Native ORC MR                      1447 / 1457         10.9          92.0       1.0X
+        Data only - Native ORC Vectorized               256 /  266         61.4          16.3       5.6X
+        Data only - Native ORC Vectorized with copy     263 /  273         59.8          16.7       5.5X
+        Data only - Hive built-in ORC                  1960 / 1988          8.0         124.6       0.7X
+        Partition only - Native ORC MR                 1039 / 1043         15.1          66.0       1.4X
+        Partition only - Native ORC Vectorized           48 /   53        326.6           3.1      30.1X
+        Partition only - Native ORC Vectorized with copy 48 /   53        328.4           3.0      30.2X
+        Partition only - Hive built-in ORC             1234 / 1242         12.7          78.4       1.2X
+        Both columns - Native ORC MR                   1465 / 1475         10.7          93.1       1.0X
+        Both columns - Native ORC Vectorized            292 /  301         53.9          18.6       5.0X
+        Both columns - Native ORC Vectorized with copy  348 /  354         45.1          22.2       4.2X
+        Both columns - Hive built-in ORC               2051 / 2060          7.7         130.4       0.7X
         */
         benchmark.run()
       }
@@ -287,19 +327,26 @@ object OrcReadBenchmark {
           spark.sql("SELECT sum(length(c1)) FROM nativeOrcTable").collect()
         }
 
+        benchmark.addCase("Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT sum(length(c1)) FROM nativeOrcTable").collect()
+          }
+        }
+
         benchmark.addCase("Hive built-in ORC") { _ =>
           spark.sql("SELECT sum(length(c1)) FROM hiveOrcTable").collect()
         }
 
         /*
-        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
-        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.13.1
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 
        Repeated String:                         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1325 / 1328          7.9         126.4       1.0X
-        Native ORC Vectorized                          320 /  330         32.8          30.5       4.1X
-        Hive built-in ORC                             1971 / 1972          5.3         188.0       0.7X
+        Native ORC MR                                 1271 / 1278          8.3         121.2       1.0X
+        Native ORC Vectorized                          200 /  212         52.4          19.1       6.4X
+        Native ORC Vectorized with copy                342 /  347         30.7          32.6       3.7X
+        Hive built-in ORC                             1874 / 2105          5.6         178.7       0.7X
         */
         benchmark.run()
       }
@@ -331,32 +378,42 @@ object OrcReadBenchmark {
             "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
         }
 
+        benchmark.addCase("Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql("SELECT SUM(LENGTH(c2)) FROM nativeOrcTable " +
+              "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
+          }
+        }
+
         benchmark.addCase("Hive built-in ORC") { _ =>
           spark.sql("SELECT SUM(LENGTH(c2)) FROM hiveOrcTable " +
             "WHERE c1 IS NOT NULL AND c2 IS NOT NULL").collect()
         }
 
         /*
-        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
-        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.13.1
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 
        String with Nulls Scan (0.0%):           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 2553 / 2554          4.1         243.4       1.0X
-        Native ORC Vectorized                          953 /  954         11.0          90.9       2.7X
-        Hive built-in ORC                             3875 / 3898          2.7         369.6       0.7X
+        Native ORC MR                                 2394 / 2886          4.4         228.3       1.0X
+        Native ORC Vectorized                          699 /  729         15.0          66.7       3.4X
+        Native ORC Vectorized with copy                959 / 1025         10.9          91.5       2.5X
+        Hive built-in ORC                             3899 / 3901          2.7         371.9       0.6X

        String with Nulls Scan (0.5%):           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 2389 / 2408          4.4         227.8       1.0X
-        Native ORC Vectorized                         1208 / 1209          8.7         115.2       2.0X
-        Hive built-in ORC                             2940 / 2952          3.6         280.4       0.8X
+        Native ORC MR                                 2234 / 2255          4.7         213.1       1.0X
+        Native ORC Vectorized                          854 /  869         12.3          81.4       2.6X
+        Native ORC Vectorized with copy               1099 / 1128          9.5         104.8       2.0X
+        Hive built-in ORC                             2767 / 2793          3.8         263.9       0.8X

        String with Nulls Scan (0.95%):          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1295 / 1311          8.1         123.5       1.0X
-        Native ORC Vectorized                          449 /  457         23.4          42.8       2.9X
-        Hive built-in ORC                             1649 / 1660          6.4         157.3       0.8X
+        Native ORC MR                                 1166 / 1202          9.0         111.2       1.0X
+        Native ORC Vectorized                          338 /  345         31.1          32.2       3.5X
+        Native ORC Vectorized with copy                418 /  428         25.1          39.9       2.8X
+        Hive built-in ORC                             1730 / 1761          6.1         164.9       0.7X
         */
         benchmark.run()
       }
@@ -364,7 +421,7 @@ object OrcReadBenchmark {
   }
 
   def columnsBenchmark(values: Int, width: Int): Unit = {
-    val sqlBenchmark = new Benchmark(s"SQL Single Column Scan from $width 
columns", values)
+    val benchmark = new Benchmark(s"Single Column Scan from $width columns", 
values)
 
     withTempPath { dir =>
       withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
@@ -376,43 +433,52 @@ object OrcReadBenchmark {
 
         prepareTable(dir, spark.sql("SELECT * FROM t1"))
 
-        sqlBenchmark.addCase("Native ORC MR") { _ =>
+        benchmark.addCase("Native ORC MR") { _ =>
           withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") {
             spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
           }
         }
 
-        sqlBenchmark.addCase("Native ORC Vectorized") { _ =>
+        benchmark.addCase("Native ORC Vectorized") { _ =>
           spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
         }
 
-        sqlBenchmark.addCase("Hive built-in ORC") { _ =>
+        benchmark.addCase("Native ORC Vectorized with copy") { _ =>
+          withSQLConf(SQLConf.ORC_COPY_BATCH_TO_SPARK.key -> "true") {
+            spark.sql(s"SELECT sum(c$middle) FROM nativeOrcTable").collect()
+          }
+        }
+
+        benchmark.addCase("Hive built-in ORC") { _ =>
           spark.sql(s"SELECT sum(c$middle) FROM hiveOrcTable").collect()
         }
 
         /*
-        Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.2
-        Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+        Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.13.1
+        Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
 
-        SQL Single Column Scan from 100 columns: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        Single Column Scan from 100 columns:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 1103 / 1124          1.0        1052.0       1.0X
-        Native ORC Vectorized                           92 /  100         11.4          87.9      12.0X
-        Hive built-in ORC                              383 /  390          2.7         365.4       2.9X
+        Native ORC MR                                 1050 / 1053          1.0        1001.1       1.0X
+        Native ORC Vectorized                           95 /  101         11.0          90.9      11.0X
+        Native ORC Vectorized with copy                 95 /  102         11.0          90.9      11.0X
+        Hive built-in ORC                              348 /  358          3.0         331.8       3.0X

-        SQL Single Column Scan from 200 columns: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        Single Column Scan from 200 columns:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 2245 / 2250          0.5        2141.0       1.0X
-        Native ORC Vectorized                          157 /  165          6.7         150.2      14.3X
-        Hive built-in ORC                              587 /  593          1.8         559.4       3.8X
+        Native ORC MR                                 2099 / 2108          0.5        2002.1       1.0X
+        Native ORC Vectorized                          179 /  187          5.8         171.1      11.7X
+        Native ORC Vectorized with copy                176 /  188          6.0         167.6      11.9X
+        Hive built-in ORC                              562 /  581          1.9         535.9       3.7X

-        SQL Single Column Scan from 300 columns: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+        Single Column Scan from 300 columns:     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
        ------------------------------------------------------------------------------------------------
-        Native ORC MR                                 3343 / 3350          0.3        3188.3       1.0X
-        Native ORC Vectorized                          265 /  280          3.9         253.2      12.6X
-        Hive built-in ORC                              828 /  842          1.3         789.8       4.0X
+        Native ORC MR                                 3221 / 3246          0.3        3071.4       1.0X
+        Native ORC Vectorized                          312 /  322          3.4         298.0      10.3X
+        Native ORC Vectorized with copy                306 /  320          3.4         291.6      10.5X
+        Hive built-in ORC                              815 /  824          1.3         777.3       4.0X
         */
-        sqlBenchmark.run()
+        benchmark.run()
       }
     }
   }

