This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new eacb5f4ade [GLUTEN-11088][VL] Fix Spark 4.0 exception wrap (#11200)
eacb5f4ade is described below
commit eacb5f4ade6d04cdb9c9334764d6c64eac93c070
Author: Jin Chengcheng <[email protected]>
AuthorDate: Thu Nov 27 08:31:45 2025 +0000
[GLUTEN-11088][VL] Fix Spark 4.0 exception wrap (#11200)
After PR apache/spark#45797, the exception will no longer be wrapped. This patch
fixes the behavior in the Spark shim layers.
---
.../spark/sql/execution/VeloxColumnarWriteFilesExec.scala | 11 ++++-------
.../org/apache/gluten/utils/velox/VeloxTestSettings.scala | 4 ----
.../main/scala/org/apache/gluten/sql/shims/SparkShims.scala | 9 ++++++++-
.../org/apache/gluten/sql/shims/spark40/Spark40Shims.scala | 7 +++++++
4 files changed, 19 insertions(+), 12 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 8c8dee902c..cc91e5b68a 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -18,11 +18,11 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.execution.ValidationResult
-import org.apache.gluten.execution.WriteFilesExecTransformer
+import org.apache.gluten.execution.{ValidationResult,
WriteFilesExecTransformer}
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.spark.{Partition, SparkException, TaskContext,
TaskOutputFileAlreadyExistException}
+import org.apache.spark.{Partition, TaskContext,
TaskOutputFileAlreadyExistException}
import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec,
SparkHadoopWriterUtils}
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.rdd.RDD
@@ -237,10 +237,7 @@ class VeloxColumnarWriteFilesRDD(
case f: FileAlreadyExistsException if
SQLConf.get.fastFailFileFormatOutput =>
throw new TaskOutputFileAlreadyExistException(f)
case t: Throwable =>
- throw new SparkException(
- s"Task failed while writing rows to staging path: $writePath, " +
- s"output path: ${description.path}",
- t)
+ SparkShimLoader.getSparkShims.throwExceptionInWrite(t, writePath,
description.path)
}
assert(writeTaskResult != null)
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 536b462521..39786d859a 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -694,10 +694,6 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-39557 INSERT INTO statements with tables with array
defaults")
.exclude("SPARK-39557 INSERT INTO statements with tables with struct
defaults")
.exclude("SPARK-39557 INSERT INTO statements with tables with map
defaults")
- // TODO: fix in Spark-4.0
- .exclude("Throw exceptions on inserting out-of-range decimal value with
ANSI casting policy")
- .exclude("Throw exceptions on inserting out-of-range long value with ANSI
casting policy")
- .exclude("Throw exceptions on inserting out-of-range int value with ANSI
casting policy")
enableSuite[GlutenPartitionedWriteSuite]
enableSuite[GlutenPathOptionSuite]
enableSuite[GlutenPrunedScanSuite]
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 2ffb1fff60..85916b316c 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.sql.shims
import org.apache.gluten.GlutenBuildInfo.SPARK_COMPILE_VERSION
import org.apache.gluten.expression.Sig
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.scheduler.TaskInfo
@@ -334,4 +334,11 @@ trait SparkShims {
/** Shim method for get the "errorMessage" value for Spark 4.0 and above */
def getErrorMessage(raiseError: RaiseError): Option[Expression]
+
+ def throwExceptionInWrite(t: Throwable, writePath: String, descriptionPath:
String): Unit = {
+ throw new SparkException(
+ s"Task failed while writing rows to staging path: $writePath, " +
+ s"output path: $descriptionPath",
+ t)
+ }
}
diff --git
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 87fce3de0d..85b057ebaf 100644
---
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -738,4 +738,11 @@ class Spark40Shims extends SparkShims {
None
}
}
+
+ override def throwExceptionInWrite(
+ t: Throwable,
+ writePath: String,
+ descriptionPath: String): Unit = {
+ throw t
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]