bharath-techie commented on code in PR #20381:
URL: https://github.com/apache/datafusion/pull/20381#discussion_r3117617504
##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -790,16 +787,20 @@ impl TopKHeap {
/// Compact this heap, rewriting all stored batches into a single
/// input batch
pub fn maybe_compact(&mut self) -> Result<()> {
- // we compact if the number of "unused" rows in the store is
- // past some pre-defined threshold. Target holding up to
- // around 20 batches, but handle cases of large k where some
- // batches might be partially full
- let max_unused_rows = (20 * self.batch_size) + self.k;
- let unused_rows = self.store.unused_rows();
-
- // don't compact if the store has one extra batch or
- // unused rows is under the threshold
- if self.store.len() <= 2 || unused_rows < max_unused_rows {
+ // Don't compact if there's only one batch (compacting into itself is pointless)
+ if self.store.len() <= 1 {
+ return Ok(());
+ }
+
+ let total_rows = self.store.total_rows;
+
+ let avg_bytes_per_row = self.store.batches_size / total_rows.max(1); // .max(1) prevents div by zero
+ let compacted_estimate = avg_bytes_per_row * self.inner.len();
+
+ // Compact when current store memory exceeds 2x what the compacted
+ // result would need. The multiplier avoids compacting when the
+ // savings would be marginal.
+ if self.store.batches_size <= compacted_estimate * 2 {
return Ok(());
Review Comment:
This looks good to me, @Dandandan @kosiew — do you folks see any concerns?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]