This is an automated email from the ASF dual-hosted git repository.
zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new bc37334384 [GLUTEN-10215][VL] Remove streaming aggregation workaround in Delta stats writer (#12222)
bc37334384 is described below
commit bc373343841920618d77bd7415b4658b7c0417cb
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Jun 5 11:13:22 2026 +0100
[GLUTEN-10215][VL] Remove streaming aggregation workaround in Delta stats writer (#12222)
---
.../delta/stats/GlutenDeltaJobStatsTracker.scala | 65 ++++++++++------------
.../delta/stats/GlutenDeltaJobStatsTracker.scala | 65 ++++++++++------------
.../gluten/integration/action/DataGenOnly.scala | 52 ++++++++---------
3 files changed, 84 insertions(+), 98 deletions(-)
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
index 70ca536792..2ea7b9e554 100644
---
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta.stats
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.backendsapi.velox.VeloxBatchType
-import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
+import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution._
import org.apache.gluten.expression.{ConverterUtils, TransformerState}
@@ -28,24 +28,22 @@ import
org.apache.gluten.extension.columnar.rewrite.PullOutPreProject
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
import org.apache.gluten.iterator.Iterators
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.plan.PlanBuilder
-import org.apache.gluten.vectorized.{ArrowWritableColumnVector,
ColumnarBatchInIterator, ColumnarBatchOutIterator, NativePlanEvaluator}
+import org.apache.gluten.vectorized.{ColumnarBatchInIterator,
ColumnarBatchOutIterator, NativePlanEvaluator}
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute,
AttributeReference, EmptyRow, Expression, Projection, SortOrder,
SpecificInternalRow, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow,
Expression, Projection, SortOrder, SpecificInternalRow, UnsafeProjection}
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
Complete, DeclarativeAggregate}
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages,
LeafExecNode, ProjectExec}
import org.apache.spark.sql.execution.aggregate.SortAggregateExec
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker,
WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker}
import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.types.{IntegerType, StructType}
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SerializableConfiguration, SparkDirectoryUtil}
import com.google.common.collect.Lists
@@ -144,25 +142,19 @@ object GlutenDeltaJobStatsTracker extends Logging {
inputSchema = aggBufferAttrs
)
private val taskContext = TaskContext.get()
- private val dummyKeyAttr = {
- // FIXME: We have to force the use of Velox's streaming aggregation since hash aggregation
- // doesn't support task barriers. But as streaming aggregation should always be keyed, we
- // have to do a small hack here by adding a dummy key for the global aggregation.
- AttributeReference("__GLUTEN_DELTA_DUMMY_KEY__", IntegerType)()
- }
private val statsAttrs =
aggregates.flatMap(_.aggregateFunction.aggBufferAttributes)
private val statsResultAttrs =
aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes)
private val veloxAggTask: ColumnarBatchOutIterator = {
- val inputNode = StatisticsInputNode(Seq(dummyKeyAttr), dataCols)
+ val inputNode = StatisticsInputNode(dataCols)
val aggOp = SortAggregateExec(
None,
isStreaming = false,
None,
- Seq(dummyKeyAttr),
+ Seq.empty,
aggregates,
statsAttrs,
0,
- dummyKeyAttr +: statsResultAttrs,
+ statsResultAttrs,
inputNode
)
val projOp = ProjectExec(statsResultAttrs, aggOp)
@@ -246,9 +238,23 @@ object GlutenDeltaJobStatsTracker extends Logging {
}
override def closeFile(filePath: String): Unit = {
+ def signalEoS(): Unit = {
+ while (true) {
+ if (currentJsonFuture.isDone) {
+ currentJsonFuture.get()
+ // The future should be an error since we haven't signaled EoS yet.
+ throw new IllegalStateException("Unreachable code.")
+ }
+ val queued = inputBatchQueue.offer(None, 500, java.util.concurrent.TimeUnit.MILLISECONDS)
+ if (queued) {
+ // The future should be done after we signal EoS.
+ return
+ }
+ }
+ }
assert(filePath == currentPath)
+ signalEoS()
val fileName = new Path(filePath).getName
- inputBatchQueue.put(None)
val json = currentJsonFuture.get()
resultJsonMap(fileName) = json
currentPath = null
@@ -259,20 +265,11 @@ object GlutenDeltaJobStatsTracker extends Logging {
row match {
case _: PlaceholderRow =>
case t: TerminalRow =>
- val valueBatch = t.batch()
- val numRows = valueBatch.numRows()
- val dummyKeyVec = ArrowWritableColumnVector
- .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name,
IntegerType))
- .head
- (0 until numRows).foreach(i => dummyKeyVec.putInt(i, 1))
- val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch(
- ColumnarBatches.offload(
- ArrowBufferAllocators.contextInstance(),
- new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows)))
- val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch,
valueBatch)
- dummyKeyBatch.close()
- valueBatch.close()
- inputBatchQueue.put(Some(compositeBatch))
+ val batch = t.batch()
+ // Counts up the reference count for the batch since it
+ // will be consumed by Velox aggregation task asynchronously.
+ ColumnarBatches.retain(batch)
+ inputBatchQueue.put(Some(batch))
}
}
@@ -326,17 +323,15 @@ object GlutenDeltaJobStatsTracker extends Logging {
}
}
- private case class StatisticsInputNode(keySchema: Seq[Attribute],
dataSchema: Seq[Attribute])
+ private case class StatisticsInputNode(dataSchema: Seq[Attribute])
extends GlutenPlan
with LeafExecNode {
- override def output: Seq[Attribute] = keySchema ++ dataSchema
+ override def output: Seq[Attribute] = dataSchema
override def batchType(): Convention.BatchType = VeloxBatchType
override def rowType(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
throw new UnsupportedOperationException()
- override def outputOrdering: Seq[SortOrder] = {
- keySchema.map(key => SortOrder(key, Ascending))
- }
+ override def outputOrdering: Seq[SortOrder] = Seq.empty
}
}
diff --git
a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
index d046263b59..ca6c7a6a7f 100644
---
a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
+++
b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/stats/GlutenDeltaJobStatsTracker.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta.stats
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.backendsapi.velox.VeloxBatchType
-import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
+import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution._
import org.apache.gluten.expression.{ConverterUtils, TransformerState}
@@ -28,24 +28,22 @@ import
org.apache.gluten.extension.columnar.rewrite.PullOutPreProject
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
import org.apache.gluten.iterator.Iterators
-import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.plan.PlanBuilder
-import org.apache.gluten.vectorized.{ArrowWritableColumnVector,
ColumnarBatchInIterator, ColumnarBatchOutIterator, NativePlanEvaluator}
+import org.apache.gluten.vectorized.{ColumnarBatchInIterator,
ColumnarBatchOutIterator, NativePlanEvaluator}
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute,
AttributeReference, BindReferences, EmptyRow, Expression, RuntimeReplaceable,
SortOrder, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BindReferences,
EmptyRow, Expression, RuntimeReplaceable, SortOrder, SpecificInternalRow}
import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
Complete, DeclarativeAggregate}
import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages,
LeafExecNode, ProjectExec}
import org.apache.spark.sql.execution.aggregate.SortAggregateExec
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker,
WriteJobStatsTracker, WriteTaskStats, WriteTaskStatsTracker}
import org.apache.spark.sql.execution.metric.SQLMetric
-import org.apache.spark.sql.types.{IntegerType, StructType}
-import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SerializableConfiguration, SparkDirectoryUtil}
import com.google.common.collect.Lists
@@ -148,25 +146,19 @@ object GlutenDeltaJobStatsTracker extends Logging {
private val getStatsExpr: Expression =
BindReferences.bindReference(normalizedResultExpr, aggBufferAttrs)
private val taskContext = TaskContext.get()
- private val dummyKeyAttr = {
- // FIXME: We have to force the use of Velox's streaming aggregation since hash aggregation
- // doesn't support task barriers. But as streaming aggregation should always be keyed, we
- // have to do a small hack here by adding a dummy key for the global aggregation.
- AttributeReference("__GLUTEN_DELTA_DUMMY_KEY__", IntegerType)()
- }
private val statsAttrs =
aggregates.flatMap(_.aggregateFunction.aggBufferAttributes)
private val statsResultAttrs =
aggregates.flatMap(_.aggregateFunction.inputAggBufferAttributes)
private val veloxAggTask: ColumnarBatchOutIterator = {
- val inputNode = StatisticsInputNode(Seq(dummyKeyAttr), dataCols)
+ val inputNode = StatisticsInputNode(dataCols)
val aggOp = SortAggregateExec(
None,
isStreaming = false,
None,
- Seq(dummyKeyAttr),
+ Seq.empty,
aggregates,
statsAttrs,
0,
- dummyKeyAttr +: statsResultAttrs,
+ statsResultAttrs,
inputNode
)
val projOp = ProjectExec(statsResultAttrs, aggOp)
@@ -250,9 +242,23 @@ object GlutenDeltaJobStatsTracker extends Logging {
}
override def closeFile(filePath: String): Unit = {
+ def signalEoS(): Unit = {
+ while (true) {
+ if (currentJsonFuture.isDone) {
+ currentJsonFuture.get()
+ // The future should be an error since we haven't signaled EoS yet.
+ throw new IllegalStateException("Unreachable code.")
+ }
+ val queued = inputBatchQueue.offer(None, 500, java.util.concurrent.TimeUnit.MILLISECONDS)
+ if (queued) {
+ // The future should be done after we signal EoS.
+ return
+ }
+ }
+ }
assert(filePath == currentPath)
+ signalEoS()
val fileName = new Path(filePath).getName
- inputBatchQueue.put(None)
val json = currentJsonFuture.get()
resultJsonMap(fileName) = json
currentPath = null
@@ -263,20 +269,11 @@ object GlutenDeltaJobStatsTracker extends Logging {
row match {
case _: PlaceholderRow =>
case t: TerminalRow =>
- val valueBatch = t.batch()
- val numRows = valueBatch.numRows()
- val dummyKeyVec = ArrowWritableColumnVector
- .allocateColumns(numRows, new StructType().add(dummyKeyAttr.name,
IntegerType))
- .head
- (0 until numRows).foreach(i => dummyKeyVec.putInt(i, 1))
- val dummyKeyBatch = VeloxColumnarBatches.toVeloxBatch(
- ColumnarBatches.offload(
- ArrowBufferAllocators.contextInstance(),
- new ColumnarBatch(Array[ColumnVector](dummyKeyVec), numRows)))
- val compositeBatch = VeloxColumnarBatches.compose(dummyKeyBatch,
valueBatch)
- dummyKeyBatch.close()
- valueBatch.close()
- inputBatchQueue.put(Some(compositeBatch))
+ val batch = t.batch()
+ // Counts up the reference count for the batch since it
+ // will be consumed by Velox aggregation task asynchronously.
+ ColumnarBatches.retain(batch)
+ inputBatchQueue.put(Some(batch))
}
}
@@ -330,17 +327,15 @@ object GlutenDeltaJobStatsTracker extends Logging {
}
}
- private case class StatisticsInputNode(keySchema: Seq[Attribute],
dataSchema: Seq[Attribute])
+ private case class StatisticsInputNode(dataSchema: Seq[Attribute])
extends GlutenPlan
with LeafExecNode {
- override def output: Seq[Attribute] = keySchema ++ dataSchema
+ override def output: Seq[Attribute] = dataSchema
override def batchType(): Convention.BatchType = VeloxBatchType
override def rowType(): Convention.RowType = Convention.RowType.None
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
throw new UnsupportedOperationException()
- override def outputOrdering: Seq[SortOrder] = {
- keySchema.map(key => SortOrder(key, Ascending))
- }
+ override def outputOrdering: Seq[SortOrder] = Seq.empty
}
}
diff --git
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/DataGenOnly.scala
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/DataGenOnly.scala
index deee1d008e..148a64865e 100644
---
a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/DataGenOnly.scala
+++
b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/action/DataGenOnly.scala
@@ -21,39 +21,46 @@ import org.apache.gluten.integration.Suite
import org.apache.hadoop.fs.{FileSystem, Path}
case class DataGenOnly(strategy: DataGenOnly.Strategy) extends Action {
-
override def execute(suite: Suite): Boolean = {
+
suite.sessionSwitcher.useSession("baseline", "Data Gen")
val fs = this.fs(suite)
+ val dataPath = this.dataPath(suite)
val markerPath = this.markerPath(suite)
+ def gen(): Unit = {
+ if (fs.exists(dataPath)) {
+ println(
+ s"Test data exists at $dataPath but no completion marker found. Regenerating."
+ )
+ fs.delete(dataPath, true)
+ }
+ if (fs.exists(markerPath)) {
+ fs.delete(markerPath, true)
+ }
+
+ println(s"Generating test data to $dataPath...")
+ val dataGen = suite.createDataGen()
+ dataGen.gen(suite.sessionSwitcher.spark())
+ println(s"All test data successfully generated at $dataPath.")
+
+ // Create marker after successful generation.
+ fs.create(markerPath, false).close()
+ }
+
strategy match {
case DataGenOnly.Skip =>
()
case DataGenOnly.Once =>
- val dataPath = this.dataPath(suite)
if (fs.exists(dataPath) && fs.exists(markerPath)) {
println(s"Test data already generated at $dataPath. Skipping.")
} else {
- if (fs.exists(dataPath)) {
- println(
- s"Test data exists at $dataPath but no completion marker found. Regenerating."
- )
- fs.delete(dataPath, true)
- }
- if (fs.exists(markerPath)) {
- fs.delete(markerPath, true)
- }
- gen(suite)
- // Create marker after successful generation.
- fs.create(markerPath, false).close()
+ gen()
}
case DataGenOnly.Always =>
- gen(suite)
- // Create marker after successful generation.
- fs.create(markerPath, false).close()
+ gen()
}
true
}
@@ -68,17 +75,6 @@ case class DataGenOnly(strategy: DataGenOnly.Strategy)
extends Action {
private def dataPath(suite: Suite): Path =
new Path(suite.dataWritePath())
-
- private def gen(suite: Suite): Unit = {
- val dataPath = suite.dataWritePath()
-
- println(s"Generating test data to $dataPath...")
-
- val dataGen = suite.createDataGen()
- dataGen.gen(suite.sessionSwitcher.spark())
-
- println(s"All test data successfully generated at $dataPath.")
- }
}
object DataGenOnly {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]