This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 206e4be5f [GLUTEN-6067][VL] [Part 3-1] Refactor: Rename VeloxColumnarWriteFilesExec to ColumnarWriteFilesExec (#6403)
206e4be5f is described below

commit 206e4be5f672249c0e987c5de1f49ac38caec7b0
Author: Chang chen <[email protected]>
AuthorDate: Fri Jul 19 16:46:51 2024 +0800

    [GLUTEN-6067][VL] [Part 3-1] Refactor: Rename VeloxColumnarWriteFilesExec to ColumnarWriteFilesExec (#6403)
    
    * Rename VeloxColumnarWriteFilesExec to GlutenColumnarWriteFilesExec, and move it to gluten-core
    
    1. Return GlutenColumnarWriteFilesExec at SparkPlanExecApi
    2. Move SparkWriteFilesCommitProtocol to gluten-core
    3. SparkWriteFilesCommitProtocol supports getFilename from the internal committer
    4. Remove supportTransformWriteFiles from BackendSettingsApi
    5. injectWriteFilesTempPath with fileName
    
    * support pass format to backend
    
    * Rename GlutenColumnarWriteFilesExec to ColumnarWriteFilesExec and GlutenColumnarWriteFilesRDD to ColumnarWriteFilesRDD
---
 .../vectorized/CHNativeExpressionEvaluator.java    |   5 +
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |   9 --
 .../backendsapi/clickhouse/CHIteratorApi.scala     |   4 +
 .../clickhouse/CHSparkPlanExecApi.scala            |  21 +--
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   2 -
 .../backendsapi/velox/VeloxIteratorApi.scala       |   4 +-
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |  20 +--
 .../spark/sql/execution/VeloxBackendWrite.scala    | 138 ++++++++++++++++++++
 .../execution/VeloxParquetWriteForHiveSuite.scala  |   4 +-
 .../gluten/backendsapi/BackendSettingsApi.scala    |   2 -
 .../apache/gluten/backendsapi/IteratorApi.scala    |   9 +-
 .../gluten/backendsapi/SparkPlanExecApi.scala      |  17 ++-
 .../CartesianProductExecTransformer.scala          |  10 +-
 .../org/apache/gluten/execution/JoinUtils.scala    |  55 ++++----
 .../execution/WriteFilesExecTransformer.scala      |  41 +++---
 .../extension/columnar/validator/Validators.scala  |   3 +-
 .../org/apache/gluten/utils/SubstraitUtil.scala    |  29 +++++
 .../sql/execution/ColumnarWriteFilesExec.scala     | 143 ++++-----------------
 .../execution/SparkWriteFilesCommitProtocol.scala  |  23 +++-
 .../datasources/GlutenV1WriteCommandSuite.scala    |  10 +-
 .../spark/sql/sources/GlutenInsertSuite.scala      |  20 +--
 .../gluten/GlutenColumnarWriteTestSupport.scala    |   6 +-
 22 files changed, 317 insertions(+), 258 deletions(-)
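
(Not part of this commit: a rough sketch of how a backend plugs into the new write contract introduced below. BackendWrite, WriteJobDescription and createBackendWrite are the APIs added by this patch; MyBackendWrite is a hypothetical illustration.)

// Sketch only: MyBackendWrite is not a real class in the repository.
import org.apache.spark.sql.execution.BackendWrite
import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.vectorized.ColumnarBatch

case class MyBackendWrite(description: WriteJobDescription) extends BackendWrite {
  // Translate the single result batch produced by the native write into Spark's
  // WriteTaskResult; returning None makes ColumnarWriteFilesRDD fall back to
  // vanilla Spark to produce an empty output file.
  override def collectNativeWriteFilesMetrics(batch: ColumnarBatch): Option[WriteTaskResult] = None
}

// The backend's SparkPlanExecApi implementation then only needs:
//   override def createBackendWrite(description: WriteJobDescription): BackendWrite =
//     MyBackendWrite(description)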

diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
index b8b4138dc..45a9f01f7 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java
@@ -81,6 +81,11 @@ public class CHNativeExpressionEvaluator {
         BackendsApiManager.getSettings().getBackendConfigPrefix(), 
SQLConf.get().getAllConfs());
   }
 
+  public static void injectWriteFilesTempPath(String path, String fileName) {
+    throw new UnsupportedOperationException(
+        "injectWriteFilesTempPath Not supported in 
CHNativeExpressionEvaluator");
+  }
+
   // Used by WholeStageTransform to create the native computing pipeline and
   // return a columnar result iterator.
   public BatchIterator createKernelWithBatchIterator(
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 341a3e0f0..07129e69a 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -25,13 +25,11 @@ import 
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
 
@@ -286,13 +284,6 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
       .getLong(GLUTEN_MAX_SHUFFLE_READ_BYTES, 
GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT)
   }
 
-  override def supportWriteFilesExec(
-      format: FileFormat,
-      fields: Array[StructField],
-      bucketSpec: Option[BucketSpec],
-      options: Map[String, String]): ValidationResult =
-    ValidationResult.failed("CH backend is unsupported.")
-
   override def enableNativeWriteFiles(): Boolean = {
     GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
   }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 4b9ec7390..6c86583f4 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -290,6 +290,10 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
       None,
       createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, 
inputIterators))
   }
+
+  override def injectWriteFilesTempPath(path: String, fileName: String): Unit = {
+    CHNativeExpressionEvaluator.injectWriteFilesTempPath(path, fileName)
+  }
 }
 
 class CollectMetricIterator(
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 547458d8c..920c61cd4 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -37,8 +37,6 @@ import org.apache.spark.shuffle.{GenShuffleWriterParameters, 
GlutenShuffleWriter
 import org.apache.spark.shuffle.utils.CHShuffleUtil
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, 
EqualToRewrite}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
CollectList, CollectSet}
 import org.apache.spark.sql.catalyst.optimizer.BuildSide
@@ -49,7 +47,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.delta.files.TahoeFileIndex
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
-import org.apache.spark.sql.execution.datasources.{FileFormat, 
HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
HadoopFsRelation, WriteJobDescription}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
@@ -145,10 +143,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
     }
 
     child match {
-      case scan: FileSourceScanExec if (checkMergeTreeFileFormat(scan.relation)) =>
+      case scan: FileSourceScanExec if checkMergeTreeFileFormat(scan.relation) =>
         // For the validation phase of the AddFallbackTagRule
         CHFilterExecTransformer(condition, child)
-      case scan: FileSourceScanExecTransformerBase if (checkMergeTreeFileFormat(scan.relation)) =>
+      case scan: FileSourceScanExecTransformerBase if checkMergeTreeFileFormat(scan.relation) =>
         // For the transform phase, the FileSourceScanExec is already transformed
         CHFilterExecTransformer(condition, child)
       case _ =>
@@ -395,7 +393,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       left: ExpressionTransformer,
       right: ExpressionTransformer,
       original: GetMapValue): ExpressionTransformer =
-    GetMapValueTransformer(substraitExprName, left, right, false, original)
+    GetMapValueTransformer(substraitExprName, left, right, failOnError = false, original)
 
   /**
    * Generate ShuffleDependency for ColumnarShuffleExchangeExec.
@@ -669,15 +667,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
     CHRegExpReplaceTransformer(substraitExprName, children, expr)
   }
 
-  override def createColumnarWriteFilesExec(
-      child: SparkPlan,
-      fileFormat: FileFormat,
-      partitionColumns: Seq[Attribute],
-      bucketSpec: Option[BucketSpec],
-      options: Map[String, String],
-      staticPartitions: TablePartitionSpec): SparkPlan = {
-    throw new GlutenNotSupportException("ColumnarWriteFilesExec is not support in ch backend.")
-  }
+  def createBackendWrite(description: WriteJobDescription): BackendWrite =
+    throw new UnsupportedOperationException("createBackendWrite is not supported in ch backend.")
 
   override def createColumnarArrowEvalPythonExec(
       udfs: Seq[PythonUDF],
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index b0692816a..933947301 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -495,8 +495,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def staticPartitionWriteOnly(): Boolean = true
 
-  override def supportTransformWriteFiles: Boolean = true
-
   override def allowDecimalArithmetic: Boolean = true
 
   override def enableNativeWriteFiles(): Boolean = {
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 613e53945..1c7f91348 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -161,7 +161,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
     (paths, starts, lengths, fileSizes, modificationTimes, partitionColumns, 
metadataColumns)
   }
 
-  override def injectWriteFilesTempPath(path: String): Unit = {
+  override def injectWriteFilesTempPath(path: String, fileName: String): Unit = {
     val transKernel = NativePlanEvaluator.create()
     transKernel.injectWriteFilesTempPath(path)
   }
@@ -171,7 +171,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       inputPartition: BaseGlutenPartition,
       context: TaskContext,
       pipelineTime: SQLMetric,
-      updateInputMetrics: (InputMetricsWrapper) => Unit,
+      updateInputMetrics: InputMetricsWrapper => Unit,
       updateNativeMetrics: IMetrics => Unit,
       partitionIndex: Int,
       inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): 
Iterator[ColumnarBatch] = {
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 37b46df3e..098d65a96 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -39,8 +39,6 @@ import org.apache.spark.shuffle.utils.ShuffleUtil
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.BuildSide
@@ -50,7 +48,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.WriteJobDescription
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BuildSideRelation, 
HashedRelationBroadcastMode}
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -550,20 +548,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
     ShuffleUtil.genColumnarShuffleWriter(parameters)
   }
 
-  override def createColumnarWriteFilesExec(
-      child: SparkPlan,
-      fileFormat: FileFormat,
-      partitionColumns: Seq[Attribute],
-      bucketSpec: Option[BucketSpec],
-      options: Map[String, String],
-      staticPartitions: TablePartitionSpec): SparkPlan = {
-    VeloxColumnarWriteFilesExec(
-      child,
-      fileFormat,
-      partitionColumns,
-      bucketSpec,
-      options,
-      staticPartitions)
+  override def createBackendWrite(description: WriteJobDescription): BackendWrite = {
+    VeloxBackendWrite(description)
   }
 
   override def createColumnarArrowEvalPythonExec(
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala
new file mode 100644
index 000000000..5d47aff04
--- /dev/null
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import scala.collection.mutable
+
+// Velox write files metrics start
+//
+// Follows the code in velox `HiveDataSink::close()`
+// The json can be as following:
+// {
+//     "inMemoryDataSizeInBytes":0,
+//     "containsNumberedFileNames":true,
+//     "onDiskDataSizeInBytes":307,
+//     "fileWriteInfos":[
+//         {
+//             "fileSize":307,
+//             "writeFileName":
+//                "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
+//             "targetFileName":
+//                "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
+//         }
+//     ],
+//     "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
+//     "rowCount":1,
+//     "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
+//     "updateMode":"NEW",
+//     "name":"part1=1/part2=1"
+// }
+case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long)
+
+case class VeloxWriteFilesMetrics(
+    name: String,
+    updateMode: String,
+    writePath: String,
+    targetPath: String,
+    fileWriteInfos: Seq[VeloxWriteFilesInfo],
+    rowCount: Long,
+    inMemoryDataSizeInBytes: Long,
+    onDiskDataSizeInBytes: Long,
+    containsNumberedFileNames: Boolean)
+
+// Velox write files metrics end
+
+case class VeloxBackendWrite(description: WriteJobDescription) extends BackendWrite with Logging {
+
+  override def collectNativeWriteFilesMetrics(cb: ColumnarBatch): Option[WriteTaskResult] = {
+    // Currently, the cb contains three columns: row, fragments, and context.
+    // The first row in the row column contains the number of written numRows.
+    // The fragments column contains detailed information about the file writes.
+    val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
+    assert(loadedCb.numCols() == 3)
+    val numWrittenRows = loadedCb.column(0).getLong(0)
+
+    var updatedPartitions = Set.empty[String]
+    val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
+    var numBytes = 0L
+    val objectMapper = new ObjectMapper()
+    objectMapper.registerModule(DefaultScalaModule)
+    for (i <- 0 until loadedCb.numRows() - 1) {
+      val fragments = loadedCb.column(1).getUTF8String(i + 1)
+      val metrics = objectMapper
+        .readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])
+      logDebug(s"Velox write files metrics: $metrics")
+
+      val fileWriteInfos = metrics.fileWriteInfos
+      assert(fileWriteInfos.length == 1)
+      val fileWriteInfo = fileWriteInfos.head
+      numBytes += fileWriteInfo.fileSize
+      val targetFileName = fileWriteInfo.targetFileName
+      val outputPath = description.path
+
+      // part1=1/part2=1
+      val partitionFragment = metrics.name
+      // Write a partitioned table
+      if (partitionFragment != "") {
+        updatedPartitions += partitionFragment
+        val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName
+        val customOutputPath = description.customPartitionLocations.get(
+          PartitioningUtils.parsePathFragment(partitionFragment))
+        if (customOutputPath.isDefined) {
+          addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
+        }
+      }
+    }
+
+    val numFiles = loadedCb.numRows() - 1
+    val partitionsInternalRows = updatedPartitions.map {
+      part =>
+        val parts = new Array[Any](1)
+        parts(0) = part
+        new GenericInternalRow(parts)
+    }.toSeq
+    val stats = BasicWriteTaskStats(
+      partitions = partitionsInternalRows,
+      numFiles = numFiles,
+      numBytes = numBytes,
+      numRows = numWrittenRows)
+    val summary =
+      ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))
+
+    // Write an empty iterator
+    if (numFiles == 0) {
+      None
+    } else {
+      Some(
+        WriteTaskResult(
+          new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
+          summary))
+    }
+  }
+}
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
index 731f5ef48..412548de9 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.util.QueryExecutionListener
 
 class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
-  private var _spark: SparkSession = null
+  private var _spark: SparkSession = _
 
   override protected def beforeAll(): Unit = {
     super.beforeAll()
@@ -86,7 +86,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest 
with SQLTestUtils {
       override def onSuccess(funcName: String, qe: QueryExecution, duration: 
Long): Unit = {
         if (!nativeUsed) {
           nativeUsed = if (isSparkVersionGE("3.4")) {
-            qe.executedPlan.find(_.isInstanceOf[VeloxColumnarWriteFilesExec]).isDefined
+            qe.executedPlan.find(_.isInstanceOf[ColumnarWriteFilesExec]).isDefined
           } else {
             qe.executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined
           }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 8b4c18b01..358043cc5 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -129,8 +129,6 @@ trait BackendSettingsApi {
 
   def staticPartitionWriteOnly(): Boolean = false
 
-  def supportTransformWriteFiles: Boolean = false
-
   def requiredInputFilePaths(): Boolean = false
 
   // TODO: Move this to test settings as used in UT only.
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 53dc8f478..495b91c50 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -47,8 +47,13 @@ trait IteratorApi {
   /**
    * Inject the task attempt temporary path for native write files, this method should be called
    * before `genFirstStageIterator` or `genFinalStageIterator`
+   * @param path
+   *   is the temporary directory for native write pipeline
+   * @param fileName
+   *   is the file name for native write pipeline, backend could generate it by itself.
    */
-  def injectWriteFilesTempPath(path: String): Unit = throw new UnsupportedOperationException()
+  def injectWriteFilesTempPath(path: String, fileName: String): Unit =
+    throw new UnsupportedOperationException()
 
   /**
    * Generate Iterator[ColumnarBatch] for first stage. ("first" means it does 
not depend on other
@@ -58,7 +63,7 @@ trait IteratorApi {
       inputPartition: BaseGlutenPartition,
       context: TaskContext,
       pipelineTime: SQLMetric,
-      updateInputMetrics: (InputMetricsWrapper) => Unit,
+      updateInputMetrics: InputMetricsWrapper => Unit,
       updateNativeMetrics: IMetrics => Unit,
       partitionIndex: Int,
       inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 58a08192d..273443f64 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, 
Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.{BackendWrite, ColumnarWriteFilesExec, FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.BuildSideRelation
@@ -388,7 +388,18 @@ trait SparkPlanExecApi {
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
       options: Map[String, String],
-      staticPartitions: TablePartitionSpec): SparkPlan
+      staticPartitions: TablePartitionSpec): SparkPlan = {
+    ColumnarWriteFilesExec(
+      child,
+      fileFormat,
+      partitionColumns,
+      bucketSpec,
+      options,
+      staticPartitions)
+  }
+
+  /** Create BackendWrite */
+  def createBackendWrite(description: WriteJobDescription): BackendWrite
 
   /** Create ColumnarArrowEvalPythonExec, for velox backend */
   def createColumnarArrowEvalPythonExec(
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
index 0dd110fa5..28bf1eeab 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
@@ -86,12 +86,10 @@ case class CartesianProductExecTransformer(
     val (inputRightRelNode, inputRightOutput) =
       (rightPlanContext.root, rightPlanContext.outputAttributes)
 
-    val expressionNode = condition.map {
-      expr =>
-        ExpressionConverter
-          .replaceWithExpressionTransformer(expr, inputLeftOutput ++ 
inputRightOutput)
-          .doTransform(context.registeredFunction)
-    }
+    val expressionNode =
+      condition.map {
+        SubstraitUtil.toSubstraitExpression(_, inputLeftOutput ++ inputRightOutput, context)
+      }
 
     val extensionNode =
       JoinUtils.createExtensionNode(inputLeftOutput ++ inputRightOutput, 
validation = false)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala 
b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
index 9dd73800e..12d085185 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinUtils.scala
@@ -16,13 +16,12 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.expression.{AttributeReferenceTransformer, 
ConverterUtils, ExpressionConverter}
-import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.expression.{AttributeReferenceTransformer, 
ExpressionConverter}
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, 
ExpressionNode}
 import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, 
ExtensionBuilder}
 import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+import org.apache.gluten.utils.SubstraitUtil
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans._
@@ -34,21 +33,11 @@ import io.substrait.proto.{CrossRel, JoinRel}
 import scala.collection.JavaConverters._
 
 object JoinUtils {
-  private def createEnhancement(output: Seq[Attribute]): 
com.google.protobuf.Any = {
-    val inputTypeNodes = output.map {
-      attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)
-    }
-    // Normally the enhancement node is only used for plan validation. But 
here the enhancement
-    // is also used in execution phase. In this case an empty typeUrlPrefix 
need to be passed,
-    // so that it can be correctly parsed into json string on the cpp side.
-    BackendsApiManager.getTransformerApiInstance.packPBMessage(
-      TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf)
-  }
 
   def createExtensionNode(output: Seq[Attribute], validation: Boolean): 
AdvancedExtensionNode = {
     // Use field [enhancement] in a extension node for input type validation.
     if (validation) {
-      ExtensionBuilder.makeAdvancedExtension(createEnhancement(output))
+      ExtensionBuilder.makeAdvancedExtension(SubstraitUtil.createEnhancement(output))
     } else {
       null
     }
@@ -58,7 +47,7 @@ object JoinUtils {
     !keyExprs.forall(_.isInstanceOf[AttributeReference])
   }
 
-  def createPreProjectionIfNeeded(
+  private def createPreProjectionIfNeeded(
       keyExprs: Seq[Expression],
       inputNode: RelNode,
       inputNodeOutput: Seq[Attribute],
@@ -131,17 +120,17 @@ object JoinUtils {
     }
   }
 
-  def createJoinExtensionNode(
+  private def createJoinExtensionNode(
       joinParameters: Any,
       output: Seq[Attribute]): AdvancedExtensionNode = {
     // Use field [optimization] in a extension node
     // to send some join parameters through Substrait plan.
-    val enhancement = createEnhancement(output)
+    val enhancement = SubstraitUtil.createEnhancement(output)
     ExtensionBuilder.makeAdvancedExtension(joinParameters, enhancement)
   }
 
   // Return the direct join output.
-  protected def getDirectJoinOutput(
+  private def getDirectJoinOutput(
       joinType: JoinType,
       leftOutput: Seq[Attribute],
       rightOutput: Seq[Attribute]): (Seq[Attribute], Seq[Attribute]) = {
@@ -164,7 +153,7 @@ object JoinUtils {
     }
   }
 
-  protected def getDirectJoinOutputSeq(
+  private def getDirectJoinOutputSeq(
       joinType: JoinType,
       leftOutput: Seq[Attribute],
       rightOutput: Seq[Attribute]): Seq[Attribute] = {
@@ -209,8 +198,8 @@ object JoinUtils {
       validation)
 
     // Combine join keys to make a single expression.
-    val joinExpressionNode = (streamedKeys
-      .zip(buildKeys))
+    val joinExpressionNode = streamedKeys
+      .zip(buildKeys)
       .map {
         case ((leftKey, leftType), (rightKey, rightType)) =>
           HashJoinLikeExecTransformer.makeEqualToExpression(
@@ -225,12 +214,10 @@ object JoinUtils {
           HashJoinLikeExecTransformer.makeAndExpression(l, r, 
substraitContext.registeredFunction))
 
     // Create post-join filter, which will be computed in hash join.
-    val postJoinFilter = condition.map {
-      expr =>
-        ExpressionConverter
-          .replaceWithExpressionTransformer(expr, streamedOutput ++ 
buildOutput)
-          .doTransform(substraitContext.registeredFunction)
-    }
+    val postJoinFilter =
+      condition.map {
+        SubstraitUtil.toSubstraitExpression(_, streamedOutput ++ buildOutput, substraitContext)
+      }
 
     // Create JoinRel.
     val joinRel = RelBuilder.makeJoinRel(
@@ -340,12 +327,14 @@ object JoinUtils {
       joinParameters: Any,
       validation: Boolean = false
   ): RelNode = {
-    val expressionNode = condition.map {
-      expr =>
-        ExpressionConverter
-          .replaceWithExpressionTransformer(expr, inputStreamedOutput ++ 
inputBuildOutput)
-          .doTransform(substraitContext.registeredFunction)
-    }
+    val expressionNode =
+      condition.map {
+        SubstraitUtil.toSubstraitExpression(
+          _,
+          inputStreamedOutput ++ inputBuildOutput,
+          substraitContext)
+      }
+
     val extensionNode =
       createJoinExtensionNode(joinParameters, inputStreamedOutput ++ 
inputBuildOutput)
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
index d78f21bea..d2ec994ba 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
@@ -21,10 +21,11 @@ import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.metrics.MetricsUpdater
-import org.apache.gluten.substrait.`type`.{ColumnTypeNode, TypeBuilder}
+import org.apache.gluten.substrait.`type`.ColumnTypeNode
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.extensions.ExtensionBuilder
 import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+import org.apache.gluten.utils.SubstraitUtil
 
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -32,7 +33,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.MetadataBuilder
 
 import com.google.protobuf.{Any, StringValue}
@@ -40,7 +43,6 @@ import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import java.util.Locale
 
-import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions.`collection 
AsScalaIterable`
 
 /**
@@ -56,7 +58,7 @@ case class WriteFilesExecTransformer(
     staticPartitions: TablePartitionSpec)
   extends UnaryTransformSupport {
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
-  @transient override lazy val metrics =
+  @transient override lazy val metrics: Map[String, SQLMetric] =
     
BackendsApiManager.getMetricsApiInstance.genWriteFilesTransformerMetrics(sparkContext)
 
   override def metricsUpdater(): MetricsUpdater =
@@ -66,11 +68,18 @@ case class WriteFilesExecTransformer(
 
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
 
-  def genWriteParameters(): Any = {
+  private def genWriteParameters(): Any = {
+    val fileFormatStr = fileFormat match {
+      case register: DataSourceRegister =>
+        register.shortName
+      case _ => "UnknownFileFormat"
+    }
     val compressionCodec =
       
WriteFilesExecTransformer.getCompressionCodec(caseInsensitiveOptions).capitalize
     val writeParametersStr = new StringBuffer("WriteParameters:")
-    
writeParametersStr.append("is").append(compressionCodec).append("=1").append("\n")
+    writeParametersStr.append("is").append(compressionCodec).append("=1")
+    writeParametersStr.append(";format=").append(fileFormatStr).append("\n")
+
     val message = StringValue
       .newBuilder()
       .setValue(writeParametersStr.toString)
@@ -78,15 +87,6 @@ case class WriteFilesExecTransformer(
     BackendsApiManager.getTransformerApiInstance.packPBMessage(message)
   }
 
-  def createEnhancement(output: Seq[Attribute]): com.google.protobuf.Any = {
-    val inputTypeNodes = output.map {
-      attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)
-    }
-
-    BackendsApiManager.getTransformerApiInstance.packPBMessage(
-      TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf)
-  }
-
   def getRelNode(
       context: SubstraitContext,
       originalInputAttributes: Seq[Attribute],
@@ -118,10 +118,11 @@ case class WriteFilesExecTransformer(
     val extensionNode = if (!validation) {
       ExtensionBuilder.makeAdvancedExtension(
         genWriteParameters(),
-        createEnhancement(originalInputAttributes))
+        SubstraitUtil.createEnhancement(originalInputAttributes))
     } else {
       // Use a extension node to send the input types through Substrait plan 
for validation.
-      
ExtensionBuilder.makeAdvancedExtension(createEnhancement(originalInputAttributes))
+      ExtensionBuilder.makeAdvancedExtension(
+        SubstraitUtil.createEnhancement(originalInputAttributes))
     }
     RelBuilder.makeWriteRel(
       input,
@@ -133,7 +134,7 @@ case class WriteFilesExecTransformer(
       operatorId)
   }
 
-  private def getFinalChildOutput(): Seq[Attribute] = {
+  private def getFinalChildOutput: Seq[Attribute] = {
     val metadataExclusionList = conf
       .getConf(GlutenConfig.NATIVE_WRITE_FILES_COLUMN_METADATA_EXCLUSION_LIST)
       .split(",")
@@ -143,7 +144,7 @@ case class WriteFilesExecTransformer(
   }
 
   override protected def doValidateInternal(): ValidationResult = {
-    val finalChildOutput = getFinalChildOutput()
+    val finalChildOutput = getFinalChildOutput
     val validationResult =
       BackendsApiManager.getSettings.supportWriteFilesExec(
         fileFormat,
@@ -165,7 +166,7 @@ case class WriteFilesExecTransformer(
     val childCtx = child.asInstanceOf[TransformSupport].transform(context)
     val operatorId = context.nextOperatorId(this.nodeName)
     val currRel =
-      getRelNode(context, getFinalChildOutput(), operatorId, childCtx.root, 
validation = false)
+      getRelNode(context, getFinalChildOutput, operatorId, childCtx.root, 
validation = false)
     assert(currRel != null, "Write Rel should be valid")
     TransformContext(childCtx.outputAttributes, output, currRel)
   }
@@ -196,7 +197,7 @@ object WriteFilesExecTransformer {
     "__file_source_generated_metadata_col"
   )
 
-  def removeMetadata(attr: Attribute, metadataExclusionList: Seq[String]): 
Attribute = {
+  private def removeMetadata(attr: Attribute, metadataExclusionList: 
Seq[String]): Attribute = {
     val metadataKeys = INTERNAL_METADATA_KEYS ++ metadataExclusionList
     attr.withMetadata {
       var builder = new MetadataBuilder().withMetadata(attr.metadata)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 903723ccb..a85cb163c 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -141,8 +141,7 @@ object Validators {
     override def validate(plan: SparkPlan): Validator.OutCome = plan match {
       case p: ShuffleExchangeExec if !settings.supportColumnarShuffleExec() => 
fail(p)
       case p: SortMergeJoinExec if !settings.supportSortMergeJoinExec() => 
fail(p)
-      case p: WriteFilesExec
-          if !(settings.enableNativeWriteFiles() && 
settings.supportTransformWriteFiles) =>
+      case p: WriteFilesExec if !settings.enableNativeWriteFiles() =>
         fail(p)
       case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg =>
         fail(p)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala 
b/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala
index e8e7ce06f..c641cb448 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/SubstraitUtil.scala
@@ -16,10 +16,19 @@
  */
 package org.apache.gluten.utils
 
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.expression.ExpressionNode
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.{FullOuter, InnerLike, JoinType, 
LeftAnti, LeftOuter, LeftSemi, RightOuter}
 
 import io.substrait.proto.{CrossRel, JoinRel}
 
+import scala.collection.JavaConverters._
+
 object SubstraitUtil {
   def toSubstrait(sparkJoin: JoinType): JoinRel.JoinType = sparkJoin match {
     case _: InnerLike =>
@@ -55,4 +64,24 @@ object SubstraitUtil {
     case _ =>
       CrossRel.JoinType.UNRECOGNIZED
   }
+
+  def createEnhancement(output: Seq[Attribute]): com.google.protobuf.Any = {
+    val inputTypeNodes = output.map {
+      attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable)
+    }
+    // Normally the enhancement node is only used for plan validation. But here the enhancement
+    // is also used in execution phase. In this case an empty typeUrlPrefix need to be passed,
+    // so that it can be correctly parsed into json string on the cpp side.
+    BackendsApiManager.getTransformerApiInstance.packPBMessage(
+      TypeBuilder.makeStruct(false, inputTypeNodes.asJava).toProtobuf)
+  }
+
+  def toSubstraitExpression(
+      expr: Expression,
+      attributeSeq: Seq[Attribute],
+      context: SubstraitContext): ExpressionNode = {
+    ExpressionConverter
+      .replaceWithExpressionTransformer(expr, attributeSeq)
+      .doTransform(context.registeredFunction)
+  }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
similarity index 67%
rename from 
backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
rename to 
gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index c87b8d4f6..6f04b8480 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -17,150 +17,47 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.{Partition, SparkException, TaskContext, 
TaskOutputFileAlreadyExistException}
 import org.apache.spark.internal.io.{FileCommitProtocol, 
SparkHadoopWriterUtils}
-import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.rdd.RDD
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
GenericInternalRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.hadoop.fs.FileAlreadyExistsException
 
 import java.util.Date
 
-import scala.collection.mutable
-
-// Velox write files metrics start
-//
-// Follows the code in velox `HiveDataSink::close()`
-// The json can be as following:
-// {
-//     "inMemoryDataSizeInBytes":0,
-//     "containsNumberedFileNames":true,
-//     "onDiskDataSizeInBytes":307,
-//     "fileWriteInfos":[
-//         {
-//             "fileSize":307,
-//             "writeFileName":
-//                
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
-//             "targetFileName":
-//                
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
-//         }
-//     ],
-//     
"writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
-//     "rowCount":1,
-//     
"targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
-//     "updateMode":"NEW",
-//     "name":"part1=1/part2=1"
-// }
-case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, 
fileSize: Long)
-
-case class VeloxWriteFilesMetrics(
-    name: String,
-    updateMode: String,
-    writePath: String,
-    targetPath: String,
-    fileWriteInfos: Seq[VeloxWriteFilesInfo],
-    rowCount: Long,
-    inMemoryDataSizeInBytes: Long,
-    onDiskDataSizeInBytes: Long,
-    containsNumberedFileNames: Boolean)
-
-// Velox write files metrics end
+/**
+ * This trait is used in [[ColumnarWriteFilesRDD]] to inject the staging write path before
+ * initializing the native plan and collect native write files metrics for each backend.
+ */
+trait BackendWrite {
+  def collectNativeWriteFilesMetrics(batch: ColumnarBatch): Option[WriteTaskResult]
+}
 
 /**
  * This RDD is used to make sure we have injected staging write path before 
initializing the native
  * plan, and support Spark file commit protocol.
  */
-class VeloxColumnarWriteFilesRDD(
+class ColumnarWriteFilesRDD(
     var prev: RDD[ColumnarBatch],
     description: WriteJobDescription,
     committer: FileCommitProtocol,
     jobTrackerID: String)
   extends RDD[WriterCommitMessage](prev) {
 
-  private def collectNativeWriteFilesMetrics(cb: ColumnarBatch): 
Option[WriteTaskResult] = {
-    // Currently, the cb contains three columns: row, fragments, and context.
-    // The first row in the row column contains the number of written numRows.
-    // The fragments column contains detailed information about the file 
writes.
-    val loadedCb = 
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
-    assert(loadedCb.numCols() == 3)
-    val numWrittenRows = loadedCb.column(0).getLong(0)
-
-    var updatedPartitions = Set.empty[String]
-    val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, 
String]()
-    var numBytes = 0L
-    val objectMapper = new ObjectMapper()
-    objectMapper.registerModule(DefaultScalaModule)
-    for (i <- 0 until loadedCb.numRows() - 1) {
-      val fragments = loadedCb.column(1).getUTF8String(i + 1)
-      val metrics = objectMapper
-        .readValue(fragments.toString.getBytes("UTF-8"), 
classOf[VeloxWriteFilesMetrics])
-      logDebug(s"Velox write files metrics: $metrics")
-
-      val fileWriteInfos = metrics.fileWriteInfos
-      assert(fileWriteInfos.length == 1)
-      val fileWriteInfo = fileWriteInfos.head
-      numBytes += fileWriteInfo.fileSize
-      val targetFileName = fileWriteInfo.targetFileName
-      val outputPath = description.path
-
-      // part1=1/part2=1
-      val partitionFragment = metrics.name
-      // Write a partitioned table
-      if (partitionFragment != "") {
-        updatedPartitions += partitionFragment
-        val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + 
targetFileName
-        val customOutputPath = description.customPartitionLocations.get(
-          PartitioningUtils.parsePathFragment(partitionFragment))
-        if (customOutputPath.isDefined) {
-          addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + 
targetFileName
-        }
-      }
-    }
-
-    val numFiles = loadedCb.numRows() - 1
-    val partitionsInternalRows = updatedPartitions.map {
-      part =>
-        val parts = new Array[Any](1)
-        parts(0) = part
-        new GenericInternalRow(parts)
-    }.toSeq
-    val stats = BasicWriteTaskStats(
-      partitions = partitionsInternalRows,
-      numFiles = numFiles,
-      numBytes = numBytes,
-      numRows = numWrittenRows)
-    val summary =
-      ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = 
Seq(stats))
-
-    // Write an empty iterator
-    if (numFiles == 0) {
-      None
-    } else {
-      Some(
-        WriteTaskResult(
-          new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
-          summary))
-    }
-  }
-
   private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = {
     val stats = 
writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats]
     val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows)
@@ -194,21 +91,25 @@ class VeloxColumnarWriteFilesRDD(
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[WriterCommitMessage] = {
     val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID, 
description, committer)
+    val backendWrite =
+      BackendsApiManager.getSparkPlanExecApiInstance.createBackendWrite(description)
 
     commitProtocol.setupTask()
     val writePath = commitProtocol.newTaskAttemptTempPath()
-    logDebug(s"Velox staging write path: $writePath")
+    val writeFileName = commitProtocol.getFilename
+    logDebug(s"Native staging write path: $writePath and file name: 
$writeFileName")
+
     var writeTaskResult: WriteTaskResult = null
     try {
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-        BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath)
+        BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName)
 
         // Initialize the native plan
         val iter = firstParent[ColumnarBatch].iterator(split, context)
         assert(iter.hasNext)
         val resultColumnarBatch = iter.next()
         assert(resultColumnarBatch != null)
-        val nativeWriteTaskResult = collectNativeWriteFilesMetrics(resultColumnarBatch)
+        val nativeWriteTaskResult = backendWrite.collectNativeWriteFilesMetrics(resultColumnarBatch)
         if (nativeWriteTaskResult.isEmpty) {
           // If we are writing an empty iterator, then velox would do nothing.
           // Here we fallback to use vanilla Spark write files to generate an 
empty file for
@@ -255,7 +156,7 @@ class VeloxColumnarWriteFilesRDD(
 // we need to expose a dummy child (as right child) with type "WriteFilesExec" 
to let Spark
 // choose the new write code path (version >= 3.4). The actual plan to write 
is the left child
 // of this operator.
-case class VeloxColumnarWriteFilesExec private (
+case class ColumnarWriteFilesExec private (
     override val left: SparkPlan,
     override val right: SparkPlan,
     fileFormat: FileFormat,
@@ -265,7 +166,7 @@ case class VeloxColumnarWriteFilesExec private (
     staticPartitions: TablePartitionSpec)
   extends BinaryExecNode
   with GlutenPlan
-  with VeloxColumnarWriteFilesExec.ExecuteWriteCompatible {
+  with ColumnarWriteFilesExec.ExecuteWriteCompatible {
 
   val child: SparkPlan = left
 
@@ -316,7 +217,7 @@ case class VeloxColumnarWriteFilesExec private (
       // partition rdd to make sure we at least set up one write task to write 
the metadata.
       writeFilesForEmptyRDD(description, committer, jobTrackerID)
     } else {
-      new VeloxColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
+      new ColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
     }
   }
   override protected def withNewChildrenInternal(
@@ -325,7 +226,7 @@ case class VeloxColumnarWriteFilesExec private (
     copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, 
staticPartitions)
 }
 
-object VeloxColumnarWriteFilesExec {
+object ColumnarWriteFilesExec {
 
   def apply(
       child: SparkPlan,
@@ -333,7 +234,7 @@ object VeloxColumnarWriteFilesExec {
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
       options: Map[String, String],
-      staticPartitions: TablePartitionSpec): VeloxColumnarWriteFilesExec = {
+      staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec = {
     // This is a workaround for FileFormatWriter#write. Vanilla Spark (version 
>= 3.4) requires for
     // a plan that has at least one node exactly of type `WriteFilesExec` that 
is a Scala
     // case-class, to decide to choose new `#executeWrite` code path over the 
legacy `#execute`
@@ -352,7 +253,7 @@ object VeloxColumnarWriteFilesExec {
         options,
         staticPartitions)
 
-    VeloxColumnarWriteFilesExec(
+    ColumnarWriteFilesExec(
       child,
       right,
       fileFormat,
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
similarity index 82%
rename from 
backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
rename to 
gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
index 845f2f98f..5e3ab83e3 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, HadoopMapReduceCommitProtocol}
 import org.apache.spark.sql.execution.datasources.WriteJobDescription
 import org.apache.spark.util.Utils
 
@@ -41,9 +41,9 @@ class SparkWriteFilesCommitProtocol(
   extends Logging {
   assert(committer.isInstanceOf[HadoopMapReduceCommitProtocol])
 
-  val sparkStageId = TaskContext.get().stageId()
-  val sparkPartitionId = TaskContext.get().partitionId()
-  val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue
+  val sparkStageId: Int = TaskContext.get().stageId()
+  val sparkPartitionId: Int = TaskContext.get().partitionId()
+  private val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue
   private val jobId = createJobID(jobTrackerID, sparkStageId)
 
   private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -68,6 +68,21 @@ class SparkWriteFilesCommitProtocol(
     field.get(committer).asInstanceOf[OutputCommitter]
   }
 
+  private lazy val internalGetFilename = {
+    val m = classOf[HadoopMapReduceCommitProtocol]
+      .getDeclaredMethod("getFilename", classOf[TaskAttemptContext], 
classOf[FileNameSpec])
+    m.setAccessible(true)
+    m
+  }
+
+  def getFilename: String = {
+    val fileCounter = 0
+    val suffix = f".c$fileCounter%03d" +
+      description.outputWriterFactory.getFileExtension(taskAttemptContext)
+    val fileNameSpec = FileNameSpec("", suffix)
+    internalGetFilename.invoke(committer, taskAttemptContext, fileNameSpec).asInstanceOf[String]
+  }
+
   def setupTask(): Unit = {
     committer.setupTask(taskAttemptContext)
   }
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
index 3d277b94c..726ace3a1 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala
@@ -21,7 +21,7 @@ import org.apache.gluten.execution.SortExecTransformer
 import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 import org.apache.spark.sql.catalyst.expressions.{Ascending, 
AttributeReference, NullsFirst, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
-import org.apache.spark.sql.execution.{QueryExecution, SortExec, 
VeloxColumnarWriteFilesExec}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, QueryExecution, 
SortExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StringType}
@@ -122,8 +122,8 @@ class GlutenV1WriteCommandSuite
             val executedPlan = FileFormatWriter.executedPlan.get
 
             val plan = if (enabled) {
-              assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
-              executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
+              assert(executedPlan.isInstanceOf[ColumnarWriteFilesExec])
+              executedPlan.asInstanceOf[ColumnarWriteFilesExec].child
             } else {
               executedPlan.transformDown { case a: AdaptiveSparkPlanExec => 
a.executedPlan }
             }
@@ -204,8 +204,8 @@ class GlutenV1WriteCommandSuite
           val executedPlan = FileFormatWriter.executedPlan.get
 
           val plan = if (enabled) {
-            assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
-            executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
+            assert(executedPlan.isInstanceOf[ColumnarWriteFilesExec])
+            executedPlan.asInstanceOf[ColumnarWriteFilesExec].child
           } else {
             executedPlan.transformDown { case a: AdaptiveSparkPlanExec => 
a.executedPlan }
           }
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index ca4b3740a..5c60115c5 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, 
VeloxColumnarWriteFilesExec}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, 
CommandResultExec, QueryExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -60,13 +60,13 @@ class GlutenInsertSuite
     super.afterAll()
   }
 
-  private def checkAndGetWriteFiles(df: DataFrame): 
VeloxColumnarWriteFilesExec = {
+  private def checkAndGetWriteFiles(df: DataFrame): ColumnarWriteFilesExec = {
     val writeFiles = stripAQEPlan(
       df.queryExecution.executedPlan
         .asInstanceOf[CommandResultExec]
         .commandPhysicalPlan).children.head
-    assert(writeFiles.isInstanceOf[VeloxColumnarWriteFilesExec])
-    writeFiles.asInstanceOf[VeloxColumnarWriteFilesExec]
+    assert(writeFiles.isInstanceOf[ColumnarWriteFilesExec])
+    writeFiles.asInstanceOf[ColumnarWriteFilesExec]
   }
 
   testGluten("insert partition table") {
@@ -405,7 +405,7 @@ class GlutenInsertSuite
           withTable("t") {
             sql(s"create table t(i boolean) using ${config.dataSource}")
             if (config.useDataFrames) {
-              Seq((false)).toDF.write.insertInto("t")
+              Seq(false).toDF.write.insertInto("t")
             } else {
               sql("insert into t select false")
             }
@@ -425,7 +425,7 @@ class GlutenInsertSuite
           withTable("t") {
             sql(s"create table t(i boolean) using ${config.dataSource}")
             if (config.useDataFrames) {
-              Seq((false)).toDF.write.insertInto("t")
+              Seq(false).toDF.write.insertInto("t")
             } else {
               sql("insert into t select false")
             }
@@ -452,7 +452,7 @@ class GlutenInsertSuite
           withTable("t") {
             sql(s"create table t(i boolean) using ${config.dataSource}")
             if (config.useDataFrames) {
-              Seq((false)).toDF.write.insertInto("t")
+              Seq(false).toDF.write.insertInto("t")
             } else {
               sql("insert into t select false")
             }
@@ -474,7 +474,7 @@ class GlutenInsertSuite
           withTable("t") {
             sql(s"create table t(i boolean) using ${config.dataSource}")
             if (config.useDataFrames) {
-              Seq((false)).toDF.write.insertInto("t")
+              Seq(false).toDF.write.insertInto("t")
             } else {
               sql("insert into t select false")
             }
@@ -501,7 +501,7 @@ class GlutenInsertSuite
           withTable("t") {
             sql(s"create table t(i boolean) using ${config.dataSource}")
             if (config.useDataFrames) {
-              Seq((false)).toDF.write.insertInto("t")
+              Seq(false).toDF.write.insertInto("t")
             } else {
               sql("insert into t select false")
             }
@@ -571,7 +571,7 @@ class GlutenInsertSuite
           withTable("t") {
             sql(s"create table t(i boolean) using ${config.dataSource}")
             if (config.useDataFrames) {
-              Seq((false)).toDF.write.insertInto("t")
+              Seq(false).toDF.write.insertInto("t")
             } else {
               sql("insert into t select false")
             }
diff --git 
a/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala
 
b/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala
index c7ad606bc..68c0e1c93 100644
--- 
a/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala
+++ 
b/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala
@@ -16,12 +16,12 @@
  */
 package org.apache.gluten
 
-import org.apache.spark.sql.execution.{SparkPlan, VeloxColumnarWriteFilesExec}
+import org.apache.spark.sql.execution.{SparkPlan, ColumnarWriteFilesExec}
 
 trait GlutenColumnarWriteTestSupport {
 
   def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = {
-    assert(sparkPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
-    sparkPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
+    assert(sparkPlan.isInstanceOf[ColumnarWriteFilesExec])
+    sparkPlan.asInstanceOf[ColumnarWriteFilesExec].child
   }
 }
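
(A side note, not part of the commit: the new SparkWriteFilesCommitProtocol.getFilename above only builds a suffix and then delegates, via reflection, to the committer's private getFilename. A minimal sketch of the suffix construction, where the ".snappy.parquet" extension is an assumed example value:)

// Sketch: mirrors only the suffix formatting from the diff above; the committer
// prepends its own "part-<split>-<jobId>"-style prefix, which is not reproduced here.
val fileCounter = 0
val extension = ".snappy.parquet" // assumed result of outputWriterFactory.getFileExtension(...)
val suffix = f".c$fileCounter%03d" + extension // => ".c000.snappy.parquet"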

