martin-g commented on code in PR #8753:
URL: https://github.com/apache/arrow-rs/pull/8753#discussion_r2510959123


##########
arrow-select/src/merge.rs:
##########
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`merge`] and [`merge_n`]: Combine values from two or more arrays
+
+use crate::filter::{SlicesIterator, prep_null_mask_filter};
+use crate::zip::zip;
+use arrow_array::{Array, ArrayRef, BooleanArray, Datum, make_array, 
new_empty_array};
+use arrow_data::ArrayData;
+use arrow_data::transform::MutableArrayData;
+use arrow_schema::ArrowError;
+
+/// An index for the [merge] function.
+///
+/// This trait allows the indices argument for [merge] to be stored using a 
more
+/// compact representation than `usize` when the input arrays are small.
+/// If the number of input arrays is less than 256 for instance, the indices 
can be stored as `u8`.
+///
+/// Implementation must ensure that all values which return `None` from 
[MergeIndex::index] are
+/// considered equal by the [PartialEq] and [Eq] implementations.
+pub trait MergeIndex: PartialEq + Eq + Copy {
+    /// Returns the index value as an `Option<usize>`.
+    ///
+    /// `None` values returned by this function indicate holes in the index 
array and will result
+    /// in null values in the array created by [merge].
+    fn index(&self) -> Option<usize>;
+}
+
+impl MergeIndex for usize {
+    fn index(&self) -> Option<usize> {
+        Some(*self)
+    }
+}
+
+impl MergeIndex for Option<usize> {
+    fn index(&self) -> Option<usize> {
+        *self
+    }
+}
+
+/// Merges elements by index from a list of [`Array`], creating a new 
[`Array`] from
+/// those values.
+///
+/// Each element in `indices` is the index of an array in `values`. The 
`indices` array is processed
+/// sequentially. The first occurrence of index value `n` will be mapped to 
the first
+/// value of the array at index `n`. The second occurrence to the second 
value, and so on.
+/// An index value where `MergeIndex::index` returns `None` is interpreted as 
a null value.
+///
+/// # Implementation notes
+///
+/// This algorithm is similar in nature to both [zip] and
+/// [interleave](crate::interleave::interleave), but there are some important 
differences.
+///
+/// In contrast to [zip], this function supports multiple input arrays. 
Instead of
+/// a boolean selection vector, an index array is to take values from the 
input arrays, and a special
+/// marker values can be used to indicate null values.
+///
+/// In contrast to [interleave](crate::interleave::interleave), this function 
does not use pairs of
+/// indices. The values in `indices` serve the same purpose as the first value 
in the pairs passed
+/// to `interleave`.
+/// The index in the array is implicit and is derived from the number of times 
a particular array
+/// index occurs.
+/// The more constrained indexing mechanism used by this algorithm makes it 
easier to copy values
+/// in contiguous slices. In the example below, the two subsequent elements 
from array `2` can be
+/// copied in a single operation from the source array instead of copying them 
one by one.
+/// Long spans of null values are also especially cheap because they do not 
need to be represented
+/// in an input array.
+///
+/// # Safety
+///
+/// This function does not check that the number of occurrences of any 
particular array index matches
+/// the length of the corresponding input array. If an array contains more 
values than required, the
+/// spurious values will be ignored. If an array contains fewer values than 
necessary, this function
+/// will panic.
+///
+/// # Example
+///
+/// ```text
+/// ┌───────────┐  ┌─────────┐                             ┌─────────┐
+/// │┌─────────┐│  │   None  │                             │   NULL  │
+/// ││    A    ││  ├─────────┤                             ├─────────┤
+/// │└─────────┘│  │    1    │                             │    B    │
+/// │┌─────────┐│  ├─────────┤                             ├─────────┤
+/// ││    B    ││  │    0    │    merge(values, indices)   │    A    │
+/// │└─────────┘│  ├─────────┤  ─────────────────────────▶ ├─────────┤
+/// │┌─────────┐│  │   None  │                             │   NULL  │
+/// ││    C    ││  ├─────────┤                             ├─────────┤
+/// │├─────────┤│  │    2    │                             │    C    │
+/// ││    D    ││  ├─────────┤                             ├─────────┤
+/// │└─────────┘│  │    2    │                             │    D    │
+/// └───────────┘  └─────────┘                             └─────────┘
+///    values        indices                                  result
+///
+/// ```
+pub fn merge_n(values: &[&dyn Array], indices: &[impl MergeIndex]) -> 
Result<ArrayRef, ArrowError> {
+    let data_type = values[0].data_type();

Review Comment:
   There is no check for an empty `values` slice: `values[0]` will panic 
   when `values` is empty.



##########
arrow-select/src/merge.rs:
##########
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`merge`] and [`merge_n`]: Combine values from two or more arrays
+
+use crate::filter::{SlicesIterator, prep_null_mask_filter};
+use crate::zip::zip;
+use arrow_array::{Array, ArrayRef, BooleanArray, Datum, make_array, 
new_empty_array};
+use arrow_data::ArrayData;
+use arrow_data::transform::MutableArrayData;
+use arrow_schema::ArrowError;
+
+/// An index for the [merge] function.
+///
+/// This trait allows the indices argument for [merge] to be stored using a 
more
+/// compact representation than `usize` when the input arrays are small.
+/// If the number of input arrays is less than 256 for instance, the indices 
can be stored as `u8`.
+///
+/// Implementation must ensure that all values which return `None` from 
[MergeIndex::index] are
+/// considered equal by the [PartialEq] and [Eq] implementations.
+pub trait MergeIndex: PartialEq + Eq + Copy {
+    /// Returns the index value as an `Option<usize>`.
+    ///
+    /// `None` values returned by this function indicate holes in the index 
array and will result
+    /// in null values in the array created by [merge].
+    fn index(&self) -> Option<usize>;
+}
+
+impl MergeIndex for usize {
+    fn index(&self) -> Option<usize> {
+        Some(*self)
+    }
+}
+
+impl MergeIndex for Option<usize> {
+    fn index(&self) -> Option<usize> {
+        *self
+    }
+}
+
+/// Merges elements by index from a list of [`Array`], creating a new 
[`Array`] from
+/// those values.
+///
+/// Each element in `indices` is the index of an array in `values`. The 
`indices` array is processed
+/// sequentially. The first occurrence of index value `n` will be mapped to 
the first
+/// value of the array at index `n`. The second occurrence to the second 
value, and so on.
+/// An index value where `MergeIndex::index` returns `None` is interpreted as 
a null value.
+///
+/// # Implementation notes
+///
+/// This algorithm is similar in nature to both [zip] and
+/// [interleave](crate::interleave::interleave), but there are some important 
differences.
+///
+/// In contrast to [zip], this function supports multiple input arrays. 
Instead of
+/// a boolean selection vector, an index array is to take values from the 
input arrays, and a special
+/// marker values can be used to indicate null values.
+///
+/// In contrast to [interleave](crate::interleave::interleave), this function 
does not use pairs of
+/// indices. The values in `indices` serve the same purpose as the first value 
in the pairs passed
+/// to `interleave`.
+/// The index in the array is implicit and is derived from the number of times 
a particular array
+/// index occurs.
+/// The more constrained indexing mechanism used by this algorithm makes it 
easier to copy values
+/// in contiguous slices. In the example below, the two subsequent elements 
from array `2` can be
+/// copied in a single operation from the source array instead of copying them 
one by one.
+/// Long spans of null values are also especially cheap because they do not 
need to be represented
+/// in an input array.
+///
+/// # Safety
+///
+/// This function does not check that the number of occurrences of any 
particular array index matches
+/// the length of the corresponding input array. If an array contains more 
values than required, the
+/// spurious values will be ignored. If an array contains fewer values than 
necessary, this function
+/// will panic.
+///
+/// # Example
+///
+/// ```text
+/// ┌───────────┐  ┌─────────┐                             ┌─────────┐
+/// │┌─────────┐│  │   None  │                             │   NULL  │
+/// ││    A    ││  ├─────────┤                             ├─────────┤
+/// │└─────────┘│  │    1    │                             │    B    │
+/// │┌─────────┐│  ├─────────┤                             ├─────────┤
+/// ││    B    ││  │    0    │    merge(values, indices)   │    A    │
+/// │└─────────┘│  ├─────────┤  ─────────────────────────▶ ├─────────┤
+/// │┌─────────┐│  │   None  │                             │   NULL  │
+/// ││    C    ││  ├─────────┤                             ├─────────┤
+/// │├─────────┤│  │    2    │                             │    C    │
+/// ││    D    ││  ├─────────┤                             ├─────────┤
+/// │└─────────┘│  │    2    │                             │    D    │
+/// └───────────┘  └─────────┘                             └─────────┘
+///    values        indices                                  result
+///
+/// ```
+pub fn merge_n(values: &[&dyn Array], indices: &[impl MergeIndex]) -> 
Result<ArrayRef, ArrowError> {
+    let data_type = values[0].data_type();
+
+    for array in values.iter().skip(1) {
+        if array.data_type() != data_type {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "It is not possible to merge arrays of different data types 
({} and {})",
+                data_type,
+                array.data_type()
+            )));
+        }
+    }
+
+    if indices.is_empty() {
+        return Ok(new_empty_array(data_type));
+    }
+
+    #[cfg(debug_assertions)]
+    for ix in indices {
+        if let Some(index) = ix.index() {
+            assert!(
+                index < values.len(),
+                "Index out of bounds: {} >= {}",
+                index,
+                values.len()
+            );
+        }
+    }
+
+    let data: Vec<ArrayData> = values.iter().map(|a| a.to_data()).collect();
+    let data_refs = data.iter().collect();
+
+    let mut mutable = MutableArrayData::new(data_refs, true, indices.len());
+
+    // This loop extends the mutable array by taking slices from the partial 
results.
+    //
+    // take_offsets keeps track of how many values have been taken from each 
array.
+    let mut take_offsets = vec![0; values.len() + 1];
+    let mut start_row_ix = 0;
+    loop {
+        let array_ix = indices[start_row_ix];
+
+        // Determine the length of the slice to take.
+        let mut end_row_ix = start_row_ix + 1;
+        while end_row_ix < indices.len() && indices[end_row_ix] == array_ix {
+            end_row_ix += 1;
+        }
+        let slice_length = end_row_ix - start_row_ix;
+
+        // Extend mutable with either nulls or with values from the array.
+        match array_ix.index() {
+            None => mutable.extend_nulls(slice_length),
+            Some(index) => {
+                let start_offset = take_offsets[index];
+                let end_offset = start_offset + slice_length;
+                mutable.extend(index, start_offset, end_offset);
+                take_offsets[index] = end_offset;
+            }
+        }
+
+        if end_row_ix == indices.len() {
+            break;
+        } else {
+            // Set the start_row_ix for the next slice.
+            start_row_ix = end_row_ix;
+        }
+    }
+
+    Ok(make_array(mutable.freeze()))
+}
+
+/// Merges two arrays in the order specified by a boolean mask.
+///
+/// This algorithm is a variant of [zip] that does not require the truthy and
+/// falsy arrays to have the same length.
+///
+/// When truthy of falsy are [Scalar](arrow_array::Scalar), the single
+/// scalar value is repeated whenever the mask array contains true or false 
respectively.
+///
+/// # Example
+///
+/// ```text
+///  truthy
+/// ┌─────────┐  mask
+/// │    A    │  ┌─────────┐                             ┌─────────┐
+/// ├─────────┤  │  true   │                             │    A    │
+/// │    C    │  ├─────────┤                             ├─────────┤
+/// ├─────────┤  │  true   │                             │    C    │
+/// │   NULL  │  ├─────────┤                             ├─────────┤
+/// ├─────────┤  │  false  │  merge(mask, truthy, falsy) │    B    │
+/// │    D    │  ├─────────┤  ─────────────────────────▶ ├─────────┤
+/// └─────────┘  │  true   │                             │   NULL  │
+///  falsy       ├─────────┤                             ├─────────┤
+/// ┌─────────┐  │  false  │                             │    E    │
+/// │    B    │  ├─────────┤                             ├─────────┤
+/// ├─────────┤  │  true   │                             │    D    │
+/// │    E    │  └─────────┘                             └─────────┘
+/// └─────────┘
+/// ```
+pub fn merge(
+    mask: &BooleanArray,
+    truthy: &dyn Datum,
+    falsy: &dyn Datum,
+) -> Result<ArrayRef, ArrowError> {
+    let (truthy_array, truthy_is_scalar) = truthy.get();
+    let (falsy_array, falsy_is_scalar) = falsy.get();
+
+    if truthy_is_scalar && falsy_is_scalar {
+        // When both truthy and falsy are scalars, we can use `zip` since the 
result is the same
+        // and zip has optimized code for scalars.
+        return zip(mask, truthy, falsy);
+    }
+
+    if truthy_array.data_type() != falsy_array.data_type() {
+        return Err(ArrowError::InvalidArgumentError(
+            "arguments need to have the same data type".into(),
+        ));
+    }
+
+    if truthy_is_scalar && truthy_array.len() != 1 {
+        return Err(ArrowError::InvalidArgumentError(
+            "scalar arrays must have 1 element".into(),
+        ));
+    }
+    if falsy_is_scalar && falsy_array.len() != 1 {
+        return Err(ArrowError::InvalidArgumentError(
+            "scalar arrays must have 1 element".into(),
+        ));
+    }
+
+    let falsy = falsy_array.to_data();
+    let truthy = truthy_array.to_data();
+
+    let mut mutable = MutableArrayData::new(vec![&truthy, &falsy], false, 
truthy.len());

Review Comment:
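   The result appears to have one element per `mask` entry, so `mask.len()` 
   (rather than `truthy.len()`) looks like the right capacity hint:
   ```suggestion
       let mut mutable = MutableArrayData::new(vec![&truthy, &falsy], false, mask.len());
   ```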



##########
arrow-select/src/merge.rs:
##########
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`merge`] and [`merge_n`]: Combine values from two or more arrays
+
+use crate::filter::{SlicesIterator, prep_null_mask_filter};
+use crate::zip::zip;
+use arrow_array::{Array, ArrayRef, BooleanArray, Datum, make_array, 
new_empty_array};
+use arrow_data::ArrayData;
+use arrow_data::transform::MutableArrayData;
+use arrow_schema::ArrowError;
+
+/// An index for the [merge] function.
+///
+/// This trait allows the indices argument for [merge] to be stored using a 
more
+/// compact representation than `usize` when the input arrays are small.
+/// If the number of input arrays is less than 256 for instance, the indices 
can be stored as `u8`.
+///
+/// Implementation must ensure that all values which return `None` from 
[MergeIndex::index] are
+/// considered equal by the [PartialEq] and [Eq] implementations.
+pub trait MergeIndex: PartialEq + Eq + Copy {
+    /// Returns the index value as an `Option<usize>`.
+    ///
+    /// `None` values returned by this function indicate holes in the index 
array and will result
+    /// in null values in the array created by [merge].
+    fn index(&self) -> Option<usize>;
+}
+
+impl MergeIndex for usize {
+    fn index(&self) -> Option<usize> {
+        Some(*self)
+    }
+}
+
+impl MergeIndex for Option<usize> {
+    fn index(&self) -> Option<usize> {
+        *self
+    }
+}
+
+/// Merges elements by index from a list of [`Array`], creating a new 
[`Array`] from
+/// those values.
+///
+/// Each element in `indices` is the index of an array in `values`. The 
`indices` array is processed
+/// sequentially. The first occurrence of index value `n` will be mapped to 
the first
+/// value of the array at index `n`. The second occurrence to the second 
value, and so on.
+/// An index value where `MergeIndex::index` returns `None` is interpreted as 
a null value.
+///
+/// # Implementation notes
+///
+/// This algorithm is similar in nature to both [zip] and
+/// [interleave](crate::interleave::interleave), but there are some important 
differences.
+///
+/// In contrast to [zip], this function supports multiple input arrays. 
Instead of
+/// a boolean selection vector, an index array is to take values from the 
input arrays, and a special
+/// marker values can be used to indicate null values.
+///
+/// In contrast to [interleave](crate::interleave::interleave), this function 
does not use pairs of
+/// indices. The values in `indices` serve the same purpose as the first value 
in the pairs passed
+/// to `interleave`.
+/// The index in the array is implicit and is derived from the number of times 
a particular array
+/// index occurs.
+/// The more constrained indexing mechanism used by this algorithm makes it 
easier to copy values
+/// in contiguous slices. In the example below, the two subsequent elements 
from array `2` can be
+/// copied in a single operation from the source array instead of copying them 
one by one.
+/// Long spans of null values are also especially cheap because they do not 
need to be represented
+/// in an input array.
+///
+/// # Safety

Review Comment:
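   By rustdoc convention `# Safety` documents the contract of an `unsafe` 
   function. This function is safe and the section describes when it 
   panics, so `# Panics` is the conventional heading:
   ```suggestion
   /// # Panics
   ```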



##########
arrow-select/src/merge.rs:
##########
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`merge`] and [`merge_n`]: Combine values from two or more arrays
+
+use crate::filter::{SlicesIterator, prep_null_mask_filter};
+use crate::zip::zip;
+use arrow_array::{Array, ArrayRef, BooleanArray, Datum, make_array, 
new_empty_array};
+use arrow_data::ArrayData;
+use arrow_data::transform::MutableArrayData;
+use arrow_schema::ArrowError;
+
+/// An index for the [merge] function.

Review Comment:
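   `MergeIndex` is the element type of the `indices` argument of `merge_n`; 
   `merge` takes a `BooleanArray` mask instead, so the link should point to 
   `merge_n`:
   ```suggestion
   /// An index for the [merge_n] function.
   ```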



##########
arrow/benches/merge_kernels.rs:
##########
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use criterion::measurement::WallTime;
+use criterion::{BenchmarkGroup, BenchmarkId, Criterion, criterion_group, 
criterion_main};
+use rand::distr::{Distribution, StandardUniform};
+use rand::prelude::StdRng;
+use rand::{Rng, SeedableRng};
+use std::hint;
+use std::sync::Arc;
+
+use arrow::array::*;
+use arrow::datatypes::*;
+use arrow::util::bench_util::*;
+use arrow_select::merge::merge;
+
+trait InputGenerator {
+    fn name(&self) -> &str;
+
+    /// Return an ArrayRef containing a single null value
+    fn generate_scalar_with_null_value(&self) -> ArrayRef;
+
+    /// Generate a `number_of_scalars` unique scalars
+    fn generate_non_null_scalars(&self, seed: u64, number_of_scalars: usize) 
-> Vec<ArrayRef>;
+
+    /// Generate an array with the specified length and null percentage
+    fn generate_array(&self, seed: u64, array_length: usize, null_percentage: 
f32) -> ArrayRef;
+}
+
+struct GeneratePrimitive<T: ArrowPrimitiveType> {
+    description: String,
+    _marker: std::marker::PhantomData<T>,
+}
+
+impl<T> InputGenerator for GeneratePrimitive<T>
+where
+    T: ArrowPrimitiveType,
+    StandardUniform: Distribution<T::Native>,
+{
+    fn name(&self) -> &str {
+        self.description.as_str()
+    }
+
+    fn generate_scalar_with_null_value(&self) -> ArrayRef {
+        new_null_array(&T::DATA_TYPE, 1)
+    }
+
+    fn generate_non_null_scalars(&self, seed: u64, number_of_scalars: usize) 
-> Vec<ArrayRef> {
+        let rng = StdRng::seed_from_u64(seed);
+
+        rng.sample_iter::<T::Native, _>(StandardUniform)
+            .take(number_of_scalars)
+            .map(|v: T::Native| {
+                Arc::new(PrimitiveArray::<T>::new_scalar(v).into_inner()) as 
ArrayRef
+            })
+            .collect()
+    }
+
+    fn generate_array(&self, seed: u64, array_length: usize, null_percentage: 
f32) -> ArrayRef {
+        Arc::new(create_primitive_array_with_seed::<T>(
+            array_length,
+            null_percentage,
+            seed,
+        ))
+    }
+}
+
+struct GenerateBytes<Byte: ByteArrayType> {
+    range_length: std::ops::Range<usize>,
+    description: String,
+
+    _marker: std::marker::PhantomData<Byte>,
+}
+
+impl<Byte> InputGenerator for GenerateBytes<Byte>
+where
+    Byte: ByteArrayType,
+{
+    fn name(&self) -> &str {
+        self.description.as_str()
+    }
+
+    fn generate_scalar_with_null_value(&self) -> ArrayRef {
+        new_null_array(&Byte::DATA_TYPE, 1)
+    }
+
+    fn generate_non_null_scalars(&self, seed: u64, number_of_scalars: usize) 
-> Vec<ArrayRef> {
+        let array = self.generate_array(seed, number_of_scalars, 0.0);
+
+        (0..number_of_scalars).map(|i| array.slice(i, 1)).collect()
+    }
+
+    fn generate_array(&self, seed: u64, array_length: usize, null_percentage: 
f32) -> ArrayRef {
+        let is_binary =
+            Byte::DATA_TYPE == DataType::Binary || Byte::DATA_TYPE == 
DataType::LargeBinary;
+        if is_binary {
+            Arc::new(create_binary_array_with_len_range_and_prefix_and_seed::<
+                Byte::Offset,
+            >(
+                array_length,
+                null_percentage,
+                self.range_length.start,
+                self.range_length.end - 1,
+                &[],
+                seed,
+            ))
+        } else {
+            Arc::new(create_string_array_with_len_range_and_prefix_and_seed::<
+                Byte::Offset,
+            >(
+                array_length,
+                null_percentage,
+                self.range_length.start,
+                self.range_length.end - 1,
+                "",
+                seed,
+            ))
+        }
+    }
+}
+
+fn mask_cases(len: usize) -> Vec<(&'static str, BooleanArray)> {
+    vec![
+        ("all_true", create_boolean_array(len, 0.0, 1.0)),
+        ("99pct_true", create_boolean_array(len, 0.0, 0.99)),
+        ("90pct_true", create_boolean_array(len, 0.0, 0.9)),
+        ("50pct_true", create_boolean_array(len, 0.0, 0.5)),
+        ("10pct_true", create_boolean_array(len, 0.0, 0.1)),
+        ("1pct_true", create_boolean_array(len, 0.0, 0.01)),
+        ("all_false", create_boolean_array(len, 0.0, 0.0)),
+        ("50pct_nulls", create_boolean_array(len, 0.5, 0.5)),
+    ]
+}
+
+fn bench_merge_on_input_generator(c: &mut Criterion, input_generator: &impl 
InputGenerator) {
+    const ARRAY_LEN: usize = 8192;
+
+    let mut group =
+        c.benchmark_group(format!("merge_{ARRAY_LEN}_from_{}", 
input_generator.name()).as_str());
+
+    let null_scalar = input_generator.generate_scalar_with_null_value();
+    let [non_null_scalar_1, non_null_scalar_2]: [_; 2] = input_generator
+        .generate_non_null_scalars(42, 2)
+        .try_into()
+        .unwrap();
+
+    // For simplicity, we generate arrays with length ARRAY_LEN. Not all input 
values will be used.
+    let array_1_10pct_nulls = input_generator.generate_array(42, ARRAY_LEN, 
0.1);
+    let array_2_10pct_nulls = input_generator.generate_array(18, ARRAY_LEN, 
0.1);
+
+    let masks = mask_cases(ARRAY_LEN);
+
+    // Benchmarks for different scalar combinations
+    for (description, truthy, falsy) in &[
+        ("null_vs_non_null_scalar", &null_scalar, &non_null_scalar_1),
+        (
+            "non_null_scalar_vs_null_scalar",
+            &non_null_scalar_1,
+            &null_scalar,
+        ),
+        ("non_nulls_scalars", &non_null_scalar_1, &non_null_scalar_2),
+    ] {
+        bench_merge_input_on_all_masks(
+            description,
+            &mut group,
+            &masks,
+            &Scalar::new(truthy),
+            &Scalar::new(falsy),
+        );
+    }
+
+    bench_merge_input_on_all_masks(
+        "array_vs_non_null_scalar",
+        &mut group,
+        &masks,
+        &array_1_10pct_nulls,
+        &non_null_scalar_1,
+    );
+
+    bench_merge_input_on_all_masks(
+        "non_null_scalar_vs_array",
+        &mut group,
+        &masks,
+        &array_1_10pct_nulls,
+        &non_null_scalar_1,

Review Comment:
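   The arguments here are exactly the same as for `array_vs_non_null_scalar` 
   above, so both benchmarks measure the same case. I think the last two 
   arguments should be swapped, i.e. `&non_null_scalar_1` as truthy and 
   `&array_1_10pct_nulls` as falsy.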



##########
arrow-select/src/merge.rs:
##########
@@ -0,0 +1,386 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`merge`] and [`merge_n`]: Combine values from two or more arrays
+
+use crate::filter::{SlicesIterator, prep_null_mask_filter};
+use crate::zip::zip;
+use arrow_array::{Array, ArrayRef, BooleanArray, Datum, make_array, 
new_empty_array};
+use arrow_data::ArrayData;
+use arrow_data::transform::MutableArrayData;
+use arrow_schema::ArrowError;
+
+/// An index for the [merge] function.
+///
+/// This trait allows the indices argument for [merge] to be stored using a 
more
+/// compact representation than `usize` when the input arrays are small.
+/// If the number of input arrays is less than 256 for instance, the indices 
can be stored as `u8`.
+///
+/// Implementation must ensure that all values which return `None` from 
[MergeIndex::index] are
+/// considered equal by the [PartialEq] and [Eq] implementations.
+pub trait MergeIndex: PartialEq + Eq + Copy {
+    /// Returns the index value as an `Option<usize>`.
+    ///
+    /// `None` values returned by this function indicate holes in the index 
array and will result
+    /// in null values in the array created by [merge].
+    fn index(&self) -> Option<usize>;
+}
+
+impl MergeIndex for usize {
+    fn index(&self) -> Option<usize> {
+        Some(*self)
+    }
+}
+
+impl MergeIndex for Option<usize> {
+    fn index(&self) -> Option<usize> {
+        *self
+    }
+}
+
+/// Merges elements by index from a list of [`Array`], creating a new 
[`Array`] from
+/// those values.
+///
+/// Each element in `indices` is the index of an array in `values`. The 
`indices` array is processed
+/// sequentially. The first occurrence of index value `n` will be mapped to 
the first
+/// value of the array at index `n`. The second occurrence to the second 
value, and so on.
+/// An index value where `MergeIndex::index` returns `None` is interpreted as 
a null value.
+///
+/// # Implementation notes
+///
+/// This algorithm is similar in nature to both [zip] and
+/// [interleave](crate::interleave::interleave), but there are some important 
differences.
+///
+/// In contrast to [zip], this function supports multiple input arrays. 
Instead of
+/// a boolean selection vector, an index array is to take values from the 
input arrays, and a special
+/// marker values can be used to indicate null values.
+///
+/// In contrast to [interleave](crate::interleave::interleave), this function 
does not use pairs of
+/// indices. The values in `indices` serve the same purpose as the first value 
in the pairs passed
+/// to `interleave`.
+/// The index in the array is implicit and is derived from the number of times 
a particular array
+/// index occurs.
+/// The more constrained indexing mechanism used by this algorithm makes it 
easier to copy values
+/// in contiguous slices. In the example below, the two subsequent elements 
from array `2` can be
+/// copied in a single operation from the source array instead of copying them 
one by one.
+/// Long spans of null values are also especially cheap because they do not 
need to be represented
+/// in an input array.
+///
+/// # Panics
+///
+/// This function does not check that the number of occurrences of any 
particular array index matches
+/// the length of the corresponding input array. If an array contains more 
values than required, the
+/// spurious values will be ignored. If an array contains fewer values than 
necessary, this function
+/// will panic.
+///
+/// # Example
+///
+/// ```text
+/// ┌───────────┐  ┌─────────┐                             ┌─────────┐
+/// │┌─────────┐│  │   None  │                             │   NULL  │
+/// ││    A    ││  ├─────────┤                             ├─────────┤
+/// │└─────────┘│  │    1    │                             │    B    │
+/// │┌─────────┐│  ├─────────┤                             ├─────────┤
+/// ││    B    ││  │    0    │    merge(values, indices)   │    A    │
+/// │└─────────┘│  ├─────────┤  ─────────────────────────▶ ├─────────┤
+/// │┌─────────┐│  │   None  │                             │   NULL  │
+/// ││    C    ││  ├─────────┤                             ├─────────┤
+/// │├─────────┤│  │    2    │                             │    C    │
+/// ││    D    ││  ├─────────┤                             ├─────────┤
+/// │└─────────┘│  │    2    │                             │    D    │
+/// └───────────┘  └─────────┘                             └─────────┘
+///    values        indices                                  result
+///
+/// ```
+pub fn merge_n(values: &[&dyn Array], indices: &[impl MergeIndex]) -> 
Result<ArrayRef, ArrowError> {
+    let data_type = values[0].data_type();
+
+    for array in values.iter().skip(1) {
+        if array.data_type() != data_type {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "It is not possible to merge arrays of different data types 
({} and {})",
+                data_type,
+                array.data_type()
+            )));
+        }
+    }
+
+    if indices.is_empty() {
+        return Ok(new_empty_array(data_type));
+    }
+
+    #[cfg(debug_assertions)]
+    for ix in indices {
+        if let Some(index) = ix.index() {
+            assert!(
+                index < values.len(),
+                "Index out of bounds: {} >= {}",
+                index,
+                values.len()
+            );
+        }
+    }
+
+    let data: Vec<ArrayData> = values.iter().map(|a| a.to_data()).collect();
+    let data_refs = data.iter().collect();
+
+    let mut mutable = MutableArrayData::new(data_refs, true, indices.len());
+
+    // This loop extends the mutable array by taking slices from the partial 
results.
+    //
+    // take_offsets keeps track of how many values have been taken from each 
array.
+    let mut take_offsets = vec![0; values.len() + 1];
+    let mut start_row_ix = 0;
+    loop {
+        let array_ix = indices[start_row_ix];
+
+        // Determine the length of the slice to take.
+        let mut end_row_ix = start_row_ix + 1;
+        while end_row_ix < indices.len() && indices[end_row_ix] == array_ix {
+            end_row_ix += 1;
+        }
+        let slice_length = end_row_ix - start_row_ix;
+
+        // Extend mutable with either nulls or with values from the array.
+        match array_ix.index() {
+            None => mutable.extend_nulls(slice_length),
+            Some(index) => {
+                let start_offset = take_offsets[index];
+                let end_offset = start_offset + slice_length;
+                mutable.extend(index, start_offset, end_offset);
+                take_offsets[index] = end_offset;
+            }
+        }
+
+        if end_row_ix == indices.len() {
+            break;
+        } else {
+            // Set the start_row_ix for the next slice.
+            start_row_ix = end_row_ix;
+        }
+    }
+
+    Ok(make_array(mutable.freeze()))
+}
+
+/// Merges two arrays in the order specified by a boolean mask.
+///
+/// This algorithm is a variant of [zip] that does not require the truthy and
+/// falsy arrays to have the same length.
+///
+/// When truthy or falsy are [Scalar](arrow_array::Scalar), the single
+/// scalar value is repeated whenever the mask array contains true or false 
respectively.
+///
+/// # Example
+///
+/// ```text
+///  truthy
+/// ┌─────────┐  mask
+/// │    A    │  ┌─────────┐                             ┌─────────┐
+/// ├─────────┤  │  true   │                             │    A    │
+/// │    C    │  ├─────────┤                             ├─────────┤
+/// ├─────────┤  │  true   │                             │    C    │
+/// │   NULL  │  ├─────────┤                             ├─────────┤
+/// ├─────────┤  │  false  │  merge(mask, truthy, falsy) │    B    │
+/// │    D    │  ├─────────┤  ─────────────────────────▶ ├─────────┤
+/// └─────────┘  │  true   │                             │   NULL  │
+///  falsy       ├─────────┤                             ├─────────┤
+/// ┌─────────┐  │  false  │                             │    E    │
+/// │    B    │  ├─────────┤                             ├─────────┤
+/// ├─────────┤  │  true   │                             │    D    │
+/// │    E    │  └─────────┘                             └─────────┘
+/// └─────────┘
+/// ```
+pub fn merge(
+    mask: &BooleanArray,
+    truthy: &dyn Datum,
+    falsy: &dyn Datum,
+) -> Result<ArrayRef, ArrowError> {
+    let (truthy_array, truthy_is_scalar) = truthy.get();
+    let (falsy_array, falsy_is_scalar) = falsy.get();
+
+    if truthy_is_scalar && falsy_is_scalar {
+        // When both truthy and falsy are scalars, we can use `zip` since the 
result is the same
+        // and zip has optimized code for scalars.
+        return zip(mask, truthy, falsy);
+    }
+
+    if truthy_array.data_type() != falsy_array.data_type() {
+        return Err(ArrowError::InvalidArgumentError(
+            "arguments need to have the same data type".into(),
+        ));
+    }
+
+    if truthy_is_scalar && truthy_array.len() != 1 {
+        return Err(ArrowError::InvalidArgumentError(
+            "scalar arrays must have 1 element".into(),
+        ));
+    }
+    if falsy_is_scalar && falsy_array.len() != 1 {
+        return Err(ArrowError::InvalidArgumentError(
+            "scalar arrays must have 1 element".into(),
+        ));
+    }
+
+    let falsy = falsy_array.to_data();
+    let truthy = truthy_array.to_data();
+
+    let mut mutable = MutableArrayData::new(vec![&truthy, &falsy], false, 
truthy.len());
+
+    // the SlicesIterator slices only the true values. So the gaps left by 
this iterator we need to
+    // fill with falsy values
+
+    // keep track of how much is filled
+    let mut filled = 0;
+    let mut falsy_offset = 0;
+    let mut truthy_offset = 0;
+
+    // Ensure nulls are treated as false
+    let mask_buffer = match mask.null_count() {
+        0 => mask.values().clone(),
+        _ => prep_null_mask_filter(mask).into_parts().0,
+    };
+
+    SlicesIterator::from(&mask_buffer).for_each(|(start, end)| {
+        // the gap needs to be filled with falsy values
+        if start > filled {
+            if falsy_is_scalar {
+                for _ in filled..start {
+                    // Copy the first item from the 'falsy' array into the 
output buffer.
+                    mutable.extend(1, 0, 1);
+                }
+            } else {
+                let falsy_length = start - filled;
+                let falsy_end = falsy_offset + falsy_length;
+                mutable.extend(1, falsy_offset, falsy_end);
+                falsy_offset = falsy_end;
+            }
+        }
+        // fill with truthy values
+        if truthy_is_scalar {
+            for _ in start..end {
+                // Copy the first item from the 'truthy' array into the output 
buffer.
+                mutable.extend(0, 0, 1);
+            }
+        } else {
+            let truthy_length = end - start;
+            let truthy_end = truthy_offset + truthy_length;
+            mutable.extend(0, truthy_offset, truthy_end);
+            truthy_offset = truthy_end;
+        }
+        filled = end;
+    });
+    // the remaining part is falsy
+    if filled < mask.len() {
+        if falsy_is_scalar {
+            for _ in filled..mask.len() {
+                // Copy the first item from the 'falsy' array into the output 
buffer.
+                mutable.extend(1, 0, 1);
+            }
+        } else {
+            let falsy_length = mask.len() - filled;
+            let falsy_end = falsy_offset + falsy_length;
+            mutable.extend(1, falsy_offset, falsy_end);
+        }
+    }
+
+    let data = mutable.freeze();
+    Ok(make_array(data))
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::merge::{MergeIndex, merge, merge_n};
+    use arrow_array::cast::AsArray;
+    use arrow_array::{Array, BooleanArray, StringArray};
+
+    #[derive(PartialEq, Eq, Copy, Clone)]
+    struct CompactMergeIndex {
+        index: u8,
+    }
+
+    impl MergeIndex for CompactMergeIndex {
+        fn index(&self) -> Option<usize> {
+            if self.index == u8::MAX {
+                None
+            } else {
+                Some(self.index as usize)
+            }
+        }
+    }
+
+    #[test]
+    fn test_merge() {
+        let a1 = StringArray::from(vec![Some("A"), Some("B"), Some("E"), 
None]);
+        let a2 = StringArray::from(vec![Some("C"), Some("D")]);
+
+        let indices = BooleanArray::from(vec![true, false, true, false, true, 
true]);
+
+        let merged = merge(&indices, &a1, &a2).unwrap();
+        let merged = merged.as_string::<i32>();
+
+        assert_eq!(merged.len(), indices.len());
+        assert!(merged.is_valid(0));
+        assert_eq!(merged.value(0), "A");
+        assert!(merged.is_valid(1));
+        assert_eq!(merged.value(1), "C");
+        assert!(merged.is_valid(2));
+        assert_eq!(merged.value(2), "B");
+        assert!(merged.is_valid(3));
+        assert_eq!(merged.value(3), "D");
+        assert!(merged.is_valid(4));
+        assert_eq!(merged.value(4), "E");
+        assert!(!merged.is_valid(5));
+    }
+

Review Comment:
   Add a unit test for empty mask:
   
   ```suggestion
   // An empty mask selects no rows, so the merged result must be empty even
   // though both input arrays contain a value.
   #[test]
   fn test_merge_empty_mask() {
       let a1 = StringArray::from(vec![Some("A")]);
       let a2 = StringArray::from(vec![Some("B")]);
       // The element type is spelled out because `BooleanArray` implements `From`
       // for both `Vec<bool>` and `Vec<Option<bool>>`; a bare `vec![]` would leave
       // the conversion ambiguous and fail type inference.
       let mask = BooleanArray::from(Vec::<bool>::new());
       let result = merge(&mask, &a1, &a2).unwrap();
       assert_eq!(result.len(), 0);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to