This is an automated email from the ASF dual-hosted git repository.
beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 62a3868b93a [SPARK-45481][SQL] Introduce a mapper for parquet
compression codecs
62a3868b93a is described below
commit 62a3868b93a51a4dc424e87d6fd06df914158e1b
Author: Jiaan Geng <[email protected]>
AuthorDate: Fri Oct 27 10:48:44 2023 +0800
[SPARK-45481][SQL] Introduce a mapper for parquet compression codecs
### What changes were proposed in this pull request?
Currently, Spark supports all the parquet compression codecs, but the
compression codecs supported by parquet and those supported by Spark do not
map one-to-one, because Spark introduces a fake compression codec `none`.
On the other hand, there are a lot of magic strings copied from the parquet
compression codecs. As a result, developers need to manually keep them
consistent, which is error-prone and reduces development efficiency.
For `CompressionCodecName`, refer to:
https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
### Why are the changes needed?
Make it easy for developers to use parquet compression codecs.
### Does this PR introduce _any_ user-facing change?
'No'.
Introduce a new class.
### How was this patch tested?
Existing test cases.
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #43308 from beliefer/SPARK-45481.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Jiaan Geng <[email protected]>
---
.../parquet/ParquetCompressionCodec.java | 62 ++++++++++++++++++++++
.../datasources/parquet/ParquetOptions.scala | 15 ++----
.../BuiltInDataSourceWriteBenchmark.scala | 6 ++-
.../benchmark/DataSourceReadBenchmark.scala | 9 ++--
.../benchmark/FilterPushdownBenchmark.scala | 5 +-
.../execution/benchmark/TPCDSQueryBenchmark.scala | 6 ++-
.../datasources/FileSourceCodecSuite.scala | 12 +++--
.../ParquetCompressionCodecPrecedenceSuite.scala | 41 +++++++-------
.../datasources/parquet/ParquetIOSuite.scala | 22 ++++----
.../spark/sql/hive/CompressionCodecSuite.scala | 23 ++++++--
.../apache/spark/sql/hive/HiveParquetSuite.scala | 6 ++-
.../spark/sql/hive/execution/HiveDDLSuite.scala | 10 ++--
.../sql/sources/ParquetHadoopFsRelationSuite.scala | 3 +-
13 files changed, 155 insertions(+), 65 deletions(-)
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
new file mode 100644
index 00000000000..1a37c7a33f2
--- /dev/null
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * A mapper class from Spark supported parquet compression codecs to parquet
compression codecs.
+ */
+public enum ParquetCompressionCodec {
+ NONE(CompressionCodecName.UNCOMPRESSED),
+ UNCOMPRESSED(CompressionCodecName.UNCOMPRESSED),
+ SNAPPY(CompressionCodecName.SNAPPY),
+ GZIP(CompressionCodecName.GZIP),
+ LZO(CompressionCodecName.LZO),
+ BROTLI(CompressionCodecName.BROTLI),
+ LZ4(CompressionCodecName.LZ4),
+ LZ4_RAW(CompressionCodecName.LZ4_RAW),
+ ZSTD(CompressionCodecName.ZSTD);
+
+ private final CompressionCodecName compressionCodec;
+
+ ParquetCompressionCodec(CompressionCodecName compressionCodec) {
+ this.compressionCodec = compressionCodec;
+ }
+
+ public CompressionCodecName getCompressionCodec() {
+ return this.compressionCodec;
+ }
+
+ public static ParquetCompressionCodec fromString(String s) {
+ return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT));
+ }
+
+ public static final List<ParquetCompressionCodec> availableCodecs =
+ Arrays.asList(
+ ParquetCompressionCodec.UNCOMPRESSED,
+ ParquetCompressionCodec.SNAPPY,
+ ParquetCompressionCodec.GZIP,
+ ParquetCompressionCodec.ZSTD,
+ ParquetCompressionCodec.LZ4,
+ ParquetCompressionCodec.LZ4_RAW);
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 559a994319d..ae110fdd0d3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.util.Locale
import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -88,16 +87,10 @@ class ParquetOptions(
object ParquetOptions extends DataSourceOptions {
// The parquet compression short names
- private val shortParquetCompressionCodecNames = Map(
- "none" -> CompressionCodecName.UNCOMPRESSED,
- "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
- "snappy" -> CompressionCodecName.SNAPPY,
- "gzip" -> CompressionCodecName.GZIP,
- "lzo" -> CompressionCodecName.LZO,
- "brotli" -> CompressionCodecName.BROTLI,
- "lz4" -> CompressionCodecName.LZ4,
- "lz4_raw" -> CompressionCodecName.LZ4_RAW,
- "zstd" -> CompressionCodecName.ZSTD)
+ private val shortParquetCompressionCodecNames =
+ ParquetCompressionCodec.values().map {
+ codec => codec.name().toLowerCase(Locale.ROOT) ->
codec.getCompressionCodec
+ }.toMap
def getParquetCompressionCodecName(name: String): String = {
shortParquetCompressionCodecNames(name).name()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
index 4752787c501..ba3228878ec 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
@@ -16,9 +16,12 @@
*/
package org.apache.spark.sql.execution.benchmark
+import java.util.Locale
+
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat
+import
org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
/**
@@ -51,7 +54,8 @@ object BuiltInDataSourceWriteBenchmark extends
DataSourceWriteBenchmark {
mainArgs
}
- spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy")
+ spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
+ ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")
formats.foreach { format =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index 771f944f1f6..a8736c04151 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.benchmark
import java.io.File
+import java.util.Locale
import scala.jdk.CollectionConverters._
import scala.util.Random
@@ -28,7 +29,7 @@ import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
+import
org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec,
VectorizedParquetRecordReader}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnVector
@@ -99,15 +100,17 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
spark.read.json(dir).createOrReplaceTempView("jsonTable")
}
+ val parquetCodec =
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)
+
private def saveAsParquetV1Table(df: DataFrameWriter[Row], dir: String):
Unit = {
- df.mode("overwrite").option("compression", "snappy").parquet(dir)
+ df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
spark.read.parquet(dir).createOrReplaceTempView("parquetV1Table")
}
private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String):
Unit = {
withSQLConf(ParquetOutputFormat.WRITER_VERSION ->
ParquetProperties.WriterVersion.PARQUET_2_0.toString) {
- df.mode("overwrite").option("compression", "snappy").parquet(dir)
+ df.mode("overwrite").option("compression", parquetCodec).parquet(dir)
spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table")
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 4862571b9c1..10781ec90fa 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -18,12 +18,14 @@
package org.apache.spark.sql.execution.benchmark
import java.io.File
+import java.util.Locale
import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, SparkSession}
+import
org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.functions.{monotonically_increasing_id,
timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
@@ -50,7 +52,8 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
.setIfMissing("spark.driver.memory", "3g")
.setIfMissing("spark.executor.memory", "3g")
.setIfMissing("orc.compression", "snappy")
- .setIfMissing("spark.sql.parquet.compression.codec", "snappy")
+ .setIfMissing("spark.sql.parquet.compression.codec",
+ ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
SparkSession.builder().config(conf).getOrCreate()
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
index c26272d1dcd..f01cfea62a9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.benchmark
+import java.util.Locale
+
import scala.util.Try
import org.apache.spark.SparkConf
@@ -29,6 +31,7 @@ import
org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import
org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -51,7 +54,8 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with
Logging {
val conf = new SparkConf()
.setMaster(System.getProperty("spark.sql.test.master", "local[1]"))
.setAppName("test-sql-context")
- .set("spark.sql.parquet.compression.codec", "snappy")
+ .set("spark.sql.parquet.compression.codec",
+ ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
.set("spark.sql.shuffle.partitions",
System.getProperty("spark.sql.shuffle.partitions", "4"))
.set("spark.driver.memory", "3g")
.set("spark.executor.memory", "3g")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index 11e9f4665a9..1f1805a02d7 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -17,7 +17,12 @@
package org.apache.spark.sql.execution.datasources
+import java.util.Locale
+
+import scala.jdk.CollectionConverters._
+
import org.apache.spark.sql.QueryTest
+import
org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
@@ -58,9 +63,10 @@ class ParquetCodecSuite extends FileSourceCodecSuite {
// Exclude "lzo" because it is GPL-licenced so not included in Hadoop.
// Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is
not available
// on Maven Central.
- override protected def availableCodecs: Seq[String] = {
- Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw")
- }
+ override protected def availableCodecs: Seq[String] =
+ (ParquetCompressionCodec.NONE +:
+ ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq))
+ .map(_.name().toLowerCase(Locale.ROOT)).iterator.to(Seq)
}
class OrcCodecSuite extends FileSourceCodecSuite {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
index 1a387b7d2de..28ea430635a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
+import java.util.Locale
import scala.jdk.CollectionConverters._
@@ -29,18 +30,9 @@ import org.apache.spark.sql.test.SharedSparkSession
class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with
SharedSparkSession {
test("Test `spark.sql.parquet.compression.codec` config") {
- Seq(
- "NONE",
- "UNCOMPRESSED",
- "SNAPPY",
- "GZIP",
- "LZO",
- "LZ4",
- "BROTLI",
- "ZSTD",
- "LZ4_RAW").foreach { c =>
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) {
- val expected = if (c == "NONE") "UNCOMPRESSED" else c
+ ParquetCompressionCodec.values().foreach { codec =>
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
+ val expected = codec.getCompressionCodec.name()
val option = new ParquetOptions(Map.empty[String, String],
spark.sessionState.conf)
assert(option.compressionCodecClassName == expected)
}
@@ -49,25 +41,32 @@ class ParquetCompressionCodecPrecedenceSuite extends
ParquetTest with SharedSpar
test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet
in right order.") {
// When "compression" is configured, it should be the first choice.
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
- val props = Map("compression" -> "uncompressed",
ParquetOutputFormat.COMPRESSION -> "gzip")
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+ ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+ val props = Map(
+ "compression" ->
ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT),
+ ParquetOutputFormat.COMPRESSION ->
+ ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
val option = new ParquetOptions(props, spark.sessionState.conf)
- assert(option.compressionCodecClassName == "UNCOMPRESSED")
+ assert(option.compressionCodecClassName ==
ParquetCompressionCodec.UNCOMPRESSED.name)
}
// When "compression" is not configured, "parquet.compression" should be
the preferred choice.
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
- val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip")
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+ ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+ val props = Map(ParquetOutputFormat.COMPRESSION ->
+ ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT))
val option = new ParquetOptions(props, spark.sessionState.conf)
- assert(option.compressionCodecClassName == "GZIP")
+ assert(option.compressionCodecClassName ==
ParquetCompressionCodec.GZIP.name)
}
// When both "compression" and "parquet.compression" are not configured,
// spark.sql.parquet.compression.codec should be the right choice.
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+ ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
val props = Map.empty[String, String]
val option = new ParquetOptions(props, spark.sessionState.conf)
- assert(option.compressionCodecClassName == "SNAPPY")
+ assert(option.compressionCodecClassName ==
ParquetCompressionCodec.SNAPPY.name)
}
}
@@ -113,8 +112,8 @@ class ParquetCompressionCodecPrecedenceSuite extends
ParquetTest with SharedSpar
}
test("Create parquet table with compression") {
+ val codecs = ParquetCompressionCodec.availableCodecs.asScala.map(_.name())
Seq(true, false).foreach { isPartitioned =>
- val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4",
"LZ4_RAW")
codecs.foreach { compressionCodec =>
checkCompressionCodec(compressionCodec, isPartitioned)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 95a45e52bfb..a5d5f8ce30f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -34,8 +34,6 @@ import org.apache.parquet.example.data.Group
import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory}
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.example.ExampleParquetWriter
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP
import org.apache.parquet.io.api.Binary
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
@@ -845,7 +843,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
val data = (0 until 10).map(i => (i, i.toString))
- def checkCompressionCodec(codec: CompressionCodecName): Unit = {
+ def checkCompressionCodec(codec: ParquetCompressionCodec): Unit = {
withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
withParquetFile(data) { path =>
assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase(Locale.ROOT))
{
@@ -857,12 +855,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
// Checks default compression codec
checkCompressionCodec(
-
CompressionCodecName.fromConf(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))
+
ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION)))
- checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
- checkCompressionCodec(CompressionCodecName.GZIP)
- checkCompressionCodec(CompressionCodecName.SNAPPY)
- checkCompressionCodec(CompressionCodecName.ZSTD)
+
ParquetCompressionCodec.availableCodecs.asScala.foreach(checkCompressionCodec(_))
}
private def createParquetWriter(
@@ -878,7 +873,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
.withDictionaryEncoding(dictionaryEnabled)
.withType(schema)
.withWriterVersion(PARQUET_1_0)
- .withCompressionCodec(GZIP)
+ .withCompressionCodec(ParquetCompressionCodec.GZIP.getCompressionCodec)
.withRowGroupSize(1024 * 1024)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
@@ -1507,9 +1502,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
}
test("SPARK-18433: Improve DataSource option keys to be more
case-insensitive") {
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
- val option = new ParquetOptions(Map("Compression" -> "uncompressed"),
spark.sessionState.conf)
- assert(option.compressionCodecClassName == "UNCOMPRESSED")
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+ ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) {
+ val option = new ParquetOptions(
+ Map("Compression" ->
ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT)),
+ spark.sessionState.conf)
+ assert(option.compressionCodecClassName ==
ParquetCompressionCodec.UNCOMPRESSED.name)
}
}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
index a5d11f6e0e1..df28e7b4485 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
@@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.execution.datasources.orc.OrcOptions
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions,
ParquetTest}
+import
org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec,
ParquetOptions, ParquetTest}
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -289,8 +289,14 @@ class CompressionCodecSuite extends TestHiveSingleton with
ParquetTest with Befo
test("both table-level and session-level compression are set") {
checkForTableWithCompressProp("parquet",
- tableCompressCodecs = List("UNCOMPRESSED", "SNAPPY", "GZIP"),
- sessionCompressCodecs = List("SNAPPY", "GZIP", "SNAPPY"))
+ tableCompressCodecs = List(
+ ParquetCompressionCodec.UNCOMPRESSED.name,
+ ParquetCompressionCodec.SNAPPY.name,
+ ParquetCompressionCodec.GZIP.name),
+ sessionCompressCodecs = List(
+ ParquetCompressionCodec.SNAPPY.name,
+ ParquetCompressionCodec.GZIP.name,
+ ParquetCompressionCodec.SNAPPY.name))
checkForTableWithCompressProp("orc",
tableCompressCodecs =
List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name,
CompressionKind.ZLIB.name),
@@ -301,7 +307,10 @@ class CompressionCodecSuite extends TestHiveSingleton with
ParquetTest with Befo
test("table-level compression is not set but session-level compressions is
set ") {
checkForTableWithCompressProp("parquet",
tableCompressCodecs = List.empty,
- sessionCompressCodecs = List("UNCOMPRESSED", "SNAPPY", "GZIP"))
+ sessionCompressCodecs = List(
+ ParquetCompressionCodec.UNCOMPRESSED.name,
+ ParquetCompressionCodec.SNAPPY.name,
+ ParquetCompressionCodec.GZIP.name))
checkForTableWithCompressProp("orc",
tableCompressCodecs = List.empty,
sessionCompressCodecs =
@@ -339,7 +348,11 @@ class CompressionCodecSuite extends TestHiveSingleton with
ParquetTest with Befo
}
test("test table containing mixed compression codec") {
- checkTableWriteWithCompressionCodecs("parquet", List("UNCOMPRESSED",
"SNAPPY", "GZIP"))
+ checkTableWriteWithCompressionCodecs("parquet",
+ List(
+ ParquetCompressionCodec.UNCOMPRESSED.name,
+ ParquetCompressionCodec.SNAPPY.name,
+ ParquetCompressionCodec.GZIP.name))
checkTableWriteWithCompressionCodecs(
"orc",
List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name,
CompressionKind.ZLIB.name))
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 2a3c77a56e6..45dd8da6e02 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.hive
import java.time.{Duration, Period}
import java.time.temporal.ChronoUnit
+import java.util.Locale
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
+import
org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec,
ParquetTest}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
@@ -157,7 +158,8 @@ class HiveParquetSuite extends QueryTest
test("SPARK-37098: Alter table properties should invalidate cache") {
// specify the compression in case we change it in future
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
+ ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) {
withTempPath { dir =>
withTable("t") {
sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION
'${dir.getCanonicalPath}'")
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 05d2ca1e210..78365d25c89 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
import
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader
+import
org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec,
ParquetFooterReader}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC,
CONVERT_METASTORE_PARQUET}
@@ -2709,7 +2709,9 @@ class HiveDDLSuite
assert(compression === actualCompression)
}
- Seq(("orc", "ZLIB"), ("parquet", "GZIP")).foreach { case (fileFormat,
compression) =>
+ Seq(
+ ("orc", "ZLIB"),
+ ("parquet", ParquetCompressionCodec.GZIP.name)).foreach { case
(fileFormat, compression) =>
test(s"SPARK-22158 convertMetastore should not ignore table property -
$fileFormat") {
withSQLConf(CONVERT_METASTORE_ORC.key -> "true",
CONVERT_METASTORE_PARQUET.key -> "true") {
withTable("t") {
@@ -2804,14 +2806,14 @@ class HiveDDLSuite
assert(DDLUtils.isHiveTable(table))
assert(table.storage.serde.get.contains("parquet"))
val properties = table.properties
- assert(properties.get("parquet.compression") == Some("GZIP"))
+ assert(properties.get("parquet.compression") ==
Some(ParquetCompressionCodec.GZIP.name))
assert(spark.table("t").collect().isEmpty)
sql("INSERT INTO t SELECT 1")
checkAnswer(spark.table("t"), Row(1))
val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
- assertCompression(maybeFile, "parquet", "GZIP")
+ assertCompression(maybeFile, "parquet",
ParquetCompressionCodec.GZIP.name)
}
}
}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index 18e8401ee3d..84ee19e62bc 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import
org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
+import
org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -199,7 +200,7 @@ class ParquetHadoopFsRelationSuite extends
HadoopFsRelationTest {
}
test("SPARK-13543: Support for specifying compression codec for Parquet via
option()") {
- withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "UNCOMPRESSED") {
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key ->
ParquetCompressionCodec.UNCOMPRESSED.name) {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]