This is an automated email from the ASF dual-hosted git repository.
qiansun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a04dec6bb9 [GLUTEN-10215][VL] Delta: Native write support for Delta
3.3.1 / Spark 3.5 (#10801)
a04dec6bb9 is described below
commit a04dec6bb97779d6f942645ed274ed01a7ba767c
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Oct 15 16:06:27 2025 +0100
[GLUTEN-10215][VL] Delta: Native write support for Delta 3.3.1 / Spark 3.5
(#10801)
* [VL] Gluten-it: Record the execution time of data generator features
This will be helpful in measuring the write performance of Gluten's lake
format support
* [VL] Delta: Native write support for Delta 3.3.1 / Spark 3.5
---
.../apache/gluten/config/VeloxDeltaConfig.scala | 43 ++
...che.gluten.component.VeloxDelta33WriteComponent | 0
.../component/VeloxDelta33WriteComponent.scala | 57 ++
.../sql/delta/GlutenDeltaParquetFileFormat.scala | 619 +++++++++++++++++++++
.../sql/delta/GlutenOptimisticTransaction.scala | 221 ++++++++
.../spark/sql/delta/GlutenParquetFileFormat.scala | 100 ++++
.../delta/files/GlutenDeltaFileFormatWriter.scala | 602 ++++++++++++++++++++
.../stats/GlutenDeltaWriteJobStatsTracker.scala | 83 +++
.../datasources/v2/DeltaWriteOperators.scala | 85 +++
.../datasources/v2/OffloadDeltaCommand.scala | 42 ++
.../apache/spark/sql/delta/DeleteSQLSuite.scala | 5 +-
.../apache/spark/sql/delta/DeltaTestUtils.scala | 4 +-
.../spark/sql/delta/test/DeltaSQLCommandTest.scala | 3 +
.../org/apache/gluten/integration/DataGen.scala | 11 +
.../gluten/integration/ds/TpcdsDataGen.scala | 6 +-
.../apache/gluten/integration/h/TpchDataGen.scala | 6 +-
16 files changed, 1874 insertions(+), 13 deletions(-)
diff --git
a/backends-velox/src-delta/main/scala/org/apache/gluten/config/VeloxDeltaConfig.scala
b/backends-velox/src-delta/main/scala/org/apache/gluten/config/VeloxDeltaConfig.scala
new file mode 100644
index 0000000000..99c2d2c26a
--- /dev/null
+++
b/backends-velox/src-delta/main/scala/org/apache/gluten/config/VeloxDeltaConfig.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.config
+
+import org.apache.spark.sql.internal.SQLConf
+
+class VeloxDeltaConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {
+ import VeloxDeltaConfig._
+
+ def enableNativeWrite: Boolean = getConf(ENABLE_NATIVE_WRITE)
+}
+
+object VeloxDeltaConfig extends ConfigRegistry {
+
+ def get: VeloxDeltaConfig = {
+ new VeloxDeltaConfig(SQLConf.get)
+ }
+
+ /**
+   * Experimental as the feature currently has a performance issue because of
the fallback processing of
+   * statistics.
+ */
+ val ENABLE_NATIVE_WRITE: ConfigEntry[Boolean] =
+
buildConf("spark.gluten.sql.columnar.backend.velox.delta.enableNativeWrite")
+ .experimental()
+ .doc("Enable native Delta Lake write for Velox backend.")
+ .booleanConf
+ .createWithDefault(false)
+}
diff --git
a/backends-velox/src-delta33/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxDelta33WriteComponent
b/backends-velox/src-delta33/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxDelta33WriteComponent
new file mode 100644
index 0000000000..e69de29bb2
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/gluten/component/VeloxDelta33WriteComponent.scala
b/backends-velox/src-delta33/main/scala/org/apache/gluten/component/VeloxDelta33WriteComponent.scala
new file mode 100644
index 0000000000..f1e06d0702
--- /dev/null
+++
b/backends-velox/src-delta33/main/scala/org/apache/gluten/component/VeloxDelta33WriteComponent.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.component
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.extension.columnar.enumerated.RasOffload
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.validator.Validators
+import org.apache.gluten.extension.injector.Injector
+
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.datasources.v2.{LeafV2CommandExec,
OffloadDeltaCommand}
+
+class VeloxDelta33WriteComponent extends Component {
+ override def name(): String = "velox-delta33-write"
+
+ override def buildInfo(): Component.BuildInfo =
+ Component.BuildInfo("VeloxDelta33Write", "N/A", "N/A", "N/A")
+
+ override def dependencies(): Seq[Class[_ <: Component]] =
classOf[VeloxDeltaComponent] :: Nil
+
+ override def injectRules(injector: Injector): Unit = {
+ val legacy = injector.gluten.legacy
+ val ras = injector.gluten.ras
+ legacy.injectTransform {
+ c =>
+ val offload = Seq(
+ OffloadDeltaCommand()
+ ).map(_.toStrcitRule())
+ HeuristicTransform.Simple(
+ Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
+ offload)
+ }
+ val offloads: Seq[RasOffload] = Seq(
+ RasOffload.from[ExecutedCommandExec](OffloadDeltaCommand()),
+ RasOffload.from[LeafV2CommandExec](OffloadDeltaCommand())
+ )
+ offloads.foreach(
+ offload =>
+ ras.injectRasRule(
+ c => RasOffload.Rule(offload, Validators.newValidator(new
GlutenConfig(c.sqlConf)), Nil)))
+ }
+}
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
new file mode 100644
index 0000000000..c661e820ba
--- /dev/null
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
@@ -0,0 +1,619 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat._
+import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, Metadata,
Protocol}
+import
org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
+import org.apache.spark.sql.delta.deletionvectors.{DropMarkedRowsFilter,
KeepAllRowsFilter, KeepMarkedRowsFilter}
+import org.apache.spark.sql.delta.logging.DeltaLogKeys
+import org.apache.spark.sql.delta.schema.SchemaMergingUtils
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector,
OnHeapColumnVector, WritableColumnVector}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{ByteType, LongType, MetadataBuilder,
StructField, StructType}
+import org.apache.spark.sql.util.ScalaExtensions._
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchRow,
ColumnVector}
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.Job
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.util.ContextUtil
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
+
+// spotless:off
+/**
+ * A thin wrapper over the Parquet file format to support
+ * - column names without restrictions.
+ * - populating a column from the deletion vector of this file (if it exists) to
indicate
+ * whether the row is deleted or not according to the deletion vector.
Consumers
+ * of this scan can use the column values to filter out the deleted rows.
+ */
+case class GlutenDeltaParquetFileFormat(
+ protocol: Protocol,
+ metadata: Metadata,
+ nullableRowTrackingFields: Boolean = false,
+ optimizationsEnabled: Boolean = true,
+ tablePath: Option[String] = None,
+ isCDCRead: Boolean = false)
+ extends GlutenParquetFileFormat
+ with LoggingShims {
+ // Validate either we have all arguments for DV enabled read or none of them.
+ if (hasTablePath) {
+ SparkSession.getActiveSession.map { session =>
+ val useMetadataRowIndex =
+
session.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
+ require(useMetadataRowIndex == optimizationsEnabled,
+ "Wrong arguments for Delta table scan with deletion vectors")
+ }
+ }
+
+ SparkSession.getActiveSession.ifDefined { session =>
+ TypeWidening.assertTableReadable(session.sessionState.conf, protocol,
metadata)
+ }
+
+
+ val columnMappingMode: DeltaColumnMappingMode = metadata.columnMappingMode
+ val referenceSchema: StructType = metadata.schema
+
+ if (columnMappingMode == IdMapping) {
+ val requiredReadConf = SQLConf.PARQUET_FIELD_ID_READ_ENABLED
+
require(SparkSession.getActiveSession.exists(_.sessionState.conf.getConf(requiredReadConf)),
+ s"${requiredReadConf.key} must be enabled to support Delta id column
mapping mode")
+ val requiredWriteConf = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED
+
require(SparkSession.getActiveSession.exists(_.sessionState.conf.getConf(requiredWriteConf)),
+ s"${requiredWriteConf.key} must be enabled to support Delta id column
mapping mode")
+ }
+
+ override def shortName(): String = "parquet"
+
+ override def toString: String = "Parquet"
+
+ /**
+ * prepareSchemaForRead must only be used for parquet read.
+   * It removes "PARQUET_FIELD_ID_METADATA_KEY" for name mapping mode, which
addresses columns by
+   * physical name instead of id.
+ */
+ def prepareSchemaForRead(inputSchema: StructType): StructType = {
+ val schema = DeltaColumnMapping.createPhysicalSchema(
+ inputSchema, referenceSchema, columnMappingMode)
+ if (columnMappingMode == NameMapping) {
+ SchemaMergingUtils.transformColumns(schema) { (_, field, _) =>
+ field.copy(metadata = new MetadataBuilder()
+ .withMetadata(field.metadata)
+ .remove(DeltaColumnMapping.PARQUET_FIELD_ID_METADATA_KEY)
+ .remove(DeltaColumnMapping.PARQUET_FIELD_NESTED_IDS_METADATA_KEY)
+ .build())
+ }
+ } else schema
+ }
+
+ /**
+ * Prepares filters so that they can be pushed down into the Parquet reader.
+ *
+ * If column mapping is enabled, then logical column names in the filters
will be replaced with
+ * their corresponding physical column names. This is necessary as the
Parquet files will use
+ * physical column names, and the requested schema pushed down in the
Parquet reader will also use
+ * physical column names.
+ */
+ private def prepareFiltersForRead(filters: Seq[Filter]): Seq[Filter] = {
+ if (!optimizationsEnabled) {
+ Seq.empty
+ } else if (columnMappingMode != NoMapping) {
+ import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+ val physicalNameMap =
DeltaColumnMapping.getLogicalNameToPhysicalNameMap(referenceSchema)
+ .map { case (logicalName, physicalName) => (logicalName.quoted,
physicalName.quoted) }
+ filters.flatMap(translateFilterForColumnMapping(_, physicalNameMap))
+ } else {
+ filters
+ }
+ }
+
+ override def isSplitable(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ path: Path): Boolean = optimizationsEnabled
+
+ def hasTablePath: Boolean = tablePath.isDefined
+
+ /**
+ * We sometimes need to replace FileFormat within LogicalPlans, so we have
to override
+ * `equals` to ensure file format changes are captured
+ */
+ override def equals(other: Any): Boolean = {
+ other match {
+ case ff: GlutenDeltaParquetFileFormat =>
+ ff.columnMappingMode == columnMappingMode &&
+ ff.referenceSchema == referenceSchema &&
+ ff.optimizationsEnabled == optimizationsEnabled
+ case _ => false
+ }
+ }
+
+ override def hashCode(): Int = getClass.getCanonicalName.hashCode()
+
+ override def buildReaderWithPartitionValues(
+ sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
+
+ val useMetadataRowIndexConf =
DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX
+ val useMetadataRowIndex =
sparkSession.sessionState.conf.getConf(useMetadataRowIndexConf)
+
+ val parquetDataReader: PartitionedFile => Iterator[InternalRow] =
+ super.buildReaderWithPartitionValues(
+ sparkSession,
+ prepareSchemaForRead(dataSchema),
+ prepareSchemaForRead(partitionSchema),
+ prepareSchemaForRead(requiredSchema),
+ prepareFiltersForRead(filters),
+ options,
+ hadoopConf)
+
+ val schemaWithIndices = requiredSchema.fields.zipWithIndex
+ def findColumn(name: String): Option[ColumnMetadata] = {
+ val results = schemaWithIndices.filter(_._1.name == name)
+ if (results.length > 1) {
+ throw new IllegalArgumentException(
+ s"There are more than one column with name=`$name` requested in the
reader output")
+ }
+ results.headOption.map(e => ColumnMetadata(e._2, e._1))
+ }
+
+ val isRowDeletedColumn = findColumn(IS_ROW_DELETED_COLUMN_NAME)
+ val rowIndexColumnName = if (useMetadataRowIndex) {
+ ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
+ } else {
+ ROW_INDEX_COLUMN_NAME
+ }
+ val rowIndexColumn = findColumn(rowIndexColumnName)
+
+ // We don't have any additional columns to generate, just return the
original reader as is.
+ if (isRowDeletedColumn.isEmpty && rowIndexColumn.isEmpty) return
parquetDataReader
+
+ // We are using the row_index col generated by the parquet reader and
there are no more
+ // columns to generate.
+ if (useMetadataRowIndex && isRowDeletedColumn.isEmpty) return
parquetDataReader
+
+ // Verify that either predicate pushdown with metadata column is enabled
or optimizations
+ // are disabled.
+ require(useMetadataRowIndex || !optimizationsEnabled,
+ "Cannot generate row index related metadata with file splitting or
predicate pushdown")
+
+ if (hasTablePath && isRowDeletedColumn.isEmpty) {
+ throw new IllegalArgumentException(
+ s"Expected a column $IS_ROW_DELETED_COLUMN_NAME in the schema")
+ }
+
+ val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
+
+ val useOffHeapBuffers =
sparkSession.sessionState.conf.offHeapColumnVectorEnabled
+ (partitionedFile: PartitionedFile) => {
+ val rowIteratorFromParquet = parquetDataReader(partitionedFile)
+ try {
+ val iterToReturn =
+ iteratorWithAdditionalMetadataColumns(
+ partitionedFile,
+ rowIteratorFromParquet,
+ isRowDeletedColumn,
+ rowIndexColumn,
+ useOffHeapBuffers,
+ serializableHadoopConf,
+ useMetadataRowIndex)
+ iterToReturn.asInstanceOf[Iterator[InternalRow]]
+ } catch {
+ case NonFatal(e) =>
+ // Close the iterator if it is a closeable resource. The
`ParquetFileFormat` opens
+ // the file and returns `RecordReaderIterator` (which implements
`AutoCloseable` and
+ // `Iterator`) instance as a `Iterator`.
+ rowIteratorFromParquet match {
+ case resource: AutoCloseable => closeQuietly(resource)
+ case _ => // do nothing
+ }
+ throw e
+ }
+ }
+ }
+
+ override def supportFieldName(name: String): Boolean = {
+ if (columnMappingMode != NoMapping) true else super.supportFieldName(name)
+ }
+
+ override def metadataSchemaFields: Seq[StructField] = {
+ // TODO(SPARK-47731): Parquet reader in Spark has a bug where a file
containing 2b+ rows
+ // in a single rowgroup causes it to run out of the `Integer` range.
+ // For Delta Parquet readers don't expose the row_index field as a
metadata field when it is
+ // not strictly required. We do expose it when Row Tracking or DVs are
enabled.
+ // In general, having 2b+ rows in a single rowgroup is not a common use
case. When the issue is
+ // hit an exception is thrown.
+ (protocol, metadata) match {
+ // We should not expose row tracking fields for CDC reads.
+ case (p, m) if RowId.isEnabled(p, m) && !isCDCRead =>
+ val extraFields = RowTracking.createMetadataStructFields(p, m,
nullableRowTrackingFields)
+ super.metadataSchemaFields ++ extraFields
+ case (p, m) if deletionVectorsReadable(p, m) =>
super.metadataSchemaFields
+ case _ => super.metadataSchemaFields.filter(_ !=
ParquetFileFormat.ROW_INDEX_FIELD)
+ }
+ }
+
+ override def prepareWrite(
+ sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ val factory = super.prepareWrite(sparkSession, job, options, dataSchema)
+ val conf = ContextUtil.getConfiguration(job)
+ // Always write timestamp as TIMESTAMP_MICROS for Iceberg compat based on
Iceberg spec
+ if (IcebergCompatV1.isEnabled(metadata) ||
IcebergCompatV2.isEnabled(metadata)) {
+ conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
+ SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)
+ }
+ if (IcebergCompatV2.isEnabled(metadata)) {
+ // For Uniform with IcebergCompatV2, we need to write nested field IDs
for list and map
+ // types to the parquet schema. Spark currently does not support it so
we hook in our
+ // own write support class.
+ ParquetOutputFormat.setWriteSupportClass(job,
classOf[DeltaParquetWriteSupport])
+ }
+ factory
+ }
+
+ override def fileConstantMetadataExtractors: Map[String, PartitionedFile =>
Any] = {
+ val extractBaseRowId: PartitionedFile => Any = { file =>
+ file.otherConstantMetadataColumnValues.getOrElse(RowId.BASE_ROW_ID, {
+ throw new IllegalStateException(
+ s"Missing ${RowId.BASE_ROW_ID} value for file '${file.filePath}'")
+ })
+ }
+ val extractDefaultRowCommitVersion: PartitionedFile => Any = { file =>
+ file.otherConstantMetadataColumnValues
+ .getOrElse(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, {
+ throw new IllegalStateException(
+ s"Missing ${DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME}
value " +
+ s"for file '${file.filePath}'")
+ })
+ }
+ super.fileConstantMetadataExtractors
+ .updated(RowId.BASE_ROW_ID, extractBaseRowId)
+ .updated(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME,
extractDefaultRowCommitVersion)
+ }
+
+ def copyWithDVInfo(
+ tablePath: String,
+ optimizationsEnabled: Boolean):
GlutenDeltaParquetFileFormat = {
+ // When predicate pushdown is enabled we allow both splits and predicate
pushdown.
+ this.copy(
+ optimizationsEnabled = optimizationsEnabled,
+ tablePath = Some(tablePath))
+ }
+
+ /**
+ * Modifies the data read from underlying Parquet reader by populating one
or both of the
+ * following metadata columns.
+ * - [[IS_ROW_DELETED_COLUMN_NAME]] - row deleted status from deletion
vector corresponding
+ * to this file
+ * - [[ROW_INDEX_COLUMN_NAME]] - index of the row within the file. Note,
this column is only
+ * populated when we are not using _metadata.row_index column.
+ */
+ private def iteratorWithAdditionalMetadataColumns(
+ partitionedFile:
PartitionedFile,
+ iterator:
Iterator[Object],
+ isRowDeletedColumnOpt:
Option[ColumnMetadata],
+ rowIndexColumnOpt:
Option[ColumnMetadata],
+ useOffHeapBuffers:
Boolean,
+ serializableHadoopConf:
SerializableConfiguration,
+ useMetadataRowIndex:
Boolean): Iterator[Object] = {
+ require(!useMetadataRowIndex || rowIndexColumnOpt.isDefined,
+ "useMetadataRowIndex is enabled but rowIndexColumn is not defined.")
+
+ val rowIndexFilterOpt = isRowDeletedColumnOpt.map { col =>
+ // Fetch the DV descriptor from the broadcast map and create a row index
filter
+ val dvDescriptorOpt = partitionedFile.otherConstantMetadataColumnValues
+ .get(FILE_ROW_INDEX_FILTER_ID_ENCODED)
+ val filterTypeOpt = partitionedFile.otherConstantMetadataColumnValues
+ .get(FILE_ROW_INDEX_FILTER_TYPE)
+ if (dvDescriptorOpt.isDefined && filterTypeOpt.isDefined) {
+ val rowIndexFilter = filterTypeOpt.get match {
+ case RowIndexFilterType.IF_CONTAINED => DropMarkedRowsFilter
+ case RowIndexFilterType.IF_NOT_CONTAINED => KeepMarkedRowsFilter
+ case unexpectedFilterType => throw new IllegalStateException(
+ s"Unexpected row index filter type: ${unexpectedFilterType}")
+ }
+ rowIndexFilter.createInstance(
+
DeletionVectorDescriptor.deserializeFromBase64(dvDescriptorOpt.get.asInstanceOf[String]),
+ serializableHadoopConf.value,
+ tablePath.map(new Path(_)))
+ } else if (dvDescriptorOpt.isDefined || filterTypeOpt.isDefined) {
+ throw new IllegalStateException(
+ s"Both ${FILE_ROW_INDEX_FILTER_ID_ENCODED} and
${FILE_ROW_INDEX_FILTER_TYPE} " +
+ "should either both have values or no values at all.")
+ } else {
+ KeepAllRowsFilter
+ }
+ }
+
+ // We only generate the row index column when predicate pushdown is not
enabled.
+ val rowIndexColumnToWriteOpt = if (useMetadataRowIndex) None else
rowIndexColumnOpt
+ val metadataColumnsToWrite =
+ Seq(isRowDeletedColumnOpt,
rowIndexColumnToWriteOpt).filter(_.nonEmpty).map(_.get)
+
+ // When metadata.row_index is not used there is no way to verify the
Parquet index is
+ // starting from 0. We disable the splits, so the assumption is
ParquetFileFormat respects
+ // that.
+ var rowIndex: Long = 0
+
+ // Used only when non-column row batches are received from the Parquet
reader
+ val tempVector = new OnHeapColumnVector(1, ByteType)
+
+ iterator.map { row =>
+ row match {
+ case batch: ColumnarBatch => // When vectorized Parquet reader is
enabled.
+ val size = batch.numRows()
+ // Create vectors for all needed metadata columns.
+          // We can't use the one from the Parquet reader as it sets
+          // [[WritableColumnVector.isAllNulls]] to true and it can't be reset
using any
+          // public APIs.
+ trySafely(useOffHeapBuffers, size, metadataColumnsToWrite) {
writableVectors =>
+ val indexVectorTuples = new ArrayBuffer[(Int, ColumnVector)]
+
+ // When predicate pushdown is enabled we use _metadata.row_index.
Therefore,
+ // we only need to construct the isRowDeleted column.
+ var index = 0
+ isRowDeletedColumnOpt.foreach { columnMetadata =>
+ val isRowDeletedVector = writableVectors(index)
+ if (useMetadataRowIndex) {
+ rowIndexFilterOpt.get.materializeIntoVectorWithRowIndex(
+ size, batch.column(rowIndexColumnOpt.get.index),
isRowDeletedVector)
+ } else {
+ rowIndexFilterOpt.get
+ .materializeIntoVector(rowIndex, rowIndex + size,
isRowDeletedVector)
+ }
+ indexVectorTuples += (columnMetadata.index -> isRowDeletedVector)
+ index += 1
+ }
+
+ rowIndexColumnToWriteOpt.foreach { columnMetadata =>
+ val rowIndexVector = writableVectors(index)
+ // populate the row index column value.
+ for (i <- 0 until size) {
+ rowIndexVector.putLong(i, rowIndex + i)
+ }
+
+ indexVectorTuples += (columnMetadata.index -> rowIndexVector)
+ index += 1
+ }
+
+ val newBatch = replaceVectors(batch, indexVectorTuples.toSeq: _*)
+ rowIndex += size
+ newBatch
+ }
+
+ case columnarRow: ColumnarBatchRow =>
+ // When vectorized reader is enabled but returns immutable rows
instead of
+ // columnar batches [[ColumnarBatchRow]]. So we have to copy the row
as a
+ // mutable [[InternalRow]] and set the `row_index` and
`is_row_deleted`
+ // column values. This is not efficient. It should affect only the
wide
+ // tables. https://github.com/delta-io/delta/issues/2246
+ val newRow = columnarRow.copy();
+ isRowDeletedColumnOpt.foreach { columnMetadata =>
+ val rowIndexForFiltering = if (useMetadataRowIndex) {
+ columnarRow.getLong(rowIndexColumnOpt.get.index)
+ } else {
+ rowIndex
+ }
+
rowIndexFilterOpt.get.materializeSingleRowWithRowIndex(rowIndexForFiltering,
tempVector)
+ newRow.setByte(columnMetadata.index, tempVector.getByte(0))
+ }
+
+ rowIndexColumnToWriteOpt
+ .foreach(columnMetadata => newRow.setLong(columnMetadata.index,
rowIndex))
+ rowIndex += 1
+
+ newRow
+ case rest: InternalRow => // When vectorized Parquet reader is disabled
+ // Temporary vector variable used to get DV values from
RowIndexFilter
+ // Currently the RowIndexFilter only supports writing into a
columnar vector
+ // and doesn't have methods to get DV value for a specific row index.
+ // TODO: This is not efficient, but it is ok given the default
reader is vectorized
+ isRowDeletedColumnOpt.foreach { columnMetadata =>
+ val rowIndexForFiltering = if (useMetadataRowIndex) {
+ rest.getLong(rowIndexColumnOpt.get.index)
+ } else {
+ rowIndex
+ }
+
rowIndexFilterOpt.get.materializeSingleRowWithRowIndex(rowIndexForFiltering,
tempVector)
+ rest.setByte(columnMetadata.index, tempVector.getByte(0))
+ }
+
+ rowIndexColumnToWriteOpt
+ .foreach(columnMetadata => rest.setLong(columnMetadata.index,
rowIndex))
+ rowIndex += 1
+ rest
+ case others =>
+ throw new RuntimeException(
+ s"Parquet reader returned an unknown row type:
${others.getClass.getName}")
+ }
+ }
+ }
+
+ /**
+ * Translates the filter to use physical column names instead of logical
column names.
+ * This is needed when the column mapping mode is set to `NameMapping` or
`IdMapping`
+ * to match the requested schema that's passed to the [[ParquetFileFormat]].
+ */
+ private def translateFilterForColumnMapping(
+ filter: Filter,
+ physicalNameMap: Map[String,
String]): Option[Filter] = {
+ object PhysicalAttribute {
+ def unapply(attribute: String): Option[String] = {
+ physicalNameMap.get(attribute)
+ }
+ }
+
+ filter match {
+ case EqualTo(PhysicalAttribute(physicalAttribute), value) =>
+ Some(EqualTo(physicalAttribute, value))
+ case EqualNullSafe(PhysicalAttribute(physicalAttribute), value) =>
+ Some(EqualNullSafe(physicalAttribute, value))
+ case GreaterThan(PhysicalAttribute(physicalAttribute), value) =>
+ Some(GreaterThan(physicalAttribute, value))
+ case GreaterThanOrEqual(PhysicalAttribute(physicalAttribute), value) =>
+ Some(GreaterThanOrEqual(physicalAttribute, value))
+ case LessThan(PhysicalAttribute(physicalAttribute), value) =>
+ Some(LessThan(physicalAttribute, value))
+ case LessThanOrEqual(PhysicalAttribute(physicalAttribute), value) =>
+ Some(LessThanOrEqual(physicalAttribute, value))
+ case In(PhysicalAttribute(physicalAttribute), values) =>
+ Some(In(physicalAttribute, values))
+ case IsNull(PhysicalAttribute(physicalAttribute)) =>
+ Some(IsNull(physicalAttribute))
+ case IsNotNull(PhysicalAttribute(physicalAttribute)) =>
+ Some(IsNotNull(physicalAttribute))
+ case And(left, right) =>
+ val newLeft = translateFilterForColumnMapping(left, physicalNameMap)
+ val newRight = translateFilterForColumnMapping(right, physicalNameMap)
+ (newLeft, newRight) match {
+ case (Some(l), Some(r)) => Some(And(l, r))
+ case (Some(l), None) => Some(l)
+ case (_, _) => newRight
+ }
+ case Or(left, right) =>
+ val newLeft = translateFilterForColumnMapping(left, physicalNameMap)
+ val newRight = translateFilterForColumnMapping(right, physicalNameMap)
+ (newLeft, newRight) match {
+ case (Some(l), Some(r)) => Some(Or(l, r))
+ case (_, _) => None
+ }
+ case Not(child) =>
+ translateFilterForColumnMapping(child, physicalNameMap).map(Not)
+ case StringStartsWith(PhysicalAttribute(physicalAttribute), value) =>
+ Some(StringStartsWith(physicalAttribute, value))
+ case StringEndsWith(PhysicalAttribute(physicalAttribute), value) =>
+ Some(StringEndsWith(physicalAttribute, value))
+ case StringContains(PhysicalAttribute(physicalAttribute), value) =>
+ Some(StringContains(physicalAttribute, value))
+ case AlwaysTrue() => Some(AlwaysTrue())
+ case AlwaysFalse() => Some(AlwaysFalse())
+ case _ =>
+ logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER,
filter)}")
+ None
+ }
+ }
+}
+
+object GlutenDeltaParquetFileFormat {
+ /**
+ * Column name used to identify whether the row read from the parquet file
is marked
+ * as deleted according to the Delta table deletion vectors
+ */
+ val IS_ROW_DELETED_COLUMN_NAME = "__delta_internal_is_row_deleted"
+ val IS_ROW_DELETED_STRUCT_FIELD = StructField(IS_ROW_DELETED_COLUMN_NAME,
ByteType)
+
+ /** Row index for each column */
+ val ROW_INDEX_COLUMN_NAME = "__delta_internal_row_index"
+ val ROW_INDEX_STRUCT_FIELD = StructField(ROW_INDEX_COLUMN_NAME, LongType)
+
+ /** The key to the encoded row index filter identifier value of the
+ * [[PartitionedFile]]'s otherConstantMetadataColumnValues map. */
+ val FILE_ROW_INDEX_FILTER_ID_ENCODED = "row_index_filter_id_encoded"
+
+ /** The key to the row index filter type value of the
+ * [[PartitionedFile]]'s otherConstantMetadataColumnValues map. */
+ val FILE_ROW_INDEX_FILTER_TYPE = "row_index_filter_type"
+
+ /** Utility method to create a new writable vector */
+ private def newVector(
+ useOffHeapBuffers: Boolean, size: Int, dataType:
StructField): WritableColumnVector = {
+ if (useOffHeapBuffers) {
+ OffHeapColumnVector.allocateColumns(size, Seq(dataType).toArray)(0)
+ } else {
+ OnHeapColumnVector.allocateColumns(size, Seq(dataType).toArray)(0)
+ }
+ }
+
+ /** Try the operation, if the operation fails release the created resource */
+ private def trySafely[R <: WritableColumnVector, T](
+ useOffHeapBuffers:
Boolean,
+ size: Int,
+ columns:
Seq[ColumnMetadata])(f: Seq[WritableColumnVector] => T): T = {
+ val resources = new ArrayBuffer[WritableColumnVector](columns.size)
+ try {
+ columns.foreach(col => resources.append(newVector(useOffHeapBuffers,
size, col.structField)))
+ f(resources.toSeq)
+ } catch {
+ case NonFatal(e) =>
+ resources.foreach(closeQuietly(_))
+ throw e
+ }
+ }
+
+ /** Utility method to quietly close an [[AutoCloseable]] */
+ private def closeQuietly(closeable: AutoCloseable): Unit = {
+ if (closeable != null) {
+ try {
+ closeable.close()
+ } catch {
+ case NonFatal(_) => // ignore
+ }
+ }
+ }
+
+ /**
+ * Helper method to replace the vectors in given [[ColumnarBatch]].
+ * New vectors and its index in the batch are given as tuples.
+ */
+ private def replaceVectors(
+ batch: ColumnarBatch,
+ indexVectorTuples: (Int, ColumnVector) *):
ColumnarBatch = {
+ val vectors = ArrayBuffer[ColumnVector]()
+ for (i <- 0 until batch.numCols()) {
+ var replaced: Boolean = false
+ for (indexVectorTuple <- indexVectorTuples) {
+ val index = indexVectorTuple._1
+ val vector = indexVectorTuple._2
+ if (indexVectorTuple._1 == i) {
+ vectors += indexVectorTuple._2
+ // Make sure to close the existing vector allocated in the Parquet
+ batch.column(i).close()
+ replaced = true
+ }
+ }
+ if (!replaced) {
+ vectors += batch.column(i)
+ }
+ }
+ new ColumnarBatch(vectors.toArray, batch.numRows())
+ }
+
+ /** Helper class to encapsulate column info */
+ case class ColumnMetadata(index: Int, structField: StructField)
+}
+// spotless:on
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
new file mode 100644
index 0000000000..275d2c0cd9
--- /dev/null
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.gluten.config.VeloxDeltaConfig
+
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
+import org.apache.spark.sql.delta.constraints.{Constraint, Constraints,
DeltaInvariantCheckerExec}
+import org.apache.spark.sql.delta.files.{GlutenDeltaFileFormatWriter,
TransactionalWrite}
+import org.apache.spark.sql.delta.hooks.AutoCompact
+import org.apache.spark.sql.delta.perf.DeltaOptimizedWriterExec
+import org.apache.spark.sql.delta.schema.InnerInvariantViolationException
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import
org.apache.spark.sql.delta.stats.{GlutenDeltaIdentityColumnStatsTracker,
GlutenDeltaJobStatisticsTracker}
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker,
FileFormatWriter, WriteJobStatsTracker}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.ScalaExtensions.OptionExt
+import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.mutable.ListBuffer
+
/**
 * An [[OptimisticTransaction]] variant whose `writeFiles` routes the physical write through
 * Gluten's native Parquet writer ([[GlutenDeltaParquetFileFormat]] via
 * [[GlutenDeltaFileFormatWriter]]) instead of vanilla Spark's `FileFormatWriter`.
 *
 * The transaction state (deltaLog, catalogTable, snapshot) is copied from `delegate`.
 * Changes vs. the removed dead code: the unused `VeloxDeltaConfig` local was dropped.
 */
class GlutenOptimisticTransaction(delegate: OptimisticTransaction)
  extends OptimisticTransaction(
    delegate.deltaLog,
    delegate.catalogTable,
    delegate.snapshot
  ) {

  override def writeFiles(
      inputData: Dataset[_],
      writeOptions: Option[DeltaOptions],
      isOptimize: Boolean,
      additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
    hasWritten = true

    val spark = inputData.sparkSession

    val (data, partitionSchema) = performCDCPartition(inputData)
    val outputPath = deltaLog.dataPath

    val (queryExecution, output, generatedColumnConstraints, trackFromData) =
      normalizeData(deltaLog, writeOptions, data)
    // Use the track set from the transaction if set,
    // otherwise use the track set from `normalizeData()`.
    val trackIdentityHighWaterMarks = trackHighWaterMarks.getOrElse(trackFromData)

    val partitioningColumns = getPartitioningColumns(partitionSchema, output)

    val committer = getCommitter(outputPath)

    val (statsDataSchema, _) = getStatsSchema(output, partitionSchema)

    // If Statistics Collection is enabled, then create a stats tracker that will be injected
    // during the write call below and will collect per-file stats using StatisticsCollection.
    val optionalStatsTracker =
      getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)._1.map(
        new GlutenDeltaJobStatisticsTracker(_))

    val constraints =
      Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ additionalConstraints

    val identityTrackerOpt = IdentityColumn
      .createIdentityColumnStatsTracker(
        spark,
        deltaLog.newDeltaHadoopConf(),
        outputPath,
        metadata.schema,
        statsDataSchema,
        trackIdentityHighWaterMarks
      )
      .map(new GlutenDeltaIdentityColumnStatsTracker(_))

    SQLExecution.withNewExecutionId(queryExecution, Option("deltaTransactionalWrite")) {
      val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, Map.empty, output)

      val empty2NullPlan =
        convertEmptyToNullIfNeeded(queryExecution.executedPlan, partitioningColumns, constraints)
      val maybeCheckInvariants = if (constraints.isEmpty) {
        // Compared to vanilla Delta, we simply avoid adding the invariant checker
        // when the constraint list is empty, to avoid the unnecessary transitions
        // added around the invariant checker.
        empty2NullPlan
      } else {
        DeltaInvariantCheckerExec(empty2NullPlan, constraints)
      }
      // No need to plan optimized write if the write command is OPTIMIZE, which aims to produce
      // evenly-balanced data files already.
      val physicalPlan =
        if (
          !isOptimize &&
          shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
        ) {
          DeltaOptimizedWriterExec(maybeCheckInvariants, metadata.partitionColumns, deltaLog)
        } else {
          maybeCheckInvariants
        }

      val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()

      if (spark.conf.get(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED)) {
        val basicWriteJobStatsTracker = new BasicWriteJobStatsTracker(
          new SerializableConfiguration(deltaLog.newDeltaHadoopConf()),
          BasicWriteJobStatsTracker.metrics)
        registerSQLMetrics(spark, basicWriteJobStatsTracker.driverSideMetrics)
        statsTrackers.append(basicWriteJobStatsTracker)
      }

      // Iceberg spec requires partition columns in data files.
      val writePartitionColumns = IcebergCompat.isAnyEnabled(metadata)
      // Retain only a minimal selection of Spark writer options to avoid any potential
      // compatibility issues. (Inner binding renamed from `writeOptions` to avoid shadowing.)
      val options = (writeOptions match {
        case None => Map.empty[String, String]
        case Some(opts) =>
          opts.options.filterKeys {
            key =>
              key.equalsIgnoreCase(DeltaOptions.MAX_RECORDS_PER_FILE) ||
              key.equalsIgnoreCase(DeltaOptions.COMPRESSION)
          }.toMap
      }) + (DeltaOptions.WRITE_PARTITION_COLUMNS -> writePartitionColumns.toString)

      try {
        GlutenDeltaFileFormatWriter.write(
          sparkSession = spark,
          plan = physicalPlan,
          fileFormat = new GlutenDeltaParquetFileFormat(
            protocol,
            metadata
          ), // This is changed to Gluten's Delta format.
          committer = committer,
          outputSpec = outputSpec,
          // scalastyle:off deltahadoopconfiguration
          hadoopConf =
            spark.sessionState.newHadoopConfWithOptions(metadata.configuration ++ deltaLog.options),
          // scalastyle:on deltahadoopconfiguration
          partitionColumns = partitioningColumns,
          bucketSpec = None,
          statsTrackers = optionalStatsTracker.toSeq
            ++ statsTrackers
            ++ identityTrackerOpt.toSeq,
          options = options
        )
      } catch {
        case InnerInvariantViolationException(violationException) =>
          // Pull an InvariantViolationException up to the top level if it was the root cause.
          throw violationException
      }
      statsTrackers.foreach {
        case tracker: BasicWriteJobStatsTracker =>
          val numOutputRowsOpt = tracker.driverSideMetrics.get("numOutputRows").map(_.value)
          IdentityColumn.logTableWrite(snapshot, trackIdentityHighWaterMarks, numOutputRowsOpt)
        case _ => ()
      }
    }

    var resultFiles =
      (if (optionalStatsTracker.isDefined) {
         committer.addedStatuses.map {
           a =>
             a.copy(stats = optionalStatsTracker
               .map(_.delegate.recordedStats(a.toPath.getName))
               .getOrElse(a.stats))
         }
       } else {
         committer.addedStatuses
       })
        .filter {
          // In some cases, we can write out an empty `inputData`. Some examples of this (though,
          // they may be fixed in the future) are the MERGE command when you delete with empty
          // source, or empty target, or on disjoint tables. This is hard to catch before the
          // write without collecting the DF ahead of time. Instead, we can return only the
          // AddFiles that
          // a) actually add rows, or
          // b) don't have any stats so we don't know the number of rows at all
          case a: AddFile => a.numLogicalRecords.forall(_ > 0)
          case _ => true
        }

    // add [[AddFile.Tags.ICEBERG_COMPAT_VERSION.name]] tags to addFiles
    if (IcebergCompatV2.isEnabled(metadata)) {
      resultFiles = resultFiles.map {
        addFile =>
          val tags = if (addFile.tags != null) addFile.tags else Map.empty[String, String]
          addFile.copy(tags = tags + (AddFile.Tags.ICEBERG_COMPAT_VERSION.name -> "2"))
      }
    }

    if (resultFiles.nonEmpty && !isOptimize) registerPostCommitHook(AutoCompact)
    // Record the updated high water marks to be used during transaction commit.
    identityTrackerOpt.ifDefined {
      tracker => updatedIdentityHighWaterMarks.appendAll(tracker.delegate.highWaterMarks.toSeq)
    }

    resultFiles.toSeq ++ committer.changeFiles
  }

  /** Honors the per-write `optimizeWrite` option, falling back to the table/session default. */
  private def shouldOptimizeWrite(
      writeOptions: Option[DeltaOptions],
      sessionConf: SQLConf): Boolean = {
    writeOptions
      .flatMap(_.optimizeWrite)
      .getOrElse(TransactionalWrite.shouldOptimizeWrite(metadata, sessionConf))
  }
}
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala
new file mode 100644
index 0000000000..91bc39bd2e
--- /dev/null
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenParquetFileFormat.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.datasource.GlutenFormatFactory
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.{OutputWriter,
OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
ParquetOptions}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.StructType
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.parquet.hadoop.codec.CodecConfig
+import org.apache.parquet.hadoop.util.ContextUtil
+import org.slf4j.LoggerFactory
+
/**
 * A [[ParquetFileFormat]] that offloads writes to Gluten's native Parquet writer whenever the
 * data schema is natively writable, falling back to the vanilla Spark Parquet writer otherwise.
 *
 * Changes vs. the original: the redundant private slf4j logger was removed (the mixed-in
 * [[Logging]] trait already provides logging), the no-op `inferSchema` override was dropped,
 * and the early `return` was replaced with an expression.
 */
class GlutenParquetFileFormat
  extends ParquetFileFormat
  with DataSourceRegister
  with Logging
  with Serializable {
  import GlutenParquetFileFormat._

  override def shortName(): String = "gluten-parquet"

  override def toString: String = "GlutenParquet"

  // Instances are stateless and interchangeable: equality is by class only.
  override def hashCode(): Int = getClass.hashCode()

  override def equals(other: Any): Boolean = other.isInstanceOf[GlutenParquetFileFormat]

  override def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    if (isNativeWritable(dataSchema)) {
      // Pass compression to job conf so that the file extension can be aware of it.
      val conf = ContextUtil.getConfiguration(job)
      val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
      conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
      val nativeConf =
        GlutenFormatFactory("parquet")
          .nativeConf(options, parquetOptions.compressionCodecClassName)

      new OutputWriterFactory {
        override def getFileExtension(context: TaskAttemptContext): String = {
          CodecConfig.from(context).getCodec.getExtension + ".parquet"
        }

        override def newInstance(
            path: String,
            dataSchema: StructType,
            context: TaskAttemptContext): OutputWriter = {
          GlutenFormatFactory("parquet")
            .createOutputWriter(path, dataSchema, context, nativeConf)
        }
      }
    } else {
      logWarning(
        s"Data schema is unsupported by Gluten Parquet writer: $dataSchema, " +
          s"falling back to the vanilla Spark Parquet writer")
      super.prepareWrite(sparkSession, job, options, dataSchema)
    }
  }
}
+
object GlutenParquetFileFormat {
  /** True when the active backend reports it can natively write every field of `schema`. */
  def isNativeWritable(schema: StructType): Boolean = {
    BackendsApiManager.getSettings.supportNativeWrite(schema.fields)
  }
}
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
new file mode 100644
index 0000000000..90c0fa1bff
--- /dev/null
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.files
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.execution._
+import org.apache.gluten.execution.datasource.GlutenFormatFactory
+import org.apache.gluten.extension.columnar.transition.Transitions
+
+import org.apache.spark._
+import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.internal.io.{FileCommitProtocol,
SparkHadoopWriterUtils}
+import org.apache.spark.shuffle.FetchFailedException
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.connector.write.WriterCommitMessage
+import org.apache.spark.sql.delta.{DeltaOptions, GlutenParquetFileFormat}
+import org.apache.spark.sql.delta.logging.DeltaLogKeys
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.FileFormatWriter._
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import java.util.{Date, UUID}
+
+// spotless:off
+/**
+ * A helper object for writing FileFormat data out to a location.
+ * Logic is copied from FileFormatWriter from Spark 3.5 with added
functionality to write partition
+ * values to data files. Specifically L123-126, L132, and L140 where it adds
option
+ * WRITE_PARTITION_COLUMNS
+ */
+object GlutenDeltaFileFormatWriter extends LoggingShims {
+
+ /**
+ * A variable used in tests to check whether the output ordering of the
query matches the
+ * required ordering of the write command.
+ */
+ private var outputOrderingMatched: Boolean = false
+
+ /**
+ * A variable used in tests to check the final executed plan.
+ */
+ private var executedPlan: Option[SparkPlan] = None
+
+ // scalastyle:off argcount
+ /**
+ * Basic work flow of this command is:
+ * 1. Driver side setup, including output committer initialization and data
source specific
+ * preparation work for the write job to be issued.
+ * 2. Issues a write job consists of one or more executor side tasks, each
of which writes all
+ * rows within an RDD partition.
+ * 3. If no exception is thrown in a task, commits that task, otherwise
aborts that task; If any
+ * exception is thrown during task commitment, also aborts that task.
+ * 4. If all tasks are committed, commit the job, otherwise aborts the job;
If any exception is
+ * thrown during job commitment, also aborts the job.
+ * 5. If the job is successfully committed, perform post-commit operations
such as
+ * processing statistics.
+ * @return The set of all partition paths that were updated during this
write job.
+ */
+ def write(
+ sparkSession: SparkSession,
+ plan: SparkPlan,
+ fileFormat: FileFormat,
+ committer: FileCommitProtocol,
+ outputSpec: OutputSpec,
+ hadoopConf: Configuration,
+ partitionColumns: Seq[Attribute],
+ bucketSpec: Option[BucketSpec],
+ statsTrackers: Seq[WriteJobStatsTracker],
+ options: Map[String, String],
+ numStaticPartitionCols: Int = 0): Set[String] = {
+ require(partitionColumns.size >= numStaticPartitionCols)
+
+ val job = Job.getInstance(hadoopConf)
+ job.setOutputKeyClass(classOf[Void])
+ job.setOutputValueClass(classOf[InternalRow])
+ FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
+
+ val partitionSet = AttributeSet(partitionColumns)
+ // cleanup the internal metadata information of
+ // the file source metadata attribute if any before write out
+ val finalOutputSpec = outputSpec.copy(
+ outputColumns = outputSpec.outputColumns
+ .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation)
+ )
+ val dataColumns =
finalOutputSpec.outputColumns.filterNot(partitionSet.contains)
+
+ val writerBucketSpec = V1WritesUtils.getWriterBucketSpec(bucketSpec,
dataColumns, options)
+ val sortColumns = V1WritesUtils.getBucketSortColumns(bucketSpec,
dataColumns)
+
+ val caseInsensitiveOptions = CaseInsensitiveMap(options)
+
+ val dataSchema = dataColumns.toStructType
+ DataSourceUtils.verifySchema(fileFormat, dataSchema)
+ DataSourceUtils.checkFieldNames(fileFormat, dataSchema)
+ // Note: prepareWrite has side effect. It sets "job".
+
+ val outputDataColumns =
+ if
(caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true"))
{
+ dataColumns ++ partitionColumns
+ } else dataColumns
+
+ val outputWriterFactory =
+ fileFormat.prepareWrite(
+ sparkSession,
+ job,
+ caseInsensitiveOptions,
+ outputDataColumns.toStructType
+ )
+
+ val description = new WriteJobDescription(
+ uuid = UUID.randomUUID.toString,
+ serializableHadoopConf = new
SerializableConfiguration(job.getConfiguration),
+ outputWriterFactory = outputWriterFactory,
+ allColumns = finalOutputSpec.outputColumns,
+ dataColumns = outputDataColumns,
+ partitionColumns = partitionColumns,
+ bucketSpec = writerBucketSpec,
+ path = finalOutputSpec.outputPath,
+ customPartitionLocations = finalOutputSpec.customPartitionLocations,
+ maxRecordsPerFile = caseInsensitiveOptions
+ .get("maxRecordsPerFile")
+ .map(_.toLong)
+ .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
+ timeZoneId = caseInsensitiveOptions
+ .get(DateTimeUtils.TIMEZONE_OPTION)
+ .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone),
+ statsTrackers = statsTrackers
+ )
+
+ // We should first sort by dynamic partition columns, then bucket id, and
finally sorting
+ // columns.
+ val requiredOrdering = partitionColumns.drop(numStaticPartitionCols) ++
+ writerBucketSpec.map(_.bucketIdExpression) ++ sortColumns
+ val writeFilesOpt = V1WritesUtils.getWriteFilesOpt(plan)
+
+ // SPARK-40588: when planned writing is disabled and AQE is enabled,
+ // plan contains an AdaptiveSparkPlanExec, which does not know
+ // its final plan's ordering, so we have to materialize that plan first
+ // it is fine to use plan further down as the final plan is cached in that
plan
+ def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+ case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+ case p: SparkPlan =>
p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
+ }
+
+ // the sort order doesn't matter
+ val actualOrdering = writeFilesOpt
+ .map(_.child)
+ .getOrElse(materializeAdaptiveSparkPlan(plan))
+ .outputOrdering
+ val orderingMatched = V1WritesUtils.isOrderingMatched(requiredOrdering,
actualOrdering)
+
+ SQLExecution.checkSQLExecutionId(sparkSession)
+
+ // propagate the description UUID into the jobs, so that committers
+ // get an ID guaranteed to be unique.
+ job.getConfiguration.set("spark.sql.sources.writeJobUUID",
description.uuid)
+
+ // When `PLANNED_WRITE_ENABLED` is true, the optimizer rule V1Writes will
add logical sort
+ // operator based on the required ordering of the V1 write command. So the
output
+ // ordering of the physical plan should always match the required
ordering. Here
+ // we set the variable to verify this behavior in tests.
+ // There are two cases where FileFormatWriter still needs to add physical
sort:
+ // 1) When the planned write config is disabled.
+ // 2) When the concurrent writers are enabled (in this case the required
ordering of a
+ // V1 write command will be empty).
+ if (Utils.isTesting) outputOrderingMatched = orderingMatched
+
+ if (writeFilesOpt.isDefined) {
+ // build `WriteFilesSpec` for `WriteFiles`
+ val concurrentOutputWriterSpecFunc = (plan: SparkPlan) => {
+ val sortPlan = createSortPlan(plan, requiredOrdering, outputSpec)
+ createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)
+ }
+ val writeSpec = WriteFilesSpec(
+ description = description,
+ committer = committer,
+ concurrentOutputWriterSpecFunc = concurrentOutputWriterSpecFunc
+ )
+ executeWrite(sparkSession, plan, writeSpec, job)
+ } else {
+ executeWrite(
+ sparkSession,
+ plan,
+ job,
+ description,
+ committer,
+ outputSpec,
+ requiredOrdering,
+ partitionColumns,
+ sortColumns,
+ orderingMatched,
+ GlutenParquetFileFormat.isNativeWritable(dataSchema)
+ )
+ }
+ }
+ // scalastyle:on argcount
+
+ private def executeWrite(
+ sparkSession: SparkSession,
+ plan: SparkPlan,
+ job: Job,
+ description: WriteJobDescription,
+ committer: FileCommitProtocol,
+ outputSpec: OutputSpec,
+ requiredOrdering: Seq[Expression],
+ partitionColumns: Seq[Attribute],
+ sortColumns: Seq[Attribute],
+ orderingMatched: Boolean,
+ writeOffloadable: Boolean): Set[String] = {
+ val projectList = V1WritesUtils.convertEmptyToNull(plan.output,
partitionColumns)
+ val empty2NullPlan = if (projectList.nonEmpty) ProjectExec(projectList,
plan) else plan
+
+ writeAndCommit(job, description, committer) {
+ val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) {
+ (empty2NullPlan, None)
+ } else {
+ val sortPlan = createSortPlan(empty2NullPlan, requiredOrdering,
outputSpec)
+ val concurrentOutputWriterSpec =
+ createConcurrentOutputWriterSpec(sparkSession, sortPlan, sortColumns)
+ if (concurrentOutputWriterSpec.isDefined) {
+ (empty2NullPlan, concurrentOutputWriterSpec)
+ } else {
+ def addNativeSort(input: SparkPlan): SparkPlan = {
+ val nativeSortPlan = SortExecTransformer(sortPlan.sortOrder,
sortPlan.global, child = input)
+ val validationResult = nativeSortPlan.doValidate()
+ assert(validationResult.ok(),
+ s"Sort operation for Delta write is not offload-able:
${validationResult.reason()}")
+ nativeSortPlan
+ }
+ val newPlan = sortPlan.child match {
+ case WholeStageTransformer(wholeStageChild, materializeInput) =>
+ WholeStageTransformer(addNativeSort(wholeStageChild),
+
materializeInput)(ColumnarCollapseTransformStages.transformStageCounter.incrementAndGet())
+ case other =>
+ Transitions.toBatchPlan(sortPlan, VeloxBatchType)
+ }
+ (newPlan, None)
+ }
+ }
+
+ val wrappedPlanToExecute = if (writeOffloadable) {
+
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToCarrierRow(planToExecute)
+ } else {
+ planToExecute
+ }
+
+ // In testing, this is the only way to get hold of the actually executed
plan written to file
+ if (Utils.isTesting) executedPlan = Some(wrappedPlanToExecute)
+
+ val rdd = wrappedPlanToExecute.execute()
+
+ // SPARK-23271 If we are attempting to write a zero partition rdd,
create a dummy single
+ // partition rdd to make sure we at least set up one write task to write
the metadata.
+ val rddWithNonEmptyPartitions = if (rdd.partitions.length == 0) {
+ sparkSession.sparkContext.parallelize(Array.empty[InternalRow], 1)
+ } else {
+ rdd
+ }
+
+ val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
+ val ret = new
Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
+ val partitionColumnToDataType = description.partitionColumns
+ .map(attr => (attr.name, attr.dataType)).toMap
+ sparkSession.sparkContext.runJob(
+ rddWithNonEmptyPartitions,
+ (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
+ executeTask(
+ description = description,
+ jobTrackerID = jobTrackerID,
+ sparkStageId = taskContext.stageId(),
+ sparkPartitionId = taskContext.partitionId(),
+ sparkAttemptNumber = taskContext.taskAttemptId().toInt &
Integer.MAX_VALUE,
+ committer,
+ iterator = iter,
+ concurrentOutputWriterSpec = concurrentOutputWriterSpec,
+ partitionColumnToDataType
+ )
+ },
+ rddWithNonEmptyPartitions.partitions.indices,
+ (index, res: WriteTaskResult) => {
+ committer.onTaskCommit(res.commitMsg)
+ ret(index) = res
+ }
+ )
+ ret
+ }
+ }
+
+ private def writeAndCommit(
+ job: Job,
+ description: WriteJobDescription,
+ committer: FileCommitProtocol)(f: =>
Array[WriteTaskResult]): Set[String] = {
+ // This call shouldn't be put into the `try` block below because it only
initializes and
+ // prepares the job, any exception thrown from here shouldn't cause
abortJob() to be called.
+ committer.setupJob(job)
+ try {
+ val ret = f
+ val commitMsgs = ret.map(_.commitMsg)
+
+ logInfo(log"Start to commit write Job ${MDC(DeltaLogKeys.JOB_ID,
description.uuid)}.")
+ val (_, duration) = Utils.timeTakenMs { committer.commitJob(job,
commitMsgs) }
+ logInfo(log"Write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}
committed. " +
+ log"Elapsed time: ${MDC(DeltaLogKeys.DURATION, duration)} ms.")
+
+ processStats(description.statsTrackers, ret.map(_.summary.stats),
duration)
+ logInfo(log"Finished processing stats for write job " +
+ log"${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
+
+ // return a set of all the partition paths that were updated during this
job
+ ret.map(_.summary.updatedPartitions).reduceOption(_ ++
_).getOrElse(Set.empty)
+ } catch {
+ case cause: Throwable =>
+ logError(log"Aborting job ${MDC(DeltaLogKeys.JOB_ID,
description.uuid)}", cause)
+ committer.abortJob(job)
+ throw cause
+ }
+ }
+
+ /**
+ * Write files using [[SparkPlan.executeWrite]]
+ */
+ private def executeWrite(
+ session: SparkSession,
+ planForWrites: SparkPlan,
+ writeFilesSpec: WriteFilesSpec,
+ job: Job): Set[String] = {
+ val committer = writeFilesSpec.committer
+ val description = writeFilesSpec.description
+
+ // In testing, this is the only way to get hold of the actually executed
plan written to file
+ if (Utils.isTesting) executedPlan = Some(planForWrites)
+
+ writeAndCommit(job, description, committer) {
+ val rdd = planForWrites.executeWrite(writeFilesSpec)
+ val ret = new Array[WriteTaskResult](rdd.partitions.length)
+ session.sparkContext.runJob(
+ rdd,
+ (context: TaskContext, iter: Iterator[WriterCommitMessage]) => {
+ assert(iter.hasNext)
+ val commitMessage = iter.next()
+ assert(!iter.hasNext)
+ commitMessage
+ },
+ rdd.partitions.indices,
+ (index, res: WriterCommitMessage) => {
+ assert(res.isInstanceOf[WriteTaskResult])
+ val writeTaskResult = res.asInstanceOf[WriteTaskResult]
+ committer.onTaskCommit(writeTaskResult.commitMsg)
+ ret(index) = writeTaskResult
+ }
+ )
+ ret
+ }
+ }
+
+ private def createSortPlan(
+ plan: SparkPlan,
+ requiredOrdering: Seq[Expression],
+ outputSpec: OutputSpec): SortExec = {
+ // SPARK-21165: the `requiredOrdering` is based on the attributes from
analyzed plan, and
+ // the physical plan may have different attribute ids due to optimizer
removing some
+ // aliases. Here we bind the expression ahead to avoid potential attribute
ids mismatch.
+ val orderingExpr =
+ bindReferences(requiredOrdering.map(SortOrder(_, Ascending)),
outputSpec.outputColumns)
+ SortExec(orderingExpr, global = false, child = plan)
+ }
+
+ private def createConcurrentOutputWriterSpec(
+ sparkSession: SparkSession,
+ sortPlan: SortExec,
+ sortColumns: Seq[Attribute]):
Option[ConcurrentOutputWriterSpec] = {
+ val maxWriters =
sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
+ val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
+ if (concurrentWritersEnabled) {
+ Some(ConcurrentOutputWriterSpec(maxWriters, () =>
sortPlan.createSorter()))
+ } else {
+ None
+ }
+ }
+
+ /** Writes data out in a single Spark task. */
+ private def executeTask(
+ description: WriteJobDescription,
+ jobTrackerID: String,
+ sparkStageId: Int,
+ sparkPartitionId: Int,
+ sparkAttemptNumber: Int,
+ committer: FileCommitProtocol,
+ iterator: Iterator[InternalRow],
+ concurrentOutputWriterSpec:
Option[ConcurrentOutputWriterSpec],
+ partitionColumnToDataType: Map[String, DataType]):
WriteTaskResult = {
+
+ val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId)
+ val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+ val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
+
+ // Set up the attempt context required to use in the output committer.
+ val taskAttemptContext: TaskAttemptContext = {
+ // Set up the configuration object
+ val hadoopConf = description.serializableHadoopConf.value
+ hadoopConf.set("mapreduce.job.id", jobId.toString)
+ hadoopConf.set("mapreduce.task.id", taskAttemptId.getTaskID.toString)
+ hadoopConf.set("mapreduce.task.attempt.id", taskAttemptId.toString)
+ hadoopConf.setBoolean("mapreduce.task.ismap", true)
+ hadoopConf.setInt("mapreduce.task.partition", 0)
+
+ if (partitionColumnToDataType.isEmpty) {
+ new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
+ } else {
+ new
DeltaFileFormatWriter.PartitionedTaskAttemptContextImpl(hadoopConf,
taskAttemptId, partitionColumnToDataType)
+ }
+ }
+
+ committer.setupTask(taskAttemptContext)
+
+ val dataWriter =
+ if (sparkPartitionId != 0 && !iterator.hasNext) {
+ // In case of empty job, leave first partition to save meta for file
format like parquet.
+ new EmptyDirectoryDataWriter(description, taskAttemptContext,
committer)
+ } else if (description.partitionColumns.isEmpty &&
description.bucketSpec.isEmpty) {
+ new SingleDirectoryDataWriter(description, taskAttemptContext,
committer)
+ } else {
+ concurrentOutputWriterSpec match {
+ case Some(spec) =>
+ new DynamicPartitionDataConcurrentWriter(
+ description,
+ taskAttemptContext,
+ committer,
+ spec
+ )
+ case _ =>
+ // Columnar-based partition writer to divide the input batch by
partition values
+ // and bucket IDs in advance.
+ new ColumnarDynamicPartitionDataSingleWriter(description,
taskAttemptContext, committer)
+ }
+ }
+
+ try {
+ Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+ // Execute the task to write rows out and commit the task.
+ dataWriter.writeWithIterator(iterator)
+ dataWriter.commit()
+ })(catchBlock = {
+ // If there is an error, abort the task
+ dataWriter.abort()
+ logError(log"Job ${MDC(DeltaLogKeys.JOB_ID, jobId)} aborted.")
+ }, finallyBlock = {
+ dataWriter.close()
+ })
+ } catch {
+ case e: FetchFailedException =>
+ throw e
+ case f: FileAlreadyExistsException if
SQLConf.get.fastFailFileFormatOutput =>
+ // If any output file to write already exists, it does not make sense
to re-run this task.
+ // We throw the exception and let Executor throw ExceptionFailure to
abort the job.
+ throw new TaskOutputFileAlreadyExistException(f)
+ case t: Throwable =>
+ throw
QueryExecutionErrors.taskFailedWhileWritingRowsError(description.path, t)
+ }
+ }
+
+ /**
+ * For every registered [[WriteJobStatsTracker]], call `processStats()` on
it, passing it
+ * the corresponding [[WriteTaskStats]] from all executors.
+ */
+ private def processStats(
+ statsTrackers: Seq[WriteJobStatsTracker],
+ statsPerTask: Seq[Seq[WriteTaskStats]],
+ jobCommitDuration: Long): Unit = {
+
+ val numStatsTrackers = statsTrackers.length
+ assert(
+ statsPerTask.forall(_.length == numStatsTrackers),
+ s"""Every WriteTask should have produced one `WriteTaskStats` object for
every tracker.
+ |There are $numStatsTrackers statsTrackers, but some task returned
+ |${statsPerTask.find(_.length != numStatsTrackers).get.length}
results instead.
+ """.stripMargin
+ )
+
+ val statsPerTracker = if (statsPerTask.nonEmpty) {
+ statsPerTask.transpose
+ } else {
+ statsTrackers.map(_ => Seq.empty)
+ }
+
+ statsTrackers.zip(statsPerTracker).foreach {
+ case (statsTracker, stats) => statsTracker.processStats(stats,
jobCommitDuration)
+ }
+ }
+
+ private class ColumnarDynamicPartitionDataSingleWriter(
+ description:
WriteJobDescription,
+ taskAttemptContext:
TaskAttemptContext,
+ committer:
FileCommitProtocol,
+ customMetrics:
Map[String, SQLMetric] = Map.empty)
+ extends BaseDynamicPartitionDataWriter(
+ description,
+ taskAttemptContext,
+ committer,
+ customMetrics) {
+
+ private var currentPartitionValues: Option[UnsafeRow] = None
+ private var currentBucketId: Option[Int] = None
+
+ private val partitionColIndice: Array[Int] =
+ description.partitionColumns.flatMap {
+ pcol =>
+ description.allColumns.zipWithIndex.collect {
+ case (acol, index) if acol.name == pcol.name && acol.exprId ==
pcol.exprId => index
+ }
+ }.toArray
+
+ private def beforeWrite(record: InternalRow): Unit = {
+ val nextPartitionValues = if (isPartitioned)
Some(getPartitionValues(record)) else None
+ val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None
+
+ if (currentPartitionValues != nextPartitionValues || currentBucketId !=
nextBucketId) {
+ // See a new partition or bucket - write to a new partition dir (or a
new bucket file).
+ if (isPartitioned && currentPartitionValues != nextPartitionValues) {
+ currentPartitionValues = Some(nextPartitionValues.get.copy())
+ statsTrackers.foreach(_.newPartition(currentPartitionValues.get))
+ }
+ if (isBucketed) {
+ currentBucketId = nextBucketId
+ }
+
+ fileCounter = 0
+ renewCurrentWriter(currentPartitionValues, currentBucketId,
closeCurrentWriter = true)
+ } else if (description.maxRecordsPerFile > 0 &&
+ recordsInFile >= description.maxRecordsPerFile) {
+ renewCurrentWriterIfTooManyRecords(currentPartitionValues,
currentBucketId)
+ }
+ }
+
+ override def write(record: InternalRow): Unit = {
+ record match {
+ case carrierRow: BatchCarrierRow =>
+ carrierRow match {
+ case placeholderRow: PlaceholderRow =>
+ // Do nothing.
+ case terminalRow: TerminalRow =>
+ val numRows = terminalRow.batch().numRows()
+ if (numRows > 0) {
+ val blockStripes = GlutenFormatFactory.rowSplitter
+ .splitBlockByPartitionAndBucket(terminalRow.batch(),
partitionColIndice,
+ isBucketed)
+ val iter = blockStripes.iterator()
+ while (iter.hasNext) {
+ val blockStripe = iter.next()
+ val headingRow = blockStripe.getHeadingRow
+ beforeWrite(headingRow)
+ val columnBatch = blockStripe.getColumnarBatch
+ currentWriter.write(terminalRow.withNewBatch(columnBatch))
+ columnBatch.close()
+ }
+ blockStripes.release()
+ for (_ <- 0 until numRows) {
+ statsTrackers.foreach(_.newRow(currentWriter.path, record))
+ }
+ recordsInFile += numRows
+ }
+ }
+ case _ =>
+ beforeWrite(record)
+ writeRecord(record)
+ }
+ }
+ }
+}
+// spotless:on
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
new file mode 100644
index 0000000000..30e61730c1
--- /dev/null
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.stats
+
+import org.apache.gluten.execution.{PlaceholderRow, TerminalRow,
VeloxColumnarToRowExec}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.delta.DeltaIdentityColumnStatsTracker
+import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker,
WriteTaskStats, WriteTaskStatsTracker}
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+class GlutenDeltaJobStatisticsTracker(val delegate: DeltaJobStatisticsTracker)
+ extends WriteJobStatsTracker {
+ import GlutenDeltaJobStatisticsTracker._
+
+ override def newTaskInstance(): WriteTaskStatsTracker = {
+ new GlutenDeltaTaskStatisticsTracker(
+ delegate.newTaskInstance().asInstanceOf[DeltaTaskStatisticsTracker])
+ }
+
+ override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long):
Unit = {
+ delegate.processStats(stats, jobCommitTime)
+ }
+}
+
+class GlutenDeltaIdentityColumnStatsTracker(override val delegate:
DeltaIdentityColumnStatsTracker)
+ extends GlutenDeltaJobStatisticsTracker(delegate)
+
+private object GlutenDeltaJobStatisticsTracker {
+
+ /**
+ * This is a temporary implementation of a statistics tracker for Delta Lake.
It is sub-optimal in
+ * performance because it internally performs C2R and then sends rows to the
delegate row-based
+ * tracker.
+ *
+ * TODO: Columnar-based statistics collection.
+ */
+ private class GlutenDeltaTaskStatisticsTracker(delegate:
DeltaTaskStatisticsTracker)
+ extends WriteTaskStatsTracker {
+
+ private val c2r = new VeloxColumnarToRowExec.Converter(new
SQLMetric("convertTime"))
+
+ override def newPartition(partitionValues: InternalRow): Unit = {
+ delegate.newPartition(partitionValues)
+ }
+
+ override def newFile(filePath: String): Unit = {
+ delegate.newFile(filePath)
+ }
+
+ override def closeFile(filePath: String): Unit = {
+ delegate.closeFile(filePath)
+ }
+
+ override def newRow(filePath: String, row: InternalRow): Unit = {
+ row match {
+ case _: PlaceholderRow =>
+ case t: TerminalRow =>
+ c2r.toRowIterator(t.batch()).foreach(eachRow =>
delegate.newRow(filePath, eachRow))
+ case otherRow =>
+ delegate.newRow(filePath, otherRow)
+ }
+ }
+
+ override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
+ delegate.getFinalStats(taskCommitTime)
+ }
+ }
+}
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
new file mode 100644
index 0000000000..832cc084bb
--- /dev/null
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.delta.{GlutenOptimisticTransaction,
OptimisticTransaction, TransactionExecutionObserver}
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+case class GlutenDeltaLeafV2CommandExec(delegate: LeafV2CommandExec) extends
LeafV2CommandExec {
+
+ override def metrics: Map[String, SQLMetric] = delegate.metrics
+
+ override protected def run(): Seq[InternalRow] = {
+ TransactionExecutionObserver.withObserver(
+ DeltaV2WriteOperators.UseColumnarDeltaTransactionLog) {
+ delegate.executeCollect()
+ }
+ }
+
+ override def output: Seq[Attribute] = {
+ delegate.output
+ }
+
+ override def nodeName: String = "GlutenDelta " + delegate.nodeName
+}
+
+case class GlutenDeltaLeafRunnableCommand(delegate: LeafRunnableCommand)
+ extends LeafRunnableCommand {
+ override lazy val metrics: Map[String, SQLMetric] = delegate.metrics
+
+ override def output: Seq[Attribute] = {
+ delegate.output
+ }
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ TransactionExecutionObserver.withObserver(
+ DeltaV2WriteOperators.UseColumnarDeltaTransactionLog) {
+ delegate.run(sparkSession)
+ }
+ }
+
+ override def nodeName: String = "GlutenDelta " + delegate.nodeName
+}
+
+object DeltaV2WriteOperators {
+ object UseColumnarDeltaTransactionLog extends TransactionExecutionObserver {
+ override def startingTransaction(f: => OptimisticTransaction):
OptimisticTransaction = {
+ val delegate = f
+ new GlutenOptimisticTransaction(delegate)
+ }
+
+ override def preparingCommit[T](f: => T): T = f
+
+ override def beginDoCommit(): Unit = ()
+
+ override def beginBackfill(): Unit = ()
+
+ override def beginPostCommit(): Unit = ()
+
+ override def transactionCommitted(): Unit = ()
+
+ override def transactionAborted(): Unit = ()
+
+ override def createChild(): TransactionExecutionObserver = {
+ TransactionExecutionObserver.getObserver
+ }
+ }
+}
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
new file mode 100644
index 0000000000..60865682b3
--- /dev/null
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.gluten.config.VeloxDeltaConfig
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+
+import org.apache.spark.sql.delta.catalog.DeltaCatalog
+import org.apache.spark.sql.delta.commands.DeleteCommand
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+
+case class OffloadDeltaCommand() extends OffloadSingleNode {
+ override def offload(plan: SparkPlan): SparkPlan = {
+ if (!VeloxDeltaConfig.get.enableNativeWrite) {
+ return plan
+ }
+ plan match {
+ case ExecutedCommandExec(dc: DeleteCommand) =>
+ ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(dc))
+ case ctas: AtomicCreateTableAsSelectExec if
ctas.catalog.isInstanceOf[DeltaCatalog] =>
+ GlutenDeltaLeafV2CommandExec(ctas)
+ case rtas: AtomicReplaceTableAsSelectExec if
rtas.catalog.isInstanceOf[DeltaCatalog] =>
+ GlutenDeltaLeafV2CommandExec(rtas)
+ case other => other
+ }
+ }
+}
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
index 7bd6a59b7b..d32772f565 100644
---
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
@@ -36,7 +36,10 @@ class DeleteSQLSuite extends DeleteSuiteBase
Seq(
// FIXME: Excluded by Gluten as results are mismatch.
"test delete on temp view - nontrivial projection - SQL TempView",
- "test delete on temp view - nontrivial projection - Dataset TempView"
+ "test delete on temp view - nontrivial projection - Dataset TempView",
+ // FIXME: Different error messages.
+ "test delete on temp view - superset cols - SQL TempView",
+ "test delete on temp view - superset cols - Dataset TempView"
)
// For EXPLAIN, which is not supported in OSS
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
index 4e7326c8c1..8b290e4db7 100644
---
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
@@ -507,14 +507,14 @@ trait DeltaTestUtilsForTempViews
expectedErrorClassForDataSetTempView: String = null): Unit = {
if (isSQLTempView) {
if (expectedErrorMsgForSQLTempView != null) {
- errorContains(ex.getMessage, expectedErrorMsgForSQLTempView)
+ checkError(ex, expectedErrorMsgForSQLTempView)
}
if (expectedErrorClassForSQLTempView != null) {
assert(ex.getErrorClass == expectedErrorClassForSQLTempView)
}
} else {
if (expectedErrorMsgForDataSetTempView != null) {
- errorContains(ex.getMessage, expectedErrorMsgForDataSetTempView)
+ checkError(ex, expectedErrorMsgForDataSetTempView)
}
if (expectedErrorClassForDataSetTempView != null) {
assert(ex.getErrorClass == expectedErrorClassForDataSetTempView,
ex.getMessage)
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
index 3d94d2bde3..bf64858399 100644
---
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.delta.test
+import org.apache.gluten.config.VeloxDeltaConfig
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.delta.catalog.DeltaCatalog
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -47,6 +49,7 @@ trait DeltaSQLCommandTest extends SharedSparkSession {
.set("spark.sql.shuffle.partitions", "1")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
+ .set(VeloxDeltaConfig.ENABLE_NATIVE_WRITE.key, "true")
}
}
// spotless:on
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
index e722b2767a..9ada805c6e 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/DataGen.scala
@@ -74,6 +74,17 @@ object DataGen {
def run(spark: SparkSession, source: String)
}
+ object Feature {
+ def run(spark: SparkSession, source: String, feature: Feature): Unit = {
+ println(s"Executing feature: ${feature.name()}")
+ val start = System.nanoTime()
+ feature.run(spark, source)
+ val end = System.nanoTime()
+ println(
+ s"Finished executing feature: ${feature.name()}, elapsed time: ${(end
- start) / 1e6} ms.")
+ }
+ }
+
class FeatureRegistry extends Serializable {
private val lookup: mutable.LinkedHashMap[String, Feature] =
mutable.LinkedHashMap()
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
index 50404228f5..0c4bf94c71 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/ds/TpcdsDataGen.scala
@@ -137,11 +137,7 @@ class TpcdsDataGen(
override def gen(): Unit = {
Table.getBaseTables.forEach(t => writeParquetTable(t))
- features.foreach {
- feature =>
- println(s"Execute feature: ${feature.name()}")
- feature.run(spark, source)
- }
+ features.foreach(feature => DataGen.Feature.run(spark, source, feature))
}
}
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
index eaf4b4ded9..aed8653f62 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/h/TpchDataGen.scala
@@ -58,11 +58,7 @@ class TpchDataGen(
generate(dir, "part", partSchema, partitions, partGenerator, partParser)
generate(dir, "region", regionSchema, regionGenerator, regionParser)
- features.foreach {
- feature =>
- println(s"Execute feature: ${feature.name()}")
- feature.run(spark, source)
- }
+ features.foreach(feature => DataGen.Feature.run(spark, source, feature))
}
// lineitem
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]