This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new acec058cb5 perf: Use Arrow vectorized eq kernel for IN list with 
column references (#20528)
acec058cb5 is described below

commit acec058cb5e32c7a339280d7b58ebe58d8b38e8b
Author: Zhang Xiaofeng <[email protected]>
AuthorDate: Sat Feb 28 12:40:16 2026 +0800

    perf: Use Arrow vectorized eq kernel for IN list with column references 
(#20528)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Relates to #20427 .
    
    ## Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    When the IN list contains column references (e.g. `SELECT * FROM t WHERE
    a IN (b, c, d, e)`), DataFusion falls back to a row-by-row
    `make_comparator` path which is significantly slower than it needs to
    be. Arrow provides SIMD-optimized `eq` kernels that can compare entire
    arrays in one call.
    
    ## What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    - Use Arrow's vectorized `eq` kernel instead of row-by-row
    `make_comparator` for non-nested types (primitive, string, binary) in
    the column-reference IN list evaluation path
    - For nested types (Struct, List, etc.), fall back to `make_comparator`
    since Arrow's `eq` kernel does not support them
    - Add 6 unit tests covering the column-reference evaluation path (Int32,
    Utf8, NOT IN, NULL handling, NaN semantics)
    
    ## Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
      Yes. 6 new unit tests added:
      - `test_in_list_with_columns_int32_scalars`
      - `test_in_list_with_columns_int32_column_refs`
      - `test_in_list_with_columns_utf8_column_refs`
      - `test_in_list_with_columns_negated`
      - `test_in_list_with_columns_null_in_list`
      - `test_in_list_with_columns_float_nan`
    
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    No API changes. Queries with column-reference IN lists will run faster.
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 .../physical-expr/src/expressions/in_list.rs       | 245 +++++++++++++++++++--
 1 file changed, 231 insertions(+), 14 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/in_list.rs 
b/datafusion/physical-expr/src/expressions/in_list.rs
index 5c2f1adcd0..44a6572f53 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -28,6 +28,7 @@ use crate::physical_expr::physical_exprs_bag_equal;
 use arrow::array::*;
 use arrow::buffer::{BooleanBuffer, NullBuffer};
 use arrow::compute::kernels::boolean::{not, or_kleene};
+use arrow::compute::kernels::cmp::eq as arrow_eq;
 use arrow::compute::{SortOptions, take};
 use arrow::datatypes::*;
 use arrow::util::bit_iterator::BitIndexIterator;
@@ -138,6 +139,21 @@ impl StaticFilter for ArrayStaticFilter {
     }
 }
 
+/// Returns true if Arrow's vectorized `eq` kernel supports this data type.
+///
+/// Supported: primitives, boolean, strings (Utf8/LargeUtf8/Utf8View),
+/// binary (Binary/LargeBinary/BinaryView/FixedSizeBinary), Null, and
+/// Dictionary-encoded variants of the above.
+/// Unsupported: nested types (Struct, List, Map, Union) and RunEndEncoded.
+fn supports_arrow_eq(dt: &DataType) -> bool {
+    use DataType::*;
+    match dt {
+        Boolean | Binary | LargeBinary | BinaryView | FixedSizeBinary(_) => 
true,
+        Dictionary(_, v) => supports_arrow_eq(v.as_ref()),
+        _ => dt.is_primitive() || dt.is_null() || dt.is_string(),
+    }
+}
+
 fn instantiate_static_filter(
     in_array: ArrayRef,
 ) -> Result<Arc<dyn StaticFilter + Send + Sync>> {
@@ -771,32 +787,45 @@ impl PhysicalExpr for InListExpr {
                 }
             }
             None => {
-                // No static filter: iterate through each expression, compare, 
and OR results
+                // No static filter: iterate through each expression, compare, 
and OR results.
+                // Use Arrow's vectorized eq kernel for types it supports 
(primitive,
+                // boolean, string, binary, dictionary), falling back to 
row-by-row
+                // comparator for unsupported types (nested, RunEndEncoded, 
etc.).
                 let value = value.into_array(num_rows)?;
+                let lhs_supports_arrow_eq = 
supports_arrow_eq(value.data_type());
                 let found = self.list.iter().map(|expr| 
expr.evaluate(batch)).try_fold(
                     BooleanArray::new(BooleanBuffer::new_unset(num_rows), 
None),
                     |result, expr| -> Result<BooleanArray> {
                         let rhs = match expr? {
                             ColumnarValue::Array(array) => {
-                                let cmp = make_comparator(
-                                    value.as_ref(),
-                                    array.as_ref(),
-                                    SortOptions::default(),
-                                )?;
-                                (0..num_rows)
-                                    .map(|i| {
-                                        if value.is_null(i) || 
array.is_null(i) {
-                                            return None;
-                                        }
-                                        Some(cmp(i, i).is_eq())
-                                    })
-                                    .collect::<BooleanArray>()
+                                if lhs_supports_arrow_eq
+                                    && supports_arrow_eq(array.data_type())
+                                {
+                                    arrow_eq(&value, &array)?
+                                } else {
+                                    let cmp = make_comparator(
+                                        value.as_ref(),
+                                        array.as_ref(),
+                                        SortOptions::default(),
+                                    )?;
+                                    (0..num_rows)
+                                        .map(|i| {
+                                            if value.is_null(i) || 
array.is_null(i) {
+                                                return None;
+                                            }
+                                            Some(cmp(i, i).is_eq())
+                                        })
+                                        .collect::<BooleanArray>()
+                                }
                             }
                             ColumnarValue::Scalar(scalar) => {
                                 // Check if scalar is null once, before the 
loop
                                 if scalar.is_null() {
                                     // If scalar is null, all comparisons 
return null
                                     BooleanArray::from(vec![None; num_rows])
+                                } else if lhs_supports_arrow_eq {
+                                    let scalar_datum = scalar.to_scalar()?;
+                                    arrow_eq(&value, &scalar_datum)?
                                 } else {
                                     // Convert scalar to 1-element array
                                     let array = scalar.to_array()?;
@@ -3507,4 +3536,192 @@ mod tests {
 
         Ok(())
     }
+
+    /// Helper: creates an InListExpr with `static_filter = None`
+    /// to force the column-reference evaluation path.
+    fn make_in_list_with_columns(
+        expr: Arc<dyn PhysicalExpr>,
+        list: Vec<Arc<dyn PhysicalExpr>>,
+        negated: bool,
+    ) -> Arc<InListExpr> {
+        Arc::new(InListExpr::new(expr, list, negated, None))
+    }
+
+    #[test]
+    fn test_in_list_with_columns_int32_scalars() -> Result<()> {
+        // Column-reference path with scalar literals (bypassing static filter)
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
+        let col_a = col("a", &schema)?;
+        let batch = RecordBatch::try_new(
+            Arc::new(schema),
+            vec![Arc::new(Int32Array::from(vec![
+                Some(1),
+                Some(2),
+                Some(3),
+                None,
+            ]))],
+        )?;
+
+        let list = vec![
+            lit(ScalarValue::Int32(Some(1))),
+            lit(ScalarValue::Int32(Some(3))),
+        ];
+        let expr = make_in_list_with_columns(col_a, list, false);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        assert_eq!(
+            result,
+            &BooleanArray::from(vec![Some(true), Some(false), Some(true), 
None,])
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_in_list_with_columns_int32_column_refs() -> Result<()> {
+        // IN list with column references
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Int32, true),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3), 
None])),
+                Arc::new(Int32Array::from(vec![
+                    Some(1),
+                    Some(99),
+                    Some(99),
+                    Some(99),
+                ])),
+                Arc::new(Int32Array::from(vec![Some(99), Some(99), Some(3), 
None])),
+            ],
+        )?;
+
+        let col_a = col("a", &schema)?;
+        let list = vec![col("b", &schema)?, col("c", &schema)?];
+        let expr = make_in_list_with_columns(col_a, list, false);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        // row 0: 1 IN (1, 99) → true
+        // row 1: 2 IN (99, 99) → false
+        // row 2: 3 IN (99, 3) → true
+        // row 3: NULL IN (99, NULL) → NULL
+        assert_eq!(
+            result,
+            &BooleanArray::from(vec![Some(true), Some(false), Some(true), 
None,])
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_in_list_with_columns_utf8_column_refs() -> Result<()> {
+        // IN list with Utf8 column references
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Utf8, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(StringArray::from(vec!["x", "y", "z"])),
+                Arc::new(StringArray::from(vec!["x", "x", "z"])),
+            ],
+        )?;
+
+        let col_a = col("a", &schema)?;
+        let list = vec![col("b", &schema)?];
+        let expr = make_in_list_with_columns(col_a, list, false);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        // row 0: "x" IN ("x") → true
+        // row 1: "y" IN ("x") → false
+        // row 2: "z" IN ("z") → true
+        assert_eq!(result, &BooleanArray::from(vec![true, false, true]));
+        Ok(())
+    }
+
+    #[test]
+    fn test_in_list_with_columns_negated() -> Result<()> {
+        // NOT IN with column references
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Int32Array::from(vec![1, 99, 3])),
+            ],
+        )?;
+
+        let col_a = col("a", &schema)?;
+        let list = vec![col("b", &schema)?];
+        let expr = make_in_list_with_columns(col_a, list, true);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        // row 0: 1 NOT IN (1) → false
+        // row 1: 2 NOT IN (99) → true
+        // row 2: 3 NOT IN (3) → false
+        assert_eq!(result, &BooleanArray::from(vec![false, true, false]));
+        Ok(())
+    }
+
+    #[test]
+    fn test_in_list_with_columns_null_in_list() -> Result<()> {
+        // IN list with NULL scalar (column-reference path)
+        let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
+        let col_a = col("a", &schema)?;
+        let batch = RecordBatch::try_new(
+            Arc::new(schema),
+            vec![Arc::new(Int32Array::from(vec![1, 2]))],
+        )?;
+
+        let list = vec![
+            lit(ScalarValue::Int32(None)),
+            lit(ScalarValue::Int32(Some(1))),
+        ];
+        let expr = make_in_list_with_columns(col_a, list, false);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        // row 0: 1 IN (NULL, 1) → true (true OR null = true)
+        // row 1: 2 IN (NULL, 1) → NULL (false OR null = null)
+        assert_eq!(result, &BooleanArray::from(vec![Some(true), None]));
+        Ok(())
+    }
+
+    #[test]
+    fn test_in_list_with_columns_float_nan() -> Result<()> {
+        // Verify NaN == NaN is true in the column-reference path
+        // (consistent with Arrow's totalOrder semantics)
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Float64, false),
+            Field::new("b", DataType::Float64, false),
+        ]);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(Float64Array::from(vec![f64::NAN, 1.0, f64::NAN])),
+                Arc::new(Float64Array::from(vec![f64::NAN, 2.0, 0.0])),
+            ],
+        )?;
+
+        let col_a = col("a", &schema)?;
+        let list = vec![col("b", &schema)?];
+        let expr = make_in_list_with_columns(col_a, list, false);
+
+        let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+        let result = as_boolean_array(&result);
+        // row 0: NaN IN (NaN) → true
+        // row 1: 1.0 IN (2.0) → false
+        // row 2: NaN IN (0.0) → false
+        assert_eq!(result, &BooleanArray::from(vec![true, false, false]));
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to