This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 445616109 [GLUTEN-4964][CORE] Fallback complex data type in parquet write for Spark32 & Spark33 (#5107)
445616109 is described below
commit 44561610991f5b4c258a4c65e97e01cb13c2aa14
Author: JiaKe <[email protected]>
AuthorDate: Thu Mar 28 08:54:47 2024 +0800
[GLUTEN-4964][CORE] Fallback complex data type in parquet write for Spark32 & Spark33 (#5107)
---
.../io/glutenproject/backendsapi/velox/VeloxBackend.scala | 11 +++++++++++
.../spark/sql/execution/VeloxParquetWriteSuite.scala | 14 ++++++++++++++
.../io/glutenproject/backendsapi/BackendSettingsApi.scala | 1 +
.../execution/datasources/GlutenWriterColumnarRules.scala | 3 ++-
4 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
index 3293abe3e..9d252149d 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
@@ -255,6 +255,17 @@ object BackendSettings extends BackendSettingsApi {
}
}
+ override def supportNativeWrite(fields: Array[StructField]): Boolean = {
+ fields.map {
+ field =>
+ field.dataType match {
+ case _: TimestampType | _: StructType | _: ArrayType | _: MapType =>
+ return false
+ case _ =>
+ }
+ }
+ true
+ }
+
override def supportNativeMetadataColumns(): Boolean = true
override def supportExpandExec(): Boolean = true
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index dc30f0559..6f938b7b9 100644
--- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -37,6 +37,20 @@ class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
super.sparkConf.set("spark.gluten.sql.native.writer.enabled", "true")
}
+ test("test Array(Struct) fallback") {
+ withTempPath {
+ f =>
+ val path = f.getCanonicalPath
+ val testAppender = new LogAppender("native write tracker")
+ withLogAppender(testAppender) {
+ spark.sql("select array(struct(1), null) as var1").write.mode("overwrite").save(path)
+ }
+ assert(
+ testAppender.loggingEvents.exists(
+ _.getMessage.toString.contains("Use Gluten parquet write for hive")) == false)
+ }
+ }
+
test("test write parquet with compression codec") {
// compression codec details see `VeloxParquetDatasource.cc`
Seq("snappy", "gzip", "zstd", "lz4", "none", "uncompressed")
diff --git a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
index 25d71f0fc..950eed2eb 100644
--- a/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/backendsapi/BackendSettingsApi.scala
@@ -41,6 +41,7 @@ trait BackendSettingsApi {
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
options: Map[String, String]): ValidationResult = ValidationResult.ok
+ def supportNativeWrite(fields: Array[StructField]): Boolean = true
def supportNativeMetadataColumns(): Boolean = false
def supportExpandExec(): Boolean = false
def supportSortExec(): Boolean = false
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index d2c010cc4..80ef67ad6 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -156,7 +156,8 @@ object GlutenWriterColumnarRules {
if write.getClass.getName == NOOP_WRITE &&
BackendsApiManager.getSettings.enableNativeWriteFiles() =>
injectFakeRowAdaptor(rc, rc.child)
- case rc @ DataWritingCommandExec(cmd, child) =>
+ case rc @ DataWritingCommandExec(cmd, child)
+ if BackendsApiManager.getSettings.supportNativeWrite(child.output.toStructType.fields) =>
val format = getNativeFormat(cmd)
session.sparkContext.setLocalProperty(
"staticPartitionWriteOnly",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]