This is an automated email from the ASF dual-hosted git repository.

zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new bc37334384 [GLUTEN-10215][VL] Remove streaming aggregation workaround in Delta stats writer (#12222)
bc37334384 is described below

commit bc373343841920618d77bd7415b4658b7c0417cb
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Jun 5 11:13:22 2026 +0100

    [GLUTEN-10215][VL] Remove streaming aggregation workaround in Delta stats writer (#12222)
---
 .../delta/stats/GlutenDeltaJobStatsTracker.scala   | 65 ++++++++++------------
 .../delta/stats/GlutenDeltaJobStatsTracker.scala   | 65 ++++++++++------------
 .../gluten/integration/action/DataGenOnly.scala    | 52 ++++++++---------
 3 files changed, 84 insertions(+), 98 deletions(-)

diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
index 70ca536792..2ea7b9e554 100644
--- 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta.stats
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.backendsapi.velox.VeloxBatchType
-import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
+import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution._
 import org.apache.gluten.expression.{ConverterUtils, TransformerState}
@@ -28,24 +28,22 @@ import 
org.apache.gluten.extension.columnar.rewrite.PullOutPreProject
 import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
 import org.apache.gluten.iterator.Iterators
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.plan.PlanBuilder
-import org.apache.gluten.vectorized.{ArrowWritableColumnVector, 
ColumnarBatchInIterator, ColumnarBatchOutIterator, NativePlanEvaluator}
+import org.apache.gluten.vectorized.{ColumnarBatchInIterator, 
ColumnarBatchOutIterator, NativePlanEvaluator}
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
AttributeReference, EmptyRow, Expression, Projection, SortOrder, 
SpecificInternalRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, 
Expression, Projection, SortOrder, SpecificInternalRow, UnsafeProjection}
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
Complete, DeclarativeAggregate}
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, 
LeafExecNode, ProjectExec}
 import org.apache.spark.sql.execution.aggregate.SortAggregateExec
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, 
WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker}
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.types.{IntegerType, StructType}
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.{SerializableConfiguration, SparkDirectoryUtil}
 
 import com.google.common.collect.Lists
@@ -144,25 +142,19 @@ object GlutenDeltaJobStatsTracker extends Logging {
       inputSchema = aggBufferAttrs
     )
     private val taskContext = TaskContext.get()
-    private val dummyKeyAttr = {
-      // FIXME: We have to force the use of Velox's streaming aggregation 
since hash aggregation
-      //  doesn't support task barriers. But as streaming aggregation should 
always be keyed, we
-      //  have to do a small hack here by adding a dummy key for the global 
aggregation.
-      AttributeReference("__GLUTEN_DELTA_DUMMY_KEY__", IntegerType)()
-    }
     private val statsAttrs = 
aggregates.flatMap(_.aggregateFunction.aggBufferAttributes)
     private val statsResultAttrs = 
aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes)
     private val veloxAggTask: ColumnarBatchOutIterator = {
-      val inputNode = StatisticsInputNode(Seq(dummyKeyAttr), dataCols)
+      val inputNode = StatisticsInputNode(dataCols)
       val aggOp = SortAggregateExec(
         None,
         isStreaming = false,
         None,
-        Seq(dummyKeyAttr),
+        Seq.empty,
         aggregates,
         statsAttrs,
         0,
-        dummyKeyAttr +: statsResultAttrs,
+        statsResultAttrs,
         inputNode
       )
       val projOp = ProjectExec(statsResultAttrs, aggOp)
@@ -246,9 +238,23 @@ object GlutenDeltaJobStatsTracker extends Logging {
     }
 
     override def closeFile(filePath: String): Unit = {
+      def signalEoS(): Unit = {
+        while (true) {
+          if (currentJsonFuture.isDone) {
+            currentJsonFuture.get()
+            // The future should be an error since we haven't signaled EoS yet.
+            throw new IllegalStateException("Unreachable code.")
+          }
+          val queued = inputBatchQueue.offer(None, 500, 
java.util.concurrent.TimeUnit.MILLISECONDS)
+          if (queued) {
+            // The future should be done after we signal EoS.
+            return
+          }
+        }
+      }
       assert(filePath == currentPath)
+      signalEoS()
       val fileName = new Path(filePath).getName
-      inputBatchQueue.put(None)
       val json = currentJsonFuture.get()
       resultJsonMap(fileName) = json
       currentPath = null
@@ -259,20 +265,11 @@ object GlutenDeltaJobStatsTracker extends Logging {
       row match {
         case _: PlaceholderRow =>
         case t: TerminalRow =>
-          val valueBatch = t.batch()
-          val numRows = valueBatch.numRows()
-          val dummyKeyVec = ArrowWritableColumnVector
-            .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name, 
IntegerType))
-            .head
-          (0 until numRows).foreach(i => dummyKeyVec.putInt(i, 1))
-          val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch(
-            ColumnarBatches.offload(
-              ArrowBufferAllocators.contextInstance(),
-              new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows)))
-          val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch, 
valueBatch)
-          dummyKeyBatch.close()
-          valueBatch.close()
-          inputBatchQueue.put(Some(compositeBatch))
+          val batch = t.batch()
+          // Counts up the reference count for the batch since it
+          // will be consumed by Velox aggregation task asynchronously.
+          ColumnarBatches.retain(batch)
+          inputBatchQueue.put(Some(batch))
       }
     }
 
@@ -326,17 +323,15 @@ object GlutenDeltaJobStatsTracker extends Logging {
     }
   }
 
-  private case class StatisticsInputNode(keySchema: Seq[Attribute], 
dataSchema: Seq[Attribute])
+  private case class StatisticsInputNode(dataSchema: Seq[Attribute])
     extends GlutenPlan
     with LeafExecNode {
-    override def output: Seq[Attribute] = keySchema ++ dataSchema
+    override def output: Seq[Attribute] = dataSchema
     override def batchType(): Convention.BatchType = VeloxBatchType
     override def rowType(): Convention.RowType = Convention.RowType.None
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
       throw new UnsupportedOperationException()
-    override def outputOrdering: Seq[SortOrder] = {
-      keySchema.map(key => SortOrder(key, Ascending))
-    }
+    override def outputOrdering: Seq[SortOrder] = Seq.empty
   }
 }
diff --git 
a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
 
b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
index d046263b59..ca6c7a6a7f 100644
--- 
a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
+++ 
b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta.stats
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.backendsapi.velox.VeloxBatchType
-import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
+import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution._
 import org.apache.gluten.expression.{ConverterUtils, TransformerState}
@@ -28,24 +28,22 @@ import 
org.apache.gluten.extension.columnar.rewrite.PullOutPreProject
 import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
 import org.apache.gluten.iterator.Iterators
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.plan.PlanBuilder
-import org.apache.gluten.vectorized.{ArrowWritableColumnVector, 
ColumnarBatchInIterator, ColumnarBatchOutIterator, NativePlanEvaluator}
+import org.apache.gluten.vectorized.{ColumnarBatchInIterator, 
ColumnarBatchOutIterator, NativePlanEvaluator}
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
AttributeReference, BindReferences, EmptyRow, Expression, RuntimeReplaceable, 
SortOrder, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BindReferences, 
EmptyRow, Expression, RuntimeReplaceable, SortOrder, SpecificInternalRow}
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
Complete, DeclarativeAggregate}
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, 
LeafExecNode, ProjectExec}
 import org.apache.spark.sql.execution.aggregate.SortAggregateExec
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, 
WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker}
 import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.types.{IntegerType, StructType}
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.{SerializableConfiguration, SparkDirectoryUtil}
 
 import com.google.common.collect.Lists
@@ -148,25 +146,19 @@ object GlutenDeltaJobStatsTracker extends Logging {
     private val getStatsExpr: Expression =
       BindReferences.bindReference(normalizedResultExpr, aggBufferAttrs)
     private val taskContext = TaskContext.get()
-    private val dummyKeyAttr = {
-      // FIXME: We have to force the use of Velox's streaming aggregation 
since hash aggregation
-      //  doesn't support task barriers. But as streaming aggregation should 
always be keyed, we
-      //  have to do a small hack here by adding a dummy key for the global 
aggregation.
-      AttributeReference("__GLUTEN_DELTA_DUMMY_KEY__", IntegerType)()
-    }
     private val statsAttrs = 
aggregates.flatMap(_.aggregateFunction.aggBufferAttributes)
     private val statsResultAttrs = 
aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes)
     private val veloxAggTask: ColumnarBatchOutIterator = {
-      val inputNode = StatisticsInputNode(Seq(dummyKeyAttr), dataCols)
+      val inputNode = StatisticsInputNode(dataCols)
       val aggOp = SortAggregateExec(
         None,
         isStreaming = false,
         None,
-        Seq(dummyKeyAttr),
+        Seq.empty,
         aggregates,
         statsAttrs,
         0,
-        dummyKeyAttr +: statsResultAttrs,
+        statsResultAttrs,
         inputNode
       )
       val projOp = ProjectExec(statsResultAttrs, aggOp)
@@ -250,9 +242,23 @@ object GlutenDeltaJobStatsTracker extends Logging {
     }
 
     override def closeFile(filePath: String): Unit = {
+      def signalEoS(): Unit = {
+        while (true) {
+          if (currentJsonFuture.isDone) {
+            currentJsonFuture.get()
+            // The future should be an error since we haven't signaled EoS yet.
+            throw new IllegalStateException("Unreachable code.")
+          }
+          val queued = inputBatchQueue.offer(None, 500, 
java.util.concurrent.TimeUnit.MILLISECONDS)
+          if (queued) {
+            // The future should be done after we signal EoS.
+            return
+          }
+        }
+      }
       assert(filePath == currentPath)
+      signalEoS()
       val fileName = new Path(filePath).getName
-      inputBatchQueue.put(None)
       val json = currentJsonFuture.get()
       resultJsonMap(fileName) = json
       currentPath = null
@@ -263,20 +269,11 @@ object GlutenDeltaJobStatsTracker extends Logging {
       row match {
         case _: PlaceholderRow =>
         case t: TerminalRow =>
-          val valueBatch = t.batch()
-          val numRows = valueBatch.numRows()
-          val dummyKeyVec = ArrowWritableColumnVector
-            .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name, 
IntegerType))
-            .head
-          (0 until numRows).foreach(i => dummyKeyVec.putInt(i, 1))
-          val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch(
-            ColumnarBatches.offload(
-              ArrowBufferAllocators.contextInstance(),
-              new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows)))
-          val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch, 
valueBatch)
-          dummyKeyBatch.close()
-          valueBatch.close()
-          inputBatchQueue.put(Some(compositeBatch))
+          val batch = t.batch()
+          // Counts up the reference count for the batch since it
+          // will be consumed by Velox aggregation task asynchronously.
+          ColumnarBatches.retain(batch)
+          inputBatchQueue.put(Some(batch))
       }
     }
 
@@ -330,17 +327,15 @@ object GlutenDeltaJobStatsTracker extends Logging {
     }
   }
 
-  private case class StatisticsInputNode(keySchema: Seq[Attribute], 
dataSchema: Seq[Attribute])
+  private case class StatisticsInputNode(dataSchema: Seq[Attribute])
     extends GlutenPlan
     with LeafExecNode {
-    override def output: Seq[Attribute] = keySchema ++ dataSchema
+    override def output: Seq[Attribute] = dataSchema
     override def batchType(): Convention.BatchType = VeloxBatchType
     override def rowType(): Convention.RowType = Convention.RowType.None
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
       throw new UnsupportedOperationException()
-    override def outputOrdering: Seq[SortOrder] = {
-      keySchema.map(key => SortOrder(key, Ascending))
-    }
+    override def outputOrdering: Seq[SortOrder] = Seq.empty
   }
 }
diff --git 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/DataGenOnly.scala
 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/DataGenOnly.scala
index deee1d008e..148a64865e 100644
--- 
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/DataGenOnly.scala
+++ 
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/DataGenOnly.scala
@@ -21,39 +21,46 @@ import org.apache.gluten.integration.Suite
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 case class DataGenOnly(strategy: DataGenOnly.Strategy) extends Action {
-
   override def execute(suite: Suite): Boolean = {
+
     suite.sessionSwitcher.useSession("baseline", "Data Gen")
     val fs = this.fs(suite)
+    val dataPath = this.dataPath(suite)
     val markerPath = this.markerPath(suite)
 
+    def gen(): Unit = {
+      if (fs.exists(dataPath)) {
+        println(
+          s"Test data exists at $dataPath but no completion marker found. 
Regenerating."
+        )
+        fs.delete(dataPath, true)
+      }
+      if (fs.exists(markerPath)) {
+        fs.delete(markerPath, true)
+      }
+
+      println(s"Generating test data to $dataPath...")
+      val dataGen = suite.createDataGen()
+      dataGen.gen(suite.sessionSwitcher.spark())
+      println(s"All test data successfully generated at $dataPath.")
+
+      // Create marker after successful generation.
+      fs.create(markerPath, false).close()
+    }
+
     strategy match {
       case DataGenOnly.Skip =>
         ()
 
       case DataGenOnly.Once =>
-        val dataPath = this.dataPath(suite)
         if (fs.exists(dataPath) && fs.exists(markerPath)) {
           println(s"Test data already generated at $dataPath. Skipping.")
         } else {
-          if (fs.exists(dataPath)) {
-            println(
-              s"Test data exists at $dataPath but no completion marker found. 
Regenerating."
-            )
-            fs.delete(dataPath, true)
-          }
-          if (fs.exists(markerPath)) {
-            fs.delete(markerPath, true)
-          }
-          gen(suite)
-          // Create marker after successful generation.
-          fs.create(markerPath, false).close()
+          gen()
         }
 
       case DataGenOnly.Always =>
-        gen(suite)
-        // Create marker after successful generation.
-        fs.create(markerPath, false).close()
+        gen()
     }
     true
   }
@@ -68,17 +75,6 @@ case class DataGenOnly(strategy: DataGenOnly.Strategy) 
extends Action {
 
   private def dataPath(suite: Suite): Path =
     new Path(suite.dataWritePath())
-
-  private def gen(suite: Suite): Unit = {
-    val dataPath = suite.dataWritePath()
-
-    println(s"Generating test data to $dataPath...")
-
-    val dataGen = suite.createDataGen()
-    dataGen.gen(suite.sessionSwitcher.spark())
-
-    println(s"All test data successfully generated at $dataPath.")
-  }
 }
 
 object DataGenOnly {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to