This is an automated email from the ASF dual-hosted git repository. beliefer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 62a3868b93a [SPARK-45481][SQL] Introduce a mapper for parquet compression codecs 62a3868b93a is described below commit 62a3868b93a51a4dc424e87d6fd06df914158e1b Author: Jiaan Geng <belie...@163.com> AuthorDate: Fri Oct 27 10:48:44 2023 +0800 [SPARK-45481][SQL] Introduce a mapper for parquet compression codecs ### What changes were proposed in this pull request? Currently, Spark supported all the parquet compression codecs, but the parquet supported compression codecs and spark supported are not completely one-on-one due to Spark introduce a fake compression codecs none. On the other hand, there are a lot of magic strings copy from parquet compression codecs. This issue lead to developers need to manually maintain its consistency. It is easy to make mistakes and reduce development efficiency. The `CompressionCodecName`, refer: https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java ### Why are the changes needed? Let developers easy to use parquet compression codecs. ### Does this PR introduce _any_ user-facing change? 'No'. Introduce a new class. ### How was this patch tested? Exists test cases. ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes #43308 from beliefer/SPARK-45481. Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Jiaan Geng <belie...@163.com> --- .../parquet/ParquetCompressionCodec.java | 62 ++++++++++++++++++++++ .../datasources/parquet/ParquetOptions.scala | 15 ++---- .../BuiltInDataSourceWriteBenchmark.scala | 6 ++- .../benchmark/DataSourceReadBenchmark.scala | 9 ++-- .../benchmark/FilterPushdownBenchmark.scala | 5 +- .../execution/benchmark/TPCDSQueryBenchmark.scala | 6 ++- .../datasources/FileSourceCodecSuite.scala | 12 +++-- .../ParquetCompressionCodecPrecedenceSuite.scala | 41 +++++++------- .../datasources/parquet/ParquetIOSuite.scala | 22 ++++---- .../spark/sql/hive/CompressionCodecSuite.scala | 23 ++++++-- .../apache/spark/sql/hive/HiveParquetSuite.scala | 6 ++- .../spark/sql/hive/execution/HiveDDLSuite.scala | 10 ++-- .../sql/sources/ParquetHadoopFsRelationSuite.scala | 3 +- 13 files changed, 155 insertions(+), 65 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java new file mode 100644 index 00000000000..1a37c7a33f2 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; + +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +/** + * A mapper class from Spark supported parquet compression codecs to parquet compression codecs. + */ +public enum ParquetCompressionCodec { + NONE(CompressionCodecName.UNCOMPRESSED), + UNCOMPRESSED(CompressionCodecName.UNCOMPRESSED), + SNAPPY(CompressionCodecName.SNAPPY), + GZIP(CompressionCodecName.GZIP), + LZO(CompressionCodecName.LZO), + BROTLI(CompressionCodecName.BROTLI), + LZ4(CompressionCodecName.LZ4), + LZ4_RAW(CompressionCodecName.LZ4_RAW), + ZSTD(CompressionCodecName.ZSTD); + + private final CompressionCodecName compressionCodec; + + ParquetCompressionCodec(CompressionCodecName compressionCodec) { + this.compressionCodec = compressionCodec; + } + + public CompressionCodecName getCompressionCodec() { + return this.compressionCodec; + } + + public static ParquetCompressionCodec fromString(String s) { + return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT)); + } + + public static final List<ParquetCompressionCodec> availableCodecs = + Arrays.asList( + ParquetCompressionCodec.UNCOMPRESSED, + ParquetCompressionCodec.SNAPPY, + ParquetCompressionCodec.GZIP, + ParquetCompressionCodec.ZSTD, + ParquetCompressionCodec.LZ4, + ParquetCompressionCodec.LZ4_RAW); +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index 559a994319d..ae110fdd0d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.parquet import java.util.Locale import org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap @@ -88,16 +87,10 @@ class ParquetOptions( object ParquetOptions extends DataSourceOptions { // The parquet compression short names - private val shortParquetCompressionCodecNames = Map( - "none" -> CompressionCodecName.UNCOMPRESSED, - "uncompressed" -> CompressionCodecName.UNCOMPRESSED, - "snappy" -> CompressionCodecName.SNAPPY, - "gzip" -> CompressionCodecName.GZIP, - "lzo" -> CompressionCodecName.LZO, - "brotli" -> CompressionCodecName.BROTLI, - "lz4" -> CompressionCodecName.LZ4, - "lz4_raw" -> CompressionCodecName.LZ4_RAW, - "zstd" -> CompressionCodecName.ZSTD) + private val shortParquetCompressionCodecNames = + ParquetCompressionCodec.values().map { + codec => codec.name().toLowerCase(Locale.ROOT) -> codec.getCompressionCodec + }.toMap def getParquetCompressionCodecName(name: String): String = { shortParquetCompressionCodecNames(name).name() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala index 4752787c501..ba3228878ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala @@ -16,9 +16,12 @@ */ package org.apache.spark.sql.execution.benchmark +import java.util.Locale + import org.apache.parquet.column.ParquetProperties import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.internal.SQLConf /** @@ -51,7 +54,8 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark { mainArgs } - spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, "snappy") + spark.conf.set(SQLConf.PARQUET_COMPRESSION.key, + ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy") formats.foreach { format => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala index 771f944f1f6..a8736c04151 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.benchmark import java.io.File +import java.util.Locale import scala.jdk.CollectionConverters._ import scala.util.Random @@ -28,7 +29,7 @@ import org.apache.spark.{SparkConf, TestUtils} import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnVector @@ -99,15 +100,17 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark { spark.read.json(dir).createOrReplaceTempView("jsonTable") } + val parquetCodec = ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT) + private def saveAsParquetV1Table(df: DataFrameWriter[Row], dir: String): Unit = { - df.mode("overwrite").option("compression", "snappy").parquet(dir) + df.mode("overwrite").option("compression", parquetCodec).parquet(dir) spark.read.parquet(dir).createOrReplaceTempView("parquetV1Table") } private def saveAsParquetV2Table(df: DataFrameWriter[Row], dir: String): Unit = { withSQLConf(ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString) { - df.mode("overwrite").option("compression", "snappy").parquet(dir) + df.mode("overwrite").option("compression", parquetCodec).parquet(dir) spark.read.parquet(dir).createOrReplaceTempView("parquetV2Table") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala index 4862571b9c1..10781ec90fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala @@ -18,12 +18,14 @@ package org.apache.spark.sql.execution.benchmark import java.io.File +import java.util.Locale import scala.util.Random import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType @@ -50,7 +52,8 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark { .setIfMissing("spark.driver.memory", "3g") .setIfMissing("spark.executor.memory", "3g") .setIfMissing("orc.compression", "snappy") - .setIfMissing("spark.sql.parquet.compression.codec", "snappy") + .setIfMissing("spark.sql.parquet.compression.codec", + ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) SparkSession.builder().config(conf).getOrCreate() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index c26272d1dcd..f01cfea62a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.benchmark +import java.util.Locale + import scala.util.Try import org.apache.spark.SparkConf @@ -29,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -51,7 +54,8 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark with Logging { val conf = new SparkConf() .setMaster(System.getProperty("spark.sql.test.master", "local[1]")) .setAppName("test-sql-context") - .set("spark.sql.parquet.compression.codec", "snappy") + .set("spark.sql.parquet.compression.codec", + ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) .set("spark.sql.shuffle.partitions", System.getProperty("spark.sql.shuffle.partitions", "4")) .set("spark.driver.memory", "3g") .set("spark.executor.memory", "3g") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala index 11e9f4665a9..1f1805a02d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala @@ -17,7 +17,12 @@ package org.apache.spark.sql.execution.datasources +import java.util.Locale + +import scala.jdk.CollectionConverters._ + import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} @@ -58,9 +63,10 @@ class ParquetCodecSuite extends FileSourceCodecSuite { // Exclude "lzo" because it is GPL-licenced so not included in Hadoop. // Exclude "brotli" because the com.github.rdblue:brotli-codec dependency is not available // on Maven Central. - override protected def availableCodecs: Seq[String] = { - Seq("none", "uncompressed", "snappy", "gzip", "zstd", "lz4", "lz4_raw") - } + override protected def availableCodecs: Seq[String] = + (ParquetCompressionCodec.NONE +: + ParquetCompressionCodec.availableCodecs.asScala.iterator.to(Seq)) + .map(_.name().toLowerCase(Locale.ROOT)).iterator.to(Seq) } class OrcCodecSuite extends FileSourceCodecSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala index 1a387b7d2de..28ea430635a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.File +import java.util.Locale import scala.jdk.CollectionConverters._ @@ -29,18 +30,9 @@ import org.apache.spark.sql.test.SharedSparkSession class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSparkSession { test("Test `spark.sql.parquet.compression.codec` config") { - Seq( - "NONE", - "UNCOMPRESSED", - "SNAPPY", - "GZIP", - "LZO", - "LZ4", - "BROTLI", - "ZSTD", - "LZ4_RAW").foreach { c => - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> c) { - val expected = if (c == "NONE") "UNCOMPRESSED" else c + ParquetCompressionCodec.values().foreach { codec => + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) { + val expected = codec.getCompressionCodec.name() val option = new ParquetOptions(Map.empty[String, String], spark.sessionState.conf) assert(option.compressionCodecClassName == expected) } @@ -49,25 +41,32 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar test("[SPARK-21786] Test Acquiring 'compressionCodecClassName' for parquet in right order.") { // When "compression" is configured, it should be the first choice. - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { - val props = Map("compression" -> "uncompressed", ParquetOutputFormat.COMPRESSION -> "gzip") + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) { + val props = Map( + "compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT), + ParquetOutputFormat.COMPRESSION -> + ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT)) val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecClassName == "UNCOMPRESSED") + assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name) } // When "compression" is not configured, "parquet.compression" should be the preferred choice. - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { - val props = Map(ParquetOutputFormat.COMPRESSION -> "gzip") + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) { + val props = Map(ParquetOutputFormat.COMPRESSION -> + ParquetCompressionCodec.GZIP.name.toLowerCase(Locale.ROOT)) val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecClassName == "GZIP") + assert(option.compressionCodecClassName == ParquetCompressionCodec.GZIP.name) } // When both "compression" and "parquet.compression" are not configured, // spark.sql.parquet.compression.codec should be the right choice. - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) { val props = Map.empty[String, String] val option = new ParquetOptions(props, spark.sessionState.conf) - assert(option.compressionCodecClassName == "SNAPPY") + assert(option.compressionCodecClassName == ParquetCompressionCodec.SNAPPY.name) } } @@ -113,8 +112,8 @@ class ParquetCompressionCodecPrecedenceSuite extends ParquetTest with SharedSpar } test("Create parquet table with compression") { + val codecs = ParquetCompressionCodec.availableCodecs.asScala.map(_.name()) Seq(true, false).foreach { isPartitioned => - val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4_RAW") codecs.foreach { compressionCodec => checkCompressionCodec(compressionCodec, isPartitioned) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 95a45e52bfb..a5d5f8ce30f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -34,8 +34,6 @@ import org.apache.parquet.example.data.Group import org.apache.parquet.example.data.simple.{SimpleGroup, SimpleGroupFactory} import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.example.ExampleParquetWriter -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP import org.apache.parquet.io.api.Binary import org.apache.parquet.schema.{MessageType, MessageTypeParser} @@ -845,7 +843,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession val data = (0 until 10).map(i => (i, i.toString)) - def checkCompressionCodec(codec: CompressionCodecName): Unit = { + def checkCompressionCodec(codec: ParquetCompressionCodec): Unit = { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) { withParquetFile(data) { path => assertResult(spark.conf.get(SQLConf.PARQUET_COMPRESSION).toUpperCase(Locale.ROOT)) { @@ -857,12 +855,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession // Checks default compression codec checkCompressionCodec( - CompressionCodecName.fromConf(spark.conf.get(SQLConf.PARQUET_COMPRESSION))) + ParquetCompressionCodec.fromString(spark.conf.get(SQLConf.PARQUET_COMPRESSION))) - checkCompressionCodec(CompressionCodecName.UNCOMPRESSED) - checkCompressionCodec(CompressionCodecName.GZIP) - checkCompressionCodec(CompressionCodecName.SNAPPY) - checkCompressionCodec(CompressionCodecName.ZSTD) + ParquetCompressionCodec.availableCodecs.asScala.foreach(checkCompressionCodec(_)) } private def createParquetWriter( @@ -878,7 +873,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession .withDictionaryEncoding(dictionaryEnabled) .withType(schema) .withWriterVersion(PARQUET_1_0) - .withCompressionCodec(GZIP) + .withCompressionCodec(ParquetCompressionCodec.GZIP.getCompressionCodec) .withRowGroupSize(1024 * 1024) .withPageSize(pageSize) .withDictionaryPageSize(dictionaryPageSize) @@ -1507,9 +1502,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") { - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { - val option = new ParquetOptions(Map("Compression" -> "uncompressed"), spark.sessionState.conf) - assert(option.compressionCodecClassName == "UNCOMPRESSED") + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name.toLowerCase(Locale.ROOT)) { + val option = new ParquetOptions( + Map("Compression" -> ParquetCompressionCodec.UNCOMPRESSED.name.toLowerCase(Locale.ROOT)), + spark.sessionState.conf) + assert(option.compressionCodecClassName == ParquetCompressionCodec.UNCOMPRESSED.name) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala index a5d11f6e0e1..df28e7b4485 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala @@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.execution.datasources.orc.OrcOptions -import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetTest} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetOptions, ParquetTest} import org.apache.spark.sql.hive.orc.OrcFileOperator import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -289,8 +289,14 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo test("both table-level and session-level compression are set") { checkForTableWithCompressProp("parquet", - tableCompressCodecs = List("UNCOMPRESSED", "SNAPPY", "GZIP"), - sessionCompressCodecs = List("SNAPPY", "GZIP", "SNAPPY")) + tableCompressCodecs = List( + ParquetCompressionCodec.UNCOMPRESSED.name, + ParquetCompressionCodec.SNAPPY.name, + ParquetCompressionCodec.GZIP.name), + sessionCompressCodecs = List( + ParquetCompressionCodec.SNAPPY.name, + ParquetCompressionCodec.GZIP.name, + ParquetCompressionCodec.SNAPPY.name)) checkForTableWithCompressProp("orc", tableCompressCodecs = List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name), @@ -301,7 +307,10 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo test("table-level compression is not set but session-level compressions is set ") { checkForTableWithCompressProp("parquet", tableCompressCodecs = List.empty, - sessionCompressCodecs = List("UNCOMPRESSED", "SNAPPY", "GZIP")) + sessionCompressCodecs = List( + ParquetCompressionCodec.UNCOMPRESSED.name, + ParquetCompressionCodec.SNAPPY.name, + ParquetCompressionCodec.GZIP.name)) checkForTableWithCompressProp("orc", tableCompressCodecs = List.empty, sessionCompressCodecs = @@ -339,7 +348,11 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo } test("test table containing mixed compression codec") { - checkTableWriteWithCompressionCodecs("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP")) + checkTableWriteWithCompressionCodecs("parquet", + List( + ParquetCompressionCodec.UNCOMPRESSED.name, + ParquetCompressionCodec.SNAPPY.name, + ParquetCompressionCodec.GZIP.name)) checkTableWriteWithCompressionCodecs( "orc", List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala index 2a3c77a56e6..45dd8da6e02 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala @@ -19,9 +19,10 @@ package org.apache.spark.sql.hive import java.time.{Duration, Period} import java.time.temporal.ChronoUnit +import java.util.Locale import org.apache.spark.sql.{AnalysisException, QueryTest, Row} -import org.apache.spark.sql.execution.datasources.parquet.ParquetTest +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetTest} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf @@ -157,7 +158,8 @@ class HiveParquetSuite extends QueryTest test("SPARK-37098: Alter table properties should invalidate cache") { // specify the compression in case we change it in future - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") { + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> + ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT)) { withTempPath { dir => withTable("t") { sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION '${dir.getCanonicalPath}'") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 05d2ca1e210..78365d25c89 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} -import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader} import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET} @@ -2709,7 +2709,9 @@ class HiveDDLSuite assert(compression === actualCompression) } - Seq(("orc", "ZLIB"), ("parquet", "GZIP")).foreach { case (fileFormat, compression) => + Seq( + ("orc", "ZLIB"), + ("parquet", ParquetCompressionCodec.GZIP.name)).foreach { case (fileFormat, compression) => test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") { withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") { withTable("t") { @@ -2804,14 +2806,14 @@ class HiveDDLSuite assert(DDLUtils.isHiveTable(table)) assert(table.storage.serde.get.contains("parquet")) val properties = table.properties - assert(properties.get("parquet.compression") == Some("GZIP")) + assert(properties.get("parquet.compression") == Some(ParquetCompressionCodec.GZIP.name)) assert(spark.table("t").collect().isEmpty) sql("INSERT INTO t SELECT 1") checkAnswer(spark.table("t"), Row(1)) val maybeFile = path.listFiles().find(_.getName.startsWith("part")) - assertCompression(maybeFile, "parquet", "GZIP") + assertCompression(maybeFile, "parquet", ParquetCompressionCodec.GZIP.name) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index 18e8401ee3d..84ee19e62bc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol +import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -199,7 +200,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { } test("SPARK-13543: Support for specifying compression codec for Parquet via option()") { - withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "UNCOMPRESSED") { + withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> ParquetCompressionCodec.UNCOMPRESSED.name) { withTempPath { dir => val path = s"${dir.getCanonicalPath}/table1" val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org