This is an automated email from the ASF dual-hosted git repository.

richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 283b5da9 [AURON #1792] Keep the null result in the reverse connection result (#1793)
283b5da9 is described below

commit 283b5da92caae7588456201dd75057e3b1780c5b
Author: Graceful <[email protected]>
AuthorDate: Tue Dec 30 19:27:41 2025 +0800

    [AURON #1792] Keep the null result in the reverse connection result (#1793)
    
    <!--
    - Start the PR title with the related issue ID, e.g. '[AURON #XXXX]
    Short summary...'.
    -->
    # Which issue does this PR close?
    
    Closes #1792
    
    # Rationale for this change
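    Keep the null result in the reverse connection (anti join) result: in `Anti` mode, probed rows with a null join key are now handled explicitly instead of being skipped.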
    # What changes are included in this PR?
    
    # Are there any user-facing changes?
    
    # How was this patch tested?
    Tested on a cluster; this commit also adds the `join_anti_with_null_keys` unit test.
    
    ---------
    
    Co-authored-by: duanhao-jk <[email protected]>
---
 .../src/joins/bhj/semi_join.rs                     | 10 +++--
 .../datafusion-ext-plans/src/joins/test.rs         | 48 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs
index 26841cee..41ebcf6f 100644
--- a/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs
+++ b/native-engine/datafusion-ext-plans/src/joins/bhj/semi_join.rs
@@ -189,11 +189,15 @@ impl<const P: JoinerParams> Joiner for SemiJoiner<P> {
         let mut hashes_idx = 0;
 
         for row_idx in 0..probed_batch.num_rows() {
-            if probed_valids
+            let key_is_valid = probed_valids
                 .as_ref()
                 .map(|nb| nb.is_valid(row_idx))
-                .unwrap_or(true)
-            {
+                .unwrap_or(true);
+            if P.mode == Anti && P.probe_is_join_side && !key_is_valid {
+                probed_joined.set(row_idx, true);
+                continue;
+            }
+            if key_is_valid {
                 let map_value = map_values[hashes_idx];
                 hashes_idx += 1;
 
diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs b/native-engine/datafusion-ext-plans/src/joins/test.rs
index 9125ed53..671ecd73 100644
--- a/native-engine/datafusion-ext-plans/src/joins/test.rs
+++ b/native-engine/datafusion-ext-plans/src/joins/test.rs
@@ -600,6 +600,54 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn join_anti_with_null_keys() -> Result<()> {
+        let left = build_table_i32_nullable(
+            ("a1", &vec![Some(1), Some(2), None, Some(4), Some(5)]),
+            ("b1", &vec![Some(4), Some(5), Some(6), None, Some(8)]),
+            ("c1", &vec![Some(7), Some(8), Some(9), Some(10), Some(11)]),
+        );
+        let right = build_table_i32_nullable(
+            ("a2", &vec![Some(10), Some(20), Some(30)]),
+            ("b1", &vec![Some(4), Some(5), Some(7)]),
+            ("c2", &vec![Some(70), Some(80), Some(90)]),
+        );
+        let on: JoinOn = vec![(
+            Arc::new(Column::new_with_schema("b1", &left.schema())?),
+            Arc::new(Column::new_with_schema("b1", &right.schema())?),
+        )];
+
+        for test_type in [BHJLeftProbed, SHJLeftProbed] {
+            let (_, batches) =
+                join_collect(test_type, left.clone(), right.clone(), on.clone(), LeftAnti).await?;
+            let expected = vec![
+                "+----+----+----+",
+                "| a1 | b1 | c1 |",
+                "+----+----+----+",
+                "|    | 6  | 9  |",
+                "| 5  | 8  | 11 |",
+                "+----+----+----+",
+            ];
+            assert_batches_sorted_eq!(expected, &batches);
+        }
+
+        for test_type in [SMJ, BHJRightProbed, SHJRightProbed] {
+            let (_, batches) =
+                join_collect(test_type, left.clone(), right.clone(), on.clone(), LeftAnti).await?;
+            let expected = vec![
+                "+----+----+----+",
+                "| a1 | b1 | c1 |",
+                "+----+----+----+",
+                "|    | 6  | 9  |",
+                "| 4  |    | 10 |",
+                "| 5  | 8  | 11 |",
+                "+----+----+----+",
+            ];
+            assert_batches_sorted_eq!(expected, &batches);
+        }
+        Ok(())
+    }
+
     #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
     async fn join_with_duplicated_column_names() -> Result<()> {
         for test_type in ALL_TEST_TYPE {

Reply via email to