This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c919054241 perf: short-circuit and collect_bool for IN list with 
column references (#20694)
c919054241 is described below

commit c9190542413c0642cca47129b4830b074924bdd3
Author: Zhang Xiaofeng <[email protected]>
AuthorDate: Thu Mar 5 23:13:58 2026 +0800

    perf: short-circuit and collect_bool for IN list with column references 
(#20694)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #20428.
    
    ## Rationale for this change
    
    Third PR in the IN list optimization series (split from #20428):
    - PR1: benchmarks (#20444, merged)
    - PR2: Arrow vectorized eq kernel (#20528, merged)
    - **PR3 (this): short-circuit, collect_bool, and first-expr
      initialization**
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    ## What changes are included in this PR?
    - **Short-circuit break**: convert `try_fold` to `for` loop; when every
    row is already `true` (no nulls and no `false` rows), skip remaining list
    items (up to 27x faster for match=100%/nulls=0%)
    - **`BooleanBuffer::collect_bool`**: use in `make_comparator` fallback
    path for nested types instead of `(0..n).map().collect()` (suggested by
    @Dandandan in #20428)
    - **First-expr initialization**: evaluate the first list expression
    directly as the accumulator, avoiding a redundant `or_kleene(all_false,
    rhs)` (suggested by @Dandandan in #20428)
    - **Tests**: added 3 new tests covering short-circuit, short-circuit
    with nulls, and struct column references (make_comparator fallback path)
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    ## Are these changes tested?
    Yes. Added tests covering short-circuit, short-circuit with nulls, and
    struct column references (make_comparator fallback path).
    
    Benchmark result:
    ```
    (zhangxffff) zhangxffff@95d3d60664da ~/W/datafusion ((bcc52cd4))> critcmp 
after before
    group                                              after                    
              before
    -----                                              -----                    
              ------
    in_list_cols/Int32/list=28/match=0%/nulls=0%       1.02     93.8±1.80µs     
   ? ?/sec    1.00     91.8±1.52µs        ? ?/sec
    in_list_cols/Int32/list=28/match=0%/nulls=20%      1.03    105.3±1.95µs     
   ? ?/sec    1.00    102.2±1.59µs        ? ?/sec
    in_list_cols/Int32/list=28/match=100%/nulls=0%     1.00      3.4±0.07µs     
   ? ?/sec    27.14    91.7±1.52µs        ? ?/sec
    in_list_cols/Int32/list=28/match=100%/nulls=20%    1.07    107.7±1.91µs     
   ? ?/sec    1.00    100.4±1.33µs        ? ?/sec
    in_list_cols/Int32/list=28/match=50%/nulls=0%      1.00     50.1±1.15µs     
   ? ?/sec    1.84     92.4±1.36µs        ? ?/sec
    in_list_cols/Int32/list=28/match=50%/nulls=20%     1.05    105.1±1.49µs     
   ? ?/sec    1.00    100.0±0.84µs        ? ?/sec
    in_list_cols/Int32/list=3/match=0%/nulls=0%        1.00      9.9±0.17µs     
   ? ?/sec    1.01     10.1±0.19µs        ? ?/sec
    in_list_cols/Int32/list=3/match=0%/nulls=20%       1.02     11.0±0.18µs     
   ? ?/sec    1.00     10.8±0.16µs        ? ?/sec
    in_list_cols/Int32/list=3/match=100%/nulls=0%      1.00      3.3±0.06µs     
   ? ?/sec    2.95      9.9±0.16µs        ? ?/sec
    in_list_cols/Int32/list=3/match=100%/nulls=20%     1.01     10.9±0.19µs     
   ? ?/sec    1.00     10.8±0.09µs        ? ?/sec
    in_list_cols/Int32/list=3/match=50%/nulls=0%       1.00     10.0±0.17µs     
   ? ?/sec    1.00      9.9±0.18µs        ? ?/sec
    in_list_cols/Int32/list=3/match=50%/nulls=20%      1.05     11.3±0.24µs     
   ? ?/sec    1.00     10.8±0.11µs        ? ?/sec
    in_list_cols/Int32/list=8/match=0%/nulls=0%        1.02     26.7±0.58µs     
   ? ?/sec    1.00     26.2±0.50µs        ? ?/sec
    in_list_cols/Int32/list=8/match=0%/nulls=20%       1.04     29.6±0.57µs     
   ? ?/sec    1.00     28.5±0.45µs        ? ?/sec
    in_list_cols/Int32/list=8/match=100%/nulls=0%      1.00      3.4±0.05µs     
   ? ?/sec    7.78     26.2±0.36µs        ? ?/sec
    in_list_cols/Int32/list=8/match=100%/nulls=20%     1.05     30.0±0.65µs     
   ? ?/sec    1.00     28.7±0.55µs        ? ?/sec
    in_list_cols/Int32/list=8/match=50%/nulls=0%       1.03     26.7±0.59µs     
   ? ?/sec    1.00     26.0±0.37µs        ? ?/sec
    in_list_cols/Int32/list=8/match=50%/nulls=20%      1.04     29.9±0.57µs     
   ? ?/sec    1.00     28.7±0.46µs        ? ?/sec
    in_list_cols/Utf8/list=28/match=0%                 1.17    155.0±2.44µs     
   ? ?/sec    1.00    132.8±2.97µs        ? ?/sec
    in_list_cols/Utf8/list=28/match=100%               1.02   726.6±14.54µs     
   ? ?/sec    1.00    712.4±9.09µs        ? ?/sec
    in_list_cols/Utf8/list=28/match=50%                1.02  1070.1±13.06µs     
   ? ?/sec    1.00   1051.8±8.17µs        ? ?/sec
    in_list_cols/Utf8/list=3/match=0%                  1.14     16.4±0.37µs     
   ? ?/sec    1.00     14.4±0.22µs        ? ?/sec
    in_list_cols/Utf8/list=3/match=100%                1.02     68.0±1.29µs     
   ? ?/sec    1.00     66.5±0.99µs        ? ?/sec
    in_list_cols/Utf8/list=3/match=50%                 1.15    107.6±2.05µs     
   ? ?/sec    1.00     93.6±1.88µs        ? ?/sec
    in_list_cols/Utf8/list=8/match=0%                  1.16     44.0±0.61µs     
   ? ?/sec    1.00     37.9±0.95µs        ? ?/sec
    in_list_cols/Utf8/list=8/match=100%                1.00    190.4±2.71µs     
   ? ?/sec    1.03    195.7±2.01µs        ? ?/sec
    in_list_cols/Utf8/list=8/match=50%                 1.03    295.9±4.45µs     
   ? ?/sec    1.00    287.3±3.26µs        ? ?/sec
    ```
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
    
    ---------
    
    Co-authored-by: Adrian Garcia Badaracco 
<[email protected]>
---
 .../physical-expr/src/expressions/in_list.rs       | 264 ++++++++++++++++-----
 1 file changed, 209 insertions(+), 55 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/in_list.rs 
b/datafusion/physical-expr/src/expressions/in_list.rs
index 44a6572f53..e30f256352 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -793,63 +793,71 @@ impl PhysicalExpr for InListExpr {
                 // comparator for unsupported types (nested, RunEndEncoded, 
etc.).
                 let value = value.into_array(num_rows)?;
                 let lhs_supports_arrow_eq = 
supports_arrow_eq(value.data_type());
-                let found = self.list.iter().map(|expr| 
expr.evaluate(batch)).try_fold(
-                    BooleanArray::new(BooleanBuffer::new_unset(num_rows), 
None),
-                    |result, expr| -> Result<BooleanArray> {
-                        let rhs = match expr? {
-                            ColumnarValue::Array(array) => {
-                                if lhs_supports_arrow_eq
-                                    && supports_arrow_eq(array.data_type())
-                                {
-                                    arrow_eq(&value, &array)?
-                                } else {
-                                    let cmp = make_comparator(
-                                        value.as_ref(),
-                                        array.as_ref(),
-                                        SortOptions::default(),
-                                    )?;
-                                    (0..num_rows)
-                                        .map(|i| {
-                                            if value.is_null(i) || 
array.is_null(i) {
-                                                return None;
-                                            }
-                                            Some(cmp(i, i).is_eq())
-                                        })
-                                        .collect::<BooleanArray>()
-                                }
+
+                // Helper: compare value against a single list expression
+                let compare_one = |expr: &Arc<dyn PhysicalExpr>| -> 
Result<BooleanArray> {
+                    match expr.evaluate(batch)? {
+                        ColumnarValue::Array(array) => {
+                            if lhs_supports_arrow_eq
+                                && supports_arrow_eq(array.data_type())
+                            {
+                                Ok(arrow_eq(&value, &array)?)
+                            } else {
+                                let cmp = make_comparator(
+                                    value.as_ref(),
+                                    array.as_ref(),
+                                    SortOptions::default(),
+                                )?;
+                                let buffer = 
BooleanBuffer::collect_bool(num_rows, |i| {
+                                    cmp(i, i).is_eq()
+                                });
+                                let nulls =
+                                    NullBuffer::union(value.nulls(), 
array.nulls());
+                                Ok(BooleanArray::new(buffer, nulls))
                             }
-                            ColumnarValue::Scalar(scalar) => {
-                                // Check if scalar is null once, before the 
loop
-                                if scalar.is_null() {
-                                    // If scalar is null, all comparisons 
return null
-                                    BooleanArray::from(vec![None; num_rows])
-                                } else if lhs_supports_arrow_eq {
-                                    let scalar_datum = scalar.to_scalar()?;
-                                    arrow_eq(&value, &scalar_datum)?
-                                } else {
-                                    // Convert scalar to 1-element array
-                                    let array = scalar.to_array()?;
-                                    let cmp = make_comparator(
-                                        value.as_ref(),
-                                        array.as_ref(),
-                                        SortOptions::default(),
-                                    )?;
-                                    // Compare each row of value with the 
single scalar element
-                                    (0..num_rows)
-                                        .map(|i| {
-                                            if value.is_null(i) {
-                                                None
-                                            } else {
-                                                Some(cmp(i, 0).is_eq())
-                                            }
-                                        })
-                                        .collect::<BooleanArray>()
-                                }
+                        }
+                        ColumnarValue::Scalar(scalar) => {
+                            // Check if scalar is null once, before the loop
+                            if scalar.is_null() {
+                                // If scalar is null, all comparisons return 
null
+                                Ok(BooleanArray::from(vec![None; num_rows]))
+                            } else if lhs_supports_arrow_eq {
+                                let scalar_datum = scalar.to_scalar()?;
+                                Ok(arrow_eq(&value, &scalar_datum)?)
+                            } else {
+                                // Convert scalar to 1-element array
+                                let array = scalar.to_array()?;
+                                let cmp = make_comparator(
+                                    value.as_ref(),
+                                    array.as_ref(),
+                                    SortOptions::default(),
+                                )?;
+                                // Compare each row of value with the single 
scalar element
+                                let buffer = 
BooleanBuffer::collect_bool(num_rows, |i| {
+                                    cmp(i, 0).is_eq()
+                                });
+                                Ok(BooleanArray::new(buffer, 
value.nulls().cloned()))
                             }
-                        };
-                        Ok(or_kleene(&result, &rhs)?)
-                    },
-                )?;
+                        }
+                    }
+                };
+
+                // Evaluate first expression directly to avoid a redundant
+                // or_kleene with an all-false accumulator.
+                let mut found = if let Some(first) = self.list.first() {
+                    compare_one(first)?
+                } else {
+                    BooleanArray::new(BooleanBuffer::new_unset(num_rows), None)
+                };
+
+                for expr in self.list.iter().skip(1) {
+                    // Short-circuit: if every non-null row is already true,
+                    // no further list items can change the result.
+                    if found.null_count() == 0 && found.true_count() == 
num_rows {
+                        break;
+                    }
+                    found = or_kleene(&found, &compare_one(expr)?)?;
+                }
 
                 if self.negated { not(&found)? } else { found }
             }
@@ -3724,4 +3732,150 @@ mod tests {
         assert_eq!(result, &BooleanArray::from(vec![true, false, false]));
         Ok(())
     }
+
+    /// Tests that short-circuit evaluation produces correct results.
+    /// When all rows match after the first list item, remaining items
+    /// should be skipped without affecting correctness.
+    #[test]
+    fn test_in_list_with_columns_short_circuit() -> Result<()> {
+        // a IN (b, c) where b already matches every row of a
+        // The short-circuit should skip evaluating c
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Int32Array::from(vec![1, 2, 3])), // b == a for all 
rows
+                Arc::new(Int32Array::from(vec![99, 99, 99])),
+            ],
+        )?;
+
+        let col_a = col("a", &schema)?;
+        let list = vec![col("b", &schema)?, col("c", &schema)?];
+        let expr = make_in_list_with_columns(col_a, list, false);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        assert_eq!(result, &BooleanArray::from(vec![true, true, true]));
+        Ok(())
+    }
+
+    /// Short-circuit must NOT skip when nulls are present (three-valued 
logic).
+    /// Even if all non-null values are true, null rows keep the result as 
null.
+    #[test]
+    fn test_in_list_with_columns_short_circuit_with_nulls() -> Result<()> {
+        // a IN (b, c) where a has nulls
+        // Even if b matches all non-null rows, result should preserve nulls
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])),
+                Arc::new(Int32Array::from(vec![1, 2, 3])), // matches non-null 
rows
+                Arc::new(Int32Array::from(vec![99, 99, 99])),
+            ],
+        )?;
+
+        let col_a = col("a", &schema)?;
+        let list = vec![col("b", &schema)?, col("c", &schema)?];
+        let expr = make_in_list_with_columns(col_a, list, false);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        // row 0: 1 IN (1, 99) → true
+        // row 1: NULL IN (2, 99) → NULL
+        // row 2: 3 IN (3, 99) → true
+        assert_eq!(
+            result,
+            &BooleanArray::from(vec![Some(true), None, Some(true)])
+        );
+        Ok(())
+    }
+
+    /// Tests the make_comparator + collect_bool fallback path using
+    /// struct column references (nested types don't support arrow_eq).
+    #[test]
+    fn test_in_list_with_columns_struct() -> Result<()> {
+        let struct_fields = Fields::from(vec![
+            Field::new("x", DataType::Int32, false),
+            Field::new("y", DataType::Utf8, false),
+        ]);
+        let struct_dt = DataType::Struct(struct_fields.clone());
+
+        let schema = Schema::new(vec![
+            Field::new("a", struct_dt.clone(), true),
+            Field::new("b", struct_dt.clone(), false),
+            Field::new("c", struct_dt.clone(), false),
+        ]);
+
+        // a: [{1,"a"}, {2,"b"}, NULL,    {4,"d"}]
+        // b: [{1,"a"}, {9,"z"}, {3,"c"}, {4,"d"}]
+        // c: [{9,"z"}, {2,"b"}, {9,"z"}, {9,"z"}]
+        let a = Arc::new(StructArray::new(
+            struct_fields.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
+                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
+            ],
+            Some(vec![true, true, false, true].into()),
+        ));
+        let b = Arc::new(StructArray::new(
+            struct_fields.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 9, 3, 4])),
+                Arc::new(StringArray::from(vec!["a", "z", "c", "d"])),
+            ],
+            None,
+        ));
+        let c = Arc::new(StructArray::new(
+            struct_fields.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![9, 2, 9, 9])),
+                Arc::new(StringArray::from(vec!["z", "b", "z", "z"])),
+            ],
+            None,
+        ));
+
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, b, 
c])?;
+
+        let col_a = col("a", &schema)?;
+        let list = vec![col("b", &schema)?, col("c", &schema)?];
+        let expr = make_in_list_with_columns(col_a, list, false);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        // row 0: {1,"a"} IN ({1,"a"}, {9,"z"}) → true  (matches b)
+        // row 1: {2,"b"} IN ({9,"z"}, {2,"b"}) → true  (matches c)
+        // row 2: NULL    IN ({3,"c"}, {9,"z"}) → NULL
+        // row 3: {4,"d"} IN ({4,"d"}, {9,"z"}) → true  (matches b)
+        assert_eq!(
+            result,
+            &BooleanArray::from(vec![Some(true), Some(true), None, Some(true)])
+        );
+
+        // Also test NOT IN
+        let col_a = col("a", &schema)?;
+        let list = vec![col("b", &schema)?, col("c", &schema)?];
+        let expr = make_in_list_with_columns(col_a, list, true);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        // row 0: {1,"a"} NOT IN ({1,"a"}, {9,"z"}) → false
+        // row 1: {2,"b"} NOT IN ({9,"z"}, {2,"b"}) → false
+        // row 2: NULL    NOT IN ({3,"c"}, {9,"z"}) → NULL
+        // row 3: {4,"d"} NOT IN ({4,"d"}, {9,"z"}) → false
+        assert_eq!(
+            result,
+            &BooleanArray::from(vec![Some(false), Some(false), None, 
Some(false)])
+        );
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to