This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f64eb3a4f [GLUTEN-6705][CORE][VL][CH] [Part-2] Rework 
ColumnarWriteFilesExec (#6761)
f64eb3a4f is described below

commit f64eb3a4f4a49720f917a41ec5349e54fb82c31a
Author: Chang chen <[email protected]>
AuthorDate: Fri Aug 9 09:29:33 2024 +0800

    [GLUTEN-6705][CORE][VL][CH] [Part-2] Rework ColumnarWriteFilesExec (#6761)
---
 .../clickhouse/CHSparkPlanExecApi.scala            |  22 +-
 .../sql/execution/CHColumnarWriteFilesExec.scala   | 201 ++++++----------
 .../sql/execution/ClickhouseBackendWrite.scala     |  84 -------
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |  23 +-
 .../spark/sql/execution/VeloxBackendWrite.scala    | 138 -----------
 .../execution/VeloxColumnarWriteFilesExec.scala    | 253 ++++++++++-----------
 .../gluten/backendsapi/SparkPlanExecApi.scala      |  18 +-
 .../extension/columnar/OffloadSingleNode.scala     |   3 +-
 .../sql/execution/ColumnarWriteFilesExec.scala     | 157 +------------
 9 files changed, 238 insertions(+), 661 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index bba5525ed..8254a1e14 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -38,6 +38,8 @@ import org.apache.spark.shuffle.{GenShuffleWriterParameters, 
GlutenShuffleWriter
 import org.apache.spark.shuffle.utils.CHShuffleUtil
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, 
EqualToRewrite}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
CollectList, CollectSet}
 import org.apache.spark.sql.catalyst.optimizer.BuildSide
@@ -49,7 +51,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.delta.files.TahoeFileIndex
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
-import org.apache.spark.sql.execution.datasources.{FileFormat, 
HadoopFsRelation, WriteJobDescription}
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
@@ -682,8 +684,22 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
     CHRegExpReplaceTransformer(substraitExprName, children, expr)
   }
 
-  def createBackendWrite(description: WriteJobDescription): BackendWrite = 
ClickhouseBackendWrite(
-    description)
+  override def createColumnarWriteFilesExec(
+      child: SparkPlan,
+      noop: SparkPlan,
+      fileFormat: FileFormat,
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      options: Map[String, String],
+      staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec =
+    CHColumnarWriteFilesExec(
+      child,
+      noop,
+      fileFormat,
+      partitionColumns,
+      bucketSpec,
+      options,
+      staticPartitions)
 
   override def createColumnarArrowEvalPythonExec(
       udfs: Seq[PythonUDF],
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
similarity index 57%
copy from 
gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
copy to 
backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
index b809a4b65..52019d770 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
@@ -17,21 +17,15 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.extension.GlutenPlan
-import 
org.apache.gluten.extension.columnar.transition.Convention.{KnownRowType, 
RowType}
-import org.apache.gluten.extension.columnar.transition.ConventionReq
-import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
-import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.{Partition, SparkException, TaskContext, 
TaskOutputFileAlreadyExistException}
 import org.apache.spark.internal.io.{FileCommitProtocol, 
SparkHadoopWriterUtils}
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.rdd.RDD
 import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericInternalRow}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
@@ -42,25 +36,73 @@ import org.apache.hadoop.fs.FileAlreadyExistsException
 
 import java.util.Date
 
-/**
- * This trait is used in [[ColumnarWriteFilesRDD]] to inject the staging write 
path before
- * initializing the native plan and collect native write files metrics for 
each backend.
- */
-trait BackendWrite {
-  def collectNativeWriteFilesMetrics(batch: ColumnarBatch): 
Option[WriteTaskResult]
-}
+import scala.collection.mutable
 
 /**
  * This RDD is used to make sure we have injected staging write path before 
initializing the native
  * plan, and support Spark file commit protocol.
  */
-class ColumnarWriteFilesRDD(
+class CHColumnarWriteFilesRDD(
     var prev: RDD[ColumnarBatch],
     description: WriteJobDescription,
     committer: FileCommitProtocol,
     jobTrackerID: String)
   extends RDD[WriterCommitMessage](prev) {
 
+  private def collectNativeResult(cb: ColumnarBatch): Option[WriteTaskResult] 
= {
+    val numFiles = cb.numRows()
+    // Write an empty iterator
+    if (numFiles == 0) {
+      None
+    } else {
+      val file_col = cb.column(0)
+      val partition_col = cb.column(1)
+      val count_col = cb.column(2)
+
+      val outputPath = description.path
+      var updatedPartitions = Set.empty[String]
+      val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, 
String]()
+
+      val write_stats = Range(0, cb.numRows()).map {
+        i =>
+          val targetFileName = file_col.getUTF8String(i).toString
+          val partition = partition_col.getUTF8String(i).toString
+          if (partition != "__NO_PARTITION_ID__") {
+            updatedPartitions += partition
+            val tmpOutputPath = outputPath + "/" + partition + "/" + 
targetFileName
+            val customOutputPath =
+              description.customPartitionLocations.get(
+                PartitioningUtils.parsePathFragment(partition))
+            if (customOutputPath.isDefined) {
+              addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + 
targetFileName
+            }
+          }
+          count_col.getLong(i)
+      }
+
+      val partitionsInternalRows = updatedPartitions.map {
+        part =>
+          val parts = new Array[Any](1)
+          parts(0) = part
+          new GenericInternalRow(parts)
+      }.toSeq
+
+      val numWrittenRows = write_stats.sum
+      val stats = BasicWriteTaskStats(
+        partitions = partitionsInternalRows,
+        numFiles = numFiles,
+        numBytes = 101,
+        numRows = numWrittenRows)
+      val summary =
+        ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = 
Seq(stats))
+
+      Some(
+        WriteTaskResult(
+          new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
+          summary))
+    }
+  }
+
   private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = {
     val stats = 
writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats]
     val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows)
@@ -94,8 +136,6 @@ class ColumnarWriteFilesRDD(
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[WriterCommitMessage] = {
     val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID, 
description, committer)
-    val backendWrite =
-      
BackendsApiManager.getSparkPlanExecApiInstance.createBackendWrite(description)
 
     commitProtocol.setupTask()
     val writePath = commitProtocol.newTaskAttemptTempPath()
@@ -112,7 +152,7 @@ class ColumnarWriteFilesRDD(
         assert(iter.hasNext)
         val resultColumnarBatch = iter.next()
         assert(resultColumnarBatch != null)
-        val nativeWriteTaskResult = 
backendWrite.collectNativeWriteFilesMetrics(resultColumnarBatch)
+        val nativeWriteTaskResult = collectNativeResult(resultColumnarBatch)
         if (nativeWriteTaskResult.isEmpty) {
           // If we are writing an empty iterator, then velox would do nothing.
           // Here we fallback to use vanilla Spark write files to generate an 
empty file for
@@ -155,58 +195,20 @@ class ColumnarWriteFilesRDD(
   }
 }
 
-// The class inherits from "BinaryExecNode" instead of "UnaryExecNode" because
-// we need to expose a dummy child (as right child) with type "WriteFilesExec" 
to let Spark
-// choose the new write code path (version >= 3.4). The actual plan to write 
is the left child
-// of this operator.
-case class ColumnarWriteFilesExec private (
+case class CHColumnarWriteFilesExec(
     override val left: SparkPlan,
     override val right: SparkPlan,
     fileFormat: FileFormat,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
     options: Map[String, String],
-    staticPartitions: TablePartitionSpec)
-  extends BinaryExecNode
-  with GlutenPlan
-  with ColumnarWriteFilesExec.ExecuteWriteCompatible {
-
-  val child: SparkPlan = left
+    staticPartitions: TablePartitionSpec
+) extends ColumnarWriteFilesExec(left, right) {
 
-  override lazy val references: AttributeSet = AttributeSet.empty
-
-  override def supportsColumnar(): Boolean = true
-
-  override def output: Seq[Attribute] = Seq.empty
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw new GlutenException(s"$nodeName does not support doExecute")
-  }
-
-  /** Fallback to use vanilla Spark write files to generate an empty file for 
metadata only. */
-  private def writeFilesForEmptyRDD(
-      description: WriteJobDescription,
-      committer: FileCommitProtocol,
-      jobTrackerID: String): RDD[WriterCommitMessage] = {
-    val rddWithNonEmptyPartitions = 
session.sparkContext.parallelize(Seq.empty[InternalRow], 1)
-    rddWithNonEmptyPartitions.mapPartitionsInternal {
-      iterator =>
-        val sparkStageId = TaskContext.get().stageId()
-        val sparkPartitionId = TaskContext.get().partitionId()
-        val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & 
Int.MaxValue
-
-        val ret = SparkShimLoader.getSparkShims.writeFilesExecuteTask(
-          description,
-          jobTrackerID,
-          sparkStageId,
-          sparkPartitionId,
-          sparkAttemptNumber,
-          committer,
-          iterator
-        )
-        Iterator(ret)
-    }
-  }
+  override protected def withNewChildrenInternal(
+      newLeft: SparkPlan,
+      newRight: SparkPlan): SparkPlan =
+    copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, 
staticPartitions)
 
   override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): 
RDD[WriterCommitMessage] = {
     assert(child.supportsColumnar)
@@ -220,76 +222,7 @@ case class ColumnarWriteFilesExec private (
       // partition rdd to make sure we at least set up one write task to write 
the metadata.
       writeFilesForEmptyRDD(description, committer, jobTrackerID)
     } else {
-      new ColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
-    }
-  }
-  override protected def withNewChildrenInternal(
-      newLeft: SparkPlan,
-      newRight: SparkPlan): SparkPlan =
-    copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, 
staticPartitions)
-}
-
-object ColumnarWriteFilesExec {
-
-  def apply(
-      child: SparkPlan,
-      fileFormat: FileFormat,
-      partitionColumns: Seq[Attribute],
-      bucketSpec: Option[BucketSpec],
-      options: Map[String, String],
-      staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec = {
-    // This is a workaround for FileFormatWriter#write. Vanilla Spark (version 
>= 3.4) requires for
-    // a plan that has at least one node exactly of type `WriteFilesExec` that 
is a Scala
-    // case-class, to decide to choose new `#executeWrite` code path over the 
legacy `#execute`
-    // for write operation.
-    //
-    // So we add a no-op `WriteFilesExec` child to let Spark pick the new code 
path.
-    //
-    // See: FileFormatWriter#write
-    // See: V1Writes#getWriteFilesOpt
-    val right: SparkPlan =
-      WriteFilesExec(
-        NoopLeaf(),
-        fileFormat,
-        partitionColumns,
-        bucketSpec,
-        options,
-        staticPartitions)
-
-    ColumnarWriteFilesExec(
-      child,
-      right,
-      fileFormat,
-      partitionColumns,
-      bucketSpec,
-      options,
-      staticPartitions)
-  }
-
-  private case class NoopLeaf() extends LeafExecNode {
-    override protected def doExecute(): RDD[InternalRow] =
-      throw new GlutenException(s"$nodeName does not support doExecute")
-    override def output: Seq[Attribute] = Seq.empty
-  }
-
-  /**
-   * ColumnarWriteFilesExec neither output Row nor columnar data. We output 
both row and columnar to
-   * avoid c2r and r2c transitions. Please note, [[GlutenPlan]] already 
implement batchType()
-   */
-  sealed trait ExecuteWriteCompatible extends KnownChildrenConventions with 
KnownRowType {
-    // To be compatible with Spark (version < 3.4)
-    protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): 
RDD[WriterCommitMessage] = {
-      throw new GlutenException(
-        s"Internal Error ${this.getClass} has write support" +
-          s" mismatch:\n${this}")
-    }
-
-    override def requiredChildrenConventions(): Seq[ConventionReq] = {
-      List(ConventionReq.backendBatch)
-    }
-
-    override def rowType(): RowType = {
-      RowType.VanillaRow
+      new CHColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
     }
   }
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/ClickhouseBackendWrite.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/ClickhouseBackendWrite.scala
deleted file mode 100644
index 225d9688c..000000000
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/ClickhouseBackendWrite.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import scala.collection.mutable
-
-case class ClickhouseBackendWrite(description: WriteJobDescription)
-  extends BackendWrite
-  with Logging {
-
-  override def collectNativeWriteFilesMetrics(cb: ColumnarBatch): 
Option[WriteTaskResult] = {
-    val numFiles = cb.numRows()
-    // Write an empty iterator
-    if (numFiles == 0) {
-      None
-    } else {
-      val file_col = cb.column(0)
-      val partition_col = cb.column(1)
-      val count_col = cb.column(2)
-
-      val outputPath = description.path
-      var updatedPartitions = Set.empty[String]
-      val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, 
String]()
-
-      val write_stats = Range(0, cb.numRows()).map {
-        i =>
-          val targetFileName = file_col.getUTF8String(i).toString
-          val partition = partition_col.getUTF8String(i).toString
-          if (partition != "__NO_PARTITION_ID__") {
-            updatedPartitions += partition
-            val tmpOutputPath = outputPath + "/" + partition + "/" + 
targetFileName
-            val customOutputPath =
-              description.customPartitionLocations.get(
-                PartitioningUtils.parsePathFragment(partition))
-            if (customOutputPath.isDefined) {
-              addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + 
targetFileName
-            }
-          }
-          count_col.getLong(i)
-      }
-
-      val partitionsInternalRows = updatedPartitions.map {
-        part =>
-          val parts = new Array[Any](1)
-          parts(0) = part
-          new GenericInternalRow(parts)
-      }.toSeq
-
-      val numWrittenRows = write_stats.sum
-      val stats = BasicWriteTaskStats(
-        partitions = partitionsInternalRows,
-        numFiles = numFiles,
-        numBytes = 101,
-        numRows = numWrittenRows)
-      val summary =
-        ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = 
Seq(stats))
-
-      Some(
-        WriteTaskResult(
-          new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
-          summary))
-    }
-  }
-}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 9e0d85e25..c1c5332d7 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -39,6 +39,8 @@ import org.apache.spark.shuffle.utils.ShuffleUtil
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.BuildSide
@@ -48,7 +50,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.datasources.WriteJobDescription
+import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BuildSideRelation, 
HashedRelationBroadcastMode}
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -555,9 +557,22 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       parameters: GenShuffleWriterParameters[K, V]): 
GlutenShuffleWriterWrapper[K, V] = {
     ShuffleUtil.genColumnarShuffleWriter(parameters)
   }
-
-  override def createBackendWrite(description: WriteJobDescription): 
BackendWrite = {
-    VeloxBackendWrite(description)
+  override def createColumnarWriteFilesExec(
+      child: SparkPlan,
+      noop: SparkPlan,
+      fileFormat: FileFormat,
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      options: Map[String, String],
+      staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec = {
+    VeloxColumnarWriteFilesExec(
+      child,
+      noop,
+      fileFormat,
+      partitionColumns,
+      bucketSpec,
+      options,
+      staticPartitions)
   }
 
   override def createColumnarArrowEvalPythonExec(
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala
deleted file mode 100644
index 5d47aff04..000000000
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution
-
-import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.vectorized.ColumnarBatch
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-
-import scala.collection.mutable
-
-// Velox write files metrics start
-//
-// Follows the code in velox `HiveDataSink::close()`
-// The json can be as following:
-// {
-//     "inMemoryDataSizeInBytes":0,
-//     "containsNumberedFileNames":true,
-//     "onDiskDataSizeInBytes":307,
-//     "fileWriteInfos":[
-//         {
-//             "fileSize":307,
-//             "writeFileName":
-//                
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
-//             "targetFileName":
-//                
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
-//         }
-//     ],
-//     
"writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
-//     "rowCount":1,
-//     
"targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
-//     "updateMode":"NEW",
-//     "name":"part1=1/part2=1"
-// }
-case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, 
fileSize: Long)
-
-case class VeloxWriteFilesMetrics(
-    name: String,
-    updateMode: String,
-    writePath: String,
-    targetPath: String,
-    fileWriteInfos: Seq[VeloxWriteFilesInfo],
-    rowCount: Long,
-    inMemoryDataSizeInBytes: Long,
-    onDiskDataSizeInBytes: Long,
-    containsNumberedFileNames: Boolean)
-
-// Velox write files metrics end
-
-case class VeloxBackendWrite(description: WriteJobDescription) extends 
BackendWrite with Logging {
-
-  override def collectNativeWriteFilesMetrics(cb: ColumnarBatch): 
Option[WriteTaskResult] = {
-    // Currently, the cb contains three columns: row, fragments, and context.
-    // The first row in the row column contains the number of written numRows.
-    // The fragments column contains detailed information about the file 
writes.
-    val loadedCb = 
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
-    assert(loadedCb.numCols() == 3)
-    val numWrittenRows = loadedCb.column(0).getLong(0)
-
-    var updatedPartitions = Set.empty[String]
-    val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, 
String]()
-    var numBytes = 0L
-    val objectMapper = new ObjectMapper()
-    objectMapper.registerModule(DefaultScalaModule)
-    for (i <- 0 until loadedCb.numRows() - 1) {
-      val fragments = loadedCb.column(1).getUTF8String(i + 1)
-      val metrics = objectMapper
-        .readValue(fragments.toString.getBytes("UTF-8"), 
classOf[VeloxWriteFilesMetrics])
-      logDebug(s"Velox write files metrics: $metrics")
-
-      val fileWriteInfos = metrics.fileWriteInfos
-      assert(fileWriteInfos.length == 1)
-      val fileWriteInfo = fileWriteInfos.head
-      numBytes += fileWriteInfo.fileSize
-      val targetFileName = fileWriteInfo.targetFileName
-      val outputPath = description.path
-
-      // part1=1/part2=1
-      val partitionFragment = metrics.name
-      // Write a partitioned table
-      if (partitionFragment != "") {
-        updatedPartitions += partitionFragment
-        val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + 
targetFileName
-        val customOutputPath = description.customPartitionLocations.get(
-          PartitioningUtils.parsePathFragment(partitionFragment))
-        if (customOutputPath.isDefined) {
-          addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + 
targetFileName
-        }
-      }
-    }
-
-    val numFiles = loadedCb.numRows() - 1
-    val partitionsInternalRows = updatedPartitions.map {
-      part =>
-        val parts = new Array[Any](1)
-        parts(0) = part
-        new GenericInternalRow(parts)
-    }.toSeq
-    val stats = BasicWriteTaskStats(
-      partitions = partitionsInternalRows,
-      numFiles = numFiles,
-      numBytes = numBytes,
-      numRows = numWrittenRows)
-    val summary =
-      ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = 
Seq(stats))
-
-    // Write an empty iterator
-    if (numFiles == 0) {
-      None
-    } else {
-      Some(
-        WriteTaskResult(
-          new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
-          summary))
-    }
-  }
-}
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
similarity index 56%
copy from 
gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
copy to 
backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
index b809a4b65..c339014c5 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
@@ -17,50 +17,146 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.extension.GlutenPlan
-import 
org.apache.gluten.extension.columnar.transition.Convention.{KnownRowType, 
RowType}
-import org.apache.gluten.extension.columnar.transition.ConventionReq
-import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
-import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 
 import org.apache.spark.{Partition, SparkException, TaskContext, 
TaskOutputFileAlreadyExistException}
 import org.apache.spark.internal.io.{FileCommitProtocol, 
SparkHadoopWriterUtils}
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.rdd.RDD
 import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericInternalRow}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.hadoop.fs.FileAlreadyExistsException
 
 import java.util.Date
 
-/**
- * This trait is used in [[ColumnarWriteFilesRDD]] to inject the staging write 
path before
- * initializing the native plan and collect native write files metrics for 
each backend.
- */
-trait BackendWrite {
-  def collectNativeWriteFilesMetrics(batch: ColumnarBatch): 
Option[WriteTaskResult]
-}
+import scala.collection.mutable
+
+// Velox write files metrics start
+//
+// Follows the code in velox `HiveDataSink::close()`
+// The json can be as following:
+// {
+//     "inMemoryDataSizeInBytes":0,
+//     "containsNumberedFileNames":true,
+//     "onDiskDataSizeInBytes":307,
+//     "fileWriteInfos":[
+//         {
+//             "fileSize":307,
+//             "writeFileName":
+//                
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
+//             "targetFileName":
+//                
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
+//         }
+//     ],
+//     
"writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
+//     "rowCount":1,
+//     
"targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
+//     "updateMode":"NEW",
+//     "name":"part1=1/part2=1"
+// }
+case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, 
fileSize: Long)
+
+case class VeloxWriteFilesMetrics(
+    name: String,
+    updateMode: String,
+    writePath: String,
+    targetPath: String,
+    fileWriteInfos: Seq[VeloxWriteFilesInfo],
+    rowCount: Long,
+    inMemoryDataSizeInBytes: Long,
+    onDiskDataSizeInBytes: Long,
+    containsNumberedFileNames: Boolean)
+
+// Velox write files metrics end
 
 /**
  * This RDD is used to make sure we have injected staging write path before 
initializing the native
  * plan, and support Spark file commit protocol.
  */
-class ColumnarWriteFilesRDD(
+class VeloxColumnarWriteFilesRDD(
     var prev: RDD[ColumnarBatch],
     description: WriteJobDescription,
     committer: FileCommitProtocol,
     jobTrackerID: String)
   extends RDD[WriterCommitMessage](prev) {
 
+  private def collectNativeWriteFilesMetrics(cb: ColumnarBatch): 
Option[WriteTaskResult] = {
+    // Currently, the cb contains three columns: row, fragments, and context.
+    // The first row in the row column contains the number of written numRows.
+    // The fragments column contains detailed information about the file 
writes.
+    val loadedCb = 
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
+    assert(loadedCb.numCols() == 3)
+    val numWrittenRows = loadedCb.column(0).getLong(0)
+
+    var updatedPartitions = Set.empty[String]
+    val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, 
String]()
+    var numBytes = 0L
+    val objectMapper = new ObjectMapper()
+    objectMapper.registerModule(DefaultScalaModule)
+    for (i <- 0 until loadedCb.numRows() - 1) {
+      val fragments = loadedCb.column(1).getUTF8String(i + 1)
+      val metrics = objectMapper
+        .readValue(fragments.toString.getBytes("UTF-8"), 
classOf[VeloxWriteFilesMetrics])
+      logDebug(s"Velox write files metrics: $metrics")
+
+      val fileWriteInfos = metrics.fileWriteInfos
+      assert(fileWriteInfos.length == 1)
+      val fileWriteInfo = fileWriteInfos.head
+      numBytes += fileWriteInfo.fileSize
+      val targetFileName = fileWriteInfo.targetFileName
+      val outputPath = description.path
+
+      // part1=1/part2=1
+      val partitionFragment = metrics.name
+      // Write a partitioned table
+      if (partitionFragment != "") {
+        updatedPartitions += partitionFragment
+        val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + 
targetFileName
+        val customOutputPath = description.customPartitionLocations.get(
+          PartitioningUtils.parsePathFragment(partitionFragment))
+        if (customOutputPath.isDefined) {
+          addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + 
targetFileName
+        }
+      }
+    }
+
+    val numFiles = loadedCb.numRows() - 1
+    val partitionsInternalRows = updatedPartitions.map {
+      part =>
+        val parts = new Array[Any](1)
+        parts(0) = part
+        new GenericInternalRow(parts)
+    }.toSeq
+    val stats = BasicWriteTaskStats(
+      partitions = partitionsInternalRows,
+      numFiles = numFiles,
+      numBytes = numBytes,
+      numRows = numWrittenRows)
+    val summary =
+      ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = 
Seq(stats))
+
+    // Write an empty iterator
+    if (numFiles == 0) {
+      None
+    } else {
+      Some(
+        WriteTaskResult(
+          new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
+          summary))
+    }
+  }
+
   private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = {
     val stats = 
writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats]
     val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows)
@@ -94,25 +190,21 @@ class ColumnarWriteFilesRDD(
 
   override def compute(split: Partition, context: TaskContext): Iterator[WriterCommitMessage] = {
     val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID, description, committer)
-    val backendWrite =
-      BackendsApiManager.getSparkPlanExecApiInstance.createBackendWrite(description)
 
     commitProtocol.setupTask()
     val writePath = commitProtocol.newTaskAttemptTempPath()
-    val writeFileName = commitProtocol.getFilename
-    logDebug(s"Native staging write path: $writePath and file name: $writeFileName")
-
+    logDebug(s"Velox staging write path: $writePath")
     var writeTaskResult: WriteTaskResult = null
     try {
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-        BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName)
+        BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, "")
 
         // Initialize the native plan
         val iter = firstParent[ColumnarBatch].iterator(split, context)
         assert(iter.hasNext)
         val resultColumnarBatch = iter.next()
         assert(resultColumnarBatch != null)
-        val nativeWriteTaskResult = backendWrite.collectNativeWriteFilesMetrics(resultColumnarBatch)
+        val nativeWriteTaskResult = collectNativeWriteFilesMetrics(resultColumnarBatch)
         if (nativeWriteTaskResult.isEmpty) {
           // If we are writing an empty iterator, then velox would do nothing.
           // Here we fallback to use vanilla Spark write files to generate an empty file for
@@ -155,11 +247,7 @@ class ColumnarWriteFilesRDD(
   }
 }
 
-// The class inherits from "BinaryExecNode" instead of "UnaryExecNode" because
-// we need to expose a dummy child (as right child) with type "WriteFilesExec" to let Spark
-// choose the new write code path (version >= 3.4). The actual plan to write is the left child
-// of this operator.
-case class ColumnarWriteFilesExec private (
+case class VeloxColumnarWriteFilesExec private (
     override val left: SparkPlan,
     override val right: SparkPlan,
     fileFormat: FileFormat,
@@ -167,46 +255,7 @@ case class ColumnarWriteFilesExec private (
     bucketSpec: Option[BucketSpec],
     options: Map[String, String],
     staticPartitions: TablePartitionSpec)
-  extends BinaryExecNode
-  with GlutenPlan
-  with ColumnarWriteFilesExec.ExecuteWriteCompatible {
-
-  val child: SparkPlan = left
-
-  override lazy val references: AttributeSet = AttributeSet.empty
-
-  override def supportsColumnar(): Boolean = true
-
-  override def output: Seq[Attribute] = Seq.empty
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw new GlutenException(s"$nodeName does not support doExecute")
-  }
-
-  /** Fallback to use vanilla Spark write files to generate an empty file for metadata only. */
-  private def writeFilesForEmptyRDD(
-      description: WriteJobDescription,
-      committer: FileCommitProtocol,
-      jobTrackerID: String): RDD[WriterCommitMessage] = {
-    val rddWithNonEmptyPartitions = session.sparkContext.parallelize(Seq.empty[InternalRow], 1)
-    rddWithNonEmptyPartitions.mapPartitionsInternal {
-      iterator =>
-        val sparkStageId = TaskContext.get().stageId()
-        val sparkPartitionId = TaskContext.get().partitionId()
-        val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue
-
-        val ret = SparkShimLoader.getSparkShims.writeFilesExecuteTask(
-          description,
-          jobTrackerID,
-          sparkStageId,
-          sparkPartitionId,
-          sparkAttemptNumber,
-          committer,
-          iterator
-        )
-        Iterator(ret)
-    }
-  }
+  extends ColumnarWriteFilesExec(left, right) {
 
   override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
     assert(child.supportsColumnar)
@@ -220,76 +269,12 @@ case class ColumnarWriteFilesExec private (
       // partition rdd to make sure we at least set up one write task to write the metadata.
       writeFilesForEmptyRDD(description, committer, jobTrackerID)
     } else {
-      new ColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
+      new VeloxColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
     }
   }
+
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan,
       newRight: SparkPlan): SparkPlan =
     copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
 }
-
-object ColumnarWriteFilesExec {
-
-  def apply(
-      child: SparkPlan,
-      fileFormat: FileFormat,
-      partitionColumns: Seq[Attribute],
-      bucketSpec: Option[BucketSpec],
-      options: Map[String, String],
-      staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec = {
-    // This is a workaround for FileFormatWriter#write. Vanilla Spark (version >= 3.4) requires for
-    // a plan that has at least one node exactly of type `WriteFilesExec` that is a Scala
-    // case-class, to decide to choose new `#executeWrite` code path over the legacy `#execute`
-    // for write operation.
-    //
-    // So we add a no-op `WriteFilesExec` child to let Spark pick the new code path.
-    //
-    // See: FileFormatWriter#write
-    // See: V1Writes#getWriteFilesOpt
-    val right: SparkPlan =
-      WriteFilesExec(
-        NoopLeaf(),
-        fileFormat,
-        partitionColumns,
-        bucketSpec,
-        options,
-        staticPartitions)
-
-    ColumnarWriteFilesExec(
-      child,
-      right,
-      fileFormat,
-      partitionColumns,
-      bucketSpec,
-      options,
-      staticPartitions)
-  }
-
-  private case class NoopLeaf() extends LeafExecNode {
-    override protected def doExecute(): RDD[InternalRow] =
-      throw new GlutenException(s"$nodeName does not support doExecute")
-    override def output: Seq[Attribute] = Seq.empty
-  }
-
-  /**
-   * ColumnarWriteFilesExec neither output Row nor columnar data. We output both row and columnar to
-   * avoid c2r and r2c transitions. Please note, [[GlutenPlan]] already implement batchType()
-   */
-  sealed trait ExecuteWriteCompatible extends KnownChildrenConventions with KnownRowType {
-    // To be compatible with Spark (version < 3.4)
-    protected def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
-      throw new GlutenException(
-        s"Internal Error ${this.getClass} has write support" +
-          s" mismatch:\n${this}")
-    }
-
-    override def requiredChildrenConventions(): Seq[ConventionReq] = {
-      List(ConventionReq.backendBatch)
-    }
-
-    override def rowType(): RowType = {
-      RowType.VanillaRow
-    }
-  }
-}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 8f24afae1..c37c55a5f 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -40,8 +40,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{BackendWrite, ColumnarWriteFilesExec, FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
-import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.BuildSideRelation
@@ -385,22 +385,12 @@ trait SparkPlanExecApi {
   /** Create ColumnarWriteFilesExec */
   def createColumnarWriteFilesExec(
       child: SparkPlan,
+      noop: SparkPlan,
       fileFormat: FileFormat,
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
       options: Map[String, String],
-      staticPartitions: TablePartitionSpec): SparkPlan = {
-    ColumnarWriteFilesExec(
-      child,
-      fileFormat,
-      partitionColumns,
-      bucketSpec,
-      options,
-      staticPartitions)
-  }
-
-  /** Create BackendWrite */
-  def createBackendWrite(description: WriteJobDescription): BackendWrite
+      staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec
 
   /** Create ColumnarArrowEvalPythonExec, for velox backend */
   def createColumnarArrowEvalPythonExec(
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 1d4e11d91..a8cc79128 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -457,7 +457,8 @@ object OffloadOthers {
             plan.bucketSpec,
             plan.options,
             plan.staticPartitions)
-          BackendsApiManager.getSparkPlanExecApiInstance.createColumnarWriteFilesExec(
+
+          ColumnarWriteFilesExec(
             writeTransformer,
             plan.fileFormat,
             plan.partitionColumns,
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index b809a4b65..7f6fd01ac 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -24,149 +24,23 @@ import org.apache.gluten.extension.columnar.transition.ConventionReq
 import org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
 import org.apache.gluten.sql.shims.SparkShimLoader
 
-import org.apache.spark.{Partition, SparkException, TaskContext, TaskOutputFileAlreadyExistException}
-import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.rdd.RDD
-import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.Utils
-
-import org.apache.hadoop.fs.FileAlreadyExistsException
-
-import java.util.Date
-
-/**
- * This trait is used in [[ColumnarWriteFilesRDD]] to inject the staging write path before
- * initializing the native plan and collect native write files metrics for each backend.
- */
-trait BackendWrite {
-  def collectNativeWriteFilesMetrics(batch: ColumnarBatch): Option[WriteTaskResult]
-}
-
-/**
- * This RDD is used to make sure we have injected staging write path before initializing the native
- * plan, and support Spark file commit protocol.
- */
-class ColumnarWriteFilesRDD(
-    var prev: RDD[ColumnarBatch],
-    description: WriteJobDescription,
-    committer: FileCommitProtocol,
-    jobTrackerID: String)
-  extends RDD[WriterCommitMessage](prev) {
-
-  private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = {
-    val stats = writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats]
-    val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows)
-    // Reports bytesWritten and recordsWritten to the Spark output metrics.
-    // We should update it after calling `commitTask` to overwrite the metrics.
-    Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach {
-      outputMetrics =>
-        outputMetrics.setBytesWritten(numBytes)
-        outputMetrics.setRecordsWritten(numWrittenRows)
-    }
-  }
-
-  private def writeFilesForEmptyIterator(
-      commitProtocol: SparkWriteFilesCommitProtocol): WriteTaskResult = {
-    val taskAttemptContext = commitProtocol.taskAttemptContext
-
-    val dataWriter =
-      if (commitProtocol.sparkPartitionId != 0) {
-        // In case of empty job, leave first partition to save meta for file format like parquet.
-        new EmptyDirectoryDataWriter(description, taskAttemptContext, committer)
-      } else if (description.partitionColumns.isEmpty) {
-        new SingleDirectoryDataWriter(description, taskAttemptContext, committer)
-      } else {
-        new DynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
-      }
-
-    // We have done `setupTask` outside
-    dataWriter.writeWithIterator(Iterator.empty)
-    dataWriter.commit()
-  }
-
-  override def compute(split: Partition, context: TaskContext): Iterator[WriterCommitMessage] = {
-    val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID, description, committer)
-    val backendWrite =
-      BackendsApiManager.getSparkPlanExecApiInstance.createBackendWrite(description)
-
-    commitProtocol.setupTask()
-    val writePath = commitProtocol.newTaskAttemptTempPath()
-    val writeFileName = commitProtocol.getFilename
-    logDebug(s"Native staging write path: $writePath and file name: $writeFileName")
-
-    var writeTaskResult: WriteTaskResult = null
-    try {
-      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-        BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName)
-
-        // Initialize the native plan
-        val iter = firstParent[ColumnarBatch].iterator(split, context)
-        assert(iter.hasNext)
-        val resultColumnarBatch = iter.next()
-        assert(resultColumnarBatch != null)
-        val nativeWriteTaskResult = backendWrite.collectNativeWriteFilesMetrics(resultColumnarBatch)
-        if (nativeWriteTaskResult.isEmpty) {
-          // If we are writing an empty iterator, then velox would do nothing.
-          // Here we fallback to use vanilla Spark write files to generate an empty file for
-          // metadata only.
-          writeTaskResult = writeFilesForEmptyIterator(commitProtocol)
-          // We have done commit task inside `writeFilesForEmptyIterator`.
-        } else {
-          writeTaskResult = nativeWriteTaskResult.get
-          commitProtocol.commitTask()
-        }
-      })(
-        catchBlock = {
-          // If there is an error, abort the task
-          commitProtocol.abortTask()
-          logError(s"Job ${commitProtocol.getJobId} aborted.")
-        }
-      )
-    } catch {
-      case e: FetchFailedException =>
-        throw e
-      case f: FileAlreadyExistsException if SQLConf.get.fastFailFileFormatOutput =>
-        throw new TaskOutputFileAlreadyExistException(f)
-      case t: Throwable =>
-        throw new SparkException(
-          s"Task failed while writing rows to staging path: $writePath, " +
-            s"output path: ${description.path}",
-          t)
-    }
-
-    assert(writeTaskResult != null)
-    reportTaskMetrics(writeTaskResult)
-    Iterator.single(writeTaskResult)
-  }
-
-  override protected def getPartitions: Array[Partition] = firstParent[ColumnarBatch].partitions
-
-  override def clearDependencies(): Unit = {
-    super.clearDependencies()
-    prev = null
-  }
-}
 
 // The class inherits from "BinaryExecNode" instead of "UnaryExecNode" because
 // we need to expose a dummy child (as right child) with type "WriteFilesExec" to let Spark
 // choose the new write code path (version >= 3.4). The actual plan to write is the left child
 // of this operator.
-case class ColumnarWriteFilesExec private (
+abstract class ColumnarWriteFilesExec protected (
     override val left: SparkPlan,
-    override val right: SparkPlan,
-    fileFormat: FileFormat,
-    partitionColumns: Seq[Attribute],
-    bucketSpec: Option[BucketSpec],
-    options: Map[String, String],
-    staticPartitions: TablePartitionSpec)
+    override val right: SparkPlan)
   extends BinaryExecNode
   with GlutenPlan
   with ColumnarWriteFilesExec.ExecuteWriteCompatible {
@@ -184,7 +58,7 @@ case class ColumnarWriteFilesExec private (
   }
 
   /** Fallback to use vanilla Spark write files to generate an empty file for metadata only. */
-  private def writeFilesForEmptyRDD(
+  protected def writeFilesForEmptyRDD(
       description: WriteJobDescription,
       committer: FileCommitProtocol,
       jobTrackerID: String): RDD[WriterCommitMessage] = {
@@ -208,25 +82,10 @@ case class ColumnarWriteFilesExec private (
     }
   }
 
+  /** We need this to avoid compiler error. */
   override def doExecuteWrite(writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {
-    assert(child.supportsColumnar)
-
-    val rdd = child.executeColumnar()
-    val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
-    val description = writeFilesSpec.description
-    val committer = writeFilesSpec.committer
-    if (rdd.partitions.length == 0) {
-      // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
-      // partition rdd to make sure we at least set up one write task to write the metadata.
-      writeFilesForEmptyRDD(description, committer, jobTrackerID)
-    } else {
-      new ColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
-    }
+    super.doExecuteWrite(writeFilesSpec)
   }
-  override protected def withNewChildrenInternal(
-      newLeft: SparkPlan,
-      newRight: SparkPlan): SparkPlan =
-    copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
 }
 
 object ColumnarWriteFilesExec {
@@ -256,7 +115,7 @@ object ColumnarWriteFilesExec {
         options,
         staticPartitions)
 
-    ColumnarWriteFilesExec(
+    BackendsApiManager.getSparkPlanExecApiInstance.createColumnarWriteFilesExec(
       child,
       right,
       fileFormat,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to