This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 62a3868b93a [SPARK-45481][SQL] Introduce a mapper for parquet compression codecs
62a3868b93a is described below

commit 62a3868b93a51a4dc424e87d6fd06df914158e1b
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Fri Oct 27 10:48:44 2023 +0800

    [SPARK-45481][SQL] Introduce a mapper for parquet compression codecs
    
    ### What changes were proposed in this pull request?
    Currently, Spark supports all the parquet compression codecs, but the codecs supported by parquet and those supported by Spark do not map one-to-one, because Spark introduces a fake compression codec `none`.
    On the other hand, there are a lot of magic strings copied from the parquet compression codec names. Developers have to keep them consistent by hand, which is error-prone and reduces development efficiency.
    
    For parquet's `CompressionCodecName`, refer to: https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
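    
    As a rough sketch of the intended usage (illustrative only; the concrete call sites are in the diff below, and `spark` here stands for any active SparkSession):
    
    ```scala
    import java.util.Locale
    
    import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
    import org.apache.spark.sql.internal.SQLConf
    
    // Before: a magic string that must be kept in sync with parquet by hand.
    spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy")
    
    // After: derive the lowercase name from the mapper enum.
    spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
      ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
    
    // And obtain the corresponding parquet-mr codec where one is needed.
    val parquetCodec = ParquetCompressionCodec.SNAPPY.getCompressionCodec
    ```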
    
    ### Why are the changes needed?
    Make it easier for developers to use parquet compression codecs.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    It only introduces a new class.
    
    ### How was this patch tested?
    Existing test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #43308 from beliefer/SPARK-45481.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Jiaan Geng <belie...@163.com>
---
 .../parquet/ParquetCompressionCodec.java           | 62 ++++++++++++++++++++++
 .../datasources/parquet/ParquetOptions.scala       | 15 ++----
 .../BuiltInDataSourceWriteBenchmark.scala          |  6 ++-
 .../benchmark/DataSourceReadBenchmark.scala        |  9 ++--
 .../benchmark/FilterPushdownBenchmark.scala        |  5 +-
 .../execution/benchmark/TPCDSQueryBenchmark.scala  |  6 ++-
 .../datasources/FileSourceCodecSuite.scala         | 12 +++--
 .../ParquetCompressionCodecPrecedenceSuite.scala   | 41 +++++++-------
 .../datasources/parquet/ParquetIOSuite.scala       | 22 ++++----
 .../spark/sql/hive/CompressionCodecSuite.scala     | 23 ++++++--
 .../apache/spark/sql/hive/HiveParquetSuite.scala   |  6 ++-
 .../spark/sql/hive/execution/HiveDDLSuite.scala    | 10 ++--
 .../sql/sources/ParquetHadoopFsRelationSuite.scala |  3 +-
 13 files changed, 155 insertions(+), 65 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
new file mode 100644
index 00000000000..1a37c7a33f2
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * A mapper class from Spark supported parquet compression codecs to parquet compression codecs.
+ */
+public enum ParquetCompressionCodec {
+  NONE(CompressionCodecName.UNCOMPRESSED),
+  UNCOMPRESSED(CompressionCodecName.UNCOMPRESSED),
+  SNAPPY(CompressionCodecName.SNAPPY),
+  GZIP(CompressionCodecName.GZIP),
+  LZO(CompressionCodecName.LZO),
+  BROTLI(CompressionCodecName.BROTLI),
+  LZ4(CompressionCodecName.LZ4),
+  LZ4_RAW(CompressionCodecName.LZ4_RAW),
+  ZSTD(CompressionCodecName.ZSTD);
+
+  private final CompressionCodecName compressionCodec;
+
+  ParquetCompressionCodec(CompressionCodecName compressionCodec) {
+    this.compressionCodec = compressionCodec;
+  }
+
+  public CompressionCodecName getCompressionCodec() {
+    return this.compressionCodec;
+  }
+
+  public static ParquetCompressionCodec fromString(String s) {
+    return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT));
+  }
+
+  public static final List<ParquetCompressionCodec> availableCodecs =
+    Arrays.asList(
+      ParquetCompressionCodec.UNCOMPRESSED,
+      ParquetCompressionCodec.SNAPPY,
+      ParquetCompressionCodec.GZIP,
+      ParquetCompressionCodec.ZSTD,
+      ParquetCompressionCodec.LZ4,
+      ParquetCompressionCodec.LZ4_RAW);
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 559a994319d..ae110fdd0d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.util.Locale
 
 import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -88,16 +87,10 @@ class ParquetOptions(
 
 object ParquetOptions extends DataSourceOptions {
   // The parquet compression short names
-  private val shortParquetCompressionCodecNames = Map(
-    "none" -> CompressionCodecName.UNCOMPRESSED,
-    "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
-    "snappy" -> CompressionCodecName.SNAPPY,
-    "gzip" -> CompressionCodecName.GZIP,
-    "lzo" -> CompressionCodecName.LZO,
-    "brotli" -> CompressionCodecName.BROTLI,
-    "lz4" -> CompressionCodecName.LZ4,
-    "lz4_raw" -> CompressionCodecName.LZ4_RAW,
-    "zstd" -> CompressionCodecName.ZSTD)
+  private val shortParquetCompressionCodecNames =
+    ParquetCompressionCodec.values().map {
+      codec => codec.name().toLowerCase(Locale.ROOT) -> codec.getCompressionCodec
+    }.toMap
 
   def getParquetCompressionCodecName(name: String): String = {
     shortParquetCompressionCodecNames(name).name()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
index 4752787c501..ba3228878ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
@@ -16,9 +16,12 @@
  */
 package org.apache.spark.sql.execution.benchmark
 
+import java.util.Locale
+
 import org.apache.parquet.column.ParquetProperties
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -51,7 +54,8 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark {
       mainArgs
     }
 
-    spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy")
+    spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
+      ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
     spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")
 
     formats.foreach { format =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index 771f944f1f6..a8736c04151 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.benchmark
 
 import java.io.File
+import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 import scala.util.Random
@@ -28,7 +29,7 @@ import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnVector
@@ -99,15 +100,17 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
     spark.read.json(dir).createOrReplaceTempView("jsonTable")
   }
 
+  val parquetCodec = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)
+
   private def saveAsParquetV1Table(df: DataFrameWriter[Row], dir: String): Unit = {
-    df.mode("overwrite").option("compression", "snappy").parquet(dir)
+    df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
     spark.read.parquet(dir).createOrReplaceTempView("parquetV1Table")
   }
 
   private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String): Unit = {
     withSQLConf(ParquetOutputFormat.WRITER_VERSION ->
       ParquetProperties.WriterVersion.PARQUET_2_0.toString) {
-      df.mode("overwrite").option("compression", "snappy").parquet(dir)
+      df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
       spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table")
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 4862571b9c1..10781ec90fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -18,12 +18,14 @@
 package org.apache.spark.sql.execution.benchmark
 
 import java.io.File
+import java.util.Locale
 
 import scala.util.Random
 
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
 import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
@@ -50,7 +52,8 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
       .setIfMissing("spark.driver.memory", "3g")
       .setIfMissing("spark.executor.memory", "3g")
       .setIfMissing("orc.compression", "snappy")
-      .setIfMissing("spark.sql.parquet.compression.codec", "snappy")
+      .setIfMissing("spark.sql.parquet.compression.codec",
+        ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
 
     SparkSession.builder().config(conf).getOrCreate()
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index c26272d1dcd..f01cfea62a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.benchmark
 
+import java.util.Locale
+
 import scala.util.Try
 
 import org.apache.spark.SparkConf
@@ -29,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
@@ -51,7 +54,8 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
     val conf = new SparkConf()
       .setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
       .setAppName("test-sql-context")
-      .set("spark.sql.parquet.compression.codec", "snappy")
+      .set("spark.sql.parquet.compression.codec",
+        ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
       .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4"))
       .set("spark.driver.memory", "3g")
       .set("spark.executor.memory", "3g")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index 11e9f4665a9..1f1805a02d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -17,7 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.Locale
+
+import scala.jdk.CollectionConverters._
+
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 
@@ -58,9 +63,10 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
   // Exclude "lzo" because it is GPL-licenced so not included in Hadoop.
   // Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available
   // on Maven Central.
-  override protected def availableCodecs: Seq[String] = {
-    Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw")
-  }
+  override protected def availableCodecs: Seq[String] =
+    (ParquetCompressionCodec.NONE +:
+      ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq))
+      .map(_.name().toLowerCase(Locale.ROOT)).iterator.to(Seq)
 }
 
 class OrcCodecSuite extends FileSourceCodecSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
index 1a387b7d2de..28ea430635a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
+import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 
@@ -29,18 +30,9 @@ import org.apache.spark.sql.test.SharedSparkSession
 
 class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession {
   test("Test `spark.sql.parquet.compression.codec` config") {
-    Seq(
-      "NONE",
-      "UNCOMPRESSED",
-      "SNAPPY",
-      "GZIP",
-      "LZO",
-      "LZ4",
-      "BROTLI",
-      "ZSTD",
-      "LZ4_RAW").foreach { c =>
-      withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
-        val expected = if (c == "NONE") "UNCOMPRESSED" else c
+    ParquetCompressionCodec.values().foreach { codec =>
+      withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
+        val expected = codec.getCompressionCodec.name()
         val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf)
         assert(option.compressionCodecClassName == expected)
       }
@@ -49,25 +41,32 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
 
   test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") {
     // When "compression" is configured, it should be the first choice.
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
-      val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip")
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+      ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+      val props = Map(
+        "compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT),
+        ParquetOutputFormat.COMPRESSION ->
+          ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
       val option = new ParquetOptions(props, spark.sessionState.conf)
-      assert(option.compressionCodecClassName == "UNCOMPRESSED")
+      assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
     }
 
     // When "compression" is not configured, "parquet.compression" should be the preferred choice.
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
-      val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip")
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+      ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+      val props = Map(ParquetOutputFormat.COMPRESSION ->
+        ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
       val option = new ParquetOptions(props, spark.sessionState.conf)
-      assert(option.compressionCodecClassName == "GZIP")
+      assert(option.compressionCodecClassName == ParquetCompressionCodec.GZIP.name)
     }
 
     // When both "compression" and "parquet.compression" are not configured,
     // spark.sql.parquet.compression.codec should be the right choice.
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+      ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
       val props = Map.empty[String, String]
       val option = new ParquetOptions(props, spark.sessionState.conf)
-      assert(option.compressionCodecClassName == "SNAPPY")
+      assert(option.compressionCodecClassName == ParquetCompressionCodec.SNAPPY.name)
     }
   }
 
@@ -113,8 +112,8 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
   }
 
   test("Create parquet table with compression") {
+    val codecs = ParquetCompressionCodec.availableCodecs.asScala.map(_.name())
     Seq(true, false).foreach { isPartitioned =>
-      val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4_RAW")
       codecs.foreach { compressionCodec =>
         checkCompressionCodec(compressionCodec, isPartitioned)
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 95a45e52bfb..a5d5f8ce30f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -34,8 +34,6 @@ import org.apache.parquet.example.data.Group
 import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory}
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.example.ExampleParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
 import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
@@ -845,7 +843,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
 
     val data = (0 until 10).map(i => (i, i.toString))
 
-    def checkCompressionCodec(codec: CompressionCodecName): Unit = {
+    def checkCompressionCodec(codec: ParquetCompressionCodec): Unit = {
       withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
        withParquetFile(data) { path =>
          assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase(Locale.ROOT)) {
@@ -857,12 +855,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
 
     // Checks default compression codec
     checkCompressionCodec(
-      CompressionCodecName.fromConf(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))
+      ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))
 
-    checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
-    checkCompressionCodec(CompressionCodecName.GZIP)
-    checkCompressionCodec(CompressionCodecName.SNAPPY)
-    checkCompressionCodec(CompressionCodecName.ZSTD)
+    ParquetCompressionCodec.availableCodecs.asScala.foreach(checkCompressionCodec(_))
   }
 
   private def createParquetWriter(
@@ -878,7 +873,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       .withDictionaryEncoding(dictionaryEnabled)
       .withType(schema)
       .withWriterVersion(PARQUET_1_0)
-      .withCompressionCodec(GZIP)
+      .withCompressionCodec(ParquetCompressionCodec.GZIP.getCompressionCodec)
       .withRowGroupSize(1024 * 1024)
       .withPageSize(pageSize)
       .withDictionaryPageSize(dictionaryPageSize)
@@ -1507,9 +1502,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
-      val option = new ParquetOptions(Map("Compression" -> "uncompressed"), spark.sessionState.conf)
-      assert(option.compressionCodecClassName == "UNCOMPRESSED")
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+      ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+      val option = new ParquetOptions(
+        Map("Compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT)),
+        spark.sessionState.conf)
+      assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
index a5d11f6e0e1..df28e7b4485 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
@@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.execution.datasources.orc.OrcOptions
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetTest}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetOptions, ParquetTest}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -289,8 +289,14 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
 
   test("both table-level and session-level compression are set") {
     checkForTableWithCompressProp("parquet",
-      tableCompressCodecs = List("UNCOMPRESSED", "SNAPPY", "GZIP"),
-      sessionCompressCodecs = List("SNAPPY", "GZIP", "SNAPPY"))
+      tableCompressCodecs = List(
+        ParquetCompressionCodec.UNCOMPRESSED.name,
+        ParquetCompressionCodec.SNAPPY.name,
+        ParquetCompressionCodec.GZIP.name),
+      sessionCompressCodecs = List(
+        ParquetCompressionCodec.SNAPPY.name,
+        ParquetCompressionCodec.GZIP.name,
+        ParquetCompressionCodec.SNAPPY.name))
     checkForTableWithCompressProp("orc",
       tableCompressCodecs =
         List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name),
@@ -301,7 +307,10 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
   test("table-level compression is not set but session-level compressions is set ") {
     checkForTableWithCompressProp("parquet",
       tableCompressCodecs = List.empty,
-      sessionCompressCodecs = List("UNCOMPRESSED", "SNAPPY", "GZIP"))
+      sessionCompressCodecs = List(
+        ParquetCompressionCodec.UNCOMPRESSED.name,
+        ParquetCompressionCodec.SNAPPY.name,
+        ParquetCompressionCodec.GZIP.name))
     checkForTableWithCompressProp("orc",
       tableCompressCodecs = List.empty,
       sessionCompressCodecs =
@@ -339,7 +348,11 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
   }
 
   test("test table containing mixed compression codec") {
-    checkTableWriteWithCompressionCodecs("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP"))
+    checkTableWriteWithCompressionCodecs("parquet",
+      List(
+        ParquetCompressionCodec.UNCOMPRESSED.name,
+        ParquetCompressionCodec.SNAPPY.name,
+        ParquetCompressionCodec.GZIP.name))
     checkTableWriteWithCompressionCodecs(
       "orc",
       List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 2a3c77a56e6..45dd8da6e02 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.hive
 
 import java.time.{Duration, Period}
 import java.time.temporal.ChronoUnit
+import java.util.Locale
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetTest}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 
@@ -157,7 +158,8 @@ class HiveParquetSuite extends QueryTest
 
   test("SPARK-37098: Alter table properties should invalidate cache") {
     // specify the compression in case we change it in future
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+      ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) {
       withTempPath { dir =>
         withTable("t") {
           sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION '${dir.getCanonicalPath}'")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 05d2ca1e210..78365d25c89 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
@@ -2709,7 +2709,9 @@ class HiveDDLSuite
     assert(compression === actualCompression)
   }
 
-  Seq(("orc", "ZLIB"), ("parquet", "GZIP")).foreach { case (fileFormat, compression) =>
+  Seq(
+    ("orc", "ZLIB"),
+    ("parquet", ParquetCompressionCodec.GZIP.name)).foreach { case (fileFormat, compression) =>
     test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") {
       withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") {
         withTable("t") {
@@ -2804,14 +2806,14 @@ class HiveDDLSuite
           assert(DDLUtils.isHiveTable(table))
           assert(table.storage.serde.get.contains("parquet"))
           val properties = table.properties
-          assert(properties.get("parquet.compression") == Some("GZIP"))
+          assert(properties.get("parquet.compression") == Some(ParquetCompressionCodec.GZIP.name))
           assert(spark.table("t").collect().isEmpty)
 
           sql("INSERT INTO t SELECT 1")
           checkAnswer(spark.table("t"), Row(1))
           val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
 
-          assertCompression(maybeFile, "parquet", "GZIP")
+          assertCompression(maybeFile, "parquet", ParquetCompressionCodec.GZIP.name)
         }
       }
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 18e8401ee3d..84ee19e62bc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
+import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -199,7 +200,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
   }
 
   test("SPARK-13543: Support for specifying compression codec for Parquet via option()") {
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "UNCOMPRESSED") {
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.UNCOMPRESSED.name) {
       withTempPath { dir =>
         val path = s"${dir.getCanonicalPath}/table1"
         val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
