This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d45c8f9488 [GLUTEN-10215][VL] Delta write: Native statistics tracker 
to eliminate C2R overhead (#11419)
d45c8f9488 is described below

commit d45c8f9488e64b2f728b15bad759fedcd6d437fa
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Feb 13 13:36:54 2026 +0000

    [GLUTEN-10215][VL] Delta write: Native statistics tracker to eliminate C2R 
overhead (#11419)
---
 .../sql/delta/GlutenOptimisticTransaction.scala    |  13 +-
 .../delta/files/GlutenDeltaFileFormatWriter.scala  |  19 +-
 ...la => GlutenDeltaJobStatsFallbackTracker.scala} |  51 +--
 ...=> GlutenDeltaJobStatsRowCountingTracker.scala} |  61 ++--
 .../delta/stats/GlutenDeltaJobStatsTracker.scala   | 342 +++++++++++++++++++++
 .../datasources/velox/VeloxBlockStripes.java       |  46 +--
 cpp/core/compute/Runtime.h                         |   4 +
 cpp/core/jni/JniWrapper.cc                         |  14 +
 cpp/core/memory/SplitAwareColumnarBatchIterator.h  |   6 +
 cpp/velox/compute/VeloxRuntime.cc                  |   8 +
 cpp/velox/compute/VeloxRuntime.h                   |   2 +
 cpp/velox/compute/WholeStageResultIterator.cc      |   7 +
 cpp/velox/compute/WholeStageResultIterator.h       |   6 +
 .../vectorized/ColumnarBatchOutIterator.java       |  24 +-
 .../extension/columnar/validator/Validator.scala   |   9 +-
 15 files changed, 501 insertions(+), 111 deletions(-)

diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
index af19d1df9f..351c2effb7 100644
--- 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.delta.hooks.AutoCompact
 import org.apache.spark.sql.delta.perf.{DeltaOptimizedWriterExec, 
GlutenDeltaOptimizedWriterExec}
 import org.apache.spark.sql.delta.schema.InnerInvariantViolationException
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import 
org.apache.spark.sql.delta.stats.{GlutenDeltaIdentityColumnStatsTracker, 
GlutenDeltaJobStatisticsTracker}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, 
FileFormatWriter, WriteJobStatsTracker}
@@ -72,8 +71,7 @@ class GlutenOptimisticTransaction(delegate: 
OptimisticTransaction)
     // the FileFormatWriter.write call below and will collect per-file stats 
using
     // StatisticsCollection
     val optionalStatsTracker =
-      getOptionalStatsTrackerAndStatsCollection(output, outputPath, 
partitionSchema, data)._1.map(
-        new GlutenDeltaJobStatisticsTracker(_))
+      getOptionalStatsTrackerAndStatsCollection(output, outputPath, 
partitionSchema, data)._1
 
     val constraints =
       Constraints.getAll(metadata, spark) ++ generatedColumnConstraints ++ 
additionalConstraints
@@ -87,7 +85,6 @@ class GlutenOptimisticTransaction(delegate: 
OptimisticTransaction)
         statsDataSchema,
         trackIdentityHighWaterMarks
       )
-      .map(new GlutenDeltaIdentityColumnStatsTracker(_))
 
     SQLExecution.withNewExecutionId(queryExecution, 
Option("deltaTransactionalWrite")) {
       val outputSpec = FileFormatWriter.OutputSpec(outputPath.toString, 
Map.empty, output)
@@ -96,8 +93,8 @@ class GlutenOptimisticTransaction(delegate: 
OptimisticTransaction)
         convertEmptyToNullIfNeeded(queryExecution.executedPlan, 
partitioningColumns, constraints)
       val maybeCheckInvariants = if (constraints.isEmpty) {
         // Compared to vanilla Delta, we simply avoid adding the invariant 
checker
-        // when the constraint list is empty, to omit the unnecessary 
transitions
-        // added around the invariant checker.
+        // when the constraint list is empty, to prevent the unnecessary 
transitions
+        // from being added around the invariant checker.
         empty2NullPlan
       } else {
         DeltaInvariantCheckerExec(empty2NullPlan, constraints)
@@ -206,7 +203,7 @@ class GlutenOptimisticTransaction(delegate: 
OptimisticTransaction)
          committer.addedStatuses.map {
            a =>
              a.copy(stats = optionalStatsTracker
-               .map(_.delegate.recordedStats(a.toPath.getName))
+               .map(_.recordedStats(a.toPath.getName))
                .getOrElse(a.stats))
          }
        } else {
@@ -235,7 +232,7 @@ class GlutenOptimisticTransaction(delegate: 
OptimisticTransaction)
     if (resultFiles.nonEmpty && !isOptimize) 
registerPostCommitHook(AutoCompact)
     // Record the updated high water marks to be used during transaction 
commit.
     identityTrackerOpt.ifDefined {
-      tracker => 
updatedIdentityHighWaterMarks.appendAll(tracker.delegate.highWaterMarks.toSeq)
+      tracker => 
updatedIdentityHighWaterMarks.appendAll(tracker.highWaterMarks.toSeq)
     }
 
     resultFiles.toSeq ++ committer.changeFiles
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
index 74a1e3f036..653b705bec 100644
--- 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -36,6 +36,7 @@ import 
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.connector.write.WriterCommitMessage
 import org.apache.spark.sql.delta.{DeltaOptions, GlutenParquetFileFormat}
 import org.apache.spark.sql.delta.logging.DeltaLogKeys
+import org.apache.spark.sql.delta.stats.GlutenDeltaJobStatsTracker
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
@@ -125,13 +126,14 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
     val dataSchema = dataColumns.toStructType
     DataSourceUtils.verifySchema(fileFormat, dataSchema)
     DataSourceUtils.checkFieldNames(fileFormat, dataSchema)
-    // Note: prepareWrite has side effect. It sets "job".
+    val isNativeWritable = GlutenParquetFileFormat.isNativeWritable(dataSchema)
 
     val outputDataColumns =
       if 
(caseInsensitiveOptions.get(DeltaOptions.WRITE_PARTITION_COLUMNS).contains("true"))
 {
         dataColumns ++ partitionColumns
       } else dataColumns
 
+    // Note: prepareWrite has side effect. It sets "job".
     val outputWriterFactory =
       fileFormat.prepareWrite(
         sparkSession,
@@ -140,6 +142,12 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
         outputDataColumns.toStructType
       )
 
+    val maybeWrappedStatsTrackers: Seq[WriteJobStatsTracker] = if 
(isNativeWritable) {
+      statsTrackers.map(GlutenDeltaJobStatsTracker(_))
+    } else {
+      statsTrackers
+    }
+
     val description = new WriteJobDescription(
       uuid = UUID.randomUUID.toString,
       serializableHadoopConf = new 
SerializableConfiguration(job.getConfiguration),
@@ -157,7 +165,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
       timeZoneId = caseInsensitiveOptions
         .get(DateTimeUtils.TIMEZONE_OPTION)
         .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone),
-      statsTrackers = statsTrackers
+      statsTrackers = maybeWrappedStatsTrackers
     )
 
     // We should first sort by dynamic partition columns, then bucket id, and 
finally sorting
@@ -222,7 +230,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
         partitionColumns,
         sortColumns,
         orderingMatched,
-        GlutenParquetFileFormat.isNativeWritable(dataSchema)
+        isNativeWritable
       )
     }
   }
@@ -459,6 +467,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
       } else {
         concurrentOutputWriterSpec match {
           case Some(spec) =>
+            // TODO: Concurrent writer is not yet supported.
             new DynamicPartitionDataConcurrentWriter(
               description,
               taskAttemptContext,
@@ -468,7 +477,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
           case _ =>
             // Columnar-based partition writer to divide the input batch by 
partition values
             // and bucket IDs in advance.
-            new ColumnarDynamicPartitionDataSingleWriter(description, 
taskAttemptContext, committer)
+            new GlutenDynamicPartitionDataSingleWriter(description, 
taskAttemptContext, committer)
         }
       }
 
@@ -525,7 +534,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
     }
   }
 
-  private class ColumnarDynamicPartitionDataSingleWriter(
+  private class GlutenDynamicPartitionDataSingleWriter(
                                                           description: 
WriteJobDescription,
                                                           taskAttemptContext: 
TaskAttemptContext,
                                                           committer: 
FileCommitProtocol,
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsFallbackTracker.scala
similarity index 57%
copy from 
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
copy to 
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsFallbackTracker.scala
index 30e61730c1..4eab5408fa 100644
--- 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsFallbackTracker.scala
@@ -19,17 +19,19 @@ package org.apache.spark.sql.delta.stats
 import org.apache.gluten.execution.{PlaceholderRow, TerminalRow, 
VeloxColumnarToRowExec}
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.delta.DeltaIdentityColumnStatsTracker
 import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, 
WriteTaskStats, WriteTaskStatsTracker}
 import org.apache.spark.sql.execution.metric.SQLMetric
 
-class GlutenDeltaJobStatisticsTracker(val delegate: DeltaJobStatisticsTracker)
+/**
+ * A fallback stats tracker where a C2R converter converts all the incoming 
batches to rows then
+ * sends them to the delegate tracker.
+ */
+private[stats] class GlutenDeltaJobStatsFallbackTracker(val delegate: 
WriteJobStatsTracker)
   extends WriteJobStatsTracker {
-  import GlutenDeltaJobStatisticsTracker._
+  import GlutenDeltaJobStatsFallbackTracker._
 
   override def newTaskInstance(): WriteTaskStatsTracker = {
-    new GlutenDeltaTaskStatisticsTracker(
-      delegate.newTaskInstance().asInstanceOf[DeltaTaskStatisticsTracker])
+    new GlutenDeltaTaskStatsFallbackTracker(delegate.newTaskInstance())
   }
 
   override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): 
Unit = {
@@ -37,43 +39,22 @@ class GlutenDeltaJobStatisticsTracker(val delegate: 
DeltaJobStatisticsTracker)
   }
 }
 
-class GlutenDeltaIdentityColumnStatsTracker(override val delegate: 
DeltaIdentityColumnStatsTracker)
-  extends GlutenDeltaJobStatisticsTracker(delegate)
-
-private object GlutenDeltaJobStatisticsTracker {
-
-  /**
-   * This is a temporary implementation of statistics tracker for Delta Lake. 
It's sub-optimal in
-   * performance because it internally performs C2R then send rows to the 
delegate row-based
-   * tracker.
-   *
-   * TODO: Columnar-based statistics collection.
-   */
-  private class GlutenDeltaTaskStatisticsTracker(delegate: 
DeltaTaskStatisticsTracker)
+private object GlutenDeltaJobStatsFallbackTracker {
+  private class GlutenDeltaTaskStatsFallbackTracker(delegate: 
WriteTaskStatsTracker)
     extends WriteTaskStatsTracker {
-
     private val c2r = new VeloxColumnarToRowExec.Converter(new 
SQLMetric("convertTime"))
 
-    override def newPartition(partitionValues: InternalRow): Unit = {
+    override def newPartition(partitionValues: InternalRow): Unit =
       delegate.newPartition(partitionValues)
-    }
 
-    override def newFile(filePath: String): Unit = {
-      delegate.newFile(filePath)
-    }
+    override def newFile(filePath: String): Unit = delegate.newFile(filePath)
 
-    override def closeFile(filePath: String): Unit = {
-      delegate.closeFile(filePath)
-    }
+    override def closeFile(filePath: String): Unit = 
delegate.closeFile(filePath)
 
-    override def newRow(filePath: String, row: InternalRow): Unit = {
-      row match {
-        case _: PlaceholderRow =>
-        case t: TerminalRow =>
-          c2r.toRowIterator(t.batch()).foreach(eachRow => 
delegate.newRow(filePath, eachRow))
-        case otherRow =>
-          delegate.newRow(filePath, otherRow)
-      }
+    override def newRow(filePath: String, row: InternalRow): Unit = row match {
+      case _: PlaceholderRow =>
+      case t: TerminalRow =>
+        c2r.toRowIterator(t.batch()).foreach(eachRow => 
delegate.newRow(filePath, eachRow))
     }
 
     override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala
similarity index 50%
rename from 
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
rename to 
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala
index 30e61730c1..d4086cf8e9 100644
--- 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaWriteJobStatsTracker.scala
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsRowCountingTracker.scala
@@ -16,20 +16,22 @@
  */
 package org.apache.spark.sql.delta.stats
 
-import org.apache.gluten.execution.{PlaceholderRow, TerminalRow, 
VeloxColumnarToRowExec}
+import org.apache.gluten.execution.{PlaceholderRow, TerminalRow}
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.delta.DeltaIdentityColumnStatsTracker
 import org.apache.spark.sql.execution.datasources.{WriteJobStatsTracker, 
WriteTaskStats, WriteTaskStatsTracker}
-import org.apache.spark.sql.execution.metric.SQLMetric
 
-class GlutenDeltaJobStatisticsTracker(val delegate: DeltaJobStatisticsTracker)
+/**
+ * A fallback stats tracker that simply calls `newRow` once per row when a 
columnar batch comes. No
+ * C2R conversion is involved, which avoids its overhead. Therefore, the 
delegate stats tracker
+ * must not read the row in its `newRow` implementation.
+ */
+private[stats] class GlutenDeltaJobStatsRowCountingTracker(val delegate: 
WriteJobStatsTracker)
   extends WriteJobStatsTracker {
-  import GlutenDeltaJobStatisticsTracker._
+  import GlutenDeltaJobStatsRowCountingTracker._
 
   override def newTaskInstance(): WriteTaskStatsTracker = {
-    new GlutenDeltaTaskStatisticsTracker(
-      delegate.newTaskInstance().asInstanceOf[DeltaTaskStatisticsTracker])
+    new GlutenDeltaTaskStatsRowCountingTracker(delegate.newTaskInstance())
   }
 
   override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): 
Unit = {
@@ -37,43 +39,24 @@ class GlutenDeltaJobStatisticsTracker(val delegate: 
DeltaJobStatisticsTracker)
   }
 }
 
-class GlutenDeltaIdentityColumnStatsTracker(override val delegate: 
DeltaIdentityColumnStatsTracker)
-  extends GlutenDeltaJobStatisticsTracker(delegate)
-
-private object GlutenDeltaJobStatisticsTracker {
-
-  /**
-   * This is a temporary implementation of statistics tracker for Delta Lake. 
It's sub-optimal in
-   * performance because it internally performs C2R then send rows to the 
delegate row-based
-   * tracker.
-   *
-   * TODO: Columnar-based statistics collection.
-   */
-  private class GlutenDeltaTaskStatisticsTracker(delegate: 
DeltaTaskStatisticsTracker)
+private object GlutenDeltaJobStatsRowCountingTracker {
+  private class GlutenDeltaTaskStatsRowCountingTracker(delegate: 
WriteTaskStatsTracker)
     extends WriteTaskStatsTracker {
-
-    private val c2r = new VeloxColumnarToRowExec.Converter(new 
SQLMetric("convertTime"))
-
-    override def newPartition(partitionValues: InternalRow): Unit = {
+    override def newPartition(partitionValues: InternalRow): Unit =
       delegate.newPartition(partitionValues)
-    }
 
-    override def newFile(filePath: String): Unit = {
-      delegate.newFile(filePath)
-    }
+    override def newFile(filePath: String): Unit = delegate.newFile(filePath)
 
-    override def closeFile(filePath: String): Unit = {
-      delegate.closeFile(filePath)
-    }
+    override def closeFile(filePath: String): Unit = 
delegate.closeFile(filePath)
 
-    override def newRow(filePath: String, row: InternalRow): Unit = {
-      row match {
-        case _: PlaceholderRow =>
-        case t: TerminalRow =>
-          c2r.toRowIterator(t.batch()).foreach(eachRow => 
delegate.newRow(filePath, eachRow))
-        case otherRow =>
-          delegate.newRow(filePath, otherRow)
-      }
+    override def newRow(filePath: String, row: InternalRow): Unit = row match {
+      case _: PlaceholderRow =>
+      case t: TerminalRow =>
+        for (_ <- 0 until t.batch().numRows()) {
+          // Here we pass a null row to the delegate stats tracker as we assume
+          // the delegate stats tracker only counts the number of rows.
+          delegate.newRow(filePath, null)
+        }
     }
 
     override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
new file mode 100644
index 0000000000..ae11263943
--- /dev/null
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.stats
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution._
+import org.apache.gluten.expression.{ConverterUtils, TransformerState}
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.offload.OffloadOthers
+import org.apache.gluten.extension.columnar.rewrite.PullOutPreProject
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
+import org.apache.gluten.iterator.Iterators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.plan.PlanBuilder
+import org.apache.gluten.vectorized.{ArrowWritableColumnVector, 
ColumnarBatchInIterator, ColumnarBatchOutIterator, NativePlanEvaluator}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
AttributeReference, EmptyRow, Expression, Projection, SortOrder, 
SpecificInternalRow, UnsafeProjection}
+import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
Complete, DeclarativeAggregate}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
+import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, 
LeafExecNode, ProjectExec}
+import org.apache.spark.sql.execution.aggregate.SortAggregateExec
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, 
WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker}
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.util.{SerializableConfiguration, SparkDirectoryUtil}
+
+import com.google.common.collect.Lists
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import java.util.UUID
+import java.util.concurrent.{Callable, Executors, Future, SynchronousQueue}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/** Gluten's stats tracker with vectorized aggregation inside to produce 
statistics efficiently. */
+private[stats] class GlutenDeltaJobStatsTracker(val delegate: 
DeltaJobStatisticsTracker)
+  extends WriteJobStatsTracker {
+  import GlutenDeltaJobStatsTracker._
+
+  @transient private val hadoopConf: Configuration = {
+    val clazz = classOf[DeltaJobStatisticsTracker]
+    val method = clazz.getDeclaredField("hadoopConf")
+    method.setAccessible(true)
+    method.get(delegate).asInstanceOf[Configuration]
+  }
+  @transient private val path = delegate.path
+  private val dataCols = delegate.dataCols
+  private val statsColExpr = delegate.statsColExpr
+
+  private val srlHadoopConf = new SerializableConfiguration(hadoopConf)
+  private val rootUri = 
path.getFileSystem(hadoopConf).makeQualified(path).toUri
+
+  override def newTaskInstance(): WriteTaskStatsTracker = {
+    val rootPath = new Path(rootUri)
+    val hadoopConf = srlHadoopConf.value
+    new GlutenDeltaTaskStatsTracker(dataCols, statsColExpr, rootPath, 
hadoopConf)
+  }
+
+  override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): 
Unit = {
+    delegate.processStats(stats, jobCommitTime)
+  }
+}
+
+object GlutenDeltaJobStatsTracker extends Logging {
+  def apply(tracker: WriteJobStatsTracker): WriteJobStatsTracker = tracker 
match {
+    case tracker: BasicWriteJobStatsTracker =>
+      new GlutenDeltaJobStatsRowCountingTracker(tracker)
+    case tracker: DeltaJobStatisticsTracker =>
+      new GlutenDeltaJobStatsTracker(tracker)
+    case tracker =>
+      logWarning(
+        "Gluten Delta: Creating fallback job stats tracker," +
+          " this involves frequent columnar-to-row conversions which may cause 
performance" +
+          " issues.")
+      new GlutenDeltaJobStatsFallbackTracker(tracker)
+  }
+
+  /** A columnar-based statistics collection for Gluten + Delta Lake. */
+  private class GlutenDeltaTaskStatsTracker(
+      dataCols: Seq[Attribute],
+      statsColExpr: Expression,
+      rootPath: Path,
+      hadoopConf: Configuration)
+    extends WriteTaskStatsTracker {
+    // We use a single thread to ensure the statistic files are written 
serially.
+    // Do not increase the number of threads, otherwise correctness will not be 
guaranteed.
+    private val resultThreadRunner = Executors.newSingleThreadExecutor()
+    private val evaluator = NativePlanEvaluator.create(
+      BackendsApiManager.getBackendName,
+      Map.empty[String, String].asJava)
+    private val c2r = new VeloxColumnarToRowExec.Converter(new 
SQLMetric("convertTime"))
+    private val inputBatchQueue = new SynchronousQueue[Option[ColumnarBatch]]()
+    private val aggregates: Seq[AggregateExpression] = statsColExpr.collect {
+      case ae: AggregateExpression if 
ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] =>
+        assert(ae.mode == Complete)
+        ae
+    }
+    private val declarativeAggregates: Seq[DeclarativeAggregate] = 
aggregates.map {
+      ae => ae.aggregateFunction.asInstanceOf[DeclarativeAggregate]
+    }
+    private val resultExpr: Expression = statsColExpr.transform {
+      case ae: AggregateExpression if 
ae.aggregateFunction.isInstanceOf[DeclarativeAggregate] =>
+        
ae.aggregateFunction.asInstanceOf[DeclarativeAggregate].evaluateExpression
+    }
+    private val aggBufferAttrs: Seq[Attribute] =
+      declarativeAggregates.flatMap(_.aggBufferAttributes)
+    private val emptyRow: InternalRow = {
+      val initializeStats = GenerateMutableProjection.generate(
+        expressions = declarativeAggregates.flatMap(_.initialValues),
+        inputSchema = Seq.empty,
+        useSubexprElimination = false
+      )
+      val buffer = new SpecificInternalRow(aggBufferAttrs.map(_.dataType))
+      initializeStats.target(buffer).apply(EmptyRow)
+    }
+    private val getStats: Projection = UnsafeProjection.create(
+      exprs = Seq(resultExpr),
+      inputSchema = aggBufferAttrs
+    )
+    private val taskContext = TaskContext.get()
+    private val dummyKeyAttr = {
+      // FIXME: We have to force the use of Velox's streaming aggregation 
since hash aggregation
+      //  doesn't support task barriers. But as streaming aggregation should 
always be keyed, we
+      //  have to do a small hack here by adding a dummy key for the global 
aggregation.
+      AttributeReference("__GLUTEN_DELTA_DUMMY_KEY__", IntegerType)()
+    }
+    private val statsAttrs = 
aggregates.flatMap(_.aggregateFunction.aggBufferAttributes)
+    private val statsResultAttrs = 
aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes)
+    private val veloxAggTask: ColumnarBatchOutIterator = {
+      val inputNode = StatisticsInputNode(Seq(dummyKeyAttr), dataCols)
+      val aggOp = SortAggregateExec(
+        None,
+        isStreaming = false,
+        None,
+        Seq(dummyKeyAttr),
+        aggregates,
+        statsAttrs,
+        0,
+        dummyKeyAttr +: statsResultAttrs,
+        inputNode
+      )
+      val projOp = ProjectExec(statsResultAttrs, aggOp)
+      // Invoke the legacy transform rule to get a local Velox aggregation 
query plan.
+      val offloads = Seq(OffloadOthers()).map(_.toStrcitRule())
+      val validatorBuilder: GlutenConfig => Validator = conf =>
+        Validators.newValidator(conf, offloads)
+      val rewrites = Seq(PullOutPreProject)
+      val config = GlutenConfig.get
+      val transformRule =
+        HeuristicTransform.WithRewrites(validatorBuilder(config), rewrites, 
offloads)
+      val veloxTransformer = transformRule(projOp)
+      val wholeStageTransformer = 
ColumnarCollapseTransformStages(config)(veloxTransformer)
+        .asInstanceOf[WholeStageTransformer]
+        .child
+        .asInstanceOf[TransformSupport]
+      val substraitContext = new SubstraitContext
+      TransformerState.enterValidation
+      val transformedNode =
+        try {
+          wholeStageTransformer.transform(substraitContext)
+        } finally {
+          TransformerState.finishValidation
+        }
+      val outNames = 
wholeStageTransformer.output.map(ConverterUtils.genColumnNameWithExprId).asJava
+      val planNode =
+        PlanBuilder.makePlan(substraitContext, 
Lists.newArrayList(transformedNode.root), outNames)
+
+      val spillDirPath = SparkDirectoryUtil
+        .get()
+        .namespace("gluten-spill")
+        .mkChildDirRoundRobin(UUID.randomUUID.toString)
+        .getAbsolutePath
+      val nativeOutItr = evaluator
+        .createKernelWithBatchIterator(
+          planNode.toProtobuf.toByteArray,
+          null,
+          null,
+          0,
+          
BackendsApiManager.getSparkPlanExecApiInstance.rewriteSpillPath(spillDirPath)
+        )
+      nativeOutItr
+    }
+
+    private val resultJsonMap: mutable.Map[String, String] = mutable.Map()
+
+    private var currentPath: String = _
+    private var currentJsonFuture: Future[String] = _
+
+    override def newPartition(partitionValues: InternalRow): Unit = {}
+
+    override def newFile(filePath: String): Unit = {
+      assert(currentPath == null)
+      veloxAggTask.addIteratorSplits(Array(newIteratorFromInputQueue()))
+      veloxAggTask.requestBarrier()
+      currentJsonFuture = resultThreadRunner.submit(new Callable[String] {
+        private val resultThreadName =
+          s"Gluten Delta Statistics Writer - ${System.identityHashCode(this)}"
+        override def call(): String = {
+          Thread.currentThread().setName(resultThreadName)
+          TaskContext.setTaskContext(taskContext)
+          val outBatches = veloxAggTask.asScala.toSeq
+          val row: InternalRow = if (outBatches.isEmpty) {
+            // No input was received. Return the default aggregation values.
+            emptyRow
+          } else {
+            assert(outBatches.size == 1)
+            val batch = outBatches.head
+            val rows = c2r.toRowIterator(batch).toSeq
+            assert(
+              rows.size == 1,
+              "Only one single output row is expected from the global 
aggregation.")
+            batch.close()
+            rows.head
+          }
+          val jsonStats = getStats(row).getString(0)
+          jsonStats
+        }
+        currentPath = filePath
+      })
+    }
+
+    override def closeFile(filePath: String): Unit = {
+      assert(filePath == currentPath)
+      val fileName = new Path(filePath).getName
+      inputBatchQueue.put(None)
+      val json = currentJsonFuture.get()
+      resultJsonMap(fileName) = json
+      currentPath = null
+    }
+
+    override def newRow(filePath: String, row: InternalRow): Unit = {
+      assert(filePath == currentPath)
+      row match {
+        case _: PlaceholderRow =>
+        case t: TerminalRow =>
+          val valueBatch = t.batch()
+          val numRows = valueBatch.numRows()
+          val dummyKeyVec = ArrowWritableColumnVector
+            .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name, 
IntegerType))
+            .head
+          (0 until numRows).foreach(i => dummyKeyVec.putInt(i, 1))
+          val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch(
+            ColumnarBatches.offload(
+              ArrowBufferAllocators.contextInstance(),
+              new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows)))
+          val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch, 
valueBatch)
+          dummyKeyBatch.close()
+          valueBatch.close()
+          inputBatchQueue.put(Some(compositeBatch))
+      }
+    }
+
+    override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
+      veloxAggTask.noMoreSplits()
+      veloxAggTask.close()
+      DeltaFileStatistics(resultJsonMap.toMap)
+    }
+
+    private def newIteratorFromInputQueue(): ColumnarBatchInIterator = {
+      val itr = new ColumnarBatchInIterator(
+        BackendsApiManager.getBackendName,
+        Iterators
+          .wrap(new Iterator[ColumnarBatch] {
+            private var batch: ColumnarBatch = _
+
+            override def hasNext: Boolean = {
+              assert(batch == null)
+              while (!Thread.currentThread().isInterrupted) {
+                val tmp =
+                  try {
+                    inputBatchQueue.take()
+                  } catch {
+                    case _: InterruptedException =>
+                      Thread.currentThread().interrupt()
+                      return false;
+                  }
+                if (tmp.isDefined) {
+                  batch = tmp.get
+                  return true
+                }
+                return false
+              }
+              throw new IllegalStateException()
+            }
+
+            override def next(): ColumnarBatch = {
+              assert(batch != null)
+              try {
+                batch
+              } finally {
+                batch = null
+              }
+            }
+          })
+          .recyclePayload(b => b.close())
+          .create()
+          .asJava
+      )
+      itr
+    }
+  }
+
+  private case class StatisticsInputNode(keySchema: Seq[Attribute], 
dataSchema: Seq[Attribute])
+    extends GlutenPlan
+    with LeafExecNode {
+    override def output: Seq[Attribute] = keySchema ++ dataSchema
+    override def batchType(): Convention.BatchType = VeloxBatchType
+    override def rowType0(): Convention.RowType = Convention.RowType.None
+    override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+    override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
+      throw new UnsupportedOperationException()
+    override def outputOrdering: Seq[SortOrder] = {
+      keySchema.map(key => SortOrder(key, Ascending))
+    }
+  }
+}
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
index 646164de3a..3b06570398 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxBlockStripes.java
@@ -26,10 +26,34 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 import java.util.Iterator;
 
 public class VeloxBlockStripes extends BlockStripes {
+  private final BlockStripe[] blockStripes;
+
   public VeloxBlockStripes(BlockStripes bs) {
     super(bs.originBlockAddress,
         bs.blockAddresses, bs.headingRowIndice, bs.originBlockNumColumns,
         bs.headingRowBytes);
+    blockStripes = new BlockStripe[blockAddresses.length];
+    for (int i = 0; i < blockStripes.length; i++) {
+      final long blockAddress = blockAddresses[i];
+      final byte[] headingRowByteArray = headingRowBytes[i];
+      blockStripes[i] = new BlockStripe() {
+        private final ColumnarBatch batch = 
ColumnarBatches.create(blockAddress);
+        private final UnsafeRow headingRow = new 
UnsafeRow(originBlockNumColumns);
+        {
+          headingRow.pointTo(headingRowByteArray, headingRowByteArray.length);
+        }
+
+        @Override
+        public ColumnarBatch getColumnarBatch() {
+          return batch;
+        }
+
+        @Override
+        public InternalRow getHeadingRow() {
+          return headingRow;
+        }
+      };
+    }
   }
 
   @Override
@@ -44,23 +68,8 @@ public class VeloxBlockStripes extends BlockStripes {
 
       @Override
       public BlockStripe next() {
-        final BlockStripe nextStripe = new BlockStripe() {
-          private final long blockAddress = blockAddresses[index];
-          private final byte[] headingRowByteArray = headingRowBytes[index];
-
-          @Override
-          public ColumnarBatch getColumnarBatch() {
-            return ColumnarBatches.create(blockAddress);
-          }
-
-          @Override
-          public InternalRow getHeadingRow() {
-            UnsafeRow row = new UnsafeRow(originBlockNumColumns);
-            row.pointTo(headingRowByteArray, headingRowByteArray.length);
-            return row;
-          }
-        };
-        index += 1;
+        final BlockStripe nextStripe = blockStripes[index];
+        index++;
         return nextStripe;
       }
     };
@@ -69,7 +78,6 @@ public class VeloxBlockStripes extends BlockStripes {
 
   @Override
   public void release() {
-
+    // Do nothing. We rely on the caller to call #close API on columnar 
batches returned to them.
   }
 }
-
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index a15cdc83d3..9d8315731f 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -110,6 +110,10 @@ class Runtime : public 
std::enable_shared_from_this<Runtime> {
     throw GlutenException("Not implemented");
   }
 
+  virtual void requestBarrier(ResultIterator* iter) {
+    throw GlutenException("Not implemented");
+  }
+
   virtual std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t 
numRows) {
     throw GlutenException("Not implemented");
   }
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index d384eebc08..c48886195b 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -699,6 +699,20 @@ JNIEXPORT void JNICALL 
Java_org_apache_gluten_vectorized_ColumnarBatchOutIterato
   JNI_METHOD_END()
 }
 
+JNIEXPORT void JNICALL 
Java_org_apache_gluten_vectorized_ColumnarBatchOutIterator_nativeRequestBarrier(
 // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong iterHandle) {
+  JNI_METHOD_START
+  auto ctx = getRuntime(env, wrapper);
+  auto iter = ObjectStore::retrieve<ResultIterator>(iterHandle);
+  if (iter == nullptr) {
+    throw GlutenException("Invalid iterator handle for requestBarrier");
+  }
+  ctx->requestBarrier(iter.get());
+  JNI_METHOD_END()
+}
+
 JNIEXPORT jlong JNICALL
 
Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarToRowInit(
 // NOLINT
     JNIEnv* env,
diff --git a/cpp/core/memory/SplitAwareColumnarBatchIterator.h 
b/cpp/core/memory/SplitAwareColumnarBatchIterator.h
index e12f38afad..987eb04351 100644
--- a/cpp/core/memory/SplitAwareColumnarBatchIterator.h
+++ b/cpp/core/memory/SplitAwareColumnarBatchIterator.h
@@ -44,6 +44,12 @@ class SplitAwareColumnarBatchIterator : public 
ColumnarBatchIterator {
   /// Signal that no more splits will be added to this iterator.
   /// This must be called after all splits have been added to ensure proper 
task completion.
   virtual void noMoreSplits() = 0;
+
+  /// Request a barrier in task execution. This signals the task to finish 
processing
+  /// all currently queued splits and drain all stateful operators before 
continuing.
+  /// Enables task reuse and deterministic execution for streaming workloads.
+  /// @see https://facebookincubator.github.io/velox/develop/task-barrier.html
+  virtual void requestBarrier() = 0;
 };
 
 } // namespace gluten
diff --git a/cpp/velox/compute/VeloxRuntime.cc 
b/cpp/velox/compute/VeloxRuntime.cc
index e88cb43a91..d97498586b 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -215,6 +215,14 @@ void VeloxRuntime::noMoreSplits(ResultIterator* iter){
     splitAwareIter->noMoreSplits();
 }
 
+void VeloxRuntime::requestBarrier(ResultIterator* iter){
+    auto* splitAwareIter = 
dynamic_cast<gluten::SplitAwareColumnarBatchIterator*>(iter->getInputIter());
+    if (splitAwareIter == nullptr) {
+      throw GlutenException("Iterator does not support split management");
+    }
+    splitAwareIter->requestBarrier();
+}
+
 std::shared_ptr<ColumnarToRowConverter> 
VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) {
   auto veloxPool = memoryManager()->getLeafMemoryPool();
   return std::make_shared<VeloxColumnarToRowConverter>(veloxPool, 
column2RowMemThreshold);
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index a3c3da0c5a..728cc46c92 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -61,6 +61,8 @@ class VeloxRuntime final : public Runtime {
 
   void noMoreSplits(ResultIterator* iter) override;
 
+  void requestBarrier(ResultIterator* iter) override;
+
   std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t 
column2RowMemThreshold) override;
 
   std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows) 
override;
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc 
b/cpp/velox/compute/WholeStageResultIterator.cc
index 185dc5fd5e..157f9829b8 100644
--- a/cpp/velox/compute/WholeStageResultIterator.cc
+++ b/cpp/velox/compute/WholeStageResultIterator.cc
@@ -394,6 +394,13 @@ void WholeStageResultIterator::noMoreSplits() {
   allSplitsAdded_ = true;
 }
 
+void WholeStageResultIterator::requestBarrier() {
+  if (task_ == nullptr) {
+    throw GlutenException("Cannot request barrier: task is null");
+  }
+  task_->requestBarrier();
+}
+
 void WholeStageResultIterator::collectMetrics() {
   if (metrics_) {
     // The metrics has already been created.
diff --git a/cpp/velox/compute/WholeStageResultIterator.h 
b/cpp/velox/compute/WholeStageResultIterator.h
index 85cb18fd25..9bb6ef8b11 100644
--- a/cpp/velox/compute/WholeStageResultIterator.h
+++ b/cpp/velox/compute/WholeStageResultIterator.h
@@ -85,6 +85,12 @@ class WholeStageResultIterator : public 
SplitAwareColumnarBatchIterator {
   /// This is required for proper task completion and enables future barrier 
support.
   void noMoreSplits() override;
 
+  /// Request a barrier in the Velox task execution.
+  /// This signals the task to finish processing all currently queued splits
+  /// and drain all stateful operators before continuing.
+  /// @see https://facebookincubator.github.io/velox/develop/task-barrier.html
+  void requestBarrier() override;
+
  private:
   /// Get the Spark confs to Velox query context.
   std::unordered_map<std::string, std::string> getQueryContextConf();
diff --git 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
index ad200dd46c..27162a800f 100644
--- 
a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
+++ 
b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ColumnarBatchOutIterator.java
@@ -58,6 +58,8 @@ public class ColumnarBatchOutIterator extends 
ClosableIterator<ColumnarBatch>
 
   private native void nativeNoMoreSplits(long iterHandle);
 
+  private native void nativeRequestBarrier(long iterHandle);
+
   @Override
   public boolean hasNext0() throws IOException {
     return nativeHasNext(iterHandle);
@@ -97,7 +99,7 @@ public class ColumnarBatchOutIterator extends 
ClosableIterator<ColumnarBatch>
 
   /**
    * Signal that no more splits will be added to the iterator. This is 
required for proper task
-   * completion and is a prerequisite for barrier support.
+   * completion.
    *
    * @throws IllegalStateException if the iterator is closed
    */
@@ -108,6 +110,26 @@ public class ColumnarBatchOutIterator extends 
ClosableIterator<ColumnarBatch>
     nativeNoMoreSplits(iterHandle);
   }
 
+  /**
+   * Request a barrier in the task execution. This signals the task to finish 
processing all
+   * currently queued splits and drain all stateful operators before 
continuing. After calling this
+   * method, continue calling next() to fetch results. When next() returns 
null and hasNext()
+   * returns false, the barrier has been reached.
+   *
+   * <p>This enables task reuse and deterministic execution for workloads like 
AI training data
+   * loading and real-time streaming processing.
+   *
+   * @throws IllegalStateException if the iterator is closed
+   * @see <a 
href="https://facebookincubator.github.io/velox/develop/task-barrier.html">Velox
 Task
+   *     Barrier Documentation</a>
+   */
+  public void requestBarrier() {
+    if (closed.get()) {
+      throw new IllegalStateException("Cannot call requestBarrier on a closed 
iterator");
+    }
+    nativeRequestBarrier(iterHandle);
+  }
+
   @Override
   public void close0() {
     // To make sure the outputted batches are still accessible after the 
iterator is closed.
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
index b6f1313ff5..31ce4b05af 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
@@ -43,6 +43,11 @@ object Validator {
   case object Passed extends OutCome
   case class Failed private (reason: String) extends OutCome
 
+  private object NoopValidator extends Validator {
+    override def validate(plan: SparkPlan): Validator.OutCome = pass()
+  }
+
+  def noop(): Validator = NoopValidator
   def builder(): Builder = Builder()
 
   class Builder private {
@@ -75,10 +80,6 @@ object Validator {
   private object Builder {
     def apply(): Builder = new Builder()
 
-    private object NoopValidator extends Validator {
-      override def validate(plan: SparkPlan): Validator.OutCome = pass()
-    }
-
     private class ValidatorPipeline(val validators: Seq[Validator]) extends 
Validator {
       assert(!validators.exists(_.isInstanceOf[ValidatorPipeline]))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to