This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5cfa0d70a3 [GLUTEN-10215][VL] Delta Write: Offload DeltaOptimizedWriterExec (#11461)
5cfa0d70a3 is described below
commit 5cfa0d70a37d2adff355dd834e01b00f4839271f
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Jan 23 09:38:40 2026 +0000
[GLUTEN-10215][VL] Delta Write: Offload DeltaOptimizedWriterExec (#11461)
---
backends-velox/pom.xml | 6 +
.../sql/delta/GlutenOptimisticTransaction.scala | 34 +-
.../delta/files/GlutenDeltaFileFormatWriter.scala | 8 +-
.../perf/GlutenDeltaOptimizedWriterExec.scala | 365 ++++++++++++++++++
.../clustering/ClusteredTableClusteringSuite.scala | 153 ++++++++
.../CoordinatedCommitsTestUtils.scala | 421 +++++++++++++++++++++
.../delta/skipping/ClusteredTableTestUtils.scala | 385 +++++++++++++++++++
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 10 +-
.../ColumnarCollapseTransformStages.scala | 5 +-
pom.xml | 1 +
10 files changed, 1380 insertions(+), 8 deletions(-)
diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml
index d19ec7ea1b..986728a47a 100755
--- a/backends-velox/pom.xml
+++ b/backends-velox/pom.xml
@@ -538,6 +538,12 @@
<artifactId>${delta.package.name}_${scala.binary.version}</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-dynamodb</artifactId>
+ <version>${aws-java-sdk-dynamodb.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
<profile>
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
index 275d2c0cd9..0f2381f454 100644
---
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
@@ -16,14 +16,15 @@
*/
package org.apache.spark.sql.delta
-import org.apache.gluten.config.VeloxDeltaConfig
+import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.extension.columnar.transition.Transitions
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.{AnalysisException, Dataset}
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
import org.apache.spark.sql.delta.constraints.{Constraint, Constraints,
DeltaInvariantCheckerExec}
import org.apache.spark.sql.delta.files.{GlutenDeltaFileFormatWriter,
TransactionalWrite}
import org.apache.spark.sql.delta.hooks.AutoCompact
-import org.apache.spark.sql.delta.perf.DeltaOptimizedWriterExec
+import org.apache.spark.sql.delta.perf.{DeltaOptimizedWriterExec,
GlutenDeltaOptimizedWriterExec}
import org.apache.spark.sql.delta.schema.InnerInvariantViolationException
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import
org.apache.spark.sql.delta.stats.{GlutenDeltaIdentityColumnStatsTracker,
GlutenDeltaJobStatisticsTracker}
@@ -50,7 +51,6 @@ class GlutenOptimisticTransaction(delegate:
OptimisticTransaction)
hasWritten = true
val spark = inputData.sparkSession
- val veloxDeltaConfig = new VeloxDeltaConfig(spark.sessionState.conf)
val (data, partitionSchema) = performCDCPartition(inputData)
val outputPath = deltaLog.dataPath
@@ -108,7 +108,31 @@ class GlutenOptimisticTransaction(delegate:
OptimisticTransaction)
!isOptimize &&
shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
) {
- DeltaOptimizedWriterExec(maybeCheckInvariants,
metadata.partitionColumns, deltaLog)
+ // FIXME: This may create unexpected C2R2C / R2C where the original
plan is better to be
+ // written with the vanilla DeltaOptimizedWriterExec. We'd optimize
the query plan
+ // here further.
+ val planWithVeloxOutput =
Transitions.toBatchPlan(maybeCheckInvariants, VeloxBatchType)
+ try {
+ val glutenWriterExec = GlutenDeltaOptimizedWriterExec(
+ planWithVeloxOutput,
+ metadata.partitionColumns,
+ deltaLog)
+ val validationResult = glutenWriterExec.doValidate()
+ if (validationResult.ok()) {
+ glutenWriterExec
+ } else {
+ logInfo(
+ s"GlutenDeltaOptimizedWriterExec: Internal shuffle validated
negative," +
+ s" reason: ${validationResult.reason()}. Falling back to
row-based shuffle.")
+ DeltaOptimizedWriterExec(maybeCheckInvariants,
metadata.partitionColumns, deltaLog)
+ }
+ } catch {
+ case e: AnalysisException =>
+ logWarning(
+ s"GlutenDeltaOptimizedWriterExec: Failed to create internal
shuffle," +
+ s" reason: ${e.getMessage()}. Falling back to row-based
shuffle.")
+ DeltaOptimizedWriterExec(maybeCheckInvariants,
metadata.partitionColumns, deltaLog)
+ }
} else {
maybeCheckInvariants
}
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
index 8b5a4fdc34..3ea64ab6e1 100644
---
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -18,9 +18,10 @@ package org.apache.spark.sql.delta.files
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution._
import org.apache.gluten.execution.datasource.GlutenFormatFactory
-import org.apache.gluten.extension.columnar.transition.Transitions
+import org.apache.gluten.extension.columnar.transition.{Convention,
Transitions}
import org.apache.spark._
import org.apache.spark.internal.{LoggingShims, MDC}
@@ -262,6 +263,11 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
val newPlan = sortPlan.child match {
case wst @ WholeStageTransformer(wholeStageChild, _) =>
wst.withNewChildren(Seq(addNativeSort(wholeStageChild)))
+ case other if Convention.get(other).batchType == VeloxBatchType =>
+ val nativeSortPlan = addNativeSort(other)
+ val nativeSortPlanWithWst =
+ GenerateTransformStageId()(ColumnarCollapseTransformStages(new
GlutenConfig(sparkSession.sessionState.conf))(nativeSortPlan))
+ nativeSortPlanWithWst
case other =>
Transitions.toBatchPlan(sortPlan, VeloxBatchType)
}
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/perf/GlutenDeltaOptimizedWriterExec.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/perf/GlutenDeltaOptimizedWriterExec.scala
new file mode 100644
index 0000000000..8e83a8af87
--- /dev/null
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/perf/GlutenDeltaOptimizedWriterExec.scala
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.perf
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.{ValidatablePlan, ValidationResult}
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.vectorized.ColumnarBatchSerializerInstance
+
+// scalastyle:off import.ordering.noEmptyLine
+import org.apache.spark._
+import org.apache.spark.internal.config
+import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.rdd.RDD
+import org.apache.spark.shuffle._
+import org.apache.spark.shuffle.sort.ColumnarShuffleManager
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog}
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.util.BinPackingUtils
+import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages,
ColumnarShuffleExchangeExec, GenerateTransformStageId}
+import org.apache.spark.sql.execution.{ShuffledColumnarBatchRDD, SparkPlan,
UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics,
SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.storage._
+import org.apache.spark.util.ThreadUtils
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+/**
+ * Gluten's vectorized version of [[DeltaOptimizedWriterExec]].
+ *
+ * Hash-shuffles the child's columnar batches by the table's partition columns through a native
+ * columnar shuffle, then bin-packs the resulting shuffle blocks (grouped per reducer, targeting at
+ * most `DELTA_OPTIMIZE_WRITE_BIN_SIZE` MiB per bin) into output partitions. A child with at most
+ * one partition is passed through without a shuffle.
+ *
+ * @param child
+ *   the plan producing the data to write; expected to output Velox batches
+ * @param partitionColumns
+ *   the table's partition column names, resolved case-insensitively against `output`
+ * @param deltaLog
+ *   log of the target table, used when recording the `delta.optimizeWrite.planned` event
+ */
+case class GlutenDeltaOptimizedWriterExec(
+    child: SparkPlan,
+    partitionColumns: Seq[String],
+    @transient deltaLog: DeltaLog
+) extends ValidatablePlan
+  with UnaryExecNode
+  with DeltaLogging {
+
+  override def output: Seq[Attribute] = child.output
+
+  private lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  private lazy val readMetrics =
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
+  ) ++ readMetrics ++ writeMetrics
+
+  /** Partition count of the child's columnar RDD; evaluating it builds that RDD. */
+  private lazy val childNumPartitions = child.executeColumnar().getNumPartitions
+
+  /**
+   * Number of hash-shuffle partitions: `DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS` divided by the
+   * child's partition count, raised to at least 1 and capped at
+   * `DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS`.
+   */
+  private lazy val numPartitions: Int = {
+    val targetShuffleBlocks = getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS)
+    // Guard the divisor so a child with zero partitions cannot cause a division by zero.
+    math.min(
+      math.max(targetShuffleBlocks / math.max(childNumPartitions, 1), 1),
+      getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS))
+  }
+
+  /** Shuffle RDD memoized by `getShuffleRDD`; transient, so not serialized with the plan. */
+  @transient private var cachedShuffleRDD: ShuffledColumnarBatchRDD = _
+
+  @transient private lazy val mapTracker = SparkEnv.get.mapOutputTracker
+
+  /**
+   * The native columnar hash shuffle of `child` on the partition columns, with transform stages
+   * collapsed and stage ids assigned. Throws
+   * `DeltaErrors.failedFindPartitionColumnInOutputPlan` if a partition column is not found in
+   * `output`.
+   */
+  private lazy val columnarShufflePlan: ColumnarShuffleExchangeExec = {
+    val resolver = org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+    val saltedPartitioning = HashPartitioning(
+      partitionColumns.map(
+        p =>
+          output
+            .find(o => resolver(p, o.name))
+            .getOrElse(throw DeltaErrors.failedFindPartitionColumnInOutputPlan(p))),
+      numPartitions)
+    val shuffle = ShuffleExchangeExec(saltedPartitioning, child)
+    val columnarShuffle =
+      BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(shuffle)
+    val columnarShuffleWithWst =
+      GenerateTransformStageId()(
+        ColumnarCollapseTransformStages(new GlutenConfig(conf))(columnarShuffle))
+    columnarShuffleWithWst.asInstanceOf[ColumnarShuffleExchangeExec]
+  }
+
+  /**
+   * Returns the columnar shuffle RDD, creating and caching it on first use. Its shuffle
+   * dependency is what the map stage is submitted with before the write.
+   */
+  private def getShuffleRDD: ShuffledColumnarBatchRDD = {
+    if (cachedShuffleRDD == null) {
+      cachedShuffleRDD =
+        columnarShufflePlan.executeColumnar().asInstanceOf[ShuffledColumnarBatchRDD]
+    }
+    cachedShuffleRDD
+  }
+
+  /**
+   * Bin-packs the shuffle output into write partitions.
+   *
+   * Blocks are grouped by reducer id and packed by size with `BinPackingUtils.binPackBySize`
+   * (target `DELTA_OPTIMIZE_WRITE_BIN_SIZE` MiB), so a bin only holds blocks of one reducer. Each
+   * bin is then regrouped by the block manager that holds its blocks.
+   *
+   * @return
+   *   one entry per output partition, largest bin first: the (block manager, blocks) to fetch
+   */
+  private def computeBins(): Array[List[(BlockManagerId, ArrayBuffer[(BlockId, Long, Int)])]] = {
+    // Get all shuffle information
+    val shuffleStats = getShuffleStats()
+
+    // Group by blockId instead of block manager
+    val blockInfo = shuffleStats.flatMap {
+      case (bmId, blocks) =>
+        blocks.map {
+          case (blockId, size, index) =>
+            (blockId, (bmId, size, index))
+        }
+    }.toMap
+
+    val maxBinSize =
+      ByteUnit.BYTE.convertFrom(getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_BIN_SIZE), ByteUnit.MiB)
+
+    val bins = shuffleStats.toSeq
+      .flatMap(_._2)
+      .groupBy(_._1.asInstanceOf[ShuffleBlockId].reduceId)
+      .flatMap {
+        case (_, blocks) =>
+          BinPackingUtils.binPackBySize[(BlockId, Long, Int), BlockId](
+            blocks,
+            _._2, // size
+            _._1, // blockId
+            maxBinSize)
+      }
+
+    bins
+      .map {
+        bin =>
+          var binSize = 0L
+          val blockLocations =
+            new mutable.HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long, Int)]]()
+          for (blockId <- bin) {
+            val (bmId, size, index) = blockInfo(blockId)
+            binSize += size
+            val blocksAtBM =
+              blockLocations.getOrElseUpdate(bmId, new ArrayBuffer[(BlockId, Long, Int)]())
+            blocksAtBM.append((blockId, size, index))
+          }
+          (binSize, blockLocations.toList)
+      }
+      .toArray
+      .sortBy(_._1)(Ordering[Long].reverse) // submit largest blocks first
+      .map(_._2)
+  }
+
+  /**
+   * Returns the map output sizes of the shuffle, grouped by block manager.
+   *
+   * If no statistics are available yet, the shuffle map stage is submitted and awaited first so
+   * that the shuffle files are materialized. On a `FetchFailedException` the map stage is re-run
+   * once and the statistics are read again. If that second read fails, its exception propagates.
+   */
+  private def getShuffleStats(): Array[(BlockManagerId, collection.Seq[(BlockId, Long, Int)])] = {
+    val dep = getShuffleRDD.dependency
+    // Gets the shuffle output stats
+    def getStats() =
+      mapTracker
+        .getMapSizesByExecutorId(dep.shuffleId, 0, Int.MaxValue, 0, numPartitions)
+        .toArray
+
+    // Executes the shuffle map stage in case we are missing output stats
+    def awaitShuffleMapStage(): Unit = {
+      assert(dep != null, "Shuffle dependency should not be null")
+      // hack to materialize the shuffle files in a fault tolerant way
+      ThreadUtils.awaitResult(sparkContext.submitMapStage(dep), Duration.Inf)
+    }
+
+    try {
+      val res = getStats()
+      if (res.nonEmpty) {
+        res
+      } else {
+        awaitShuffleMapStage()
+        getStats()
+      }
+    } catch {
+      case e: FetchFailedException =>
+        logWarning(log"Failed to fetch shuffle blocks for the optimized writer. Retrying", e)
+        // Re-run the missing map tasks, then return the re-read stats (the retry announced
+        // above). A failure of this second read propagates to the caller.
+        awaitShuffleMapStage()
+        getStats()
+    }
+  }
+
+  /**
+   * Accepts a child with at most one partition, because it is written without a shuffle.
+   * Otherwise, the result of validating the native columnar shuffle is returned.
+   */
+  override protected def doValidateInternal(): ValidationResult = {
+    if (childNumPartitions <= 1) {
+      ValidationResult.succeeded
+    } else {
+      columnarShufflePlan.doValidate()
+    }
+  }
+
+  // Row-based execution is not offered: this operator only produces Velox columnar batches.
+  override def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException
+
+  /**
+   * Runs the shuffle map stage, bin-packs its output and returns an RDD with one partition per
+   * bin. A child with at most one partition is returned unchanged.
+   */
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    if (childNumPartitions <= 1) {
+      // Single partitioned tasks can simply be written.
+      child.executeColumnar()
+    } else {
+      val shuffledRDD = getShuffleRDD
+      val partitions = computeBins()
+
+      recordDeltaEvent(
+        deltaLog,
+        "delta.optimizeWrite.planned",
+        data = Map(
+          "originalPartitions" -> childNumPartitions,
+          "outputPartitions" -> partitions.length,
+          "shufflePartitions" -> numPartitions,
+          "numShuffleBlocks" -> getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS),
+          "binSize" -> getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_BIN_SIZE),
+          "maxShufflePartitions" ->
+            getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS)
+        )
+      )
+
+      new GlutenDeltaOptimizedWriterRDD(
+        sparkContext,
+        shuffledRDD.dependency,
+        readMetrics,
+        new OptimizedWriterBlocks(partitions))
+    }
+  }
+
+  /** Reads a config entry from this plan's SQL conf. */
+  private def getConf[T](entry: ConfigEntry[T]): T = conf.getConf(entry)
+
+  override protected def withNewChildInternal(
+      newChild: SparkPlan): GlutenDeltaOptimizedWriterExec = copy(child = newChild)
+
+  /** Output is Velox columnar batches; no row output is offered (see `rowType0`). */
+  override def batchType(): Convention.BatchType = VeloxBatchType
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+}
+
+/**
+ * A columnar analogue of `ShuffledRowRDD` in which every partition reads one pre-computed bin of
+ * shuffle blocks (an entry of `OptimizedWriterBlocks.bins`) instead of a reducer range.
+ *
+ * On a retried stage attempt, the block locations captured at planning time may be stale. They
+ * are then looked up again from the map output tracker before fetching.
+ */
+private class GlutenDeltaOptimizedWriterRDD(
+    @transient sparkContext: SparkContext,
+    var dep: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch],
+    metrics: Map[String, SQLMetric],
+    @transient blocks: OptimizedWriterBlocks)
+  extends RDD[ColumnarBatch](sparkContext, Seq(dep))
+  with DeltaLogging {
+
+  /** One partition per bin, numbered in bin order. */
+  override def getPartitions: Array[Partition] = {
+    val bins = blocks.bins
+    Array.tabulate[Partition](bins.length)(idx => ShuffleBlockRDDPartition(idx, bins(idx)))
+  }
+
+  /** Fetches and decodes the blocks of this partition's bin, yielding only the batches. */
+  override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
+    val readReporter = new SQLShuffleReadMetricsReporter(
+      context.taskMetrics().createTempShuffleReadMetrics(),
+      metrics)
+    val partition = split.asInstanceOf[ShuffleBlockRDDPartition]
+    // Only a retried attempt needs fresh locations; the first attempt uses the planned ones.
+    val blocksToFetch =
+      if (context.stageAttemptNumber() > 0) relocatedBlocks(partition)
+      else partition.blocks.iterator
+    new GlutenOptimizedWriterShuffleReader(dep, context, blocksToFetch, readReporter)
+      .read()
+      .map(_._2)
+  }
+
+  /**
+   * Re-resolves which block manager currently holds each planned block, matching blocks by map
+   * index, and regroups them by block manager.
+   */
+  private def relocatedBlocks(partition: ShuffleBlockRDDPartition)
+      : Iterator[(BlockManagerId, ArrayBuffer[(BlockId, Long, Int)])] = {
+    val planned = partition.blocks
+    // Bins are packed per reducer, so the first block's reduce id applies to the whole bin.
+    val reduceId = planned.head._2.head._1.asInstanceOf[ShuffleBlockId].reduceId
+    // Current (block manager, block info) for every map output of that reducer, by map index.
+    val currentByMapIndex = SparkEnv.get.mapOutputTracker
+      .getMapSizesByExecutorId(dep.shuffleId, reduceId)
+      .flatMap { case (bmId, infos) => infos.map(info => info._3 -> (bmId, info)) }
+      .toMap
+
+    val regrouped = new mutable.HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long, Int)]]()
+    for {
+      (_, oldBlocks) <- planned
+      oldBlock <- oldBlocks
+    } {
+      val (bmId, info) = currentByMapIndex(oldBlock._3)
+      regrouped.getOrElseUpdate(bmId, new ArrayBuffer[(BlockId, Long, Int)]()) += info
+    }
+    regrouped.iterator
+  }
+
+  override def clearDependencies(): Unit = {
+    super.clearDependencies()
+    dep = null
+  }
+}
+
+/**
+ * A minimal stand-in for `BlockStoreShuffleReader` that fetches an explicit set of shuffle blocks
+ * and turns them into key/value records.
+ *
+ * For a `ColumnarShuffleDependency`, streams are wrapped by
+ * `ColumnarShuffleManager.bypassDecompressionSerializerManger` and decoded by the dependency's
+ * `ColumnarBatchSerializerInstance`. Any other dependency uses Spark's serializer manager and
+ * deserializes each fetched stream separately.
+ */
+private class GlutenOptimizedWriterShuffleReader(
+    dep: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch],
+    context: TaskContext,
+    blocks: Iterator[(BlockManagerId, ArrayBuffer[(BlockId, Long, Int)])],
+    readMetrics: ShuffleReadMetricsReporter)
+  extends ShuffleReader[Int, ColumnarBatch] {
+
+  /** Read the combined key-values for this reduce task */
+  override def read(): Iterator[Product2[Int, ColumnarBatch]] = {
+    // Decide once whether this is a Gluten columnar shuffle; both steps below depend on it.
+    val columnar = dep match {
+      case _: ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] => true
+      case _ => false
+    }
+    val serializerManager =
+      if (columnar) ColumnarShuffleManager.bypassDecompressionSerializerManger
+      else SparkEnv.get.serializerManager
+
+    val env = SparkEnv.get
+    val sparkConf = env.conf
+    // Arguments are positional and must follow ShuffleBlockFetcherIterator's parameter order.
+    val fetchedStreams = new ShuffleBlockFetcherIterator(
+      context,
+      env.blockManager.blockStoreClient,
+      env.blockManager,
+      env.mapOutputTracker,
+      blocks,
+      serializerManager.wrapStream,
+      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
+      sparkConf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024,
+      sparkConf.get(config.REDUCER_MAX_REQS_IN_FLIGHT),
+      sparkConf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
+      sparkConf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
+      sparkConf.get(config.SHUFFLE_MAX_ATTEMPTS_ON_NETTY_OOM),
+      sparkConf.get(config.SHUFFLE_DETECT_CORRUPT),
+      sparkConf.get(config.SHUFFLE_DETECT_CORRUPT_MEMORY),
+      sparkConf.get(config.SHUFFLE_CHECKSUM_ENABLED),
+      sparkConf.get(config.SHUFFLE_CHECKSUM_ALGORITHM),
+      readMetrics,
+      false
+    ).toCompletionIterator
+
+    val keyValues =
+      if (columnar) {
+        // The columnar serializer consumes all fetched streams through a single call.
+        dep.serializer
+          .newInstance()
+          .asInstanceOf[ColumnarBatchSerializerInstance]
+          .deserializeStreams(fetchedStreams)
+          .asKeyValueIterator
+      } else {
+        val serializerInstance = dep.serializer.newInstance()
+        // asKeyValueIterator wraps each stream in a NextIterator, which closes the underlying
+        // InputStream once all of its records have been read.
+        fetchedStreams.flatMap {
+          case (_, stream) => serializerInstance.deserializeStream(stream).asKeyValueIterator
+        }
+      }
+
+    // An interruptible iterator must be used here in order to support task cancellation
+    new InterruptibleIterator[(Any, Any)](context, keyValues)
+      .asInstanceOf[Iterator[Product2[Int, ColumnarBatch]]]
+  }
+}
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/clustering/ClusteredTableClusteringSuite.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/clustering/ClusteredTableClusteringSuite.scala
new file mode 100644
index 0000000000..2391f48d1b
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/clustering/ClusteredTableClusteringSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.clustering
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.DeltaLog
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.skipping.ClusteredTableTestUtils
+import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class ClusteredTableClusteringSuite
+  extends SparkFunSuite
+  with SharedSparkSession
+  with ClusteredTableTestUtils
+  with DeltaSQLCommandTest {
+  import testImplicits._
+
+  private val table: String = "test_table"
+
+  /** Appends rows `(i, i)` for `i` in `1 to numFiles` to `table`, writing one row per file. */
+  private def addFiles(table: String, numFiles: Int): Unit = {
+    val df = (1 to numFiles).map(i => (i, i)).toDF("col1", "col2")
+    withSQLConf(SQLConf.MAX_RECORDS_PER_FILE.key -> "1") {
+      df.write.format("delta").mode("append").saveAsTable(table)
+    }
+  }
+
+  /** Returns the `AddFile`s of the latest snapshot of `table`. */
+  private def getFiles(table: String): Set[AddFile] = {
+    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
+    deltaLog.update().allFiles.collect().toSet
+  }
+
+  /** Asserts every file is tagged with `ClusteredTableUtils.clusteringProvider`. */
+  private def assertClustered(files: Set[AddFile]): Unit = {
+    assert(files.forall(_.clusteringProvider.contains(ClusteredTableUtils.clusteringProvider)))
+  }
+
+  /** Asserts no file carries a clustering provider. */
+  private def assertNotClustered(files: Set[AddFile]): Unit = {
+    assert(files.forall(_.clusteringProvider.isEmpty))
+  }
+
+  /**
+   * Creates a table clustered by `clusterBy` with four single-row files, runs OPTIMIZE while
+   * `MAX_RECORDS_PER_FILE` is 2, and checks the four files are rewritten into two clustered ones.
+   */
+  private def checkOptimizeClustersIntoTwoFiles(clusterBy: String): Unit = {
+    withSQLConf(SQLConf.MAX_RECORDS_PER_FILE.key -> "2") {
+      withClusteredTable(table = table, schema = "col1 int, col2 int", clusterBy = clusterBy) {
+        addFiles(table, numFiles = 4)
+        val files0 = getFiles(table)
+        assert(files0.size === 4)
+        assertNotClustered(files0)
+
+        // 4 rows with at most 2 records per file: OPTIMIZE should produce exactly 2 files.
+        runOptimize(table) {
+          metrics =>
+            assert(metrics.numFilesRemoved == 4)
+            assert(metrics.numFilesAdded == 2)
+        }
+
+        val files1 = getFiles(table)
+        assert(files1.size === 2)
+        assertClustered(files1)
+      }
+    }
+  }
+
+  test("optimize clustered table") {
+    checkOptimizeClustersIntoTwoFiles(clusterBy = "col1, col2")
+  }
+
+  test("cluster by 1 column") {
+    checkOptimizeClustersIntoTwoFiles(clusterBy = "col1")
+  }
+
+  test("optimize clustered table with batching") {
+    // A batch size of "1" is smaller than either cube, so each cube gets its own OPTIMIZE commit;
+    // with "1g" both cubes fit in a single batch.
+    Seq(("1", 2), ("1g", 1)).foreach {
+      case (batchSize, optimizeCommits) =>
+        withClusteredTable(
+          table = table,
+          schema = "col1 int, col2 int",
+          clusterBy = "col1, col2") {
+          addFiles(table, numFiles = 4)
+          val files0 = getFiles(table)
+          assert(files0.size === 4)
+          assertNotClustered(files0)
+
+          val totalSize = files0.toSeq.map(_.size).sum
+          val halfSize = totalSize / 2
+
+          withSQLConf(
+            DeltaSQLConf.DELTA_OPTIMIZE_BATCH_SIZE.key -> batchSize,
+            DeltaSQLConf.DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE.key -> halfSize.toString,
+            DeltaSQLConf.DELTA_OPTIMIZE_CLUSTERING_TARGET_CUBE_SIZE.key -> halfSize.toString
+          ) {
+            // Optimize should create 2 cubes, which will be in separate batches if the batch size
+            // is small enough
+            runOptimize(table) {
+              metrics =>
+                assert(metrics.numFilesRemoved == 4)
+                assert(metrics.numFilesAdded == 2)
+            }
+
+            val files1 = getFiles(table)
+            assert(files1.size === 2)
+            assertClustered(files1)
+
+            val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
+            val commits = deltaLog.history.getHistory(None)
+            assert(commits.count(_.operation == "OPTIMIZE") === optimizeCommits)
+          }
+        }
+    }
+  }
+
+  test("optimize clustered table with batching on an empty table") {
+    withClusteredTable(table = table, schema = "col1 int, col2 int", clusterBy = "col1, col2") {
+      withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_BATCH_SIZE.key -> "1g") {
+        runOptimize(table) {
+          metrics =>
+            assert(metrics.numFilesRemoved == 0)
+            assert(metrics.numFilesAdded == 0)
+        }
+      }
+    }
+  }
+}
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala
new file mode 100644
index 0000000000..590787b6bd
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.coordinatedcommits
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaTestUtilsBase}
+import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol}
+import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.spark.sql.test.SharedSparkSession
+
+import io.delta.storage.LogStore
+import io.delta.storage.commit.{CommitCoordinatorClient =>
JCommitCoordinatorClient, CommitResponse, GetCommitsResponse =>
JGetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions}
+import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import java.util.Optional
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+trait CoordinatedCommitsTestUtils extends DeltaTestUtilsBase {
+  self: SparkFunSuite with SharedSparkSession =>
+
+  protected val defaultCommitsCoordinatorName = "tracking-in-memory"
+  protected val defaultCommitsCoordinatorConf = Map("randomConf" -> "randomConfValue")
+
+  /**
+   * Returns table properties that enable coordinated commits with the coordinator named by
+   * `defaultCommitsCoordinatorName`, configured with `defaultCommitsCoordinatorConf` (as JSON),
+   * and an empty (`{}`) table conf.
+   *
+   * @param withICT
+   *   when true, in-commit timestamps are enabled as well
+   */
+  def getCoordinatedCommitsDefaultProperties(withICT: Boolean = false): Map[String, String] = {
+    val coordinatedCommitsConfJson = JsonUtils.toJson(defaultCommitsCoordinatorConf)
+    val properties = Map(
+      DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> defaultCommitsCoordinatorName,
+      DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key -> coordinatedCommitsConfJson,
+      DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF.key -> "{}"
+    )
+    if (withICT) {
+      properties + (DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key -> "true")
+    } else {
+      properties
+    }
+  }
+
+  /**
+   * Runs a specific test with coordinated commits default properties unset. Any table created in
+   * this test won't have coordinated commits enabled by default.
+   */
+  def testWithDefaultCommitCoordinatorUnset(testName: String)(f: => Unit): Unit = {
+    test(testName) {
+      withoutCoordinatedCommitsDefaultTableProperties {
+        f
+      }
+    }
+  }
+
+  /**
+   * Runs the function `f` with coordinated commits default properties unset. Any table created in
+   * function `f` won't have coordinated commits enabled by default. The previous values are
+   * restored afterwards, even if `f` throws.
+   */
+  def withoutCoordinatedCommitsDefaultTableProperties[T](f: => T): T = {
+    val defaultCoordinatedCommitsConfs = CoordinatedCommitsUtils
+      .getDefaultCCConfigurations(spark, withDefaultKey = true)
+    defaultCoordinatedCommitsConfs.foreach {
+      case (defaultKey, _) =>
+        spark.conf.unset(defaultKey)
+    }
+    try { f }
+    finally {
+      defaultCoordinatedCommitsConfs.foreach {
+        case (defaultKey, oldValue) =>
+          spark.conf.set(defaultKey, oldValue)
+      }
+    }
+  }
+
+  /**
+   * Runs the function `f` with coordinated commits default properties set to what is specified.
+   * Any table created in function `f` will have the `commitCoordinator` property set to the
+   * specified `commitCoordinatorName`.
+   *
+   * @param commitCoordinatorName
+   *   coordinator name used as the default table property for new tables
+   * @param conf
+   *   coordinator conf, serialized to JSON as the default coordinator-conf table property
+   */
+  def withCustomCoordinatedCommitsTableProperties(
+      commitCoordinatorName: String,
+      conf: Map[String, String] = Map("randomConf" -> "randomConfValue"))(f: => Unit): Unit = {
+    withSQLConf(
+      DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey ->
+        commitCoordinatorName,
+      DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey ->
+        JsonUtils.toJson(conf)
+    ) {
+      f
+    }
+  }
+
+  /** Run the test with different backfill batch sizes: 1, 2, 10 */
+  def testWithDifferentBackfillInterval(testName: String)(f: Int => Unit): Unit = {
+    Seq(1, 2, 10).foreach {
+      backfillBatchSize =>
+        test(s"$testName [Backfill batch size: $backfillBatchSize]") {
+          CommitCoordinatorProvider.clearNonDefaultBuilders()
+          CommitCoordinatorProvider.registerBuilder(
+            TrackingInMemoryCommitCoordinatorBuilder(backfillBatchSize))
+          CommitCoordinatorProvider.registerBuilder(
+            InMemoryCommitCoordinatorBuilder(backfillBatchSize))
+          f(backfillBatchSize)
+        }
+    }
+  }
+
+  /**
+   * Run the test against a [[TrackingCommitCoordinatorClient]] with backfill batch size =
+   * `backfillBatchSize`, with the default coordinator (`defaultCommitsCoordinatorName` /
+   * `defaultCommitsCoordinatorConf`) set as the default table property for new tables.
+   */
+  def testWithCoordinatedCommits(backfillBatchSize: Int)(testName: String)(f: => Unit): Unit = {
+    test(s"$testName [Backfill batch size: $backfillBatchSize]") {
+      CommitCoordinatorProvider.clearNonDefaultBuilders()
+      CommitCoordinatorProvider.registerBuilder(
+        TrackingInMemoryCommitCoordinatorBuilder(backfillBatchSize))
+      withCustomCoordinatedCommitsTableProperties(
+        defaultCommitsCoordinatorName,
+        defaultCommitsCoordinatorConf) {
+        f
+      }
+    }
+  }
+
+  /**
+   * Runs the test:
+   *   - once without coordinated commits (backfill batch size `None`), and
+   *   - once per backfill batch size of [[testWithDifferentBackfillInterval]] (1, 2, 10), with the
+   *     default coordinator set as the default table property for new tables.
+   */
+  def testWithDifferentBackfillIntervalOptional(testName: String)(f: Option[Int] => Unit): Unit = {
+    test(s"$testName [Backfill batch size: None]") {
+      f(None)
+    }
+    testWithDifferentBackfillInterval(testName) {
+      backfillBatchSize =>
+        withCustomCoordinatedCommitsTableProperties(
+          defaultCommitsCoordinatorName,
+          defaultCommitsCoordinatorConf) {
+          f(Some(backfillBatchSize))
+        }
+    }
+  }
+
+  /**
+   * Builds the [[UpdatedActions]] for the zeroth commit. The new metadata is `oldMetadata` with
+   * the default commit coordinator name added to its configuration. Both the new and the old
+   * protocol are a default [[Protocol]].
+   */
+  def getUpdatedActionsForZerothCommit(
+      commitInfo: CommitInfo,
+      oldMetadata: Metadata = Metadata()): UpdatedActions = {
+    val newMetadataConfiguration =
+      oldMetadata.configuration +
+        (DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> defaultCommitsCoordinatorName)
+    val newMetadata = oldMetadata.copy(configuration = newMetadataConfiguration)
+    new UpdatedActions(commitInfo, newMetadata, Protocol(), oldMetadata, Protocol())
+  }
+
+  /**
+   * Builds the [[UpdatedActions]] for a non-zeroth commit. It matches the zeroth-commit actions,
+   * except that the old metadata is the same as the new one, so the commit does not change
+   * metadata.
+   */
+  def getUpdatedActionsForNonZerothCommit(commitInfo: CommitInfo): UpdatedActions = {
+    val updatedActions = getUpdatedActionsForZerothCommit(commitInfo)
+    new UpdatedActions(
+      updatedActions.getCommitInfo,
+      updatedActions.getNewMetadata,
+      updatedActions.getNewProtocol,
+      updatedActions.getNewMetadata,
+      updatedActions.getOldProtocol
+    )
+  }
+}
+
+/**
+ * Commit-coordinator builder that hands out one shared client for every `build` call. The client
+ * is `defaultCommitCoordinatorClientOpt` when supplied. Otherwise it is a lazily created
+ * [[TrackingCommitCoordinatorClient]] that wraps a
+ * [[PredictableUuidInMemoryCommitCoordinatorClient]] built with `batchSize`. The `conf` passed to
+ * `build` is not used.
+ */
+case class TrackingInMemoryCommitCoordinatorBuilder(
+    batchSize: Long,
+    defaultCommitCoordinatorClientOpt: Option[JCommitCoordinatorClient] = None,
+    defaultCommitCoordinatorName: String = "tracking-in-memory")
+  extends CommitCoordinatorBuilder {
+
+  lazy val trackingInMemoryCommitCoordinatorClient: JCommitCoordinatorClient =
+    defaultCommitCoordinatorClientOpt match {
+      case Some(providedClient) => providedClient
+      case None =>
+        val inMemory = new PredictableUuidInMemoryCommitCoordinatorClient(batchSize)
+        new TrackingCommitCoordinatorClient(inMemory)
+    }
+
+  override def getName: String = defaultCommitCoordinatorName
+
+  override def build(spark: SparkSession, conf: Map[String, String]): JCommitCoordinatorClient =
+    trackingInMemoryCommitCoordinatorClient
+}
+
+/**
+ * Commit-coordinator builder named `builderName`. Each `build` call builds a client with
+ * `realBuilder` and wraps it in a new [[TrackingCommitCoordinatorClient]].
+ */
+case class TrackingGenericInMemoryCommitCoordinatorBuilder(
+    builderName: String,
+    realBuilder: CommitCoordinatorBuilder)
+  extends CommitCoordinatorBuilder {
+
+  override def getName: String = builderName
+
+  override def build(spark: SparkSession, conf: Map[String, String]): JCommitCoordinatorClient = {
+    val underlyingClient = realBuilder.build(spark, conf)
+    new TrackingCommitCoordinatorClient(underlyingClient)
+  }
+}
+
+/**
+ * In-memory commit coordinator whose `generateUUID` returns sequential, deterministic ids:
+ * "uuid-1", "uuid-2", and so on. The counter is a plain, unsynchronized `var`.
+ */
+class PredictableUuidInMemoryCommitCoordinatorClient(batchSize: Long)
+  extends InMemoryCommitCoordinator(batchSize) {
+
+  var nextUuidSuffix: Long = 1L
+
+  override def generateUUID(): String = {
+    val issuedSuffix = nextUuidSuffix
+    nextUuidSuffix = issuedSuffix + 1
+    s"uuid-$issuedSuffix"
+  }
+}
+
+object TrackingCommitCoordinatorClient {
+
+  /**
+   * Per-thread flag (initially false) that is true while a tracked operation runs on that thread.
+   * Tracked calls nested inside it are then not counted again.
+   */
+  private val insideOperation: ThreadLocal[Boolean] = ThreadLocal.withInitial[Boolean](() => false)
+}
+
+/**
+ * A [[JCommitCoordinatorClient]] that forwards every call to `delegatingCommitCoordinatorClient`.
+ * It counts how often `commit`, `getCommits`, `backfillToVersion` and `registerTable` are invoked.
+ * Only the outermost tracked call on a thread is counted (see `recordOperation`).
+ */
+class TrackingCommitCoordinatorClient(
+    val delegatingCommitCoordinatorClient: JCommitCoordinatorClient)
+  extends JCommitCoordinatorClient {
+
+  val numCommitsCalled = new AtomicInteger(0)
+  val numGetCommitsCalled = new AtomicInteger(0)
+  val numBackfillToVersionCalled = new AtomicInteger(0)
+  val numRegisterTableCalled = new AtomicInteger(0)
+
+  /**
+   * Runs `f`. First it increments the counter that matches `op`, unless the current thread is
+   * already inside another tracked operation, so tracked calls nested within `f` are not counted
+   * again. Unrecognized `op` names are not counted. The thread's previous flag is restored even
+   * if `f` throws.
+   */
+  def recordOperation[T](op: String)(f: => T): T = {
+    val oldInsideOperation = TrackingCommitCoordinatorClient.insideOperation.get()
+    try {
+      if (!oldInsideOperation) {
+        op match {
+          case "commit" => numCommitsCalled.incrementAndGet()
+          case "getCommits" => numGetCommitsCalled.incrementAndGet()
+          case "backfillToVersion" => numBackfillToVersionCalled.incrementAndGet()
+          case "registerTable" => numRegisterTableCalled.incrementAndGet()
+          case _ => ()
+        }
+      }
+      TrackingCommitCoordinatorClient.insideOperation.set(true)
+      f
+    } finally {
+      TrackingCommitCoordinatorClient.insideOperation.set(oldInsideOperation)
+    }
+  }
+
+  override def commit(
+      logStore: LogStore,
+      hadoopConf: Configuration,
+      tableDesc: TableDescriptor,
+      commitVersion: Long,
+      actions: java.util.Iterator[String],
+      updatedActions: UpdatedActions): CommitResponse = recordOperation("commit") {
+    delegatingCommitCoordinatorClient.commit(
+      logStore,
+      hadoopConf,
+      tableDesc,
+      commitVersion,
+      actions,
+      updatedActions)
+  }
+
+  override def getCommits(
+      tableDesc: TableDescriptor,
+      startVersion: java.lang.Long,
+      endVersion: java.lang.Long): JGetCommitsResponse = recordOperation("getCommits") {
+    delegatingCommitCoordinatorClient.getCommits(tableDesc, startVersion, endVersion)
+  }
+
+  override def backfillToVersion(
+      logStore: LogStore,
+      hadoopConf: Configuration,
+      tableDesc: TableDescriptor,
+      version: Long,
+      lastKnownBackfilledVersion: java.lang.Long): Unit = recordOperation("backfillToVersion") {
+    delegatingCommitCoordinatorClient.backfillToVersion(
+      logStore,
+      hadoopConf,
+      tableDesc,
+      version,
+      lastKnownBackfilledVersion)
+  }
+
+  /** Compares the wrapped clients, unwrapping `other` if it is also a tracking client. */
+  override def semanticEquals(other: JCommitCoordinatorClient): Boolean = {
+    other match {
+      case otherTracking: TrackingCommitCoordinatorClient =>
+        delegatingCommitCoordinatorClient.semanticEquals(
+          otherTracking.delegatingCommitCoordinatorClient)
+      case _ =>
+        delegatingCommitCoordinatorClient.semanticEquals(other)
+    }
+  }
+
+  /** Resets every invocation counter, including `numRegisterTableCalled`, to zero. */
+  def reset(): Unit = {
+    numCommitsCalled.set(0)
+    numGetCommitsCalled.set(0)
+    numBackfillToVersionCalled.set(0)
+    numRegisterTableCalled.set(0)
+  }
+
+  override def registerTable(
+      logPath: Path,
+      tableIdentifier: Optional[TableIdentifier],
+      currentVersion: Long,
+      currentMetadata: AbstractMetadata,
+      currentProtocol: AbstractProtocol): java.util.Map[String, String] =
+    recordOperation("registerTable") {
+      delegatingCommitCoordinatorClient.registerTable(
+        logPath,
+        tableIdentifier,
+        currentVersion,
+        currentMetadata,
+        currentProtocol)
+    }
+}
+
+/**
+ * A helper trait that enables coordinated commits for the test suite based on the given
+ * `coordinatedCommitsBackfillBatchSize` conf.
+ */
+trait CoordinatedCommitsBaseSuite
+  extends SparkFunSuite
+  with SharedSparkSession
+  with CoordinatedCommitsTestUtils {
+
+  // If this config is not overridden, coordinated commits are disabled.
+  def coordinatedCommitsBackfillBatchSize: Option[Int] = None
+
+  final def coordinatedCommitsEnabledInTests: Boolean = coordinatedCommitsBackfillBatchSize.nonEmpty
+
+  // Keeps track of the number of table names pointing to the location.
+  protected val locRefCount: mutable.Map[String, Int] = mutable.Map.empty
+
+  /**
+   * In case some tests reuse the table path/name with DROP table, this method can be used to
+   * clean the table data in the commit coordinator. Note that we should call this before the
+   * table actually gets DROP.
+   *
+   * The method does nothing when `DESCRIBE DETAIL` fails for `tableName` (missing or broken
+   * table). Otherwise the location's reference count is decremented. The coordinator's table
+   * data is dropped only when no other table name still points to that location.
+   */
+  def deleteTableFromCommitCoordinator(tableName: String): Unit = {
+    val cc = CommitCoordinatorProvider.getCommitCoordinatorClient(
+      defaultCommitsCoordinatorName,
+      defaultCommitsCoordinatorConf,
+      spark)
+    assert(
+      cc.isInstanceOf[TrackingCommitCoordinatorClient],
+      s"Please implement delete/drop method for coordinator: ${cc.getClass.getName}")
+    val locationOpt =
+      try {
+        Some(
+          spark
+            .sql(s"describe detail $tableName")
+            .select("location")
+            .first()
+            .getAs[String](0))
+      } catch {
+        // Ignore if the table does not exist/broken.
+        case NonFatal(_) => None
+      }
+    locationOpt.foreach {
+      location =>
+        val locKey = location.stripPrefix("file:")
+        // When we create an external table in a location where some table already existed, two
+        // table names could be pointing to the same location. We should only clean up the table
+        // data in the commit coordinator when the last table name pointing to the location is
+        // dropped. The entry is removed once it reaches zero, so a stale zero count can never be
+        // decremented below zero and block a later cleanup.
+        val remainingRefs = locRefCount.get(locKey).fold(0)(_ - 1)
+        if (remainingRefs > 0) {
+          locRefCount(locKey) = remainingRefs
+        } else {
+          locRefCount.remove(locKey)
+          val logPath = location + "/_delta_log"
+          cc.asInstanceOf[TrackingCommitCoordinatorClient]
+            .delegatingCommitCoordinatorClient
+            .asInstanceOf[InMemoryCommitCoordinator]
+            .dropTable(new Path(logPath))
+        }
+        DeltaLog.clearCache()
+    }
+  }
+
+  /**
+   * When coordinated commits are enabled for the suite, new tables default to the
+   * `defaultCommitsCoordinatorName` coordinator configured with `defaultCommitsCoordinatorConf`.
+   */
+  override protected def sparkConf: SparkConf = {
+    if (coordinatedCommitsEnabledInTests) {
+      val coordinatedCommitsCoordinatorJson = JsonUtils.toJson(defaultCommitsCoordinatorConf)
+      super.sparkConf
+        .set(
+          DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey,
+          defaultCommitsCoordinatorName)
+        .set(
+          DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey,
+          coordinatedCommitsCoordinatorJson)
+    } else {
+      super.sparkConf
+    }
+  }
+
+  /**
+   * Before each test: clears non-default coordinator builders, registers a tracking in-memory
+   * builder when a backfill batch size is configured, and clears the DeltaLog cache.
+   */
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    CommitCoordinatorProvider.clearNonDefaultBuilders()
+    coordinatedCommitsBackfillBatchSize.foreach {
+      batchSize =>
+        CommitCoordinatorProvider.registerBuilder(
+          TrackingInMemoryCommitCoordinatorBuilder(batchSize))
+    }
+    DeltaLog.clearCache()
+  }
+
+  /**
+   * Whether new tables are expected to have in-commit timestamps. This is true when a default
+   * coordinated-commits coordinator is configured (treated as implying ICT) or when ICT is enabled
+   * by default.
+   */
+  protected def isICTEnabledForNewTables: Boolean = {
+    spark.conf
+      .getOption(DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey)
+      .nonEmpty ||
+    spark.conf
+      .getOption(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.defaultTablePropertyKey)
+      .contains("true")
+  }
+}
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala
new file mode 100644
index 0000000000..0f601043f6
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.skipping
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
+import org.apache.spark.sql.delta.DeltaOperations
+import org.apache.spark.sql.delta.DeltaOperations.{CLUSTERING_PARAMETER_KEY,
ZORDER_PARAMETER_KEY}
+import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
+import
org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
+import org.apache.spark.sql.delta.hooks.UpdateCatalog
+import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils,
ClusteringColumn, ClusteringColumnInfo}
+import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+import org.junit.Assert.assertEquals
+
+trait ClusteredTableTestUtilsBase
+  extends SparkFunSuite
+  with SharedSparkSession
+  with CoordinatedCommitsBaseSuite {
+  import testImplicits._
+
+  /**
+   * Helper for running optimize on the table with different APIs. This implementation runs SQL
+   * `OPTIMIZE`; subclasses may override it to exercise other APIs.
+   * @param table
+   *   the name of table
+   */
+  def optimizeTable(table: String): DataFrame = {
+    sql(s"OPTIMIZE $table")
+  }
+
+  /**
+   * Resolves the [[DeltaLog]] and current [[Snapshot]] of `table`. The table is either a catalog
+   * table name or a path-based identifier such as delta.`/path/to/table` (the tahoe. prefix is
+   * accepted too).
+   */
+  private def deltaLogAndSnapshotOf(table: String): (DeltaLog, Snapshot) = {
+    Seq("tahoe.", "delta.").find(prefix => table.startsWith(prefix)) match {
+      case Some(prefix) =>
+        // Path-based table: strip the prefix and the backticks to extract the table path.
+        DeltaLog.forTableWithSnapshot(spark, table.stripPrefix(prefix).replace("`", ""))
+      case None =>
+        DeltaLog.forTableWithSnapshot(spark, TableIdentifier(table))
+    }
+  }
+
+  /**
+   * Runs optimize on the table and calls postHook on the metrics. It then verifies the DESCRIBE
+   * HISTORY operation parameters of the table's latest commit.
+   * @param table
+   *   the name of table
+   * @param postHook
+   *   callback triggered with OptimizeMetrics returned by the OPTIMIZE command
+   */
+  def runOptimize(table: String)(postHook: OptimizeMetrics => Unit): Unit = {
+    val (deltaLog, snapshotBefore) = deltaLogAndSnapshotOf(table)
+    val beforeVersion = snapshotBefore.version
+
+    postHook(optimizeTable(table).select($"metrics.*").as[OptimizeMetrics].head())
+    val snapshot = deltaLog.update()
+    val afterVersion = snapshot.version
+
+    // The isFull parameter is only meaningful when the latest commit is an OPTIMIZE.
+    val shouldCheckFullStatus = deltaLog.history
+      .getHistory(Some(1))
+      .headOption
+      .exists(_.operation == DeltaOperations.OPTIMIZE_OPERATION_NAME)
+
+    // Note: Only expect isFull status when the table has non-empty clustering columns and
+    // clustering table feature, otherwise the OPTIMIZE will fall back to compaction and
+    // isFull status will not be relevant anymore.
+    val expectedOperationParameters = ClusteredTableUtils
+      .getClusteringColumnsOptional(snapshot)
+      .filter {
+        cols =>
+          cols.nonEmpty &&
+          shouldCheckFullStatus &&
+          ClusteredTableUtils.isSupported(snapshot.protocol) &&
+          afterVersion > beforeVersion
+      }
+      .map(_ => Map(DeltaOperations.CLUSTERING_IS_FULL_KEY -> false))
+      .getOrElse(Map.empty)
+    verifyDescribeHistoryOperationParameters(
+      table,
+      expectedOperationParameters = expectedOperationParameters)
+  }
+
+  /**
+   * Runs optimize full on the table and calls postHook on the metrics.
+   *
+   * @param table
+   *   the name of table
+   * @param postHook
+   *   callback triggered with OptimizeMetrics returned by the OPTIMIZE command
+   */
+  def runOptimizeFull(table: String)(postHook: OptimizeMetrics => Unit): Unit = {
+    postHook(sql(s"OPTIMIZE $table FULL").select($"metrics.*").as[OptimizeMetrics].head())
+
+    // Verify Delta history operation parameters' clusterBy
+    verifyDescribeHistoryOperationParameters(
+      table,
+      expectedOperationParameters = Map(DeltaOperations.CLUSTERING_IS_FULL_KEY -> true))
+  }
+
+  /** Asserts that the snapshot's clustering columns equal `logicalColumnNames`, in order. */
+  def verifyClusteringColumnsInDomainMetadata(
+      snapshot: Snapshot,
+      logicalColumnNames: Seq[String]): Unit = {
+    val expectedClusteringColumns = logicalColumnNames.map(ClusteringColumn(snapshot.schema, _))
+    val actualClusteringColumns =
+      ClusteredTableUtils.getClusteringColumnsOptional(snapshot).orNull
+    assert(expectedClusteringColumns == actualClusteringColumns)
+  }
+
+  /**
+   * Verifies the operation parameters of the last history event. The caller-provided
+   * `expectedOperationParameters` must match (compared as strings). The `clusterBy` parameter
+   * must be consistent with the operation kind and the table's clustering columns.
+   */
+  protected def verifyDescribeHistoryOperationParameters(
+      table: String,
+      expectedOperationParameters: Map[String, Any] = Map.empty): Unit = {
+    val clusterBySupportedOperations = Set(
+      "CREATE TABLE",
+      "REPLACE TABLE",
+      "CREATE OR REPLACE TABLE",
+      "CREATE TABLE AS SELECT",
+      "REPLACE TABLE AS SELECT",
+      "CREATE OR REPLACE TABLE AS SELECT")
+
+    val (deltaLog, snapshot) = deltaLogAndSnapshotOf(table)
+    val isClusteredTable = ClusteredTableUtils.isSupported(snapshot.protocol)
+    val clusteringColumns =
+      ClusteringColumnInfo.extractLogicalNames(snapshot)
+    val expectedClusterBy = JsonUtils.toJson(clusteringColumns)
+    val expectClustering = isClusteredTable && clusteringColumns.nonEmpty
+
+    val lastEvent = deltaLog.history.getHistory(Some(1)).head
+    val lastOperationParameters = lastEvent.operationParameters
+
+    def doAssert(assertion: => Boolean): Unit = {
+      val debugMsg = "verifyDescribeHistoryOperationParameters DEBUG: " +
+        "assert failed. Please check the expected behavior and " +
+        "add the operation to the appropriate case in " +
+        "verifyDescribeHistoryOperationParameters. " +
+        s"table: $table, lastOperation: ${lastEvent.operation} " +
+        s"lastOperationParameters: $lastOperationParameters " +
+        s"expectedOperationParameters: $expectedOperationParameters"
+      try {
+        assert(assertion, debugMsg)
+      } catch {
+        // Re-wrap so errors thrown while evaluating the assertion (for example a missing
+        // operation-parameter key) also carry the debug context. Fatal errors are not wrapped.
+        case scala.util.control.NonFatal(e) =>
+          throw new Throwable(debugMsg, e)
+      }
+    }
+
+    // Check clusterBy exists and matches the expected clusterBy.
+    def assertClusterByExists(): Unit = {
+      doAssert(lastOperationParameters(CLUSTERING_PARAMETER_KEY) === expectedClusterBy)
+    }
+
+    // Check clusterBy does not exist or is empty.
+    def assertClusterByEmptyOrNotExists(): Unit = {
+      doAssert(
+        !lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY) ||
+          lastOperationParameters(CLUSTERING_PARAMETER_KEY) === "[]")
+    }
+
+    // Check clusterBy does not exist.
+    def assertClusterByNotExist(): Unit = {
+      doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
+    }
+
+    // Validate caller provided operator parameters from the last commit.
+    expectedOperationParameters.foreach {
+      case (operationParameterKey, value) =>
+        // Convert value to string since value is stored as toString in operationParameters.
+        doAssert(lastOperationParameters(operationParameterKey) === value.toString)
+    }
+
+    // Check clusterBy
+    lastEvent.operation match {
+      case "CLUSTER BY" =>
+        // Operation is [[DeltaOperations.ClusterBy]] - ALTER TABLE CLUSTER BY
+        doAssert(
+          lastOperationParameters("newClusteringColumns") === clusteringColumns.mkString(",")
+        )
+      case "OPTIMIZE" =>
+        if (expectClustering) {
+          doAssert(lastOperationParameters(CLUSTERING_PARAMETER_KEY) === expectedClusterBy)
+          doAssert(lastOperationParameters(ZORDER_PARAMETER_KEY) === "[]")
+        } else {
+          // If the table clusters by NONE, OPTIMIZE will be a regular compaction.
+          // In this case, both clustering and z-order parameters should be empty.
+          doAssert(lastOperationParameters(CLUSTERING_PARAMETER_KEY) === "[]")
+          doAssert(lastOperationParameters(ZORDER_PARAMETER_KEY) === "[]")
+        }
+      case "CLONE" =>
+        // CLUSTER BY not in operation parameters for CLONE - similar to PARTITION BY.
+        doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
+      case o if clusterBySupportedOperations.contains(o) =>
+        if (expectClustering) {
+          assertClusterByExists()
+        } else if (isClusteredTable && clusteringColumns.isEmpty) {
+          assertClusterByEmptyOrNotExists()
+        } else {
+          assertClusterByNotExist()
+        }
+      case "WRITE" | "RESTORE" =>
+        // These are known operations from our tests that don't have clusterBy.
+        doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
+      case _ =>
+        // Other operations are not tested yet. If the test fails here, please check the expected
+        // behavior and add the operation to the appropriate case.
+        doAssert(false)
+    }
+  }
+
+  protected def deleteTableFromCommitCoordinatorIfNeeded(table: String): Unit = {
+    if (coordinatedCommitsEnabledInTests) {
+      // Clean up the table data in commit coordinator because DROP/REPLACE TABLE does not bother
+      // commit coordinator.
+      deleteTableFromCommitCoordinator(table)
+    }
+  }
+
+  /** Runs `f`, then cleans up (coordinator data, if needed) and drops every table in `tableNames`. */
+  override def withTable(tableNames: String*)(f: => Unit): Unit = {
+    Utils.tryWithSafeFinally(f) {
+      tableNames.foreach {
+        name =>
+          deleteTableFromCommitCoordinatorIfNeeded(name)
+          spark.sql(s"DROP TABLE IF EXISTS $name")
+      }
+    }
+  }
+
+  /** Creates a clustered table, runs `f`, then cleans up and drops the table even if `f` throws. */
+  def withClusteredTable[T](
+      table: String,
+      schema: String,
+      clusterBy: String,
+      tableProperties: Map[String, String] = Map.empty,
+      location: Option[String] = None)(f: => T): T = {
+    createOrReplaceClusteredTable("CREATE", table, schema, clusterBy, tableProperties, location)
+
+    Utils.tryWithSafeFinally(f) {
+      deleteTableFromCommitCoordinatorIfNeeded(table)
+      spark.sql(s"DROP TABLE IF EXISTS $table")
+    }
+  }
+
+  /**
+   * Helper for creating or replacing table with different APIs.
+   * @param clause
+   *   clause for SQL API ('CREATE', 'REPLACE', 'CREATE OR REPLACE')
+   * @param table
+   *   the name of table
+   * @param schema
+   *   comma separated list of "colName dataType"
+   * @param clusterBy
+   *   comma separated list of clustering columns
+   * @param tableProperties
+   *   table properties rendered into a TBLPROPERTIES clause (omitted when empty)
+   * @param location
+   *   optional LOCATION; when set, the location's reference count in `locRefCount` is bumped
+   */
+  def createOrReplaceClusteredTable(
+      clause: String,
+      table: String,
+      schema: String,
+      clusterBy: String,
+      tableProperties: Map[String, String] = Map.empty,
+      location: Option[String] = None): Unit = {
+    val locationClause = location.map(loc => s"LOCATION '$loc'").getOrElse("")
+    val tablePropertiesClause =
+      if (tableProperties.nonEmpty) {
+        val tablePropertiesString =
+          tableProperties.map { case (key, value) => s"'$key' = '$value'" }.mkString(",")
+        s"TBLPROPERTIES($tablePropertiesString)"
+      } else {
+        ""
+      }
+    spark.sql(
+      s"$clause TABLE $table ($schema) USING delta CLUSTER BY ($clusterBy) " +
+        s"$tablePropertiesClause $locationClause")
+    // Key by the path without a `file:` scheme so the key matches the one computed (via
+    // stripPrefix("file:")) when the table is deleted from the commit coordinator.
+    location.foreach {
+      loc =>
+        val locKey = loc.stripPrefix("file:")
+        locRefCount(locKey) = locRefCount.getOrElse(locKey, 0) + 1
+    }
+  }
+
+  protected def createOrReplaceAsSelectClusteredTable(
+      clause: String,
+      table: String,
+      srcTable: String,
+      clusterBy: String,
+      location: Option[String] = None): Unit = {
+    val locationClause = location.map(loc => s"LOCATION '$loc'").getOrElse("")
+    spark.sql(
+      s"$clause TABLE $table USING delta CLUSTER BY ($clusterBy) " +
+        s"$locationClause AS SELECT * FROM $srcTable")
+  }
+
+  /**
+   * Verifies the clustering columns of a catalog table in the Delta log. Unless
+   * `skipCatalogCheck` is set, it also verifies the CatalogTable's clusterBySpec, which requires
+   * [[DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED]].
+   */
+  def verifyClusteringColumns(
+      tableIdentifier: TableIdentifier,
+      expectedLogicalClusteringColumns: Seq[String],
+      skipCatalogCheck: Boolean = false
+  ): Unit = {
+    val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdentifier)
+    verifyClusteringColumnsInternal(
+      snapshot,
+      tableIdentifier.table,
+      expectedLogicalClusteringColumns
+    )
+
+    if (!skipCatalogCheck) {
+      val updateCatalogEnabled = spark.conf.get(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED)
+      assert(
+        updateCatalogEnabled,
+        "need to enable [[DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED]] to verify catalog updates.")
+      UpdateCatalog.awaitCompletion(10000)
+      val catalog = spark.sessionState.catalog
+      catalog.refreshTable(tableIdentifier)
+      val table = catalog.getTableMetadata(tableIdentifier)
+
+      // Verify CatalogTable's clusterBySpec.
+      assert(ClusteredTableUtils.getClusterBySpecOptional(table).isDefined)
+      assertEquals(
+        ClusterBySpec.fromColumnNames(expectedLogicalClusteringColumns),
+        ClusteredTableUtils.getClusterBySpecOptional(table).get)
+    }
+  }
+
+  /** Verifies the clustering columns of the path-based table at `dataPath`. */
+  def verifyClusteringColumns(
+      dataPath: String,
+      expectedLogicalClusteringColumns: Seq[String]): Unit = {
+    val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, dataPath)
+    verifyClusteringColumnsInternal(
+      snapshot,
+      s"delta.`$dataPath`",
+      expectedLogicalClusteringColumns
+    )
+  }
+
+  /**
+   * Shared checks: the clustering table feature is supported, the domain metadata and history
+   * match, DESCRIBE DETAIL hides the clustering-columns property, and SHOW TBLPROPERTIES exposes
+   * it with the expected columns.
+   */
+  def verifyClusteringColumnsInternal(
+      snapshot: Snapshot,
+      tableNameOrPath: String,
+      expectedLogicalClusteringColumns: Seq[String]
+  ): Unit = {
+    assert(ClusteredTableUtils.isSupported(snapshot.protocol) === true)
+    verifyClusteringColumnsInDomainMetadata(snapshot, expectedLogicalClusteringColumns)
+
+    // Verify Delta history operation parameters' clusterBy
+    verifyDescribeHistoryOperationParameters(
+      tableNameOrPath
+    )
+
+    // Verify DESCRIBE DETAIL's properties doesn't contain the "clusteringColumns" key.
+    val describeDetailProps = sql(s"describe detail $tableNameOrPath")
+      .select("properties")
+      .first()
+      .getAs[Map[String, String]](0)
+    assert(!describeDetailProps.contains(ClusteredTableUtils.PROP_CLUSTERING_COLUMNS))
+
+    // Verify SHOW TBLPROPERTIES contains the correct clustering columns.
+    val clusteringColumnsVal =
+      sql(s"show tblproperties $tableNameOrPath")
+        .filter($"key" === ClusteredTableUtils.PROP_CLUSTERING_COLUMNS)
+        .select("value")
+        .first()
+        .getString(0)
+    val clusterBySpec = ClusterBySpec
+      .fromProperties(Map(ClusteredTableUtils.PROP_CLUSTERING_COLUMNS -> clusteringColumnsVal))
+      .get
+    assert(expectedLogicalClusteringColumns === clusterBySpec.columnNames.map(_.toString))
+  }
+}
+
+/** Default mix-in for clustered-table suites; adds nothing on top of ClusteredTableTestUtilsBase. */
+trait ClusteredTableTestUtils extends ClusteredTableTestUtilsBase {}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index bb2c9c6c38..10a7f6f3ea 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -370,7 +370,15 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
val newShuffle = shuffle.outputPartitioning match {
case HashPartitioning(exprs, _) =>
- val hashExpr = new Murmur3Hash(exprs)
+ val hashExpr = if (exprs.isEmpty) {
+ // In Spark, a hash expression with empty input is not resolvable
and an
+ // `WRONG_NUM_ARGS.WITHOUT_SUGGESTION` error will be reported when
validating the project
+ // transformer. So we directly return the seed here, which is the
intended hashed value
+ // for empty input given Spark's murmur3 hash logic.
+ Literal(new Murmur3Hash(Nil).seed, IntegerType)
+ } else {
+ new Murmur3Hash(exprs)
+ }
val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++
child.output
val projectTransformer = ProjectExecTransformer(projectList, child)
val validationResult = projectTransformer.doValidate()
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index a770fdf7a3..134db93e70 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -163,7 +163,9 @@ case class ColumnarCollapseTransformStages(glutenConf: GlutenConfig) extends Rul
/** Inserts an InputIteratorTransformer on top of those that do not support
transform. */
private def insertInputIteratorTransformer(plan: SparkPlan): SparkPlan = {
plan match {
- case p if !supportTransform(p) =>
+ case p if p.isInstanceOf[WholeStageTransformer] || !supportTransform(p)
=>
+ // TODO: if p.isInstanceOf[WholeStageTransformer], we can merge two
whole stage
+ // transformers.
ColumnarCollapseTransformStages.wrapInputIteratorTransformer(insertWholeStageTransformer(p))
case p =>
p.withNewChildren(p.children.map(insertInputIteratorTransformer))
@@ -172,6 +174,7 @@ case class ColumnarCollapseTransformStages(glutenConf: GlutenConfig) extends Rul
private def insertWholeStageTransformer(plan: SparkPlan): SparkPlan = {
plan match {
+ case wst: WholeStageTransformer => wst
case t if supportTransform(t) =>
// transformStageId will be updated by rule `GenerateTransformStageId`.
WholeStageTransformer(t.withNewChildren(t.children.map(insertInputIteratorTransformer)))(-1)
diff --git a/pom.xml b/pom.xml
index 54956693fb..b216855d84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@
<delta.package.name>delta-spark</delta.package.name>
<delta.version>3.3.2</delta.version>
<delta.binary.version>33</delta.binary.version>
+ <aws-java-sdk-dynamodb.version>1.12.262</aws-java-sdk-dynamodb.version>
<celeborn.version>0.6.1</celeborn.version>
<uniffle.version>0.10.0</uniffle.version>
<arrow.version>15.0.0</arrow.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]