This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c381480b4ff5 [SPARK-45481][SQL][FOLLOWUP] Add `lowerCaseName` for `ParquetCompressionCodec`
c381480b4ff5 is described below

commit c381480b4ff5f82c510329f76e8f74310255dd5d
Author: Jiaan Geng <[email protected]>
AuthorDate: Mon Oct 30 11:32:10 2023 -0700

    [SPARK-45481][SQL][FOLLOWUP] Add `lowerCaseName` for `ParquetCompressionCodec`
    
    ### What changes were proposed in this pull request?
    https://github.com/apache/spark/pull/43308 introduced a mapper for Parquet
    compression codecs, but many places still call `toLowerCase(Locale.ROOT)` to
    get the lower-case name of a Parquet compression codec.
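    
    For example, call sites change as follows (a minimal before/after sketch,
    taken from the diff below):
    
        // Before: lower-case the enum name at every call site.
        spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
          ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
    
        // After: the enum exposes a precomputed lower-case name.
        spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
          ParquetCompressionCodec.SNAPPY.lowerCaseName())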
    
    ### Why are the changes needed?
    Centralizing the lower-case name in `ParquetCompressionCodec.lowerCaseName()`
    removes the repeated `toLowerCase(Locale.ROOT)` boilerplate at each call site.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    It only adds an internal helper method.
    
    ### How was this patch tested?
    Existing test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #43571 from beliefer/SPARK-45481_followup.
    
    Authored-by: Jiaan Geng <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../parquet/ParquetCompressionCodec.java           | 10 ++++++++++
 .../datasources/parquet/ParquetOptions.scala       |  2 +-
 .../BuiltInDataSourceWriteBenchmark.scala          |  4 +---
 .../benchmark/DataSourceReadBenchmark.scala        | 11 ++++++-----
 .../benchmark/FilterPushdownBenchmark.scala        |  3 +--
 .../execution/benchmark/TPCDSQueryBenchmark.scala  |  5 +----
 .../datasources/FileSourceCodecSuite.scala         |  5 +----
 .../ParquetCompressionCodecPrecedenceSuite.scala   | 22 ++++++++++------------
 .../datasources/parquet/ParquetIOSuite.scala       |  6 +++---
 .../apache/spark/sql/hive/HiveParquetSuite.scala   |  5 ++---
 10 files changed, 36 insertions(+), 37 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
index 1a37c7a33f20..32d9701bdbb2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.datasources.parquet;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
@@ -51,6 +53,14 @@ public enum ParquetCompressionCodec {
     return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT));
   }
 
+  private static final Map<String, String> codecNameMap =
+    Arrays.stream(ParquetCompressionCodec.values()).collect(
+      Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));
+
+  public String lowerCaseName() {
+    return codecNameMap.get(this.name());
+  }
+
   public static final List<ParquetCompressionCodec> availableCodecs =
     Arrays.asList(
       ParquetCompressionCodec.UNCOMPRESSED,
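
A quick sanity check of the new helper (an illustrative Scala sketch, not part
of the patch; it uses only the standard enum `valueOf` and the `lowerCaseName`
method added above):

    import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
    // codecNameMap is built once per JVM, so lowerCaseName() is a map lookup
    // instead of a fresh toLowerCase(Locale.ROOT) call at each use.
    assert(ParquetCompressionCodec.SNAPPY.lowerCaseName() == "snappy")
    assert(ParquetCompressionCodec.valueOf("GZIP").lowerCaseName() == "gzip")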
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index ae110fdd0d3a..1f022f8d369e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -89,7 +89,7 @@ object ParquetOptions extends DataSourceOptions {
   // The parquet compression short names
   private val shortParquetCompressionCodecNames =
     ParquetCompressionCodec.values().map {
-      codec => codec.name().toLowerCase(Locale.ROOT) -> codec.getCompressionCodec
+      codec => codec.lowerCaseName() -> codec.getCompressionCodec
     }.toMap
 
   def getParquetCompressionCodecName(name: String): String = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
index a8b448c8dc93..e3b1f467bafa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.sql.execution.benchmark
 
-import java.util.Locale
-
 import org.apache.parquet.column.ParquetProperties
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
@@ -56,7 +54,7 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark {
     }
 
     spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
-      ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
+      ParquetCompressionCodec.SNAPPY.lowerCaseName())
     spark.conf.set(SQLConf.ORC_COMPRESSION.key,
       OrcCompressionCodec.SNAPPY.lowerCaseName())
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index bc271235f9dc..74043bac49a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution.benchmark
 
 import java.io.File
-import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 import scala.util.Random
@@ -101,17 +100,19 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
     spark.read.json(dir).createOrReplaceTempView("jsonTable")
   }
 
-  val parquetCodec = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)
-
   private def saveAsParquetV1Table(df: DataFrameWriter[Row], dir: String): Unit = {
-    df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
+    df.mode("overwrite")
+      .option("compression", ParquetCompressionCodec.SNAPPY.lowerCaseName())
+      .parquet(dir)
     spark.read.parquet(dir).createOrReplaceTempView("parquetV1Table")
   }
 
   private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String): Unit = {
     withSQLConf(ParquetOutputFormat.WRITER_VERSION ->
       ParquetProperties.WriterVersion.PARQUET_2_0.toString) {
-      df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
+      df.mode("overwrite")
+        .option("compression", ParquetCompressionCodec.SNAPPY.lowerCaseName())
+        .parquet(dir)
       spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table")
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 1a788bf5f2ff..ed5cdcd29959 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.benchmark
 
 import java.io.File
-import java.util.Locale
 
 import scala.util.Random
 
@@ -54,7 +53,7 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
       .setIfMissing("spark.executor.memory", "3g")
       .setIfMissing("orc.compression", 
OrcCompressionCodec.SNAPPY.lowerCaseName())
       .setIfMissing("spark.sql.parquet.compression.codec",
-        ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
+        ParquetCompressionCodec.SNAPPY.lowerCaseName())
 
     SparkSession.builder().config(conf).getOrCreate()
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index f01cfea62a95..721997d84e1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.benchmark
 
-import java.util.Locale
-
 import scala.util.Try
 
 import org.apache.spark.SparkConf
@@ -54,8 +52,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
     val conf = new SparkConf()
       .setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
       .setAppName("test-sql-context")
-      .set("spark.sql.parquet.compression.codec",
-        ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
+      .set("spark.sql.parquet.compression.codec", 
ParquetCompressionCodec.SNAPPY.lowerCaseName())
       .set("spark.sql.shuffle.partitions", 
System.getProperty("spark.sql.shuffle.partitions", "4"))
       .set("spark.driver.memory", "3g")
       .set("spark.executor.memory", "3g")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index e4d9e13c2b08..e1448f6f8c43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.util.Locale
-
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.QueryTest
@@ -66,8 +64,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
   // on Maven Central.
   override protected def availableCodecs: Seq[String] =
     (ParquetCompressionCodec.NONE +:
-      ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq))
-      .map(_.name().toLowerCase(Locale.ROOT)).iterator.to(Seq)
+      ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq)).map(_.lowerCaseName())
 }
 
 class OrcCodecSuite extends FileSourceCodecSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
index 28ea430635a2..040c127a43d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
-import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 
@@ -41,29 +40,28 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
 
   test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet 
in right order.") {
     // When "compression" is configured, it should be the first choice.
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
-      ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+    withSQLConf(
+      SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
       val props = Map(
-        "compression" -> 
ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT),
-        ParquetOutputFormat.COMPRESSION ->
-          ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
+        "compression" -> ParquetCompressionCodec.UNCOMPRESSED.lowerCaseName(),
+        ParquetOutputFormat.COMPRESSION -> ParquetCompressionCodec.GZIP.lowerCaseName())
       val option = new ParquetOptions(props, spark.sessionState.conf)
       assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
     }
 
     // When "compression" is not configured, "parquet.compression" should be 
the preferred choice.
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
-      ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
-      val props = Map(ParquetOutputFormat.COMPRESSION ->
-        ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
+    withSQLConf(
+      SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
+      val props =
+        Map(ParquetOutputFormat.COMPRESSION -> ParquetCompressionCodec.GZIP.lowerCaseName())
       val option = new ParquetOptions(props, spark.sessionState.conf)
       assert(option.compressionCodecClassName == ParquetCompressionCodec.GZIP.name)
     }
 
     // When both "compression" and "parquet.compression" are not configured,
     // spark.sql.parquet.compression.codec should be the right choice.
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
-      ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+    withSQLConf(
+      SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
       val props = Map.empty[String, String]
       val option = new ParquetOptions(props, spark.sessionState.conf)
       assert(option.compressionCodecClassName == ParquetCompressionCodec.SNAPPY.name)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 726ae87b5ce1..c064f49c3122 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1502,10 +1502,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more 
case-insensitive") {
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
-      ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+    withSQLConf(
+      SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
       val option = new ParquetOptions(
-        Map("Compression" -> 
ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT)),
+        Map("Compression" -> 
ParquetCompressionCodec.UNCOMPRESSED.lowerCaseName()),
         spark.sessionState.conf)
       assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name)
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 45dd8da6e020..2152a7e30002 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
 
 import java.time.{Duration, Period}
 import java.time.temporal.ChronoUnit
-import java.util.Locale
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetTest}
@@ -158,8 +157,8 @@ class HiveParquetSuite extends QueryTest
 
   test("SPARK-37098: Alter table properties should invalidate cache") {
     // specify the compression in case we change it in future
-    withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
-      ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) {
+    withSQLConf(
+      SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
       withTempPath { dir =>
         withTable("t") {
           sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION 
'${dir.getCanonicalPath}'")

