This is an automated email from the ASF dual-hosted git repository.

jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f12dbef7c [VL] Avoid use WriteFilesSpec which is not serialzable 
(#6144)
f12dbef7c is described below

commit f12dbef7c41cc57116bc579ebaff1091e6e78c4d
Author: Jacky Lee <[email protected]>
AuthorDate: Thu Jun 20 11:04:44 2024 +0800

    [VL] Avoid use WriteFilesSpec which is not serialzable (#6144)
---
 .../execution/VeloxColumnarWriteFilesExec.scala    | 29 ++++++++++------------
 1 file changed, 13 insertions(+), 16 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 1d3d55afb..c87b8d4f6 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -24,8 +24,8 @@ import 
org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.{Partition, SparkException, TaskContext, 
TaskOutputFileAlreadyExistException}
+import org.apache.spark.internal.io.{FileCommitProtocol, 
SparkHadoopWriterUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.sql.catalyst.InternalRow
@@ -90,7 +90,8 @@ case class VeloxWriteFilesMetrics(
  */
 class VeloxColumnarWriteFilesRDD(
     var prev: RDD[ColumnarBatch],
-    writeFilesSpec: WriteFilesSpec,
+    description: WriteJobDescription,
+    committer: FileCommitProtocol,
     jobTrackerID: String)
   extends RDD[WriterCommitMessage](prev) {
 
@@ -118,7 +119,7 @@ class VeloxColumnarWriteFilesRDD(
       val fileWriteInfo = fileWriteInfos.head
       numBytes += fileWriteInfo.fileSize
       val targetFileName = fileWriteInfo.targetFileName
-      val outputPath = writeFilesSpec.description.path
+      val outputPath = description.path
 
       // part1=1/part2=1
       val partitionFragment = metrics.name
@@ -126,7 +127,7 @@ class VeloxColumnarWriteFilesRDD(
       if (partitionFragment != "") {
         updatedPartitions += partitionFragment
         val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + 
targetFileName
-        val customOutputPath = 
writeFilesSpec.description.customPartitionLocations.get(
+        val customOutputPath = description.customPartitionLocations.get(
           PartitioningUtils.parsePathFragment(partitionFragment))
         if (customOutputPath.isDefined) {
           addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + 
targetFileName
@@ -174,8 +175,6 @@ class VeloxColumnarWriteFilesRDD(
 
   private def writeFilesForEmptyIterator(
       commitProtocol: SparkWriteFilesCommitProtocol): WriteTaskResult = {
-    val description = writeFilesSpec.description
-    val committer = writeFilesSpec.committer
     val taskAttemptContext = commitProtocol.taskAttemptContext
 
     val dataWriter =
@@ -194,10 +193,7 @@ class VeloxColumnarWriteFilesRDD(
   }
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[WriterCommitMessage] = {
-    val commitProtocol = new SparkWriteFilesCommitProtocol(
-      jobTrackerID,
-      writeFilesSpec.description,
-      writeFilesSpec.committer)
+    val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID, 
description, committer)
 
     commitProtocol.setupTask()
     val writePath = commitProtocol.newTaskAttemptTempPath()
@@ -238,7 +234,7 @@ class VeloxColumnarWriteFilesRDD(
       case t: Throwable =>
         throw new SparkException(
           s"Task failed while writing rows to staging path: $writePath, " +
-            s"output path: ${writeFilesSpec.description.path}",
+            s"output path: ${description.path}",
           t)
     }
 
@@ -285,10 +281,9 @@ case class VeloxColumnarWriteFilesExec private (
 
   /** Fallback to use vanilla Spark write files to generate an empty file for 
metadata only. */
   private def writeFilesForEmptyRDD(
-      writeFilesSpec: WriteFilesSpec,
+      description: WriteJobDescription,
+      committer: FileCommitProtocol,
       jobTrackerID: String): RDD[WriterCommitMessage] = {
-    val description = writeFilesSpec.description
-    val committer = writeFilesSpec.committer
     val rddWithNonEmptyPartitions = 
session.sparkContext.parallelize(Seq.empty[InternalRow], 1)
     rddWithNonEmptyPartitions.mapPartitionsInternal {
       iterator =>
@@ -314,12 +309,14 @@ case class VeloxColumnarWriteFilesExec private (
 
     val rdd = child.executeColumnar()
     val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
+    val description = writeFilesSpec.description
+    val committer = writeFilesSpec.committer
     if (rdd.partitions.length == 0) {
       // SPARK-23271 If we are attempting to write a zero partition rdd, 
create a dummy single
       // partition rdd to make sure we at least set up one write task to write 
the metadata.
-      writeFilesForEmptyRDD(writeFilesSpec, jobTrackerID)
+      writeFilesForEmptyRDD(description, committer, jobTrackerID)
     } else {
-      new VeloxColumnarWriteFilesRDD(rdd, writeFilesSpec, jobTrackerID)
+      new VeloxColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
     }
   }
   override protected def withNewChildrenInternal(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to