Repository: spark
Updated Branches:
  refs/heads/master be9a804f2 -> 7b7804142


[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 
'ParquetOptions', `parquet.compression` needs to be considered.

## What changes were proposed in this pull request?
Since Hive 1.1, Hive has allowed users to set the Parquet compression codec via 
the table-level property `parquet.compression`. See the JIRA: 
https://issues.apache.org/jira/browse/HIVE-7858 . We already support 
`orc.compress` for ORC, so for external users it is more straightforward to 
support the equivalent property for Parquet as well. 
See the Stack Overflow question: 
https://stackoverflow.com/questions/36941122/spark-sql-ignores-parquet-compression-propertie-specified-in-tblproperties
On the Spark side, our table-level compression conf `compression` was added by 
#11464 in Spark 2.0.
We need to support both table-level confs. Users might also use the 
session-level conf `spark.sql.parquet.compression.codec`. The priority rule is: 
if a compression codec is specified through more than one of these, the 
precedence is `compression`, `parquet.compression`, 
`spark.sql.parquet.compression.codec`, from highest to lowest. Acceptable 
values include: none, uncompressed, snappy, gzip, lzo.
After this change, the rule for Parquet is consistent with the one for ORC.
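
As a rough illustration of the precedence rule described above, the lookup can 
be sketched in plain Scala without any Spark dependency. The object name 
`CodecPrecedenceSketch`, the helper `resolveCodec`, and the use of a plain 
(case-sensitive) `Map` are assumptions made for this sketch only; the actual 
change lives in `ParquetOptions`.

```scala
import java.util.Locale

// Hypothetical, self-contained sketch; not the Spark implementation.
object CodecPrecedenceSketch {
  // tableOptions: table-level options/properties (assumed to be a plain Map here).
  // sessionDefault: stands in for the value of spark.sql.parquet.compression.codec.
  def resolveCodec(tableOptions: Map[String, String], sessionDefault: String): String =
    tableOptions.get("compression")                     // highest precedence
      .orElse(tableOptions.get("parquet.compression"))  // next: Hive/Parquet-style property
      .getOrElse(sessionDefault)                        // lowest: session-level conf
      .toLowerCase(Locale.ROOT)
}
```

With both table-level keys present, `compression` wins; with only 
`parquet.compression` present, it overrides the session-level value.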

Changes:
1. Added reading 'compressionCodecClassName' from `parquet.compression`. The 
precedence order is `compression`, `parquet.compression`, 
`spark.sql.parquet.compression.codec`, just like what we do in `OrcOptions`.

2. Changed `spark.sql.parquet.compression.codec` to support "none". 
`ParquetOptions` already treats "none" as equivalent to "uncompressed", but the 
session-level conf could not be set to "none".

3. Changed `compressionCode` to `compressionCodecClassName`.
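
To make the "none" handling concrete, here is a minimal sketch of how a short 
codec name might be normalized, with "none" treated as an alias for 
"uncompressed". The object `CodecNameSketch`, the helper `canonicalName`, and 
the string-valued map are assumptions for illustration; only the accepted 
values and the "Codec [...] is not available" wording are taken from this 
patch.

```scala
import java.util.Locale

// Hypothetical, self-contained sketch; not the Spark implementation.
object CodecNameSketch {
  // Accepted short names; "none" and "uncompressed" map to the same codec.
  private val shortNames: Map[String, String] = Map(
    "none" -> "UNCOMPRESSED",
    "uncompressed" -> "UNCOMPRESSED",
    "snappy" -> "SNAPPY",
    "gzip" -> "GZIP",
    "lzo" -> "LZO")

  // Lower-cases the input, then looks it up; unknown names are rejected.
  def canonicalName(codec: String): String = {
    val key = codec.toLowerCase(Locale.ROOT)
    shortNames.getOrElse(key, {
      val available = shortNames.keys.toSeq.sorted.mkString(", ")
      throw new IllegalArgumentException(
        s"Codec [$key] is not available. Available codecs are $available.")
    })
  }
}
```

Lookups are case-insensitive on the value, so "NONE" and "none" both resolve 
to the uncompressed codec.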

## How was this patch tested?
Added a new test suite, `ParquetCompressionCodecPrecedenceSuite`.

Author: fjh100456 <fu.jinh...@zte.com.cn>

Closes #20076 from fjh100456/ParquetOptionIssue.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b780414
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b780414
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b780414

Branch: refs/heads/master
Commit: 7b78041423b6ee330def2336dfd1ff9ae8469c59
Parents: be9a804
Author: fjh100456 <fu.jinh...@zte.com.cn>
Authored: Sat Jan 6 18:19:57 2018 +0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Sat Jan 6 18:19:57 2018 +0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |   6 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  14 ++-
 .../datasources/parquet/ParquetOptions.scala    |  12 +-
 ...ParquetCompressionCodecPrecedenceSuite.scala | 122 +++++++++++++++++++
 4 files changed, 145 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7b780414/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index b50f936..3ccaaf4 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -953,8 +953,10 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
   <td><code>spark.sql.parquet.compression.codec</code></td>
   <td>snappy</td>
   <td>
-    Sets the compression codec use when writing Parquet files. Acceptable values include:
-    uncompressed, snappy, gzip, lzo.
+    Sets the compression codec used when writing Parquet files. If either `compression` or
+    `parquet.compression` is specified in the table-specific options/properties, the precedence would be
+    `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
+    none, uncompressed, snappy, gzip, lzo.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/7b780414/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 80b8965..7d1217d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -325,11 +325,13 @@ object SQLConf {
     .createWithDefault(false)
 
   val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
-    .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
-      "uncompressed, snappy, gzip, lzo.")
+    .doc("Sets the compression codec used when writing Parquet files. If either `compression` or" +
+      "`parquet.compression` is specified in the table-specific options/properties, the precedence" +
+      "would be `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`." +
+      "Acceptable values include: none, uncompressed, snappy, gzip, lzo.")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
-    .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
+    .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
     .createWithDefault("snappy")
 
   val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
@@ -366,8 +368,10 @@ object SQLConf {
       .createWithDefault(true)
 
   val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
-    .doc("Sets the compression codec use when writing ORC files. Acceptable values include: " +
-      "none, uncompressed, snappy, zlib, lzo.")
+    .doc("Sets the compression codec used when writing ORC files. If either `compression` or" +
+      "`orc.compress` is specified in the table-specific options/properties, the precedence" +
+      "would be `compression`, `orc.compress`, `spark.sql.orc.compression.codec`." +
+      "Acceptable values include: none, uncompressed, snappy, zlib, lzo.")
     .stringConf
     .transform(_.toLowerCase(Locale.ROOT))
     .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))

http://git-wip-us.apache.org/repos/asf/spark/blob/7b780414/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 772d456..ef67ea7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.Locale
 
+import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -42,8 +43,15 @@ private[parquet] class ParquetOptions(
    * Acceptable values are defined in [[shortParquetCompressionCodecNames]].
    */
   val compressionCodecClassName: String = {
-    val codecName = parameters.getOrElse("compression",
-      sqlConf.parquetCompressionCodec).toLowerCase(Locale.ROOT)
+    // `compression`, `parquet.compression`(i.e., ParquetOutputFormat.COMPRESSION), and
+    // `spark.sql.parquet.compression.codec`
+    // are in order of precedence from highest to lowest.
+    val parquetCompressionConf = parameters.get(ParquetOutputFormat.COMPRESSION)
+    val codecName = parameters
+      .get("compression")
+      .orElse(parquetCompressionConf)
+      .getOrElse(sqlConf.parquetCompressionCodec)
+      .toLowerCase(Locale.ROOT)
     if (!shortParquetCompressionCodecNames.contains(codecName)) {
       val availableCodecs =
         shortParquetCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT))

http://git-wip-us.apache.org/repos/asf/spark/blob/7b780414/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
new file mode 100644
index 0000000..ed8fd2b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetOutputFormat
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSQLContext {
+  test("Test `spark.sql.parquet.compression.codec` config") {
+    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "GZIP", "LZO").foreach { c =>
+      withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
+        val expected = if (c == "NONE") "UNCOMPRESSED" else c
+        val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
+        assert(option.compressionCodecClassName == expected)
+      }
+    }
+  }
+
+  test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
+    // When "compression" is configured, it should be the first choice.
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+      val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip")
+      val option = new ParquetOptions(props, spark.sessionState.conf)
+      assert(option.compressionCodecClassName == "UNCOMPRESSED")
+    }
+
+    // When "compression" is not configured, "parquet.compression" should be the preferred choice.
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+      val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip")
+      val option = new ParquetOptions(props, spark.sessionState.conf)
+      assert(option.compressionCodecClassName == "GZIP")
+    }
+
+    // When both "compression" and "parquet.compression" are not configured,
+    // spark.sql.parquet.compression.codec should be the right choice.
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+      val props = Map.empty[String, String]
+      val option = new ParquetOptions(props, spark.sessionState.conf)
+      assert(option.compressionCodecClassName == "SNAPPY")
+    }
+  }
+
+  private def getTableCompressionCodec(path: String): Seq[String] = {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val codecs = for {
+      footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
+      block <- footer.getParquetMetadata.getBlocks.asScala
+      column <- block.getColumns.asScala
+    } yield column.getCodec.name()
+    codecs.distinct
+  }
+
+  private def createTableWithCompression(
+      tableName: String,
+      isPartitioned: Boolean,
+      compressionCodec: String,
+      rootDir: File): Unit = {
+    val options =
+      s"""
+        |OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName',
+        |'parquet.compression'='$compressionCodec')
+       """.stripMargin
+    val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else ""
+    sql(
+      s"""
+        |CREATE TABLE $tableName USING Parquet $options $partitionCreate
+        |AS SELECT 1 AS col1, 2 AS p
+       """.stripMargin)
+  }
+
+  private def checkCompressionCodec(compressionCodec: String, isPartitioned: Boolean): Unit = {
+    withTempDir { tmpDir =>
+      val tempTableName = "TempParquetTable"
+      withTable(tempTableName) {
+        createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir)
+        val partitionPath = if (isPartitioned) "p=2" else ""
+        val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
+        val realCompressionCodecs = getTableCompressionCodec(path)
+        assert(realCompressionCodecs.forall(_ == compressionCodec))
+      }
+    }
+  }
+
+  test("Create parquet table with compression") {
+    Seq(true, false).foreach { isPartitioned =>
+      Seq("UNCOMPRESSED", "SNAPPY", "GZIP").foreach { compressionCodec =>
+        checkCompressionCodec(compressionCodec, isPartitioned)
+      }
+    }
+  }
+
+  test("Create table with unknown compression") {
+    Seq(true, false).foreach { isPartitioned =>
+      val exception = intercept[IllegalArgumentException] {
+        checkCompressionCodec("aa", isPartitioned)
+      }
+      assert(exception.getMessage.contains("Codec [aa] is not available"))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
