This is an automated email from the ASF dual-hosted git repository.
jackylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f12dbef7c [VL] Avoid using WriteFilesSpec, which is not serializable (#6144)
f12dbef7c is described below
commit f12dbef7c41cc57116bc579ebaff1091e6e78c4d
Author: Jacky Lee <[email protected]>
AuthorDate: Thu Jun 20 11:04:44 2024 +0800
[VL] Avoid using WriteFilesSpec, which is not serializable (#6144)
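
VeloxColumnarWriteFilesRDD keeps its constructor arguments as fields, and an RDD object is serialized and shipped with every task, so holding the non-serializable WriteFilesSpec fails task serialization. The change passes only the serializable pieces, WriteJobDescription and FileCommitProtocol, instead. A minimal, self-contained sketch of the failure mode (Spec and TaskSerializationDemo are hypothetical stand-ins, not Gluten code):

import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for WriteFilesSpec: a class that is not Serializable.
class Spec(val path: String)

object TaskSerializationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    val sc = spark.sparkContext
    val spec = new Spec("/tmp/out")

    // This would fail with org.apache.spark.SparkException: Task not serializable
    // (caused by java.io.NotSerializableException: Spec), because the map
    // closure captures `spec` itself:
    // sc.parallelize(1 to 4).map(i => s"${spec.path}/$i").collect()

    // Extracting the serializable field first works, which mirrors the
    // commit's switch from writeFilesSpec to description + committer:
    val path = spec.path
    println(sc.parallelize(1 to 4).map(i => s"$path/$i").collect().mkString(", "))
    spark.stop()
  }
}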
---
.../execution/VeloxColumnarWriteFilesExec.scala | 29 ++++++++++------------
1 file changed, 13 insertions(+), 16 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index 1d3d55afb..c87b8d4f6 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -24,8 +24,8 @@ import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.spark.{Partition, SparkException, TaskContext, TaskOutputFileAlreadyExistException}
+import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.internal.io.SparkHadoopWriterUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.sql.catalyst.InternalRow
@@ -90,7 +90,8 @@ case class VeloxWriteFilesMetrics(
  */
 class VeloxColumnarWriteFilesRDD(
     var prev: RDD[ColumnarBatch],
-    writeFilesSpec: WriteFilesSpec,
+    description: WriteJobDescription,
+    committer: FileCommitProtocol,
     jobTrackerID: String)
   extends RDD[WriterCommitMessage](prev) {
@@ -118,7 +119,7 @@ class VeloxColumnarWriteFilesRDD(
         val fileWriteInfo = fileWriteInfos.head
         numBytes += fileWriteInfo.fileSize
         val targetFileName = fileWriteInfo.targetFileName
-        val outputPath = writeFilesSpec.description.path
+        val outputPath = description.path

         // part1=1/part2=1
         val partitionFragment = metrics.name
@@ -126,7 +127,7 @@ class VeloxColumnarWriteFilesRDD(
         if (partitionFragment != "") {
           updatedPartitions += partitionFragment
           val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName
-          val customOutputPath = writeFilesSpec.description.customPartitionLocations.get(
+          val customOutputPath = description.customPartitionLocations.get(
             PartitioningUtils.parsePathFragment(partitionFragment))
           if (customOutputPath.isDefined) {
             addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
@@ -174,8 +175,6 @@ class VeloxColumnarWriteFilesRDD(

   private def writeFilesForEmptyIterator(
       commitProtocol: SparkWriteFilesCommitProtocol): WriteTaskResult = {
-    val description = writeFilesSpec.description
-    val committer = writeFilesSpec.committer
     val taskAttemptContext = commitProtocol.taskAttemptContext

     val dataWriter =
@@ -194,10 +193,7 @@ class VeloxColumnarWriteFilesRDD(
   }

   override def compute(split: Partition, context: TaskContext): Iterator[WriterCommitMessage] = {
-    val commitProtocol = new SparkWriteFilesCommitProtocol(
-      jobTrackerID,
-      writeFilesSpec.description,
-      writeFilesSpec.committer)
+    val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID, description, committer)

     commitProtocol.setupTask()
     val writePath = commitProtocol.newTaskAttemptTempPath()
@@ -238,7 +234,7 @@ class VeloxColumnarWriteFilesRDD(
         case t: Throwable =>
           throw new SparkException(
             s"Task failed while writing rows to staging path: $writePath, " +
-              s"output path: ${writeFilesSpec.description.path}",
+              s"output path: ${description.path}",
             t)
       }
@@ -285,10 +281,9 @@ case class VeloxColumnarWriteFilesExec private (

   /** Fallback to use vanilla Spark write files to generate an empty file for metadata only. */
   private def writeFilesForEmptyRDD(
-      writeFilesSpec: WriteFilesSpec,
+      description: WriteJobDescription,
+      committer: FileCommitProtocol,
       jobTrackerID: String): RDD[WriterCommitMessage] = {
-    val description = writeFilesSpec.description
-    val committer = writeFilesSpec.committer
     val rddWithNonEmptyPartitions = session.sparkContext.parallelize(Seq.empty[InternalRow], 1)
     rddWithNonEmptyPartitions.mapPartitionsInternal {
       iterator =>
@@ -314,12 +309,14 @@ case class VeloxColumnarWriteFilesExec private (
     val rdd = child.executeColumnar()
     val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
+    val description = writeFilesSpec.description
+    val committer = writeFilesSpec.committer
     if (rdd.partitions.length == 0) {
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
       // partition rdd to make sure we at least set up one write task to write the metadata.
-      writeFilesForEmptyRDD(writeFilesSpec, jobTrackerID)
+      writeFilesForEmptyRDD(description, committer, jobTrackerID)
     } else {
-      new VeloxColumnarWriteFilesRDD(rdd, writeFilesSpec, jobTrackerID)
+      new VeloxColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
     }
   }

   override protected def withNewChildrenInternal(
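
For reviewers who want to sanity-check that the new constructor arguments survive task serialization, a plain-JDK probe can be used (SerializableCheck is an illustrative helper, not part of this patch). WriteJobDescription and the concrete FileCommitProtocol implementations are expected to pass it, while WriteFilesSpec does not, which is what this commit works around:

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Illustrative helper, not part of this patch: returns true if `obj`
// survives Java serialization, which Spark's default task serializer
// requires of every field captured by an RDD or closure.
object SerializableCheck {
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      oos.writeObject(obj)
      oos.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}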
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]