Dandandan commented on issue #7149:
URL: https://github.com/apache/arrow-datafusion/issues/7149#issuecomment-1660756166
> Oh, I think I see the problem: the fetch count is never utilized while the sort is accumulating batches: https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/src/physical_plan/sorts/sort.rs#L628-L632
>
> Consequently, all the batches get loaded into memory inside of the `ExternalSorter`:
>
> ```diff
> diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
> index f660f0acf..6eb63e39a 100644
> --- a/datafusion/core/src/physical_plan/sorts/sort.rs
> +++ b/datafusion/core/src/physical_plan/sorts/sort.rs
> @@ -159,6 +159,7 @@ impl ExternalSorter {
>              }
>          }
>
> +        debug!("Inserted batch of size {} for a total of {} in-memory batches", input.num_rows(), self.in_mem_batches.len() + 1);
>          self.in_mem_batches.push(input);
>          self.in_mem_batches_sorted = false;
>          Ok(())
> ```
>
> ```shell
> [2023-08-01T08:13:27Z TRACE datafusion::physical_plan::limit] Start GlobalLimitExec::execute for partition: 0
> [2023-08-01T08:13:27Z TRACE datafusion::physical_plan::sorts::sort] Start SortExec::execute for partition 0 of context session_id 8855be36-07c1-495b-b6fd-9cb9501cb46d and task_id None
> [2023-08-01T08:13:27Z TRACE datafusion::physical_plan::sorts::sort] End SortExec's input.execute for partition: 0
> [2023-08-01T08:13:28Z DEBUG datafusion::physical_plan::sorts::sort] Inserted batch of size 8192 for a total of 1 in-memory batches
> [2023-08-01T08:13:28Z DEBUG datafusion::physical_plan::sorts::sort] Inserted batch of size 209 for a total of 2 in-memory batches
> [2023-08-01T08:13:28Z DEBUG datafusion::physical_plan::sorts::sort] Inserted batch of size 8192 for a total of 3 in-memory batches
> [2023-08-01T08:13:28Z DEBUG datafusion::physical_plan::sorts::sort] Inserted batch of size 209 for a total of 4 in-memory batches
> [2023-08-01T08:13:28Z DEBUG datafusion::physical_plan::sorts::sort] Inserted batch of size 8192 for a total of 5 in-memory batches
> [2023-08-01T08:13:28Z DEBUG datafusion::physical_plan::sorts::sort] Inserted batch of size 209 for a total of 6 in-memory batches
> ...
> [2023-08-01T08:14:12Z DEBUG datafusion::physical_plan::sorts::sort] Inserted batch of size 209 for a total of 710 in-memory batches
> [2023-08-01T08:14:12Z DEBUG datafusion::physical_plan::sorts::sort] Inserted batch of size 6836 for a total of 711 in-memory batches
> ```
>
> This raises a (maybe naive) related question: why bother with materializing the record batches from the [input stream](https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/src/physical_plan/sorts/sort.rs#L611) and potentially spilling them to disk at this point, only for them to be [streamed back](https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/src/physical_plan/sorts/sort.rs#L174-L193) into the `streaming_merge` function again?
> EDIT: Alternatively, the following diff fixes the issue:
>
> ```diff
> diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
> index f660f0acf..49e44a834 100644
> --- a/datafusion/core/src/physical_plan/sorts/sort.rs
> +++ b/datafusion/core/src/physical_plan/sorts/sort.rs
> @@ -133,11 +133,17 @@ impl ExternalSorter {
>      /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
>      ///
>      /// Updates memory usage metrics, and possibly triggers spilling to disk
> -    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
> +    async fn insert_batch(&mut self, mut input: RecordBatch) -> Result<()> {
>          if input.num_rows() == 0 {
>              return Ok(());
>          }
>
> +        if self.fetch.map_or(false, |f| f < input.num_rows()) {
> +            // Sort the batch pre-emptively to reduce the number of rows
> +            // since the fetch count is smaller than the batch size
> +            input = sort_batch(&input, &self.expr, self.fetch)?;
> +        }
> +
>          let size = batch_byte_size(&input);
>          if self.reservation.try_grow(size).is_err() {
>              let before = self.reservation.size();
> ```
>
> <img alt="image" width="1379" src="https://user-images.githubusercontent.com/45558892/257501818-9712e501-f218-4f51-a7ed-7ad87eee829a.png">
>
> Not sure about the overall performance implications of this tweak, however (i.e., it may impact the runtime in some cases, I guess). Also, the value of this tweak is progressively reduced the larger the fetch size is, and once the fetch size reaches the batch size, we're back to the underlying problem.

Ah, this might be a good improvement. I thought it did it like this before, but this might have been changed more recently. I think doing this should be as fast (as long as we avoid performing the sort twice).
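For reference, here is a minimal, self-contained sketch of the per-batch sort-and-truncate idea using the arrow-rs `lexsort_to_indices` and `take` kernels. This is not DataFusion's actual `sort_batch`; the `sort_and_truncate` helper, the single-column schema, and the sort-by-first-column assumption are made up for illustration:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::{lexsort_to_indices, take, SortColumn, SortOptions};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Sort `batch` by its first column and keep at most `fetch` rows.
fn sort_and_truncate(batch: &RecordBatch, fetch: Option<usize>) -> Result<RecordBatch> {
    let sort_column = SortColumn {
        values: batch.column(0).clone(),
        options: Some(SortOptions::default()),
    };
    // `lexsort_to_indices` accepts an optional limit, so only the first
    // `fetch` row indices are produced when a fetch count is present.
    let indices = lexsort_to_indices(&[sort_column], fetch)?;
    // Gather the surviving rows from every column of the batch.
    let columns = batch
        .columns()
        .iter()
        .map(|c| take(c.as_ref(), &indices, None))
        .collect::<Result<Vec<_>>>()?;
    RecordBatch::try_new(batch.schema(), columns)
}

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![5, 1, 4, 2, 3]));
    let batch = RecordBatch::try_new(schema, vec![column])?;
    // With fetch = 2, only the two smallest rows survive, so the sorter
    // would buffer 2 rows per input batch instead of the full batch.
    let truncated = sort_and_truncate(&batch, Some(2))?;
    assert_eq!(truncated.num_rows(), 2);
    Ok(())
}
```

If I read `sort.rs` correctly, this is roughly what `sort_batch(&input, &self.expr, self.fetch)` boils down to, which is also why the double-sort concern above matters: the truncated batches are already sorted, so the final merge should be able to rely on `streaming_merge` rather than sorting them again.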
