This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new f423462954ec [SPARK-57088][SQL] Allow non-deterministic ranking
expression for EXACT NEAREST BY
f423462954ec is described below
commit f423462954ece0a41d67493f6ab70d7e57abefd9
Author: Zero Qu <[email protected]>
AuthorDate: Wed May 27 13:47:03 2026 +0800
[SPARK-57088][SQL] Allow non-deterministic ranking expression for EXACT
NEAREST BY
### What changes were proposed in this pull request?
Removes the `NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION`
rejection in `CheckAnalysis` so the `EXACT` mode of `NEAREST BY JOIN` (added in
SPARK-56395) accepts non-deterministic ranking expressions, the same way
`APPROX` already does.
Concretely:
- Drop the `NearestByJoin` arm in `CheckAnalysis` that failed analysis when
`approx = false` and the ranking expression was non-deterministic.
- Change `NearestByJoin.allowNonDeterministicExpression` to return `true`
unconditionally (was previously returning `approx`).
- Delete the `NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION` error
condition.
- Update scaladoc/comments in `NearestByJoin` and `RewriteNearestByJoin` to
reflect that both modes permit a non-deterministic ranking expression.
- Update the user-facing docs in `sql-ref-syntax-qry-select-join.md`.
- Convert the existing rejection tests (Scala, Python, SQL golden) to
positive tests asserting that EXACT + a non-deterministic ranking expression
now succeeds.
### Why are the changes needed?
`APPROX` vs. `EXACT` and determinism are orthogonal concerns:
- `APPROX` vs. `EXACT` is about the search algorithm contract: `APPROX`
permits the optimizer to use faster approximate strategies (e.g. indexed ANN);
`EXACT` forces brute-force evaluation.
- Determinism is a property of the ranking expression itself. Ordinary
joins, for example, accept non-deterministic join conditions without forcing
the user into an "approximate" join.
`EXACT` describes algebraic semantics ("compute the exact top-K according
to the user's ranking expression"); it does not promise reproducibility across
runs when the ranking expression is itself non-deterministic. Coupling the two
was an over-restriction that this PR removes.
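
The distinction can be illustrated with a minimal plain-Python sketch (not Spark code). The helper `exact_top_k`, the sample rows, and the seeded `random.Random` standing in for `rand()` are all hypothetical. The selection is exact with respect to the scores realized in a given run, even though those scores change between runs:

```python
import heapq
import random


def exact_top_k(left_rows, right_rows, rank, k):
    """Brute-force top-k by similarity (hypothetical helper, not a Spark API).

    Returns (left, right, score) triples; each score is computed exactly once.
    """
    result = []
    for left in left_rows:
        # Realize every score once, then select from the stored values.
        scored = [(left, right, rank(left, right)) for right in right_rows]
        result.extend(heapq.nlargest(k, scored, key=lambda t: t[2]))
    return result


users = ["u1", "u2", "u3"]
products = [("p1", 0.2), ("p2", 0.5), ("p3", 0.9)]


def run(seed):
    rng = random.Random(seed)  # stands in for rand(); the seed varies per run
    realized = {}

    def rank(user, product):
        score = rng.random() + product[1]
        realized[(user, product)] = score
        return score

    rows = exact_top_k(users, products, rank, k=1)
    return rows, realized


rows_a, realized_a = run(seed=1)
rows_b, realized_b = run(seed=2)

# Exactness: every chosen row holds the maximum score realized for its user.
for user, product, score in rows_a:
    assert score == max(realized_a[(user, p)] for p in products)
```

Two runs may return different rows, but each run returns exactly one match per user, and that match is the true maximum of the scores that run produced.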
### Does this PR introduce _any_ user-facing change?
Yes. Queries of the form
```sql
SELECT ... FROM left JOIN right EXACT NEAREST k BY {DISTANCE | SIMILARITY}
<non-deterministic expression>
```
previously failed at analysis with
`NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION`; they are now accepted
and evaluated through the same brute-force rewrite as the `APPROX` variant.
The error condition
`NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION` is removed.
### How was this patch tested?
- Updated `DataFrameNearestByJoinSuite`: the rejection test is converted to
a positive test asserting the result count (21/21 passing locally).
- Updated the PySpark equivalent in `test_nearest_by_join.py`.
- Updated the SQL golden file `join-nearest-by.sql` (replaced the
failing-EXACT query with a `COUNT(*)` query mirroring the existing APPROX
case); regenerated `results/` and `analyzer-results/`.
`SQLQueryTestSuite -z join-nearest-by` passes (2/2).
- `RewriteNearestByJoinSuite` (12/12) still passes: the
materializing-Project path in the optimizer rewrite already handled
non-deterministic ranking expressions, so only the analyzer gate changes.
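
As a rough illustration of why materializing the ranking value matters (plain Python, not the optimizer rule; `materialize_ranking`, `top_k_by_ranking`, and the sample rows are hypothetical), the sketch below stores each score under a `__ranking__` key first, so the top-K step only reads stored values and the ranking function runs once per (left, right) pair:

```python
import heapq
import random
from itertools import product


def materialize_ranking(left_rows, right_rows, rank):
    """Cross product with the score stored once under `__ranking__`."""
    return [
        {"left": left, "right": right, "__ranking__": rank(left, right)}
        for left, right in product(left_rows, right_rows)
    ]


def top_k_by_ranking(rows, k):
    """Per-left top-k that reads only the stored `__ranking__` value."""
    by_left = {}
    for row in rows:
        by_left.setdefault(row["left"], []).append(row)
    return [
        best
        for group in by_left.values()
        for best in heapq.nlargest(k, group, key=lambda r: r["__ranking__"])
    ]


counter = {"calls": 0}
rng = random.Random()  # unseeded on purpose: scores differ between runs


def noisy_rank(user, pscore):
    # Non-deterministic ranking: a random term added to the product score.
    counter["calls"] += 1
    return rng.random() + pscore


users = ["u1", "u2", "u3"]
pscores = [0.2, 0.5, 0.9]

matches = top_k_by_ranking(materialize_ranking(users, pscores, noisy_rank), k=1)
```

Because the rows themselves vary between runs, only the match count and the number of ranking evaluations are stable enough to assert on, which mirrors the count-only assertions in the updated tests.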
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7)
Closes #56128 from zhidongqu-db/allow-exact-no-deter-expr.
Lead-authored-by: Zero Qu <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 2603f6a639394bc604bc8c77f4eb071df6070750)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 5 ----
docs/sql-ref-syntax-qry-select-join.md | 4 ++--
python/pyspark/sql/tests/test_nearest_by_join.py | 27 ++++++++--------------
.../sql/catalyst/analysis/CheckAnalysis.scala | 6 -----
.../catalyst/optimizer/RewriteNearestByJoin.scala | 10 ++++----
.../sql/catalyst/plans/logical/NearestByJoin.scala | 16 ++++---------
.../analyzer-results/join-nearest-by.sql.out | 25 ++++++--------------
.../resources/sql-tests/inputs/join-nearest-by.sql | 14 ++++++-----
.../sql-tests/results/join-nearest-by.sql.out | 27 +++++++---------------
.../spark/sql/DataFrameNearestByJoinSuite.scala | 25 +++++++++-----------
10 files changed, 56 insertions(+), 103 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index d29225dbcf92..51e1fadfbf40 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5412,11 +5412,6 @@
"Nearest-by join is implemented as a bounded cross-product internally and is therefore rejected when `spark.sql.crossJoin.enabled = false`. Set `spark.sql.crossJoin.enabled = true` to permit it, or rewrite the query without nearest-by."
]
},
- "EXACT_WITH_NONDETERMINISTIC_EXPRESSION" : {
- "message" : [
- "EXACT nearest-by join is incompatible with the nondeterministic ranking expression <expression>. Use APPROX, or replace the expression with a deterministic one."
- ]
- },
"NON_ORDERABLE_RANKING_EXPRESSION" : {
"message" : [
"The ranking expression <expression> of type <type> is not orderable. Provide an expression that returns an orderable type, such as a numeric distance like abs(a.col - b.col) or a numeric similarity score."
diff --git a/docs/sql-ref-syntax-qry-select-join.md b/docs/sql-ref-syntax-qry-select-join.md
index a082a13707bd..646297831d1c 100644
--- a/docs/sql-ref-syntax-qry-select-join.md
+++ b/docs/sql-ref-syntax-qry-select-join.md
@@ -61,7 +61,7 @@ relation { [ join_type ] JOIN [ LATERAL ] relation [ join_criteria | nearest_by_
`APPROX | EXACT`
- Controls the search algorithm contract. `APPROX` allows the optimizer to use faster approximate strategies (such as indexed nearest-neighbor search when available). `EXACT` forces brute-force evaluation and requires `ranking_expression` to be deterministic.
+ Controls the search algorithm contract. `APPROX` allows the optimizer to use faster approximate strategies (such as indexed nearest-neighbor search when available). `EXACT` forces brute-force evaluation.
`num_results`
@@ -73,7 +73,7 @@ relation { [ join_type ] JOIN [ LATERAL ] relation [ join_criteria | nearest_by_
`ranking_expression`
- A scalar expression that returns an orderable type. Must be deterministic with `EXACT`; may be nondeterministic with `APPROX` (e.g., `rand()` for randomized tie-breaking). The expression is evaluated once per (left, right) pair on the brute-force path, so avoid expensive or side-effecting UDFs in ranking expressions.
+ A scalar expression that returns an orderable type. The expression is evaluated once per (left, right) pair on the brute-force path, so avoid expensive or side-effecting UDFs in ranking expressions.
**Performance note.** The current implementation evaluates the full cross-product of the left and right sides and bounds memory per left row by `num_results`. Per-query work is `O(|left| × |right| × log num_results)`. Index-backed approximate strategies (transparent to `APPROX` queries) are planned in a future release; until then, pre-filter the right side (e.g. via a subquery) when it is large.
diff --git a/python/pyspark/sql/tests/test_nearest_by_join.py b/python/pyspark/sql/tests/test_nearest_by_join.py
index fdee3043289e..5e5236c213fb 100644
--- a/python/pyspark/sql/tests/test_nearest_by_join.py
+++ b/python/pyspark/sql/tests/test_nearest_by_join.py
@@ -217,24 +217,17 @@ class NearestByJoinTestsMixin:
messageParameters={},
)
- def test_exact_with_nondeterministic_ranking_rejected(self):
+ def test_exact_with_nondeterministic_ranking_accepted(self):
users, products = self.users, self.products
- # Use an explicit seed (`rand(0)`) so the rendered expression in the error message is
- # byte-stable. Without it, Spark assigns a random seed at analysis and the message
- # parameter becomes `"(rand(<random-long>) + pscore)"`, which can't be asserted on.
- with self.assertRaises(AnalysisException) as pe:
- users.nearestByJoin(
- products,
- sf.rand(0) + products.pscore,
- numResults=1,
- mode="exact",
- direction="similarity",
- ).collect()
- self.check_error(
- exception=pe.exception,
- errorClass="NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION",
- messageParameters={"expression": '"(rand(0) + pscore)"'},
- )
+ # Result rows are nondeterministic; only assert that each left row gets exactly one match.
+ count = users.nearestByJoin(
+ products,
+ sf.rand(0) + products.pscore,
+ numResults=1,
+ mode="exact",
+ direction="similarity",
+ ).count()
+ self.assertEqual(count, 3)
def test_streaming_inputs_rejected(self):
streaming_users = (
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 9c4fbd719a96..4e07280f94c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -701,12 +701,6 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
"expression" -> toSQLExpr(rankingExpression),
"type" -> toSQLType(rankingExpression.dataType)))
- case j @ NearestByJoin(_, _, _, false, _, rankingExpression, _)
- if !rankingExpression.deterministic =>
- j.failAnalysis(
- errorClass = "NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION",
- messageParameters = Map("expression" -> toSQLExpr(rankingExpression)))
-
case a: Aggregate =>
a.groupingExpressions.foreach(
expression =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNearestByJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNearestByJoin.scala
index e920bbfffc55..ee8e61b457fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNearestByJoin.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNearestByJoin.scala
@@ -56,11 +56,11 @@ import org.apache.spark.sql.catalyst.rules._
* preserves array order, so the K rows emitted per left row appear best-first in the output
* of this rule. (Downstream operators may reorder.)
*
- * If `rankingExpression` is nondeterministic (legal only under `APPROX`), an extra
- * `Project` is inserted above the `Join` to materialize the value as `__ranking__`. The
- * standard projection machinery runs `Nondeterministic.initialize(partitionIndex)` on every
- * nondeterministic descendant before any value is evaluated, so `MaxMinByK` only ever sees a
- * plain `AttributeReference` and never evaluates a nondeterministic expression directly.
+ * If `rankingExpression` is nondeterministic, an extra `Project` is inserted above the `Join`
+ * to materialize the value as `__ranking__`. The standard projection machinery runs
+ * `Nondeterministic.initialize(partitionIndex)` on every nondeterministic descendant before any
+ * value is evaluated, so `MaxMinByK` only ever sees a plain `AttributeReference` and never
+ * evaluates a nondeterministic expression directly.
*
* Unlike [[RewriteAsOfJoin]], which uses a correlated scalar subquery, this rule materializes
* the cross product directly. A scalar subquery returns a single value per left row, so it
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NearestByJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NearestByJoin.scala
index 6a5c94d4a1df..4aaac7dfe546 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NearestByJoin.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NearestByJoin.scala
@@ -41,10 +41,8 @@ object NearestByJoin {
* @param right The right (base) side of the join, against which each left row finds matches.
* @param joinType Must be `Inner` or `LeftOuter`. `Inner` drops left rows with no matches;
* `LeftOuter` preserves them with `NULL` right-side columns.
- * @param approx `true` for `APPROX` mode, `false` for `EXACT` mode. `APPROX` permits a
- * nondeterministic `rankingExpression` and is the contract future indexed
- * approximate-nearest-neighbor strategies key off; `EXACT` requires
- * determinism (enforced by `CheckAnalysis`).
+ * @param approx `true` for `APPROX` mode, `false` for `EXACT` mode. `APPROX` is the contract
+ * future indexed approximate-nearest-neighbor strategies key off.
* @param numResults The K in top-K: the maximum number of right-side matches returned per
* left row. Bounded above by `NearestByJoin.MaxNumResults`.
* @param rankingExpression Scalar expression evaluated per (left, right) pair. Must return
@@ -66,13 +64,9 @@ case class NearestByJoin(
require(Seq(Inner, LeftOuter).contains(joinType),
s"Unsupported nearest-by join type $joinType")
- // `APPROX` mode permits a nondeterministic ranking expression (e.g. `rand()` for randomized
- // tie-breaking). `EXACT` mode requires determinism, and that requirement is enforced
- // separately by the `NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION` arm in
- // `CheckAnalysis`. Returning `approx` from this override is what lets APPROX queries pass
- // the generic `INVALID_NON_DETERMINISTIC_EXPRESSIONS` check that fires on operators not on
- // the analyzer's whitelist.
- override def allowNonDeterministicExpression: Boolean = approx
+ // Both APPROX and EXACT permit a nondeterministic ranking expression (e.g. `rand()` for
+ // randomized tie-breaking, or an external scoring UDF).
+ override def allowNonDeterministicExpression: Boolean = true
// Both left- and right-side attributes are declared nullable to match the schema produced
// by `RewriteNearestByJoin`. Right-side attributes are widened because the rewrite
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/join-nearest-by.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/join-nearest-by.sql.out
index 48819f172310..3b7d9e55ca87 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/join-nearest-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/join-nearest-by.sql.out
@@ -286,25 +286,14 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
-- !query
-SELECT u.user_id, p.product
-FROM users u JOIN products p
- EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore
+SELECT COUNT(*) AS num_rows
+FROM (
+ SELECT u.user_id, p.product
+ FROM users u JOIN products p
+ EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore
+)
-- !query analysis
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
- "errorClass" : "NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION",
- "sqlState" : "42604",
- "messageParameters" : {
- "expression" : "\"(rand() + pscore)\""
- },
- "queryContext" : [ {
- "objectType" : "",
- "objectName" : "",
- "startIndex" : 42,
- "stopIndex" : 106,
- "fragment" : "JOIN products p\n EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore"
- } ]
-}
+[Analyzer test output redacted due to nondeterminism]
-- !query
diff --git a/sql/core/src/test/resources/sql-tests/inputs/join-nearest-by.sql b/sql/core/src/test/resources/sql-tests/inputs/join-nearest-by.sql
index 6b3dc63d28e3..40cfa87c4cde 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/join-nearest-by.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/join-nearest-by.sql
@@ -74,13 +74,15 @@ SELECT u.user_id, p.product
FROM users u JOIN products p
APPROX NEAREST 1 BY SIMILARITY map(u.score, p.pscore);
--- Error: EXACT mode with nondeterministic ranking expression
-SELECT u.user_id, p.product
-FROM users u JOIN products p
- EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore;
-
--- APPROX permits a nondeterministic ranking expression (per the SPIP). Rows differ run to
+-- Both EXACT and APPROX permit a nondeterministic ranking expression. Rows differ run to
-- run, so we only assert the row count: one match per left row when k = 1.
+SELECT COUNT(*) AS num_rows
+FROM (
+ SELECT u.user_id, p.product
+ FROM users u JOIN products p
+ EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore
+);
+
SELECT COUNT(*) AS num_rows
FROM (
SELECT u.user_id, p.product
diff --git a/sql/core/src/test/resources/sql-tests/results/join-nearest-by.sql.out b/sql/core/src/test/resources/sql-tests/results/join-nearest-by.sql.out
index d06fb53686e7..81803d139672 100644
--- a/sql/core/src/test/resources/sql-tests/results/join-nearest-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/join-nearest-by.sql.out
@@ -228,27 +228,16 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
-- !query
-SELECT u.user_id, p.product
-FROM users u JOIN products p
- EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore
+SELECT COUNT(*) AS num_rows
+FROM (
+ SELECT u.user_id, p.product
+ FROM users u JOIN products p
+ EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore
+)
-- !query schema
-struct<>
+struct<num_rows:bigint>
-- !query output
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
- "errorClass" : "NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION",
- "sqlState" : "42604",
- "messageParameters" : {
- "expression" : "\"(rand() + pscore)\""
- },
- "queryContext" : [ {
- "objectType" : "",
- "objectName" : "",
- "startIndex" : 42,
- "stopIndex" : 106,
- "fragment" : "JOIN products p\n EXACT NEAREST 1 BY SIMILARITY rand() + p.pscore"
- } ]
-}
+3
-- !query
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNearestByJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNearestByJoinSuite.scala
index b34880b71f5b..271c5eb4552c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNearestByJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNearestByJoinSuite.scala
@@ -425,20 +425,17 @@ class DataFrameNearestByJoinSuite extends QueryTest with SharedSparkSession {
"type" -> "\"MAP<DOUBLE, DOUBLE>\""))
}
- test("EXACT mode rejects nondeterministic ranking expression") {
+ test("EXACT mode accepts nondeterministic ranking expression") {
val (users, products) = prepareForNearestByJoin()
- checkError(
- exception = intercept[AnalysisException] {
- users.nearestByJoin(
- products,
- rand() + products("pscore"),
- numResults = 1,
- joinType = "inner",
- mode = "exact",
- direction = "similarity")
- },
- condition = "NEAREST_BY_JOIN.EXACT_WITH_NONDETERMINISTIC_EXPRESSION",
- matchPVals = true,
- parameters = Map("expression" -> ".*rand.*pscore.*"))
+ val result = users.nearestByJoin(
+ products,
+ rand() + products("pscore"),
+ numResults = 1,
+ joinType = "inner",
+ mode = "exact",
+ direction = "similarity")
+
+ // Result rows are nondeterministic; only assert that each left row gets exactly one match.
+ assert(result.count() === 3)
}
}