This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ee153ed6fa [GLUTEN-11088][VL] Fix GlutenParquetIOSuite compatibility 
issues for Spark 4.0 (#11281)
ee153ed6fa is described below

commit ee153ed6fa877bbde6986c5eae8b991da8d7b73d
Author: Chang chen <[email protected]>
AuthorDate: Wed Dec 17 14:55:33 2025 +0800

    [GLUTEN-11088][VL] Fix GlutenParquetIOSuite compatibility issues for Spark 
4.0 (#11281)
    
    * Replace direct exception throwing with 
`GlutenFileFormatWriter.wrapWriteError` (via the `enrichWriteException` shim) for task failure handling.
    
    * Respect 'mapreduce.output.basename' configuration for file name 
generation, according to https://github.com/apache/spark/pull/48494
    
    * Refactor imports and variable initializations for improved clarity and 
consistency
    
    * Remove exclusions
    
    * Assert on the cause message
    
    * Enhance error handling in commit and abort tasks to provide better 
diagnostics
    
    * Fix minor syntax inconsistency
    
    ---------
    
    Co-authored-by: Chang chen <[email protected]>
---
 .../execution/SparkWriteFilesCommitProtocol.scala  | 24 +++++++++++++++-------
 .../gluten/utils/velox/VeloxTestSettings.scala     |  3 ---
 .../org/apache/gluten/sql/shims/SparkShims.scala   |  9 ++++++--
 .../gluten/sql/shims/spark40/Spark40Shims.scala    |  7 +++++--
 .../sql/execution/GlutenFileFormatWriter.scala     |  6 ++++++
 5 files changed, 35 insertions(+), 14 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
index 13a9b987f3..b29f6a10f1 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution
 
+import org.apache.gluten.sql.shims.SparkShimLoader
+
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, 
HadoopMapReduceCommitProtocol}
@@ -44,15 +46,15 @@ class SparkWriteFilesCommitProtocol(
   extends Logging {
   assert(committer.isInstanceOf[HadoopMapReduceCommitProtocol])
 
-  val sparkStageId = TaskContext.get().stageId()
-  val sparkPartitionId = TaskContext.get().partitionId()
-  val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & 
Int.MaxValue
+  val sparkStageId: Int = TaskContext.get().stageId()
+  val sparkPartitionId: Int = TaskContext.get().partitionId()
+  val sparkAttemptNumber: Int = TaskContext.get().taskAttemptId().toInt & 
Int.MaxValue
   private val jobId = createJobID(jobTrackerID, sparkStageId)
 
   private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
   private val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
 
-  private var fileNames: mutable.Set[String] = null
+  private var fileNames: mutable.Set[String] = _
 
   // Set up the attempt context required to use in the output committer.
   val taskAttemptContext: TaskAttemptContext = {
@@ -86,7 +88,9 @@ class SparkWriteFilesCommitProtocol(
     // Note that %05d does not truncate the split number, so if we have more 
than 100000 tasks,
     // the file name is fine and won't overflow.
     val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId
-    val fileName = 
f"${spec.prefix}part-$split%05d-${UUID.randomUUID().toString()}${spec.suffix}"
+    val basename = 
taskAttemptContext.getConfiguration.get("mapreduce.output.basename", "part")
+    val fileName = 
f"${spec.prefix}$basename-$split%05d-${UUID.randomUUID().toString}${spec.suffix}"
+
     fileNames += fileName
     fileName
   }
@@ -103,7 +107,13 @@ class SparkWriteFilesCommitProtocol(
     stagingDir.toString
   }
 
-  def commitTask(): Unit = {
+  private def enrichWriteError[T](path: => String)(f: => T): T = try {
+    f
+  } catch {
+    case t: Throwable => SparkShimLoader.getSparkShims.enrichWriteException(t, 
description.path)
+  }
+
+  def commitTask(): Unit = enrichWriteError(description.path) {
     val (_, taskCommitTime) = Utils.timeTakenMs {
       committer.commitTask(taskAttemptContext)
     }
@@ -114,7 +124,7 @@ class SparkWriteFilesCommitProtocol(
     }
   }
 
-  def abortTask(writePath: String): Unit = {
+  def abortTask(writePath: String): Unit = enrichWriteError(description.path) {
     committer.abortTask(taskAttemptContext)
 
     // Deletes the files written by current task.
diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index a63ca37d3f..bad6ccb015 100644
--- 
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -475,9 +475,6 @@ class VeloxTestSettings extends BackendTestSettings {
     // Velox parquet reader not allow offset zero.
     .exclude("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings")
     // TODO: fix in Spark-4.0
-    .exclude("SPARK-49991: Respect 'mapreduce.output.basename' to generate 
file names")
-    .exclude("SPARK-6330 regression test")
-    .exclude("SPARK-7837 Do not close output writer twice when commitTask() 
fails")
     .exclude("explode nested lists crossing a rowgroup boundary")
   enableSuite[GlutenParquetV1PartitionDiscoverySuite]
   enableSuite[GlutenParquetV2PartitionDiscoverySuite]
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 33f59ff20e..de220cab82 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -97,7 +97,7 @@ trait SparkShims {
 
   def generateFileScanRDD(
       sparkSession: SparkSession,
-      readFunction: (PartitionedFile) => Iterator[InternalRow],
+      readFunction: PartitionedFile => Iterator[InternalRow],
       filePartitions: Seq[FilePartition],
       fileSourceScanExec: FileSourceScanExec): FileScanRDD
 
@@ -145,7 +145,7 @@ trait SparkShims {
           Expression,
           Expression,
           Int,
-          Int) => TypedImperativeAggregate[T]): Expression;
+          Int) => TypedImperativeAggregate[T]): Expression
 
   def replaceMightContain[T](
       expr: Expression,
@@ -343,6 +343,11 @@ trait SparkShims {
       t)
   }
 
+  // Compatibility method for Spark 4.0: rethrows the exception cause to 
maintain API compatibility
+  def enrichWriteException(cause: Throwable, path: String): Nothing = {
+    throw cause
+  }
+
   def getFileSourceScanStream(scan: FileSourceScanExec): 
Option[SparkDataStream] = {
     None
   }
diff --git 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index 88fe373175..80c22f2fad 100644
--- 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -65,8 +65,8 @@ import org.apache.parquet.schema.MessageType
 import java.time.ZoneOffset
 import java.util.{Map => JMap}
 
-import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
 class Spark40Shims extends SparkShims {
@@ -151,7 +151,7 @@ class Spark40Shims extends SparkShims {
       options: CaseInsensitiveStringMap,
       partitionFilters: Seq[Expression],
       dataFilters: Seq[Expression]): TextScan = {
-    new TextScan(
+    TextScan(
       sparkSession,
       fileIndex,
       dataSchema,
@@ -742,6 +742,9 @@ class Spark40Shims extends SparkShims {
     throw t
   }
 
+  override def enrichWriteException(cause: Throwable, path: String): Nothing = 
{
+    GlutenFileFormatWriter.wrapWriteError(cause, path)
+  }
   override def getFileSourceScanStream(scan: FileSourceScanExec): 
Option[SparkDataStream] = {
     scan.stream
   }
diff --git 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/GlutenFileFormatWriter.scala
 
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/GlutenFileFormatWriter.scala
index bd51939f9d..8dc5fbc964 100644
--- 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/GlutenFileFormatWriter.scala
+++ 
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/GlutenFileFormatWriter.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{FileFormatWriter, 
WriteJobDescription, WriteTaskResult}
 
 object GlutenFileFormatWriter {
@@ -40,4 +41,9 @@ object GlutenFileFormatWriter {
       None
     )
   }
+
+  // Wrapper for throwing standardized write error using QueryExecutionErrors
+  def wrapWriteError(cause: Throwable, writePath: String): Nothing = {
+    throw QueryExecutionErrors.taskFailedWhileWritingRowsError(writePath, 
cause)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to