This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new eacb5f4ade [GLUTEN-11088][VL] Fix Spark 4.0 exception wrap (#11200)
eacb5f4ade is described below

commit eacb5f4ade6d04cdb9c9334764d6c64eac93c070
Author: Jin Chengcheng <[email protected]>
AuthorDate: Thu Nov 27 08:31:45 2025 +0000

    [GLUTEN-11088][VL] Fix Spark 4.0 exception wrap (#11200)
    
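    Since apache/spark#45797, Spark no longer wraps exceptions thrown while
    writing. This patch moves the SparkException wrapping into the Spark shim
    layers, so that the Spark 4.0 shim rethrows the original exception unchanged.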
---
 .../spark/sql/execution/VeloxColumnarWriteFilesExec.scala     | 11 ++++-------
 .../org/apache/gluten/utils/velox/VeloxTestSettings.scala     |  4 ----
 .../main/scala/org/apache/gluten/sql/shims/SparkShims.scala   |  9 ++++++++-
 .../org/apache/gluten/sql/shims/spark40/Spark40Shims.scala    |  7 +++++++
 4 files changed, 19 insertions(+), 12 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 8c8dee902c..cc91e5b68a 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -18,11 +18,11 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.execution.ValidationResult
-import org.apache.gluten.execution.WriteFilesExecTransformer
+import org.apache.gluten.execution.{ValidationResult, 
WriteFilesExecTransformer}
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.sql.shims.SparkShimLoader
 
-import org.apache.spark.{Partition, SparkException, TaskContext, 
TaskOutputFileAlreadyExistException}
+import org.apache.spark.{Partition, TaskContext, 
TaskOutputFileAlreadyExistException}
 import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, 
SparkHadoopWriterUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.rdd.RDD
@@ -237,10 +237,7 @@ class VeloxColumnarWriteFilesRDD(
       case f: FileAlreadyExistsException if 
SQLConf.get.fastFailFileFormatOutput =>
         throw new TaskOutputFileAlreadyExistException(f)
       case t: Throwable =>
-        throw new SparkException(
-          s"Task failed while writing rows to staging path: $writePath, " +
-            s"output path: ${description.path}",
-          t)
+        SparkShimLoader.getSparkShims.throwExceptionInWrite(t, writePath, 
description.path)
     }
 
     assert(writeTaskResult != null)
diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 536b462521..39786d859a 100644
--- 
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -694,10 +694,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-39557 INSERT INTO statements with tables with array 
defaults")
     .exclude("SPARK-39557 INSERT INTO statements with tables with struct 
defaults")
     .exclude("SPARK-39557 INSERT INTO statements with tables with map 
defaults")
-    // TODO: fix in Spark-4.0
-    .exclude("Throw exceptions on inserting out-of-range decimal value with 
ANSI casting policy")
-    .exclude("Throw exceptions on inserting out-of-range long value with ANSI 
casting policy")
-    .exclude("Throw exceptions on inserting out-of-range int value with ANSI 
casting policy")
   enableSuite[GlutenPartitionedWriteSuite]
   enableSuite[GlutenPathOptionSuite]
   enableSuite[GlutenPrunedScanSuite]
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 2ffb1fff60..85916b316c 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.sql.shims
 import org.apache.gluten.GlutenBuildInfo.SPARK_COMPILE_VERSION
 import org.apache.gluten.expression.Sig
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.scheduler.TaskInfo
@@ -334,4 +334,11 @@ trait SparkShims {
 
   /** Shim method for get the "errorMessage" value for Spark 4.0 and above */
   def getErrorMessage(raiseError: RaiseError): Option[Expression]
+
+  def throwExceptionInWrite(t: Throwable, writePath: String, descriptionPath: 
String): Unit = {
+    throw new SparkException(
+      s"Task failed while writing rows to staging path: $writePath, " +
+        s"output path: $descriptionPath",
+      t)
+  }
 }
diff --git 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 87fce3de0d..85b057ebaf 100644
--- 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -738,4 +738,11 @@ class Spark40Shims extends SparkShims {
         None
     }
   }
+
+  override def throwExceptionInWrite(
+      t: Throwable,
+      writePath: String,
+      descriptionPath: String): Unit = {
+    throw t
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to