This is an automated email from the ASF dual-hosted git repository.

qiansun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a04dec6bb9 [GLUTEN-10215][VL] Delta: Native write support for Delta 
3.3.1 / Spark 3.5 (#10801)
a04dec6bb9 is described below

commit a04dec6bb97779d6f942645ed274ed01a7ba767c
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Oct 15 16:06:27 2025 +0100

    [GLUTEN-10215][VL] Delta: Native write support for Delta 3.3.1 / Spark 3.5 
(#10801)
    
    * [VL] Gluten-it: Record the execution time of data generator features
    
    This will be helpful in measuring the write performance of Gluten's lake 
format supports
    
    * [VL] Delta: Native write support for Delta 3.3.1 / Spark 3.5
---
 .../apache/gluten/config/VeloxDeltaConfig.scala    |  43 ++
 ...che.gluten.component.VeloxDelta33WriteComponent |   0
 .../component/VeloxDelta33WriteComponent.scala     |  57 ++
 .../sql/delta/GlutenDeltaParquetFileFormat.scala   | 619 +++++++++++++++++++++
 .../sql/delta/GlutenOptimisticTransaction.scala    | 221 ++++++++
 .../spark/sql/delta/GlutenParquetFileFormat.scala  | 100 ++++
 .../delta/files/GlutenDeltaFileFormatWriter.scala  | 602 ++++++++++++++++++++
 .../stats/GlutenDeltaWriteJobStatsTracker.scala    |  83 +++
 .../datasources/v2/DeltaWriteOperators.scala       |  85 +++
 .../datasources/v2/OffloadDeltaCommand.scala       |  42 ++
 .../apache/spark/sql/delta/DeleteSQLSuite.scala    |   5 +-
 .../apache/spark/sql/delta/DeltaTestUtils.scala    |   4 +-
 .../spark/sql/delta/test/DeltaSQLCommandTest.scala |   3 +
 .../org/apache/gluten/integration/DataGen.scala    |  11 +
 .../gluten/integration/ds/TpcdsDataGen.scala       |   6 +-
 .../apache/gluten/integration/h/TpchDataGen.scala  |   6 +-
 16 files changed, 1874 insertions(+), 13 deletions(-)

diff --git 
a/backends-velox/src-delta/main/scala/org/apache/gluten/config/VeloxDeltaConfig.scala
 
b/backends-velox/src-delta/main/scala/org/apache/gluten/config/VeloxDeltaConfig.scala
new file mode 100644
index 0000000000..99c2d2c26a
--- /dev/null
+++ 
b/backends-velox/src-delta/main/scala/org/apache/gluten/config/VeloxDeltaConfig.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.config
+
+import org.apache.spark.sql.internal.SQLConf
+
+class VeloxDeltaConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {
+  import VeloxDeltaConfig._
+
+  def enableNativeWrite: Boolean = getConf(ENABLE_NATIVE_WRITE)
+}
+
+object VeloxDeltaConfig extends ConfigRegistry {
+
+  def get: VeloxDeltaConfig = {
+    new VeloxDeltaConfig(SQLConf.get)
+  }
+
+  /**
+   * Experimental as the feature now has performance issue because of the 
fallback processing of
+   * statistics.
+   */
+  val ENABLE_NATIVE_WRITE: ConfigEntry[Boolean] =
+    
buildConf("spark.gluten.sql.columnar.backend.velox.delta.enableNativeWrite")
+      .experimental()
+      .doc("Enable native Delta Lake write for Velox backend.")
+      .booleanConf
+      .createWithDefault(false)
+}
diff --git 
a/backends-velox/src-delta33/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxDelta33WriteComponent
 
b/backends-velox/src-delta33/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxDelta33WriteComponent
new file mode 100644
index 0000000000..e69de29bb2
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/gluten/component/VeloxDelta33WriteComponent.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/gluten/component/VeloxDelta33WriteComponent.scala
new file mode 100644
index 0000000000..f1e06d0702
--- /dev/null
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/gluten/component/VeloxDelta33WriteComponent.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.component
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.extension.columnar.enumerated.RasOffload
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.validator.Validators
+import org.apache.gluten.extension.injector.Injector
+
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.datasources.v2.{LeafV2CommandExec, 
OffloadDeltaCommand}
+
+class VeloxDelta33WriteComponent extends Component {
+  override def name(): String = "velox-delta33-write"
+
+  override def buildInfo(): Component.BuildInfo =
+    Component.BuildInfo("VeloxDelta33Write", "N/A", "N/A", "N/A")
+
+  override def dependencies(): Seq[Class[_ <: Component]] = 
classOf[VeloxDeltaComponent] :: Nil
+
+  override def injectRules(injector: Injector): Unit = {
+    val legacy = injector.gluten.legacy
+    val ras = injector.gluten.ras
+    legacy.injectTransform {
+      c =>
+        val offload = Seq(
+          OffloadDeltaCommand()
+        ).map(_.toStrcitRule())
+        HeuristicTransform.Simple(
+          Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
+          offload)
+    }
+    val offloads: Seq[RasOffload] = Seq(
+      RasOffload.from[ExecutedCommandExec](OffloadDeltaCommand()),
+      RasOffload.from[LeafV2CommandExec](OffloadDeltaCommand())
+    )
+    offloads.foreach(
+      offload =>
+        ras.injectRasRule(
+          c => RasOffload.Rule(offload, Validators.newValidator(new 
GlutenConfig(c.sqlConf)), Nil)))
+  }
+}
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
new file mode 100644
index 0000000000..c661e820ba
--- /dev/null
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
@@ -0,0 +1,619 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat._
+import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, Metadata, 
Protocol}
+import 
org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
+import org.apache.spark.sql.delta.deletionvectors.{DropMarkedRowsFilter, 
KeepAllRowsFilter, KeepMarkedRowsFilter}
+import org.apache.spark.sql.delta.logging.DeltaLogKeys
+import org.apache.spark.sql.delta.schema.SchemaMergingUtils
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, 
OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{ByteType, LongType, MetadataBuilder, 
StructField, StructType}
+import org.apache.spark.sql.util.ScalaExtensions._
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchRow, 
ColumnVector}
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.Job
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.util.ContextUtil
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
+
+// spotless:off
+/**
+ * A thin wrapper over the Parquet file format to support
+ *  - columns names without restrictions.
+ *  - populated a column from the deletion vector of this file (if exists) to 
indicate
+ *    whether the row is deleted or not according to the deletion vector. 
Consumers
+ *    of this scan can use the column values to filter out the deleted rows.
+ */
+case class GlutenDeltaParquetFileFormat(
+                                   protocol: Protocol,
+                                   metadata: Metadata,
+                                   nullableRowTrackingFields: Boolean = false,
+                                   optimizationsEnabled: Boolean = true,
+                                   tablePath: Option[String] = None,
+                                   isCDCRead: Boolean = false)
+  extends GlutenParquetFileFormat
+    with LoggingShims {
+  // Validate either we have all arguments for DV enabled read or none of them.
+  if (hasTablePath) {
+    SparkSession.getActiveSession.map { session =>
+      val useMetadataRowIndex =
+        
session.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
+      require(useMetadataRowIndex == optimizationsEnabled,
+        "Wrong arguments for Delta table scan with deletion vectors")
+    }
+  }
+
+  SparkSession.getActiveSession.ifDefined { session =>
+    TypeWidening.assertTableReadable(session.sessionState.conf, protocol, 
metadata)
+  }
+
+
+  val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
+  val referenceSchema: StructType = metadata.schema
+
+  if (columnMappingMode == IdMapping) {
+    val requiredReadConf = SQLConf.PARQUET_FIELD_ID_READ_ENABLED
+    
require(SparkSession.getActiveSession.exists(_.sessionState.conf.getConf(requiredReadConf)),
+      s"${requiredReadConf.key} must be enabled to support Delta id column 
mapping mode")
+    val requiredWriteConf = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED
+    
require(SparkSession.getActiveSession.exists(_.sessionState.conf.getConf(requiredWriteConf)),
+      s"${requiredWriteConf.key} must be enabled to support Delta id column 
mapping mode")
+  }
+
+  override def shortName(): String = "parquet"
+
+  override def toString: String = "Parquet"
+
+  /**
+   * prepareSchemaForRead must only be used for parquet read.
+   * It removes "PARQUET_FIELD_ID_METADATA_KEY" for name mapping mode which 
address columns by
+   * physical name instead of id.
+   */
+  def prepareSchemaForRead(inputSchema: StructType): StructType = {
+    val schema = DeltaColumnMapping.createPhysicalSchema(
+      inputSchema, referenceSchema, columnMappingMode)
+    if (columnMappingMode == NameMapping) {
+      SchemaMergingUtils.transformColumns(schema) { (_, field, _) =>
+        field.copy(metadata = new MetadataBuilder()
+          .withMetadata(field.metadata)
+          .remove(DeltaColumnMapping.PARQUET_FIELD_ID_METADATA_KEY)
+          .remove(DeltaColumnMapping.PARQUET_FIELD_NESTED_IDS_METADATA_KEY)
+          .build())
+      }
+    } else schema
+  }
+
+  /**
+   * Prepares filters so that they can be pushed down into the Parquet reader.
+   *
+   * If column mapping is enabled, then logical column names in the filters 
will be replaced with
+   * their corresponding physical column names. This is necessary as the 
Parquet files will use
+   * physical column names, and the requested schema pushed down in the 
Parquet reader will also use
+   * physical column names.
+   */
+  private def prepareFiltersForRead(filters: Seq[Filter]): Seq[Filter] = {
+    if (!optimizationsEnabled) {
+      Seq.empty
+    } else if (columnMappingMode != NoMapping) {
+      import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+      val physicalNameMap = 
DeltaColumnMapping.getLogicalNameToPhysicalNameMap(referenceSchema)
+        .map { case (logicalName, physicalName) => (logicalName.quoted, 
physicalName.quoted) }
+      filters.flatMap(translateFilterForColumnMapping(_, physicalNameMap))
+    } else {
+      filters
+    }
+  }
+
+  override def isSplitable(
+                            sparkSession: SparkSession,
+                            options: Map[String, String],
+                            path: Path): Boolean = optimizationsEnabled
+
+  def hasTablePath: Boolean = tablePath.isDefined
+
+  /**
+   * We sometimes need to replace FileFormat within LogicalPlans, so we have 
to override
+   * `equals` to ensure file format changes are captured
+   */
+  override def equals(other: Any): Boolean = {
+    other match {
+      case ff: GlutenDeltaParquetFileFormat =>
+        ff.columnMappingMode == columnMappingMode &&
+          ff.referenceSchema == referenceSchema &&
+          ff.optimizationsEnabled == optimizationsEnabled
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = getClass.getCanonicalName.hashCode()
+
+  override def buildReaderWithPartitionValues(
+                                               sparkSession: SparkSession,
+                                               dataSchema: StructType,
+                                               partitionSchema: StructType,
+                                               requiredSchema: StructType,
+                                               filters: Seq[Filter],
+                                               options: Map[String, String],
+                                               hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
+
+    val useMetadataRowIndexConf = 
DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX
+    val useMetadataRowIndex = 
sparkSession.sessionState.conf.getConf(useMetadataRowIndexConf)
+
+    val parquetDataReader: PartitionedFile => Iterator[InternalRow] =
+      super.buildReaderWithPartitionValues(
+        sparkSession,
+        prepareSchemaForRead(dataSchema),
+        prepareSchemaForRead(partitionSchema),
+        prepareSchemaForRead(requiredSchema),
+        prepareFiltersForRead(filters),
+        options,
+        hadoopConf)
+
+    val schemaWithIndices = requiredSchema.fields.zipWithIndex
+    def findColumn(name: String): Option[ColumnMetadata] = {
+      val results = schemaWithIndices.filter(_._1.name == name)
+      if (results.length > 1) {
+        throw new IllegalArgumentException(
+          s"There are more than one column with name=`$name` requested in the 
reader output")
+      }
+      results.headOption.map(e => ColumnMetadata(e._2, e._1))
+    }
+
+    val isRowDeletedColumn = findColumn(IS_ROW_DELETED_COLUMN_NAME)
+    val rowIndexColumnName = if (useMetadataRowIndex) {
+      ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+    } else {
+      ROW_INDEX_COLUMN_NAME
+    }
+    val rowIndexColumn = findColumn(rowIndexColumnName)
+
+    // We don't have any additional columns to generate, just return the 
original reader as is.
+    if (isRowDeletedColumn.isEmpty && rowIndexColumn.isEmpty) return 
parquetDataReader
+
+    // We are using the row_index col generated by the parquet reader and 
there are no more
+    // columns to generate.
+    if (useMetadataRowIndex && isRowDeletedColumn.isEmpty) return 
parquetDataReader
+
+    // Verify that either predicate pushdown with metadata column is enabled 
or optimizations
+    // are disabled.
+    require(useMetadataRowIndex || !optimizationsEnabled,
+      "Cannot generate row index related metadata with file splitting or 
predicate pushdown")
+
+    if (hasTablePath && isRowDeletedColumn.isEmpty) {
+      throw new IllegalArgumentException(
+        s"Expected a column $IS_ROW_DELETED_COLUMN_NAME in the schema")
+    }
+
+    val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
+
+    val useOffHeapBuffers = 
sparkSession.sessionState.conf.offHeapColumnVectorEnabled
+    (partitionedFile: PartitionedFile) => {
+      val rowIteratorFromParquet = parquetDataReader(partitionedFile)
+      try {
+        val iterToReturn =
+          iteratorWithAdditionalMetadataColumns(
+            partitionedFile,
+            rowIteratorFromParquet,
+            isRowDeletedColumn,
+            rowIndexColumn,
+            useOffHeapBuffers,
+            serializableHadoopConf,
+            useMetadataRowIndex)
+        iterToReturn.asInstanceOf[Iterator[InternalRow]]
+      } catch {
+        case NonFatal(e) =>
+          // Close the iterator if it is a closeable resource. The 
`ParquetFileFormat` opens
+          // the file and returns `RecordReaderIterator` (which implements 
`AutoCloseable` and
+          // `Iterator`) instance as a `Iterator`.
+          rowIteratorFromParquet match {
+            case resource: AutoCloseable => closeQuietly(resource)
+            case _ => // do nothing
+          }
+          throw e
+      }
+    }
+  }
+
+  override def supportFieldName(name: String): Boolean = {
+    if (columnMappingMode != NoMapping) true else super.supportFieldName(name)
+  }
+
+  override def metadataSchemaFields: Seq[StructField] = {
+    // TODO(SPARK-47731): Parquet reader in Spark has a bug where a file 
containing 2b+ rows
+    // in a single rowgroup causes it to run out of the `Integer` range.
+    // For Delta Parquet readers don't expose the row_index field as a 
metadata field when it is
+    // not strictly required. We do expose it when Row Tracking or DVs are 
enabled.
+    // In general, having 2b+ rows in a single rowgroup is not a common use 
case. When the issue is
+    // hit an exception is thrown.
+    (protocol, metadata) match {
+      // We should not expose row tracking fields for CDC reads.
+      case (p, m) if RowId.isEnabled(p, m) && !isCDCRead =>
+        val extraFields = RowTracking.createMetadataStructFields(p, m, 
nullableRowTrackingFields)
+        super.metadataSchemaFields ++ extraFields
+      case (p, m) if deletionVectorsReadable(p, m) => 
super.metadataSchemaFields
+      case _ => super.metadataSchemaFields.filter(_ != 
ParquetFileFormat.ROW_INDEX_FIELD)
+    }
+  }
+
+  override def prepareWrite(
+                             sparkSession: SparkSession,
+                             job: Job,
+                             options: Map[String, String],
+                             dataSchema: StructType): OutputWriterFactory = {
+    val factory = super.prepareWrite(sparkSession, job, options, dataSchema)
+    val conf = ContextUtil.getConfiguration(job)
+    // Always write timestamp as TIMESTAMP_MICROS for Iceberg compat based on 
Iceberg spec
+    if (IcebergCompatV1.isEnabled(metadata) || 
IcebergCompatV2.isEnabled(metadata)) {
+      conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
+        SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)
+    }
+    if (IcebergCompatV2.isEnabled(metadata)) {
+      // For Uniform with IcebergCompatV2, we need to write nested field IDs 
for list and map
+      // types to the parquet schema. Spark currently does not support it so 
we hook in our
+      // own write support class.
+      ParquetOutputFormat.setWriteSupportClass(job, 
classOf[DeltaParquetWriteSupport])
+    }
+    factory
+  }
+
+  override def fileConstantMetadataExtractors: Map[String, PartitionedFile => 
Any] = {
+    val extractBaseRowId: PartitionedFile => Any = { file =>
+      file.otherConstantMetadataColumnValues.getOrElse(RowId.BASE_ROW_ID, {
+        throw new IllegalStateException(
+          s"Missing ${RowId.BASE_ROW_ID} value for file '${file.filePath}'")
+      })
+    }
+    val extractDefaultRowCommitVersion: PartitionedFile => Any = { file =>
+      file.otherConstantMetadataColumnValues
+        .getOrElse(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, {
+          throw new IllegalStateException(
+            s"Missing ${DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME} 
value " +
+              s"for file '${file.filePath}'")
+        })
+    }
+    super.fileConstantMetadataExtractors
+      .updated(RowId.BASE_ROW_ID, extractBaseRowId)
+      .updated(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, 
extractDefaultRowCommitVersion)
+  }
+
+  def copyWithDVInfo(
+                      tablePath: String,
+                      optimizationsEnabled: Boolean): 
GlutenDeltaParquetFileFormat = {
+    // When predicate pushdown is enabled we allow both splits and predicate 
pushdown.
+    this.copy(
+      optimizationsEnabled = optimizationsEnabled,
+      tablePath = Some(tablePath))
+  }
+
+  /**
+   * Modifies the data read from underlying Parquet reader by populating one 
or both of the
+   * following metadata columns.
+   *   - [[IS_ROW_DELETED_COLUMN_NAME]] - row deleted status from deletion 
vector corresponding
+   *   to this file
+   *   - [[ROW_INDEX_COLUMN_NAME]] - index of the row within the file. Note, 
this column is only
+   *     populated when we are not using _metadata.row_index column.
+   */
+  private def iteratorWithAdditionalMetadataColumns(
+                                                     partitionedFile: 
PartitionedFile,
+                                                     iterator: 
Iterator[Object],
+                                                     isRowDeletedColumnOpt: 
Option[ColumnMetadata],
+                                                     rowIndexColumnOpt: 
Option[ColumnMetadata],
+                                                     useOffHeapBuffers: 
Boolean,
+                                                     serializableHadoopConf: 
SerializableConfiguration,
+                                                     useMetadataRowIndex: 
Boolean): Iterator[Object] = {
+    require(!useMetadataRowIndex || rowIndexColumnOpt.isDefined,
+      "useMetadataRowIndex is enabled but rowIndexColumn is not defined.")
+
+    val rowIndexFilterOpt = isRowDeletedColumnOpt.map { col =>
+      // Fetch the DV descriptor from the broadcast map and create a row index 
filter
+      val dvDescriptorOpt = partitionedFile.otherConstantMetadataColumnValues
+        .get(FILE_ROW_INDEX_FILTER_ID_ENCODED)
+      val filterTypeOpt = partitionedFile.otherConstantMetadataColumnValues
+        .get(FILE_ROW_INDEX_FILTER_TYPE)
+      if (dvDescriptorOpt.isDefined && filterTypeOpt.isDefined) {
+        val rowIndexFilter = filterTypeOpt.get match {
+          case RowIndexFilterType.IF_CONTAINED => DropMarkedRowsFilter
+          case RowIndexFilterType.IF_NOT_CONTAINED => KeepMarkedRowsFilter
+          case unexpectedFilterType => throw new IllegalStateException(
+            s"Unexpected row index filter type: ${unexpectedFilterType}")
+        }
+        rowIndexFilter.createInstance(
+          
DeletionVectorDescriptor.deserializeFromBase64(dvDescriptorOpt.get.asInstanceOf[String]),
+          serializableHadoopConf.value,
+          tablePath.map(new Path(_)))
+      } else if (dvDescriptorOpt.isDefined || filterTypeOpt.isDefined) {
+        throw new IllegalStateException(
+          s"Both ${FILE_ROW_INDEX_FILTER_ID_ENCODED} and 
${FILE_ROW_INDEX_FILTER_TYPE} " +
+            "should either both have values or no values at all.")
+      } else {
+        KeepAllRowsFilter
+      }
+    }
+
+    // We only generate the row index column when predicate pushdown is not 
enabled.
+    val rowIndexColumnToWriteOpt = if (useMetadataRowIndex) None else 
rowIndexColumnOpt
+    val metadataColumnsToWrite =
+      Seq(isRowDeletedColumnOpt, 
rowIndexColumnToWriteOpt).filter(_.nonEmpty).map(_.get)
+
+    // When metadata.row_index is not used there is no way to verify the 
Parquet index is
+    // starting from 0. We disable the splits, so the assumption is 
ParquetFileFormat respects
+    // that.
+    var rowIndex: Long = 0
+
+    // Used only when non-column row batches are received from the Parquet 
reader
+    val tempVector = new OnHeapColumnVector(1, ByteType)
+
+    iterator.map { row =>
+      row match {
+        case batch: ColumnarBatch => // When vectorized Parquet reader is 
enabled.
+          val size = batch.numRows()
+          // Create vectors for all needed metadata columns.
+          // We can't use the one from Parquet reader as it set the
+          // [[WritableColumnVector.isAllNulls]] to true and it can't be reset 
with using any
+          // public APIs.
+          trySafely(useOffHeapBuffers, size, metadataColumnsToWrite) { 
writableVectors =>
+            val indexVectorTuples = new ArrayBuffer[(Int, ColumnVector)]
+
+            // When predicate pushdown is enabled we use _metadata.row_index. 
Therefore,
+            // we only need to construct the isRowDeleted column.
+            var index = 0
+            isRowDeletedColumnOpt.foreach { columnMetadata =>
+              val isRowDeletedVector = writableVectors(index)
+              if (useMetadataRowIndex) {
+                rowIndexFilterOpt.get.materializeIntoVectorWithRowIndex(
+                  size, batch.column(rowIndexColumnOpt.get.index), 
isRowDeletedVector)
+              } else {
+                rowIndexFilterOpt.get
+                  .materializeIntoVector(rowIndex, rowIndex + size, 
isRowDeletedVector)
+              }
+              indexVectorTuples += (columnMetadata.index -> isRowDeletedVector)
+              index += 1
+            }
+
+            rowIndexColumnToWriteOpt.foreach { columnMetadata =>
+              val rowIndexVector = writableVectors(index)
+              // populate the row index column value.
+              for (i <- 0 until size) {
+                rowIndexVector.putLong(i, rowIndex + i)
+              }
+
+              indexVectorTuples += (columnMetadata.index -> rowIndexVector)
+              index += 1
+            }
+
+            val newBatch = replaceVectors(batch, indexVectorTuples.toSeq: _*)
+            rowIndex += size
+            newBatch
+          }
+
+        case columnarRow: ColumnarBatchRow =>
+          // When vectorized reader is enabled but returns immutable rows 
instead of
+          // columnar batches [[ColumnarBatchRow]]. So we have to copy the row 
as a
+          // mutable [[InternalRow]] and set the `row_index` and 
`is_row_deleted`
+          // column values. This is not efficient. It should affect only the 
wide
+          // tables. https://github.com/delta-io/delta/issues/2246
+          val newRow = columnarRow.copy();
+          isRowDeletedColumnOpt.foreach { columnMetadata =>
+            val rowIndexForFiltering = if (useMetadataRowIndex) {
+              columnarRow.getLong(rowIndexColumnOpt.get.index)
+            } else {
+              rowIndex
+            }
+            
rowIndexFilterOpt.get.materializeSingleRowWithRowIndex(rowIndexForFiltering, 
tempVector)
+            newRow.setByte(columnMetadata.index, tempVector.getByte(0))
+          }
+
+          rowIndexColumnToWriteOpt
+            .foreach(columnMetadata => newRow.setLong(columnMetadata.index, 
rowIndex))
+          rowIndex += 1
+
+          newRow
+        case rest: InternalRow => // When vectorized Parquet reader is disabled
+          // Temporary vector variable used to get DV values from 
RowIndexFilter
+          // Currently the RowIndexFilter only supports writing into a 
columnar vector
+          // and doesn't have methods to get DV value for a specific row index.
+          // TODO: This is not efficient, but it is ok given the default 
reader is vectorized
+          isRowDeletedColumnOpt.foreach { columnMetadata =>
+            val rowIndexForFiltering = if (useMetadataRowIndex) {
+              rest.getLong(rowIndexColumnOpt.get.index)
+            } else {
+              rowIndex
+            }
+            
rowIndexFilterOpt.get.materializeSingleRowWithRowIndex(rowIndexForFiltering, 
tempVector)
+            rest.setByte(columnMetadata.index, tempVector.getByte(0))
+          }
+
+          rowIndexColumnToWriteOpt
+            .foreach(columnMetadata => rest.setLong(columnMetadata.index, 
rowIndex))
+          rowIndex += 1
+          rest
+        case others =>
+          throw new RuntimeException(
+            s"Parquet reader returned an unknown row type: 
${others.getClass.getName}")
+      }
+    }
+  }
+
+  /**
+   * Translates the filter to use physical column names instead of logical 
column names.
+   * This is needed when the column mapping mode is set to `NameMapping` or 
`IdMapping`
+   * to match the requested schema that's passed to the [[ParquetFileFormat]].
+   */
+  private def translateFilterForColumnMapping(
+                                               filter: Filter,
+                                               physicalNameMap: Map[String, 
String]): Option[Filter] = {
+    object PhysicalAttribute {
+      def unapply(attribute: String): Option[String] = {
+        physicalNameMap.get(attribute)
+      }
+    }
+
+    filter match {
+      case EqualTo(PhysicalAttribute(physicalAttribute), value) =>
+        Some(EqualTo(physicalAttribute, value))
+      case EqualNullSafe(PhysicalAttribute(physicalAttribute), value) =>
+        Some(EqualNullSafe(physicalAttribute, value))
+      case GreaterThan(PhysicalAttribute(physicalAttribute), value) =>
+        Some(GreaterThan(physicalAttribute, value))
+      case GreaterThanOrEqual(PhysicalAttribute(physicalAttribute), value) =>
+        Some(GreaterThanOrEqual(physicalAttribute, value))
+      case LessThan(PhysicalAttribute(physicalAttribute), value) =>
+        Some(LessThan(physicalAttribute, value))
+      case LessThanOrEqual(PhysicalAttribute(physicalAttribute), value) =>
+        Some(LessThanOrEqual(physicalAttribute, value))
+      case In(PhysicalAttribute(physicalAttribute), values) =>
+        Some(In(physicalAttribute, values))
+      case IsNull(PhysicalAttribute(physicalAttribute)) =>
+        Some(IsNull(physicalAttribute))
+      case IsNotNull(PhysicalAttribute(physicalAttribute)) =>
+        Some(IsNotNull(physicalAttribute))
+      case And(left, right) =>
+        val newLeft = translateFilterForColumnMapping(left, physicalNameMap)
+        val newRight = translateFilterForColumnMapping(right, physicalNameMap)
+        (newLeft, newRight) match {
+          case (Some(l), Some(r)) => Some(And(l, r))
+          case (Some(l), None) => Some(l)
+          case (_, _) => newRight
+        }
+      case Or(left, right) =>
+        val newLeft = translateFilterForColumnMapping(left, physicalNameMap)
+        val newRight = translateFilterForColumnMapping(right, physicalNameMap)
+        (newLeft, newRight) match {
+          case (Some(l), Some(r)) => Some(Or(l, r))
+          case (_, _) => None
+        }
+      case Not(child) =>
+        translateFilterForColumnMapping(child, physicalNameMap).map(Not)
+      case StringStartsWith(PhysicalAttribute(physicalAttribute), value) =>
+        Some(StringStartsWith(physicalAttribute, value))
+      case StringEndsWith(PhysicalAttribute(physicalAttribute), value) =>
+        Some(StringEndsWith(physicalAttribute, value))
+      case StringContains(PhysicalAttribute(physicalAttribute), value) =>
+        Some(StringContains(physicalAttribute, value))
+      case AlwaysTrue() => Some(AlwaysTrue())
+      case AlwaysFalse() => Some(AlwaysFalse())
+      case _ =>
+        logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER, 
filter)}")
+        None
+    }
+  }
+}
+
+object GlutenDeltaParquetFileFormat {
+  /**
+   * Column name used to identify whether the row read from the parquet file 
is marked
+   * as deleted according to the Delta table deletion vectors
+   */
+  val IS_ROW_DELETED_COLUMN_NAME = "__delta_internal_is_row_deleted"
+  val IS_ROW_DELETED_STRUCT_FIELD = StructField(IS_ROW_DELETED_COLUMN_NAME, 
ByteType)
+
+  /** Row index for each column */
+  val ROW_INDEX_COLUMN_NAME = "__delta_internal_row_index"
+  val ROW_INDEX_STRUCT_FIELD = StructField(ROW_INDEX_COLUMN_NAME, LongType)
+
+  /** The key to the encoded row index filter identifier value of the
+   * [[PartitionedFile]]'s otherConstantMetadataColumnValues map. */
+  val FILE_ROW_INDEX_FILTER_ID_ENCODED = "row_index_filter_id_encoded"
+
+  /** The key to the row index filter type value of the
+   * [[PartitionedFile]]'s otherConstantMetadataColumnValues map. */
+  val FILE_ROW_INDEX_FILTER_TYPE = "row_index_filter_type"
+
+  /** Utility method to create a new writable vector */
+  private def newVector(
+                         useOffHeapBuffers: Boolean, size: Int, dataType: 
StructField): WritableColumnVector = {
+    if (useOffHeapBuffers) {
+      OffHeapColumnVector.allocateColumns(size, Seq(dataType).toArray)(0)
+    } else {
+      OnHeapColumnVector.allocateColumns(size, Seq(dataType).toArray)(0)
+    }
+  }
+
+  /** Try the operation, if the operation fails release the created resource */
+  private def trySafely[R <: WritableColumnVector, T](
+                                                       useOffHeapBuffers: 
Boolean,
+                                                       size: Int,
+                                                       columns: 
Seq[ColumnMetadata])(f: Seq[WritableColumnVector] => T): T = {
+    val resources = new ArrayBuffer[WritableColumnVector](columns.size)
+    try {
+      columns.foreach(col => resources.append(newVector(useOffHeapBuffers, 
size, col.structField)))
+      f(resources.toSeq)
+    } catch {
+      case NonFatal(e) =>
+        resources.foreach(closeQuietly(_))
+        throw e
+    }
+  }
+
+  /** Utility method to quietly close an [[AutoCloseable]] */
+  private def closeQuietly(closeable: AutoCloseable): Unit = {
+    if (closeable != null) {
+      try {
+        closeable.close()
+      } catch {
+        case NonFatal(_) => // ignore
+      }
+    }
+  }
+
+  /**
+   * Helper method to replace the vectors in given [[ColumnarBatch]].
+   * New vectors and its index in the batch are given as tuples.
+   */
+  private def replaceVectors(
+                              batch: ColumnarBatch,
+                              indexVectorTuples: (Int, ColumnVector) *): 
ColumnarBatch = {
+    val vectors = ArrayBuffer[ColumnVector]()
+    for (i <- 0 until batch.numCols()) {
+      var replaced: Boolean = false
+      for (indexVectorTuple <- indexVectorTuples) {
+        val index = indexVectorTuple._1
+        val vector = indexVectorTuple._2
+        if (indexVectorTuple._1 == i) {
+          vectors += indexVectorTuple._2
+          // Make sure to close the existing vector allocated in the Parquet
+          batch.column(i).close()
+          replaced = true
+        }
+      }
+      if (!replaced) {
+        vectors += batch.column(i)
+      }
+    }
+    new ColumnarBatch(vectors.toArray, batch.numRows())
+  }
+
+  /** Helper class to encapsulate column info */
+  case class ColumnMetadata(index: Int, structField: StructField)
+}
+// spotless:on
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
new file mode 100644
index 0000000000..275d2c0cd9
--- /dev/null
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.gluten.config.VeloxDeltaConfig
+
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
+import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, 
DeltaInvariantCheckerExec}
+import org.apache.spark.sql.delta.files.{GlutenDeltaFileFormatWriter, 
TransactionalWrite}
+import org.apache.spark.sql.delta.hooks.AutoCompact
+import org.apache.spark.sql.delta.perf.DeltaOptimizedWriterExec
+import org.apache.spark.sql.delta.schema.InnerInvariantViolationException
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import 
org.apache.spark.sql.delta.stats.{GlutenDeltaIdentityColumnStatsTracker, 
GlutenDeltaJobStatisticsTracker}
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, 
FileFormatWriter, WriteJobStatsTracker}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.ScalaExtensions.OptionExt
+import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.mutable.ListBuffer
+
+class GlutenOptimisticTransaction(delegate: OptimisticTransaction)
+  extends OptimisticTransaction(
+    delegate.deltaLog,
+    delegate.catalogTable,
+    delegate.snapshot
+  ) {
+
+  override def writeFiles(
+      inputData: Dataset[_],
+      writeOptions: Option[DeltaOptions],
+      isOptimize: Boolean,
+      additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
+    hasWritten = true
+
+    val spark = inputData.sparkSession
+    val veloxDeltaConfig = new VeloxDeltaConfig(spark.sessionState.conf)
+
+    val (data, partitionSchema) = performCDCPartition(inputData)
+    val outputPath = deltaLog.dataPath
+
+    val (queryExecution, output, generatedColumnConstraints, trackFromData) =
+      normalizeData(deltaLog, writeOptions, data)
+    // Use the track set from the transaction if set,
+    // otherwise use the track set from `normalizeData()`.
+    val trackIdentityHighWaterMarks = 
trackHighWaterMarks.getOrElse(trackFromData)
+
+    val partitioningColumns = getPartitioningColumns(partitionSchema, output)
+
+    val committer = getCommitter(outputPath)
+
+    val (statsDataSchema, _) = getStatsSchema(output, partitionSchema)
+
+    // If Statistics Collection is enabled, then create a stats tracker that 
will be injected during
+    // the FileFormatWriter.write call below and will collect per-file stats 
using
+    // StatisticsCollection
+    val optionalStatsTracker =
+      getOptionalStatsTrackerAndStatsCollection(output, outputPath, 
partitionSchema, data)._1.map(
+        new GlutenDeltaJobStatisticsTracker(_))
+
+    val constraints =
+      Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ 
additionalConstraints
+
+    val identityTrackerOpt = IdentityColumn
+      .createIdentityColumnStatsTracker(
+        spark,
+        deltaLog.newDeltaHadoopConf(),
+        outputPath,
+        metadata.schema,
+        statsDataSchema,
+        trackIdentityHighWaterMarks
+      )
+      .map(new GlutenDeltaIdentityColumnStatsTracker(_))
+
+    SQLExecution.withNewExecutionId(queryExecution, 
Option("deltaTransactionalWrite")) {
+      val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, 
Map.empty, output)
+
+      val empty2NullPlan =
+        convertEmptyToNullIfNeeded(queryExecution.executedPlan, 
partitioningColumns, constraints)
+      val maybeCheckInvariants = if (constraints.isEmpty) {
+        // Compared to vanilla Delta, we simply avoid adding the invariant 
checker
+        // when the constraint list is empty, to avoid the unnecessary 
transitions
+        // added around the invariant checker.
+        empty2NullPlan
+      } else {
+        DeltaInvariantCheckerExec(empty2NullPlan, constraints)
+      }
+      // No need to plan optimized write if the write command is OPTIMIZE, 
which aims to produce
+      // evenly-balanced data files already.
+      val physicalPlan =
+        if (
+          !isOptimize &&
+          shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
+        ) {
+          DeltaOptimizedWriterExec(maybeCheckInvariants, 
metadata.partitionColumns, deltaLog)
+        } else {
+          maybeCheckInvariants
+        }
+
+      val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
+
+      if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
+        val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
+          new SerializableConfiguration(deltaLog.newDeltaHadoopConf()),
+          BasicWriteJobStatsTracker.metrics)
+        registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
+        statsTrackers.append(basicWriteJobStatsTracker)
+      }
+
+      // Iceberg spec requires partition columns in data files
+      val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata)
+      // Retain only a minimal selection of Spark writer options to avoid any 
potential
+      // compatibility issues
+      val options = (writeOptions match {
+        case None => Map.empty[String, String]
+        case Some(writeOptions) =>
+          writeOptions.options.filterKeys {
+            key =>
+              key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
+              key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
+          }.toMap
+      }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> 
writePartitionColumns.toString)
+
+      try {
+        GlutenDeltaFileFormatWriter.write(
+          sparkSession = spark,
+          plan = physicalPlan,
+          fileFormat = new GlutenDeltaParquetFileFormat(
+            protocol,
+            metadata
+          ), // This is changed to Gluten's Delta format.
+          committer = committer,
+          outputSpec = outputSpec,
+          // scalastyle:off deltahadoopconfiguration
+          hadoopConf =
+            spark.sessionState.newHadoopConfWithOptions(metadata.configuration 
++ deltaLog.options),
+          // scalastyle:on deltahadoopconfiguration
+          partitionColumns = partitioningColumns,
+          bucketSpec = None,
+          statsTrackers = optionalStatsTracker.toSeq
+            ++ statsTrackers
+            ++ identityTrackerOpt.toSeq,
+          options = options
+        )
+      } catch {
+        case InnerInvariantViolationException(violationException) =>
+          // Pull an InvariantViolationException up to the top level if it was 
the root cause.
+          throw violationException
+      }
+      statsTrackers.foreach {
+        case tracker: BasicWriteJobStatsTracker =>
+          val numOutputRowsOpt = 
tracker.driverSideMetrics.get("numOutputRows").map(_.value)
+          IdentityColumn.logTableWrite(snapshot, trackIdentityHighWaterMarks, 
numOutputRowsOpt)
+        case _ => ()
+      }
+    }
+
+    var resultFiles =
+      (if (optionalStatsTracker.isDefined) {
+         committer.addedStatuses.map {
+           a =>
+             a.copy(stats = optionalStatsTracker
+               .map(_.delegate.recordedStats(a.toPath.getName))
+               .getOrElse(a.stats))
+         }
+       } else {
+         committer.addedStatuses
+       })
+        .filter {
+          // In some cases, we can write out an empty `inputData`. Some 
examples of this (though, they
+          // may be fixed in the future) are the MERGE command when you delete 
with empty source, or
+          // empty target, or on disjoint tables. This is hard to catch before 
the write without
+          // collecting the DF ahead of time. Instead, we can return only the 
AddFiles that
+          // a) actually add rows, or
+          // b) don't have any stats so we don't know the number of rows at all
+          case a: AddFile => a.numLogicalRecords.forall(_ > 0)
+          case _ => true
+        }
+
+    // add [[AddFile.Tags.ICEBERG_COMPAT_VERSION.name]] tags to addFiles
+    if (IcebergCompatV2.isEnabled(metadata)) {
+      resultFiles = resultFiles.map {
+        addFile =>
+          val tags = if (addFile.tags != null) addFile.tags else 
Map.empty[String, String]
+          addFile.copy(tags = tags + (AddFile.Tags.ICEBERG_COMPAT_VERSION.name 
-> "2"))
+      }
+    }
+
+    if (resultFiles.nonEmpty && !isOptimize) 
registerPostCommitHook(AutoCompact)
+    // Record the updated high water marks to be used during transaction 
commit.
+    identityTrackerOpt.ifDefined {
+      tracker => 
updatedIdentityHighWaterMarks.appendAll(tracker.delegate.highWaterMarks.toSeq)
+    }
+
+    resultFiles.toSeq ++ committer.changeFiles
+  }
+
+  private def shouldOptimizeWrite(
+      writeOptions: Option[DeltaOptions],
+      sessionConf: SQLConf): Boolean = {
+    writeOptions
+      .flatMap(_.optimizeWrite)
+      .getOrElse(TransactionalWrite.shouldOptimizeWrite(metadata, sessionConf))
+  }
+}
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala
new file mode 100644
index 0000000000..91bc39bd2e
--- /dev/null
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.datasource.GlutenFormatFactory
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.{OutputWriter, 
OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
ParquetOptions}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.StructType
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.codec.CodecConfig
+import org.apache.parquet.hadoop.util.ContextUtil
+import org.slf4j.LoggerFactory
+
+class GlutenParquetFileFormat
+  extends ParquetFileFormat
+  with DataSourceRegister
+  with Logging
+  with Serializable {
+  import GlutenParquetFileFormat._
+
+  private val logger = 
LoggerFactory.getLogger(classOf[GlutenParquetFileFormat])
+
+  override def shortName(): String = "gluten-parquet"
+
+  override def toString: String = "GlutenParquet"
+
+  override def hashCode(): Int = getClass.hashCode()
+
+  override def equals(other: Any): Boolean = 
other.isInstanceOf[GlutenParquetFileFormat]
+
+  override def inferSchema(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    super.inferSchema(sparkSession, options, files)
+  }
+
+  override def prepareWrite(
+      sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    if (isNativeWritable(dataSchema)) {
+      // Pass compression to job conf so that the file extension can be aware 
of it.
+      val conf = ContextUtil.getConfiguration(job)
+      val parquetOptions = new ParquetOptions(options, 
sparkSession.sessionState.conf)
+      conf.set(ParquetOutputFormat.COMPRESSION, 
parquetOptions.compressionCodecClassName)
+      val nativeConf =
+        GlutenFormatFactory("parquet")
+          .nativeConf(options, parquetOptions.compressionCodecClassName)
+
+      return new OutputWriterFactory {
+        override def getFileExtension(context: TaskAttemptContext): String = {
+          CodecConfig.from(context).getCodec.getExtension + ".parquet"
+        }
+
+        override def newInstance(
+            path: String,
+            dataSchema: StructType,
+            context: TaskAttemptContext): OutputWriter = {
+          GlutenFormatFactory("parquet")
+            .createOutputWriter(path, dataSchema, context, nativeConf)
+
+        }
+      }
+    }
+    logger.warn(
+      s"Data schema is unsupported by Gluten Parquet writer: $dataSchema, " +
+        s"falling back to the vanilla Spark Parquet writer")
+    super.prepareWrite(sparkSession, job, options, dataSchema)
+  }
+}
+
+object GlutenParquetFileFormat {
+  def isNativeWritable(schema: StructType): Boolean = {
+    BackendsApiManager.getSettings.supportNativeWrite(schema.fields)
+  }
+}
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
new file mode 100644
index 0000000000..90c0fa1bff
--- /dev/null
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.files
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.execution._
+import org.apache.gluten.execution.datasource.GlutenFormatFactory
+import org.apache.gluten.extension.columnar.transition.Transitions
+
+import org.apache.spark._
+import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.internal.io.{FileCommitProtocol, 
SparkHadoopWriterUtils}
+import org.apache.spark.shuffle.FetchFailedException
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.connector.write.WriterCommitMessage
+import org.apache.spark.sql.delta.{DeltaOptions, GlutenParquetFileFormat}
+import org.apache.spark.sql.delta.logging.DeltaLogKeys
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.FileFormatWriter._
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import java.util.{Date, UUID}
+
+// spotless:off
+/**
+ *  A helper object for writing FileFormat data out to a location.
+ *  Logic is copied from FileFormatWriter from Spark 3.5 with added 
functionality to write partition
+ *  values to data files. Specifically L123-126, L132, and L140 where it adds 
option
+ *  WRITE_PARTITION_COLUMNS
+ */
+object GlutenDeltaFileFormatWriter extends LoggingShims {
+
+  /**
+   * A variable used in tests to check whether the output ordering of the 
query matches the
+   * required ordering of the write command.
+   */
+  private var outputOrderingMatched: Boolean = false
+
+  /**
+   * A variable used in tests to check the final executed plan.
+   */
+  private var executedPlan: Option[SparkPlan] = None
+
+  // scalastyle:off argcount
+  /**
+   * Basic work flow of this command is:
+   * 1. Driver side setup, including output committer initialization and data 
source specific
+   *    preparation work for the write job to be issued.
+   * 2. Issues a write job consists of one or more executor side tasks, each 
of which writes all
+   *    rows within an RDD partition.
+   * 3. If no exception is thrown in a task, commits that task, otherwise 
aborts that task;  If any
+   *    exception is thrown during task commitment, also aborts that task.
+   * 4. If all tasks are committed, commit the job, otherwise aborts the job;  
If any exception is
+   *    thrown during job commitment, also aborts the job.
+   * 5. If the job is successfully committed, perform post-commit operations 
such as
+   *    processing statistics.
+   * @return The set of all partition paths that were updated during this 
write job.
+   */
+  def write(
+             sparkSession: SparkSession,
+             plan: SparkPlan,
+             fileFormat: FileFormat,
+             committer: FileCommitProtocol,
+             outputSpec: OutputSpec,
+             hadoopConf: Configuration,
+             partitionColumns: Seq[Attribute],
+             bucketSpec: Option[BucketSpec],
+             statsTrackers: Seq[WriteJobStatsTracker],
+             options: Map[String, String],
+             numStaticPartitionCols: Int = 0): Set[String] = {
+    require(partitionColumns.size >= numStaticPartitionCols)
+
+    val job = Job.getInstance(hadoopConf)
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[InternalRow])
+    FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
+
+    val partitionSet = AttributeSet(partitionColumns)
+    // cleanup the internal metadata information of
+    // the file source metadata attribute if any before write out
+    val finalOutputSpec = outputSpec.copy(
+      outputColumns = outputSpec.outputColumns
+        .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation)
+    )
+    val dataColumns = 
finalOutputSpec.outputColumns.filterNot(partitionSet.contains)
+
+    val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec, 
dataColumns, options)
+    val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec, 
dataColumns)
+
+    val caseInsensitiveOptions = CaseInsensitiveMap(options)
+
+    val dataSchema = dataColumns.toStructType
+    DataSourceUtils.verifySchema(fileFormat, dataSchema)
+    DataSourceUtils.checkFieldNames(fileFormat, dataSchema)
+    // Note: prepareWrite has side effect. It sets "job".
+
+    val outputDataColumns =
+      if 
(caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true"))
 {
+        dataColumns ++ partitionColumns
+      } else dataColumns
+
+    val outputWriterFactory =
+      fileFormat.prepareWrite(
+        sparkSession,
+        job,
+        caseInsensitiveOptions,
+        outputDataColumns.toStructType
+      )
+
+    val description = new WriteJobDescription(
+      uuid = UUID.randomUUID.toString,
+      serializableHadoopConf = new 
SerializableConfiguration(job.getConfiguration),
+      outputWriterFactory = outputWriterFactory,
+      allColumns = finalOutputSpec.outputColumns,
+      dataColumns = outputDataColumns,
+      partitionColumns = partitionColumns,
+      bucketSpec = writerBucketSpec,
+      path = finalOutputSpec.outputPath,
+      customPartitionLocations = finalOutputSpec.customPartitionLocations,
+      maxRecordsPerFile = caseInsensitiveOptions
+        .get("maxRecordsPerFile")
+        .map(_.toLong)
+        .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
+      timeZoneId = caseInsensitiveOptions
+        .get(DateTimeUtils.TIMEZONE_OPTION)
+        .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone),
+      statsTrackers = statsTrackers
+    )
+
+    // We should first sort by dynamic partition columns, then bucket id, and 
finally sorting
+    // columns.
+    val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
+      writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
+    val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)
+
+    // SPARK-40588: when planned writing is disabled and AQE is enabled,
+    // plan contains an AdaptiveSparkPlanExec, which does not know
+    // its final plan's ordering, so we have to materialize that plan first
+    // it is fine to use plan further down as the final plan is cached in that 
plan
+    def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+      case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+      case p: SparkPlan => 
p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
+    }
+
+    // the sort order doesn't matter
+    val actualOrdering = writeFilesOpt
+      .map(_.child)
+      .getOrElse(materializeAdaptiveSparkPlan(plan))
+      .outputOrdering
+    val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering, 
actualOrdering)
+
+    SQLExecution.checkSQLExecutionId(sparkSession)
+
+    // propagate the description UUID into the jobs, so that committers
+    // get an ID guaranteed to be unique.
+    job.getConfiguration.set("spark.sql.sources.writeJobUUID", 
description.uuid)
+
+    // When `PLANNED_WRITE_ENABLED` is true, the optimizer rule V1Writes will 
add logical sort
+    // operator based on the required ordering of the V1 write command. So the 
output
+    // ordering of the physical plan should always match the required 
ordering. Here
+    // we set the variable to verify this behavior in tests.
+    // There are two cases where FileFormatWriter still needs to add physical 
sort:
+    // 1) When the planned write config is disabled.
+    // 2) When the concurrent writers are enabled (in this case the required 
ordering of a
+    //    V1 write command will be empty).
+    if (Utils.isTesting) outputOrderingMatched = orderingMatched
+
+    if (writeFilesOpt.isDefined) {
+      // build `WriteFilesSpec` for `WriteFiles`
+      val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => {
+        val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec)
+        createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)
+      }
+      val writeSpec = WriteFilesSpec(
+        description = description,
+        committer = committer,
+        concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc
+      )
+      executeWrite(sparkSession, plan, writeSpec, job)
+    } else {
+      executeWrite(
+        sparkSession,
+        plan,
+        job,
+        description,
+        committer,
+        outputSpec,
+        requiredOrdering,
+        partitionColumns,
+        sortColumns,
+        orderingMatched,
+        GlutenParquetFileFormat.isNativeWritable(dataSchema)
+      )
+    }
+  }
+  // scalastyle:on argcount
+
+  private def executeWrite(
+                            sparkSession: SparkSession,
+                            plan: SparkPlan,
+                            job: Job,
+                            description: WriteJobDescription,
+                            committer: FileCommitProtocol,
+                            outputSpec: OutputSpec,
+                            requiredOrdering: Seq[Expression],
+                            partitionColumns: Seq[Attribute],
+                            sortColumns: Seq[Attribute],
+                            orderingMatched: Boolean,
+                            writeOffloadable: Boolean): Set[String] = {
+    val projectList = V1WritesUtils.convertEmptyToNull(plan.output, 
partitionColumns)
+    val empty2NullPlan = if (projectList.nonEmpty) ProjectExec(projectList, 
plan) else plan
+
+    writeAndCommit(job, description, committer) {
+      val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) {
+        (empty2NullPlan, None)
+      } else {
+        val sortPlan = createSortPlan(empty2NullPlan, requiredOrdering, 
outputSpec)
+        val concurrentOutputWriterSpec =
+          createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)
+        if (concurrentOutputWriterSpec.isDefined) {
+          (empty2NullPlan, concurrentOutputWriterSpec)
+        } else {
+          def addNativeSort(input: SparkPlan): SparkPlan = {
+            val nativeSortPlan = SortExecTransformer(sortPlan.sortOrder, 
sortPlan.global, child = input)
+            val validationResult = nativeSortPlan.doValidate()
+            assert(validationResult.ok(),
+              s"Sort operation for Delta write is not offload-able: 
${validationResult.reason()}")
+            nativeSortPlan
+          }
+          val newPlan = sortPlan.child match {
+            case WholeStageTransformer(wholeStageChild, materializeInput) =>
+              WholeStageTransformer(addNativeSort(wholeStageChild),
+                
materializeInput)(ColumnarCollapseTransformStages.transformStageCounter.incrementAndGet())
+            case other =>
+              Transitions.toBatchPlan(sortPlan, VeloxBatchType)
+          }
+          (newPlan, None)
+        }
+      }
+
+      val wrappedPlanToExecute = if (writeOffloadable) {
+        
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToCarrierRow(planToExecute)
+      } else {
+        planToExecute
+      }
+
+      // In testing, this is the only way to get hold of the actually executed 
plan written to file
+      if (Utils.isTesting) executedPlan = Some(wrappedPlanToExecute)
+
+      val rdd = wrappedPlanToExecute.execute()
+
+      // SPARK-23271 If we are attempting to write a zero partition rdd, 
create a dummy single
+      // partition rdd to make sure we at least set up one write task to write 
the metadata.
+      val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) {
+        sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1)
+      } else {
+        rdd
+      }
+
+      val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
+      val ret = new 
Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
+      val partitionColumnToDataType = description.partitionColumns
+        .map(attr => (attr.name, attr.dataType)).toMap
+      sparkSession.sparkContext.runJob(
+        rddWithNonEmptyPartitions,
+        (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
+          executeTask(
+            description = description,
+            jobTrackerID = jobTrackerID,
+            sparkStageId = taskContext.stageId(),
+            sparkPartitionId = taskContext.partitionId(),
+            sparkAttemptNumber = taskContext.taskAttemptId().toInt & 
Integer.MAX_VALUE,
+            committer,
+            iterator = iter,
+            concurrentOutputWriterSpec = concurrentOutputWriterSpec,
+            partitionColumnToDataType
+          )
+        },
+        rddWithNonEmptyPartitions.partitions.indices,
+        (index, res: WriteTaskResult) => {
+          committer.onTaskCommit(res.commitMsg)
+          ret(index) = res
+        }
+      )
+      ret
+    }
+  }
+
+  private def writeAndCommit(
+                              job: Job,
+                              description: WriteJobDescription,
+                              committer: FileCommitProtocol)(f: => 
Array[WriteTaskResult]): Set[String] = {
+    // This call shouldn't be put into the `try` block below because it only 
initializes and
+    // prepares the job, any exception thrown from here shouldn't cause 
abortJob() to be called.
+    committer.setupJob(job)
+    try {
+      val ret = f
+      val commitMsgs = ret.map(_.commitMsg)
+
+      logInfo(log"Start to commit write Job ${MDC(DeltaLogKeys.JOB_ID, 
description.uuid)}.")
+      val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, 
commitMsgs) }
+      logInfo(log"Write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)} 
committed. " +
+        log"Elapsed time: ${MDC(DeltaLogKeys.DURATION, duration)} ms.")
+
+      processStats(description.statsTrackers, ret.map(_.summary.stats), 
duration)
+      logInfo(log"Finished processing stats for write job " +
+        log"${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
+
+      // return a set of all the partition paths that were updated during this 
job
+      ret.map(_.summary.updatedPartitions).reduceOption(_ ++ 
_).getOrElse(Set.empty)
+    } catch {
+      case cause: Throwable =>
+        logError(log"Aborting job ${MDC(DeltaLogKeys.JOB_ID, 
description.uuid)}", cause)
+        committer.abortJob(job)
+        throw cause
+    }
+  }
+
+  /**
+   * Write files using [[SparkPlan.executeWrite]]
+   */
+  private def executeWrite(
+                            session: SparkSession,
+                            planForWrites: SparkPlan,
+                            writeFilesSpec: WriteFilesSpec,
+                            job: Job): Set[String] = {
+    val committer = writeFilesSpec.committer
+    val description = writeFilesSpec.description
+
+    // In testing, this is the only way to get hold of the actually executed 
plan written to file
+    if (Utils.isTesting) executedPlan = Some(planForWrites)
+
+    writeAndCommit(job, description, committer) {
+      val rdd = planForWrites.executeWrite(writeFilesSpec)
+      val ret = new Array[WriteTaskResult](rdd.partitions.length)
+      session.sparkContext.runJob(
+        rdd,
+        (context: TaskContext, iter: Iterator[WriterCommitMessage]) => {
+          assert(iter.hasNext)
+          val commitMessage = iter.next()
+          assert(!iter.hasNext)
+          commitMessage
+        },
+        rdd.partitions.indices,
+        (index, res: WriterCommitMessage) => {
+          assert(res.isInstanceOf[WriteTaskResult])
+          val writeTaskResult = res.asInstanceOf[WriteTaskResult]
+          committer.onTaskCommit(writeTaskResult.commitMsg)
+          ret(index) = writeTaskResult
+        }
+      )
+      ret
+    }
+  }
+
+  private def createSortPlan(
+                              plan: SparkPlan,
+                              requiredOrdering: Seq[Expression],
+                              outputSpec: OutputSpec): SortExec = {
+    // SPARK-21165: the `requiredOrdering` is based on the attributes from 
analyzed plan, and
+    // the physical plan may have different attribute ids due to optimizer 
removing some
+    // aliases. Here we bind the expression ahead to avoid potential attribute 
ids mismatch.
+    val orderingExpr =
+      bindReferences(requiredOrdering.map(SortOrder(_, Ascending)), 
outputSpec.outputColumns)
+    SortExec(orderingExpr, global = false, child = plan)
+  }
+
+  private def createConcurrentOutputWriterSpec(
+                                                sparkSession: SparkSession,
+                                                sortPlan: SortExec,
+                                                sortColumns: Seq[Attribute]): 
Option[ConcurrentOutputWriterSpec] = {
+    val maxWriters = 
sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
+    val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
+    if (concurrentWritersEnabled) {
+      Some(ConcurrentOutputWriterSpec(maxWriters, () => 
sortPlan.createSorter()))
+    } else {
+      None
+    }
+  }
+
+  /** Writes data out in a single Spark task. */
+  private def executeTask(
+                           description: WriteJobDescription,
+                           jobTrackerID: String,
+                           sparkStageId: Int,
+                           sparkPartitionId: Int,
+                           sparkAttemptNumber: Int,
+                           committer: FileCommitProtocol,
+                           iterator: Iterator[InternalRow],
+                           concurrentOutputWriterSpec: 
Option[ConcurrentOutputWriterSpec],
+                           partitionColumnToDataType: Map[String, DataType]): 
WriteTaskResult = {
+
+    val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId)
+    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
+
+    // Set up the attempt context required to use in the output committer.
+    val taskAttemptContext: TaskAttemptContext = {
+      // Set up the configuration object
+      val hadoopConf = description.serializableHadoopConf.value
+      hadoopConf.set("mapreduce.job.id", jobId.toString)
+      hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
+      hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
+      hadoopConf.setBoolean("mapreduce.task.ismap", true)
+      hadoopConf.setInt("mapreduce.task.partition", 0)
+
+      if (partitionColumnToDataType.isEmpty) {
+        new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
+      } else {
+        new 
DeltaFileFormatWriter.PartitionedTaskAttemptContextImpl(hadoopConf, 
taskAttemptId, partitionColumnToDataType)
+      }
+    }
+
+    committer.setupTask(taskAttemptContext)
+
+    val dataWriter =
+      if (sparkPartitionId != 0 && !iterator.hasNext) {
+        // In case of empty job, leave first partition to save meta for file 
format like parquet.
+        new EmptyDirectoryDataWriter(description, taskAttemptContext, 
committer)
+      } else if (description.partitionColumns.isEmpty && 
description.bucketSpec.isEmpty) {
+        new SingleDirectoryDataWriter(description, taskAttemptContext, 
committer)
+      } else {
+        concurrentOutputWriterSpec match {
+          case Some(spec) =>
+            new DynamicPartitionDataConcurrentWriter(
+              description,
+              taskAttemptContext,
+              committer,
+              spec
+            )
+          case _ =>
+            // Columnar-based partition writer to divide the input batch by 
partition values
+            // and bucket IDs in advance.
+            new ColumnarDynamicPartitionDataSingleWriter(description, 
taskAttemptContext, committer)
+        }
+      }
+
+    try {
+      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+        // Execute the task to write rows out and commit the task.
+        dataWriter.writeWithIterator(iterator)
+        dataWriter.commit()
+      })(catchBlock = {
+        // If there is an error, abort the task
+        dataWriter.abort()
+        logError(log"Job ${MDC(DeltaLogKeys.JOB_ID, jobId)} aborted.")
+      }, finallyBlock = {
+        dataWriter.close()
+      })
+    } catch {
+      case e: FetchFailedException =>
+        throw e
+      case f: FileAlreadyExistsException if 
SQLConf.get.fastFailFileFormatOutput =>
+        // If any output file to write already exists, it does not make sense 
to re-run this task.
+        // We throw the exception and let Executor throw ExceptionFailure to 
abort the job.
+        throw new TaskOutputFileAlreadyExistException(f)
+      case t: Throwable =>
+        throw 
QueryExecutionErrors.taskFailedWhileWritingRowsError(description.path, t)
+    }
+  }
+
+  /**
+   * For every registered [[WriteJobStatsTracker]], call `processStats()` on 
it, passing it
+   * the corresponding [[WriteTaskStats]] from all executors.
+   */
+  private def processStats(
+                            statsTrackers: Seq[WriteJobStatsTracker],
+                            statsPerTask: Seq[Seq[WriteTaskStats]],
+                            jobCommitDuration: Long): Unit = {
+
+    val numStatsTrackers = statsTrackers.length
+    assert(
+      statsPerTask.forall(_.length == numStatsTrackers),
+      s"""Every WriteTask should have produced one `WriteTaskStats` object for 
every tracker.
+         |There are $numStatsTrackers statsTrackers, but some task returned
+         |${statsPerTask.find(_.length != numStatsTrackers).get.length} 
results instead.
+       """.stripMargin
+    )
+
+    val statsPerTracker = if (statsPerTask.nonEmpty) {
+      statsPerTask.transpose
+    } else {
+      statsTrackers.map(_ => Seq.empty)
+    }
+
+    statsTrackers.zip(statsPerTracker).foreach {
+      case (statsTracker, stats) => statsTracker.processStats(stats, 
jobCommitDuration)
+    }
+  }
+
+  private class ColumnarDynamicPartitionDataSingleWriter(
+                                                          description: 
WriteJobDescription,
+                                                          taskAttemptContext: 
TaskAttemptContext,
+                                                          committer: 
FileCommitProtocol,
+                                                          customMetrics: 
Map[String, SQLMetric] = Map.empty)
+    extends BaseDynamicPartitionDataWriter(
+      description,
+      taskAttemptContext,
+      committer,
+      customMetrics) {
+
+    private var currentPartitionValues: Option[UnsafeRow] = None
+    private var currentBucketId: Option[Int] = None
+
+    private val partitionColIndice: Array[Int] =
+      description.partitionColumns.flatMap {
+        pcol =>
+          description.allColumns.zipWithIndex.collect {
+            case (acol, index) if acol.name == pcol.name && acol.exprId == 
pcol.exprId => index
+          }
+      }.toArray
+
+    private def beforeWrite(record: InternalRow): Unit = {
+      val nextPartitionValues = if (isPartitioned) 
Some(getPartitionValues(record)) else None
+      val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None
+
+      if (currentPartitionValues != nextPartitionValues || currentBucketId != 
nextBucketId) {
+        // See a new partition or bucket - write to a new partition dir (or a 
new bucket file).
+        if (isPartitioned && currentPartitionValues != nextPartitionValues) {
+          currentPartitionValues = Some(nextPartitionValues.get.copy())
+          statsTrackers.foreach(_.newPartition(currentPartitionValues.get))
+        }
+        if (isBucketed) {
+          currentBucketId = nextBucketId
+        }
+
+        fileCounter = 0
+        renewCurrentWriter(currentPartitionValues, currentBucketId, 
closeCurrentWriter = true)
+      } else if (description.maxRecordsPerFile > 0 &&
+        recordsInFile >= description.maxRecordsPerFile) {
+        renewCurrentWriterIfTooManyRecords(currentPartitionValues, 
currentBucketId)
+      }
+    }
+
+    override def write(record: InternalRow): Unit = {
+      record match {
+        case carrierRow: BatchCarrierRow =>
+          carrierRow match {
+            case placeholderRow: PlaceholderRow =>
+            // Do nothing.
+            case terminalRow: TerminalRow =>
+              val numRows = terminalRow.batch().numRows()
+              if (numRows > 0) {
+                val blockStripes = GlutenFormatFactory.rowSplitter
+                  .splitBlockByPartitionAndBucket(terminalRow.batch(), 
partitionColIndice,
+                    isBucketed)
+                val iter = blockStripes.iterator()
+                while (iter.hasNext) {
+                  val blockStripe = iter.next()
+                  val headingRow = blockStripe.getHeadingRow
+                  beforeWrite(headingRow)
+                  val columnBatch = blockStripe.getColumnarBatch
+                  currentWriter.write(terminalRow.withNewBatch(columnBatch))
+                  columnBatch.close()
+                }
+                blockStripes.release()
+                for (_ <- 0 until numRows) {
+                  statsTrackers.foreach(_.newRow(currentWriter.path, record))
+                }
+                recordsInFile += numRows
+              }
+          }
+        case _ =>
+          beforeWrite(record)
+          writeRecord(record)
+      }
+    }
+  }
+}
+// spotless:on
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
new file mode 100644
index 0000000000..30e61730c1
--- /dev/null
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.stats
+
+import org.apache.gluten.execution.{PlaceholderRow, TerminalRow, 
VeloxColumnarToRowExec}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.delta.DeltaIdentityColumnStatsTracker
+import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, 
WriteTaskStats, WriteTaskStatsTracker}
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+class GlutenDeltaJobStatisticsTracker(val delegate: DeltaJobStatisticsTracker)
+  extends WriteJobStatsTracker {
+  import GlutenDeltaJobStatisticsTracker._
+
+  override def newTaskInstance(): WriteTaskStatsTracker = {
+    new GlutenDeltaTaskStatisticsTracker(
+      delegate.newTaskInstance().asInstanceOf[DeltaTaskStatisticsTracker])
+  }
+
+  override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): 
Unit = {
+    delegate.processStats(stats, jobCommitTime)
+  }
+}
+
+class GlutenDeltaIdentityColumnStatsTracker(override val delegate: 
DeltaIdentityColumnStatsTracker)
+  extends GlutenDeltaJobStatisticsTracker(delegate)
+
+private object GlutenDeltaJobStatisticsTracker {
+
+  /**
+   * This is a temporary implementation of statistics tracker for Delta Lake. 
It's sub-optimal in
+   * performance because it internally performs C2R then send rows to the 
delegate row-based
+   * tracker.
+   *
+   * TODO: Columnar-based statistics collection.
+   */
+  private class GlutenDeltaTaskStatisticsTracker(delegate: 
DeltaTaskStatisticsTracker)
+    extends WriteTaskStatsTracker {
+
+    private val c2r = new VeloxColumnarToRowExec.Converter(new 
SQLMetric("convertTime"))
+
+    override def newPartition(partitionValues: InternalRow): Unit = {
+      delegate.newPartition(partitionValues)
+    }
+
+    override def newFile(filePath: String): Unit = {
+      delegate.newFile(filePath)
+    }
+
+    override def closeFile(filePath: String): Unit = {
+      delegate.closeFile(filePath)
+    }
+
+    override def newRow(filePath: String, row: InternalRow): Unit = {
+      row match {
+        case _: PlaceholderRow =>
+        case t: TerminalRow =>
+          c2r.toRowIterator(t.batch()).foreach(eachRow => 
delegate.newRow(filePath, eachRow))
+        case otherRow =>
+          delegate.newRow(filePath, otherRow)
+      }
+    }
+
+    override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
+      delegate.getFinalStats(taskCommitTime)
+    }
+  }
+}
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
new file mode 100644
index 0000000000..832cc084bb
--- /dev/null
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.delta.{GlutenOptimisticTransaction, 
OptimisticTransaction, TransactionExecutionObserver}
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+case class GlutenDeltaLeafV2CommandExec(delegate: LeafV2CommandExec) extends 
LeafV2CommandExec {
+
+  override def metrics: Map[String, SQLMetric] = delegate.metrics
+
+  override protected def run(): Seq[InternalRow] = {
+    TransactionExecutionObserver.withObserver(
+      DeltaV2WriteOperators.UseColumnarDeltaTransactionLog) {
+      delegate.executeCollect()
+    }
+  }
+
+  override def output: Seq[Attribute] = {
+    delegate.output
+  }
+
+  override def nodeName: String = "GlutenDelta " + delegate.nodeName
+}
+
+case class GlutenDeltaLeafRunnableCommand(delegate: LeafRunnableCommand)
+  extends LeafRunnableCommand {
+  override lazy val metrics: Map[String, SQLMetric] = delegate.metrics
+
+  override def output: Seq[Attribute] = {
+    delegate.output
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    TransactionExecutionObserver.withObserver(
+      DeltaV2WriteOperators.UseColumnarDeltaTransactionLog) {
+      delegate.run(sparkSession)
+    }
+  }
+
+  override def nodeName: String = "GlutenDelta " + delegate.nodeName
+}
+
+object DeltaV2WriteOperators {
+  object UseColumnarDeltaTransactionLog extends TransactionExecutionObserver {
+    override def startingTransaction(f: => OptimisticTransaction): 
OptimisticTransaction = {
+      val delegate = f
+      new GlutenOptimisticTransaction(delegate)
+    }
+
+    override def preparingCommit[T](f: => T): T = f
+
+    override def beginDoCommit(): Unit = ()
+
+    override def beginBackfill(): Unit = ()
+
+    override def beginPostCommit(): Unit = ()
+
+    override def transactionCommitted(): Unit = ()
+
+    override def transactionAborted(): Unit = ()
+
+    override def createChild(): TransactionExecutionObserver = {
+      TransactionExecutionObserver.getObserver
+    }
+  }
+}
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
new file mode 100644
index 0000000000..60865682b3
--- /dev/null
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.gluten.config.VeloxDeltaConfig
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+
+import org.apache.spark.sql.delta.catalog.DeltaCatalog
+import org.apache.spark.sql.delta.commands.DeleteCommand
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+
+case class OffloadDeltaCommand() extends OffloadSingleNode {
+  override def offload(plan: SparkPlan): SparkPlan = {
+    if (!VeloxDeltaConfig.get.enableNativeWrite) {
+      return plan
+    }
+    plan match {
+      case ExecutedCommandExec(dc: DeleteCommand) =>
+        ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(dc))
+      case ctas: AtomicCreateTableAsSelectExec if 
ctas.catalog.isInstanceOf[DeltaCatalog] =>
+        GlutenDeltaLeafV2CommandExec(ctas)
+      case rtas: AtomicReplaceTableAsSelectExec if 
rtas.catalog.isInstanceOf[DeltaCatalog] =>
+        GlutenDeltaLeafV2CommandExec(rtas)
+      case other => other
+    }
+  }
+}
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
index 7bd6a59b7b..d32772f565 100644
--- 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
@@ -36,7 +36,10 @@ class DeleteSQLSuite extends DeleteSuiteBase
     Seq(
       // FIXME: Excluded by Gluten as results are mismatch.
       "test delete on temp view - nontrivial projection - SQL TempView",
-      "test delete on temp view - nontrivial projection - Dataset TempView"
+      "test delete on temp view - nontrivial projection - Dataset TempView",
+      // FIXME: Different error messages.
+      "test delete on temp view - superset cols - SQL TempView",
+      "test delete on temp view - superset cols - Dataset TempView"
     )
 
   // For EXPLAIN, which is not supported in OSS
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
index 4e7326c8c1..8b290e4db7 100644
--- 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
@@ -507,14 +507,14 @@ trait DeltaTestUtilsForTempViews
       expectedErrorClassForDataSetTempView: String = null): Unit = {
     if (isSQLTempView) {
       if (expectedErrorMsgForSQLTempView != null) {
-        errorContains(ex.getMessage, expectedErrorMsgForSQLTempView)
+        checkError(ex, expectedErrorMsgForSQLTempView)
       }
       if (expectedErrorClassForSQLTempView != null) {
         assert(ex.getErrorClass == expectedErrorClassForSQLTempView)
       }
     } else {
       if (expectedErrorMsgForDataSetTempView != null) {
-        errorContains(ex.getMessage, expectedErrorMsgForDataSetTempView)
+        checkError(ex, expectedErrorMsgForDataSetTempView)
       }
       if (expectedErrorClassForDataSetTempView != null) {
         assert(ex.getErrorClass == expectedErrorClassForDataSetTempView, 
ex.getMessage)
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
index 3d94d2bde3..bf64858399 100644
--- 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.delta.test
 
+import org.apache.gluten.config.VeloxDeltaConfig
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.delta.catalog.DeltaCatalog
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -47,6 +49,7 @@ trait DeltaSQLCommandTest extends SharedSparkSession {
       .set("spark.sql.shuffle.partitions", "1")
       .set("spark.memory.offHeap.size", "2g")
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set(VeloxDeltaConfig.ENABLE_NATIVE_WRITE.key, "true")
   }
 }
 // spotless:on
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
index e722b2767a..9ada805c6e 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
@@ -74,6 +74,17 @@ object DataGen {
     def run(spark: SparkSession, source: String)
   }
 
+  object Feature {
+    def run(spark: SparkSession, source: String, feature: Feature): Unit = {
+      println(s"Executing feature: ${feature.name()}")
+      val start = System.nanoTime()
+      feature.run(spark, source)
+      val end = System.nanoTime()
+      println(
+        s"Finished executing feature: ${feature.name()}, elapsed time: ${(end 
- start) / 1e6} ms.")
+    }
+  }
+
   class FeatureRegistry extends Serializable {
     private val lookup: mutable.LinkedHashMap[String, Feature] = 
mutable.LinkedHashMap()
 
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
index 50404228f5..0c4bf94c71 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
@@ -137,11 +137,7 @@ class TpcdsDataGen(
   override def gen(): Unit = {
     Table.getBaseTables.forEach(t => writeParquetTable(t))
 
-    features.foreach {
-      feature =>
-        println(s"Execute feature: ${feature.name()}")
-        feature.run(spark, source)
-    }
+    features.foreach(feature => DataGen.Feature.run(spark, source, feature))
   }
 }
 
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
index eaf4b4ded9..aed8653f62 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
@@ -58,11 +58,7 @@ class TpchDataGen(
     generate(dir, "part", partSchema, partitions, partGenerator, partParser)
     generate(dir, "region", regionSchema, regionGenerator, regionParser)
 
-    features.foreach {
-      feature =>
-        println(s"Execute feature: ${feature.name()}")
-        feature.run(spark, source)
-    }
+    features.foreach(feature => DataGen.Feature.run(spark, source, feature))
   }
 
   // lineitem


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to