kosiew commented on code in PR #20381:
URL: https://github.com/apache/datafusion/pull/20381#discussion_r2888883831
##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -790,16 +787,20 @@ impl TopKHeap {
/// Compact this heap, rewriting all stored batches into a single
/// input batch
pub fn maybe_compact(&mut self) -> Result<()> {
- // we compact if the number of "unused" rows in the store is
- // past some pre-defined threshold. Target holding up to
- // around 20 batches, but handle cases of large k where some
- // batches might be partially full
- let max_unused_rows = (20 * self.batch_size) + self.k;
- let unused_rows = self.store.unused_rows();
-
- // don't compact if the store has one extra batch or
- // unused rows is under the threshold
- if self.store.len() <= 2 || unused_rows < max_unused_rows {
+ // Don't compact if there's only one batch (compacting into itself is pointless)
+ if self.store.len() <= 1 {
+ return Ok(());
+ }
+
+ let total_rows = self.store.total_rows;
+
+ // .max(1) prevents div by zero
+ let avg_bytes_per_row = self.store.batches_size / total_rows.max(1);
+ let compacted_estimate = avg_bytes_per_row * self.inner.len();
+
+ // Compact when current store memory exceeds 2x what the compacted
+ // result would need. The multiplier avoids compacting when the
+ // savings would be marginal.
+ if self.store.batches_size <= compacted_estimate * 2 {
return Ok(());
Review Comment:
Could we simplify this and avoid the truncation that comes from integer division?
Could we also add a boolean-column regression test verifying that we *don't*
compact on every call when the savings are marginal?
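One division-free reformulation, as a sketch: if we assume roughly uniform bytes per row (which the `avg_bytes_per_row` estimate already does), the memory terms cancel out of the comparison, leaving a pure row-count check. `should_compact` below is a hypothetical helper, not the PR's API:

```rust
// With uniform row size, the PR's condition
//   batches_size > 2 * (batches_size / total_rows) * heap_len
// is algebraically equivalent (ignoring truncation) to
//   total_rows > 2 * heap_len
// i.e. compact only when more than half the stored rows are unused.
// No division, so no truncation to worry about.
fn should_compact(total_rows: usize, heap_len: usize) -> bool {
    total_rows > 2 * heap_len
}

fn main() {
    // 1000 rows stored, heap references only 100 of them: compact.
    assert!(should_compact(1000, 100));
    // 1000 rows stored, heap references 600: savings marginal, skip.
    assert!(!should_compact(1000, 600));
    println!("ok");
}
```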
##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -1254,4 +1256,85 @@ mod tests {
Ok(())
}
+
+ /// Tests that memory-based compaction triggers when a large batch
+ /// has very few rows referenced by the top-k heap.
+ #[tokio::test]
+ async fn test_topk_memory_compaction() -> Result<()> {
Review Comment:
Could we also add a negative-path test (multiple batches, marginal expected
savings) to pin down how aggressive the compaction heuristic is?
##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -1056,6 +1053,11 @@ impl RecordBatchStore {
.batches_size
.checked_sub(get_record_batch_memory_size(&old_entry.batch))
.unwrap();
+
+ self.total_rows = self
+ .total_rows
+ .checked_sub(old_entry.batch.num_rows())
+ .unwrap_or_default();
Review Comment:
This belongs with the existing strict invariant checks
(`checked_sub(...).unwrap()` is already used for `batches_size`).
`unwrap_or_default()` silently saturates to zero on underflow, masking the bug;
prefer `expect("total_rows underflow")` (or `unwrap`) to fail fast.
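A minimal sketch of the suggested fail-fast style, in plain Rust outside DataFusion (`sub_rows` and the values are made up for illustration):

```rust
// Fail fast on a broken invariant: checked_sub returns None on
// underflow, and expect() panics with a clear message instead of
// silently producing 0 the way unwrap_or_default() would.
fn sub_rows(total_rows: usize, removed: usize) -> usize {
    total_rows
        .checked_sub(removed)
        .expect("total_rows underflow")
}

fn main() {
    assert_eq!(sub_rows(10, 4), 6);
    println!("ok");
}
```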
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]