martin-g commented on code in PR #20243: URL: https://github.com/apache/datafusion/pull/20243#discussion_r2787799491
########## datafusion/functions-nested/benches/array_set_ops.rs: ########## @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; + +use arrow::array::{ArrayRef, Int64Array, ListArray}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{DataType, Field}; +use criterion::{BenchmarkId, Criterion}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::set_ops::{ArrayIntersect, ArrayUnion}; +use rand::SeedableRng; +use rand::prelude::SliceRandom; +use rand::rngs::StdRng; +use std::hint::black_box; +use std::sync::Arc; + +const NUM_ROWS: usize = 1000; +const ARRAY_SIZES: &[usize] = &[10, 50, 100]; +const SEED: u64 = 42; + +fn criterion_benchmark(c: &mut Criterion) { + bench_array_union(c); + bench_array_intersect(c); +} + +fn invoke_array_union(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(array1.clone()), + ColumnarValue::Array(array2.clone()), + ], + arg_fields: vec![ + Field::new("arr1", array1.data_type().clone(), false).into(), + Field::new("arr2", array2.data_type().clone(), false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new("result", array1.data_type().clone(), false).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ); +} + +fn invoke_array_intersect( Review Comment: This function look exactly the same as `invoke_array_union()`. Maybe drop one of them ?! ########## datafusion/functions-nested/benches/array_set_ops.rs: ########## @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; + +use arrow::array::{ArrayRef, Int64Array, ListArray}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{DataType, Field}; +use criterion::{BenchmarkId, Criterion}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::set_ops::{ArrayIntersect, ArrayUnion}; +use rand::SeedableRng; +use rand::prelude::SliceRandom; +use rand::rngs::StdRng; +use std::hint::black_box; +use std::sync::Arc; + +const NUM_ROWS: usize = 1000; +const ARRAY_SIZES: &[usize] = &[10, 50, 100]; +const SEED: u64 = 42; + +fn criterion_benchmark(c: &mut Criterion) { + bench_array_union(c); + bench_array_intersect(c); +} + +fn invoke_array_union(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(array1.clone()), + ColumnarValue::Array(array2.clone()), + ], + arg_fields: vec![ + Field::new("arr1", array1.data_type().clone(), false).into(), + Field::new("arr2", array2.data_type().clone(), false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new("result", array1.data_type().clone(), false).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ); +} + +fn invoke_array_intersect( + udf: &impl ScalarUDFImpl, + array1: &ArrayRef, + array2: &ArrayRef, +) { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(array1.clone()), + ColumnarValue::Array(array2.clone()), + ], + arg_fields: vec![ + Field::new("arr1", array1.data_type().clone(), false).into(), + Field::new("arr2", array2.data_type().clone(), false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new("result", array1.data_type().clone(), false).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ); +} + +fn bench_array_union(c: &mut Criterion) { + let mut group = c.benchmark_group("array_union"); + let udf = ArrayUnion::new(); + + for &array_size in ARRAY_SIZES { + let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.8); + group.bench_with_input( + BenchmarkId::new("high_overlap", array_size), + &array_size, + |b, _| b.iter(|| invoke_array_union(&udf, &array1, &array2)), + ); + } + + for &array_size in ARRAY_SIZES { Review Comment: the two `for &array_size in ARRAY_SIZES` loops could be simplified into one by using another outer/inner loop: `for (overlap_label, overlap_ratio) in &[("high_overlap", 0.8), ("low_overlap", 0.2)] { ... }` ########## datafusion/functions-nested/src/set_ops.rs: ########## @@ -358,69 +364,84 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>( "{set_op:?} is not implemented for '{l:?}' and '{r:?}'" ); - let mut offsets = vec![OffsetSize::usize_as(0)]; - let mut new_arrays = vec![]; + // Convert all values to rows in batch for performance. let converter = RowConverter::new(vec![SortField::new(l.value_type())])?; - for (l_arr, r_arr) in l.iter().zip(r.iter()) { - let last_offset = *offsets.last().unwrap(); - - let (l_values, r_values) = match (l_arr, r_arr) { - (Some(l_arr), Some(r_arr)) => ( - converter.convert_columns(&[l_arr])?, - converter.convert_columns(&[r_arr])?, - ), - _ => { - offsets.push(last_offset); - continue; - } - }; - - let l_iter = l_values.iter().sorted().dedup(); - let values_set: HashSet<_> = l_iter.clone().collect(); - let mut rows = if set_op == SetOp::Union { - l_iter.collect() - } else { - vec![] - }; + let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?; + let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?; + let l_offsets = l.value_offsets(); + let r_offsets = r.value_offsets(); + + let mut result_offsets = Vec::with_capacity(l.len() + 1); + result_offsets.push(OffsetSize::usize_as(0)); + let mut final_rows = Vec::with_capacity(rows_l.num_rows()); Review Comment: for SetOp::Intercept the capacity could be optimised to `min(rows_l.num_rows(), rows_r.num_rows())` ########## datafusion/functions-nested/src/set_ops.rs: ########## @@ -358,69 +364,84 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>( "{set_op:?} is not implemented for '{l:?}' and '{r:?}'" ); - let mut offsets = vec![OffsetSize::usize_as(0)]; - let mut new_arrays = vec![]; + // Convert all values to rows in batch for performance. let converter = RowConverter::new(vec![SortField::new(l.value_type())])?; - for (l_arr, r_arr) in l.iter().zip(r.iter()) { - let last_offset = *offsets.last().unwrap(); - - let (l_values, r_values) = match (l_arr, r_arr) { - (Some(l_arr), Some(r_arr)) => ( - converter.convert_columns(&[l_arr])?, - converter.convert_columns(&[r_arr])?, - ), - _ => { - offsets.push(last_offset); - continue; - } - }; - - let l_iter = l_values.iter().sorted().dedup(); - let values_set: HashSet<_> = l_iter.clone().collect(); - let mut rows = if set_op == SetOp::Union { - l_iter.collect() - } else { - vec![] - }; + let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?; + let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?; + let l_offsets = l.value_offsets(); + let r_offsets = r.value_offsets(); + + let mut result_offsets = Vec::with_capacity(l.len() + 1); + result_offsets.push(OffsetSize::usize_as(0)); + let mut final_rows = Vec::with_capacity(rows_l.num_rows()); + + // Reuse hash sets across iterations + let mut seen = HashSet::new(); + let mut r_set = HashSet::new(); + for i in 0..l.len() { + let last_offset = *result_offsets.last().unwrap(); + + if l.is_null(i) || r.is_null(i) { + result_offsets.push(last_offset); + continue; + } - for r_val in r_values.iter().sorted().dedup() { - match set_op { - SetOp::Union => { - if !values_set.contains(&r_val) { - rows.push(r_val); + let l_start = l_offsets[i].as_usize(); + let l_end = l_offsets[i + 1].as_usize(); + let r_start = r_offsets[i].as_usize(); + let r_end = r_offsets[i + 1].as_usize(); + + let mut count = 0usize; + // Clear sets for reuse + seen.clear(); + r_set.clear(); + + match set_op { + SetOp::Union => { + for idx in l_start..l_end { + let row = rows_l.row(idx); + if seen.insert(row) { + final_rows.push(row); + count += 1; } } - SetOp::Intersect => { - if values_set.contains(&r_val) { - rows.push(r_val); + for idx in r_start..r_end { + let row = rows_r.row(idx); + if seen.insert(row) { + final_rows.push(row); + count += 1; } } } - } - - offsets.push(last_offset + OffsetSize::usize_as(rows.len())); - let arrays = converter.convert_rows(rows)?; - let array = match arrays.first() { - Some(array) => Arc::clone(array), - None => { - return internal_err!("{set_op}: failed to get array from rows"); + SetOp::Intersect => { + // Build hash set from right array for lookup table + // then iterator left array to find common elements. Review Comment: ```suggestion // then iterate left array to find common elements. ``` ########## datafusion/functions-nested/src/set_ops.rs: ########## @@ -358,69 +364,84 @@ fn generic_set_lists<OffsetSize: OffsetSizeTrait>( "{set_op:?} is not implemented for '{l:?}' and '{r:?}'" ); - let mut offsets = vec![OffsetSize::usize_as(0)]; - let mut new_arrays = vec![]; + // Convert all values to rows in batch for performance. let converter = RowConverter::new(vec![SortField::new(l.value_type())])?; - for (l_arr, r_arr) in l.iter().zip(r.iter()) { - let last_offset = *offsets.last().unwrap(); - - let (l_values, r_values) = match (l_arr, r_arr) { - (Some(l_arr), Some(r_arr)) => ( - converter.convert_columns(&[l_arr])?, - converter.convert_columns(&[r_arr])?, - ), - _ => { - offsets.push(last_offset); - continue; - } - }; - - let l_iter = l_values.iter().sorted().dedup(); - let values_set: HashSet<_> = l_iter.clone().collect(); - let mut rows = if set_op == SetOp::Union { - l_iter.collect() - } else { - vec![] - }; + let rows_l = converter.convert_columns(&[Arc::clone(l.values())])?; + let rows_r = converter.convert_columns(&[Arc::clone(r.values())])?; + let l_offsets = l.value_offsets(); + let r_offsets = r.value_offsets(); + + let mut result_offsets = Vec::with_capacity(l.len() + 1); + result_offsets.push(OffsetSize::usize_as(0)); + let mut final_rows = Vec::with_capacity(rows_l.num_rows()); + + // Reuse hash sets across iterations + let mut seen = HashSet::new(); + let mut r_set = HashSet::new(); + for i in 0..l.len() { + let last_offset = *result_offsets.last().unwrap(); + + if l.is_null(i) || r.is_null(i) { + result_offsets.push(last_offset); + continue; + } - for r_val in r_values.iter().sorted().dedup() { - match set_op { - SetOp::Union => { - if !values_set.contains(&r_val) { - rows.push(r_val); + let l_start = l_offsets[i].as_usize(); + let l_end = l_offsets[i + 1].as_usize(); + let r_start = r_offsets[i].as_usize(); + let r_end = r_offsets[i + 1].as_usize(); + + let mut count = 0usize; + // Clear sets for reuse + seen.clear(); + r_set.clear(); + + match set_op { + SetOp::Union => { + for idx in l_start..l_end { + let row = rows_l.row(idx); + if seen.insert(row) { + final_rows.push(row); + count += 1; } } - SetOp::Intersect => { - if values_set.contains(&r_val) { - rows.push(r_val); + for idx in r_start..r_end { + let row = rows_r.row(idx); + if seen.insert(row) { + final_rows.push(row); + count += 1; } } } - } - - offsets.push(last_offset + OffsetSize::usize_as(rows.len())); - let arrays = converter.convert_rows(rows)?; - let array = match arrays.first() { - Some(array) => Arc::clone(array), - None => { - return internal_err!("{set_op}: failed to get array from rows"); + SetOp::Intersect => { + // Build hash set from right array for lookup table + // then iterator left array to find common elements. Review Comment: It would be faster to create the HashSet from the shorter array and iterate over the longer one. This would minimise the memory usage for the hash set and can reduce the number of hash operations, especially when there's a significant size difference between the two arrays. ########## datafusion/functions-nested/benches/array_set_ops.rs: ########## @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[macro_use] +extern crate criterion; + +use arrow::array::{ArrayRef, Int64Array, ListArray}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{DataType, Field}; +use criterion::{BenchmarkId, Criterion}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl}; +use datafusion_functions_nested::set_ops::{ArrayIntersect, ArrayUnion}; +use rand::SeedableRng; +use rand::prelude::SliceRandom; +use rand::rngs::StdRng; +use std::hint::black_box; +use std::sync::Arc; + +const NUM_ROWS: usize = 1000; +const ARRAY_SIZES: &[usize] = &[10, 50, 100]; +const SEED: u64 = 42; + +fn criterion_benchmark(c: &mut Criterion) { + bench_array_union(c); + bench_array_intersect(c); +} + +fn invoke_array_union(udf: &impl ScalarUDFImpl, array1: &ArrayRef, array2: &ArrayRef) { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(array1.clone()), + ColumnarValue::Array(array2.clone()), + ], + arg_fields: vec![ + Field::new("arr1", array1.data_type().clone(), false).into(), + Field::new("arr2", array2.data_type().clone(), false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new("result", array1.data_type().clone(), false).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ); +} + +fn invoke_array_intersect( + udf: &impl ScalarUDFImpl, + array1: &ArrayRef, + array2: &ArrayRef, +) { + black_box( + udf.invoke_with_args(ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(array1.clone()), + ColumnarValue::Array(array2.clone()), + ], + arg_fields: vec![ + Field::new("arr1", array1.data_type().clone(), false).into(), + Field::new("arr2", array2.data_type().clone(), false).into(), + ], + number_rows: NUM_ROWS, + return_field: Field::new("result", array1.data_type().clone(), false).into(), + config_options: Arc::new(ConfigOptions::default()), + }) + .unwrap(), + ); +} + +fn bench_array_union(c: &mut Criterion) { + let mut group = c.benchmark_group("array_union"); + let udf = ArrayUnion::new(); + + for &array_size in ARRAY_SIZES { + let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.8); + group.bench_with_input( + BenchmarkId::new("high_overlap", array_size), + &array_size, + |b, _| b.iter(|| invoke_array_union(&udf, &array1, &array2)), + ); + } + + for &array_size in ARRAY_SIZES { + let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.2); + group.bench_with_input( + BenchmarkId::new("low_overlap", array_size), + &array_size, + |b, _| b.iter(|| invoke_array_union(&udf, &array1, &array2)), + ); + } + + group.finish(); +} + +fn bench_array_intersect(c: &mut Criterion) { + let mut group = c.benchmark_group("array_intersect"); + let udf = ArrayIntersect::new(); + + for &array_size in ARRAY_SIZES { + let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.8); + group.bench_with_input( + BenchmarkId::new("high_overlap", array_size), + &array_size, + |b, _| b.iter(|| invoke_array_intersect(&udf, &array1, &array2)), + ); + } + + for &array_size in ARRAY_SIZES { + let (array1, array2) = create_arrays_with_overlap(NUM_ROWS, array_size, 0.2); + group.bench_with_input( + BenchmarkId::new("low_overlap", array_size), + &array_size, + |b, _| b.iter(|| invoke_array_intersect(&udf, &array1, &array2)), + ); + } + + group.finish(); +} + +fn create_arrays_with_overlap( + num_rows: usize, + array_size: usize, + overlap_ratio: f64, +) -> (ArrayRef, ArrayRef) { + assert!((0.0..=1.0).contains(&overlap_ratio)); + let overlap_count = ((array_size as f64) * overlap_ratio).round() as usize; + + let mut rng = StdRng::seed_from_u64(SEED); + + let mut values1 = Vec::with_capacity(num_rows * array_size); + let mut values2 = Vec::with_capacity(num_rows * array_size); + + for row in 0..num_rows { + let base = (row as i64) * (array_size as i64) * 2; + + for i in 0..array_size { + values1.push(base + i as i64); + } + + let mut positions: Vec<usize> = (0..array_size).collect(); + positions.shuffle(&mut rng); + + let overlap_positions = &positions[..overlap_count]; + + for i in 0..array_size { + if overlap_positions.contains(&i) { Review Comment: Slice::contains() is O(n) (linear search). Using a HashSet would be O(1), but `create_arrays_with_overlap()` is called before `group.bench_with_input(...)`, so maybe it is OK. ```rust let overlap_positions: std::collections::HashSet<_> = positions[..overlap_count].iter().copied().collect(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
