This is an automated email from the ASF dual-hosted git repository.
csy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new db984c1a [AURON #1898] Fix NULL handling for NOT IN subquery (#1899)
db984c1a is described below
commit db984c1a1ff0d4c40215bc96ca86f2ea4a8a59a6
Author: Thomas <[email protected]>
AuthorDate: Fri Jan 16 15:30:10 2026 +0800
[AURON #1898] Fix NULL handling for NOT IN subquery (#1899)
# Which issue does this PR close?
Closes #1898
# Rationale for this change
# What changes are included in this PR?
# Are there any user-facing changes?
# How was this patch tested?
---
.../datafusion-ext-plans/src/joins/bhj/semi_join.rs | 9 ++++++++-
.../test/scala/org/apache/auron/AuronQuerySuite.scala | 17 +++++++++++++++++
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs
index 1018b72a..402f54fa 100644
--- a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs
+++ b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs
@@ -188,6 +188,13 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {
let _probed_side_compare_timer = probed_side_compare_time.timer();
let mut hashes_idx = 0;
+ // Whether the build side contains any NULL join keys
+ let build_has_null_keys = if map.data_batch().num_rows() == 0 {
+ false
+ } else {
+ map.key_columns().iter().any(|col| col.null_count() > 0)
+ };
+
for row_idx in 0..probed_batch.num_rows() {
let key_is_valid = probed_valids
.as_ref()
@@ -195,7 +202,7 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {
.unwrap_or(true);
if P.mode == Anti
&& P.probe_is_join_side
- && !key_is_valid
+ && (!key_is_valid || build_has_null_keys) // Filter if probe row is NULL or build side has any NULL
&& self.join_params.is_null_aware_anti_join
{
probed_joined.set(row_idx, true);
diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala
index 8fe2a3e3..da335fc9 100644
--- a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala
@@ -18,6 +18,7 @@ package org.apache.auron
import org.apache.spark.sql.{AuronQueryTest, Row}
import org.apache.spark.sql.auron.AuronConf
+import org.apache.spark.sql.execution.joins.auron.plan.NativeBroadcastJoinExec
import org.apache.auron.util.AuronTestUtils
@@ -662,4 +663,20 @@ class AuronQuerySuite extends AuronQueryTest with BaseAuronSQLSuite with AuronSQ
checkSparkAnswer(query)
}
}
+
+ test("NOT IN subquery with NULL values") {
+ val row = identity[(java.lang.Integer, java.lang.Integer)] _
+ Seq(row((1, 1)), row((2, 2)), row((3, null)))
+ .toDF("a", "b")
+ .createOrReplaceTempView("tbl")
+ val df = checkSparkAnswer("select * from tbl where a not in (select b from tbl)")
+
+ // Spark 3.0: NOT IN subquery is converted to BroadcastNestedLoopJoinExec, and falls back due to unsupported join condition
+ if (AuronTestUtils.isSparkV31OrGreater) {
+ assert(collectFirst(df.queryExecution.executedPlan) { case bhj: NativeBroadcastJoinExec =>
+ assert(bhj.isNullAwareAntiJoin)
+ bhj
+ }.isDefined)
+ }
+ }
}