This is an automated email from the ASF dual-hosted git repository.
jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new fa90ebd1a4 [GLUTEN-8721][VL] Native writer should keep the same
compression with vanilla if hive.exec.compress.output is true (#8722)
fa90ebd1a4 is described below
commit fa90ebd1a4af004eb3dc8c1ff84240b55c479693
Author: Kaifei Yi <[email protected]>
AuthorDate: Wed Feb 26 10:31:06 2025 +0800
[GLUTEN-8721][VL] Native writer should keep the same compression with
vanilla if hive.exec.compress.output is true (#8722)
---
.../execution/VeloxParquetWriteForHiveSuite.scala | 39 ++++++++++++++++++++++
.../spark/sql/hive/execution/HiveFileFormat.scala | 31 +++++++++++------
.../spark/sql/hive/execution/HiveFileFormat.scala | 31 +++++++++++------
3 files changed, 79 insertions(+), 22 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 11efdccfcf..44e36df09a 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -330,4 +330,43 @@ class VeloxParquetWriteForHiveSuite
}
}
}
+
+ testWithSpecifiedSparkVersion(
+ "Native writer should keep the same compression codec if
`hive.exec.compress.output` is true",
+ maxSparkVersion = Some("3.3")) {
+ Seq(false, true).foreach {
+ enableNativeWrite =>
+ withSQLConf("spark.gluten.sql.native.writer.enabled" ->
enableNativeWrite.toString) {
+ withTable("t") {
+ withSQLConf(
+ "spark.sql.hive.convertMetastoreParquet" -> "false",
+ "spark.sql.parquet.compression.codec" -> "gzip") {
+ spark.sql("SET hive.exec.compress.output=true")
+ spark.sql("SET parquet.compression=gzip")
+ spark.sql(
+ "SET
mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec")
+ checkNativeWrite(
+ "CREATE TABLE t STORED AS PARQUET TBLPROPERTIES
('parquet.compression'='zstd') " +
+ "AS SELECT 1 as c",
+ checkNative = enableNativeWrite)
+ val tableDir = new
Path(s"${conf.getConf(StaticSQLConf.WAREHOUSE_PATH)}/t")
+ val configuration = spark.sessionState.newHadoopConf()
+ val files = tableDir
+ .getFileSystem(configuration)
+ .listStatus(tableDir)
+ .filterNot(_.getPath.getName.startsWith("\\."))
+ assert(files.nonEmpty)
+ val in = HadoopInputFile.fromStatus(files.head,
spark.sessionState.newHadoopConf())
+ Utils.tryWithResource(ParquetFileReader.open(in)) {
+ reader =>
+ val compression =
+
reader.getFooter.getBlocks.get(0).getColumns.get(0).getCodec.toString
+ // native writer and vanilla spark hive writer should be
consistent
+ assert("zstd".equalsIgnoreCase(compression))
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 821d8317d8..6cec17878e 100644
---
a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++
b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -41,6 +41,8 @@ import
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.Object
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{JobConf, Reporter}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.collection.JavaConverters._
@@ -103,17 +105,24 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
if ("true" ==
sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
val nativeFormat =
sparkSession.sparkContext.getLocalProperty("nativeFormat")
val tableOptions = tableDesc.getProperties.asScala.toMap
- val isParquetFormat = nativeFormat == "parquet"
- val compressionCodec = if (fileSinkConf.compressed) {
- // hive related configurations
- fileSinkConf.compressCodec
- } else if (isParquetFormat) {
- val parquetOptions =
- new ParquetOptions(tableOptions, sparkSession.sessionState.conf)
- parquetOptions.compressionCodecClassName
- } else {
- val orcOptions = new OrcOptions(tableOptions,
sparkSession.sessionState.conf)
- orcOptions.compressionCodec
+ val compressionCodec = nativeFormat match {
+ case "parquet" if fileSinkConf.compressed =>
+            // MapredParquetOutputFormat uses the
`ParquetOutputFormat.COMPRESSION` as
+ // the compression codec.
+ tableOptions.getOrElse(
+ ParquetOutputFormat.COMPRESSION,
+ conf.get(ParquetOutputFormat.COMPRESSION,
CompressionCodecName.UNCOMPRESSED.name))
+ case "parquet" =>
+ val parquetOptions =
+ new ParquetOptions(tableOptions, sparkSession.sessionState.conf)
+ parquetOptions.compressionCodecClassName
+ case _ =>
+ if (fileSinkConf.compressed) {
+ fileSinkConf.compressCodec
+ } else {
+ val orcOptions = new OrcOptions(tableOptions,
sparkSession.sessionState.conf)
+ orcOptions.compressionCodec
+ }
}
val nativeConf =
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 61ad8fe72b..23a752c87f 100644
---
a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++
b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -42,6 +42,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{JobConf, Reporter}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
import scala.collection.JavaConverters._
@@ -99,17 +101,24 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
if ("true" ==
sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
val nativeFormat =
sparkSession.sparkContext.getLocalProperty("nativeFormat")
val tableOptions = tableDesc.getProperties.asScala.toMap
- val isParquetFormat = nativeFormat == "parquet"
- val compressionCodec = if (fileSinkConf.compressed) {
- // hive related configurations
- fileSinkConf.compressCodec
- } else if (isParquetFormat) {
- val parquetOptions =
- new ParquetOptions(tableOptions, sparkSession.sessionState.conf)
- parquetOptions.compressionCodecClassName
- } else {
- val orcOptions = new OrcOptions(tableOptions,
sparkSession.sessionState.conf)
- orcOptions.compressionCodec
+ val compressionCodec = nativeFormat match {
+ case "parquet" if fileSinkConf.compressed =>
+            // MapredParquetOutputFormat uses the
`ParquetOutputFormat.COMPRESSION` as
+ // the compression codec.
+ tableOptions.getOrElse(
+ ParquetOutputFormat.COMPRESSION,
+ conf.get(ParquetOutputFormat.COMPRESSION,
CompressionCodecName.UNCOMPRESSED.name))
+ case "parquet" =>
+ val parquetOptions =
+ new ParquetOptions(tableOptions, sparkSession.sessionState.conf)
+ parquetOptions.compressionCodecClassName
+ case _ =>
+ if (fileSinkConf.compressed) {
+ fileSinkConf.compressCodec
+ } else {
+ val orcOptions = new OrcOptions(tableOptions,
sparkSession.sessionState.conf)
+ orcOptions.compressionCodec
+ }
}
val nativeConf =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]