This is an automated email from the ASF dual-hosted git repository.

jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f633870b41 [GLUTEN-8039][VL] Native writer should respect table properties (#8040)
f633870b41 is described below

commit f633870b41613bd04d1a63c587b7c1de4ed234d4
Author: Kaifei Yi <[email protected]>
AuthorDate: Wed Nov 27 14:16:05 2024 +0800

    [GLUTEN-8039][VL] Native writer should respect table properties (#8040)
---
 .../execution/VeloxParquetWriteForHiveSuite.scala  | 37 ++++++++++++++++++++++
 .../spark/sql/hive/execution/HiveFileFormat.scala  | 10 ++++--
 .../spark/sql/hive/execution/HiveFileFormat.scala  |  9 ++++--
 3 files changed, 50 insertions(+), 6 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 8c2b98988e..5932f4e5a7 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -27,6 +27,11 @@ import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.util.QueryExecutionListener
+import org.apache.spark.util.Utils
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
 
 class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
   private var _spark: SparkSession = _
@@ -185,4 +190,36 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
       checkAnswer(spark.table("t"), Row(1))
     }
   }
+
+  test("native writer should respect table properties") {
+    Seq(true, false).foreach {
+      enableNativeWrite =>
+        withSQLConf("spark.gluten.sql.native.writer.enabled" -> 
enableNativeWrite.toString) {
+          withTable("t") {
+            withSQLConf(
+              "spark.sql.hive.convertMetastoreParquet" -> "false",
+              "spark.sql.parquet.compression.codec" -> "gzip") {
+              checkNativeWrite(
+                "CREATE TABLE t STORED AS PARQUET TBLPROPERTIES 
('parquet.compression'='zstd') " +
+                  "AS SELECT 1 as c",
+                checkNative = enableNativeWrite)
+              val tableDir = new 
Path(s"${conf.getConf(StaticSQLConf.WAREHOUSE_PATH)}/t")
+              val configuration = spark.sessionState.newHadoopConf()
+              val files = tableDir
+                .getFileSystem(configuration)
+                .listStatus(tableDir)
+                .filterNot(_.getPath.getName.startsWith("\\."))
+              assert(files.nonEmpty)
+              val in = HadoopInputFile.fromStatus(files.head, 
spark.sessionState.newHadoopConf())
+              Utils.tryWithResource(ParquetFileReader.open(in)) {
+                reader =>
+                  val column = 
reader.getFooter.getBlocks.get(0).getColumns.get(0)
+                  // native writer and vanilla spark hive writer should be 
consistent
+                  "zstd".equalsIgnoreCase(column.getCodec.toString)
+              }
+            }
+          }
+        }
+    }
+  }
 }
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index c21c67f654..821d8317d8 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapred.{JobConf, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import scala.collection.JavaConverters._
+
 /* -
  * This class is copied from Spark 3.2 and modified for Gluten. \n
  * Gluten should make sure this class is loaded before the original class.
@@ -101,19 +102,22 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
     val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
     if ("true" == 
sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
       val nativeFormat = 
sparkSession.sparkContext.getLocalProperty("nativeFormat")
+      val tableOptions = tableDesc.getProperties.asScala.toMap
       val isParquetFormat = nativeFormat == "parquet"
       val compressionCodec = if (fileSinkConf.compressed) {
         // hive related configurations
         fileSinkConf.compressCodec
       } else if (isParquetFormat) {
-        val parquetOptions = new ParquetOptions(options, 
sparkSession.sessionState.conf)
+        val parquetOptions =
+          new ParquetOptions(tableOptions, sparkSession.sessionState.conf)
         parquetOptions.compressionCodecClassName
       } else {
-        val orcOptions = new OrcOptions(options, 
sparkSession.sessionState.conf)
+        val orcOptions = new OrcOptions(tableOptions, 
sparkSession.sessionState.conf)
         orcOptions.compressionCodec
       }
 
-      val nativeConf = GlutenFormatFactory(nativeFormat).nativeConf(options, 
compressionCodec)
+      val nativeConf =
+        GlutenFormatFactory(nativeFormat).nativeConf(tableOptions, 
compressionCodec)
 
       new OutputWriterFactory {
         private val jobConf = new SerializableJobConf(new JobConf(conf))
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 6ed1b4d215..61ad8fe72b 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -98,19 +98,22 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
     val outputFormat = fileSinkConf.tableInfo.getOutputFileFormatClassName
     if ("true" == 
sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
       val nativeFormat = 
sparkSession.sparkContext.getLocalProperty("nativeFormat")
+      val tableOptions = tableDesc.getProperties.asScala.toMap
       val isParquetFormat = nativeFormat == "parquet"
       val compressionCodec = if (fileSinkConf.compressed) {
         // hive related configurations
         fileSinkConf.compressCodec
       } else if (isParquetFormat) {
-        val parquetOptions = new ParquetOptions(options, 
sparkSession.sessionState.conf)
+        val parquetOptions =
+          new ParquetOptions(tableOptions, sparkSession.sessionState.conf)
         parquetOptions.compressionCodecClassName
       } else {
-        val orcOptions = new OrcOptions(options, 
sparkSession.sessionState.conf)
+        val orcOptions = new OrcOptions(tableOptions, 
sparkSession.sessionState.conf)
         orcOptions.compressionCodec
       }
 
-      val nativeConf = GlutenFormatFactory(nativeFormat).nativeConf(options, 
compressionCodec)
+      val nativeConf =
+        GlutenFormatFactory(nativeFormat).nativeConf(tableOptions, 
compressionCodec)
 
       new OutputWriterFactory {
         private val jobConf = new SerializableJobConf(new JobConf(conf))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to