kosiew opened a new pull request, #20866: URL: https://github.com/apache/datafusion/pull/20866
## Which issue does this PR close?

* Part of #20788

## Rationale for this change

`UnnestExec` currently expands each input `RecordBatch` into a fully unnested output batch before emitting any rows downstream. For high-fanout inputs, this can create very large intermediate batches and significantly increase peak memory usage, especially in query shapes like `unnest + group by` where downstream operators may also need to buffer data.

Issue #20788 demonstrates this with large array columns, where unnesting multiplies row counts dramatically and causes memory usage to grow far beyond the size of the original input.

This change improves the execution strategy by emitting unnested results in smaller, size-bounded chunks, which helps keep memory usage under control while preserving existing semantics.

## What changes are included in this PR?

This PR updates `UnnestExec` to process input batches incrementally instead of materializing the full unnested output for the entire input batch at once.

The main changes are:

* Add chunked draining for input batches in `UnnestStream` via a `PendingBatch` state object.
* Respect the session `batch_size` when deciding how many input rows to slice and unnest into the next output batch.
* Add estimation logic (`next_input_slice_row_count`, `estimate_row_output_rows`, and `list_output_length`) to bound the number of output rows produced per emitted batch.
* Use a conservative single-row fallback for recursive unnest (`depth > 1`) to avoid underestimating expansion.
* Centralize list array dispatch in a new helper (`as_list_array_type`) and reuse it in both row-length estimation and existing list unnest logic.
* Ensure empty output batches are still skipped.
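
The slicing idea in the list above can be illustrated with a minimal, standalone sketch. It is not the code from this PR: `estimated_fanout` and `next_slice_len` are hypothetical stand-ins for the helpers named above, list columns are modeled as plain `Option<usize>` lengths rather than Arrow arrays, and the sketch assumes that a row's output length is the longest list across the unnested columns, with a null list counting as one row when `preserve_nulls` is true and zero rows otherwise. It also assumes that a single row whose fanout exceeds `batch_size` is taken alone so the stream always makes progress; whether the PR splits such a row further is not stated here.

```rust
/// Estimated output rows for one input row (hypothetical helper).
/// Each entry is the list length of one unnested column; `None` is a null list.
/// Assumption: the row expands to the longest list across columns.
fn estimated_fanout(list_lens: &[Option<usize>], preserve_nulls: bool) -> usize {
    // Assumption: a null list yields one (null) row only if nulls are preserved.
    let null_len = if preserve_nulls { 1 } else { 0 };
    list_lens
        .iter()
        .copied()
        .map(|len| len.unwrap_or(null_len))
        .max()
        .unwrap_or(0)
}

/// Number of input rows, starting at `offset`, to unnest into the next
/// output batch so that the estimated output stays within `batch_size`.
/// Returns at least 1 while input remains, even for an oversized row.
fn next_slice_len(
    rows: &[Vec<Option<usize>>],
    offset: usize,
    batch_size: usize,
    preserve_nulls: bool,
) -> usize {
    let mut taken = 0;
    let mut out_rows = 0usize;
    for row in &rows[offset..] {
        let est = estimated_fanout(row, preserve_nulls);
        // Stop before exceeding the budget, but never return an empty slice.
        if taken > 0 && out_rows + est > batch_size {
            break;
        }
        out_rows += est;
        taken += 1;
    }
    taken
}

fn main() {
    // One unnested column; per-row list lengths (None = null list).
    let rows = vec![
        vec![Some(3)],
        vec![Some(4)],
        vec![None],
        vec![Some(10)],
        vec![Some(0)],
        vec![Some(2)],
    ];
    let batch_size = 8;

    // Drain the pending input in size-bounded slices.
    let mut offset = 0;
    let mut slices = Vec::new();
    while offset < rows.len() {
        let n = next_slice_len(&rows, offset, batch_size, true);
        slices.push(n);
        offset += n;
    }
    println!("{:?}", slices); // prints [3, 1, 2]
}
```

In this sketch the first three rows fit exactly into the budget of 8 output rows, the 10-element row is emitted on its own, and the remaining two rows form the last slice. A conservative variant for recursive unnest would simply return 1 from `next_slice_len`, mirroring the single-row fallback described above.
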

In addition, this PR adds focused tests covering:

* high-fanout single-column unnest with output split across multiple batches,
* multi-column unnest where per-row list lengths differ,
* `preserve_nulls = false` behavior under batch slicing,
* recursive unnest fallback behavior under a small batch size.

## Are these changes tested?

Yes. This PR adds new tests in `datafusion/core/tests/dataframe/mod.rs` to verify both correctness and chunking behavior.

The tests validate:

* total output row counts remain correct,
* emitted batch sizes do not exceed the configured `batch_size`,
* output values match expected results for single-column, multi-column, null-dropping, and recursive unnest cases.

These tests specifically exercise the new batch-slicing path introduced in `UnnestExec`.

## Are there any user-facing changes?

There are no intended user-facing API changes.

This change improves execution behavior and memory characteristics for queries using `unnest`, especially on high-fanout list data. Query results remain the same, but output may now be emitted in smaller record batches.

## LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.
