alamb opened a new issue, #6692: URL: https://github.com/apache/arrow-rs/issues/6692
**Is your feature request related to a problem or challenge? Please describe what you are trying to do.**

Upstream in DataFusion, we have a common pattern where we have multiple input `RecordBatch`es and want to produce an output `RecordBatch` with some subset of the rows from the input batches. At the moment, we typically either:

1. Apply a filter immediately to the batches (thus copying the data), but this often results in batches that are too small (see https://github.com/apache/datafusion/issues/7957), or
2. Buffer up several `RecordBatch`es (what `CoalesceBatches` does) and call `concat`.

Applying `filter` immediately is not ideal, as it typically requires copying the data *again* (in `CoalesceBatches`).

Buffering up several `RecordBatch`es is also not ideal, especially for StringView arrays, which may consume significant amounts of memory for mostly filtered-out rows and therefore require running gc repeatedly:

- https://github.com/apache/datafusion/issues/11628

Here is an ASCII art picture (from https://github.com/apache/datafusion/issues/7957) that shows the extra copy in action:

```
          Filter                                            Coalesce
                                                            Batches
┌────────────────────┐      ┌────────────────────┐
│    RecordBatch     │      │    RecordBatch     │
│  num_rows = 8000   │─ ─ ─▶│   num_rows = 234   │─ ─ ─ ┐
└────────────────────┘      └────────────────────┘      │
                                                        │
┌────────────────────┐      ┌────────────────────┐      │      ┌────────────────────┐
│    RecordBatch     │      │    RecordBatch     │      │      │    RecordBatch     │
│  num_rows = 8000   │─ ─ ─▶│   num_rows = 500   │─ ─ ─ ┼ ─ ─▶ │  num_rows = 8000   │
└────────────────────┘      └────────────────────┘      │      └────────────────────┘
         ...                         ...                │
┌────────────────────┐      ┌────────────────────┐      │
│    RecordBatch     │      │    RecordBatch     │      │
│  num_rows = 8000   │─ ─ ─▶│   num_rows = 333   │─ ─ ─ ┘
└────────────────────┘      └────────────────────┘

 FilterExec creates output batches with copies of the
 matching rows (calls take() to make a copy);
 RepartitionExec copies the data *again* to form the
 final large RecordBatches
```

**Describe the solution you'd like**

I would like some way to apply `filter`/`take` to each incoming `RecordBatch` as it arrives, copying the data into the in-progress output, in a way that is as fast as the `filter` and `take` operations.

Note: this is somewhat like the [`interleave` kernel](https://docs.rs/arrow/latest/arrow/compute/kernels/interleave/fn.interleave.html), except that:

1. We always want the output rows to be in the same order as the input batches (so the second `usize` batch index is not needed).
2. We don't want to have to buffer all the input.

**Describe alternatives you've considered**

**Additional context**

- https://github.com/apache/datafusion/issues/7957
- https://github.com/apache/datafusion/pull/12996/files#r1819245889

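To illustrate the desired single-copy behavior, here is a hypothetical sketch of a push-based coalescer, modeled on plain `Vec<i64>` "arrays" rather than real Arrow arrays. The `Coalescer` type and `push_filtered` method are invented names for this sketch, not an existing arrow-rs API; a real implementation would append selected rows directly into in-progress Arrow array builders:

```rust
/// Hypothetical sketch: incrementally copies the selected rows of each
/// incoming batch directly into an in-progress output, emitting a
/// completed output whenever it reaches the target size.
struct Coalescer {
    target_rows: usize,
    in_progress: Vec<i64>,
    completed: Vec<Vec<i64>>,
}

impl Coalescer {
    fn new(target_rows: usize) -> Self {
        Self {
            target_rows,
            in_progress: Vec::new(),
            completed: Vec::new(),
        }
    }

    /// Copy the rows of `batch` selected by `filter` into the
    /// in-progress output (the single copy). Output rows stay in
    /// input order, and only the in-progress output is buffered.
    fn push_filtered(&mut self, batch: &[i64], filter: &[bool]) {
        for (value, keep) in batch.iter().zip(filter) {
            if *keep {
                self.in_progress.push(*value);
                if self.in_progress.len() == self.target_rows {
                    self.completed.push(std::mem::take(&mut self.in_progress));
                }
            }
        }
    }

    /// Emit any remaining buffered rows as a final (smaller) output.
    fn finish(&mut self) {
        if !self.in_progress.is_empty() {
            self.completed.push(std::mem::take(&mut self.in_progress));
        }
    }
}

fn main() {
    let mut coalescer = Coalescer::new(4);
    coalescer.push_filtered(&[1, 2, 3, 4], &[true, false, true, false]);
    coalescer.push_filtered(&[5, 6, 7, 8], &[true, true, false, true]);
    coalescer.finish();
    assert_eq!(coalescer.completed, vec![vec![1, 3, 5, 6], vec![8]]);
}
```

Unlike the filter-then-concat pattern, each selected row is copied exactly once, and memory usage is bounded by the target output size rather than by the number of buffered input batches.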