This is an automated email from the ASF dual-hosted git repository.
dtenedor pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 074fb66edcda [SPARK-54138][SQL] Enforce constant configuration
parameter for Theta and HLL
074fb66edcda is described below
commit 074fb66edcda838dd853b44d64547feb1c61714c
Author: Chris Boumalhab <[email protected]>
AuthorDate: Mon Nov 3 12:08:39 2025 -0800
[SPARK-54138][SQL] Enforce constant configuration parameter for Theta and
HLL
### What changes were proposed in this pull request?
This PR requires the sketch configuration parameter (lgConfigK for
HllSketchAgg; lgNomEntries for ThetaSketchAgg and ThetaUnionAgg) to be a
constant value.
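If the parameter expression (`right`) is not foldable, the aggregate throws
the error built by QueryExecutionErrors.hllKMustBeConstantError(prettyName) or
QueryExecutionErrors.thetaLgNomEntriesMustBeConstantError(prettyName).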
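This change ensures that a non-constant configuration parameter is rejected
with a dedicated error, preventing inconsistent or invalid sketch behavior.
Note that the check runs when the lazily initialized configuration value is
first evaluated, so the error is raised when the query executes, not during
analysis (the analyzer output for the non-constant test cases is a
successfully resolved plan).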
### Why are the changes needed?
The configuration parameter determines the accuracy and memory footprint of
the sketch.
Allowing it to vary dynamically at runtime could lead to nondeterministic
aggregation results and incorrect computations.
By enforcing it as a constant expression, we ensure deterministic behavior,
predictable memory use, and alignment with the expected semantics of
sketch-based aggregations.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
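Added test cases to sql-tests/inputs/hll.sql and
sql-tests/inputs/thetasketch.sql (run by SQLQueryTestSuite) and updated the
corresponding analyzer-results and results golden files.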
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52836 from cboumalh/hll-theta-check-foldable-param.
Authored-by: Chris Boumalhab <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
(cherry picked from commit be6ab9cfafbe3b239c2bde4c48d434cfee03f876)
Signed-off-by: Daniel Tenedorio <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 12 +++++
.../aggregate/datasketchesAggregates.scala | 3 ++
.../aggregate/thetasketchesAggregates.scala | 6 +++
.../spark/sql/errors/QueryExecutionErrors.scala | 12 +++++
.../sql-tests/analyzer-results/hll.sql.out | 43 +++++++++++++++
.../sql-tests/analyzer-results/thetasketch.sql.out | 43 +++++++++++++++
.../src/test/resources/sql-tests/inputs/hll.sql | 9 ++++
.../resources/sql-tests/inputs/thetasketch.sql | 12 +++++
.../test/resources/sql-tests/results/hll.sql.out | 62 ++++++++++++++++++++++
.../sql-tests/results/thetasketch.sql.out | 62 ++++++++++++++++++++++
10 files changed, 264 insertions(+)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 9d95d74cc21a..a34ceb9f1145 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1972,6 +1972,12 @@
],
"sqlState" : "22546"
},
+ "HLL_K_MUST_BE_CONSTANT" : {
+ "message" : [
+ "Invalid call to <function>; the `K` value must be a constant value, but
got a non-constant expression."
+ ],
+ "sqlState" : "42K0E"
+ },
"HLL_UNION_DIFFERENT_LG_K" : {
"message" : [
"Sketches have different `lgConfigK` values: <left> and <right>. Set the
`allowDifferentLgConfigK` parameter to true to call <function> with different
`lgConfigK` values."
@@ -5757,6 +5763,12 @@
],
"sqlState" : "22546"
},
+ "THETA_LG_NOM_ENTRIES_MUST_BE_CONSTANT" : {
+ "message" : [
+ "Invalid call to <function>; the `lgNomEntries` value must be a constant
value, but got a non-constant expression."
+ ],
+ "sqlState" : "42K0E"
+ },
"TRAILING_COMMA_IN_SELECT" : {
"message" : [
"Trailing comma detected in SELECT clause. Remove the trailing comma
before the FROM clause."
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
index cfcb53769e25..8ae9b8fddde7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
@@ -66,6 +66,9 @@ case class HllSketchAgg(
// Hllsketch config - mark as lazy so that they're not evaluated during tree
transformation.
lazy val lgConfigK: Int = {
+ if (!right.foldable) {
+ throw QueryExecutionErrors.hllKMustBeConstantError(prettyName)
+ }
val lgConfigK = right.eval().asInstanceOf[Int]
HllSketchAgg.checkLgK(lgConfigK)
lgConfigK
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala
index 7e55c006782c..f841632b1462 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/thetasketchesAggregates.scala
@@ -94,6 +94,9 @@ case class ThetaSketchAgg(
// ThetaSketch config - mark as lazy so that they're not evaluated during
tree transformation.
lazy val lgNomEntries: Int = {
+ if (!right.foldable) {
+ throw
QueryExecutionErrors.thetaLgNomEntriesMustBeConstantError(prettyName)
+ }
val lgNomEntriesInput = right.eval().asInstanceOf[Int]
ThetaSketchUtils.checkLgNomLongs(lgNomEntriesInput, prettyName)
lgNomEntriesInput
@@ -332,6 +335,9 @@ case class ThetaUnionAgg(
// ThetaSketch config - mark as lazy so that they're not evaluated during
tree transformation.
lazy val lgNomEntries: Int = {
+ if (!right.foldable) {
+ throw
QueryExecutionErrors.thetaLgNomEntriesMustBeConstantError(prettyName)
+ }
val lgNomEntriesInput = right.eval().asInstanceOf[Int]
ThetaSketchUtils.checkLgNomLongs(lgNomEntriesInput, prettyName)
lgNomEntriesInput
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 5f5e1da47184..55dcea57ff32 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2804,6 +2804,12 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase with ExecutionE
"value" -> toSQLValue(value, IntegerType)))
}
+ def hllKMustBeConstantError(function: String): Throwable = {
+ new SparkRuntimeException(
+ errorClass = "HLL_K_MUST_BE_CONSTANT",
+ messageParameters = Map("function" -> toSQLId(function)))
+ }
+
def hllInvalidInputSketchBuffer(function: String): Throwable = {
new SparkRuntimeException(
errorClass = "HLL_INVALID_INPUT_SKETCH_BUFFER",
@@ -3169,4 +3175,10 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase with ExecutionE
"max" -> toSQLValue(max, IntegerType),
"value" -> toSQLValue(value, IntegerType)))
}
+
+ def thetaLgNomEntriesMustBeConstantError(function: String): Throwable = {
+ new SparkRuntimeException(
+ errorClass = "THETA_LG_NOM_ENTRIES_MUST_BE_CONSTANT",
+ messageParameters = Map("function" -> toSQLId(function)))
+ }
}
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out
index 167c8f930d25..291f071ef06c 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/hll.sql.out
@@ -233,6 +233,49 @@ Aggregate [hll_sketch_agg(col#x, 40, 0, 0) AS
hll_sketch_agg(col, 40)#x]
+- LocalRelation [col#x]
+-- !query
+SELECT hll_sketch_agg(col, CAST(NULL AS INT)) AS k_is_null
+FROM VALUES (15), (16), (17) tab(col)
+-- !query analysis
+Aggregate [hll_sketch_agg(col#x, cast(null as int), 0, 0) AS k_is_null#x]
++- SubqueryAlias tab
+ +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_sketch_agg(col, CAST(col AS INT)) AS k_non_constant
+FROM VALUES (15), (16), (17) tab(col)
+-- !query analysis
+Aggregate [hll_sketch_agg(col#x, cast(col#x as int), 0, 0) AS k_non_constant#x]
++- SubqueryAlias tab
+ +- LocalRelation [col#x]
+
+
+-- !query
+SELECT hll_sketch_agg(col, '15')
+FROM VALUES (50), (60), (60) tab(col)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"15\"",
+ "inputType" : "\"STRING\"",
+ "paramIndex" : "second",
+ "requiredType" : "\"INT\"",
+ "sqlExpr" : "\"hll_sketch_agg(col, 15)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 32,
+ "fragment" : "hll_sketch_agg(col, '15')"
+ } ]
+}
+
+
-- !query
SELECT hll_union(
hll_sketch_agg(col1, 12),
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/thetasketch.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/thetasketch.sql.out
index 323084223d4b..84fb8086151d 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/thetasketch.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/thetasketch.sql.out
@@ -1068,6 +1068,49 @@ Aggregate [theta_union_agg(sketch#x, 27, 0, 0) AS
theta_union_agg(sketch, 27)#x]
+- LocalRelation [col#x]
+-- !query
+SELECT theta_sketch_agg(col, CAST(NULL AS INT)) AS lg_nom_entries_is_null
+FROM VALUES (15), (16), (17) tab(col)
+-- !query analysis
+Aggregate [theta_sketch_agg(col#x, cast(null as int), 0, 0) AS
lg_nom_entries_is_null#x]
++- SubqueryAlias tab
+ +- LocalRelation [col#x]
+
+
+-- !query
+SELECT theta_sketch_agg(col, CAST(col AS INT)) AS lg_nom_entries_non_constant
+FROM VALUES (15), (16), (17) tab(col)
+-- !query analysis
+Aggregate [theta_sketch_agg(col#x, cast(col#x as int), 0, 0) AS
lg_nom_entries_non_constant#x]
++- SubqueryAlias tab
+ +- LocalRelation [col#x]
+
+
+-- !query
+SELECT theta_sketch_agg(col, '15')
+FROM VALUES (50), (60), (60) tab(col)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"15\"",
+ "inputType" : "\"STRING\"",
+ "paramIndex" : "second",
+ "requiredType" : "\"INT\"",
+ "sqlExpr" : "\"theta_sketch_agg(col, 15)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 34,
+ "fragment" : "theta_sketch_agg(col, '15')"
+ } ]
+}
+
+
-- !query
SELECT theta_union(1, 2)
FROM VALUES
diff --git a/sql/core/src/test/resources/sql-tests/inputs/hll.sql
b/sql/core/src/test/resources/sql-tests/inputs/hll.sql
index fbd82b936b77..35128da97fd6 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/hll.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/hll.sql
@@ -69,6 +69,15 @@ FROM VALUES (50), (60), (60) tab(col);
SELECT hll_sketch_agg(col, 40)
FROM VALUES (50), (60), (60) tab(col);
+SELECT hll_sketch_agg(col, CAST(NULL AS INT)) AS k_is_null
+FROM VALUES (15), (16), (17) tab(col);
+
+SELECT hll_sketch_agg(col, CAST(col AS INT)) AS k_non_constant
+FROM VALUES (15), (16), (17) tab(col);
+
+SELECT hll_sketch_agg(col, '15')
+FROM VALUES (50), (60), (60) tab(col);
+
SELECT hll_union(
hll_sketch_agg(col1, 12),
hll_sketch_agg(col2, 13))
diff --git a/sql/core/src/test/resources/sql-tests/inputs/thetasketch.sql
b/sql/core/src/test/resources/sql-tests/inputs/thetasketch.sql
index d270442b5049..4782d2017f2a 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/thetasketch.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/thetasketch.sql
@@ -457,6 +457,18 @@ FROM (SELECT theta_sketch_agg(col, 12) as sketch
SELECT theta_sketch_agg(col, 20) as sketch
FROM VALUES (1) AS tab(col));
+-- lgNomEntries parameter is NULL
+SELECT theta_sketch_agg(col, CAST(NULL AS INT)) AS lg_nom_entries_is_null
+FROM VALUES (15), (16), (17) tab(col);
+
+-- lgNomEntries parameter is not foldable (non-constant)
+SELECT theta_sketch_agg(col, CAST(col AS INT)) AS lg_nom_entries_non_constant
+FROM VALUES (15), (16), (17) tab(col);
+
+-- lgNomEntries parameter has wrong type (STRING instead of INT)
+SELECT theta_sketch_agg(col, '15')
+FROM VALUES (50), (60), (60) tab(col);
+
-- Test theta_union with integers (1, 2) instead of binary sketch data -
should fail
SELECT theta_union(1, 2)
FROM VALUES
diff --git a/sql/core/src/test/resources/sql-tests/results/hll.sql.out
b/sql/core/src/test/resources/sql-tests/results/hll.sql.out
index ecdfcbcc791a..908221f0e7c4 100644
--- a/sql/core/src/test/resources/sql-tests/results/hll.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/hll.sql.out
@@ -256,6 +256,68 @@ org.apache.spark.SparkRuntimeException
}
+-- !query
+SELECT hll_sketch_agg(col, CAST(NULL AS INT)) AS k_is_null
+FROM VALUES (15), (16), (17) tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "HLL_INVALID_LG_K",
+ "sqlState" : "22546",
+ "messageParameters" : {
+ "function" : "`hll_sketch_agg`",
+ "max" : "21",
+ "min" : "4",
+ "value" : "0"
+ }
+}
+
+
+-- !query
+SELECT hll_sketch_agg(col, CAST(col AS INT)) AS k_non_constant
+FROM VALUES (15), (16), (17) tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "HLL_K_MUST_BE_CONSTANT",
+ "sqlState" : "42K0E",
+ "messageParameters" : {
+ "function" : "`hll_sketch_agg`"
+ }
+}
+
+
+-- !query
+SELECT hll_sketch_agg(col, '15')
+FROM VALUES (50), (60), (60) tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"15\"",
+ "inputType" : "\"STRING\"",
+ "paramIndex" : "second",
+ "requiredType" : "\"INT\"",
+ "sqlExpr" : "\"hll_sketch_agg(col, 15)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 32,
+ "fragment" : "hll_sketch_agg(col, '15')"
+ } ]
+}
+
+
-- !query
SELECT hll_union(
hll_sketch_agg(col1, 12),
diff --git a/sql/core/src/test/resources/sql-tests/results/thetasketch.sql.out
b/sql/core/src/test/resources/sql-tests/results/thetasketch.sql.out
index 95c6e28a8c42..685a268b434d 100644
--- a/sql/core/src/test/resources/sql-tests/results/thetasketch.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/thetasketch.sql.out
@@ -976,6 +976,68 @@ org.apache.spark.SparkRuntimeException
}
+-- !query
+SELECT theta_sketch_agg(col, CAST(NULL AS INT)) AS lg_nom_entries_is_null
+FROM VALUES (15), (16), (17) tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "THETA_INVALID_LG_NOM_ENTRIES",
+ "sqlState" : "22546",
+ "messageParameters" : {
+ "function" : "`theta_sketch_agg`",
+ "max" : "26",
+ "min" : "4",
+ "value" : "0"
+ }
+}
+
+
+-- !query
+SELECT theta_sketch_agg(col, CAST(col AS INT)) AS lg_nom_entries_non_constant
+FROM VALUES (15), (16), (17) tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.SparkRuntimeException
+{
+ "errorClass" : "THETA_LG_NOM_ENTRIES_MUST_BE_CONSTANT",
+ "sqlState" : "42K0E",
+ "messageParameters" : {
+ "function" : "`theta_sketch_agg`"
+ }
+}
+
+
+-- !query
+SELECT theta_sketch_agg(col, '15')
+FROM VALUES (50), (60), (60) tab(col)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+ "sqlState" : "42K09",
+ "messageParameters" : {
+ "inputSql" : "\"15\"",
+ "inputType" : "\"STRING\"",
+ "paramIndex" : "second",
+ "requiredType" : "\"INT\"",
+ "sqlExpr" : "\"theta_sketch_agg(col, 15)\""
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 8,
+ "stopIndex" : 34,
+ "fragment" : "theta_sketch_agg(col, '15')"
+ } ]
+}
+
+
-- !query
SELECT theta_union(1, 2)
FROM VALUES
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]