This is an automated email from the ASF dual-hosted git repository.
dtenedor pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 74d2f0d13ac5 [SPARK-52407][SQL][FOLLOW-UP] Remove Theta Sketch
aggregation buffer re-wrapping
74d2f0d13ac5 is described below
commit 74d2f0d13ac58a9aab5feedb826362fb1eef49ce
Author: Chris Boumalhab <[email protected]>
AuthorDate: Wed Feb 11 13:09:49 2026 -0800
[SPARK-52407][SQL][FOLLOW-UP] Remove Theta Sketch aggregation buffer
re-wrapping
### What changes were proposed in this pull request?
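The current Theta sketch `update` and `merge` methods inherited from
`TypedImperativeAggregate` unnecessarily re-wrap the aggregation buffer in a
new instance of one of the `ThetaSketchState` case classes. Because the sketch,
union, or intersection object held by the buffer is mutated in place, the
existing buffer can be returned directly, and the re-wrap can be avoided entirely.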
### Why are the changes needed?
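It is better engineering practice and a small optimization: returning the
existing buffer avoids allocating a new wrapper object on every `update` and
`merge` call.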
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Re-ran `SQLQueryTestSuite` with `SPARK_GENERATE_GOLDEN_FILES=1`; the golden
files were unchanged, as expected.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53984 from cboumalh/cboumalh-theta-merge-followup.
Authored-by: Chris Boumalhab <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
(cherry picked from commit 6112a0bfc4818a161bc0f69bcb7a5351fa9ba65a)
Signed-off-by: Daniel Tenedorio <[email protected]>
---
.../aggregate/thetasketchesAggregates.scala | 24 +++++++++++-----------
1 file changed, 12 insertions(+), 12 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala
index a14df39bf822..0f148d03cd70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala
@@ -219,7 +219,7 @@ case class ThetaSketchAgg(
messageParameters = Map("dataType" -> left.dataType.toString))
}
- UpdatableSketchBuffer(sketch)
+ updateBuffer
}
/**
@@ -246,13 +246,13 @@ case class ThetaSketchAgg(
// Reuse the existing union in the next iteration. This is the most
efficient path.
case (UnionAggregationBuffer(existingUnion),
UpdatableSketchBuffer(sketch)) =>
existingUnion.union(sketch.compact)
- UnionAggregationBuffer(existingUnion)
+ updateBuffer
case (UnionAggregationBuffer(existingUnion), FinalizedSketch(sketch)) =>
existingUnion.union(sketch)
- UnionAggregationBuffer(existingUnion)
+ updateBuffer
case (UnionAggregationBuffer(union1), UnionAggregationBuffer(union2)) =>
union1.union(union2.getResult)
- UnionAggregationBuffer(union1)
+ updateBuffer
// Create a new union only when necessary.
case (UpdatableSketchBuffer(sketch1), UpdatableSketchBuffer(sketch2)) =>
createUnionWith(sketch1.compact, sketch2.compact)
@@ -420,7 +420,7 @@ case class ThetaUnionAgg(
case _ => throw
QueryExecutionErrors.thetaInvalidInputSketchBuffer(prettyName)
}
union.union(inputSketch)
- UnionAggregationBuffer(union)
+ unionBuffer
}
/**
@@ -436,11 +436,11 @@ case class ThetaUnionAgg(
// If both arguments are union objects, merge them directly.
case (UnionAggregationBuffer(unionLeft),
UnionAggregationBuffer(unionRight)) =>
unionLeft.union(unionRight.getResult)
- UnionAggregationBuffer(unionLeft)
+ unionBuffer
// The input was serialized then deserialized.
case (UnionAggregationBuffer(union), FinalizedSketch(sketch)) =>
union.union(sketch)
- UnionAggregationBuffer(union)
+ unionBuffer
// The program should never make it here, the cases are for defensive
programming.
case (FinalizedSketch(sketch1), FinalizedSketch(sketch2)) =>
val union =
SetOperation.builder.setLogNominalEntries(lgNomEntries).buildUnion
@@ -449,7 +449,7 @@ case class ThetaUnionAgg(
UnionAggregationBuffer(union)
case (FinalizedSketch(sketch), UnionAggregationBuffer(union)) =>
union.union(sketch)
- UnionAggregationBuffer(union)
+ input
case _ => throw
QueryExecutionErrors.thetaInvalidInputSketchBuffer(prettyName)
}
}
@@ -582,7 +582,7 @@ case class ThetaIntersectionAgg(
case _ => throw
QueryExecutionErrors.thetaInvalidInputSketchBuffer(prettyName)
}
intersection.intersect(inputSketch)
- IntersectionAggregationBuffer(intersection)
+ intersectionBuffer
}
/**
@@ -603,11 +603,11 @@ case class ThetaIntersectionAgg(
IntersectionAggregationBuffer(intersectLeft),
IntersectionAggregationBuffer(intersectRight)) =>
intersectLeft.intersect(intersectRight.getResult)
- IntersectionAggregationBuffer(intersectLeft)
+ intersectionBuffer
// The input was serialized then deserialized.
case (IntersectionAggregationBuffer(intersection),
FinalizedSketch(sketch)) =>
intersection.intersect(sketch)
- IntersectionAggregationBuffer(intersection)
+ intersectionBuffer
// The program should never make it here, the cases are for defensive
programming.
case (FinalizedSketch(sketch1), FinalizedSketch(sketch2)) =>
val intersection =
@@ -617,7 +617,7 @@ case class ThetaIntersectionAgg(
IntersectionAggregationBuffer(intersection)
case (FinalizedSketch(sketch),
IntersectionAggregationBuffer(intersection)) =>
intersection.intersect(sketch)
- IntersectionAggregationBuffer(intersection)
+ input
case _ => throw
QueryExecutionErrors.thetaInvalidInputSketchBuffer(prettyName)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]