alamb commented on code in PR #9625:
URL: https://github.com/apache/arrow-rs/pull/9625#discussion_r3344236018


##########
arrow-select/src/take.rs:
##########
@@ -495,99 +495,126 @@ fn take_bytes<T: ByteArrayType, IndexType: 
ArrowPrimitiveType>(
     array: &GenericByteArray<T>,
     indices: &PrimitiveArray<IndexType>,
 ) -> Result<GenericByteArray<T>, ArrowError> {
+    let mut values: Vec<u8> = Vec::new();
     let mut offsets = Vec::with_capacity(indices.len() + 1);
     offsets.push(T::Offset::default());
 
     let input_offsets = array.value_offsets();
     let mut capacity = 0;
     let nulls = take_nulls(array.nulls(), indices);
 
-    let (offsets, values) = if array.null_count() == 0 && indices.null_count() 
== 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
-
-        for index in indices.values() {
-            values.extend_from_slice(array.value(index.as_usize()).as_ref());
-        }
-        (offsets, values)
-    } else if indices.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+    // Branch on output nulls — `None` means every output slot is valid.
+    match nulls.as_ref().filter(|n| n.null_count() > 0) {
+        // Fast path: no nulls in output, every index is valid.
+        None => {
+            for index in indices.values() {
+                let index = index.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets.push(
+                    T::Offset::from_usize(capacity)
+                        .ok_or_else(|| 
ArrowError::OffsetOverflowError(capacity))?,
+                );
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                values.extend_from_slice(array.value(index).as_ref());
-            }
-        }
-        (offsets, values)
-    } else if array.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if indices.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+            values.reserve(capacity);
+
+            let dst = values.spare_capacity_mut();
+            debug_assert!(dst.len() >= capacity);
+            let mut offset = 0;
+
+            for index in indices.values() {
+                // SAFETY: in-bounds proven by the first loop's bounds-checked 
offset access.
+                // dst asserted above to include the required capacity.
+                unsafe {
+                    let data: &[u8] = 
array.value_unchecked(index.as_usize()).as_ref();
+                    std::ptr::copy_nonoverlapping(
+                        data.as_ptr(),
+                        dst[offset..].as_mut_ptr().cast::<u8>(),
+                        data.len(),
+                    );
+                    offset += data.len();
+                }
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for (i, index) in indices.values().iter().enumerate() {
-            if indices.is_valid(i) {
-                
values.extend_from_slice(array.value(index.as_usize()).as_ref());
+            // SAFETY: wrote exactly `capacity` bytes above; reserved on line 
above.
+            unsafe {
+                values.set_len(capacity);
             }
         }
-        (offsets, values)
-    } else {
-        let nulls = nulls.as_ref().unwrap();
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if nulls.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+        // Nullable path: only process valid (non-null) output positions.
+        Some(output_nulls) => {
+            let mut ranges = Vec::with_capacity(indices.len() - 
output_nulls.null_count());

Review Comment:
   This technically adds another allocation (the `ranges` Vec), but given the 
performance results, its cost is apparently offset by the better vectorization.



##########
arrow-select/src/take.rs:
##########
@@ -495,99 +495,126 @@ fn take_bytes<T: ByteArrayType, IndexType: 
ArrowPrimitiveType>(
     array: &GenericByteArray<T>,
     indices: &PrimitiveArray<IndexType>,
 ) -> Result<GenericByteArray<T>, ArrowError> {
+    let mut values: Vec<u8> = Vec::new();
     let mut offsets = Vec::with_capacity(indices.len() + 1);
     offsets.push(T::Offset::default());
 
     let input_offsets = array.value_offsets();
     let mut capacity = 0;
     let nulls = take_nulls(array.nulls(), indices);
 
-    let (offsets, values) = if array.null_count() == 0 && indices.null_count() 
== 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
-
-        for index in indices.values() {
-            values.extend_from_slice(array.value(index.as_usize()).as_ref());
-        }
-        (offsets, values)
-    } else if indices.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+    // Branch on output nulls — `None` means every output slot is valid.
+    match nulls.as_ref().filter(|n| n.null_count() > 0) {
+        // Fast path: no nulls in output, every index is valid.
+        None => {
+            for index in indices.values() {
+                let index = index.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets.push(
+                    T::Offset::from_usize(capacity)
+                        .ok_or_else(|| 
ArrowError::OffsetOverflowError(capacity))?,
+                );
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                values.extend_from_slice(array.value(index).as_ref());
-            }
-        }
-        (offsets, values)
-    } else if array.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if indices.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+            values.reserve(capacity);
+
+            let dst = values.spare_capacity_mut();
+            debug_assert!(dst.len() >= capacity);
+            let mut offset = 0;
+
+            for index in indices.values() {
+                // SAFETY: in-bounds proven by the first loop's bounds-checked 
offset access.
+                // dst asserted above to include the required capacity.
+                unsafe {
+                    let data: &[u8] = 
array.value_unchecked(index.as_usize()).as_ref();
+                    std::ptr::copy_nonoverlapping(
+                        data.as_ptr(),
+                        dst[offset..].as_mut_ptr().cast::<u8>(),

Review Comment:
   You could potentially get rid of another bounds check here, if you wanted to 
crank up the unsafety, and possibly give LLVM more optimization opportunities:
   
   ```rust
                           dst.get_unchecked_mut(offset..).as_mut_ptr().cast::<u8>(),
   ```



##########
arrow-select/src/take.rs:
##########
@@ -495,99 +495,126 @@ fn take_bytes<T: ByteArrayType, IndexType: 
ArrowPrimitiveType>(
     array: &GenericByteArray<T>,
     indices: &PrimitiveArray<IndexType>,
 ) -> Result<GenericByteArray<T>, ArrowError> {
+    let mut values: Vec<u8> = Vec::new();
     let mut offsets = Vec::with_capacity(indices.len() + 1);
     offsets.push(T::Offset::default());
 
     let input_offsets = array.value_offsets();
     let mut capacity = 0;
     let nulls = take_nulls(array.nulls(), indices);
 
-    let (offsets, values) = if array.null_count() == 0 && indices.null_count() 
== 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
-
-        for index in indices.values() {
-            values.extend_from_slice(array.value(index.as_usize()).as_ref());
-        }
-        (offsets, values)
-    } else if indices.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+    // Branch on output nulls — `None` means every output slot is valid.
+    match nulls.as_ref().filter(|n| n.null_count() > 0) {
+        // Fast path: no nulls in output, every index is valid.
+        None => {
+            for index in indices.values() {
+                let index = index.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets.push(
+                    T::Offset::from_usize(capacity)
+                        .ok_or_else(|| 
ArrowError::OffsetOverflowError(capacity))?,
+                );
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                values.extend_from_slice(array.value(index).as_ref());
-            }
-        }
-        (offsets, values)
-    } else if array.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if indices.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+            values.reserve(capacity);
+
+            let dst = values.spare_capacity_mut();
+            debug_assert!(dst.len() >= capacity);
+            let mut offset = 0;
+
+            for index in indices.values() {
+                // SAFETY: in-bounds proven by the first loop's bounds-checked 
offset access.
+                // dst asserted above to include the required capacity.
+                unsafe {
+                    let data: &[u8] = 
array.value_unchecked(index.as_usize()).as_ref();
+                    std::ptr::copy_nonoverlapping(
+                        data.as_ptr(),
+                        dst[offset..].as_mut_ptr().cast::<u8>(),
+                        data.len(),
+                    );
+                    offset += data.len();
+                }
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for (i, index) in indices.values().iter().enumerate() {
-            if indices.is_valid(i) {
-                
values.extend_from_slice(array.value(index.as_usize()).as_ref());
+            // SAFETY: wrote exactly `capacity` bytes above; reserved on line 
above.
+            unsafe {
+                values.set_len(capacity);
             }
         }
-        (offsets, values)
-    } else {
-        let nulls = nulls.as_ref().unwrap();
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if nulls.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+        // Nullable path: only process valid (non-null) output positions.
+        Some(output_nulls) => {
+            let mut ranges = Vec::with_capacity(indices.len() - 
output_nulls.null_count());
+            let mut last_filled = 0;
+
+            // Pre-fill offsets; we overwrite valid positions below.
+            offsets.resize(indices.len() + 1, T::Offset::default());
+
+            // Pass 1: find all valid ranges that need to be copied.
+            for i in output_nulls.valid_indices() {
+                let current_offset = T::Offset::from_usize(capacity)
+                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+                // Fill offsets for skipped null slots so they get zero-length 
ranges.
+                if last_filled < i {
+                    offsets[last_filled + 1..=i].fill(current_offset);
+                }
+
+                // SAFETY: `i` comes from a validity bitmap over `indices`, so 
it is in-bounds.
+                let index = unsafe { indices.value_unchecked(i) }.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets[i + 1] = T::Offset::from_usize(capacity)
+                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+
+                debug_assert!(end >= start, "invalid range: start ({start}) > 
end ({end})");
+
+                ranges.push((start, end));
+                last_filled = i + 1;
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
+
+            // Fill trailing null offsets after the last valid position.
+            let final_offset = T::Offset::from_usize(capacity)
+                .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+            offsets[last_filled + 1..].fill(final_offset);
+            // Pass 2: copy byte data for all collected ranges.
+            values.reserve(capacity);
+            debug_assert_eq!(
+                ranges.iter().map(|(s, e)| e - s).sum::<usize>(),
+                capacity,
+                "capacity must equal total bytes across all ranges"
             );
-        }
-        let mut values = Vec::with_capacity(capacity);
-
-        for (i, index) in indices.values().iter().enumerate() {
-            // check index is valid before using index. The value in
-            // NULL index slots may not be within bounds of array
-            let index = index.as_usize();
-            if nulls.is_valid(i) {
-                values.extend_from_slice(array.value(index).as_ref());
+
+            let src = array.value_data();
+            let src = src.as_ptr();
+            let dst = values.spare_capacity_mut();

Review Comment:
   TIL: spare_capacity_mut 👍 



##########
arrow-select/src/take.rs:
##########
@@ -495,99 +495,126 @@ fn take_bytes<T: ByteArrayType, IndexType: 
ArrowPrimitiveType>(
     array: &GenericByteArray<T>,
     indices: &PrimitiveArray<IndexType>,
 ) -> Result<GenericByteArray<T>, ArrowError> {
+    let mut values: Vec<u8> = Vec::new();
     let mut offsets = Vec::with_capacity(indices.len() + 1);
     offsets.push(T::Offset::default());
 
     let input_offsets = array.value_offsets();
     let mut capacity = 0;
     let nulls = take_nulls(array.nulls(), indices);
 
-    let (offsets, values) = if array.null_count() == 0 && indices.null_count() 
== 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
-
-        for index in indices.values() {
-            values.extend_from_slice(array.value(index.as_usize()).as_ref());
-        }
-        (offsets, values)
-    } else if indices.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+    // Branch on output nulls — `None` means every output slot is valid.
+    match nulls.as_ref().filter(|n| n.null_count() > 0) {
+        // Fast path: no nulls in output, every index is valid.
+        None => {
+            for index in indices.values() {
+                let index = index.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets.push(
+                    T::Offset::from_usize(capacity)
+                        .ok_or_else(|| 
ArrowError::OffsetOverflowError(capacity))?,
+                );
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                values.extend_from_slice(array.value(index).as_ref());
-            }
-        }
-        (offsets, values)
-    } else if array.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if indices.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+            values.reserve(capacity);
+
+            let dst = values.spare_capacity_mut();
+            debug_assert!(dst.len() >= capacity);
+            let mut offset = 0;
+
+            for index in indices.values() {
+                // SAFETY: in-bounds proven by the first loop's bounds-checked 
offset access.
+                // dst asserted above to include the required capacity.
+                unsafe {
+                    let data: &[u8] = 
array.value_unchecked(index.as_usize()).as_ref();
+                    std::ptr::copy_nonoverlapping(
+                        data.as_ptr(),
+                        dst[offset..].as_mut_ptr().cast::<u8>(),
+                        data.len(),
+                    );
+                    offset += data.len();
+                }
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for (i, index) in indices.values().iter().enumerate() {
-            if indices.is_valid(i) {
-                
values.extend_from_slice(array.value(index.as_usize()).as_ref());
+            // SAFETY: wrote exactly `capacity` bytes above; reserved on line 
above.
+            unsafe {
+                values.set_len(capacity);
             }
         }
-        (offsets, values)
-    } else {
-        let nulls = nulls.as_ref().unwrap();
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if nulls.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+        // Nullable path: only process valid (non-null) output positions.
+        Some(output_nulls) => {
+            let mut ranges = Vec::with_capacity(indices.len() - 
output_nulls.null_count());
+            let mut last_filled = 0;
+
+            // Pre-fill offsets; we overwrite valid positions below.
+            offsets.resize(indices.len() + 1, T::Offset::default());
+
+            // Pass 1: find all valid ranges that need to be copied.
+            for i in output_nulls.valid_indices() {
+                let current_offset = T::Offset::from_usize(capacity)
+                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+                // Fill offsets for skipped null slots so they get zero-length 
ranges.
+                if last_filled < i {
+                    offsets[last_filled + 1..=i].fill(current_offset);
+                }
+
+                // SAFETY: `i` comes from a validity bitmap over `indices`, so 
it is in-bounds.
+                let index = unsafe { indices.value_unchecked(i) }.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets[i + 1] = T::Offset::from_usize(capacity)
+                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+
+                debug_assert!(end >= start, "invalid range: start ({start}) > 
end ({end})");

Review Comment:
   Technically, I think this `debug_assert!` can never fire: in a debug build 
the `usize` calculation `end - start` would already underflow and panic in the 
statement above:
   
   ```rust
                   capacity += end - start;
   ```



##########
arrow-select/src/take.rs:
##########
@@ -495,99 +495,126 @@ fn take_bytes<T: ByteArrayType, IndexType: 
ArrowPrimitiveType>(
     array: &GenericByteArray<T>,
     indices: &PrimitiveArray<IndexType>,
 ) -> Result<GenericByteArray<T>, ArrowError> {
+    let mut values: Vec<u8> = Vec::new();
     let mut offsets = Vec::with_capacity(indices.len() + 1);
     offsets.push(T::Offset::default());
 
     let input_offsets = array.value_offsets();
     let mut capacity = 0;
     let nulls = take_nulls(array.nulls(), indices);
 
-    let (offsets, values) = if array.null_count() == 0 && indices.null_count() 
== 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
-
-        for index in indices.values() {
-            values.extend_from_slice(array.value(index.as_usize()).as_ref());
-        }
-        (offsets, values)
-    } else if indices.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+    // Branch on output nulls — `None` means every output slot is valid.
+    match nulls.as_ref().filter(|n| n.null_count() > 0) {
+        // Fast path: no nulls in output, every index is valid.
+        None => {
+            for index in indices.values() {
+                let index = index.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets.push(
+                    T::Offset::from_usize(capacity)
+                        .ok_or_else(|| 
ArrowError::OffsetOverflowError(capacity))?,
+                );
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                values.extend_from_slice(array.value(index).as_ref());
-            }
-        }
-        (offsets, values)
-    } else if array.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if indices.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+            values.reserve(capacity);
+
+            let dst = values.spare_capacity_mut();
+            debug_assert!(dst.len() >= capacity);
+            let mut offset = 0;
+
+            for index in indices.values() {
+                // SAFETY: in-bounds proven by the first loop's bounds-checked 
offset access.
+                // dst asserted above to include the required capacity.
+                unsafe {
+                    let data: &[u8] = 
array.value_unchecked(index.as_usize()).as_ref();
+                    std::ptr::copy_nonoverlapping(
+                        data.as_ptr(),
+                        dst[offset..].as_mut_ptr().cast::<u8>(),
+                        data.len(),
+                    );
+                    offset += data.len();
+                }
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for (i, index) in indices.values().iter().enumerate() {
-            if indices.is_valid(i) {
-                
values.extend_from_slice(array.value(index.as_usize()).as_ref());
+            // SAFETY: wrote exactly `capacity` bytes above; reserved on line 
above.
+            unsafe {
+                values.set_len(capacity);
             }
         }
-        (offsets, values)
-    } else {
-        let nulls = nulls.as_ref().unwrap();
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if nulls.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+        // Nullable path: only process valid (non-null) output positions.
+        Some(output_nulls) => {
+            let mut ranges = Vec::with_capacity(indices.len() - 
output_nulls.null_count());
+            let mut last_filled = 0;
+
+            // Pre-fill offsets; we overwrite valid positions below.
+            offsets.resize(indices.len() + 1, T::Offset::default());
+
+            // Pass 1: find all valid ranges that need to be copied.
+            for i in output_nulls.valid_indices() {
+                let current_offset = T::Offset::from_usize(capacity)
+                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+                // Fill offsets for skipped null slots so they get zero-length 
ranges.
+                if last_filled < i {
+                    offsets[last_filled + 1..=i].fill(current_offset);
+                }
+
+                // SAFETY: `i` comes from a validity bitmap over `indices`, so 
it is in-bounds.
+                let index = unsafe { indices.value_unchecked(i) }.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets[i + 1] = T::Offset::from_usize(capacity)
+                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+
+                debug_assert!(end >= start, "invalid range: start ({start}) > 
end ({end})");
+
+                ranges.push((start, end));
+                last_filled = i + 1;
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
+
+            // Fill trailing null offsets after the last valid position.
+            let final_offset = T::Offset::from_usize(capacity)
+                .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
+            offsets[last_filled + 1..].fill(final_offset);
+            // Pass 2: copy byte data for all collected ranges.
+            values.reserve(capacity);
+            debug_assert_eq!(
+                ranges.iter().map(|(s, e)| e - s).sum::<usize>(),
+                capacity,
+                "capacity must equal total bytes across all ranges"
             );
-        }
-        let mut values = Vec::with_capacity(capacity);
-
-        for (i, index) in indices.values().iter().enumerate() {
-            // check index is valid before using index. The value in
-            // NULL index slots may not be within bounds of array
-            let index = index.as_usize();
-            if nulls.is_valid(i) {
-                values.extend_from_slice(array.value(index).as_ref());
+
+            let src = array.value_data();
+            let src = src.as_ptr();
+            let dst = values.spare_capacity_mut();
+            debug_assert!(dst.len() >= capacity);
+
+            let mut offset = 0;
+
+            for (start, end) in ranges.into_iter() {
+                let value_len = end - start;
+                // SAFETY: caller guarantees each (start, end) is in-bounds of 
`src`.
+                // `dst` asserted above to include the required capacity.
+                // The regions don't overlap (src is input, dst is a fresh 
allocation).
+                unsafe {
+                    std::ptr::copy_nonoverlapping(
+                        src.add(start),
+                        dst[offset..].as_mut_ptr().cast::<u8>(),

Review Comment:
   Same as above -- you might be able to avoid another bounds check here by 
using more `unsafe` (e.g. `get_unchecked_mut` instead of indexing `dst`).



##########
arrow-select/src/take.rs:
##########
@@ -495,99 +495,126 @@ fn take_bytes<T: ByteArrayType, IndexType: 
ArrowPrimitiveType>(
     array: &GenericByteArray<T>,
     indices: &PrimitiveArray<IndexType>,
 ) -> Result<GenericByteArray<T>, ArrowError> {
+    let mut values: Vec<u8> = Vec::new();
     let mut offsets = Vec::with_capacity(indices.len() + 1);
     offsets.push(T::Offset::default());
 
     let input_offsets = array.value_offsets();
     let mut capacity = 0;
     let nulls = take_nulls(array.nulls(), indices);
 
-    let (offsets, values) = if array.null_count() == 0 && indices.null_count() 
== 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
-
-        for index in indices.values() {
-            values.extend_from_slice(array.value(index.as_usize()).as_ref());
-        }
-        (offsets, values)
-    } else if indices.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+    // Branch on output nulls — `None` means every output slot is valid.
+    match nulls.as_ref().filter(|n| n.null_count() > 0) {
+        // Fast path: no nulls in output, every index is valid.
+        None => {
+            for index in indices.values() {
+                let index = index.as_usize();
+                let start = input_offsets[index].as_usize();
+                let end = input_offsets[index + 1].as_usize();
+                capacity += end - start;
+                offsets.push(
+                    T::Offset::from_usize(capacity)
+                        .ok_or_else(|| 
ArrowError::OffsetOverflowError(capacity))?,
+                );
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for index in indices.values() {
-            let index = index.as_usize();
-            if array.is_valid(index) {
-                values.extend_from_slice(array.value(index).as_ref());
-            }
-        }
-        (offsets, values)
-    } else if array.null_count() == 0 {
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if indices.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+            values.reserve(capacity);
+
+            let dst = values.spare_capacity_mut();
+            debug_assert!(dst.len() >= capacity);
+            let mut offset = 0;
+
+            for index in indices.values() {
+                // SAFETY: in-bounds proven by the first loop's bounds-checked 
offset access.
+                // dst asserted above to include the required capacity.
+                unsafe {
+                    let data: &[u8] = 
array.value_unchecked(index.as_usize()).as_ref();
+                    std::ptr::copy_nonoverlapping(
+                        data.as_ptr(),
+                        dst[offset..].as_mut_ptr().cast::<u8>(),
+                        data.len(),
+                    );
+                    offset += data.len();
+                }
             }
-            offsets.push(
-                T::Offset::from_usize(capacity)
-                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
-            );
-        }
-        let mut values = Vec::with_capacity(capacity);
 
-        for (i, index) in indices.values().iter().enumerate() {
-            if indices.is_valid(i) {
-                
values.extend_from_slice(array.value(index.as_usize()).as_ref());
+            // SAFETY: wrote exactly `capacity` bytes above; reserved on line 
above.
+            unsafe {
+                values.set_len(capacity);
             }
         }
-        (offsets, values)
-    } else {
-        let nulls = nulls.as_ref().unwrap();
-        offsets.reserve(indices.len());
-        for (i, index) in indices.values().iter().enumerate() {
-            let index = index.as_usize();
-            if nulls.is_valid(i) {
-                capacity += input_offsets[index + 1].as_usize() - 
input_offsets[index].as_usize();
+        // Nullable path: only process valid (non-null) output positions.
+        Some(output_nulls) => {
+            let mut ranges = Vec::with_capacity(indices.len() - 
output_nulls.null_count());

Review Comment:
   nit: I found the name `ranges` a bit generic and had to look carefully at how
   it was used. A name like `source_ranges` might be easier to understand.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to