This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c381480b4ff5 [SPARK-45481][SQL][FOLLOWUP] Add `lowerCaseName` for `ParquetCompressionCodec`
c381480b4ff5 is described below
commit c381480b4ff5f82c510329f76e8f74310255dd5d
Author: Jiaan Geng <[email protected]>
AuthorDate: Mon Oct 30 11:32:10 2023 -0700
[SPARK-45481][SQL][FOLLOWUP] Add `lowerCaseName` for `ParquetCompressionCodec`
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/43308 introduced a mapper for Parquet compression codecs.
There are many places that call `toLowerCase(Locale.ROOT)` to get the lower-case name of a Parquet compression codec.
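As a minimal sketch of what this enables at a typical call site (both forms are taken from the call sites updated in the diff below; it assumes the `spark-sql` module is on the classpath):

```scala
import java.util.Locale

import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec

// Before this change: every call site lower-cases the enum name itself.
val before = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)

// After this change: the enum exposes its lower-case name directly.
val after = ParquetCompressionCodec.SNAPPY.lowerCaseName()

assert(before == after) // both yield "snappy"
```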
### Why are the changes needed?
Adding `lowerCaseName` to `ParquetCompressionCodec` avoids repeating `toLowerCase(Locale.ROOT)` at every call site.
### Does this PR introduce _any_ user-facing change?
'No'.
Only a new method is added.
### How was this patch tested?
Existing test cases.
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #43571 from beliefer/SPARK-45481_followup.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../parquet/ParquetCompressionCodec.java | 10 ++++++++++
.../datasources/parquet/ParquetOptions.scala | 2 +-
.../BuiltInDataSourceWriteBenchmark.scala | 4 +---
.../benchmark/DataSourceReadBenchmark.scala | 11 ++++++-----
.../benchmark/FilterPushdownBenchmark.scala | 3 +--
.../execution/benchmark/TPCDSQueryBenchmark.scala | 5 +----
.../datasources/FileSourceCodecSuite.scala | 5 +----
.../ParquetCompressionCodecPrecedenceSuite.scala | 22 ++++++++++------------
.../datasources/parquet/ParquetIOSuite.scala | 6 +++---
.../apache/spark/sql/hive/HiveParquetSuite.scala | 5 ++---
10 files changed, 36 insertions(+), 37 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
index 1a37c7a33f20..32d9701bdbb2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.datasources.parquet;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -51,6 +53,14 @@ public enum ParquetCompressionCodec {
return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT));
}
+ private static final Map<String, String> codecNameMap =
+ Arrays.stream(ParquetCompressionCodec.values()).collect(
+ Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));
+
+ public String lowerCaseName() {
+ return codecNameMap.get(this.name());
+ }
+
public static final List<ParquetCompressionCodec> availableCodecs =
Arrays.asList(
ParquetCompressionCodec.UNCOMPRESSED,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index ae110fdd0d3a..1f022f8d369e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -89,7 +89,7 @@ object ParquetOptions extends DataSourceOptions {
// The parquet compression short names
private val shortParquetCompressionCodecNames =
ParquetCompressionCodec.values().map {
- codec => codec.name().toLowerCase(Locale.ROOT) -> codec.getCompressionCodec
+ codec => codec.lowerCaseName() -> codec.getCompressionCodec
}.toMap
def getParquetCompressionCodecName(name: String): String = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
index a8b448c8dc93..e3b1f467bafa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
@@ -16,8 +16,6 @@
*/
package org.apache.spark.sql.execution.benchmark
-import java.util.Locale
-
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat
@@ -56,7 +54,7 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark {
}
spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
- ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
+ ParquetCompressionCodec.SNAPPY.lowerCaseName())
spark.conf.set(SQLConf.ORC_COMPRESSION.key,
OrcCompressionCodec.SNAPPY.lowerCaseName())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index bc271235f9dc..74043bac49a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.benchmark
import java.io.File
-import java.util.Locale
import scala.jdk.CollectionConverters._
import scala.util.Random
@@ -101,17 +100,19 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
spark.read.json(dir).createOrReplaceTempView("jsonTable")
}
- val parquetCodec = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)
-
private def saveAsParquetV1Table(df: DataFrameWriter[Row], dir: String): Unit = {
- df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
+ df.mode("overwrite")
+ .option("compression", ParquetCompressionCodec.SNAPPY.lowerCaseName())
+ .parquet(dir)
spark.read.parquet(dir).createOrReplaceTempView("parquetV1Table")
}
private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String): Unit = {
withSQLConf(ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString) {
- df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
+ df.mode("overwrite")
+ .option("compression", ParquetCompressionCodec.SNAPPY.lowerCaseName())
+ .parquet(dir)
spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 1a788bf5f2ff..ed5cdcd29959 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.benchmark
import java.io.File
-import java.util.Locale
import scala.util.Random
@@ -54,7 +53,7 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
.setIfMissing("spark.executor.memory", "3g")
.setIfMissing("orc.compression",
OrcCompressionCodec.SNAPPY.lowerCaseName())
.setIfMissing("spark.sql.parquet.compression.codec",
- ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
+ ParquetCompressionCodec.SNAPPY.lowerCaseName())
SparkSession.builder().config(conf).getOrCreate()
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index f01cfea62a95..721997d84e1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.benchmark
-import java.util.Locale
-
import scala.util.Try
import org.apache.spark.SparkConf
@@ -54,8 +52,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging {
val conf = new SparkConf()
.setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
.setAppName("test-sql-context")
- .set("spark.sql.parquet.compression.codec",
- ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
+ .set("spark.sql.parquet.compression.codec",
ParquetCompressionCodec.SNAPPY.lowerCaseName())
.set("spark.sql.shuffle.partitions",
System.getProperty("spark.sql.shuffle.partitions", "4"))
.set("spark.driver.memory", "3g")
.set("spark.executor.memory", "3g")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index e4d9e13c2b08..e1448f6f8c43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.datasources
-import java.util.Locale
-
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.QueryTest
@@ -66,8 +64,7 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
// on Maven Central.
override protected def availableCodecs: Seq[String] =
(ParquetCompressionCodec.NONE +:
- ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq))
- .map(_.name().toLowerCase(Locale.ROOT)).iterator.to(Seq)
+ ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq)).map(_.lowerCaseName())
}
class OrcCodecSuite extends FileSourceCodecSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
index 28ea430635a2..040c127a43d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
-import java.util.Locale
import scala.jdk.CollectionConverters._
@@ -41,29 +40,28 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar
test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet
in right order.") {
// When "compression" is configured, it should be the first choice.
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
- ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+ withSQLConf(
SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
val props = Map(
- "compression" ->
ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT),
- ParquetOutputFormat.COMPRESSION ->
- ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
+ "compression" -> ParquetCompressionCodec.UNCOMPRESSED.lowerCaseName(),
+ ParquetOutputFormat.COMPRESSION ->
ParquetCompressionCodec.GZIP.lowerCaseName())
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName ==
ParquetCompressionCodec.UNCOMPRESSED.name)
}
// When "compression" is not configured, "parquet.compression" should be
the preferred choice.
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
- ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
- val props = Map(ParquetOutputFormat.COMPRESSION ->
- ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
+ withSQLConf(
SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
+ val props =
+ Map(ParquetOutputFormat.COMPRESSION -> ParquetCompressionCodec.GZIP.lowerCaseName())
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName ==
ParquetCompressionCodec.GZIP.name)
}
// When both "compression" and "parquet.compression" are not configured,
// spark.sql.parquet.compression.codec should be the right choice.
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
- ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+ withSQLConf(
SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
val props = Map.empty[String, String]
val option = new ParquetOptions(props, spark.sessionState.conf)
assert(option.compressionCodecClassName ==
ParquetCompressionCodec.SNAPPY.name)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 726ae87b5ce1..c064f49c3122 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1502,10 +1502,10 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
test("SPARK-18433: Improve DataSource option keys to be more
case-insensitive") {
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
- ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+ withSQLConf(
SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
val option = new ParquetOptions(
- Map("Compression" ->
ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT)),
+ Map("Compression" ->
ParquetCompressionCodec.UNCOMPRESSED.lowerCaseName()),
spark.sessionState.conf)
assert(option.compressionCodecClassName ==
ParquetCompressionCodec.UNCOMPRESSED.name)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 45dd8da6e020..2152a7e30002 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive
import java.time.{Duration, Period}
import java.time.temporal.ChronoUnit
-import java.util.Locale
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetTest}
@@ -158,8 +157,8 @@ class HiveParquetSuite extends QueryTest
test("SPARK-37098: Alter table properties should invalidate cache") {
// specify the compression in case we change it in future
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
- ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) {
+ withSQLConf(
SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.SNAPPY.lowerCaseName()) {
withTempPath { dir =>
withTable("t") {
sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION
'${dir.getCanonicalPath}'")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]