Repository: spark
Updated Branches:
  refs/heads/branch-2.3 d24d13179 -> bae4449ad


[SPARK-23426][SQL] Use `hive` ORC impl and disable PPD for Spark 2.3.0

## What changes were proposed in this pull request?

To prevent any regressions, this PR changes ORC implementation to `hive` by 
default like Spark 2.2.X.
Users can enable `native` ORC. Also, ORC PPD is also restored to `false` like 
Spark 2.2.X.

![orc_section](https://user-images.githubusercontent.com/9700541/36221575-57a1d702-1173-11e8-89fe-dca5842f4ca7.png)

## How was this patch tested?

Pass all test cases.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #20610 from dongjoon-hyun/SPARK-ORC-DISABLE.

(cherry picked from commit 2f0498d1e85a53b60da6a47d20bbdf56b42b7dcb)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bae4449a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bae4449a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bae4449a

Branch: refs/heads/branch-2.3
Commit: bae4449ad836a64db853e297d33bfd1a725faa0b
Parents: d24d131
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Thu Feb 15 08:55:39 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Feb 15 08:55:59 2018 -0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   | 52 +++++++++-----------
 .../org/apache/spark/sql/internal/SQLConf.scala |  6 +--
 .../spark/sql/FileBasedDataSourceSuite.scala    | 17 ++++++-
 .../sql/streaming/FileStreamSinkSuite.scala     | 13 +++++
 .../sql/streaming/FileStreamSourceSuite.scala   | 13 +++++
 5 files changed, 68 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bae4449a/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index cf9529a..91e4367 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1004,6 +1004,29 @@ Configuration of Parquet can be done using the `setConf` 
method on `SparkSession
 </tr>
 </table>
 
+## ORC Files
+
+Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC file 
format for ORC files.
+To do that, the following configurations are newly added. The vectorized 
reader is used for the
+native ORC tables (e.g., the ones created using the clause `USING ORC`) when 
`spark.sql.orc.impl`
+is set to `native` and `spark.sql.orc.enableVectorizedReader` is set to 
`true`. For the Hive ORC
+serde tables (e.g., the ones created using the clause `USING HIVE OPTIONS 
(fileFormat 'ORC')`),
+the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is 
also set to `true`.
+
+<table class="table">
+  <tr><th><b>Property 
Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
+  <tr>
+    <td><code>spark.sql.orc.impl</code></td>
+    <td><code>hive</code></td>
+    <td>The name of ORC implementation. It can be one of <code>native</code> 
and <code>hive</code>. <code>native</code> means the native ORC support that is 
built on Apache ORC 1.4.1. `hive` means the ORC library in Hive 1.2.1.</td>
+  </tr>
+  <tr>
+    <td><code>spark.sql.orc.enableVectorizedReader</code></td>
+    <td><code>true</code></td>
+    <td>Enables vectorized orc decoding in <code>native</code> implementation. 
If <code>false</code>, a new non-vectorized ORC reader is used in 
<code>native</code> implementation. For <code>hive</code> implementation, this 
is ignored.</td>
+  </tr>
+</table>
+
 ## JSON Datasets
 <div class="codetabs">
 
@@ -1776,35 +1799,6 @@ working with timestamps in `pandas_udf`s to get the best 
performance, see
 
 ## Upgrading From Spark SQL 2.2 to 2.3
 
-  - Since Spark 2.3, Spark supports a vectorized ORC reader with a new ORC 
file format for ORC files. To do that, the following configurations are newly 
added or change their default values. The vectorized reader is used for the 
native ORC tables (e.g., the ones created using the clause `USING ORC`) when 
`spark.sql.orc.impl` is set to `native` and 
`spark.sql.orc.enableVectorizedReader` is set to `true`. For the Hive ORC serde 
table (e.g., the ones created using the clause `USING HIVE OPTIONS (fileFormat 
'ORC')`), the vectorized reader is used when 
`spark.sql.hive.convertMetastoreOrc` is set to `true`.
-
-    - New configurations
-
-    <table class="table">
-      <tr><th><b>Property 
Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
-      <tr>
-        <td><code>spark.sql.orc.impl</code></td>
-        <td><code>native</code></td>
-        <td>The name of ORC implementation. It can be one of 
<code>native</code> and <code>hive</code>. <code>native</code> means the native 
ORC support that is built on Apache ORC 1.4.1. `hive` means the ORC library in 
Hive 1.2.1 which is used prior to Spark 2.3.</td>
-      </tr>
-      <tr>
-        <td><code>spark.sql.orc.enableVectorizedReader</code></td>
-        <td><code>true</code></td>
-        <td>Enables vectorized orc decoding in <code>native</code> 
implementation. If <code>false</code>, a new non-vectorized ORC reader is used 
in <code>native</code> implementation. For <code>hive</code> implementation, 
this is ignored.</td>
-      </tr>
-    </table>
-
-    - Changed configurations
-
-    <table class="table">
-      <tr><th><b>Property 
Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
-      <tr>
-        <td><code>spark.sql.orc.filterPushdown</code></td>
-        <td><code>true</code></td>
-        <td>Enables filter pushdown for ORC files. It is <code>false</code> by 
default prior to Spark 2.3.</td>
-      </tr>
-    </table>
-
   - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when 
the referenced columns only include the internal corrupt record column (named 
`_corrupt_record` by default). For example, 
`spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()`
 and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. 
Instead, you can cache or save the parsed results and then send the same query. 
For example, `val df = spark.read.schema(schema).json(file).cache()` and then 
`df.filter($"_corrupt_record".isNotNull).count()`.
   - The `percentile_approx` function previously accepted numeric type input 
and output double type results. Now it supports date type, timestamp type and 
numeric types as input types. The result type is also changed to be the same as 
the input type, which is more reasonable for percentiles.
   - Since Spark 2.3, the Join/Filter's deterministic predicates that are after 
the first non-deterministic predicates are also pushed down/through the child 
operators, if possible. In prior Spark versions, these filters are not eligible 
for predicate pushdown.

http://git-wip-us.apache.org/repos/asf/spark/blob/bae4449a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 54d4f63..47fcf34 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -393,11 +393,11 @@ object SQLConf {
 
   val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl")
     .doc("When native, use the native version of ORC support instead of the 
ORC library in Hive " +
-      "1.2.1. It is 'hive' by default prior to Spark 2.3.")
+      "1.2.1. It is 'hive' by default.")
     .internal()
     .stringConf
     .checkValues(Set("hive", "native"))
-    .createWithDefault("native")
+    .createWithDefault("hive")
 
   val ORC_VECTORIZED_READER_ENABLED = 
buildConf("spark.sql.orc.enableVectorizedReader")
     .doc("Enables vectorized orc decoding.")
@@ -414,7 +414,7 @@ object SQLConf {
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
-    .createWithDefault(true)
+    .createWithDefault(false)
 
   val HIVE_VERIFY_PARTITION_PATH = 
buildConf("spark.sql.hive.verifyPartitionPath")
     .doc("When true, check all the partition paths under the table\'s root 
directory " +

http://git-wip-us.apache.org/repos/asf/spark/blob/bae4449a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 2e33236..b5d4c55 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -20,14 +20,29 @@ package org.apache.spark.sql
 import java.io.FileNotFoundException
 
 import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
-class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
+
+class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with 
BeforeAndAfterAll {
   import testImplicits._
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
+    } finally {
+      super.afterAll()
+    }
+  }
+
   private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", 
"text")
   private val nameWithSpecialChars = "sp&cial%c hars"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bae4449a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 8c4e1fd..ba48bc1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -33,6 +33,19 @@ import org.apache.spark.util.Utils
 class FileStreamSinkSuite extends StreamTest {
   import testImplicits._
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
+    } finally {
+      super.afterAll()
+    }
+  }
+
   test("unpartitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()

http://git-wip-us.apache.org/repos/asf/spark/blob/bae4449a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 5bb0f4d..d4bd9c7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -207,6 +207,19 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       .collect { case s @ StreamingRelation(dataSource, _, _) => s.schema 
}.head
   }
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.sessionState.conf.setConf(SQLConf.ORC_IMPLEMENTATION, "native")
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      spark.sessionState.conf.unsetConf(SQLConf.ORC_IMPLEMENTATION)
+    } finally {
+      super.afterAll()
+    }
+  }
+
   // ============= Basic parameter exists tests ================
 
   test("FileStreamSource schema: no path") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to