This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 486c5d860b Refactor InListExpr to support structs by re-using existing
hashing infrastructure (#18449)
486c5d860b is described below
commit 486c5d860b02fba8aca53a10fca918d3e23e3d26
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Wed Nov 19 07:36:50 2025 +0800
Refactor InListExpr to support structs by re-using existing hashing
infrastructure (#18449)
## Background
This PR is part of an EPIC to push down hash table references from
HashJoinExec into scans. The EPIC is tracked in
https://github.com/apache/datafusion/issues/17171.
A "target state" is tracked in
https://github.com/apache/datafusion/pull/18393.
There is a series of PRs to get us to this target state in smaller, more
reviewable changes that are still valuable on their own:
- https://github.com/apache/datafusion/pull/18448
- (This PR): https://github.com/apache/datafusion/pull/18449 (depends on
https://github.com/apache/datafusion/pull/18448)
- https://github.com/apache/datafusion/pull/18451
## Changes in this PR
- Enhance InListExpr to efficiently store homogeneous lists as arrays
and avoid a conversion to Vec<PhysicalExpr>
by adding an internal InListStorage enum with Array and Exprs variants
- Re-use existing hashing and comparison utilities to support Struct
arrays and other complex types
- Add public function `in_list_from_array(expr, list_array, negated)`
for creating InList from arrays
Although the diff looks large, most of it is actually tests and docs. I
think the actual code change is a negative LOC change, or at least a
reduction in complexity (it eliminates a trait, a macro, and matching on
data types).
---------
Co-authored-by: David Hewitt <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/common/src/hash_utils.rs | 178 ++-
.../physical-expr/src/expressions/in_list.rs | 1669 +++++++++++++++++---
datafusion/physical-plan/src/joins/utils.rs | 2 +-
datafusion/sqllogictest/test_files/array.slt | 10 +-
datafusion/sqllogictest/test_files/expr.slt | 207 +++
.../test_files/tpch/plans/q19.slt.part | 4 +-
.../test_files/tpch/plans/q22.slt.part | 4 +-
7 files changed, 1877 insertions(+), 197 deletions(-)
diff --git a/datafusion/common/src/hash_utils.rs
b/datafusion/common/src/hash_utils.rs
index d60189fb6f..0fa47671d3 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -31,8 +31,8 @@ use crate::cast::{
as_string_array, as_string_view_array, as_struct_array,
};
use crate::error::Result;
-#[cfg(not(feature = "force_hash_collisions"))]
-use crate::error::_internal_err;
+use crate::error::{_internal_datafusion_err, _internal_err};
+use std::cell::RefCell;
// Combines two hashes into one hash
#[inline]
@@ -41,6 +41,94 @@ pub fn combine_hashes(l: u64, r: u64) -> u64 {
hash.wrapping_mul(37).wrapping_add(r)
}
+/// Maximum size for the thread-local hash buffer before truncation (4MB =
524,288 u64 elements).
+/// The goal of this is to avoid unbounded memory growth that would appear as
a memory leak.
+/// We allow temporary allocations beyond this size, but after use the buffer
is truncated
+/// to this size.
+const MAX_BUFFER_SIZE: usize = 524_288;
+
+thread_local! {
+ /// Thread-local buffer for hash computations to avoid repeated
allocations.
+ /// The buffer is reused across calls and truncated if it exceeds
MAX_BUFFER_SIZE.
+ /// Defaults to a capacity of 8192 u64 elements which is the default batch
size.
+ /// This corresponds to 64KB of memory.
+ static HASH_BUFFER: RefCell<Vec<u64>> = const { RefCell::new(Vec::new()) };
+}
+
+/// Creates hashes for the given arrays using a thread-local buffer, then
calls the provided callback
+/// with an immutable reference to the computed hashes.
+///
+/// This function manages a thread-local buffer to avoid repeated allocations.
The buffer is automatically
+/// truncated if it exceeds `MAX_BUFFER_SIZE` after use.
+///
+/// # Arguments
+/// * `arrays` - The arrays to hash (must contain at least one array)
+/// * `random_state` - The random state for hashing
+/// * `callback` - A function that receives an immutable reference to the hash
slice and returns a result
+///
+/// # Errors
+/// Returns an error if:
+/// - No arrays are provided
+/// - The function is called reentrantly (i.e., the callback invokes
`with_hashes` again on the same thread)
+/// - The function is called during or after thread destruction
+///
+/// # Example
+/// ```ignore
+/// use datafusion_common::hash_utils::{with_hashes, RandomState};
+/// use arrow::array::{Int32Array, ArrayRef};
+/// use std::sync::Arc;
+///
+/// let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+/// let random_state = RandomState::new();
+///
+/// let result = with_hashes([&array], &random_state, |hashes| {
+/// // Use the hashes here
+/// Ok(hashes.len())
+/// })?;
+/// ```
+pub fn with_hashes<I, T, F, R>(
+ arrays: I,
+ random_state: &RandomState,
+ callback: F,
+) -> Result<R>
+where
+ I: IntoIterator<Item = T>,
+ T: AsDynArray,
+ F: FnOnce(&[u64]) -> Result<R>,
+{
+ // Peek at the first array to determine buffer size without fully
collecting
+ let mut iter = arrays.into_iter().peekable();
+
+ // Get the required size from the first array
+ let required_size = match iter.peek() {
+ Some(arr) => arr.as_dyn_array().len(),
+ None => return _internal_err!("with_hashes requires at least one
array"),
+ };
+
+ HASH_BUFFER.try_with(|cell| {
+ let mut buffer = cell.try_borrow_mut()
+ .map_err(|_| _internal_datafusion_err!("with_hashes cannot be
called reentrantly on the same thread"))?;
+
+ // Ensure buffer has sufficient length, clearing old values
+ buffer.clear();
+ buffer.resize(required_size, 0);
+
+ // Create hashes in the buffer - this consumes the iterator
+ create_hashes(iter, random_state, &mut buffer[..required_size])?;
+
+ // Execute the callback with an immutable slice
+ let result = callback(&buffer[..required_size])?;
+
+ // Cleanup: truncate if buffer grew too large
+ if buffer.capacity() > MAX_BUFFER_SIZE {
+ buffer.truncate(MAX_BUFFER_SIZE);
+ buffer.shrink_to_fit();
+ }
+
+ Ok(result)
+ }).map_err(|_| _internal_datafusion_err!("with_hashes cannot access
thread-local storage during or after thread destruction"))?
+}
+
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64],
mul_col: bool) {
if mul_col {
@@ -478,8 +566,8 @@ impl AsDynArray for &ArrayRef {
pub fn create_hashes<'a, I, T>(
arrays: I,
random_state: &RandomState,
- hashes_buffer: &'a mut Vec<u64>,
-) -> Result<&'a mut Vec<u64>>
+ hashes_buffer: &'a mut [u64],
+) -> Result<&'a mut [u64]>
where
I: IntoIterator<Item = T>,
T: AsDynArray,
@@ -522,7 +610,7 @@ mod tests {
fn create_hashes_for_empty_fixed_size_lit() -> Result<()> {
let empty_array = FixedSizeListBuilder::new(StringBuilder::new(),
1).finish();
let random_state = RandomState::with_seeds(0, 0, 0, 0);
- let hashes_buff = &mut vec![0; 0];
+ let hashes_buff = &mut [0; 0];
let hashes = create_hashes(
&[Arc::new(empty_array) as ArrayRef],
&random_state,
@@ -1000,4 +1088,84 @@ mod tests {
assert_eq!(hashes1, hashes2);
}
+
+ #[test]
+ fn test_with_hashes() {
+ let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
+ let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+ // Test that with_hashes produces the same results as create_hashes
+ let mut expected_hashes = vec![0; array.len()];
+ create_hashes([&array], &random_state, &mut expected_hashes).unwrap();
+
+ let result = with_hashes([&array], &random_state, |hashes| {
+ assert_eq!(hashes.len(), 4);
+ // Verify hashes match expected values
+ assert_eq!(hashes, &expected_hashes[..]);
+ // Return a copy of the hashes
+ Ok(hashes.to_vec())
+ })
+ .unwrap();
+
+ // Verify callback result is returned correctly
+ assert_eq!(result, expected_hashes);
+ }
+
+ #[test]
+ fn test_with_hashes_multi_column() {
+ let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ let str_array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b",
"c"]));
+ let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+ // Test multi-column hashing
+ let mut expected_hashes = vec![0; int_array.len()];
+ create_hashes(
+ [&int_array, &str_array],
+ &random_state,
+ &mut expected_hashes,
+ )
+ .unwrap();
+
+ with_hashes([&int_array, &str_array], &random_state, |hashes| {
+ assert_eq!(hashes.len(), 3);
+ assert_eq!(hashes, &expected_hashes[..]);
+ Ok(())
+ })
+ .unwrap();
+ }
+
+ #[test]
+ fn test_with_hashes_empty_arrays() {
+ let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+ // Test that passing no arrays returns an error
+ let empty: [&ArrayRef; 0] = [];
+ let result = with_hashes(empty, &random_state, |_hashes| Ok(()));
+
+ assert!(result.is_err());
+ assert!(result
+ .unwrap_err()
+ .to_string()
+ .contains("requires at least one array"));
+ }
+
+ #[test]
+ fn test_with_hashes_reentrancy() {
+ let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ let array2: ArrayRef = Arc::new(Int32Array::from(vec![4, 5, 6]));
+ let random_state = RandomState::with_seeds(0, 0, 0, 0);
+
+ // Test that reentrant calls return an error instead of panicking
+ let result = with_hashes([&array], &random_state, |_hashes| {
+ // Try to call with_hashes again inside the callback
+ with_hashes([&array2], &random_state, |_inner_hashes| Ok(()))
+ });
+
+ assert!(result.is_err());
+ let err_msg = result.unwrap_err().to_string();
+ assert!(
+ err_msg.contains("reentrantly") || err_msg.contains("cannot be
called"),
+ "Error message should mention reentrancy: {err_msg}",
+ );
+ }
}
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs
b/datafusion/physical-expr/src/expressions/in_list.rs
index 4bcfbe35d0..95029c1efe 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -25,35 +25,37 @@ use std::sync::Arc;
use crate::physical_expr::physical_exprs_bag_equal;
use crate::PhysicalExpr;
-use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
use arrow::array::*;
use arrow::buffer::BooleanBuffer;
use arrow::compute::kernels::boolean::{not, or_kleene};
-use arrow::compute::take;
+use arrow::compute::{take, SortOptions};
use arrow::datatypes::*;
use arrow::util::bit_iterator::BitIndexIterator;
-use arrow::{downcast_dictionary_array, downcast_primitive_array};
-use datafusion_common::cast::{
- as_boolean_array, as_generic_binary_array, as_string_array,
-};
-use datafusion_common::hash_utils::HashValue;
+use datafusion_common::hash_utils::with_hashes;
use datafusion_common::{
- assert_or_internal_err, exec_err, not_impl_err, DFSchema, DataFusionError,
Result,
- ScalarValue,
+ assert_or_internal_err, exec_datafusion_err, exec_err, DFSchema,
DataFusionError,
+ HashSet, Result, ScalarValue,
};
-use datafusion_expr::ColumnarValue;
-use datafusion_physical_expr_common::datum::compare_with_eq;
+use datafusion_expr::{expr_vec_fmt, ColumnarValue};
use ahash::RandomState;
use datafusion_common::HashMap;
use hashbrown::hash_map::RawEntryMut;
+/// Trait for InList static filters
+trait StaticFilter {
+ fn null_count(&self) -> usize;
+
+ /// Checks if values in `v` are contained in the filter
+ fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray>;
+}
+
/// InList
pub struct InListExpr {
expr: Arc<dyn PhysicalExpr>,
list: Vec<Arc<dyn PhysicalExpr>>,
negated: bool,
- static_filter: Option<Arc<dyn Set>>,
+ static_filter: Option<Arc<dyn StaticFilter + Send + Sync>>,
}
impl Debug for InListExpr {
@@ -66,13 +68,10 @@ impl Debug for InListExpr {
}
}
-/// A type-erased container of array elements
-pub trait Set: Send + Sync {
- fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray>;
- fn has_nulls(&self) -> bool;
-}
-
-struct ArrayHashSet {
+/// Static filter for InList that stores the array and hash set for O(1)
lookups
+#[derive(Debug, Clone)]
+struct ArrayStaticFilter {
+ in_array: ArrayRef,
state: RandomState,
/// Used to provide a lookup from value to in list index
///
@@ -81,30 +80,20 @@ struct ArrayHashSet {
map: HashMap<usize, (), ()>,
}
-struct ArraySet<T> {
- array: T,
- hash_set: ArrayHashSet,
-}
-
-impl<T> ArraySet<T>
-where
- T: Array + From<ArrayData>,
-{
- fn new(array: &T, hash_set: ArrayHashSet) -> Self {
- Self {
- array: downcast_array(array),
- hash_set,
- }
+impl StaticFilter for ArrayStaticFilter {
+ fn null_count(&self) -> usize {
+ self.in_array.null_count()
}
-}
-impl<T> Set for ArraySet<T>
-where
- T: Array + 'static,
- for<'a> &'a T: ArrayAccessor,
- for<'a> <&'a T as ArrayAccessor>::Item: IsEqual,
-{
+ /// Checks if values in `v` are contained in the `in_array` using this
hash set for lookup.
fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
+ // Null type comparisons always return null (SQL three-valued logic)
+ if v.data_type() == &DataType::Null
+ || self.in_array.data_type() == &DataType::Null
+ {
+ return Ok(BooleanArray::from(vec![None; v.len()]));
+ }
+
downcast_dictionary_array! {
v => {
let values_contains = self.contains(v.values().as_ref(),
negated)?;
@@ -114,100 +103,161 @@ where
_ => {}
}
- let v = v.as_any().downcast_ref::<T>().unwrap();
- let in_array = &self.array;
- let has_nulls = in_array.null_count() != 0;
+ let needle_nulls = v.logical_nulls();
+ let needle_nulls = needle_nulls.as_ref();
+ let haystack_has_nulls = self.in_array.null_count() != 0;
+
+ with_hashes([v], &self.state, |hashes| {
+ let cmp = make_comparator(v, &self.in_array,
SortOptions::default())?;
+ Ok((0..v.len())
+ .map(|i| {
+ // SQL three-valued logic: null IN (...) is always null
+ if needle_nulls.is_some_and(|nulls| nulls.is_null(i)) {
+ return None;
+ }
- Ok(ArrayIter::new(v)
- .map(|v| {
- v.and_then(|v| {
- let hash = v.hash_one(&self.hash_set.state);
+ let hash = hashes[i];
let contains = self
- .hash_set
.map
.raw_entry()
- .from_hash(hash, |idx|
in_array.value(*idx).is_equal(&v))
+ .from_hash(hash, |idx| cmp(i, *idx).is_eq())
.is_some();
match contains {
true => Some(!negated),
- false if has_nulls => None,
+ false if haystack_has_nulls => None,
false => Some(negated),
}
})
- })
- .collect())
+ .collect())
+ })
}
+}
- fn has_nulls(&self) -> bool {
- self.array.null_count() != 0
+fn instantiate_static_filter(
+ in_array: ArrayRef,
+) -> Result<Arc<dyn StaticFilter + Send + Sync>> {
+ match in_array.data_type() {
+ DataType::Int32 =>
Ok(Arc::new(Int32StaticFilter::try_new(&in_array)?)),
+ _ => {
+ /* fall through to generic implementation */
+ Ok(Arc::new(ArrayStaticFilter::try_new(in_array)?))
+ }
}
}
-/// Computes an [`ArrayHashSet`] for the provided [`Array`] if there
-/// are nulls present or there are more than the configured number of
-/// elements.
-///
-/// Note: This is split into a separate function as higher-rank trait bounds
currently
-/// cause type inference to misbehave
-fn make_hash_set<T>(array: &T) -> ArrayHashSet
-where
- T: ArrayAccessor,
- T::Item: IsEqual,
-{
- let state = RandomState::new();
- let mut map: HashMap<usize, (), ()> =
- HashMap::with_capacity_and_hasher(array.len(), ());
-
- let insert_value = |idx| {
- let value = array.value(idx);
- let hash = value.hash_one(&state);
- if let RawEntryMut::Vacant(v) = map
- .raw_entry_mut()
- .from_hash(hash, |x| array.value(*x).is_equal(&value))
- {
- v.insert_with_hasher(hash, idx, (), |x|
array.value(*x).hash_one(&state));
+impl ArrayStaticFilter {
+ /// Computes a [`StaticFilter`] for the provided [`Array`] if there
+ /// are nulls present or there are more than the configured number of
+ /// elements.
+ ///
+ /// Note: This is split into a separate function as higher-rank trait
bounds currently
+ /// cause type inference to misbehave
+ fn try_new(in_array: ArrayRef) -> Result<ArrayStaticFilter> {
+ // Null type has no natural order - return empty hash set
+ if in_array.data_type() == &DataType::Null {
+ return Ok(ArrayStaticFilter {
+ in_array,
+ state: RandomState::new(),
+ map: HashMap::with_hasher(()),
+ });
}
- };
- match array.nulls() {
- Some(nulls) => {
- BitIndexIterator::new(nulls.validity(), nulls.offset(),
nulls.len())
- .for_each(insert_value)
- }
- None => (0..array.len()).for_each(insert_value),
+ let state = RandomState::new();
+ let mut map: HashMap<usize, (), ()> = HashMap::with_hasher(());
+
+ with_hashes([&in_array], &state, |hashes| -> Result<()> {
+ let cmp = make_comparator(&in_array, &in_array,
SortOptions::default())?;
+
+ let insert_value = |idx| {
+ let hash = hashes[idx];
+ if let RawEntryMut::Vacant(v) = map
+ .raw_entry_mut()
+ .from_hash(hash, |x| cmp(*x, idx).is_eq())
+ {
+ v.insert_with_hasher(hash, idx, (), |x| hashes[*x]);
+ }
+ };
+
+ match in_array.nulls() {
+ Some(nulls) => {
+ BitIndexIterator::new(nulls.validity(), nulls.offset(),
nulls.len())
+ .for_each(insert_value)
+ }
+ None => (0..in_array.len()).for_each(insert_value),
+ }
+
+ Ok(())
+ })?;
+
+ Ok(Self {
+ in_array,
+ state,
+ map,
+ })
}
+}
- ArrayHashSet { state, map }
+struct Int32StaticFilter {
+ null_count: usize,
+ values: HashSet<i32>,
}
-/// Creates a `Box<dyn Set>` for the given list of `IN` expressions and `batch`
-fn make_set(array: &dyn Array) -> Result<Arc<dyn Set>> {
- Ok(downcast_primitive_array! {
- array => Arc::new(ArraySet::new(array, make_hash_set(&array))),
- DataType::Boolean => {
- let array = as_boolean_array(array)?;
- Arc::new(ArraySet::new(array, make_hash_set(&array)))
- },
- DataType::Utf8 => {
- let array = as_string_array(array)?;
- Arc::new(ArraySet::new(array, make_hash_set(&array)))
- }
- DataType::LargeUtf8 => {
- let array = as_largestring_array(array);
- Arc::new(ArraySet::new(array, make_hash_set(&array)))
- }
- DataType::Binary => {
- let array = as_generic_binary_array::<i32>(array)?;
- Arc::new(ArraySet::new(array, make_hash_set(&array)))
- }
- DataType::LargeBinary => {
- let array = as_generic_binary_array::<i64>(array)?;
- Arc::new(ArraySet::new(array, make_hash_set(&array)))
+impl Int32StaticFilter {
+ fn try_new(in_array: &ArrayRef) -> Result<Self> {
+ let in_array = in_array
+ .as_primitive_opt::<Int32Type>()
+ .ok_or_else(|| exec_datafusion_err!("Failed to downcast array"))?;
+
+ let mut values = HashSet::with_capacity(in_array.len());
+ let null_count = in_array.null_count();
+
+ for v in in_array.iter().flatten() {
+ values.insert(v);
}
- DataType::Dictionary(_, _) => unreachable!("dictionary should have
been flattened"),
- d => return not_impl_err!("DataType::{d} not supported in InList")
- })
+
+ Ok(Self { null_count, values })
+ }
+}
+
+impl StaticFilter for Int32StaticFilter {
+ fn null_count(&self) -> usize {
+ self.null_count
+ }
+
+ fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray> {
+ let v = v
+ .as_primitive_opt::<Int32Type>()
+ .ok_or_else(|| exec_datafusion_err!("Failed to downcast array"))?;
+
+ let result = match (v.null_count() > 0, negated) {
+ (true, false) => {
+ // has nulls, not negated"
+ BooleanArray::from_iter(
+ v.iter().map(|value| Some(self.values.contains(&value?))),
+ )
+ }
+ (true, true) => {
+ // has nulls, negated
+ BooleanArray::from_iter(
+ v.iter().map(|value| Some(!self.values.contains(&value?))),
+ )
+ }
+ (false, false) => {
+ //no null, not negated
+ BooleanArray::from_iter(
+ v.values().iter().map(|value| self.values.contains(value)),
+ )
+ }
+ (false, true) => {
+ // no null, negated
+ BooleanArray::from_iter(
+ v.values().iter().map(|value|
!self.values.contains(value)),
+ )
+ }
+ };
+ Ok(result)
+ }
}
/// Evaluates the list of expressions into an array, flattening any
dictionaries
@@ -232,56 +282,26 @@ fn evaluate_list(
ScalarValue::iter_to_array(scalars)
}
-fn try_cast_static_filter_to_set(
+/// Try to evaluate a list of expressions as constants.
+///
+/// Returns an ArrayRef if all expressions are constants (can be evaluated on
an
+/// empty RecordBatch), otherwise returns an error. This is used to detect when
+/// a list contains only literals, casts of literals, or other constant
expressions.
+fn try_evaluate_constant_list(
list: &[Arc<dyn PhysicalExpr>],
schema: &Schema,
-) -> Result<Arc<dyn Set>> {
+) -> Result<ArrayRef> {
let batch = RecordBatch::new_empty(Arc::new(schema.clone()));
- make_set(evaluate_list(list, &batch)?.as_ref())
-}
-
-/// Custom equality check function which is used with [`ArrayHashSet`] for
existence check.
-trait IsEqual: HashValue {
- fn is_equal(&self, other: &Self) -> bool;
-}
-
-impl<T: IsEqual + ?Sized> IsEqual for &T {
- fn is_equal(&self, other: &Self) -> bool {
- T::is_equal(self, other)
- }
-}
-
-macro_rules! is_equal {
- ($($t:ty),+) => {
- $(impl IsEqual for $t {
- fn is_equal(&self, other: &Self) -> bool {
- self == other
- }
- })*
- };
-}
-is_equal!(i8, i16, i32, i64, i128, i256, u8, u16, u32, u64);
-is_equal!(bool, str, [u8]);
-is_equal!(IntervalDayTime, IntervalMonthDayNano);
-
-macro_rules! is_equal_float {
- ($($t:ty),+) => {
- $(impl IsEqual for $t {
- fn is_equal(&self, other: &Self) -> bool {
- self.to_bits() == other.to_bits()
- }
- })*
- };
+ evaluate_list(list, &batch)
}
-is_equal_float!(half::f16, f32, f64);
impl InListExpr {
/// Create a new InList expression
- pub fn new(
+ fn new(
expr: Arc<dyn PhysicalExpr>,
list: Vec<Arc<dyn PhysicalExpr>>,
negated: bool,
- static_filter: Option<Arc<dyn Set>>,
+ static_filter: Option<Arc<dyn StaticFilter + Send + Sync>>,
) -> Self {
Self {
expr,
@@ -305,19 +325,37 @@ impl InListExpr {
pub fn negated(&self) -> bool {
self.negated
}
-}
-#[macro_export]
-macro_rules! expr_vec_fmt {
- ( $ARRAY:expr ) => {{
- $ARRAY
- .iter()
- .map(|e| format!("{e}"))
- .collect::<Vec<String>>()
- .join(", ")
- }};
+ /// Create a new InList expression directly from an array, bypassing
expression evaluation.
+ ///
+ /// This is more efficient than `in_list()` when you already have the list
as an array,
+ /// as it avoids the conversion: `ArrayRef -> Vec<PhysicalExpr> ->
ArrayRef -> StaticFilter`.
+ /// Instead it goes directly: `ArrayRef -> StaticFilter`.
+ ///
+ /// The `list` field will be empty when using this constructor, as the
array is stored
+ /// directly in the static filter.
+ ///
+ /// This does not make the expression any more performant at runtime, but
it does make it slightly
+ /// cheaper to build.
+ pub fn try_new_from_array(
+ expr: Arc<dyn PhysicalExpr>,
+ array: ArrayRef,
+ negated: bool,
+ ) -> Result<Self> {
+ let list = (0..array.len())
+ .map(|i| {
+ let scalar = ScalarValue::try_from_array(array.as_ref(), i)?;
+ Ok(crate::expressions::lit(scalar) as Arc<dyn PhysicalExpr>)
+ })
+ .collect::<Result<Vec<_>>>()?;
+ Ok(Self::new(
+ expr,
+ list,
+ negated,
+ Some(instantiate_static_filter(array)?),
+ ))
+ }
}
-
impl std::fmt::Display for InListExpr {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let list = expr_vec_fmt!(self.list);
@@ -352,7 +390,7 @@ impl PhysicalExpr for InListExpr {
}
if let Some(static_filter) = &self.static_filter {
- Ok(static_filter.has_nulls())
+ Ok(static_filter.null_count() > 0)
} else {
for expr in &self.list {
if expr.nullable(input_schema)? {
@@ -367,18 +405,85 @@ impl PhysicalExpr for InListExpr {
let num_rows = batch.num_rows();
let value = self.expr.evaluate(batch)?;
let r = match &self.static_filter {
- Some(f) => f.contains(value.into_array(num_rows)?.as_ref(),
self.negated)?,
+ Some(filter) => {
+ match value {
+ ColumnarValue::Array(array) => {
+ filter.contains(&array, self.negated)?
+ }
+ ColumnarValue::Scalar(scalar) => {
+ if scalar.is_null() {
+ // SQL three-valued logic: null IN (...) is always
null
+ // The code below would handle this correctly but
this is a faster path
+ return Ok(ColumnarValue::Array(Arc::new(
+ BooleanArray::from(vec![None; num_rows]),
+ )));
+ }
+ // Use a 1 row array to avoid code
duplication/branching
+ // Since all we do is compute hash and lookup this
should be efficient enough
+ let array = scalar.to_array()?;
+ let result_array =
+ filter.contains(array.as_ref(), self.negated)?;
+ // Broadcast the single result to all rows
+ // Must check is_null() to preserve NULL values (SQL
three-valued logic)
+ if result_array.is_null(0) {
+ BooleanArray::from(vec![None; num_rows])
+ } else {
+ BooleanArray::from_iter(std::iter::repeat_n(
+ result_array.value(0),
+ num_rows,
+ ))
+ }
+ }
+ }
+ }
None => {
+ // No static filter: iterate through each expression, compare,
and OR results
let value = value.into_array(num_rows)?;
- let is_nested = value.data_type().is_nested();
let found = self.list.iter().map(|expr|
expr.evaluate(batch)).try_fold(
BooleanArray::new(BooleanBuffer::new_unset(num_rows),
None),
|result, expr| -> Result<BooleanArray> {
- let rhs = compare_with_eq(
- &value,
- &expr?.into_array(num_rows)?,
- is_nested,
- )?;
+ let rhs = match expr? {
+ ColumnarValue::Array(array) => {
+ let cmp = make_comparator(
+ value.as_ref(),
+ array.as_ref(),
+ SortOptions::default(),
+ )?;
+ (0..num_rows)
+ .map(|i| {
+ if value.is_null(i) ||
array.is_null(i) {
+ return None;
+ }
+ Some(cmp(i, i).is_eq())
+ })
+ .collect::<BooleanArray>()
+ }
+ ColumnarValue::Scalar(scalar) => {
+ // Check if scalar is null once, before the
loop
+ if scalar.is_null() {
+ // If scalar is null, all comparisons
return null
+ BooleanArray::from(vec![None; num_rows])
+ } else {
+ // Convert scalar to 1-element array
+ let array = scalar.to_array()?;
+ let cmp = make_comparator(
+ value.as_ref(),
+ array.as_ref(),
+ SortOptions::default(),
+ )?;
+ // Compare each row of value with the
single scalar element
+ (0..num_rows)
+ .map(|i| {
+ if value.is_null(i) {
+ None
+ } else {
+ Some(cmp(i, 0).is_eq())
+ }
+ })
+ .collect::<BooleanArray>()
+ }
+ }
+ };
Ok(or_kleene(&result, &rhs)?)
},
)?;
@@ -394,8 +499,7 @@ impl PhysicalExpr for InListExpr {
}
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
- let mut children = vec![];
- children.push(&self.expr);
+ let mut children = vec![&self.expr];
children.extend(&self.list);
children
}
@@ -409,7 +513,7 @@ impl PhysicalExpr for InListExpr {
Arc::clone(&children[0]),
children[1..].to_vec(),
self.negated,
- self.static_filter.clone(),
+ self.static_filter.as_ref().map(Arc::clone),
)))
}
@@ -444,8 +548,8 @@ impl Hash for InListExpr {
fn hash<H: Hasher>(&self, state: &mut H) {
self.expr.hash(state);
self.negated.hash(state);
- self.list.hash(state);
// Add `self.static_filter` when hash is available
+ self.list.hash(state);
}
}
@@ -465,7 +569,15 @@ pub fn in_list(
"The data type inlist should be same, the value type is
{expr_data_type}, one of list expr type is {list_expr_data_type}"
);
}
- let static_filter = try_cast_static_filter_to_set(&list, schema).ok();
+
+ // Try to create a static filter for constant expressions
+ let static_filter = try_evaluate_constant_list(&list, schema)
+ .and_then(ArrayStaticFilter::try_new)
+ .ok()
+ .map(|static_filter| {
+ Arc::new(static_filter) as Arc<dyn StaticFilter + Send + Sync>
+ });
+
Ok(Arc::new(InListExpr::new(
expr,
list,
@@ -479,11 +591,12 @@ mod tests {
use super::*;
use crate::expressions;
use crate::expressions::{col, lit, try_cast};
+ use arrow::buffer::NullBuffer;
use datafusion_common::plan_err;
use datafusion_expr::type_coercion::binary::comparison_coercion;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use insta::assert_snapshot;
- use itertools::Itertools as _;
+ use itertools::Itertools;
type InListCastResult = (Arc<dyn PhysicalExpr>, Vec<Arc<dyn
PhysicalExpr>>);
@@ -519,6 +632,14 @@ mod tests {
}
}
+ fn try_cast_static_filter_to_set(
+ list: &[Arc<dyn PhysicalExpr>],
+ schema: &Schema,
+ ) -> Result<ArrayStaticFilter> {
+ let array = try_evaluate_constant_list(list, schema)?;
+ ArrayStaticFilter::try_new(array)
+ }
+
// Attempts to coerce the types of `list_type` to be comparable with the
// `expr_type`
fn get_coerce_type(expr_type: &DataType, list_type: &[DataType]) ->
Option<DataType> {
@@ -529,7 +650,18 @@ mod tests {
})
}
- // applies the in_list expr to an input batch and list
+ /// Test helper macro that evaluates an IN LIST expression with automatic
type casting.
+ ///
+ /// # Parameters
+ /// - `$BATCH`: The `RecordBatch` containing the input data to evaluate
against
+ /// - `$LIST`: A `Vec<Arc<dyn PhysicalExpr>>` of literal expressions
representing the IN list values
+ /// - `$NEGATED`: A `&bool` indicating whether this is a NOT IN operation
(true) or IN operation (false)
+ /// - `$EXPECTED`: A `Vec<Option<bool>>` representing the expected boolean
results for each row
+ /// - `$COL`: An `Arc<dyn PhysicalExpr>` representing the column
expression to evaluate
+ /// - `$SCHEMA`: A `&Schema` reference for the input batch
+ ///
+ /// This macro first applies type casting to the column and list
expressions to ensure
+ /// type compatibility, then delegates to `in_list_raw!` to perform the
evaluation and assertion.
macro_rules! in_list {
($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr,
$SCHEMA:expr) => {{
let (cast_expr, cast_list_exprs) = in_list_cast($COL, $LIST,
$SCHEMA)?;
@@ -544,7 +676,19 @@ mod tests {
}};
}
- // applies the in_list expr to an input batch and list without cast
+ /// Test helper macro that evaluates an IN LIST expression without
automatic type casting.
+ ///
+ /// # Parameters
+ /// - `$BATCH`: The `RecordBatch` containing the input data to evaluate
against
+ /// - `$LIST`: A `Vec<Arc<dyn PhysicalExpr>>` of literal expressions
representing the IN list values
+ /// - `$NEGATED`: A `&bool` indicating whether this is a NOT IN operation
(true) or IN operation (false)
+ /// - `$EXPECTED`: A `Vec<Option<bool>>` representing the expected boolean
results for each row
+ /// - `$COL`: An `Arc<dyn PhysicalExpr>` representing the column
expression to evaluate
+ /// - `$SCHEMA`: A `&Schema` reference for the input batch
+ ///
+ /// This macro creates an IN LIST expression, evaluates it against the
batch, converts the result
+ /// to a `BooleanArray`, and asserts that it matches the expected output.
Use this when the column
+ /// and list expressions are already the correct types and don't require
casting.
macro_rules! in_list_raw {
($BATCH:expr, $LIST:expr, $NEGATED:expr, $EXPECTED:expr, $COL:expr,
$SCHEMA:expr) => {{
let expr = in_list($COL, $LIST, $NEGATED, $SCHEMA).unwrap();
@@ -552,8 +696,7 @@ mod tests {
.evaluate(&$BATCH)?
.into_array($BATCH.num_rows())
.expect("Failed to convert to array");
- let result =
- as_boolean_array(&result).expect("failed to downcast to
BooleanArray");
+ let result = as_boolean_array(&result);
let expected = &BooleanArray::from($EXPECTED);
assert_eq!(expected, result);
}};
@@ -1134,10 +1277,10 @@ mod tests {
expressions::cast(lit(2i32), &schema, DataType::Int64)?,
try_cast(lit(3.13f32), &schema, DataType::Int64)?,
];
- let result = try_cast_static_filter_to_set(&phy_exprs,
&schema).unwrap();
+ let static_filter = try_cast_static_filter_to_set(&phy_exprs,
&schema).unwrap();
let array = Int64Array::from(vec![1, 2, 3, 4]);
- let r = result.contains(&array, false).unwrap();
+ let r = static_filter.contains(&array, false).unwrap();
assert_eq!(r, BooleanArray::from(vec![true, true, true, false]));
try_cast_static_filter_to_set(&phy_exprs, &schema).unwrap();
@@ -1514,4 +1657,1166 @@ mod tests {
assert_snapshot!(display_string, @"a@0 NOT IN (SET) ([a, b, NULL])");
Ok(())
}
+
+ #[test]
+ fn in_list_struct() -> Result<()> {
+ // Create schema with a struct column
+ let struct_fields = Fields::from(vec![
+ Field::new("x", DataType::Int32, false),
+ Field::new("y", DataType::Utf8, false),
+ ]);
+ let schema = Schema::new(vec![Field::new(
+ "a",
+ DataType::Struct(struct_fields.clone()),
+ true,
+ )]);
+
+ // Create test data: array of structs
+ let x_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ let y_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+ let struct_array =
+ StructArray::new(struct_fields.clone(), vec![x_array, y_array],
None);
+
+ let col_a = col("a", &schema)?;
+ let batch =
+ RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(struct_array)])?;
+
+ // Create literal structs for the IN list
+ // Struct {x: 1, y: "a"}
+ let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(StringArray::from(vec!["a"])),
+ ],
+ None,
+ )));
+
+ // Struct {x: 3, y: "c"}
+ let struct3 = ScalarValue::Struct(Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![3])),
+ Arc::new(StringArray::from(vec!["c"])),
+ ],
+ None,
+ )));
+
+ // Test: a IN ({1, "a"}, {3, "c"})
+ let list = vec![lit(struct1.clone()), lit(struct3.clone())];
+ in_list_raw!(
+ batch,
+ list.clone(),
+ &false,
+ vec![Some(true), Some(false), Some(true)],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ // Test: a NOT IN ({1, "a"}, {3, "c"})
+ in_list_raw!(
+ batch,
+ list,
+ &true,
+ vec![Some(false), Some(true), Some(false)],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn in_list_struct_with_nulls() -> Result<()> {
+ // Create schema with a struct column
+ let struct_fields = Fields::from(vec![
+ Field::new("x", DataType::Int32, false),
+ Field::new("y", DataType::Utf8, false),
+ ]);
+ let schema = Schema::new(vec![Field::new(
+ "a",
+ DataType::Struct(struct_fields.clone()),
+ true,
+ )]);
+
+ // Create test data with a null struct
+ let x_array = Arc::new(Int32Array::from(vec![1, 2]));
+ let y_array = Arc::new(StringArray::from(vec!["a", "b"]));
+ let struct_array = StructArray::new(
+ struct_fields.clone(),
+ vec![x_array, y_array],
+ Some(NullBuffer::from(vec![true, false])),
+ );
+
+ let col_a = col("a", &schema)?;
+ let batch =
+ RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(struct_array)])?;
+
+ // Create literal struct for the IN list
+ let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(StringArray::from(vec!["a"])),
+ ],
+ None,
+ )));
+
+ // Test: a IN ({1, "a"})
+ let list = vec![lit(struct1.clone())];
+ in_list_raw!(
+ batch,
+ list.clone(),
+ &false,
+ vec![Some(true), None],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ // Test: a NOT IN ({1, "a"})
+ in_list_raw!(
+ batch,
+ list,
+ &true,
+ vec![Some(false), None],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn in_list_struct_with_null_in_list() -> Result<()> {
+ // Create schema with a struct column
+ let struct_fields = Fields::from(vec![
+ Field::new("x", DataType::Int32, false),
+ Field::new("y", DataType::Utf8, false),
+ ]);
+ let schema = Schema::new(vec![Field::new(
+ "a",
+ DataType::Struct(struct_fields.clone()),
+ true,
+ )]);
+
+ // Create test data
+ let x_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ let y_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+ let struct_array =
+ StructArray::new(struct_fields.clone(), vec![x_array, y_array],
None);
+
+ let col_a = col("a", &schema)?;
+ let batch =
+ RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(struct_array)])?;
+
+ // Create literal structs including a NULL
+ let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(StringArray::from(vec!["a"])),
+ ],
+ None,
+ )));
+
+ let null_struct = ScalarValue::Struct(Arc::new(StructArray::new_null(
+ struct_fields.clone(),
+ 1,
+ )));
+
+ // Test: a IN ({1, "a"}, NULL)
+ let list = vec![lit(struct1), lit(null_struct.clone())];
+ in_list_raw!(
+ batch,
+ list.clone(),
+ &false,
+ vec![Some(true), None, None],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ // Test: a NOT IN ({1, "a"}, NULL)
+ in_list_raw!(
+ batch,
+ list,
+ &true,
+ vec![Some(false), None, None],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn in_list_nested_struct() -> Result<()> {
+ // Create nested struct schema
+ let inner_struct_fields = Fields::from(vec![
+ Field::new("a", DataType::Int32, false),
+ Field::new("b", DataType::Utf8, false),
+ ]);
+ let outer_struct_fields = Fields::from(vec![
+ Field::new(
+ "inner",
+ DataType::Struct(inner_struct_fields.clone()),
+ false,
+ ),
+ Field::new("c", DataType::Int32, false),
+ ]);
+ let schema = Schema::new(vec![Field::new(
+ "x",
+ DataType::Struct(outer_struct_fields.clone()),
+ true,
+ )]);
+
+ // Create test data with nested structs
+ let inner1 = Arc::new(StructArray::new(
+ inner_struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1, 2])),
+ Arc::new(StringArray::from(vec!["x", "y"])),
+ ],
+ None,
+ ));
+ let c_array = Arc::new(Int32Array::from(vec![10, 20]));
+ let outer_array =
+ StructArray::new(outer_struct_fields.clone(), vec![inner1,
c_array], None);
+
+ let col_x = col("x", &schema)?;
+ let batch =
+ RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(outer_array)])?;
+
+ // Create a nested struct literal matching the first row
+ let inner_match = Arc::new(StructArray::new(
+ inner_struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(StringArray::from(vec!["x"])),
+ ],
+ None,
+ ));
+ let outer_match = ScalarValue::Struct(Arc::new(StructArray::new(
+ outer_struct_fields.clone(),
+ vec![inner_match, Arc::new(Int32Array::from(vec![10]))],
+ None,
+ )));
+
+ // Test: x IN ({{1, "x"}, 10})
+ let list = vec![lit(outer_match)];
+ in_list_raw!(
+ batch,
+ list.clone(),
+ &false,
+ vec![Some(true), Some(false)],
+ Arc::clone(&col_x),
+ &schema
+ );
+
+ // Test: x NOT IN ({{1, "x"}, 10})
+ in_list_raw!(
+ batch,
+ list,
+ &true,
+ vec![Some(false), Some(true)],
+ Arc::clone(&col_x),
+ &schema
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn in_list_struct_with_exprs_not_array() -> Result<()> {
+ // Test InList using expressions (not the array constructor) with
structs
+ // By using InListExpr::new directly, we bypass the array optimization
+ // and use the Exprs variant, testing the expression evaluation path
+
+ // Create schema with a struct column {x: Int32, y: Utf8}
+ let struct_fields = Fields::from(vec![
+ Field::new("x", DataType::Int32, false),
+ Field::new("y", DataType::Utf8, false),
+ ]);
+ let schema = Schema::new(vec![Field::new(
+ "a",
+ DataType::Struct(struct_fields.clone()),
+ true,
+ )]);
+
+ // Create test data: array of structs [{1, "a"}, {2, "b"}, {3, "c"}]
+ let x_array = Arc::new(Int32Array::from(vec![1, 2, 3]));
+ let y_array = Arc::new(StringArray::from(vec!["a", "b", "c"]));
+ let struct_array =
+ StructArray::new(struct_fields.clone(), vec![x_array, y_array],
None);
+
+ let col_a = col("a", &schema)?;
+ let batch =
+ RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(struct_array)])?;
+
+ // Create struct literals with the SAME shape (so types are compatible)
+ // Struct {x: 1, y: "a"}
+ let struct1 = ScalarValue::Struct(Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(StringArray::from(vec!["a"])),
+ ],
+ None,
+ )));
+
+ // Struct {x: 3, y: "c"}
+ let struct3 = ScalarValue::Struct(Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![3])),
+ Arc::new(StringArray::from(vec!["c"])),
+ ],
+ None,
+ )));
+
+ // Create list of struct expressions
+ let list = vec![lit(struct1), lit(struct3)];
+
+ // Use InListExpr::new directly (not in_list()) to bypass array
optimization
+ // This creates an InList without a static filter
+ let expr = Arc::new(InListExpr::new(Arc::clone(&col_a), list, false,
None));
+
+ // Verify that the expression doesn't have a static filter
+ // by checking the display string does NOT contain "(SET)"
+ let display_string = expr.to_string();
+ assert!(
+ !display_string.contains("(SET)"),
+ "Expected display string to NOT contain '(SET)' (should use Exprs
variant), but got: {display_string}",
+ );
+
+ // Evaluate the expression
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+
+ // Expected: first row {1, "a"} matches struct1,
+ // second row {2, "b"} doesn't match,
+ // third row {3, "c"} matches struct3
+ let expected = BooleanArray::from(vec![Some(true), Some(false),
Some(true)]);
+ assert_eq!(result, &expected);
+
+ // Test NOT IN as well
+ let expr_not = Arc::new(InListExpr::new(
+ Arc::clone(&col_a),
+ vec![
+ lit(ScalarValue::Struct(Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![1])),
+ Arc::new(StringArray::from(vec!["a"])),
+ ],
+ None,
+ )))),
+ lit(ScalarValue::Struct(Arc::new(StructArray::new(
+ struct_fields.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![3])),
+ Arc::new(StringArray::from(vec!["c"])),
+ ],
+ None,
+ )))),
+ ],
+ true,
+ None,
+ ));
+
+ let result_not =
expr_not.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result_not = as_boolean_array(&result_not);
+
+ let expected_not = BooleanArray::from(vec![Some(false), Some(true),
Some(false)]);
+ assert_eq!(result_not, &expected_not);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_in_list_null_handling_comprehensive() -> Result<()> {
+ // Comprehensive test demonstrating SQL three-valued logic for IN
expressions
+ // This test explicitly shows all possible outcomes: true, false, and
null
+ let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
+
+ // Test data: [1, 2, 3, null]
+ // - 1 will match in both lists
+ // - 2 will not match in either list
+ // - 3 will not match in either list
+ // - null is always null
+ let a = Int64Array::from(vec![Some(1), Some(2), Some(3), None]);
+ let col_a = col("a", &schema)?;
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(a)])?;
+
+ // Case 1: List WITHOUT null - demonstrates true/false/null outcomes
+ // "a IN (1, 4)" - 1 matches, 2 and 3 don't match, null is null
+ let list = vec![lit(1i64), lit(4i64)];
+ in_list!(
+ batch,
+ list,
+ &false,
+ vec![
+ Some(true), // 1 is in the list → true
+ Some(false), // 2 is not in the list → false
+ Some(false), // 3 is not in the list → false
+ None, // null IN (...) → null (SQL three-valued logic)
+ ],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ // Case 2: List WITH null - demonstrates null propagation for
non-matches
+ // "a IN (1, NULL)" - 1 matches (true), 2/3 don't match but list has
null (null), null is null
+ let list = vec![lit(1i64), lit(ScalarValue::Int64(None))];
+ in_list!(
+ batch,
+ list,
+ &false,
+ vec![
+ Some(true), // 1 is in the list → true (found match)
+ None, // 2 is not in list, but list has NULL → null (might
match NULL)
+ None, // 3 is not in list, but list has NULL → null (might
match NULL)
+ None, // null IN (...) → null (SQL three-valued logic)
+ ],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_in_list_with_only_nulls() -> Result<()> {
+ // Edge case: IN list contains ONLY null values
+ let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
+ let a = Int64Array::from(vec![Some(1), Some(2), None]);
+ let col_a = col("a", &schema)?;
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(a)])?;
+
+ // "a IN (NULL, NULL)" - list has only nulls
+ let list = vec![lit(ScalarValue::Int64(None)),
lit(ScalarValue::Int64(None))];
+
+ // All results should be NULL because:
+ // - Non-null values (1, 2) match no concrete list value, but each NULL
in the list is an unknown value that might be equal, so the result is NULL
+ // - NULL value is always NULL in IN expressions
+ in_list!(
+ batch,
+ list.clone(),
+ &false,
+ vec![None, None, None],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ // "a NOT IN (NULL, NULL)" - list has only nulls
+ // All results should still be NULL due to three-valued logic
+ in_list!(
+ batch,
+ list,
+ &true,
+ vec![None, None, None],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_in_list_multiple_nulls_deduplication() -> Result<()> {
+ // Test that multiple NULLs in the list are handled correctly
+ // This verifies deduplication doesn't break null handling
+ let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
+ let col_a = col("a", &schema)?;
+
+ // Create array with multiple nulls: [1, 2, NULL, NULL, 3, NULL]
+ let array = Arc::new(Int64Array::from(vec![
+ Some(1),
+ Some(2),
+ None,
+ None,
+ Some(3),
+ None,
+ ])) as ArrayRef;
+
+ // Create InListExpr from array
+ let expr = Arc::new(InListExpr::try_new_from_array(
+ Arc::clone(&col_a),
+ array,
+ false,
+ )?) as Arc<dyn PhysicalExpr>;
+
+ // Create test data: [1, 2, 3, 4, null]
+ let a = Int64Array::from(vec![Some(1), Some(2), Some(3), Some(4),
None]);
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(a)])?;
+
+ // Evaluate the expression
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+
+ // Expected behavior with multiple NULLs in list:
+ // - Values in the list (1,2,3) → true
+ // - Values not in the list (4) → NULL (because list contains NULL)
+ // - NULL input → NULL
+ let expected = BooleanArray::from(vec![
+ Some(true), // 1 is in list
+ Some(true), // 2 is in list
+ Some(true), // 3 is in list
+ None, // 4 not in list, but list has NULLs
+ None, // NULL input
+ ]);
+ assert_eq!(result, &expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_not_in_null_handling_comprehensive() -> Result<()> {
+ // Comprehensive test demonstrating SQL three-valued logic for NOT IN
expressions
+ // This test explicitly shows all possible outcomes for NOT IN: true,
false, and null
+ let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
+
+ // Test data: [1, 2, 3, null]
+ let a = Int64Array::from(vec![Some(1), Some(2), Some(3), None]);
+ let col_a = col("a", &schema)?;
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(a)])?;
+
+ // Case 1: List WITHOUT null - demonstrates true/false/null outcomes
for NOT IN
+ // "a NOT IN (1, 4)" - 1 matches (false), 2 and 3 don't match (true),
null is null
+ let list = vec![lit(1i64), lit(4i64)];
+ in_list!(
+ batch,
+ list,
+ &true,
+ vec![
+ Some(false), // 1 is in the list → NOT IN returns false
+ Some(true), // 2 is not in the list → NOT IN returns true
+ Some(true), // 3 is not in the list → NOT IN returns true
+ None, // null NOT IN (...) → null (SQL three-valued
logic)
+ ],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ // Case 2: List WITH null - demonstrates null propagation for NOT IN
+ // "a NOT IN (1, NULL)" - 1 matches (false), 2/3 don't match but list
has null (null), null is null
+ let list = vec![lit(1i64), lit(ScalarValue::Int64(None))];
+ in_list!(
+ batch,
+ list,
+ &true,
+ vec![
+ Some(false), // 1 is in the list → NOT IN returns false
+ None, // 2 is not in known values, but list has NULL → null
(can't prove it's not in list)
+ None, // 3 is not in known values, but list has NULL → null
(can't prove it's not in list)
+ None, // null NOT IN (...) → null (SQL three-valued logic)
+ ],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_in_list_null_type_column() -> Result<()> {
+ // Test with a column that has DataType::Null (not just nullable
values)
+ // All values in a NullArray are null by definition
+ let schema = Schema::new(vec![Field::new("a", DataType::Null, true)]);
+ let a = NullArray::new(3);
+ let col_a = col("a", &schema)?;
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(a)])?;
+
+ // "null_column IN (1, 2)" - comparing Null type against Int64 list
+ // Note: This tests type coercion behavior between Null and Int64
+ let list = vec![lit(1i64), lit(2i64)];
+
+ // All results should be NULL because:
+ // - Every value in the column is null (DataType::Null)
+ // - null IN (anything) always returns null per SQL three-valued logic
+ in_list!(
+ batch,
+ list.clone(),
+ &false,
+ vec![None, None, None],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ // "null_column NOT IN (1, 2)"
+ // Same behavior for NOT IN - null NOT IN (anything) is still null
+ in_list!(
+ batch,
+ list,
+ &true,
+ vec![None, None, None],
+ Arc::clone(&col_a),
+ &schema
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_in_list_null_type_list() -> Result<()> {
+ // Test with a list that has DataType::Null
+ let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
+ let a = Int64Array::from(vec![Some(1), Some(2), None]);
+ let col_a = col("a", &schema)?;
+
+ // Create a NullArray as the list
+ let null_array = Arc::new(NullArray::new(2)) as ArrayRef;
+
+ // Try to create InListExpr with a NullArray list
+ // This tests whether try_new_from_array can handle Null type arrays
+ let expr = Arc::new(InListExpr::try_new_from_array(
+ Arc::clone(&col_a),
+ null_array,
+ false,
+ )?) as Arc<dyn PhysicalExpr>;
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(a)])?;
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+
+ // If it succeeds, all results should be NULL
+ // because the list contains only null type values
+ let expected = BooleanArray::from(vec![None, None, None]);
+ assert_eq!(result, &expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_in_list_null_type_both() -> Result<()> {
+ // Test when both column and list are DataType::Null
+ let schema = Schema::new(vec![Field::new("a", DataType::Null, true)]);
+ let a = NullArray::new(3);
+ let col_a = col("a", &schema)?;
+
+ // Create a NullArray as the list
+ let null_array = Arc::new(NullArray::new(2)) as ArrayRef;
+
+ // Try to create InListExpr with both Null types
+ let expr = Arc::new(InListExpr::try_new_from_array(
+ Arc::clone(&col_a),
+ null_array,
+ false,
+ )?) as Arc<dyn PhysicalExpr>;
+
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(a)])?;
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+
+ // If successful, all results should be NULL
+ // null IN [null, null] -> null
+ let expected = BooleanArray::from(vec![None, None, None]);
+ assert_eq!(result, &expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_in_list_comprehensive_null_handling() -> Result<()> {
+ // Comprehensive test for IN LIST operations with various NULL
handling scenarios.
+ // This test covers the key cases validated against DuckDB as the
source of truth.
+ //
+ // Note: Some scalar literal tests (like NULL IN (1, 2)) are omitted
as they
+ // appear to expose an issue with static filter optimization. These
are covered
+ // by test_in_list_scalar_literal_cases() and existing tests like in_list_no_cols().
+
+ let schema = Arc::new(Schema::new(vec![Field::new("b",
DataType::Int32, true)]));
+ let col_b = col("b", &schema)?;
+ let null_i32 = ScalarValue::Int32(None);
+
+ // Helper to create a batch
+ let make_batch = |values: Vec<Option<i32>>| -> Result<RecordBatch> {
+ let array = Arc::new(Int32Array::from(values));
+ Ok(RecordBatch::try_new(Arc::clone(&schema), vec![array])?)
+ };
+
+ // Helper to run a test
+ let run_test = |batch: &RecordBatch,
+ expr: Arc<dyn PhysicalExpr>,
+ list: Vec<Arc<dyn PhysicalExpr>>,
+ expected: Vec<Option<bool>>|
+ -> Result<()> {
+ let in_expr = in_list(expr, list, &false, schema.as_ref())?;
+ let result =
in_expr.evaluate(batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ assert_eq!(result, &BooleanArray::from(expected));
+ Ok(())
+ };
+
+ //
========================================================================
+ // COLUMN TESTS - col(b) IN [1, 2]
+ //
========================================================================
+
+ // [1] IN (1, 2) => [TRUE]
+ let batch = make_batch(vec![Some(1)])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(1i32), lit(2i32)],
+ vec![Some(true)],
+ )?;
+
+ // [1, 2] IN (1, 2) => [TRUE, TRUE]
+ let batch = make_batch(vec![Some(1), Some(2)])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(1i32), lit(2i32)],
+ vec![Some(true), Some(true)],
+ )?;
+
+ // [3, 4] IN (1, 2) => [FALSE, FALSE]
+ let batch = make_batch(vec![Some(3), Some(4)])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(1i32), lit(2i32)],
+ vec![Some(false), Some(false)],
+ )?;
+
+ // [1, NULL] IN (1, 2) => [TRUE, NULL]
+ let batch = make_batch(vec![Some(1), None])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(1i32), lit(2i32)],
+ vec![Some(true), None],
+ )?;
+
+ // [3, NULL] IN (1, 2) => [FALSE, NULL] (no match, NULL is NULL)
+ let batch = make_batch(vec![Some(3), None])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(1i32), lit(2i32)],
+ vec![Some(false), None],
+ )?;
+
+ //
========================================================================
+ // COLUMN WITH NULL IN LIST - col(b) IN [NULL, 1]
+ //
========================================================================
+
+ // [1] IN (NULL, 1) => [TRUE] (found match)
+ let batch = make_batch(vec![Some(1)])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(null_i32.clone()), lit(1i32)],
+ vec![Some(true)],
+ )?;
+
+ // [2] IN (NULL, 1) => [NULL] (no match, but list has NULL)
+ let batch = make_batch(vec![Some(2)])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(null_i32.clone()), lit(1i32)],
+ vec![None],
+ )?;
+
+ // [NULL] IN (NULL, 1) => [NULL]
+ let batch = make_batch(vec![None])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(null_i32.clone()), lit(1i32)],
+ vec![None],
+ )?;
+
+ //
========================================================================
+ // COLUMN WITH ALL NULLS IN LIST - col(b) IN [NULL, NULL]
+ //
========================================================================
+
+ // [1] IN (NULL, NULL) => [NULL]
+ let batch = make_batch(vec![Some(1)])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(null_i32.clone()), lit(null_i32.clone())],
+ vec![None],
+ )?;
+
+ // [NULL] IN (NULL, NULL) => [NULL]
+ let batch = make_batch(vec![None])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(null_i32.clone()), lit(null_i32.clone())],
+ vec![None],
+ )?;
+
+ //
========================================================================
+ // LITERAL IN LIST WITH COLUMN - lit(1) IN [2, col(b)]
+ //
========================================================================
+
+ // 1 IN (2, [1]) => [TRUE] (matches column value)
+ let batch = make_batch(vec![Some(1)])?;
+ run_test(
+ &batch,
+ lit(1i32),
+ vec![lit(2i32), Arc::clone(&col_b)],
+ vec![Some(true)],
+ )?;
+
+ // 1 IN (2, [3]) => [FALSE] (no match)
+ let batch = make_batch(vec![Some(3)])?;
+ run_test(
+ &batch,
+ lit(1i32),
+ vec![lit(2i32), Arc::clone(&col_b)],
+ vec![Some(false)],
+ )?;
+
+ // 1 IN (2, [NULL]) => [NULL] (no match, column is NULL)
+ let batch = make_batch(vec![None])?;
+ run_test(
+ &batch,
+ lit(1i32),
+ vec![lit(2i32), Arc::clone(&col_b)],
+ vec![None],
+ )?;
+
+ //
========================================================================
+ // COLUMN IN LIST CONTAINING ITSELF - col(b) IN [1, col(b)]
+ //
========================================================================
+
+ // [1] IN (1, [1]) => [TRUE] (always matches - either list literal or
itself)
+ let batch = make_batch(vec![Some(1)])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(1i32), Arc::clone(&col_b)],
+ vec![Some(true)],
+ )?;
+
+ // [2] IN (1, [2]) => [TRUE] (matches itself)
+ let batch = make_batch(vec![Some(2)])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(1i32), Arc::clone(&col_b)],
+ vec![Some(true)],
+ )?;
+
+ // [NULL] IN (1, [NULL]) => [NULL] (NULL is never equal to anything)
+ let batch = make_batch(vec![None])?;
+ run_test(
+ &batch,
+ Arc::clone(&col_b),
+ vec![lit(1i32), Arc::clone(&col_b)],
+ vec![None],
+ )?;
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_in_list_scalar_literal_cases() -> Result<()> {
+ // Test scalar literal cases (both NULL and non-NULL) to ensure SQL
three-valued
+ // logic is correctly implemented. This covers the important case
where a scalar
+ // value is tested against a list containing NULL.
+
+ let schema = Arc::new(Schema::new(vec![Field::new("b",
DataType::Int32, true)]));
+ let null_i32 = ScalarValue::Int32(None);
+
+ // Helper to create a batch
+ let make_batch = |values: Vec<Option<i32>>| -> Result<RecordBatch> {
+ let array = Arc::new(Int32Array::from(values));
+ Ok(RecordBatch::try_new(Arc::clone(&schema), vec![array])?)
+ };
+
+ // Helper to run a test
+ let run_test = |batch: &RecordBatch,
+ expr: Arc<dyn PhysicalExpr>,
+ list: Vec<Arc<dyn PhysicalExpr>>,
+ negated: bool,
+ expected: Vec<Option<bool>>|
+ -> Result<()> {
+ let in_expr = in_list(expr, list, &negated, schema.as_ref())?;
+ let result =
in_expr.evaluate(batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ let expected_array = BooleanArray::from(expected);
+ assert_eq!(
+ result,
+ &expected_array,
+ "Expected {:?}, got {:?}",
+ expected_array,
+ result.iter().collect::<Vec<_>>()
+ );
+ Ok(())
+ };
+
+ let batch = make_batch(vec![Some(1)])?;
+
+ //
========================================================================
+ // NULL LITERAL TESTS
+ // According to SQL semantics, NULL IN (any_list) should always return
NULL
+ //
========================================================================
+
+ // NULL IN (1, 1) => NULL
+ run_test(
+ &batch,
+ lit(null_i32.clone()),
+ vec![lit(1i32), lit(1i32)],
+ false,
+ vec![None],
+ )?;
+
+ // NULL IN (NULL, 1) => NULL
+ run_test(
+ &batch,
+ lit(null_i32.clone()),
+ vec![lit(null_i32.clone()), lit(1i32)],
+ false,
+ vec![None],
+ )?;
+
+ // NULL IN (NULL, NULL) => NULL
+ run_test(
+ &batch,
+ lit(null_i32.clone()),
+ vec![lit(null_i32.clone()), lit(null_i32.clone())],
+ false,
+ vec![None],
+ )?;
+
+ //
========================================================================
+ // NON-NULL SCALAR LITERALS WITH NULL IN LIST - Int32
+ // When a scalar value is NOT in a list containing NULL, the result is
NULL
+ // When a scalar value IS in the list, IN is TRUE and NOT IN is FALSE
(NULL doesn't matter)
+ //
========================================================================
+
+ // 3 IN (0, 1, 2, NULL) => NULL (not in list, but list has NULL)
+ run_test(
+ &batch,
+ lit(3i32),
+ vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
+ false,
+ vec![None],
+ )?;
+
+ // 3 NOT IN (0, 1, 2, NULL) => NULL (not in list, but list has NULL)
+ run_test(
+ &batch,
+ lit(3i32),
+ vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
+ true,
+ vec![None],
+ )?;
+
+ // 1 IN (0, 1, 2, NULL) => TRUE (found match, NULL doesn't matter)
+ run_test(
+ &batch,
+ lit(1i32),
+ vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
+ false,
+ vec![Some(true)],
+ )?;
+
+ // 1 NOT IN (0, 1, 2, NULL) => FALSE (found match, NULL doesn't matter)
+ run_test(
+ &batch,
+ lit(1i32),
+ vec![lit(0i32), lit(1i32), lit(2i32), lit(null_i32.clone())],
+ true,
+ vec![Some(false)],
+ )?;
+
+ //
========================================================================
+ // NON-NULL SCALAR LITERALS WITH NULL IN LIST - String
+ // Same semantics as Int32 but with string type
+ //
========================================================================
+
+ let schema_str =
+ Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
+ let batch_str = RecordBatch::try_new(
+ Arc::clone(&schema_str),
+ vec![Arc::new(StringArray::from(vec![Some("dummy")]))],
+ )?;
+ let null_str = ScalarValue::Utf8(None);
+
+ let run_test_str = |expr: Arc<dyn PhysicalExpr>,
+ list: Vec<Arc<dyn PhysicalExpr>>,
+ negated: bool,
+ expected: Vec<Option<bool>>|
+ -> Result<()> {
+ let in_expr = in_list(expr, list, &negated, schema_str.as_ref())?;
+ let result = in_expr
+ .evaluate(&batch_str)?
+ .into_array(batch_str.num_rows())?;
+ let result = as_boolean_array(&result);
+ let expected_array = BooleanArray::from(expected);
+ assert_eq!(
+ result,
+ &expected_array,
+ "Expected {:?}, got {:?}",
+ expected_array,
+ result.iter().collect::<Vec<_>>()
+ );
+ Ok(())
+ };
+
+ // 'c' IN ('a', 'b', NULL) => NULL (not in list, but list has NULL)
+ run_test_str(
+ lit("c"),
+ vec![lit("a"), lit("b"), lit(null_str.clone())],
+ false,
+ vec![None],
+ )?;
+
+ // 'c' NOT IN ('a', 'b', NULL) => NULL (not in list, but list has NULL)
+ run_test_str(
+ lit("c"),
+ vec![lit("a"), lit("b"), lit(null_str.clone())],
+ true,
+ vec![None],
+ )?;
+
+ // 'a' IN ('a', 'b', NULL) => TRUE (found match, NULL doesn't matter)
+ run_test_str(
+ lit("a"),
+ vec![lit("a"), lit("b"), lit(null_str.clone())],
+ false,
+ vec![Some(true)],
+ )?;
+
+ // 'a' NOT IN ('a', 'b', NULL) => FALSE (found match, NULL doesn't
matter)
+ run_test_str(
+ lit("a"),
+ vec![lit("a"), lit("b"), lit(null_str.clone())],
+ true,
+ vec![Some(false)],
+ )?;
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_in_list_tuple_cases() -> Result<()> {
+ // Test tuple/struct cases from the original request: (lit, lit) IN
((lit, lit), ...)
+ // These test row-wise comparisons like (1, 2) IN ((1, 2), (3, 4))
+
+ let schema = Arc::new(Schema::new(vec![Field::new("b",
DataType::Int32, true)]));
+
+ // Helper to create struct scalars for tuple comparisons
+ let make_struct = |v1: Option<i32>, v2: Option<i32>| -> ScalarValue {
+ let fields = Fields::from(vec![
+ Field::new("field_0", DataType::Int32, true),
+ Field::new("field_1", DataType::Int32, true),
+ ]);
+ ScalarValue::Struct(Arc::new(StructArray::new(
+ fields,
+ vec![
+ Arc::new(Int32Array::from(vec![v1])),
+ Arc::new(Int32Array::from(vec![v2])),
+ ],
+ None,
+ )))
+ };
+
+ // Need a single row batch for scalar tests
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![Arc::new(Int32Array::from(vec![Some(1)]))],
+ )?;
+
+ // Helper to run tuple tests
+ let run_tuple_test = |lhs: ScalarValue,
+ list: Vec<ScalarValue>,
+ expected: Vec<Option<bool>>|
+ -> Result<()> {
+ let expr = in_list(
+ lit(lhs),
+ list.into_iter().map(lit).collect(),
+ &false,
+ schema.as_ref(),
+ )?;
+ let result = expr.evaluate(&batch)?.into_array(batch.num_rows())?;
+ let result = as_boolean_array(&result);
+ assert_eq!(result, &BooleanArray::from(expected));
+ Ok(())
+ };
+
+ // (NULL, NULL) IN ((1, 2)) => FALSE (tuples don't match)
+ run_tuple_test(
+ make_struct(None, None),
+ vec![make_struct(Some(1), Some(2))],
+ vec![Some(false)],
+ )?;
+
+ // (NULL, NULL) IN ((NULL, 1)) => FALSE
+ run_tuple_test(
+ make_struct(None, None),
+ vec![make_struct(None, Some(1))],
+ vec![Some(false)],
+ )?;
+
+ // (NULL, NULL) IN ((NULL, NULL)) => TRUE (exact match including nulls)
+ run_tuple_test(
+ make_struct(None, None),
+ vec![make_struct(None, None)],
+ vec![Some(true)],
+ )?;
+
+ // (NULL, 1) IN ((1, 2)) => FALSE
+ run_tuple_test(
+ make_struct(None, Some(1)),
+ vec![make_struct(Some(1), Some(2))],
+ vec![Some(false)],
+ )?;
+
+ // (NULL, 1) IN ((NULL, 1)) => TRUE (exact match)
+ run_tuple_test(
+ make_struct(None, Some(1)),
+ vec![make_struct(None, Some(1))],
+ vec![Some(true)],
+ )?;
+
+ // (NULL, 1) IN ((NULL, NULL)) => FALSE
+ run_tuple_test(
+ make_struct(None, Some(1)),
+ vec![make_struct(None, None)],
+ vec![Some(false)],
+ )?;
+
+ // (1, 2) IN ((1, 2)) => TRUE
+ run_tuple_test(
+ make_struct(Some(1), Some(2)),
+ vec![make_struct(Some(1), Some(2))],
+ vec![Some(true)],
+ )?;
+
+ // (1, 3) IN ((1, 2)) => FALSE
+ run_tuple_test(
+ make_struct(Some(1), Some(3)),
+ vec![make_struct(Some(1), Some(2))],
+ vec![Some(false)],
+ )?;
+
+ // (4, 4) IN ((1, 2)) => FALSE
+ run_tuple_test(
+ make_struct(Some(4), Some(4)),
+ vec![make_struct(Some(1), Some(2))],
+ vec![Some(false)],
+ )?;
+
+ // (1, 1) IN ((NULL, 1)) => FALSE
+ run_tuple_test(
+ make_struct(Some(1), Some(1)),
+ vec![make_struct(None, Some(1))],
+ vec![Some(false)],
+ )?;
+
+ // (1, 1) IN ((NULL, NULL)) => FALSE
+ run_tuple_test(
+ make_struct(Some(1), Some(1)),
+ vec![make_struct(None, None)],
+ vec![Some(false)],
+ )?;
+
+ Ok(())
+ }
}
diff --git a/datafusion/physical-plan/src/joins/utils.rs
b/datafusion/physical-plan/src/joins/utils.rs
index f837423d2b..4cba85a851 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -1680,7 +1680,7 @@ pub fn update_hash(
hash_map: &mut dyn JoinHashMapType,
offset: usize,
random_state: &RandomState,
- hashes_buffer: &mut Vec<u64>,
+ hashes_buffer: &mut [u64],
deleted_offset: usize,
fifo_hashmap: bool,
) -> Result<()> {
diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt
index c69e7a19e4..77197721e1 100644
--- a/datafusion/sqllogictest/test_files/array.slt
+++ b/datafusion/sqllogictest/test_files/array.slt
@@ -6436,7 +6436,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------ProjectionExec: expr=[]
-06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
+06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
08)--------------LazyMemoryExec: partitions=1,
batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
@@ -6464,7 +6464,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------ProjectionExec: expr=[]
-06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
+06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
08)--------------LazyMemoryExec: partitions=1,
batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
@@ -6492,7 +6492,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------ProjectionExec: expr=[]
-06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
+06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
08)--------------LazyMemoryExec: partitions=1,
batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
@@ -6520,7 +6520,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------ProjectionExec: expr=[]
-06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
+06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
08)--------------LazyMemoryExec: partitions=1,
batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
@@ -6548,7 +6548,7 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
05)--------ProjectionExec: expr=[]
-06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
+06)----------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
07)------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
08)--------------LazyMemoryExec: partitions=1,
batch_generators=[generate_series: start=1, end=100000, batch_size=8192]
diff --git a/datafusion/sqllogictest/test_files/expr.slt b/datafusion/sqllogictest/test_files/expr.slt
index 87345b833e..df88d26c9c 100644
--- a/datafusion/sqllogictest/test_files/expr.slt
+++ b/datafusion/sqllogictest/test_files/expr.slt
@@ -1066,6 +1066,213 @@ SELECT '2' NOT IN ('a','b',NULL,1)
----
NULL
+# ========================================================================
+# Comprehensive IN LIST tests with NULL handling
+# These tests validate SQL three-valued logic for IN operations
+# ========================================================================
+
+# test_in_list_null_literals
+# NULL IN (any_list) should always return NULL per SQL three-valued logic
+
+query B
+SELECT NULL IN (1, 1)
+----
+NULL
+
+query B
+SELECT NULL IN (NULL, 1)
+----
+NULL
+
+query B
+SELECT NULL IN (NULL, NULL)
+----
+NULL
+
+# test_in_list_with_columns
+# Create test table for column-based IN LIST tests
+
+statement ok
+CREATE OR REPLACE TABLE in_list_test(b INT) AS VALUES (1), (2), (3), (4), (NULL);
+
+# Test: b IN (1, 2) with various values
+
+query B
+SELECT b IN (1, 2) FROM in_list_test WHERE b = 1;
+----
+true
+
+query IB
+SELECT b, b IN (1, 2) FROM in_list_test WHERE b IN (1, 2) ORDER BY b;
+----
+1 true
+2 true
+
+query IB
+SELECT b, b IN (1, 2) FROM in_list_test WHERE b IN (3, 4) ORDER BY b;
+----
+3 false
+4 false
+
+query B
+SELECT b IN (1, 2) FROM in_list_test WHERE b = 1;
+----
+true
+
+query B
+SELECT b IN (1, 2) FROM in_list_test WHERE b = 3;
+----
+false
+
+query B
+SELECT b IN (1, 2) FROM in_list_test WHERE b IS NULL;
+----
+NULL
+
+# Test: b IN (NULL, 1) - list contains NULL
+
+query B
+SELECT b IN (NULL, 1) FROM in_list_test WHERE b = 1;
+----
+true
+
+query B
+SELECT b IN (NULL, 1) FROM in_list_test WHERE b = 2;
+----
+NULL
+
+query B
+SELECT b IN (NULL, 1) FROM in_list_test WHERE b IS NULL;
+----
+NULL
+
+# Test: b IN (NULL, NULL) - list contains only NULLs
+
+query B
+SELECT b IN (NULL, NULL) FROM in_list_test WHERE b = 1;
+----
+NULL
+
+query B
+SELECT b IN (NULL, NULL) FROM in_list_test WHERE b IS NULL;
+----
+NULL
+
+# Test: literal IN (list_with_column) - column appears in the list
+
+statement ok
+CREATE OR REPLACE TABLE in_list_col_test(b INT) AS VALUES (1), (3), (NULL);
+
+query B
+SELECT 1 IN (2, b) FROM in_list_col_test WHERE b = 1;
+----
+true
+
+query B
+SELECT 1 IN (2, b) FROM in_list_col_test WHERE b = 3;
+----
+false
+
+query B
+SELECT 1 IN (2, b) FROM in_list_col_test WHERE b IS NULL;
+----
+NULL
+
+# Test: b IN (1, b) - column references itself in list
+
+query B
+SELECT b IN (1, b) FROM in_list_col_test WHERE b = 1;
+----
+true
+
+query B
+SELECT b IN (1, b) FROM in_list_col_test WHERE b = 3;
+----
+true
+
+query B
+SELECT b IN (1, b) FROM in_list_col_test WHERE b IS NULL;
+----
+NULL
+
+# test_in_list_tuples
+# Test tuple/row-wise IN comparisons using struct syntax
+# Note: Using arrow_cast for precise type control
+
+# (NULL, NULL) IN ((1, 2)) => FALSE
+query B
+SELECT struct(arrow_cast(NULL, 'Int32'), arrow_cast(NULL, 'Int32')) IN (struct(1, 2))
+----
+false
+
+# (NULL, NULL) IN ((NULL, 1)) => FALSE
+query B
+SELECT struct(arrow_cast(NULL, 'Int32'), arrow_cast(NULL, 'Int32')) IN (struct(arrow_cast(NULL, 'Int32'), 1))
+----
+false
+
+# (NULL, NULL) IN ((NULL, NULL)) => TRUE (exact match)
+query B
+SELECT struct(arrow_cast(NULL, 'Int32'), arrow_cast(NULL, 'Int32')) IN (struct(arrow_cast(NULL, 'Int32'), arrow_cast(NULL, 'Int32')))
+----
+true
+
+# (NULL, 1) IN ((1, 2)) => FALSE
+query B
+SELECT struct(arrow_cast(NULL, 'Int32'), 1) IN (struct(1, 2))
+----
+false
+
+# (NULL, 1) IN ((NULL, 1)) => TRUE (exact match)
+query B
+SELECT struct(arrow_cast(NULL, 'Int32'), 1) IN (struct(arrow_cast(NULL, 'Int32'), 1))
+----
+true
+
+# (NULL, 1) IN ((NULL, NULL)) => FALSE
+query B
+SELECT struct(arrow_cast(NULL, 'Int32'), 1) IN (struct(arrow_cast(NULL, 'Int32'), arrow_cast(NULL, 'Int32')))
+----
+false
+
+# (1, 2) IN ((1, 2)) => TRUE
+query B
+SELECT struct(1, 2) IN (struct(1, 2))
+----
+true
+
+# (1, 3) IN ((1, 2)) => FALSE
+query B
+SELECT struct(1, 3) IN (struct(1, 2))
+----
+false
+
+# (4, 4) IN ((1, 2)) => FALSE
+query B
+SELECT struct(4, 4) IN (struct(1, 2))
+----
+false
+
+# (1, 1) IN ((NULL, 1)) => FALSE
+query B
+SELECT struct(1, 1) IN (struct(NULL, 1))
+----
+false
+
+# (1, 1) IN ((NULL, NULL)) => FALSE
+query B
+SELECT struct(1, 1) IN (struct(NULL, NULL))
+----
+false
+
+# Cleanup test tables
+
+statement ok
+DROP TABLE in_list_test;
+
+statement ok
+DROP TABLE in_list_col_test;
+
query T
SELECT encode('tom','base64');
----
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part
index 12efc64555..d20f090fa5 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part
@@ -69,13 +69,13 @@ physical_plan
03)----CoalescePartitionsExec
04)------AggregateExec: mode=Partial, gby=[],
aggr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
05)--------CoalesceBatchesExec: target_batch_size=8192
-06)----------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(l_partkey@0, p_partkey@0)], filter=p_brand@1 = Brand#12 AND p_container@3
IN ([SM CASE, SM BOX, SM PACK, SM PKG]) AND l_quantity@0 >= Some(100),15,2 AND
l_quantity@0 <= Some(1100),15,2 AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND
p_container@3 IN ([MED BAG, MED BOX, MED PKG, MED PACK]) AND l_quantity@0 >=
Some(1000),15,2 AND l_quantity@0 <= Some(2000),15,2 AND p_size@2 <= 10 OR
p_brand@1 = Brand#34 AND p_container@3 IN ( [...]
+06)----------HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(l_partkey@0, p_partkey@0)], filter=p_brand@1 = Brand#12 AND p_container@3
IN (SET) ([SM CASE, SM BOX, SM PACK, SM PKG]) AND l_quantity@0 >=
Some(100),15,2 AND l_quantity@0 <= Some(1100),15,2 AND p_size@2 <= 5 OR
p_brand@1 = Brand#23 AND p_container@3 IN (SET) ([MED BAG, MED BOX, MED PKG,
MED PACK]) AND l_quantity@0 >= Some(1000),15,2 AND l_quantity@0 <=
Some(2000),15,2 AND p_size@2 <= 10 OR p_brand@1 = Brand#34 AND p_cont [...]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------RepartitionExec: partitioning=Hash([l_partkey@0], 4),
input_partitions=4
09)----------------FilterExec: (l_quantity@1 >= Some(100),15,2 AND
l_quantity@1 <= Some(1100),15,2 OR l_quantity@1 >= Some(1000),15,2 AND
l_quantity@1 <= Some(2000),15,2 OR l_quantity@1 >= Some(2000),15,2 AND
l_quantity@1 <= Some(3000),15,2) AND (l_shipmode@5 = AIR OR l_shipmode@5 = AIR
REG) AND l_shipinstruct@4 = DELIVER IN PERSON, projection=[l_partkey@0,
l_quantity@1, l_extendedprice@2, l_discount@3]
10)------------------DataSourceExec: file_groups={4 groups:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247],
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]},
projection=[l_partkey, l_quantity, l_extendedprice, l_di [...]
11)------------CoalesceBatchesExec: target_batch_size=8192
12)--------------RepartitionExec: partitioning=Hash([p_partkey@0], 4),
input_partitions=4
-13)----------------FilterExec: (p_brand@1 = Brand#12 AND p_container@3 IN ([SM CASE, SM BOX, SM PACK, SM PKG]) AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND p_container@3 IN ([MED BAG, MED BOX, MED PKG, MED PACK]) AND p_size@2 <= 10 OR p_brand@1 = Brand#34 AND p_container@3 IN ([LG CASE, LG BOX, LG PACK, LG PKG]) AND p_size@2 <= 15) AND p_size@2 >= 1
+13)----------------FilterExec: (p_brand@1 = Brand#12 AND p_container@3 IN (SET) ([SM CASE, SM BOX, SM PACK, SM PKG]) AND p_size@2 <= 5 OR p_brand@1 = Brand#23 AND p_container@3 IN (SET) ([MED BAG, MED BOX, MED PKG, MED PACK]) AND p_size@2 <= 10 OR p_brand@1 = Brand#34 AND p_container@3 IN (SET) ([LG CASE, LG BOX, LG PACK, LG PKG]) AND p_size@2 <= 15) AND p_size@2 >= 1
14)------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
15)--------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]},
projection=[p_partkey, p_brand, p_size, p_container], file_type=csv,
has_header=false
diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
index 818c7bc989..a9d95fb1ab 100644
--- a/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
+++ b/datafusion/sqllogictest/test_files/tpch/plans/q22.slt.part
@@ -90,7 +90,7 @@ physical_plan
14)--------------------------HashJoinExec: mode=Partitioned,
join_type=LeftAnti, on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1,
c_acctbal@2]
15)----------------------------CoalesceBatchesExec: target_batch_size=8192
16)------------------------------RepartitionExec:
partitioning=Hash([c_custkey@0], 4), input_partitions=4
-17)--------------------------------FilterExec: substr(c_phone@1, 1, 2) IN ([13, 31, 23, 29, 30, 18, 17])
+17)--------------------------------FilterExec: substr(c_phone@1, 1, 2) IN (SET) ([13, 31, 23, 29, 30, 18, 17])
18)----------------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
19)------------------------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]},
projection=[c_custkey, c_phone, c_acctbal], file_type=csv, has_header=false
20)----------------------------CoalesceBatchesExec: target_batch_size=8192
@@ -99,6 +99,6 @@ physical_plan
23)--------------------AggregateExec: mode=Final, gby=[],
aggr=[avg(customer.c_acctbal)]
24)----------------------CoalescePartitionsExec
25)------------------------AggregateExec: mode=Partial, gby=[],
aggr=[avg(customer.c_acctbal)]
-26)--------------------------FilterExec: c_acctbal@1 > Some(0),15,2 AND substr(c_phone@0, 1, 2) IN ([13, 31, 23, 29, 30, 18, 17]), projection=[c_acctbal@1]
+26)--------------------------FilterExec: c_acctbal@1 > Some(0),15,2 AND substr(c_phone@0, 1, 2) IN (SET) ([13, 31, 23, 29, 30, 18, 17]), projection=[c_acctbal@1]
27)----------------------------RepartitionExec:
partitioning=RoundRobinBatch(4), input_partitions=1
28)------------------------------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/customer.tbl]]},
projection=[c_phone, c_acctbal], file_type=csv, has_header=false
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]