This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d45c8f9488 [GLUTEN-10215][VL] Delta write: Native statistics tracker to eliminate C2R overhead (#11419)
d45c8f9488 is described below
commit d45c8f9488e64b2f728b15bad759fedcd6d437fa
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Feb 13 13:36:54 2026 +0000
[GLUTEN-10215][VL] Delta write: Native statistics tracker to eliminate C2R overhead (#11419)
---
.../sql/delta/GlutenOptimisticTransaction.scala | 13 +-
.../delta/files/GlutenDeltaFileFormatWriter.scala | 19 +-
...la => GlutenDeltaJobStatsFallbackTracker.scala} | 51 +--
...=> GlutenDeltaJobStatsRowCountingTracker.scala} | 61 ++--
.../delta/stats/GlutenDeltaJobStatsTracker.scala | 342 +++++++++++++++++++++
.../datasources/velox/VeloxBlockStripes.java | 46 +--
cpp/core/compute/Runtime.h | 4 +
cpp/core/jni/JniWrapper.cc | 14 +
cpp/core/memory/SplitAwareColumnarBatchIterator.h | 6 +
cpp/velox/compute/VeloxRuntime.cc | 8 +
cpp/velox/compute/VeloxRuntime.h | 2 +
cpp/velox/compute/WholeStageResultIterator.cc | 7 +
cpp/velox/compute/WholeStageResultIterator.h | 6 +
.../vectorized/ColumnarBatchOutIterator.java | 24 +-
.../extension/columnar/validator/Validator.scala | 9 +-
15 files changed, 501 insertions(+), 111 deletions(-)
diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
index af19d1df9f..351c2effb7 100644
--- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.delta.hooks.AutoCompact
import org.apache.spark.sql.delta.perf.{DeltaOptimizedWriterExec,
GlutenDeltaOptimizedWriterExec}
import org.apache.spark.sql.delta.schema.InnerInvariantViolationException
import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta.stats.{GlutenDeltaIdentityColumnStatsTracker, GlutenDeltaJobStatisticsTracker}
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker,
FileFormatWriter, WriteJobStatsTracker}
@@ -72,8 +71,7 @@ class GlutenOptimisticTransaction(delegate:
OptimisticTransaction)
      // the FileFormatWriter.write call below and will collect per-file stats using
      // StatisticsCollection
      val optionalStatsTracker =
-        getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)._1.map(
-          new GlutenDeltaJobStatisticsTracker(_))
+        getOptionalStatsTrackerAndStatsCollection(output, outputPath, partitionSchema, data)._1
val constraints =
Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++
additionalConstraints
@@ -87,7 +85,6 @@ class GlutenOptimisticTransaction(delegate:
OptimisticTransaction)
statsDataSchema,
trackIdentityHighWaterMarks
)
- .map(new GlutenDeltaIdentityColumnStatsTracker(_))
SQLExecution.withNewExecutionId(queryExecution,
Option("deltaTransactionalWrite")) {
val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString,
Map.empty, output)
@@ -96,8 +93,8 @@ class GlutenOptimisticTransaction(delegate:
OptimisticTransaction)
convertEmptyToNullIfNeeded(queryExecution.executedPlan,
partitioningColumns, constraints)
val maybeCheckInvariants = if (constraints.isEmpty) {
        // Compared to vanilla Delta, we simply avoid adding the invariant checker
-        // when the constraint list is empty, to omit the unnecessary transitions
-        // added around the invariant checker.
+        // when the constraint list is empty, to prevent the unnecessary transitions
+        // from being added around the invariant checker.
empty2NullPlan
} else {
DeltaInvariantCheckerExec(empty2NullPlan, constraints)
@@ -206,7 +203,7 @@ class GlutenOptimisticTransaction(delegate:
OptimisticTransaction)
committer.addedStatuses.map {
a =>
a.copy(stats = optionalStatsTracker
- .map(_.delegate.recordedStats(a.toPath.getName))
+ .map(_.recordedStats(a.toPath.getName))
.getOrElse(a.stats))
}
} else {
@@ -235,7 +232,7 @@ class GlutenOptimisticTransaction(delegate:
OptimisticTransaction)
if (resultFiles.nonEmpty && !isOptimize)
registerPostCommitHook(AutoCompact)
// Record the updated high water marks to be used during transaction
commit.
identityTrackerOpt.ifDefined {
-      tracker => updatedIdentityHighWaterMarks.appendAll(tracker.delegate.highWaterMarks.toSeq)
+      tracker => updatedIdentityHighWaterMarks.appendAll(tracker.highWaterMarks.toSeq)
}
resultFiles.toSeq ++ committer.changeFiles
diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
index 74a1e3f036..653b705bec 100644
--- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -36,6 +36,7 @@ import
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.delta.{DeltaOptions, GlutenParquetFileFormat}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
+import org.apache.spark.sql.delta.stats.GlutenDeltaJobStatsTracker
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
@@ -125,13 +126,14 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
val dataSchema = dataColumns.toStructType
DataSourceUtils.verifySchema(fileFormat, dataSchema)
DataSourceUtils.checkFieldNames(fileFormat, dataSchema)
- // Note: prepareWrite has side effect. It sets "job".
+ val isNativeWritable = GlutenParquetFileFormat.isNativeWritable(dataSchema)
val outputDataColumns =
      if (caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true")) {
        dataColumns ++ partitionColumns
      } else dataColumns
+    // Note: prepareWrite has a side effect. It sets "job".
val outputWriterFactory =
fileFormat.prepareWrite(
sparkSession,
@@ -140,6 +142,12 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
outputDataColumns.toStructType
)
+    val maybeWrappedStatsTrackers: Seq[WriteJobStatsTracker] = if (isNativeWritable) {
+ statsTrackers.map(GlutenDeltaJobStatsTracker(_))
+ } else {
+ statsTrackers
+ }
+
val description = new WriteJobDescription(
uuid = UUID.randomUUID.toString,
serializableHadoopConf = new
SerializableConfiguration(job.getConfiguration),
@@ -157,7 +165,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
timeZoneId = caseInsensitiveOptions
.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone),
- statsTrackers = statsTrackers
+ statsTrackers = maybeWrappedStatsTrackers
)
// We should first sort by dynamic partition columns, then bucket id, and
finally sorting
@@ -222,7 +230,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
partitionColumns,
sortColumns,
orderingMatched,
- GlutenParquetFileFormat.isNativeWritable(dataSchema)
+ isNativeWritable
)
}
}
@@ -459,6 +467,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
} else {
concurrentOutputWriterSpec match {
case Some(spec) =>
+ // TODO: Concurrent writer is not yet supported.
new DynamicPartitionDataConcurrentWriter(
description,
taskAttemptContext,
@@ -468,7 +477,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
case _ =>
// Columnar-based partition writer to divide the input batch by
partition values
// and bucket IDs in advance.
-          new ColumnarDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
+          new GlutenDynamicPartitionDataSingleWriter(description, taskAttemptContext, committer)
}
}
@@ -525,7 +534,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
}
}
- private class ColumnarDynamicPartitionDataSingleWriter(
+ private class GlutenDynamicPartitionDataSingleWriter(
      description: WriteJobDescription,
      taskAttemptContext: TaskAttemptContext,
      committer: FileCommitProtocol,
diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsFallbackTracker.scala
similarity index 57%
copy from backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
copy to backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsFallbackTracker.scala
index 30e61730c1..4eab5408fa 100644
--- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsFallbackTracker.scala
@@ -19,17 +19,19 @@ package org.apache.spark.sql.delta.stats
import org.apache.gluten.execution.{PlaceholderRow, TerminalRow,
VeloxColumnarToRowExec}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.delta.DeltaIdentityColumnStatsTracker
import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker,
WriteTaskStats, WriteTaskStatsTracker}
import org.apache.spark.sql.execution.metric.SQLMetric
-class GlutenDeltaJobStatisticsTracker(val delegate: DeltaJobStatisticsTracker)
+/**
+ * A fallback stats tracker in which a C2R converter converts all incoming batches to rows and
+ * then sends them to the delegate tracker.
+ */
+private[stats] class GlutenDeltaJobStatsFallbackTracker(val delegate: WriteJobStatsTracker)
extends WriteJobStatsTracker {
- import GlutenDeltaJobStatisticsTracker._
+ import GlutenDeltaJobStatsFallbackTracker._
override def newTaskInstance(): WriteTaskStatsTracker = {
- new GlutenDeltaTaskStatisticsTracker(
- delegate.newTaskInstance().asInstanceOf[DeltaTaskStatisticsTracker])
+ new GlutenDeltaTaskStatsFallbackTracker(delegate.newTaskInstance())
}
override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long):
Unit = {
@@ -37,43 +39,22 @@ class GlutenDeltaJobStatisticsTracker(val delegate:
DeltaJobStatisticsTracker)
}
}
-class GlutenDeltaIdentityColumnStatsTracker(override val delegate:
DeltaIdentityColumnStatsTracker)
- extends GlutenDeltaJobStatisticsTracker(delegate)
-
-private object GlutenDeltaJobStatisticsTracker {
-
- /**
- * This is a temporary implementation of statistics tracker for Delta Lake.
It's sub-optimal in
- * performance because it internally performs C2R then send rows to the
delegate row-based
- * tracker.
- *
- * TODO: Columnar-based statistics collection.
- */
- private class GlutenDeltaTaskStatisticsTracker(delegate:
DeltaTaskStatisticsTracker)
+private object GlutenDeltaJobStatsFallbackTracker {
+ private class GlutenDeltaTaskStatsFallbackTracker(delegate:
WriteTaskStatsTracker)
extends WriteTaskStatsTracker {
-
private val c2r = new VeloxColumnarToRowExec.Converter(new
SQLMetric("convertTime"))
- override def newPartition(partitionValues: InternalRow): Unit = {
+ override def newPartition(partitionValues: InternalRow): Unit =
delegate.newPartition(partitionValues)
- }
- override def newFile(filePath: String): Unit = {
- delegate.newFile(filePath)
- }
+ override def newFile(filePath: String): Unit = delegate.newFile(filePath)
- override def closeFile(filePath: String): Unit = {
- delegate.closeFile(filePath)
- }
+ override def closeFile(filePath: String): Unit =
delegate.closeFile(filePath)
- override def newRow(filePath: String, row: InternalRow): Unit = {
- row match {
- case _: PlaceholderRow =>
- case t: TerminalRow =>
- c2r.toRowIterator(t.batch()).foreach(eachRow =>
delegate.newRow(filePath, eachRow))
- case otherRow =>
- delegate.newRow(filePath, otherRow)
- }
+ override def newRow(filePath: String, row: InternalRow): Unit = row match {
+ case _: PlaceholderRow =>
+ case t: TerminalRow =>
+ c2r.toRowIterator(t.batch()).foreach(eachRow =>
delegate.newRow(filePath, eachRow))
}
override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala
similarity index 50%
rename from backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
rename to backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala
index 30e61730c1..d4086cf8e9 100644
--- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala
@@ -16,20 +16,22 @@
*/
package org.apache.spark.sql.delta.stats
-import org.apache.gluten.execution.{PlaceholderRow, TerminalRow,
VeloxColumnarToRowExec}
+import org.apache.gluten.execution.{PlaceholderRow, TerminalRow}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.delta.DeltaIdentityColumnStatsTracker
import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker,
WriteTaskStats, WriteTaskStatsTracker}
-import org.apache.spark.sql.execution.metric.SQLMetric
-class GlutenDeltaJobStatisticsTracker(val delegate: DeltaJobStatisticsTracker)
+/**
+ * A stats tracker that simply calls `newRow` once per row when a columnar batch arrives. There is
+ * no C2R conversion involved, which saves its cost. Therefore, the delegate stats tracker must not
+ * read the row in its `newRow` implementation.
+ */
+private[stats] class GlutenDeltaJobStatsRowCountingTracker(val delegate: WriteJobStatsTracker)
extends WriteJobStatsTracker {
- import GlutenDeltaJobStatisticsTracker._
+ import GlutenDeltaJobStatsRowCountingTracker._
override def newTaskInstance(): WriteTaskStatsTracker = {
- new GlutenDeltaTaskStatisticsTracker(
- delegate.newTaskInstance().asInstanceOf[DeltaTaskStatisticsTracker])
+ new GlutenDeltaTaskStatsRowCountingTracker(delegate.newTaskInstance())
}
override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long):
Unit = {
@@ -37,43 +39,24 @@ class GlutenDeltaJobStatisticsTracker(val delegate:
DeltaJobStatisticsTracker)
}
}
-class GlutenDeltaIdentityColumnStatsTracker(override val delegate:
DeltaIdentityColumnStatsTracker)
- extends GlutenDeltaJobStatisticsTracker(delegate)
-
-private object GlutenDeltaJobStatisticsTracker {
-
- /**
- * This is a temporary implementation of statistics tracker for Delta Lake.
It's sub-optimal in
- * performance because it internally performs C2R then send rows to the
delegate row-based
- * tracker.
- *
- * TODO: Columnar-based statistics collection.
- */
- private class GlutenDeltaTaskStatisticsTracker(delegate:
DeltaTaskStatisticsTracker)
+private object GlutenDeltaJobStatsRowCountingTracker {
+ private class GlutenDeltaTaskStatsRowCountingTracker(delegate:
WriteTaskStatsTracker)
extends WriteTaskStatsTracker {
-
- private val c2r = new VeloxColumnarToRowExec.Converter(new
SQLMetric("convertTime"))
-
- override def newPartition(partitionValues: InternalRow): Unit = {
+ override def newPartition(partitionValues: InternalRow): Unit =
delegate.newPartition(partitionValues)
- }
- override def newFile(filePath: String): Unit = {
- delegate.newFile(filePath)
- }
+ override def newFile(filePath: String): Unit = delegate.newFile(filePath)
- override def closeFile(filePath: String): Unit = {
- delegate.closeFile(filePath)
- }
+ override def closeFile(filePath: String): Unit =
delegate.closeFile(filePath)
- override def newRow(filePath: String, row: InternalRow): Unit = {
- row match {
- case _: PlaceholderRow =>
- case t: TerminalRow =>
- c2r.toRowIterator(t.batch()).foreach(eachRow =>
delegate.newRow(filePath, eachRow))
- case otherRow =>
- delegate.newRow(filePath, otherRow)
- }
+ override def newRow(filePath: String, row: InternalRow): Unit = row match {
+ case _: PlaceholderRow =>
+ case t: TerminalRow =>
+ for (_ <- 0 until t.batch().numRows()) {
+            // Here we pass a null row to the delegate stats tracker, as we assume
+            // the delegate only counts rows and never reads their contents.
+ delegate.newRow(filePath, null)
+ }
}
override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
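The contract documented above (the delegate must never read the row passed to `newRow`) is what makes the null-row trick safe. A minimal sketch of a delegate that satisfies it is shown below; the class name and stats payload are hypothetical and not part of this commit:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.{WriteTaskStats, WriteTaskStatsTracker}

    // Hypothetical stats payload carrying nothing but a row count.
    case class RowCountStats(numRows: Long) extends WriteTaskStats

    // A delegate that never touches `row`, so the wrapper may call newRow(filePath, null)
    // once per row of a columnar batch without any columnar-to-row conversion.
    class RowCountingTaskStatsTracker extends WriteTaskStatsTracker {
      private var numRows = 0L
      override def newPartition(partitionValues: InternalRow): Unit = {}
      override def newFile(filePath: String): Unit = {}
      override def closeFile(filePath: String): Unit = {}
      override def newRow(filePath: String, row: InternalRow): Unit = numRows += 1
      override def getFinalStats(taskCommitTime: Long): WriteTaskStats = RowCountStats(numRows)
    }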
diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
new file mode 100644
index 0000000000..ae11263943
--- /dev/null
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.stats
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution._
+import org.apache.gluten.expression.{ConverterUtils, TransformerState}
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.offload.OffloadOthers
+import org.apache.gluten.extension.columnar.rewrite.PullOutPreProject
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
+import org.apache.gluten.iterator.Iterators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.plan.PlanBuilder
+import org.apache.gluten.vectorized.{ArrowWritableColumnVector,
ColumnarBatchInIterator, ColumnarBatchOutIterator, NativePlanEvaluator}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute,
AttributeReference, EmptyRow, Expression, Projection, SortOrder,
SpecificInternalRow, UnsafeProjection}
+import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
Complete, DeclarativeAggregate}
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
+import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages,
LeafExecNode, ProjectExec}
+import org.apache.spark.sql.execution.aggregate.SortAggregateExec
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker,
WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker}
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.util.{SerializableConfiguration, SparkDirectoryUtil}
+
+import com.google.common.collect.Lists
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import java.util.UUID
+import java.util.concurrent.{Callable, Executors, Future, SynchronousQueue}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/** Gluten's stats tracker, which runs a vectorized aggregation internally to produce statistics efficiently. */
+private[stats] class GlutenDeltaJobStatsTracker(val delegate: DeltaJobStatisticsTracker)
+ extends WriteJobStatsTracker {
+ import GlutenDeltaJobStatsTracker._
+
+ @transient private val hadoopConf: Configuration = {
+ val clazz = classOf[DeltaJobStatisticsTracker]
+    val field = clazz.getDeclaredField("hadoopConf")
+    field.setAccessible(true)
+    field.get(delegate).asInstanceOf[Configuration]
+ }
+ @transient private val path = delegate.path
+ private val dataCols = delegate.dataCols
+ private val statsColExpr = delegate.statsColExpr
+
+ private val srlHadoopConf = new SerializableConfiguration(hadoopConf)
+ private val rootUri =
path.getFileSystem(hadoopConf).makeQualified(path).toUri
+
+ override def newTaskInstance(): WriteTaskStatsTracker = {
+ val rootPath = new Path(rootUri)
+ val hadoopConf = srlHadoopConf.value
+ new GlutenDeltaTaskStatsTracker(dataCols, statsColExpr, rootPath,
hadoopConf)
+ }
+
+ override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long):
Unit = {
+ delegate.processStats(stats, jobCommitTime)
+ }
+}
+
+object GlutenDeltaJobStatsTracker extends Logging {
+ def apply(tracker: WriteJobStatsTracker): WriteJobStatsTracker = tracker
match {
+ case tracker: BasicWriteJobStatsTracker =>
+ new GlutenDeltaJobStatsRowCountingTracker(tracker)
+ case tracker: DeltaJobStatisticsTracker =>
+ new GlutenDeltaJobStatsTracker(tracker)
+ case tracker =>
+ logWarning(
+ "Gluten Delta: Creating fallback job stats tracker," +
+ " this involves frequent columnar-to-row conversions which may cause
performance" +
+ " issues.")
+ new GlutenDeltaJobStatsFallbackTracker(tracker)
+ }
+
+ /** A columnar-based statistics collection for Gluten + Delta Lake. */
+ private class GlutenDeltaTaskStatsTracker(
+ dataCols: Seq[Attribute],
+ statsColExpr: Expression,
+ rootPath: Path,
+ hadoopConf: Configuration)
+ extends WriteTaskStatsTracker {
+    // We use a single thread to ensure the per-file statistics are written serially.
+    // Do not increase the thread count, otherwise correctness is not guaranteed.
+ private val resultThreadRunner = Executors.newSingleThreadExecutor()
+ private val evaluator = NativePlanEvaluator.create(
+ BackendsApiManager.getBackendName,
+ Map.empty[String, String].asJava)
+ private val c2r = new VeloxColumnarToRowExec.Converter(new
SQLMetric("convertTime"))
+ private val inputBatchQueue = new SynchronousQueue[Option[ColumnarBatch]]()
+ private val aggregates: Seq[AggregateExpression] = statsColExpr.collect {
+ case ae: AggregateExpression if
ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] =>
+ assert(ae.mode == Complete)
+ ae
+ }
+ private val declarativeAggregates: Seq[DeclarativeAggregate] =
aggregates.map {
+ ae => ae.aggregateFunction.asInstanceOf[DeclarativeAggregate]
+ }
+ private val resultExpr: Expression = statsColExpr.transform {
+ case ae: AggregateExpression if
ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] =>
+
ae.aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression
+ }
+ private val aggBufferAttrs: Seq[Attribute] =
+ declarativeAggregates.flatMap(_.aggBufferAttributes)
+ private val emptyRow: InternalRow = {
+ val initializeStats = GenerateMutableProjection.generate(
+ expressions = declarativeAggregates.flatMap(_.initialValues),
+ inputSchema = Seq.empty,
+ useSubexprElimination = false
+ )
+ val buffer = new SpecificInternalRow(aggBufferAttrs.map(_.dataType))
+ initializeStats.target(buffer).apply(EmptyRow)
+ }
+ private val getStats: Projection = UnsafeProjection.create(
+ exprs = Seq(resultExpr),
+ inputSchema = aggBufferAttrs
+ )
+ private val taskContext = TaskContext.get()
+ private val dummyKeyAttr = {
+      // FIXME: We have to force the use of Velox's streaming aggregation since hash aggregation
+      // doesn't support task barriers. But as streaming aggregation should always be keyed, we
+      // have to do a small hack here by adding a dummy key for the global aggregation.
+ AttributeReference("__GLUTEN_DELTA_DUMMY_KEY__", IntegerType)()
+ }
+ private val statsAttrs =
aggregates.flatMap(_.aggregateFunction.aggBufferAttributes)
+ private val statsResultAttrs =
aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes)
+ private val veloxAggTask: ColumnarBatchOutIterator = {
+ val inputNode = StatisticsInputNode(Seq(dummyKeyAttr), dataCols)
+ val aggOp = SortAggregateExec(
+ None,
+ isStreaming = false,
+ None,
+ Seq(dummyKeyAttr),
+ aggregates,
+ statsAttrs,
+ 0,
+ dummyKeyAttr +: statsResultAttrs,
+ inputNode
+ )
+ val projOp = ProjectExec(statsResultAttrs, aggOp)
+      // Invoke the legacy transform rule to get a local Velox aggregation query plan.
+ val offloads = Seq(OffloadOthers()).map(_.toStrcitRule())
+ val validatorBuilder: GlutenConfig => Validator = conf =>
+ Validators.newValidator(conf, offloads)
+ val rewrites = Seq(PullOutPreProject)
+ val config = GlutenConfig.get
+ val transformRule =
+ HeuristicTransform.WithRewrites(validatorBuilder(config), rewrites,
offloads)
+ val veloxTransformer = transformRule(projOp)
+ val wholeStageTransformer =
ColumnarCollapseTransformStages(config)(veloxTransformer)
+ .asInstanceOf[WholeStageTransformer]
+ .child
+ .asInstanceOf[TransformSupport]
+ val substraitContext = new SubstraitContext
+ TransformerState.enterValidation
+ val transformedNode =
+ try {
+ wholeStageTransformer.transform(substraitContext)
+ } finally {
+ TransformerState.finishValidation
+ }
+ val outNames =
wholeStageTransformer.output.map(ConverterUtils.genColumnNameWithExprId).asJava
+ val planNode =
+ PlanBuilder.makePlan(substraitContext,
Lists.newArrayList(transformedNode.root), outNames)
+
+ val spillDirPath = SparkDirectoryUtil
+ .get()
+ .namespace("gluten-spill")
+ .mkChildDirRoundRobin(UUID.randomUUID.toString)
+ .getAbsolutePath
+ val nativeOutItr = evaluator
+ .createKernelWithBatchIterator(
+ planNode.toProtobuf.toByteArray,
+ null,
+ null,
+ 0,
+
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
+ )
+ nativeOutItr
+ }
+
+ private val resultJsonMap: mutable.Map[String, String] = mutable.Map()
+
+ private var currentPath: String = _
+ private var currentJsonFuture: Future[String] = _
+
+ override def newPartition(partitionValues: InternalRow): Unit = {}
+
+ override def newFile(filePath: String): Unit = {
+ assert(currentPath == null)
+ veloxAggTask.addIteratorSplits(Array(newIteratorFromInputQueue()))
+ veloxAggTask.requestBarrier()
+ currentJsonFuture = resultThreadRunner.submit(new Callable[String] {
+ private val resultThreadName =
+ s"Gluten Delta Statistics Writer - ${System.identityHashCode(this)}"
+ override def call(): String = {
+ Thread.currentThread().setName(resultThreadName)
+ TaskContext.setTaskContext(taskContext)
+ val outBatches = veloxAggTask.asScala.toSeq
+ val row: InternalRow = if (outBatches.isEmpty) {
+            // No input was received. Return the default aggregation values.
+ emptyRow
+ } else {
+ assert(outBatches.size == 1)
+ val batch = outBatches.head
+ val rows = c2r.toRowIterator(batch).toSeq
+            assert(
+              rows.size == 1,
+              "Only a single output row is expected from the global aggregation.")
+ batch.close()
+ rows.head
+ }
+ val jsonStats = getStats(row).getString(0)
+ jsonStats
+ }
+ currentPath = filePath
+ })
+ }
+
+ override def closeFile(filePath: String): Unit = {
+ assert(filePath == currentPath)
+ val fileName = new Path(filePath).getName
+ inputBatchQueue.put(None)
+ val json = currentJsonFuture.get()
+ resultJsonMap(fileName) = json
+ currentPath = null
+ }
+
+ override def newRow(filePath: String, row: InternalRow): Unit = {
+ assert(filePath == currentPath)
+ row match {
+ case _: PlaceholderRow =>
+ case t: TerminalRow =>
+ val valueBatch = t.batch()
+ val numRows = valueBatch.numRows()
+ val dummyKeyVec = ArrowWritableColumnVector
+ .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name,
IntegerType))
+ .head
+ (0 until numRows).foreach(i => dummyKeyVec.putInt(i, 1))
+ val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch(
+ ColumnarBatches.offload(
+ ArrowBufferAllocators.contextInstance(),
+ new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows)))
+ val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch,
valueBatch)
+ dummyKeyBatch.close()
+ valueBatch.close()
+ inputBatchQueue.put(Some(compositeBatch))
+ }
+ }
+
+ override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
+ veloxAggTask.noMoreSplits()
+ veloxAggTask.close()
+ DeltaFileStatistics(resultJsonMap.toMap)
+ }
+
+ private def newIteratorFromInputQueue(): ColumnarBatchInIterator = {
+ val itr = new ColumnarBatchInIterator(
+ BackendsApiManager.getBackendName,
+ Iterators
+ .wrap(new Iterator[ColumnarBatch] {
+ private var batch: ColumnarBatch = _
+
+ override def hasNext: Boolean = {
+ assert(batch == null)
+ while (!Thread.currentThread().isInterrupted) {
+ val tmp =
+ try {
+ inputBatchQueue.take()
+ } catch {
+ case _: InterruptedException =>
+ Thread.currentThread().interrupt()
+ return false;
+ }
+ if (tmp.isDefined) {
+ batch = tmp.get
+ return true
+ }
+ return false
+ }
+ throw new IllegalStateException()
+ }
+
+ override def next(): ColumnarBatch = {
+ assert(batch != null)
+ try {
+ batch
+ } finally {
+ batch = null
+ }
+ }
+ })
+ .recyclePayload(b => b.close())
+ .create()
+ .asJava
+ )
+ itr
+ }
+ }
+
+ private case class StatisticsInputNode(keySchema: Seq[Attribute],
dataSchema: Seq[Attribute])
+ extends GlutenPlan
+ with LeafExecNode {
+ override def output: Seq[Attribute] = keySchema ++ dataSchema
+ override def batchType(): Convention.BatchType = VeloxBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+ override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
+ throw new UnsupportedOperationException()
+ override def outputOrdering: Seq[SortOrder] = {
+ keySchema.map(key => SortOrder(key, Ascending))
+ }
+ }
+}
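The newIteratorFromInputQueue helper above bridges a SynchronousQueue into the pull-based iterator consumed by the native task: newRow hands batches over one at a time, and closeFile enqueues None as an end-of-split sentinel. Stripped of the interrupt handling and the Gluten iterator wrappers, the underlying pattern is roughly the following sketch (illustrative only, not part of the commit):

    import java.util.concurrent.SynchronousQueue

    // Blocks in hasNext() until the producer hands over Some(item); a None sentinel ends iteration.
    class QueueBackedIterator[T](queue: SynchronousQueue[Option[T]]) extends Iterator[T] {
      private var pending: Option[T] = None
      private var finished = false

      override def hasNext: Boolean = {
        if (finished) return false
        if (pending.isDefined) return true
        pending = queue.take() // rendezvous with the producer thread (newRow / closeFile)
        if (pending.isEmpty) finished = true
        pending.isDefined
      }

      override def next(): T = {
        if (!hasNext) throw new NoSuchElementException
        val item = pending.get
        pending = None
        item
      }
    }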
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
index 646164de3a..3b06570398 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
@@ -26,10 +26,34 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
import java.util.Iterator;
public class VeloxBlockStripes extends BlockStripes {
+ private final BlockStripe[] blockStripes;
+
public VeloxBlockStripes(BlockStripes bs) {
super(bs.originBlockAddress,
bs.blockAddresses, bs.headingRowIndice, bs.originBlockNumColumns,
bs.headingRowBytes);
+ blockStripes = new BlockStripe[blockAddresses.length];
+ for (int i = 0; i < blockStripes.length; i++) {
+ final long blockAddress = blockAddresses[i];
+ final byte[] headingRowByteArray = headingRowBytes[i];
+ blockStripes[i] = new BlockStripe() {
+ private final ColumnarBatch batch =
ColumnarBatches.create(blockAddress);
+ private final UnsafeRow headingRow = new
UnsafeRow(originBlockNumColumns);
+ {
+ headingRow.pointTo(headingRowByteArray, headingRowByteArray.length);
+ }
+
+ @Override
+ public ColumnarBatch getColumnarBatch() {
+ return batch;
+ }
+
+ @Override
+ public InternalRow getHeadingRow() {
+ return headingRow;
+ }
+ };
+ }
}
@Override
@@ -44,23 +68,8 @@ public class VeloxBlockStripes extends BlockStripes {
@Override
public BlockStripe next() {
- final BlockStripe nextStripe = new BlockStripe() {
- private final long blockAddress = blockAddresses[index];
- private final byte[] headingRowByteArray = headingRowBytes[index];
-
- @Override
- public ColumnarBatch getColumnarBatch() {
- return ColumnarBatches.create(blockAddress);
- }
-
- @Override
- public InternalRow getHeadingRow() {
- UnsafeRow row = new UnsafeRow(originBlockNumColumns);
- row.pointTo(headingRowByteArray, headingRowByteArray.length);
- return row;
- }
- };
- index += 1;
+ final BlockStripe nextStripe = blockStripes[index];
+ index++;
return nextStripe;
}
};
@@ -69,7 +78,6 @@ public class VeloxBlockStripes extends BlockStripes {
@Override
public void release() {
-
+    // Do nothing. We rely on the caller to call the #close API on the columnar batches returned to it.
}
}
-
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index a15cdc83d3..9d8315731f 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -110,6 +110,10 @@ class Runtime : public
std::enable_shared_from_this<Runtime> {
throw GlutenException("Not implemented");
}
+ virtual void requestBarrier(ResultIterator* iter) {
+ throw GlutenException("Not implemented");
+ }
+
virtual std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t
numRows) {
throw GlutenException("Not implemented");
}
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index d384eebc08..c48886195b 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -699,6 +699,20 @@ JNIEXPORT void JNICALL
Java_org_apache_gluten_vectorized_ColumnarBatchOutIterato
JNI_METHOD_END()
}
+JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeRequestBarrier( // NOLINT
+ JNIEnv* env,
+ jobject wrapper,
+ jlong iterHandle) {
+ JNI_METHOD_START
+ auto ctx = getRuntime(env, wrapper);
+ auto iter = ObjectStore::retrieve<ResultIterator>(iterHandle);
+ if (iter == nullptr) {
+ throw GlutenException("Invalid iterator handle for requestBarrier");
+ }
+ ctx->requestBarrier(iter.get());
+ JNI_METHOD_END()
+}
+
JNIEXPORT jlong JNICALL
Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToRowInit(
// NOLINT
JNIEnv* env,
diff --git a/cpp/core/memory/SplitAwareColumnarBatchIterator.h b/cpp/core/memory/SplitAwareColumnarBatchIterator.h
index e12f38afad..987eb04351 100644
--- a/cpp/core/memory/SplitAwareColumnarBatchIterator.h
+++ b/cpp/core/memory/SplitAwareColumnarBatchIterator.h
@@ -44,6 +44,12 @@ class SplitAwareColumnarBatchIterator : public ColumnarBatchIterator {
   /// Signal that no more splits will be added to this iterator.
   /// This must be called after all splits have been added to ensure proper task completion.
   virtual void noMoreSplits() = 0;
+
+  /// Request a barrier in task execution. This signals the task to finish processing
+  /// all currently queued splits and drain all stateful operators before continuing.
+  /// Enables task reuse and deterministic execution for streaming workloads.
+  /// @see https://facebookincubator.github.io/velox/develop/task-barrier.html
+  virtual void requestBarrier() = 0;
};
} // namespace gluten
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index e88cb43a91..d97498586b 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -215,6 +215,14 @@ void VeloxRuntime::noMoreSplits(ResultIterator* iter){
splitAwareIter->noMoreSplits();
}
+void VeloxRuntime::requestBarrier(ResultIterator* iter){
+  auto* splitAwareIter = dynamic_cast<gluten::SplitAwareColumnarBatchIterator*>(iter->getInputIter());
+ if (splitAwareIter == nullptr) {
+ throw GlutenException("Iterator does not support split management");
+ }
+ splitAwareIter->requestBarrier();
+}
+
std::shared_ptr<ColumnarToRowConverter>
VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) {
auto veloxPool = memoryManager()->getLeafMemoryPool();
return std::make_shared<VeloxColumnarToRowConverter>(veloxPool,
column2RowMemThreshold);
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index a3c3da0c5a..728cc46c92 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -61,6 +61,8 @@ class VeloxRuntime final : public Runtime {
void noMoreSplits(ResultIterator* iter) override;
+ void requestBarrier(ResultIterator* iter) override;
+
std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t
column2RowMemThreshold) override;
std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows)
override;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc
index 185dc5fd5e..157f9829b8 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -394,6 +394,13 @@ void WholeStageResultIterator::noMoreSplits() {
allSplitsAdded_ = true;
}
+void WholeStageResultIterator::requestBarrier() {
+ if (task_ == nullptr) {
+ throw GlutenException("Cannot request barrier: task is null");
+ }
+ task_->requestBarrier();
+}
+
void WholeStageResultIterator::collectMetrics() {
if (metrics_) {
// The metrics has already been created.
diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h
index 85cb18fd25..9bb6ef8b11 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -85,6 +85,12 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator {
   /// This is required for proper task completion and enables future barrier support.
void noMoreSplits() override;
+ /// Request a barrier in the Velox task execution.
+ /// This signals the task to finish processing all currently queued splits
+ /// and drain all stateful operators before continuing.
+ /// @see https://facebookincubator.github.io/velox/develop/task-barrier.html
+ void requestBarrier() override;
+
private:
/// Get the Spark confs to Velox query context.
std::unordered_map<std::string, std::string> getQueryContextConf();
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
index ad200dd46c..27162a800f 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
@@ -58,6 +58,8 @@ public class ColumnarBatchOutIterator extends
ClosableIterator<ColumnarBatch>
private native void nativeNoMoreSplits(long iterHandle);
+ private native void nativeRequestBarrier(long iterHandle);
+
@Override
public boolean hasNext0() throws IOException {
return nativeHasNext(iterHandle);
@@ -97,7 +99,7 @@ public class ColumnarBatchOutIterator extends
ClosableIterator<ColumnarBatch>
/**
   * Signal that no more splits will be added to the iterator. This is required for proper task
- * completion and is a prerequisite for barrier support.
+ * completion.
*
* @throws IllegalStateException if the iterator is closed
*/
@@ -108,6 +110,26 @@ public class ColumnarBatchOutIterator extends
ClosableIterator<ColumnarBatch>
nativeNoMoreSplits(iterHandle);
}
+ /**
+   * Request a barrier in the task execution. This signals the task to finish processing all
+   * currently queued splits and drain all stateful operators before continuing. After calling this
+   * method, continue calling next() to fetch results. When next() returns null and hasNext()
+   * returns false, the barrier has been reached.
+   *
+   * <p>This enables task reuse and deterministic execution for workloads like AI training data
+   * loading and real-time streaming processing.
+   *
+   * @throws IllegalStateException if the iterator is closed
+   * @see <a href="https://facebookincubator.github.io/velox/develop/task-barrier.html">Velox Task
+   *     Barrier Documentation</a>
+ */
+ public void requestBarrier() {
+ if (closed.get()) {
+      throw new IllegalStateException("Cannot call requestBarrier on a closed iterator");
+ }
+ nativeRequestBarrier(iterHandle);
+ }
+
@Override
public void close0() {
// To make sure the outputted batches are still accessible after the
iterator is closed.
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
index b6f1313ff5..31ce4b05af 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
@@ -43,6 +43,11 @@ object Validator {
case object Passed extends OutCome
case class Failed private (reason: String) extends OutCome
+ private object NoopValidator extends Validator {
+ override def validate(plan: SparkPlan): Validator.OutCome = pass()
+ }
+
+ def noop(): Validator = NoopValidator
def builder(): Builder = Builder()
class Builder private {
@@ -75,10 +80,6 @@ object Validator {
private object Builder {
def apply(): Builder = new Builder()
- private object NoopValidator extends Validator {
- override def validate(plan: SparkPlan): Validator.OutCome = pass()
- }
-
private class ValidatorPipeline(val validators: Seq[Validator]) extends
Validator {
assert(!validators.exists(_.isInstanceOf[ValidatorPipeline]))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]