This is an automated email from the ASF dual-hosted git repository.

jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new fa90ebd1a4 [GLUTEN-8721][VL] Native writer should keep the same 
compression with vanilla if hive.exec.compress.output is true (#8722)
fa90ebd1a4 is described below

commit fa90ebd1a4af004eb3dc8c1ff84240b55c479693
Author: Kaifei Yi <[email protected]>
AuthorDate: Wed Feb 26 10:31:06 2025 +0800

    [GLUTEN-8721][VL] Native writer should keep the same compression with 
vanilla if hive.exec.compress.output is true (#8722)
---
 .../execution/VeloxParquetWriteForHiveSuite.scala  | 39 ++++++++++++++++++++++
 .../spark/sql/hive/execution/HiveFileFormat.scala  | 31 +++++++++++------
 .../spark/sql/hive/execution/HiveFileFormat.scala  | 31 +++++++++++------
 3 files changed, 79 insertions(+), 22 deletions(-)

diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 11efdccfcf..44e36df09a 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -330,4 +330,43 @@ class VeloxParquetWriteForHiveSuite
       }
     }
   }
+
+  testWithSpecifiedSparkVersion(
+    "Native writer should keep the same compression codec if 
`hive.exec.compress.output` is true",
+    maxSparkVersion = Some("3.3")) {
+    Seq(false, true).foreach {
+      enableNativeWrite =>
+        withSQLConf("spark.gluten.sql.native.writer.enabled" -> 
enableNativeWrite.toString) {
+          withTable("t") {
+            withSQLConf(
+              "spark.sql.hive.convertMetastoreParquet" -> "false",
+              "spark.sql.parquet.compression.codec" -> "gzip") {
+              spark.sql("SET hive.exec.compress.output=true")
+              spark.sql("SET parquet.compression=gzip")
+              spark.sql(
+                "SET 
mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec")
+              checkNativeWrite(
+                "CREATE TABLE t STORED AS PARQUET TBLPROPERTIES 
('parquet.compression'='zstd') " +
+                  "AS SELECT 1 as c",
+                checkNative = enableNativeWrite)
+              val tableDir = new 
Path(s"${conf.getConf(StaticSQLConf.WAREHOUSE_PATH)}/t")
+              val configuration = spark.sessionState.newHadoopConf()
+              val files = tableDir
+                .getFileSystem(configuration)
+                .listStatus(tableDir)
+                .filterNot(_.getPath.getName.startsWith(".")) // literal prefix, not a regex
+              assert(files.nonEmpty)
+              val in = HadoopInputFile.fromStatus(files.head, 
spark.sessionState.newHadoopConf())
+              Utils.tryWithResource(ParquetFileReader.open(in)) {
+                reader =>
+                  val compression =
+                    
reader.getFooter.getBlocks.get(0).getColumns.get(0).getCodec.toString
+                  // table property wins; native and vanilla Hive writers must both produce zstd
+                  assert("zstd".equalsIgnoreCase(compression))
+              }
+            }
+          }
+        }
+    }
+  }
 }
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 821d8317d8..6cec17878e 100644
--- 
a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -41,6 +41,8 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.Object
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{JobConf, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
 import scala.collection.JavaConverters._
 
@@ -103,17 +105,24 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
     if ("true" == 
sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
       val nativeFormat = 
sparkSession.sparkContext.getLocalProperty("nativeFormat")
       val tableOptions = tableDesc.getProperties.asScala.toMap
-      val isParquetFormat = nativeFormat == "parquet"
-      val compressionCodec = if (fileSinkConf.compressed) {
-        // hive related configurations
-        fileSinkConf.compressCodec
-      } else if (isParquetFormat) {
-        val parquetOptions =
-          new ParquetOptions(tableOptions, sparkSession.sessionState.conf)
-        parquetOptions.compressionCodecClassName
-      } else {
-        val orcOptions = new OrcOptions(tableOptions, 
sparkSession.sessionState.conf)
-        orcOptions.compressionCodec
+      val compressionCodec = nativeFormat match {
+        case "parquet" if fileSinkConf.compressed =>
+          // MapredParquetOutputFormat uses the 
`ParquetOutputFormat.COMPRESSION` as
+          // the compression codec.
+          tableOptions.getOrElse(
+            ParquetOutputFormat.COMPRESSION,
+            conf.get(ParquetOutputFormat.COMPRESSION, 
CompressionCodecName.UNCOMPRESSED.name))
+        case "parquet" =>
+          val parquetOptions =
+            new ParquetOptions(tableOptions, sparkSession.sessionState.conf)
+          parquetOptions.compressionCodecClassName
+        case _ =>
+          if (fileSinkConf.compressed) {
+            fileSinkConf.compressCodec
+          } else {
+            val orcOptions = new OrcOptions(tableOptions, 
sparkSession.sessionState.conf)
+            orcOptions.compressionCodec
+          }
       }
 
       val nativeConf =
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 61ad8fe72b..23a752c87f 100644
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -42,6 +42,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{JobConf, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
 import scala.collection.JavaConverters._
 
@@ -99,17 +101,24 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
     if ("true" == 
sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
       val nativeFormat = 
sparkSession.sparkContext.getLocalProperty("nativeFormat")
       val tableOptions = tableDesc.getProperties.asScala.toMap
-      val isParquetFormat = nativeFormat == "parquet"
-      val compressionCodec = if (fileSinkConf.compressed) {
-        // hive related configurations
-        fileSinkConf.compressCodec
-      } else if (isParquetFormat) {
-        val parquetOptions =
-          new ParquetOptions(tableOptions, sparkSession.sessionState.conf)
-        parquetOptions.compressionCodecClassName
-      } else {
-        val orcOptions = new OrcOptions(tableOptions, 
sparkSession.sessionState.conf)
-        orcOptions.compressionCodec
+      val compressionCodec = nativeFormat match {
+        case "parquet" if fileSinkConf.compressed =>
+          // MapredParquetOutputFormat uses the 
`ParquetOutputFormat.COMPRESSION` as
+          // the compression codec.
+          tableOptions.getOrElse(
+            ParquetOutputFormat.COMPRESSION,
+            conf.get(ParquetOutputFormat.COMPRESSION, 
CompressionCodecName.UNCOMPRESSED.name))
+        case "parquet" =>
+          val parquetOptions =
+            new ParquetOptions(tableOptions, sparkSession.sessionState.conf)
+          parquetOptions.compressionCodecClassName
+        case _ =>
+          if (fileSinkConf.compressed) {
+            fileSinkConf.compressCodec
+          } else {
+            val orcOptions = new OrcOptions(tableOptions, 
sparkSession.sessionState.conf)
+            orcOptions.compressionCodec
+          }
       }
 
       val nativeConf =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to