bjchambers commented on code in PR #1499:
URL: https://github.com/apache/arrow-rs/pull/1499#discussion_r837674561


##########
arrow/src/compute/kernels/boolean.rs:
##########
@@ -575,10 +577,242 @@ where
     Ok(PrimitiveArray::<T>::from(data))
 }
 
+/// Creates a (mostly) zero-copy slice of the given buffers so that they can be combined
+/// in the same array with other buffers that start at offset 0.
+/// The only buffers that need an actual copy are booleans (if they are not byte-aligned)
+/// and list/binary/string offsets, because the arrow implementation requires them to start at 0.
+/// This is useful when a kernel calculates a new validity bitmap but wants to reuse other buffers.
+fn slice_buffers(
+    buffers: &[Buffer],
+    offset: usize,
+    len: usize,
+    data_type: &DataType,
+    child_data: &[ArrayData],
+) -> (Vec<Buffer>, Vec<ArrayData>) {
+    use std::mem::size_of;
+
+    if offset == 0 {
+        return (buffers.to_vec(), child_data.to_vec());
+    }
+
+    // we only need to do something special to child data in 2 of the match branches
+    let mut result_child_data = None;
+
+    let result_buffers = match data_type {
+        DataType::Boolean => vec![buffers[0].bit_slice(offset, len)],
+        DataType::Int8 | DataType::UInt8 => {
+            vec![buffers[0].slice(offset * size_of::<u8>())]
+        }
+        DataType::Int16 | DataType::UInt16 | DataType::Float16 => {
+            vec![buffers[0].slice(offset * size_of::<u16>())]
+        }
+        DataType::Int32 | DataType::UInt32 | DataType::Float32 => {
+            vec![buffers[0].slice(offset * size_of::<u32>())]
+        }
+        DataType::Int64 | DataType::UInt64 | DataType::Float64 => {
+            vec![buffers[0].slice(offset * size_of::<u64>())]
+        }
+        DataType::Timestamp(_, _) | DataType::Duration(_) => {
+            vec![buffers[0].slice(offset * size_of::<u64>())]
+        }
+        DataType::Date32 | DataType::Time32(_) => {
+            vec![buffers[0].slice(offset * size_of::<u32>())]
+        }
+        DataType::Date64 | DataType::Time64(_) => {
+            vec![buffers[0].slice(offset * size_of::<u64>())]
+        }
+        DataType::Interval(IntervalUnit::YearMonth) => {
+            vec![buffers[0].slice(offset * size_of::<u32>())]
+        }
+        DataType::Interval(IntervalUnit::DayTime) => {
+            vec![buffers[0].slice(offset * 2 * size_of::<u32>())]
+        }
+        DataType::Interval(IntervalUnit::MonthDayNano) => {
+            vec![buffers[0].slice(offset * (2 * size_of::<u32>() + size_of::<u64>()))]
+        }
+        DataType::Decimal(_, _) => vec![buffers[0].slice(offset * size_of::<i128>())],
+        DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+            DataType::Int8 | DataType::UInt8 => {
+                vec![buffers[0].slice(offset * size_of::<u8>())]
+            }
+            DataType::Int16 | DataType::UInt16 => {
+                vec![buffers[0].slice(offset * size_of::<u16>())]
+            }
+            DataType::Int32 | DataType::UInt32 => {
+                vec![buffers[0].slice(offset * size_of::<u32>())]
+            }
+            DataType::Int64 | DataType::UInt64 => {
+                vec![buffers[0].slice(offset * size_of::<u64>())]
+            }
+            _ => unreachable!(),
+        },
+        DataType::List(_) => {
+            // safe because for List the first buffer is guaranteed to contain i32 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            let last_offset = offsets[len] as usize;
+            let nested_len = last_offset - first_offset;
+
+            // since we calculate a new offset buffer starting from 0 we also have to slice the child data
+            result_child_data = Some(
+                child_data
+                    .iter()
+                    .map(|d| d.slice(first_offset, nested_len))
+                    .collect(),
+            );
+            vec![offset_buffer_slice::<i32>(offsets, len)]
+        }
+        DataType::LargeList(_) => {
+            // safe because for LargeList the first buffer is guaranteed to contain i64 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            let last_offset = offsets[len] as usize;
+            let nested_len = last_offset - first_offset;
+            // since we calculate a new offset buffer starting from 0 we also have to slice the child data
+
+            result_child_data = Some(
+                child_data
+                    .iter()
+                    .map(|d| d.slice(first_offset, nested_len))
+                    .collect(),
+            );
+            vec![offset_buffer_slice::<i64>(offsets, len)]
+        }
+        DataType::Binary | DataType::Utf8 => {
+            // safe because for Binary/Utf8 the first buffer is guaranteed to contain i32 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            vec![
+                offset_buffer_slice::<i32>(offsets, len),
+                buffers[1].slice(first_offset * size_of::<i32>()),
+            ]
+        }
+        DataType::LargeBinary | DataType::LargeUtf8 => {
+            // safe because for LargeBinary/LargeUtf8 the first buffer is guaranteed to contain i64 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            vec![
+                offset_buffer_slice::<i64>(offsets, len),
+                buffers[1].slice(first_offset * size_of::<i64>()),
+            ]
+        }
+        DataType::FixedSizeBinary(size) => {
+            vec![buffers[0].slice(offset * (*size as usize))]
+        }
+        DataType::FixedSizeList(_, _size) => {
+            // TODO: should this actually slice the child arrays?
+            vec![]
+        }
+        DataType::Struct(_) => {
+            // TODO: should this actually slice the child arrays?
+            vec![]
+        }
+        DataType::Union(..) => {
+            // TODO: verify this is actually correct
+            if buffers.len() > 1 {
+                // dense union, type ids and offsets
+                vec![
+                    buffers[0].slice(offset * size_of::<i8>()),
+                    buffers[1].slice(offset * size_of::<u32>()),
+                ]
+            } else {
+                // sparse union with only type ids
+                // should this also slice the child arrays?
+                vec![buffers[0].slice(offset * size_of::<i8>())]
+            }
+        }
+        DataType::Map(_, _) => {
+            // TODO: verify this is actually correct
+            result_child_data =
+                Some(child_data.iter().map(|d| d.slice(offset, len)).collect());
+            vec![]
+        }
+        DataType::Null => vec![],
+    };
+
+    (
+        result_buffers,
+        result_child_data.unwrap_or_else(|| child_data.to_vec()),
+    )
+}
+
+// ListArray::from(ArrayData) expects offsets to always start at 0, so we can't simply make a slice of the buffer,
+// but instead have to calculate a new buffer
+fn offset_buffer_slice<T: OffsetSizeTrait>(
+    original_offsets: &[T],
+    array_len: usize,
+) -> Buffer {
+    // the offset buffer has 1 element more than the corresponding data buffer; the extra offset marks the end of the buffer
+    let len_in_bytes = (array_len + 1) * std::mem::size_of::<T>();
+    let mut offset_buffer =
+        MutableBuffer::new(len_in_bytes).with_bitset(len_in_bytes, false);
+    let offset_slice = unsafe { offset_buffer.typed_data_mut::<T>() };
+    let offset_start = original_offsets[0];
+    offset_slice
+        .iter_mut()
+        .zip(original_offsets.iter())
+        .for_each(|(output, input)| {
+            *output = *input - offset_start;
+        });
+
+    offset_buffer.into()
+}
+
+pub fn nullif_alternative(

Review Comment:
   Could also have this one be `generic_nullif` (or something like that) if we wanted to keep them around. Does it behave differently from the primitive version on primitives? If so, that could be a reason to consider keeping both.
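   
   For reference, a minimal usage sketch of the dynamically-typed entry point under discussion (the name `nullif_alternative` and its exact export path come from this PR, not a settled API):
   
   ```
   use std::sync::Arc;
   
   use arrow::array::{Array, ArrayRef, BooleanArray, Int32Array};
   use arrow::error::Result;
   
   fn example() -> Result<()> {
       let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), Some(2), None]));
       let condition = BooleanArray::from(vec![true, false, true]);
   
       // Entries where `condition` is true become null: [null, 2, null].
       let result = nullif_alternative(&values, &condition)?;
       assert_eq!(result.null_count(), 2);
       Ok(())
   }
   ```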



##########
arrow/src/compute/kernels/boolean.rs:
##########
@@ -575,10 +577,242 @@ where
     Ok(PrimitiveArray::<T>::from(data))
 }
 
+/// Creates a (mostly) zero-copy slice of the given buffers so that they can be combined
+/// in the same array with other buffers that start at offset 0.
+/// The only buffers that need an actual copy are booleans (if they are not byte-aligned)
+/// and list/binary/string offsets, because the arrow implementation requires them to start at 0.
+/// This is useful when a kernel calculates a new validity bitmap but wants to reuse other buffers.
+fn slice_buffers(
+    buffers: &[Buffer],
+    offset: usize,
+    len: usize,
+    data_type: &DataType,
+    child_data: &[ArrayData],
+) -> (Vec<Buffer>, Vec<ArrayData>) {
+    use std::mem::size_of;
+
+    if offset == 0 {
+        return (buffers.to_vec(), child_data.to_vec());
+    }
+
+    // we only need to do something special to child data in 2 of the match branches
+    let mut result_child_data = None;
+
+    let result_buffers = match data_type {
+        DataType::Boolean => vec![buffers[0].bit_slice(offset, len)],
+        DataType::Int8 | DataType::UInt8 => {
+            vec![buffers[0].slice(offset * size_of::<u8>())]
+        }
+        DataType::Int16 | DataType::UInt16 | DataType::Float16 => {
+            vec![buffers[0].slice(offset * size_of::<u16>())]
+        }
+        DataType::Int32 | DataType::UInt32 | DataType::Float32 => {
+            vec![buffers[0].slice(offset * size_of::<u32>())]
+        }
+        DataType::Int64 | DataType::UInt64 | DataType::Float64 => {
+            vec![buffers[0].slice(offset * size_of::<u64>())]
+        }
+        DataType::Timestamp(_, _) | DataType::Duration(_) => {
+            vec![buffers[0].slice(offset * size_of::<u64>())]
+        }
+        DataType::Date32 | DataType::Time32(_) => {
+            vec![buffers[0].slice(offset * size_of::<u32>())]
+        }
+        DataType::Date64 | DataType::Time64(_) => {
+            vec![buffers[0].slice(offset * size_of::<u64>())]
+        }
+        DataType::Interval(IntervalUnit::YearMonth) => {
+            vec![buffers[0].slice(offset * size_of::<u32>())]
+        }
+        DataType::Interval(IntervalUnit::DayTime) => {
+            vec![buffers[0].slice(offset * 2 * size_of::<u32>())]
+        }
+        DataType::Interval(IntervalUnit::MonthDayNano) => {
+            vec![buffers[0].slice(offset * (2 * size_of::<u32>() + size_of::<u64>()))]
+        }
+        DataType::Decimal(_, _) => vec![buffers[0].slice(offset * size_of::<i128>())],
+        DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+            DataType::Int8 | DataType::UInt8 => {
+                vec![buffers[0].slice(offset * size_of::<u8>())]
+            }
+            DataType::Int16 | DataType::UInt16 => {
+                vec![buffers[0].slice(offset * size_of::<u16>())]
+            }
+            DataType::Int32 | DataType::UInt32 => {
+                vec![buffers[0].slice(offset * size_of::<u32>())]
+            }
+            DataType::Int64 | DataType::UInt64 => {
+                vec![buffers[0].slice(offset * size_of::<u64>())]
+            }
+            _ => unreachable!(),
+        },
+        DataType::List(_) => {
+            // safe because for List the first buffer is guaranteed to contain i32 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            let last_offset = offsets[len] as usize;
+            let nested_len = last_offset - first_offset;
+
+            // since we calculate a new offset buffer starting from 0 we also have to slice the child data
+            result_child_data = Some(
+                child_data
+                    .iter()
+                    .map(|d| d.slice(first_offset, nested_len))
+                    .collect(),
+            );
+            vec![offset_buffer_slice::<i32>(offsets, len)]
+        }
+        DataType::LargeList(_) => {
+            // safe because for LargeList the first buffer is guaranteed to contain i64 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            let last_offset = offsets[len] as usize;
+            let nested_len = last_offset - first_offset;
+            // since we calculate a new offset buffer starting from 0 we also have to slice the child data
+
+            result_child_data = Some(
+                child_data
+                    .iter()
+                    .map(|d| d.slice(first_offset, nested_len))
+                    .collect(),
+            );
+            vec![offset_buffer_slice::<i64>(offsets, len)]
+        }
+        DataType::Binary | DataType::Utf8 => {
+            // safe because for Binary/Utf8 the first buffer is guaranteed to contain i32 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            vec![
+                offset_buffer_slice::<i32>(offsets, len),
+                buffers[1].slice(first_offset * size_of::<i32>()),
+            ]
+        }
+        DataType::LargeBinary | DataType::LargeUtf8 => {
+            // safe because for LargeBinary/LargeUtf8 the first buffer is guaranteed to contain i64 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            vec![
+                offset_buffer_slice::<i64>(offsets, len),
+                buffers[1].slice(first_offset * size_of::<i64>()),
+            ]
+        }
+        DataType::FixedSizeBinary(size) => {
+            vec![buffers[0].slice(offset * (*size as usize))]
+        }
+        DataType::FixedSizeList(_, _size) => {
+            // TODO: should this actually slice the child arrays?
+            vec![]
+        }
+        DataType::Struct(_) => {
+            // TODO: should this actually slice the child arrays?
+            vec![]
+        }
+        DataType::Union(..) => {
+            // TODO: verify this is actually correct
+            if buffers.len() > 1 {
+                // dense union, type ids and offsets
+                vec![
+                    buffers[0].slice(offset * size_of::<i8>()),
+                    buffers[1].slice(offset * size_of::<u32>()),
+                ]
+            } else {
+                // sparse union with only type ids
+                // should this also slice the child arrays?
+                vec![buffers[0].slice(offset * size_of::<i8>())]
+            }
+        }
+        DataType::Map(_, _) => {
+            // TODO: verify this is actually correct
+            result_child_data =
+                Some(child_data.iter().map(|d| d.slice(offset, len)).collect());
+            vec![]
+        }
+        DataType::Null => vec![],
+    };
+
+    (
+        result_buffers,
+        result_child_data.unwrap_or_else(|| child_data.to_vec()),
+    )
+}
+
+// ListArray::from(ArrayData) expects offsets to always start at 0, so we can't simply make a slice of the buffer,
+// but instead have to calculate a new buffer
+fn offset_buffer_slice<T: OffsetSizeTrait>(
+    original_offsets: &[T],
+    array_len: usize,
+) -> Buffer {
+    // the offset buffer has 1 element more than the corresponding data buffer; the extra offset marks the end of the buffer
+    let len_in_bytes = (array_len + 1) * std::mem::size_of::<T>();
+    let mut offset_buffer =
+        MutableBuffer::new(len_in_bytes).with_bitset(len_in_bytes, false);
+    let offset_slice = unsafe { offset_buffer.typed_data_mut::<T>() };
+    let offset_start = original_offsets[0];
+    offset_slice
+        .iter_mut()
+        .zip(original_offsets.iter())
+        .for_each(|(output, input)| {
+            *output = *input - offset_start;
+        });
+
+    offset_buffer.into()
+}
+
+pub fn nullif_alternative(
+    array: &ArrayRef,
+    condition: &BooleanArray,
+) -> Result<ArrayRef> {
+    if array.len() != condition.len() {
+        return Err(ArrowError::ComputeError(
+            "Inputs to conditional kernels must have the same 
length".to_string(),

Review Comment:
   Maybe change "conditional kernels" to "nullif_alternative" to make it 
clearer what went wrong?
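   
   For concreteness, the suggested wording would look something like this (a sketch, not the merged text):
   
   ```
   if array.len() != condition.len() {
       return Err(ArrowError::ComputeError(
           "Inputs to nullif_alternative must have the same length".to_string(),
       ));
   }
   ```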



##########
arrow/src/compute/kernels/boolean.rs:
##########
@@ -575,10 +577,242 @@ where
     Ok(PrimitiveArray::<T>::from(data))
 }
 
+/// Creates a (mostly) zero-copy slice of the given buffers so that they can be combined
+/// in the same array with other buffers that start at offset 0.
+/// The only buffers that need an actual copy are booleans (if they are not byte-aligned)
+/// and list/binary/string offsets, because the arrow implementation requires them to start at 0.
+/// This is useful when a kernel calculates a new validity bitmap but wants to reuse other buffers.
+fn slice_buffers(
+    buffers: &[Buffer],
+    offset: usize,
+    len: usize,
+    data_type: &DataType,
+    child_data: &[ArrayData],
+) -> (Vec<Buffer>, Vec<ArrayData>) {
+    use std::mem::size_of;
+
+    if offset == 0 {
+        return (buffers.to_vec(), child_data.to_vec());
+    }
+
+    // we only need to do something special to child data in 2 of the match branches
+    let mut result_child_data = None;
+
+    let result_buffers = match data_type {
+        DataType::Boolean => vec![buffers[0].bit_slice(offset, len)],
+        DataType::Int8 | DataType::UInt8 => {
+            vec![buffers[0].slice(offset * size_of::<u8>())]
+        }
+        DataType::Int16 | DataType::UInt16 | DataType::Float16 => {
+            vec![buffers[0].slice(offset * size_of::<u16>())]
+        }
+        DataType::Int32 | DataType::UInt32 | DataType::Float32 => {
+            vec![buffers[0].slice(offset * size_of::<u32>())]
+        }
+        DataType::Int64 | DataType::UInt64 | DataType::Float64 => {
+            vec![buffers[0].slice(offset * size_of::<u64>())]
+        }
+        DataType::Timestamp(_, _) | DataType::Duration(_) => {
+            vec![buffers[0].slice(offset * size_of::<u64>())]
+        }
+        DataType::Date32 | DataType::Time32(_) => {
+            vec![buffers[0].slice(offset * size_of::<u32>())]
+        }
+        DataType::Date64 | DataType::Time64(_) => {
+            vec![buffers[0].slice(offset * size_of::<u64>())]
+        }
+        DataType::Interval(IntervalUnit::YearMonth) => {
+            vec![buffers[0].slice(offset * size_of::<u32>())]
+        }
+        DataType::Interval(IntervalUnit::DayTime) => {
+            vec![buffers[0].slice(offset * 2 * size_of::<u32>())]
+        }
+        DataType::Interval(IntervalUnit::MonthDayNano) => {
+            vec![buffers[0].slice(offset * (2 * size_of::<u32>() + size_of::<u64>()))]
+        }
+        DataType::Decimal(_, _) => vec![buffers[0].slice(offset * size_of::<i128>())],
+        DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+            DataType::Int8 | DataType::UInt8 => {
+                vec![buffers[0].slice(offset * size_of::<u8>())]
+            }
+            DataType::Int16 | DataType::UInt16 => {
+                vec![buffers[0].slice(offset * size_of::<u16>())]
+            }
+            DataType::Int32 | DataType::UInt32 => {
+                vec![buffers[0].slice(offset * size_of::<u32>())]
+            }
+            DataType::Int64 | DataType::UInt64 => {
+                vec![buffers[0].slice(offset * size_of::<u64>())]
+            }
+            _ => unreachable!(),
+        },
+        DataType::List(_) => {
+            // safe because for List the first buffer is guaranteed to contain i32 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            let last_offset = offsets[len] as usize;
+            let nested_len = last_offset - first_offset;
+
+            // since we calculate a new offset buffer starting from 0 we also have to slice the child data
+            result_child_data = Some(
+                child_data
+                    .iter()
+                    .map(|d| d.slice(first_offset, nested_len))
+                    .collect(),
+            );
+            vec![offset_buffer_slice::<i32>(offsets, len)]
+        }
+        DataType::LargeList(_) => {
+            // safe because for LargeList the first buffer is guaranteed to contain i64 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            let last_offset = offsets[len] as usize;
+            let nested_len = last_offset - first_offset;
+            // since we calculate a new offset buffer starting from 0 we also have to slice the child data
+
+            result_child_data = Some(
+                child_data
+                    .iter()
+                    .map(|d| d.slice(first_offset, nested_len))
+                    .collect(),
+            );
+            vec![offset_buffer_slice::<i64>(offsets, len)]
+        }
+        DataType::Binary | DataType::Utf8 => {
+            // safe because for Binary/Utf8 the first buffer is guaranteed to contain i32 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            vec![
+                offset_buffer_slice::<i32>(offsets, len),
+                buffers[1].slice(first_offset * size_of::<i32>()),
+            ]
+        }
+        DataType::LargeBinary | DataType::LargeUtf8 => {
+            // safe because for LargeBinary/LargeUtf8 the first buffer is guaranteed to contain i64 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            vec![
+                offset_buffer_slice::<i64>(offsets, len),
+                buffers[1].slice(first_offset * size_of::<i64>()),
+            ]
+        }
+        DataType::FixedSizeBinary(size) => {
+            vec![buffers[0].slice(offset * (*size as usize))]
+        }
+        DataType::FixedSizeList(_, _size) => {
+            // TODO: should this actually slice the child arrays?
+            vec![]

Review Comment:
   Here and below -- does this mean it doesn't support fixed size lists, structs, etc.?
   
   If yes -- should this instead return an error (or panic) rather than returning the empty list?
   If no -- should there be added tests for these cases?
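   
   If the answer is that these types should be rejected, one option is to make `slice_buffers` fallible. A sketch, assuming its return type becomes `Result<(Vec<Buffer>, Vec<ArrayData>)>`:
   
   ```
   DataType::FixedSizeList(_, _) | DataType::Struct(_) => {
       // Signal unsupported types instead of silently returning no buffers.
       return Err(ArrowError::NotYetImplemented(format!(
           "slice_buffers is not yet implemented for {:?}",
           data_type
       )));
   }
   ```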



##########
arrow/src/compute/kernels/boolean.rs:
##########
@@ -575,10 +577,242 @@ where
     Ok(PrimitiveArray::<T>::from(data))
 }
 
+/// Creates a (mostly) zero-copy slice of the given buffers so that they can be combined
+/// in the same array with other buffers that start at offset 0.
+/// The only buffers that need an actual copy are booleans (if they are not byte-aligned)
+/// and list/binary/string offsets, because the arrow implementation requires them to start at 0.
+/// This is useful when a kernel calculates a new validity bitmap but wants to reuse other buffers.
+fn slice_buffers(
+    buffers: &[Buffer],
+    offset: usize,
+    len: usize,
+    data_type: &DataType,
+    child_data: &[ArrayData],
+) -> (Vec<Buffer>, Vec<ArrayData>) {
+    use std::mem::size_of;
+
+    if offset == 0 {
+        return (buffers.to_vec(), child_data.to_vec());
+    }
+
+    // we only need to do something special to child data in 2 of the match branches
+    let mut result_child_data = None;
+
+    let result_buffers = match data_type {
+        DataType::Boolean => vec![buffers[0].bit_slice(offset, len)],
+        DataType::Int8 | DataType::UInt8 => {
+            vec![buffers[0].slice(offset * size_of::<u8>())]
+        }
+        DataType::Int16 | DataType::UInt16 | DataType::Float16 => {
+            vec![buffers[0].slice(offset * size_of::<u16>())]
+        }
+        DataType::Int32 | DataType::UInt32 | DataType::Float32 => {
+            vec![buffers[0].slice(offset * size_of::<u32>())]
+        }
+        DataType::Int64 | DataType::UInt64 | DataType::Float64 => {
+            vec![buffers[0].slice(offset * size_of::<u64>())]
+        }
+        DataType::Timestamp(_, _) | DataType::Duration(_) => {
+            vec![buffers[0].slice(offset * size_of::<u64>())]
+        }
+        DataType::Date32 | DataType::Time32(_) => {
+            vec![buffers[0].slice(offset * size_of::<u32>())]
+        }
+        DataType::Date64 | DataType::Time64(_) => {
+            vec![buffers[0].slice(offset * size_of::<u64>())]
+        }
+        DataType::Interval(IntervalUnit::YearMonth) => {
+            vec![buffers[0].slice(offset * size_of::<u32>())]
+        }
+        DataType::Interval(IntervalUnit::DayTime) => {
+            vec![buffers[0].slice(offset * 2 * size_of::<u32>())]
+        }
+        DataType::Interval(IntervalUnit::MonthDayNano) => {
+            vec![buffers[0].slice(offset * (2 * size_of::<u32>() + size_of::<u64>()))]
+        }
+        DataType::Decimal(_, _) => vec![buffers[0].slice(offset * size_of::<i128>())],
+        DataType::Dictionary(key_type, _) => match key_type.as_ref() {
+            DataType::Int8 | DataType::UInt8 => {
+                vec![buffers[0].slice(offset * size_of::<u8>())]
+            }
+            DataType::Int16 | DataType::UInt16 => {
+                vec![buffers[0].slice(offset * size_of::<u16>())]
+            }
+            DataType::Int32 | DataType::UInt32 => {
+                vec![buffers[0].slice(offset * size_of::<u32>())]
+            }
+            DataType::Int64 | DataType::UInt64 => {
+                vec![buffers[0].slice(offset * size_of::<u64>())]
+            }
+            _ => unreachable!(),
+        },
+        DataType::List(_) => {
+            // safe because for List the first buffer is guaranteed to contain i32 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            let last_offset = offsets[len] as usize;
+            let nested_len = last_offset - first_offset;
+
+            // since we calculate a new offset buffer starting from 0 we also have to slice the child data
+            result_child_data = Some(
+                child_data
+                    .iter()
+                    .map(|d| d.slice(first_offset, nested_len))
+                    .collect(),
+            );
+            vec![offset_buffer_slice::<i32>(offsets, len)]
+        }
+        DataType::LargeList(_) => {
+            // safe because for LargeList the first buffer is guaranteed to contain i64 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            let last_offset = offsets[len] as usize;
+            let nested_len = last_offset - first_offset;
+            // since we calculate a new offset buffer starting from 0 we also have to slice the child data
+
+            result_child_data = Some(
+                child_data
+                    .iter()
+                    .map(|d| d.slice(first_offset, nested_len))
+                    .collect(),
+            );
+            vec![offset_buffer_slice::<i64>(offsets, len)]
+        }
+        DataType::Binary | DataType::Utf8 => {
+            // safe because for Binary/Utf8 the first buffer is guaranteed to contain i32 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            vec![
+                offset_buffer_slice::<i32>(offsets, len),
+                buffers[1].slice(first_offset * size_of::<i32>()),
+            ]
+        }
+        DataType::LargeBinary | DataType::LargeUtf8 => {
+            // safe because for LargeBinary/LargeUtf8 the first buffer is guaranteed to contain i64 offsets
+            let offsets = unsafe { &buffers[0].typed_data()[offset..] };
+            let first_offset = offsets[0] as usize;
+            vec![
+                offset_buffer_slice::<i64>(offsets, len),
+                buffers[1].slice(first_offset * size_of::<i64>()),
+            ]
+        }
+        DataType::FixedSizeBinary(size) => {
+            vec![buffers[0].slice(offset * (*size as usize))]
+        }
+        DataType::FixedSizeList(_, _size) => {
+            // TODO: should this actually slice the child arrays?
+            vec![]
+        }
+        DataType::Struct(_) => {
+            // TODO: should this actually slice the child arrays?
+            vec![]
+        }
+        DataType::Union(..) => {
+            // TODO: verify this is actually correct
+            if buffers.len() > 1 {
+                // dense union, type ids and offsets
+                vec![
+                    buffers[0].slice(offset * size_of::<i8>()),
+                    buffers[1].slice(offset * size_of::<u32>()),
+                ]
+            } else {
+                // sparse union with only type ids
+                // should this also slice the child arrays?
+                vec![buffers[0].slice(offset * size_of::<i8>())]
+            }
+        }
+        DataType::Map(_, _) => {
+            // TODO: verify this is actually correct
+            result_child_data =
+                Some(child_data.iter().map(|d| d.slice(offset, len)).collect());
+            vec![]
+        }
+        DataType::Null => vec![],
+    };
+
+    (
+        result_buffers,
+        result_child_data.unwrap_or_else(|| child_data.to_vec()),
+    )
+}
+
+// ListArray::from(ArrayData) expects offsets to always start at 0, so we can't simply make a slice of the buffer,
+// but instead have to calculate a new buffer
+fn offset_buffer_slice<T: OffsetSizeTrait>(
+    original_offsets: &[T],
+    array_len: usize,
+) -> Buffer {
+    // the offset buffer has 1 element more than the corresponding data buffer; the extra offset marks the end of the buffer
+    let len_in_bytes = (array_len + 1) * std::mem::size_of::<T>();
+    let mut offset_buffer =
+        MutableBuffer::new(len_in_bytes).with_bitset(len_in_bytes, false);
+    let offset_slice = unsafe { offset_buffer.typed_data_mut::<T>() };
+    let offset_start = original_offsets[0];
+    offset_slice
+        .iter_mut()
+        .zip(original_offsets.iter())
+        .for_each(|(output, input)| {
+            *output = *input - offset_start;
+        });
+
+    offset_buffer.into()
+}
+
+pub fn nullif_alternative(
+    array: &ArrayRef,
+    condition: &BooleanArray,
+) -> Result<ArrayRef> {
+    if array.len() != condition.len() {
+        return Err(ArrowError::ComputeError(
+            "Inputs to conditional kernels must have the same 
length".to_string(),
+        ));
+    }
+
+    let condition_buffer = combine_option_buffers(
+        condition.data().buffers().first(),
+        condition.offset(),
+        condition.data().null_buffer(),
+        condition.offset(),
+        condition.len(),
+    )
+    .map(|b| !&b);
+
+    let result_valid_buffer = combine_option_buffers(
+        condition_buffer.as_ref(),
+        0,
+        array.data().null_buffer(),
+        array.offset(),
+        condition.len(),
+    );
+
+    let (result_data_buffers, result_child_data) = slice_buffers(

Review Comment:
   I wonder if this `slice_buffers` method should instead be a method on the `array` (given everything comes from that). And rather than "slice" (which has an existing meaning) I wonder if it could be something like "trim", to indicate that it "trims" the buffer to start at 0 (rather than at the offset). As your comment identifies, I could see this being useful with other kernels that expect their inputs to start at 0 (because it creates a buffer).
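   
   A sketch of that idea, assuming a (hypothetical) `trim` method on `ArrayData` that simply delegates to the existing `slice_buffers` helper:
   
   ```
   impl ArrayData {
       /// Hypothetical: returns buffers and child data rebased so that this
       /// array's logical start maps to offset 0 in the returned buffers.
       fn trim(&self) -> (Vec<Buffer>, Vec<ArrayData>) {
           slice_buffers(
               self.buffers(),
               self.offset(),
               self.len(),
               self.data_type(),
               self.child_data(),
           )
       }
   }
   ```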



##########
arrow/src/compute/util.rs:
##########
@@ -36,25 +36,36 @@ pub(super) fn combine_option_bitmap(
     let left_offset_in_bits = left_data.offset();
     let right_offset_in_bits = right_data.offset();
 
-    let left = left_data.null_buffer();
-    let right = right_data.null_buffer();
-
-    match left {
-        None => match right {
-            None => Ok(None),
-            Some(r) => Ok(Some(r.bit_slice(right_offset_in_bits, len_in_bits))),
-        },
-        Some(l) => match right {
-            None => Ok(Some(l.bit_slice(left_offset_in_bits, len_in_bits))),
-
-            Some(r) => Ok(Some(buffer_bin_and(
-                l,
-                left_offset_in_bits,
-                r,
-                right_offset_in_bits,
-                len_in_bits,
-            ))),
-        },
+    let left_null_buffer = left_data.null_buffer();
+    let right_null_buffer = right_data.null_buffer();
+
+    Ok(combine_option_buffers(
+        left_null_buffer,
+        left_offset_in_bits,
+        right_null_buffer,
+        right_offset_in_bits,
+        len_in_bits,
+    ))
+}
+
+pub(super) fn combine_option_buffers(

Review Comment:
   May be worth some comments on what this combination does?
   
   ```
   /// The result is null if either of the buffers are null.
   /// The resulting buffer is at offset `0`.
   ```
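   
   To make the contract concrete, here is a self-contained sketch of the semantics those doc lines describe, assuming the signature shown in the diff (`None` stands for an all-valid bitmap):
   
   ```
   use arrow::buffer::{buffer_bin_and, Buffer};
   
   /// The result is `None` only when both inputs are `None`;
   /// any returned buffer is rebased to start at offset 0.
   fn combine_option_buffers(
       left: Option<&Buffer>,
       left_offset_in_bits: usize,
       right: Option<&Buffer>,
       right_offset_in_bits: usize,
       len_in_bits: usize,
   ) -> Option<Buffer> {
       match (left, right) {
           (None, None) => None,
           (Some(l), None) => Some(l.bit_slice(left_offset_in_bits, len_in_bits)),
           (None, Some(r)) => Some(r.bit_slice(right_offset_in_bits, len_in_bits)),
           (Some(l), Some(r)) => Some(buffer_bin_and(
               l,
               left_offset_in_bits,
               r,
               right_offset_in_bits,
               len_in_bits,
           )),
       }
   }
   ```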



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
