neilconway commented on code in PR #22390:
URL: https://github.com/apache/datafusion/pull/22390#discussion_r3282338914


##########
datafusion/functions-nested/src/remove.rs:
##########
@@ -468,6 +531,98 @@ fn general_remove<OffsetSize: OffsetSizeTrait>(
     )?))
 }
 
+/// For each element of `list_array[i]`, removed up to `arr_n[i]` occurrences
+/// of `needle[0]` (scalar element broadcasted).
+///
+/// This is a specialized version of `general_remove` for scalar elements that
+/// uses bulk comparison for better performance.
+fn general_remove_with_scalar<OffsetSize: OffsetSizeTrait>(
+    list_array: &GenericListArray<OffsetSize>,
+    needle: &ArrayRef,
+    arr_n: &[i64],
+) -> Result<ArrayRef> {
+    let list_field = match list_array.data_type() {
+        DataType::List(field) | DataType::LargeList(field) => field,
+        _ => {
+            return exec_err!(
+                "Expected List or LargeList data type, got {:?}",
+                list_array.data_type()
+            );
+        }
+    };
+    let original_data = list_array.values().to_data();
+    let mut offsets = Vec::<OffsetSize>::with_capacity(list_array.len() + 1);
+    offsets.push(OffsetSize::zero());
+
+    let mut mutable = MutableArrayData::with_capacities(
+        vec![&original_data],
+        false,
+        Capacities::Array(original_data.len()),
+    );
+    let nulls = list_array.nulls().cloned();
+    let keep_mask =
+        arrow_ord::cmp::distinct(list_array.values(), 
&Scalar::new(Arc::clone(needle)))?;
+
+    for (row_index, offset_window) in 
list_array.offsets().windows(2).enumerate() {
+        if nulls.as_ref().is_some_and(|nulls| nulls.is_null(row_index)) {
+            offsets.push(offsets[row_index]);
+            continue;
+        }
+
+        let start = offset_window[0].to_usize().unwrap();
+        let end = offset_window[1].to_usize().unwrap();
+
+        let n = arr_n[row_index];
+
+        if n <= 0 {
+            mutable.extend(0, start, end);
+            offsets.push(offsets[row_index] + OffsetSize::usize_as(end - 
start));
+            continue;
+        }
+
+        let eq_array = keep_mask.slice(start, end - start);
+        let num_to_remove = eq_array.false_count();
+
+        if num_to_remove == 0 {
+            mutable.extend(0, start, end);
+            offsets.push(offsets[row_index] + OffsetSize::usize_as(end - 
start));
+            continue;
+        }
+
+        let max_removals = n.min(num_to_remove as i64);
+        let mut removed = 0i64;
+        let mut copied = 0usize;
+        let mut pending_batch_to_retain: Option<usize> = None;
+        for (i, keep) in eq_array.iter().enumerate() {
+            if keep == Some(false) && removed < max_removals {
+                if let Some(bs) = pending_batch_to_retain {
+                    mutable.extend(0, start + bs, start + i);
+                    copied += i - bs;
+                    pending_batch_to_retain = None;
+                }
+                removed += 1;
+            } else if pending_batch_to_retain.is_none() {
+                pending_batch_to_retain = Some(i);
+            }
+        }

Review Comment:
   Amazing!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to