Weijun-H commented on code in PR #19494:
URL: https://github.com/apache/datafusion/pull/19494#discussion_r2650527109
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -813,22 +829,14 @@ impl ExternalSorter {
}
/// Estimate how much memory is needed to sort a `RecordBatch`.
-///
-/// This is used to pre-reserve memory for the sort/merge. The sort/merge process involves
-/// creating sorted copies of sorted columns in record batches for speeding up comparison
-/// in sorting and merging. The sorted copies are in either row format or array format.
-/// Please refer to cursor.rs and stream.rs for more details. No matter what format the
-/// sorted copies are, they will use more memory than the original record batch.
-pub(crate) fn get_reserved_byte_for_record_batch_size(record_batch_size: usize) -> usize {
-    // 2x may not be enough for some cases, but it's a good start.
-    // If 2x is not enough, user can set a larger value for `sort_spill_reservation_bytes`
-    // to compensate for the extra memory needed.
-    record_batch_size * 2
-}
-
-/// Estimate how much memory is needed to sort a `RecordBatch`.
-fn get_reserved_byte_for_record_batch(batch: &RecordBatch) -> usize {
-    get_reserved_byte_for_record_batch_size(get_record_batch_memory_size(batch))
+/// This is calculated by adding the record batch's memory size
+/// (which can be much larger than expected for sliced record batches)
+/// with the sliced buffer sizes, as that is the amount that will be needed to create the new buffer.
+/// The latter is rounded up to the nearest multiple of 64 based on the architecture,
+/// as this is how arrow creates buffers.
Review Comment:
```
/// Estimate how much memory is needed to sort a `RecordBatch`.
///
/// For sliced batches, `get_record_batch_memory_size` returns the size of the
/// underlying shared buffers (which may be larger than the logical data).
/// We add `get_sliced_size()` (the actual logical data size, rounded to 64 bytes)
/// because sorting will create new buffers containing only the referenced data.
///
/// Total = existing buffer size + new sorted buffer size
```
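
For illustration, here is a minimal runnable sketch of the arithmetic described in the suggested comment. The function name and parameters are hypothetical stand-ins for the PR's actual helpers, not DataFusion's API:

```
/// Hypothetical sketch: total = existing shared-buffer size + sliced data
/// size rounded up to the next 64-byte multiple (Arrow allocates buffers in
/// 64-byte-aligned blocks). `buffer_size` and `sliced_len` are stand-ins for
/// the PR's actual helpers.
fn reserved_bytes(buffer_size: usize, sliced_len: usize) -> usize {
    let new_sorted = (sliced_len + 63) & !63; // round up to a multiple of 64
    buffer_size + new_sorted
}

fn main() {
    // A slice referencing 100 bytes of a 4096-byte shared buffer:
    // reserve 4096 + 128 bytes (100 rounded up to 64-byte alignment).
    assert_eq!(reserved_bytes(4096, 100), 4096 + 128);
}
```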
##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -1198,7 +1198,7 @@ impl GroupedHashAggregateStream {
             // instead.
             // Spilling to disk and reading back also ensures batch size is consistent
             // rather than potentially having one significantly larger last batch.
-            self.spill()?;
+            self.spill()?; // TODO: use sort_batch_chunked instead
Review Comment:
Will you address this in this PR?
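
For context, a minimal sketch of what a `sort_batch_chunked`-style helper might do, assuming the TODO means splitting an already-sorted batch into uniform zero-copy slices so no single emitted batch is much larger than the rest (this is an assumption based on the TODO, not an existing DataFusion API):

```
use arrow::record_batch::RecordBatch;

// Hypothetical sketch: split a sorted batch into chunks of at most
// `chunk_size` rows using Arrow's zero-copy `RecordBatch::slice`.
fn chunk_sorted_batch(batch: &RecordBatch, chunk_size: usize) -> Vec<RecordBatch> {
    assert!(chunk_size > 0);
    (0..batch.num_rows())
        .step_by(chunk_size)
        .map(|offset| batch.slice(offset, chunk_size.min(batch.num_rows() - offset)))
        .collect()
}
```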