This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c13833e3a9 [VL] Validate Iceberg write support before executing offload (#10900)
c13833e3a9 is described below

commit c13833e3a9611a9af733069a4f299a0798b1ed8d
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Oct 23 11:05:24 2025 +0800

    [VL] Validate Iceberg write support before executing offload (#10900)
---
 .../org/apache/gluten/extension/OffloadIcebergWrite.scala      | 10 ++++++----
 .../main/scala/org/apache/spark/util/SparkReflectionUtil.scala |  9 +++++++++
 .../scala/org/apache/gluten/execution/IcebergWriteExec.scala   |  2 +-
 .../org/apache/iceberg/spark/source/IcebergWriteUtil.scala     |  5 +++--
 4 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
index 67b701f771..1b00c1a788 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
@@ -27,9 +27,11 @@ import org.apache.gluten.extension.injector.Injector
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec}
 
+import org.apache.iceberg.spark.source.IcebergWriteUtil.supportsWrite
+
 case class OffloadIcebergAppend() extends OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case a: AppendDataExec =>
+    case a: AppendDataExec if supportsWrite(a.write) =>
       VeloxIcebergAppendDataExec(a)
     case other => other
   }
@@ -37,7 +39,7 @@ case class OffloadIcebergAppend() extends OffloadSingleNode {
 
 case class OffloadIcebergReplaceData() extends OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case r: ReplaceDataExec =>
+    case r: ReplaceDataExec if supportsWrite(r.write) =>
       VeloxIcebergReplaceDataExec(r)
     case other => other
   }
@@ -45,7 +47,7 @@ case class OffloadIcebergReplaceData() extends OffloadSingleNode {
 
 case class OffloadIcebergOverwrite() extends OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case r: OverwriteByExpressionExec =>
+    case r: OverwriteByExpressionExec if supportsWrite(r.write) =>
       VeloxIcebergOverwriteByExpressionExec(r)
     case other => other
   }
@@ -53,7 +55,7 @@ case class OffloadIcebergOverwrite() extends OffloadSingleNode {
 
 case class OffloadIcebergOverwritePartitionsDynamic() extends OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case r: OverwritePartitionsDynamicExec =>
+    case r: OverwritePartitionsDynamicExec if supportsWrite(r.write) =>
       VeloxIcebergOverwritePartitionsDynamicExec(r)
     case other => other
   }
diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkReflectionUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkReflectionUtil.scala
index ed243abb7f..60a15c8be9 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkReflectionUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkReflectionUtil.scala
@@ -27,4 +27,13 @@ object SparkReflectionUtil {
       noSparkClassLoader: Boolean = false): Class[C] = {
     Utils.classForName(className, initialize, noSparkClassLoader)
   }
+
+  /** Whether `obj` is an instance of class `className`; false if that class cannot be loaded. */
+  def isInstanceOfClassName(obj: Any, className: String): Boolean =
+    try {
+      // initialize = false: a type check does not need the class's static initializers to run.
+      classForName(className, initialize = false).isInstance(obj)
+    } catch {
+      case _: ClassNotFoundException | _: NoClassDefFoundError => false
+    }
 }
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
index f06eda3c14..0d283b7556 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
@@ -60,7 +60,7 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec {
   }
 
   override def doValidateInternal(): ValidationResult = {
-    if (!IcebergWriteUtil.isDataWrite(write)) {
+    if (!IcebergWriteUtil.supportsWrite(write)) {
       return ValidationResult.failed(s"Not support the write ${write.getClass.getSimpleName}")
     }
     if (IcebergWriteUtil.hasUnsupportedDataType(write)) {
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
index 12507af996..1d15259732 100644
--- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
@@ -17,6 +17,7 @@
 package org.apache.iceberg.spark.source
 
 import org.apache.spark.sql.connector.write.{Write, WriterCommitMessage}
+import org.apache.spark.util.SparkReflectionUtil
 
 import org.apache.iceberg._
 import org.apache.iceberg.spark.SparkWriteConf
@@ -27,8 +28,8 @@ import org.apache.iceberg.types.Types.{ListType, MapType}
 
 object IcebergWriteUtil {
 
-  def isDataWrite(write: Write): Boolean = {
-    write.isInstanceOf[SparkWrite]
+  def supportsWrite(write: Write): Boolean = {
+    SparkReflectionUtil.isInstanceOfClassName(write, "org.apache.iceberg.spark.source.SparkWrite")
   }
 
   def hasUnsupportedDataType(write: Write): Boolean = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to