This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 284b304a9 [GLUTEN-6026][VL] Add Support for HiveFileFormat parquet write for Spark 3.4+ (#6062)
284b304a9 is described below
commit 284b304a9aec28c506fbb69a3c8393125ff0bac2
Author: Suraj Naik <[email protected]>
AuthorDate: Fri Jun 14 14:19:19 2024 +0530
[GLUTEN-6026][VL] Add Support for HiveFileFormat parquet write for Spark 3.4+ (#6062)
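For readers skimming the diff: the practical effect is that, on Spark 3.4+, an INSERT into a Hive table stored as Parquet can now go through Velox's native writer instead of falling back to vanilla Spark. A minimal sketch of the user-facing behavior (the table name and session settings are illustrative, mirroring the test suite below):

```scala
// Keep the write on the HiveFileFormat path, as the updated test suite does.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.sql("CREATE TABLE t (c INT) STORED AS PARQUET")
// With this commit, the insert below is offloaded to Velox's native Parquet writer.
spark.sql("INSERT OVERWRITE TABLE t SELECT 1 AS c")
```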
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 38 +++++++++++++++++++---
.../execution/VeloxParquetWriteForHiveSuite.scala | 6 +---
docs/velox-backend-limitations.md | 4 +++
.../scala/org/apache/gluten/GlutenConfig.scala | 12 +++++++
4 files changed, 51 insertions(+), 9 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 6bc7df98c..158be10f4 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -182,6 +183,30 @@ object VeloxBackendSettings extends BackendSettingsApi {
       bucketSpec: Option[BucketSpec],
       options: Map[String, String]): ValidationResult = {
+    // Validate if HiveFileFormat write is supported based on output file type
+    def validateHiveFileFormat(hiveFileFormat: HiveFileFormat): Option[String] = {
+      // Reflect to get access to fileSinkConf which contains the output file format
+      val fileSinkConfField = hiveFileFormat.getClass.getDeclaredField("fileSinkConf")
+      fileSinkConfField.setAccessible(true)
+      val fileSinkConf = fileSinkConfField.get(hiveFileFormat)
+      val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo")
+      tableInfoField.setAccessible(true)
+      val tableInfo = tableInfoField.get(fileSinkConf)
+      val getOutputFileFormatClassNameMethod = tableInfo.getClass
+        .getDeclaredMethod("getOutputFileFormatClassName")
+      val outputFileFormatClassName = getOutputFileFormatClassNameMethod.invoke(tableInfo)
+
+      // Match based on the output file format class name
+      outputFileFormatClassName match {
+        case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" =>
+          None
+        case _ =>
+          Some(
+            "HiveFileFormat is supported only with Parquet as the output file type"
+          ) // Unsupported format
+      }
+    }
+
     def validateCompressionCodec(): Option[String] = {
       // Velox doesn't support brotli and lzo.
       val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
@@ -194,7 +219,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
     }

     // Validate if all types are supported.
-    def validateDateTypes(): Option[String] = {
+    def validateDataTypes(): Option[String] = {
       val unsupportedTypes = fields.flatMap {
         field =>
           field.dataType match {
@@ -222,8 +247,13 @@ object VeloxBackendSettings extends BackendSettingsApi {
     def validateFileFormat(): Option[String] = {
       format match {
-        case _: ParquetFileFormat => None
-        case _: FileFormat => Some("Only parquet fileformat is supported in Velox backend.")
+        case _: ParquetFileFormat => None // Parquet is directly supported
+        case h: HiveFileFormat if GlutenConfig.getConf.enableHiveFileFormatWriter =>
+          validateHiveFileFormat(h) // Parquet via Hive SerDe
+        case _ =>
+          Some(
+            "Only ParquetFileFormat and HiveFileFormat are supported."
+          ) // Unsupported format
       }
     }
@@ -250,7 +280,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
     validateCompressionCodec()
       .orElse(validateFileFormat())
       .orElse(validateFieldMetadata())
-      .orElse(validateDateTypes())
+      .orElse(validateDataTypes())
       .orElse(validateWriteFilesOptions())
       .orElse(validateBucketSpec()) match {
       case Some(reason) => ValidationResult.notOk(reason)
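Since HiveFileFormat keeps its file sink configuration private, the validation above has to reflect into it. Below is a standalone sketch of that reflection pattern using stand-in classes rather than the real Hive types, so only the mechanics match the diff:

```scala
// Stand-ins for HiveFileFormat's private fileSinkConf -> tableInfo chain.
class TableDesc {
  def getOutputFileFormatClassName: String =
    "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
}
class FileSinkDesc { private val tableInfo: TableDesc = new TableDesc }
class FakeHiveFileFormat { private val fileSinkConf: FileSinkDesc = new FileSinkDesc }

object ReflectionSketch extends App {
  val format = new FakeHiveFileFormat
  // Private fields are reached via getDeclaredField + setAccessible, as in the diff.
  val fileSinkConfField = format.getClass.getDeclaredField("fileSinkConf")
  fileSinkConfField.setAccessible(true)
  val fileSinkConf = fileSinkConfField.get(format)
  val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo")
  tableInfoField.setAccessible(true)
  val tableInfo = tableInfoField.get(fileSinkConf)
  val method = tableInfo.getClass.getDeclaredMethod("getOutputFileFormatClassName")
  println(method.invoke(tableInfo)) // prints the Parquet output format class name
}
```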
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 9597e3110..731f5ef48 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -139,11 +139,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
     withTable("t") {
       spark.sql("CREATE TABLE t (c int) STORED AS PARQUET")
       withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "false") {
-        if (isSparkVersionGE("3.4")) {
-          checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = false)
-        } else {
-          checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = true)
-        }
+        checkNativeWrite("INSERT OVERWRITE TABLE t SELECT 1 as c", checkNative = true)
       }
       checkAnswer(spark.table("t"), Row(1))
     }
diff --git a/docs/velox-backend-limitations.md b/docs/velox-backend-limitations.md
index 75b52f38e..002bbb3c3 100644
--- a/docs/velox-backend-limitations.md
+++ b/docs/velox-backend-limitations.md
@@ -118,6 +118,10 @@ spark.range(100).toDF("id")
   .saveAsTable("velox_ctas")
 ```
+#### HiveFileFormat write
+
+Gluten supports HiveFileFormat writes only when the output file type is `parquet`.
+
 #### NaN support
 Velox does NOT support NaN. So unexpected result can be obtained for a few cases, e.g., comparing a number with NaN.
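To make the new limitation concrete, here is an illustration (the table names are invented) of a write the validator accepts versus one it sends back to vanilla Spark:

```scala
// Parquet output format: passes validateHiveFileFormat, runs natively.
spark.sql("CREATE TABLE hive_parquet_t (c INT) STORED AS PARQUET")
spark.sql("INSERT OVERWRITE TABLE hive_parquet_t SELECT 1 AS c")

// Any other output format, e.g. ORC: validation fails, falls back to vanilla Spark.
spark.sql("CREATE TABLE hive_orc_t (c INT) STORED AS ORC")
spark.sql("INSERT OVERWRITE TABLE hive_orc_t SELECT 1 AS c")
```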
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 13ad8e471..a4e5a4425 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -438,6 +438,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def dynamicOffHeapSizingEnabled: Boolean =
     conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
+
+  def enableHiveFileFormatWriter: Boolean =
+    conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)
 }

 object GlutenConfig {
@@ -1578,6 +1580,16 @@ object GlutenConfig {
       .booleanConf
       .createOptional

+  val NATIVE_HIVEFILEFORMAT_WRITER_ENABLED =
+    buildConf("spark.gluten.sql.native.hive.writer.enabled")
+      .internal()
+      .doc(
+        "Whether to enable the native columnar writer for HiveFileFormat. " +
+          "Currently only HiveFileFormat with Parquet as the output file type " +
+          "is supported.")
+      .booleanConf
+      .createWithDefault(true)
+
   val NATIVE_ARROW_READER_ENABLED =
     buildConf("spark.gluten.sql.native.arrow.reader.enabled")
       .internal()
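The new flag defaults to true; if the native Hive writer misbehaves, it can be turned off per session (the key is taken from the diff above):

```scala
// Disable the native columnar writer for HiveFileFormat; Hive Parquet inserts
// then take vanilla Spark's write path again.
spark.conf.set("spark.gluten.sql.native.hive.writer.enabled", "false")
```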
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]