This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 2dd5632ef [GLUTEN-6705][CH] Basic Support Delta write (#6767)
2dd5632ef is described below

commit 2dd5632efffc2636cdcbe532983530325be22978
Author: Chang chen <[email protected]>
AuthorDate: Fri Aug 9 23:17:43 2024 +0800

    [GLUTEN-6705][CH] Basic Support Delta write (#6767)
    
    * Basic Support Native Delta Write
    
    * Fix Build for spark 3.2 and 3.3
    
    * Clean up
    
    * Fix UT failed.
    
    * Fix build error and warning in scala 2.13
---
 .../spark/sql/execution/CHDeltaColumnarWrite.scala |  30 +++
 .../spark/sql/execution/CHDeltaColumnarWrite.scala |  30 +++
 .../delta/ClickhouseOptimisticTransaction.scala    | 166 +++++++++++++-
 .../spark/sql/execution/CHDeltaColumnarWrite.scala | 144 ++++++++++++
 .../spark/shuffle/CHColumnarShuffleWriter.scala    |   2 +-
 .../spark/sql/execution/CHColumnarWrite.scala      | 241 +++++++++++++++++++++
 .../sql/execution/CHColumnarWriteFilesExec.scala   | 108 ++-------
 .../spark/sql/execution/utils/CHExecUtil.scala     |   2 +-
 .../GlutenClickHouseDeltaParquetWriteSuite.scala   |  14 +-
 .../GlutenClickHouseNativeWriteTableSuite.scala    |  35 ++-
 .../apache/spark/gluten/NativeWriteChecker.scala   |   5 +-
 .../execution/SparkWriteFilesCommitProtocol.scala  |  23 +-
 .../gluten/planner/plan/GlutenPlanModel.scala      |   2 +-
 .../sql/execution/ColumnarWriteFilesExec.scala     |   2 +-
 14 files changed, 675 insertions(+), 129 deletions(-)

diff --git 
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
 
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
new file mode 100644
index 000000000..e949bebf2
--- /dev/null
+++ 
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.exception.GlutenNotSupportException
+
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.execution.datasources.WriteJobDescription
+
+/**
+ * Placeholder factory for the `delta-20` source set: native Delta write is not available in
+ * this build variant, so no [[CHColumnarWrite]] is ever produced.
+ */
+object CHDeltaColumnarWrite {
+
+  /**
+   * Always fails; the parameters are accepted only so that the signature matches the factory
+   * of the same name in the other source sets.
+   *
+   * @throws GlutenNotSupportException
+   *   unconditionally
+   */
+  def apply(
+      jobTrackerID: String,
+      description: WriteJobDescription,
+      committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] =
+    throw new GlutenNotSupportException("Delta Native is not supported in Spark 3.2")
+}
diff --git 
a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
 
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
new file mode 100644
index 000000000..0a1aee5c4
--- /dev/null
+++ 
b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.exception.GlutenNotSupportException
+
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.execution.datasources.WriteJobDescription
+
+/**
+ * Placeholder factory for the `delta-23` source set: native Delta write is not available in
+ * this build variant, so no [[CHColumnarWrite]] is ever produced.
+ */
+object CHDeltaColumnarWrite {
+
+  /**
+   * Always fails; the parameters are accepted only so that the signature matches the factory
+   * of the same name in the other source sets.
+   *
+   * @throws GlutenNotSupportException
+   *   unconditionally
+   */
+  def apply(
+      jobTrackerID: String,
+      description: WriteJobDescription,
+      committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] =
+    throw new GlutenNotSupportException("Delta Native is not supported in Spark 3.3")
+}
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 6eec68efe..e023d3d7c 100644
--- 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -25,17 +25,21 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
 import org.apache.spark.sql.delta.constraints.{Constraint, Constraints}
-import org.apache.spark.sql.delta.files.MergeTreeCommitProtocol
-import org.apache.spark.sql.delta.schema.InvariantViolationException
+import org.apache.spark.sql.delta.files.{DelayedCommitProtocol, 
DeltaFileFormatWriter, MergeTreeCommitProtocol, TransactionalWrite}
+import org.apache.spark.sql.delta.hooks.AutoCompact
+import org.apache.spark.sql.delta.schema.{InnerInvariantViolationException, 
InvariantViolationException}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.delta.stats.DeltaJobStatisticsTracker
+import org.apache.spark.sql.execution.{CHDelayedCommitProtocol, 
QueryExecution, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, 
FakeRowAdaptor, FileFormatWriter, WriteJobStatsTracker}
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, 
FakeRowAdaptor, FileFormatWriter, WriteFiles, WriteJobStatsTracker}
 import 
org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeFileFormatWriter
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.hadoop.fs.Path
 
 import scala.collection.mutable.ListBuffer
 
@@ -190,4 +194,158 @@ class ClickhouseOptimisticTransaction(
       super.writeFiles(inputData, writeOptions, additionalConstraints)
     }
   }
+
+  /**
+   * Decides whether optimized write applies: an explicit `optimizeWrite` value carried by the
+   * write options wins; otherwise the answer comes from
+   * `TransactionalWrite.shouldOptimizeWrite(metadata, sessionConf)`.
+   */
+  private def shouldOptimizeWrite(
+      writeOptions: Option[DeltaOptions],
+      sessionConf: SQLConf): Boolean = {
+    val explicitSetting = writeOptions.flatMap(_.optimizeWrite)
+    // `getOrElse` takes its argument by name, so the fallback is only evaluated when needed.
+    explicitSetting.getOrElse(TransactionalWrite.shouldOptimizeWrite(metadata, sessionConf))
+  }
+
+  /**
+   * Creates the commit protocol for a write rooted at `outputPath`: a
+   * [[CHDelayedCommitProtocol]] with job id "delta", no random prefix length, and
+   * `deltaDataSubdir` as the sub-directory.
+   */
+  override protected def getCommitter(outputPath: Path): DelayedCommitProtocol =
+    new CHDelayedCommitProtocol(
+      jobId = "delta",
+      outputPath = outputPath.toString,
+      randomPrefixLength = None,
+      subdir = deltaDataSubdir)
+
+  /**
+   * Writes `inputData` into the table by wrapping the normalized query in a [[WriteFiles]]
+   * node and running it through `DeltaFileFormatWriter.write`, then returns the file actions
+   * reported by the committer.
+   *
+   * Per-file statistics collection is currently switched off (`optionalStatsTracker` is always
+   * `None`), and the invariant-check / optimized-write planning is kept only as commented-out
+   * code inside the execution block.
+   *
+   * @param inputData
+   *   the data to write
+   * @param writeOptions
+   *   optional writer options; only max-records-per-file and compression are forwarded
+   * @param isOptimize
+   *   must be `false`; OPTIMIZE writes are rejected
+   * @param additionalConstraints
+   *   extra constraints merged into `constraints` (NOTE(review): `constraints` is presently
+   *   referenced only from commented-out code)
+   * @return
+   *   the added files that pass the empty-file filter, followed by `committer.changeFiles`
+   * @throws UnsupportedOperationException
+   *   if `isOptimize` is `true`
+   */
+  override def writeFiles(
+      inputData: Dataset[_],
+      writeOptions: Option[DeltaOptions],
+      isOptimize: Boolean,
+      additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
+
+    if (isOptimize) {
+      throw new UnsupportedOperationException("Optimize is not supported for ClickHouse")
+    }
+
+    hasWritten = true
+
+    val spark = inputData.sparkSession
+    val (data, partitionSchema) = performCDCPartition(inputData)
+    val outputPath = deltaLog.dataPath
+
+    val fileFormat = deltaLog.fileFormat(protocol, metadata) // TODO support changing formats.
+
+    // Iceberg spec requires partition columns in data files
+    val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata)
+    // Retain only a minimal selection of Spark writer options to avoid any potential
+    // compatibility issues
+    val options = (writeOptions match {
+      case None => Map.empty[String, String]
+      case Some(opts) =>
+        opts.options.filterKeys {
+          key =>
+            key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
+            key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
+        }.toMap
+    }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)
+
+    val (normalQueryExecution, output, generatedColumnConstraints, _) =
+      normalizeData(deltaLog, writeOptions, data)
+    val partitioningColumns = getPartitioningColumns(partitionSchema, output)
+
+    val logicalPlan = normalQueryExecution.optimizedPlan
+    val write =
+      WriteFiles(logicalPlan, fileFormat, partitioningColumns, None, options, Map.empty)
+
+    val queryExecution = new QueryExecution(spark, write)
+    val committer = getCommitter(outputPath)
+
+    // If Statistics Collection is enabled, then create a stats tracker that will be injected
+    // during the FileFormatWriter.write call below and will collect per-file stats using
+    // StatisticsCollection
+    //    val (optionalStatsTracker, _) =
+    //      getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)
+    val optionalStatsTracker: Option[DeltaJobStatisticsTracker] = None
+
+    val constraints =
+      Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints
+
+    SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
+      val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output)
+
+      val physicalPlan = materializeAdaptiveSparkPlan(queryExecution.executedPlan)
+      // convertEmptyToNullIfNeeded(queryExecution.executedPlan, partitioningColumns, constraints)
+      /*      val checkInvariants = DeltaInvariantCheckerExec(empty2NullPlan, constraints)
+      // No need to plan optimized write if the write command is OPTIMIZE, which aims to produce
+      // evenly-balanced data files already.
+      val physicalPlan =
+        if (
+          !isOptimize &&
+          shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
+        ) {
+          DeltaOptimizedWriterExec(checkInvariants, metadata.partitionColumns, deltaLog)
+        } else {
+          checkInvariants
+        }*/
+      val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
+
+      if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
+        val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
+          new SerializableConfiguration(deltaLog.newDeltaHadoopConf()),
+          BasicWriteJobStatsTracker.metrics)
+        registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
+        statsTrackers.append(basicWriteJobStatsTracker)
+      }
+
+      try {
+        DeltaFileFormatWriter.write(
+          sparkSession = spark,
+          plan = physicalPlan,
+          fileFormat = fileFormat,
+          committer = committer,
+          outputSpec = outputSpec,
+          // scalastyle:off deltahadoopconfiguration
+          hadoopConf =
+            spark.sessionState.newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
+          // scalastyle:on deltahadoopconfiguration
+          partitionColumns = partitioningColumns,
+          bucketSpec = None,
+          statsTrackers = optionalStatsTracker.toSeq
+            ++ statsTrackers,
+          options = options
+        )
+      } catch {
+        case InnerInvariantViolationException(violationException) =>
+          // Pull an InvariantViolationException up to the top level if it was the root cause.
+          throw violationException
+      }
+    }
+
+    val writtenFiles =
+      (if (optionalStatsTracker.isDefined) {
+         committer.addedStatuses.map {
+           a =>
+             a.copy(stats =
+               optionalStatsTracker.map(_.recordedStats(a.toPath.getName)).getOrElse(a.stats))
+         }
+       } else {
+         committer.addedStatuses
+       })
+        .filter {
+          // In some cases, we can write out an empty `inputData`. Some examples of this
+          // (though, they may be fixed in the future) are the MERGE command when you delete
+          // with empty source, or empty target, or on disjoint tables. This is hard to catch
+          // before the write without collecting the DF ahead of time. Instead, we can return
+          // only the AddFiles that
+          // a) actually add rows, or
+          // b) don't have any stats so we don't know the number of rows at all
+          case a: AddFile => a.numLogicalRecords.forall(_ > 0)
+          case _ => true
+        }
+
+    // add [[AddFile.Tags.ICEBERG_COMPAT_VERSION.name]] tags to addFiles
+    val resultFiles =
+      if (IcebergCompatV2.isEnabled(metadata)) {
+        writtenFiles.map {
+          addFile =>
+            val tags = if (addFile.tags != null) addFile.tags else Map.empty[String, String]
+            addFile.copy(tags = tags + (AddFile.Tags.ICEBERG_COMPAT_VERSION.name -> "2"))
+        }
+      } else {
+        writtenFiles
+      }
+
+    // `isOptimize` is always false at this point (rejected at the top of the method), so only
+    // the non-empty check is needed before registering the hook.
+    if (resultFiles.nonEmpty) registerPostCommitHook(AutoCompact)
+
+    resultFiles.toSeq ++ committer.changeFiles
+  }
+
+  /**
+   * Unwraps an adaptive plan: for an [[AdaptiveSparkPlanExec]] its `finalPhysicalPlan` is
+   * returned, any other plan is returned unchanged.
+   */
+  private def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+    case adaptive: AdaptiveSparkPlanExec => adaptive.finalPhysicalPlan
+    case nonAdaptive: SparkPlan => nonAdaptive
+  }
 }
diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
new file mode 100644
index 000000000..66f502038
--- /dev/null
+++ 
b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/CHDeltaColumnarWrite.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.exception.GlutenNotSupportException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.delta.files.DelayedCommitProtocol
+import org.apache.spark.sql.execution.datasources.{ExecutedWriteSummary, 
WriteJobDescription, WriteTaskResult}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * A wrapper of [[DelayedCommitProtocol]] for accessing protected methods and fields: it
+ * re-declares `getFileName` and `parsePartitions` as plain overrides that delegate to the
+ * parent, exposes `outputPath` as a `val`, and adds [[updateAddedFiles]].
+ */
+class CHDelayedCommitProtocol(
+    jobId: String,
+    val outputPath: String,
+    randomPrefixLength: Option[Int],
+    subdir: Option[String])
+  extends DelayedCommitProtocol(jobId, outputPath, randomPrefixLength, subdir) {
+
+  /** Delegates to the parent implementation without changing arguments or result. */
+  override def getFileName(
+      taskContext: TaskAttemptContext,
+      ext: String,
+      partitionValues: Map[String, String]): String =
+    super.getFileName(taskContext, ext, partitionValues)
+
+  /**
+   * Appends `files` (pairs of partition values and file path) to the inherited `addedFiles`.
+   * Asserts that `addedFiles` is still empty beforehand.
+   */
+  def updateAddedFiles(files: Seq[(Map[String, String], String)]): Unit = {
+    assert(addedFiles.isEmpty)
+    addedFiles ++= files
+  }
+
+  /** Delegates to the parent implementation without changing arguments or result. */
+  override def parsePartitions(dir: String): Map[String, String] = {
+    super.parsePartitions(dir)
+  }
+}
+
+/**
+ * [[CHColumnarWrite]] for a [[CHDelayedCommitProtocol]]: injects the write path and file name
+ * before the native task runs, then turns the result batch into the committer's added-file
+ * list and a [[WriteTaskResult]].
+ */
+case class CHDelayedCommitProtocolWrite(
+    override val jobTrackerID: String,
+    override val description: WriteJobDescription,
+    override val committer: CHDelayedCommitProtocol)
+  extends CHColumnarWrite[CHDelayedCommitProtocol]
+  with Logging {
+
+  /**
+   * Injects `description.path` and a file name obtained from the committer (with empty
+   * partition values) into the backend iterator API. Asserts that the description and the
+   * committer agree on the output path.
+   */
+  override def doSetupNativeTask(): Unit = {
+    assert(description.path == committer.outputPath)
+    val nameSpec = CreateFileNameSpec(taskAttemptContext, description)
+    val writePath = description.path
+    val writeFileName = committer.getFileName(taskAttemptContext, nameSpec.suffix, Map.empty)
+    logDebug(s"Native staging write path: $writePath and file name: $writeFileName")
+    BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName)
+  }
+
+  /**
+   * Reads the result batch row by row: column 0 is read as the file name, column 1 as the
+   * partition id (or the `__NO_PARTITION_ID__` marker), column 2 as the row count.
+   *
+   * @return
+   *   `None` for an empty batch; otherwise the added files (partition values plus path, the
+   *   path being prefixed with the partition directory for partitioned rows) and a write
+   *   summary carrying the distinct partitions and the total row count
+   */
+  private def doCollectNativeResult(
+      cb: ColumnarBatch): Option[(Seq[(Map[String, String], String)], ExecutedWriteSummary)] = {
+    val numFiles = cb.numRows()
+    // Write an empty iterator
+    if (numFiles == 0) {
+      None
+    } else {
+      val file_col = cb.column(0)
+      val partition_col = cb.column(1)
+      val count_col = cb.column(2)
+
+      val partitions: mutable.Set[String] = mutable.Set[String]()
+      val addedFiles: ArrayBuffer[(Map[String, String], String)] =
+        new ArrayBuffer[(Map[String, String], String)]
+
+      var numWrittenRows: Long = 0
+      Range(0, numFiles).foreach {
+        i =>
+          val fileName = file_col.getUTF8String(i).toString
+          val partition = partition_col.getUTF8String(i).toString
+          if (partition == "__NO_PARTITION_ID__") {
+            addedFiles.append((Map.empty[String, String], fileName))
+          } else {
+            // Record the partition so the summary and stats report it, as the
+            // HadoopMapReduceCommitProtocol-based write does.
+            partitions += partition
+            val partitionValues = committer.parsePartitions(partition)
+            addedFiles.append((partitionValues, s"$partition/$fileName"))
+          }
+          numWrittenRows += count_col.getLong(i)
+      }
+      val updatedPartitions = partitions.toSet
+      Some(
+        (
+          addedFiles.toSeq,
+          ExecutedWriteSummary(
+            updatedPartitions = updatedPartitions,
+            stats = Seq(CreateBasicWriteTaskStats(numFiles, updatedPartitions, numWrittenRows)))))
+    }
+  }
+
+  /**
+   * Hands the collected files to the committer, commits the task and reports the measured
+   * commit time to the stats trackers.
+   *
+   * @return
+   *   `None` when the batch has no rows, otherwise the task result with the commit message
+   */
+  override def commitTask(batch: ColumnarBatch): Option[WriteTaskResult] = {
+    doCollectNativeResult(batch).map {
+      case (addedFiles, summary) =>
+        require(addedFiles.nonEmpty, "No files to commit")
+
+        committer.updateAddedFiles(addedFiles)
+
+        val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
+          committer.commitTask(taskAttemptContext)
+        }
+
+        // Just for update task commit time
+        description.statsTrackers.foreach {
+          stats => stats.newTaskInstance().getFinalStats(taskCommitTime)
+        }
+        WriteTaskResult(taskCommitMessage, summary)
+    }
+  }
+}
+
+/** Factory selecting the Delta-specific [[CHColumnarWrite]] for a given committer. */
+object CHDeltaColumnarWrite {
+
+  /**
+   * Wraps a [[CHDelayedCommitProtocol]] in a [[CHDelayedCommitProtocolWrite]].
+   *
+   * @throws GlutenNotSupportException
+   *   for any other committer type
+   */
+  def apply(
+      jobTrackerID: String,
+      description: WriteJobDescription,
+      committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] =
+    committer match {
+      case delayed: CHDelayedCommitProtocol =>
+        val write = CHDelayedCommitProtocolWrite(jobTrackerID, description, delayed)
+        // The declared return type is parameterised on FileCommitProtocol, hence the cast.
+        write.asInstanceOf[CHColumnarWrite[FileCommitProtocol]]
+      case _ =>
+        throw new GlutenNotSupportException(
+          s"Unsupported committer type: ${committer.getClass.getSimpleName}")
+    }
+}
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
index db9bba5f1..d60873430 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
@@ -178,6 +178,6 @@ class CHColumnarShuffleWriter[K, V](
   }
 
   // VisibleForTesting
-  def getPartitionLengths: Array[Long] = partitionLengths
+  // Returns `partitionLengths` as-is; declared with an explicit empty parameter list.
+  def getPartitionLengths(): Array[Long] = partitionLengths
 
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
new file mode 100644
index 000000000..6a5c19a4f
--- /dev/null
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, 
HadoopMapReduceCommitProtocol}
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.{BasicWriteTaskStats, 
ExecutedWriteSummary, PartitioningUtils, WriteJobDescription, WriteTaskResult}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.Utils
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobID, OutputCommitter, 
TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import java.lang.reflect.Field
+
+import scala.collection.mutable
+
+/**
+ * Task-side driver of a [[FileCommitProtocol]] for a columnar write: implementations provide
+ * the native set-up step and the translation of the result batch into a [[WriteTaskResult]],
+ * while this trait supplies the Hadoop task attempt context shared by all committer calls.
+ */
+trait CHColumnarWrite[T <: FileCommitProtocol] {
+
+  def description: WriteJobDescription
+  def jobTrackerID: String
+  def committer: T
+
+  /** Backend-specific preparation, run by [[setupTask]] right after the committer's set-up. */
+  def doSetupNativeTask(): Unit
+
+  /** Sets up the committer for this task attempt, then runs [[doSetupNativeTask]]. */
+  def setupTask(): Unit = {
+    committer.setupTask(taskAttemptContext)
+    doSetupNativeTask()
+  }
+
+  /** Aborts this task attempt on the committer. */
+  def abortTask(): Unit = committer.abortTask(taskAttemptContext)
+
+  /** Commits the task based on the result batch; see the implementations for the details. */
+  def commitTask(batch: ColumnarBatch): Option[WriteTaskResult]
+
+  /**
+   * Lazily built Hadoop task attempt context and job id string, derived from the current
+   * Spark [[TaskContext]]. Building it also writes the `mapreduce.*` identifiers into the
+   * Hadoop configuration taken from `description`.
+   */
+  lazy val (taskAttemptContext: TaskAttemptContext, jobId: String) = {
+    // Copied from `SparkHadoopWriterUtils.createJobID` to be compatible with multi-version
+    def createJobID(jobTrackerID: String, id: Int): JobID = {
+      if (id < 0) throw new IllegalArgumentException("Job number is negative")
+      new JobID(jobTrackerID, id)
+    }
+
+    val sparkStageId: Int = TaskContext.get().stageId()
+    val sparkPartitionId: Int = TaskContext.get().partitionId()
+    // Masked with Int.MaxValue so the attempt number is never negative.
+    val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue
+
+    val hadoopJobId = createJobID(jobTrackerID, sparkStageId)
+    val hadoopTaskId = new TaskID(hadoopJobId, TaskType.MAP, sparkPartitionId)
+    val hadoopAttemptId = new TaskAttemptID(hadoopTaskId, sparkAttemptNumber)
+
+    // Set up the configuration object
+    val conf = description.serializableHadoopConf.value
+    conf.set("mapreduce.job.id", hadoopJobId.toString)
+    conf.set("mapreduce.task.id", hadoopAttemptId.getTaskID.toString)
+    conf.set("mapreduce.task.attempt.id", hadoopAttemptId.toString)
+    conf.setBoolean("mapreduce.task.ismap", true)
+    conf.setInt("mapreduce.task.partition", 0)
+
+    (new TaskAttemptContextImpl(conf, hadoopAttemptId), hadoopJobId.toString)
+  }
+}
+
+/** Builds the [[FileNameSpec]] (empty prefix, counter-plus-extension suffix) for a task. */
+object CreateFileNameSpec {
+
+  /**
+   * The suffix is the file counter formatted as `.cNNN` (the counter is fixed at 0 here)
+   * followed by the extension reported by the output writer factory.
+   */
+  def apply(taskContext: TaskAttemptContext, description: WriteJobDescription): FileNameSpec = {
+    val fileCounter = 0
+    val counterTag = f".c$fileCounter%03d"
+    val extension = description.outputWriterFactory.getFileExtension(taskContext)
+    FileNameSpec("", counterTag + extension)
+  }
+}
+
+/** Assembles a [[BasicWriteTaskStats]] from the figures reported for one write task. */
+object CreateBasicWriteTaskStats {
+
+  /**
+   * @param numFiles
+   *   number of files written
+   * @param updatedPartitions
+   *   partition identifiers; each one becomes a single-field [[GenericInternalRow]]
+   * @param numWrittenRows
+   *   total number of rows written
+   */
+  def apply(
+      numFiles: Int,
+      updatedPartitions: Set[String],
+      numWrittenRows: Long): BasicWriteTaskStats = {
+    val partitionRows = updatedPartitions.map {
+      partition => new GenericInternalRow(Array[Any](partition))
+    }.toSeq
+    // NOTE(review): `numBytes` is a fixed literal, not a measured size -- presumably a
+    // placeholder until the real byte count is available; confirm.
+    BasicWriteTaskStats(
+      partitions = partitionRows,
+      numFiles = numFiles,
+      numBytes = 101,
+      numRows = numWrittenRows)
+  }
+}
+
+/**
+ * [[HadoopMapReduceAdapter]] for [[HadoopMapReduceCommitProtocol]]: uses reflection to read
+ * the protocol's `committer` field and to call its `getFilename` method, and derives the
+ * staging directory and file name for a task attempt from them.
+ */
+case class HadoopMapReduceAdapter(sparkCommitter: HadoopMapReduceCommitProtocol) {
+
+  // Reflective handle on the `committer` field of the wrapped protocol.
+  private lazy val committer: OutputCommitter = {
+    val committerField: Field =
+      classOf[HadoopMapReduceCommitProtocol].getDeclaredField("committer")
+    committerField.setAccessible(true)
+    committerField.get(sparkCommitter).asInstanceOf[OutputCommitter]
+  }
+
+  // Reflective handle on `getFilename(TaskAttemptContext, FileNameSpec)`.
+  private lazy val GetFilename = {
+    val method = classOf[HadoopMapReduceCommitProtocol]
+      .getDeclaredMethod("getFilename", classOf[TaskAttemptContext], classOf[FileNameSpec])
+    method.setAccessible(true)
+    method
+  }
+
+  /** Resolves the directory the task attempt writes into, falling back to `defaultPath`. */
+  private def newTaskAttemptTempPath(defaultPath: String): String = {
+    assert(committer != null)
+    val stagingLocation: String = committer match {
+      // For FileOutputCommitter it has its own staging path called "work path".
+      case f: FileOutputCommitter =>
+        Option(f.getWorkPath).map(_.toString).getOrElse(defaultPath)
+      case _ =>
+        defaultPath
+    }
+    // Round-trip through Path, as the string form is what gets returned.
+    new Path(stagingLocation).toString
+  }
+
+  /** Invokes the protocol's `getFilename` reflectively. */
+  private def getFilename(taskContext: TaskAttemptContext, spec: FileNameSpec): String =
+    GetFilename.invoke(sparkCommitter, taskContext, spec).asInstanceOf[String]
+
+  /**
+   * @return
+   *   the staging directory (based on `description.path`) and the file name for this attempt
+   */
+  def getTaskAttemptTempPathAndFilename(
+      taskContext: TaskAttemptContext,
+      description: WriteJobDescription): (String, String) = {
+    val stageDir = newTaskAttemptTempPath(description.path)
+    val nameSpec = CreateFileNameSpec(taskContext, description)
+    (stageDir, getFilename(taskContext, nameSpec))
+  }
+}
+
+/**
+ * [[CHColumnarWrite]] for a [[HadoopMapReduceCommitProtocol]]: injects the staging path and
+ * file name before the native task runs, and converts the result batch into a
+ * [[WriteTaskResult]] when the task is committed.
+ */
+case class HadoopMapReduceCommitProtocolWrite(
+    override val jobTrackerID: String,
+    override val description: WriteJobDescription,
+    override val committer: HadoopMapReduceCommitProtocol)
+  extends CHColumnarWrite[HadoopMapReduceCommitProtocol]
+  with Logging {
+
+  private lazy val adapter: HadoopMapReduceAdapter = HadoopMapReduceAdapter(committer)
+
+  /**
+   * This function is used in [[CHColumnarWriteFilesRDD]] to inject the staging write path
+   * before initializing the native plan and collect native write files metrics for each
+   * backend.
+   */
+  override def doSetupNativeTask(): Unit = {
+    val (writePath, writeFileName) =
+      adapter.getTaskAttemptTempPathAndFilename(taskAttemptContext, description)
+    logDebug(s"Native staging write path: $writePath and file name: $writeFileName")
+    BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, writeFileName)
+  }
+
+  /**
+   * Reads the result batch row by row: column 0 is read as the file name, column 1 as the
+   * partition id (or the `__NO_PARTITION_ID__` marker), column 2 as the row count.
+   *
+   * @return
+   *   `None` for an empty batch; otherwise a task result whose commit message carries the
+   *   files that belong to custom partition locations together with the touched partitions
+   */
+  def doCollectNativeResult(cb: ColumnarBatch): Option[WriteTaskResult] = {
+    val numFiles = cb.numRows()
+    // Write an empty iterator
+    if (numFiles == 0) {
+      None
+    } else {
+      val file_col = cb.column(0)
+      val partition_col = cb.column(1)
+      val count_col = cb.column(2)
+
+      val outputPath = description.path
+      val partitions: mutable.Set[String] = mutable.Set[String]()
+      val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
+
+      var numWrittenRows: Long = 0
+      for (i <- 0 until numFiles) {
+        val targetFileName = file_col.getUTF8String(i).toString
+        val partition = partition_col.getUTF8String(i).toString
+        if (partition != "__NO_PARTITION_ID__") {
+          partitions += partition
+          val tmpOutputPath = s"$outputPath/$partition/$targetFileName"
+          // Only partitions with a custom location get an absolute-path mapping.
+          description.customPartitionLocations
+            .get(PartitioningUtils.parsePathFragment(partition))
+            .foreach {
+              customDir => addedAbsPathFiles += (tmpOutputPath -> s"$customDir/$targetFileName")
+            }
+        }
+        numWrittenRows += count_col.getLong(i)
+      }
+
+      val updatedPartitions = partitions.toSet
+      val taskStats = CreateBasicWriteTaskStats(numFiles, updatedPartitions, numWrittenRows)
+      val summary =
+        ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(taskStats))
+      val commitMessage = new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions)
+      Some(WriteTaskResult(commitMessage, summary))
+    }
+  }
+
+  /**
+   * Commits the task on the committer (its own commit message is discarded in favour of the
+   * one built from the batch) and reports the measured commit time to the stats trackers.
+   */
+  override def commitTask(batch: ColumnarBatch): Option[WriteTaskResult] = {
+    doCollectNativeResult(batch).map {
+      nativeWriteTaskResult =>
+        val (_, taskCommitTime) = Utils.timeTakenMs {
+          committer.commitTask(taskAttemptContext)
+        }
+
+        // Just for update task commit time
+        description.statsTrackers.foreach {
+          tracker => tracker.newTaskInstance().getFinalStats(taskCommitTime)
+        }
+        nativeWriteTaskResult
+    }
+  }
+}
+
+/** Factory choosing the [[CHColumnarWrite]] implementation that matches a committer. */
+object CHColumnarWrite {
+
+  /**
+   * A [[HadoopMapReduceCommitProtocol]] is handled by [[HadoopMapReduceCommitProtocolWrite]];
+   * every other committer is forwarded to `CHDeltaColumnarWrite`.
+   */
+  def apply(
+      jobTrackerID: String,
+      description: WriteJobDescription,
+      committer: FileCommitProtocol): CHColumnarWrite[FileCommitProtocol] =
+    committer match {
+      case hadoopCommitter: HadoopMapReduceCommitProtocol =>
+        val write = HadoopMapReduceCommitProtocolWrite(jobTrackerID, description, hadoopCommitter)
+        // The declared return type is parameterised on FileCommitProtocol, hence the cast.
+        write.asInstanceOf[CHColumnarWrite[FileCommitProtocol]]
+      case other =>
+        CHDeltaColumnarWrite(jobTrackerID, description, other)
+    }
+}
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
index 52019d770..bf051671f 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
@@ -16,16 +16,13 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.gluten.backendsapi.BackendsApiManager
-
 import org.apache.spark.{Partition, SparkException, TaskContext, 
TaskOutputFileAlreadyExistException}
 import org.apache.spark.internal.io.{FileCommitProtocol, 
SparkHadoopWriterUtils}
-import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.rdd.RDD
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericInternalRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
@@ -33,11 +30,10 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 
 import org.apache.hadoop.fs.FileAlreadyExistsException
+import org.apache.hadoop.mapreduce.TaskAttemptContext
 
 import java.util.Date
 
-import scala.collection.mutable
-
 /**
  * This RDD is used to make sure we have injected staging write path before 
initializing the native
  * plan, and support Spark file commit protocol.
@@ -49,60 +45,6 @@ class CHColumnarWriteFilesRDD(
     jobTrackerID: String)
   extends RDD[WriterCommitMessage](prev) {
 
-  private def collectNativeResult(cb: ColumnarBatch): Option[WriteTaskResult] 
= {
-    val numFiles = cb.numRows()
-    // Write an empty iterator
-    if (numFiles == 0) {
-      None
-    } else {
-      val file_col = cb.column(0)
-      val partition_col = cb.column(1)
-      val count_col = cb.column(2)
-
-      val outputPath = description.path
-      var updatedPartitions = Set.empty[String]
-      val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, 
String]()
-
-      val write_stats = Range(0, cb.numRows()).map {
-        i =>
-          val targetFileName = file_col.getUTF8String(i).toString
-          val partition = partition_col.getUTF8String(i).toString
-          if (partition != "__NO_PARTITION_ID__") {
-            updatedPartitions += partition
-            val tmpOutputPath = outputPath + "/" + partition + "/" + 
targetFileName
-            val customOutputPath =
-              description.customPartitionLocations.get(
-                PartitioningUtils.parsePathFragment(partition))
-            if (customOutputPath.isDefined) {
-              addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + 
targetFileName
-            }
-          }
-          count_col.getLong(i)
-      }
-
-      val partitionsInternalRows = updatedPartitions.map {
-        part =>
-          val parts = new Array[Any](1)
-          parts(0) = part
-          new GenericInternalRow(parts)
-      }.toSeq
-
-      val numWrittenRows = write_stats.sum
-      val stats = BasicWriteTaskStats(
-        partitions = partitionsInternalRows,
-        numFiles = numFiles,
-        numBytes = 101,
-        numRows = numWrittenRows)
-      val summary =
-        ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = 
Seq(stats))
-
-      Some(
-        WriteTaskResult(
-          new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
-          summary))
-    }
-  }
-
   private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = {
     val stats = 
writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats]
     val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows)
@@ -116,11 +58,12 @@ class CHColumnarWriteFilesRDD(
   }
 
   private def writeFilesForEmptyIterator(
-      commitProtocol: SparkWriteFilesCommitProtocol): WriteTaskResult = {
-    val taskAttemptContext = commitProtocol.taskAttemptContext
+      taskAttemptContext: TaskAttemptContext,
+      sparkPartitionId: Int
+  ): WriteTaskResult = {
 
     val dataWriter =
-      if (commitProtocol.sparkPartitionId != 0) {
+      if (sparkPartitionId != 0) {
         // In case of empty job, leave first partition to save meta for file 
format like parquet.
         new EmptyDirectoryDataWriter(description, taskAttemptContext, 
committer)
       } else if (description.partitionColumns.isEmpty) {
@@ -135,39 +78,35 @@ class CHColumnarWriteFilesRDD(
   }
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[WriterCommitMessage] = {
-    val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID, 
description, committer)
 
+    val commitProtocol = CHColumnarWrite(jobTrackerID, description, committer)
     commitProtocol.setupTask()
-    val writePath = commitProtocol.newTaskAttemptTempPath()
-    val writeFileName = commitProtocol.getFilename
-    logDebug(s"Native staging write path: $writePath and file name: 
$writeFileName")
 
-    var writeTaskResult: WriteTaskResult = null
     try {
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-        
BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath, 
writeFileName)
 
         // Initialize the native plan
         val iter = firstParent[ColumnarBatch].iterator(split, context)
         assert(iter.hasNext)
         val resultColumnarBatch = iter.next()
         assert(resultColumnarBatch != null)
-        val nativeWriteTaskResult = collectNativeResult(resultColumnarBatch)
-        if (nativeWriteTaskResult.isEmpty) {
-          // If we are writing an empty iterator, then velox would do nothing.
-          // Here we fallback to use vanilla Spark write files to generate an 
empty file for
-          // metadata only.
-          writeTaskResult = writeFilesForEmptyIterator(commitProtocol)
-          // We have done commit task inside `writeFilesForEmptyIterator`.
-        } else {
-          writeTaskResult = nativeWriteTaskResult.get
-          commitProtocol.commitTask()
-        }
+        val writeTaskResult = commitProtocol
+          .commitTask(resultColumnarBatch)
+          .orElse({
+            // If we are writing an empty iterator, then gluten backend would 
do nothing.
+            // Here we fallback to use vanilla Spark write files to generate 
an empty file for
+            // metadata only.
+            Some(writeFilesForEmptyIterator(commitProtocol.taskAttemptContext, 
context.partitionId))
+            // We have done commit task inside `writeFilesForEmptyIterator`.
+          })
+          .get
+        reportTaskMetrics(writeTaskResult)
+        Iterator.single(writeTaskResult)
       })(
         catchBlock = {
           // If there is an error, abort the task
           commitProtocol.abortTask()
-          logError(s"Job ${commitProtocol.getJobId} aborted.")
+          logError(s"Job ${commitProtocol.jobId} aborted.")
         }
       )
     } catch {
@@ -177,14 +116,9 @@ class CHColumnarWriteFilesRDD(
         throw new TaskOutputFileAlreadyExistException(f)
       case t: Throwable =>
         throw new SparkException(
-          s"Task failed while writing rows to staging path: $writePath, " +
-            s"output path: ${description.path}",
+          s"Task failed while writing rows to output path: 
${description.path}",
           t)
     }
-
-    assert(writeTaskResult != null)
-    reportTaskMetrics(writeTaskResult)
-    Iterator.single(writeTaskResult)
   }
 
   override protected def getPartitions: Array[Partition] = 
firstParent[ColumnarBatch].partitions
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
index 17eb0ed0b..7526e6d3d 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
@@ -127,7 +127,7 @@ object CHExecUtil extends Logging {
         result
       }
 
-      override def next: UnsafeRow = {
+      override def next(): UnsafeRow = {
         if (rowId >= rows) throw new NoSuchElementException
 
         val (offset, length) = (rowInfo.offsets(rowId), rowInfo.lengths(rowId))
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
index 8f8351bae..d6f9a0162 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseDeltaParquetWriteSuite.scala
@@ -1251,7 +1251,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
     runTPCHQueryBySQL(1, sqlStr) { _ => {} }
   }
 
-  test("test parquet optimize basic") {
+  testSparkVersionLE33("test parquet optimize basic") {
     withSQLConf("spark.databricks.delta.optimize.maxFileSize" -> "20000000") {
       spark.sql(s"""
                    |DROP TABLE IF EXISTS lineitem_delta_parquet_optimize;
@@ -1286,7 +1286,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
     }
   }
 
-  test("test parquet optimize partitioned by one low card column") {
+  testSparkVersionLE33("test parquet optimize partitioned by one low card 
column") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_delta_parquet_optimize_p2;
                  |""".stripMargin)
@@ -1325,7 +1325,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
     assert(ret2.apply(0).get(0) == 600572)
   }
 
-  test("test parquet optimize parallel delete") {
+  testSparkVersionLE33("test parquet optimize parallel delete") {
     withSQLConf("spark.databricks.delta.vacuum.parallelDelete.enabled" -> 
"true") {
       spark.sql(s"""
                    |DROP TABLE IF EXISTS lineitem_delta_parquet_optimize_p4;
@@ -1356,7 +1356,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
     }
   }
 
-  test("test parquet optimize with the path based table") {
+  testSparkVersionLE33("test parquet optimize with the path based table") {
     val dataPath = s"$basePath/lineitem_delta_parquet_optimize_path_based"
     clearDataPath(dataPath)
     withSQLConf(
@@ -1372,14 +1372,16 @@ class GlutenClickHouseDeltaParquetWriteSuite
         .mode(SaveMode.Append)
         .save(dataPath)
 
+      assert(countFiles(new File(dataPath)) === 51)
+
       val clickhouseTable = DeltaTable.forPath(spark, dataPath)
       clickhouseTable.optimize().executeCompaction()
 
       clickhouseTable.vacuum(0.0)
       if (sparkVersion.equals("3.2")) {
-        assert(countFiles(new File(dataPath)) == 27)
+        assert(countFiles(new File(dataPath)) === 27)
       } else {
-        assert(countFiles(new File(dataPath)) == 29)
+        assert(countFiles(new File(dataPath)) === 29)
       }
 
       val ret = spark.sql(s"select count(*) from 
clickhouse.`$dataPath`").collect()
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
index 1f99947e5..11710a758 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeWriteTableSuite.scala
@@ -187,7 +187,7 @@ class GlutenClickHouseNativeWriteTableSuite
       checkNative: Boolean = true): Unit = nativeWrite {
     format =>
       val (table_name, table_create_sql, insert_sql) = f(format)
-      withDestinationTable(table_name, table_create_sql) {
+      withDestinationTable(table_name, Option(table_create_sql)) {
         checkInsertQuery(insert_sql, checkNative)
         Option(extraCheck).foreach(_(table_name, format))
       }
@@ -218,15 +218,36 @@ class GlutenClickHouseNativeWriteTableSuite
   }
 
   test("supplier: csv to parquet- insert overwrite local directory") {
+    val partitionNumber = 7
     withSource(supplierDF, "supplier") {
-      nativeWrite {
-        format =>
+      nativeWrite2(
+        format => {
           val sql =
             s"""insert overwrite local directory
                |'$basePath/test_insert_into_${format}_supplier'
-               |stored as $format select * from supplier""".stripMargin
-          checkInsertQuery(sql, checkNative = true)
-      }
+               |stored as $format
+               |select /*+ REPARTITION($partitionNumber) */ * from 
supplier""".stripMargin
+          (s"test_insert_into_${format}_supplier", null, sql)
+        },
+        (table_name, format) => {
+          // spark 3.2 without orc or parquet suffix
+          val files = recursiveListFiles(new File(s"$basePath/$table_name"))
+            .map(_.getName)
+            .filterNot(s => s.endsWith(s".crc") || s.equals("_SUCCESS"))
+
+          lazy val fileNames = {
+            val dir = s"$basePath/$table_name"
+            recursiveListFiles(new File(dir))
+              .map(f => f.getAbsolutePath.stripPrefix(dir))
+              .sorted
+              .mkString("\n")
+          }
+
+          lazy val errorMessage =
+            s"Search $basePath/$table_name with suffix .$format, all files: \n 
$fileNames"
+          assert(files.length === partitionNumber, errorMessage)
+        }
+      )
     }
   }
 
@@ -851,7 +872,7 @@ class GlutenClickHouseNativeWriteTableSuite
         val table_name = "t_" + format
         withDestinationTable(
           table_name,
-          s"create table $table_name (id int, str string) stored as $format") {
+          Some(s"create table $table_name (id int, str string) stored as 
$format")) {
           checkInsertQuery(
             s"insert overwrite table $table_name " +
               "select id, cast(id as string) from range(10) union all " +
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala
 
b/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala
index fc30d151b..4bee3f177 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/spark/gluten/NativeWriteChecker.scala
@@ -62,9 +62,10 @@ trait NativeWriteChecker
       spark.sql(sqlStr)
     }
 
-  def withDestinationTable(table: String, createTableSql: String = "select 
1")(f: => Unit): Unit = {
+  def withDestinationTable(table: String, createTableSql: Option[String] = 
None)(
+      f: => Unit): Unit = {
     spark.sql(s"drop table IF EXISTS $table")
-    spark.sql(s"$createTableSql")
+    createTableSql.foreach(spark.sql)
     f
   }
 
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
similarity index 82%
rename from 
gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
rename to 
backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
index 5e3ab83e3..845f2f98f 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec, 
HadoopMapReduceCommitProtocol}
+import org.apache.spark.internal.io.{FileCommitProtocol, 
HadoopMapReduceCommitProtocol}
 import org.apache.spark.sql.execution.datasources.WriteJobDescription
 import org.apache.spark.util.Utils
 
@@ -41,9 +41,9 @@ class SparkWriteFilesCommitProtocol(
   extends Logging {
   assert(committer.isInstanceOf[HadoopMapReduceCommitProtocol])
 
-  val sparkStageId: Int = TaskContext.get().stageId()
-  val sparkPartitionId: Int = TaskContext.get().partitionId()
-  private val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & 
Int.MaxValue
+  val sparkStageId = TaskContext.get().stageId()
+  val sparkPartitionId = TaskContext.get().partitionId()
+  val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & 
Int.MaxValue
   private val jobId = createJobID(jobTrackerID, sparkStageId)
 
   private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -68,21 +68,6 @@ class SparkWriteFilesCommitProtocol(
     field.get(committer).asInstanceOf[OutputCommitter]
   }
 
-  private lazy val internalGetFilename = {
-    val m = classOf[HadoopMapReduceCommitProtocol]
-      .getDeclaredMethod("getFilename", classOf[TaskAttemptContext], 
classOf[FileNameSpec])
-    m.setAccessible(true)
-    m
-  }
-
-  def getFilename: String = {
-    val fileCounter = 0
-    val suffix = f".c$fileCounter%03d" +
-      description.outputWriterFactory.getFileExtension(taskAttemptContext)
-    val fileNameSpec = FileNameSpec("", suffix)
-    internalGetFilename.invoke(committer, taskAttemptContext, 
fileNameSpec).asInstanceOf[String]
-  }
-
   def setupTask(): Unit = {
     committer.setupTask(taskAttemptContext)
   }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
index 727613f56..d981de804 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/planner/plan/GlutenPlanModel.scala
@@ -49,7 +49,7 @@ object GlutenPlanModel {
     override protected def doExecute(): RDD[InternalRow] = throw new 
IllegalStateException()
     override def output: Seq[Attribute] = metadata.schema().output
 
-    override def supportsColumnar(): Boolean = {
+    override def supportsColumnar: Boolean = {
       batchType != Convention.BatchType.None
     }
 
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index 7f6fd01ac..9c54653d9 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -49,7 +49,7 @@ abstract class ColumnarWriteFilesExec protected (
 
   override lazy val references: AttributeSet = AttributeSet.empty
 
-  override def supportsColumnar(): Boolean = true
+  override def supportsColumnar: Boolean = true
 
   override def output: Seq[Attribute] = Seq.empty
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to