This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-52 by this push:
     new 41acbf8e4b [branch-52] fix: Return `probe_side.len()` for RightMark/Anti count(*) queries (#20710) (#20881)
41acbf8e4b is described below

commit 41acbf8e4bb4ac15003bd5365661e6b17551f7f0
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 11:57:34 2026 -0400

    [branch-52] fix: Return `probe_side.len()` for RightMark/Anti count(*) queries (#20710) (#20881)
    
    - Part of https://github.com/apache/datafusion/issues/20855
    - Closes https://github.com/apache/datafusion/issues/20669 on branch-52
    
    This PR:
    - Backports https://github.com/apache/datafusion/pull/20710 from
    @jonathanc-n to the branch-52 line
    
    Co-authored-by: Jonathan Chen <[email protected]>
---
 .../physical-plan/src/joins/hash_join/stream.rs    |  3 ++
 .../physical-plan/src/joins/symmetric_hash_join.rs |  3 ++
 datafusion/physical-plan/src/joins/utils.rs        | 15 +++++++-
 datafusion/sqllogictest/test_files/joins.slt       | 43 ++++++++++++++++++++++
 4 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs
index e673567512..c5c794f5a8 100644
--- a/datafusion/physical-plan/src/joins/hash_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs
@@ -639,6 +639,7 @@ impl HashJoinStream {
                 filter,
                 JoinSide::Left,
                 None,
+                self.join_type,
             )?
         } else {
             (left_indices, right_indices)
@@ -707,6 +708,7 @@ impl HashJoinStream {
             &right_indices,
             &self.column_indices,
             join_side,
+            self.join_type,
         )?;
 
         self.output_buffer.push_batch(batch)?;
@@ -770,6 +772,7 @@ impl HashJoinStream {
                 &right_side,
                 &self.column_indices,
                 JoinSide::Left,
+                self.join_type,
             )?;
             self.output_buffer.push_batch(batch)?;
         }
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 1f6bc703a0..a75a9893e9 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -930,6 +930,7 @@ pub(crate) fn build_side_determined_results(
             &probe_indices,
             column_indices,
             build_hash_joiner.build_side,
+            join_type,
         )
         .map(|batch| (batch.num_rows() > 0).then_some(batch))
     } else {
@@ -993,6 +994,7 @@ pub(crate) fn join_with_probe_batch(
             filter,
             build_hash_joiner.build_side,
             None,
+            join_type,
         )?
     } else {
         (build_indices, probe_indices)
@@ -1031,6 +1033,7 @@ pub(crate) fn join_with_probe_batch(
             &probe_indices,
             column_indices,
             build_hash_joiner.build_side,
+            join_type,
         )
         .map(|batch| (batch.num_rows() > 0).then_some(batch))
     }
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index a9243fe04e..53b4c4f802 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -910,6 +910,7 @@ pub(crate) fn get_final_indices_from_bit_map(
     (left_indices, right_indices)
 }
 
+#[expect(clippy::too_many_arguments)]
 pub(crate) fn apply_join_filter_to_indices(
     build_input_buffer: &RecordBatch,
     probe_batch: &RecordBatch,
@@ -918,6 +919,7 @@ pub(crate) fn apply_join_filter_to_indices(
     filter: &JoinFilter,
     build_side: JoinSide,
     max_intermediate_size: Option<usize>,
+    join_type: JoinType,
 ) -> Result<(UInt64Array, UInt32Array)> {
     if build_indices.is_empty() && probe_indices.is_empty() {
         return Ok((build_indices, probe_indices));
@@ -938,6 +940,7 @@ pub(crate) fn apply_join_filter_to_indices(
                 &probe_indices.slice(i, len),
                 filter.column_indices(),
                 build_side,
+                join_type,
             )?;
             let filter_result = filter
                 .expression()
@@ -959,6 +962,7 @@ pub(crate) fn apply_join_filter_to_indices(
             &probe_indices,
             filter.column_indices(),
             build_side,
+            join_type,
         )?;
 
         filter
@@ -979,6 +983,7 @@ pub(crate) fn apply_join_filter_to_indices(
 
 /// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`.
 /// The resulting batch has [Schema] `schema`.
+#[expect(clippy::too_many_arguments)]
 pub(crate) fn build_batch_from_indices(
     schema: &Schema,
     build_input_buffer: &RecordBatch,
@@ -987,11 +992,19 @@ pub(crate) fn build_batch_from_indices(
     probe_indices: &UInt32Array,
     column_indices: &[ColumnIndex],
     build_side: JoinSide,
+    join_type: JoinType,
 ) -> Result<RecordBatch> {
     if schema.fields().is_empty() {
+        // For RightAnti and RightSemi joins, after `adjust_indices_by_join_type`
+        // the build_indices were untouched so only probe_indices hold the actual
+        // row count.
+        let row_count = match join_type {
+            JoinType::RightAnti | JoinType::RightSemi => probe_indices.len(),
+            _ => build_indices.len(),
+        };
         let options = RecordBatchOptions::new()
             .with_match_field_names(true)
-            .with_row_count(Some(build_indices.len()));
+            .with_row_count(Some(row_count));
 
         return Ok(RecordBatch::try_new_with_options(
             Arc::new(schema.clone()),
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 42441fe787..1155bc4f3b 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -5226,3 +5226,46 @@ DROP TABLE issue_20437_small;
 
 statement count 0
 DROP TABLE issue_20437_large;
+
+# Test count(*) with right semi/anti joins returns correct row counts
+# issue: https://github.com/apache/datafusion/issues/20669 
+
+statement ok
+CREATE TABLE t1 (k INT, v INT);
+
+statement ok
+CREATE TABLE t2 (k INT, v INT);
+
+statement ok
+INSERT INTO t1 SELECT i AS k, i AS v FROM generate_series(1, 100) t(i);
+
+statement ok
+INSERT INTO t2 VALUES (1, 1);
+
+query I
+WITH t AS (
+    SELECT *
+    FROM t1
+    LEFT ANTI JOIN t2 ON t1.k = t2.k
+)
+SELECT count(*)
+FROM t;
+----
+99
+
+query I
+WITH t AS (
+    SELECT *
+    FROM t1
+    LEFT SEMI JOIN t2 ON t1.k = t2.k
+)
+SELECT count(*)
+FROM t;
+----
+1
+
+statement count 0
+DROP TABLE t1;
+
+statement count 0
+DROP TABLE t2;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to