This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 284b304a9 [GLUTEN-6026][VL] Add Support for HiveFileFormat parquet 
write for Spark 3.4+ (#6062)
284b304a9 is described below

commit 284b304a9aec28c506fbb69a3c8393125ff0bac2
Author: Suraj Naik <[email protected]>
AuthorDate: Fri Jun 14 14:19:19 2024 +0530

    [GLUTEN-6026][VL] Add Support for HiveFileFormat parquet write for Spark 
3.4+ (#6062)
---
 .../gluten/backendsapi/velox/VeloxBackend.scala    | 38 +++++++++++++++++++---
 .../execution/VeloxParquetWriteForHiveSuite.scala  |  6 +---
 docs/velox-backend-limitations.md                  |  4 +++
 .../scala/org/apache/gluten/GlutenConfig.scala     | 12 +++++++
 4 files changed, 51 insertions(+), 9 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 6bc7df98c..158be10f4 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -36,6 +36,7 @@ import 
org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import 
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, 
InsertIntoHadoopFsRelationCommand}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -182,6 +183,30 @@ object VeloxBackendSettings extends BackendSettingsApi {
       bucketSpec: Option[BucketSpec],
       options: Map[String, String]): ValidationResult = {
 
+    // Validate if HiveFileFormat write is supported based on output file type
+    def validateHiveFileFormat(hiveFileFormat: HiveFileFormat): Option[String] 
= {
+      // Reflect to get access to fileSinkConf which contains the output file 
format
+      val fileSinkConfField = format.getClass.getDeclaredField("fileSinkConf")
+      fileSinkConfField.setAccessible(true)
+      val fileSinkConf = fileSinkConfField.get(hiveFileFormat)
+      val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo")
+      tableInfoField.setAccessible(true)
+      val tableInfo = tableInfoField.get(fileSinkConf)
+      val getOutputFileFormatClassNameMethod = tableInfo.getClass
+        .getDeclaredMethod("getOutputFileFormatClassName")
+      val outputFileFormatClassName = 
getOutputFileFormatClassNameMethod.invoke(tableInfo)
+
+      // Match based on the output file format class name
+      outputFileFormatClassName match {
+        case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" 
=>
+          None
+        case _ =>
+          Some(
+            "HiveFileFormat is supported only with Parquet as the output file 
type"
+          ) // Unsupported format
+      }
+    }
+
     def validateCompressionCodec(): Option[String] = {
       // Velox doesn't support brotli and lzo.
       val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
@@ -194,7 +219,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
     }
 
     // Validate if all types are supported.
-    def validateDateTypes(): Option[String] = {
+    def validateDataTypes(): Option[String] = {
       val unsupportedTypes = fields.flatMap {
         field =>
           field.dataType match {
@@ -222,8 +247,13 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
     def validateFileFormat(): Option[String] = {
       format match {
-        case _: ParquetFileFormat => None
-        case _: FileFormat => Some("Only parquet fileformat is supported in 
Velox backend.")
+        case _: ParquetFileFormat => None // Parquet is directly supported
+        case h: HiveFileFormat if 
GlutenConfig.getConf.enableHiveFileFormatWriter =>
+          validateHiveFileFormat(h) // Parquet via Hive SerDe
+        case _ =>
+          Some(
+            "Only ParquetFileFormat and HiveFileFormat are supported."
+          ) // Unsupported format
       }
     }
 
@@ -250,7 +280,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
     validateCompressionCodec()
       .orElse(validateFileFormat())
       .orElse(validateFieldMetadata())
-      .orElse(validateDateTypes())
+      .orElse(validateDataTypes())
       .orElse(validateWriteFilesOptions())
       .orElse(validateBucketSpec()) match {
       case Some(reason) => ValidationResult.notOk(reason)
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 9597e3110..731f5ef48 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -139,11 +139,7 @@ class VeloxParquetWriteForHiveSuite extends 
GlutenQueryTest with SQLTestUtils {
     withTable("t") {
       spark.sql("CREATE TABLE t (c int) STORED AS PARQUET")
       withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
-        if (isSparkVersionGE("3.4")) {
-          checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", 
checkNative = false)
-        } else {
-          checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", 
checkNative = true)
-        }
+        checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative 
= true)
       }
       checkAnswer(spark.table("t"), Row(1))
     }
diff --git a/docs/velox-backend-limitations.md 
b/docs/velox-backend-limitations.md
index 75b52f38e..002bbb3c3 100644
--- a/docs/velox-backend-limitations.md
+++ b/docs/velox-backend-limitations.md
@@ -118,6 +118,10 @@ spark.range(100).toDF("id")
   .saveAsTable("velox_ctas")
 ```
 
+#### HiveFileFormat write
+
+Gluten supports writes of HiveFileFormat when the output file type is of type 
`parquet` only
+
 #### NaN support
 Velox does NOT support NaN. So unexpected result can be obtained for a few 
cases, e.g., comparing a number with NaN.
 
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 13ad8e471..a4e5a4425 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -438,6 +438,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def dynamicOffHeapSizingEnabled: Boolean =
     conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
+
+  def enableHiveFileFormatWriter: Boolean = 
conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
 }
 
 object GlutenConfig {
@@ -1578,6 +1580,16 @@ object GlutenConfig {
       .booleanConf
       .createOptional
 
+  val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED =
+    buildConf("spark.gluten.sql.native.hive.writer.enabled")
+      .internal()
+      .doc(
+        "This is config to specify whether to enable the native columnar 
writer for " +
+          "HiveFileFormat. Currently only supports HiveFileFormat with Parquet 
as the output " +
+          "file type.")
+      .booleanConf
+      .createWithDefault(true)
+
   val NATIVE_ARROW_READER_ENABLED =
     buildConf("spark.gluten.sql.native.arrow.reader.enabled")
       .internal()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to