kosiew commented on code in PR #20381:
URL: https://github.com/apache/datafusion/pull/20381#discussion_r3217605298
##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -1243,4 +1243,154 @@ mod tests {
Ok(())
}
+
+ /// Tests that memory-based compaction triggers when a large batch
+ /// has very few rows referenced by the top-k heap.
+ #[tokio::test]
+ async fn test_topk_memory_compaction() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+
+ let sort_expr = PhysicalSortExpr {
+ expr: col("a", schema.as_ref())?,
+ options: SortOptions::default(),
+ };
+
+ let full_expr = LexOrdering::from([sort_expr.clone()]);
+ let prefix = vec![sort_expr];
+
+ let runtime = Arc::new(RuntimeEnv::default());
+ let metrics = ExecutionPlanMetricsSet::new();
+
+ let k = 5;
+ let mut topk = TopK::try_new(
+ 0,
+ Arc::clone(&schema),
+ prefix,
+ full_expr,
+ k,
+ 8192,
+ runtime,
+ &metrics,
+ Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
+ DynamicFilterPhysicalExpr::new(vec![], lit(true)),
+ )))),
+ )?;
+
+ // Insert a large batch (100,000 rows) with values 1..=100_000.
+ // Only the smallest 5 values (1..=5) will end up in the heap.
+ let large_values: Vec<i32> = (1..=100_000).collect();
+ let array1: ArrayRef = Arc::new(Int32Array::from(large_values));
+ let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array1])?;
+ topk.insert_batch(batch1)?;
+
+ // After the first batch, store has 1 batch — compaction should
+ // not trigger (guard: store.len() <= 1).
+ assert_eq!(
+ topk.heap.store.len(),
+ 1,
+ "should have 1 batch before second insert"
+ );
+
+ // Insert a second batch with a couple of values that replace
+ // some heap entries (e.g. values 2 and 3 appear again, but won't
+ // displace 1-5; use values smaller than current min-of-max to
+ // force replacements). Actually, heap already has 1..=5. Inserting
+ // values like [1, 2] would just be duplicates that don't replace.
+ // Instead, insert values that do NOT replace, to keep the 5 rows
+ // pointing at the first batch. The point is that we now have 2
+ // batches and the first is huge, so compaction should fire.
+ let array2: ArrayRef = Arc::new(Int32Array::from(vec![200_000, 300_000]));
+ let batch2 = RecordBatch::try_new(Arc::clone(&schema), vec![array2])?;
+ topk.insert_batch(batch2)?;
+
+ // After inserting a second batch, maybe_compact should have fired
+ // because the large batch dwarfs the compacted estimate.
+ // The store should now contain only 1 batch (the compacted one).
+ assert_eq!(
+ topk.heap.store.len(),
+ 1,
+ "store should be compacted to 1 batch"
+ );
+
+ // Verify the emitted results are correct (top 5 ascending).
+ let results: Vec<_> = topk.emit()?.try_collect().await?;
+ assert_batches_eq!(
+ &[
+ "+---+", "| a |", "+---+", "| 1 |", "| 2 |", "| 3 |", "| 4 |", "| 5 |",
+ "+---+",
+ ],
+ &results
+ );
+
+ Ok(())
+ }
+
+ /// Negative path: when stored rows are close to the heap size,
+ /// compaction must NOT fire even with multiple batches present,
+ /// because the savings would be marginal
+ /// (guard: `total_rows <= num_rows * 2`).
+ #[tokio::test]
+ async fn test_topk_memory_compaction_skipped_when_marginal() -> Result<()> {
Review Comment:
The boolean-column regression test requested earlier still seems to be missing.
The new negative-path test uses an Int32Array, so it does not cover the
bit-packed boolean case from the earlier review.
Could you please add a multi-batch test on a boolean column that verifies
compaction is not triggered on every insert when the savings are marginal?
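As a rough illustration of why an Int32 test may not stand in for a boolean one, here is a minimal arithmetic sketch. It assumes one bit per boolean row and four bytes per Int32 row, and it ignores validity bitmaps and Arrow's buffer padding. The helper names (`bitpacked_bytes`, `int32_bytes`, `skipped_as_marginal`) are hypothetical and not part of DataFusion; only the `total_rows <= num_rows * 2` condition is taken from the doc comment quoted above.

```rust
/// Bytes needed for `rows` bit-packed boolean values (hypothetical helper;
/// ignores validity bitmaps and allocator padding).
fn bitpacked_bytes(rows: usize) -> usize {
    (rows + 7) / 8
}

/// Bytes needed for `rows` Int32 values (hypothetical helper).
fn int32_bytes(rows: usize) -> usize {
    rows * 4
}

/// The row-count guard quoted in the test's doc comment:
/// compaction is skipped when `total_rows <= num_rows * 2`.
fn skipped_as_marginal(total_rows: usize, heap_rows: usize) -> bool {
    total_rows <= heap_rows * 2
}

fn main() {
    let (total_rows, heap_rows) = (16, 5);

    // The row-count guard alone would allow compaction here.
    println!("skipped: {}", skipped_as_marginal(total_rows, heap_rows));

    // Int32: compaction would free a meaningful share of the bytes.
    println!(
        "int32: {} -> {} bytes",
        int32_bytes(total_rows),
        int32_bytes(heap_rows)
    );

    // Boolean: the same row counts free a single byte.
    println!(
        "bool:  {} -> {} bytes",
        bitpacked_bytes(total_rows),
        bitpacked_bytes(heap_rows)
    );
}
```

Under these assumptions the same row counts give very different byte savings for the two types, which is one plausible reason a boolean-specific regression test would exercise a path the Int32 test does not.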
##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -1243,4 +1243,154 @@ mod tests {
Ok(())
}
+
+ /// Tests that memory-based compaction triggers when a large batch
+ /// has very few rows referenced by the top-k heap.
+ #[tokio::test]
+ async fn test_topk_memory_compaction() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+
+ let sort_expr = PhysicalSortExpr {
+ expr: col("a", schema.as_ref())?,
+ options: SortOptions::default(),
+ };
+
+ let full_expr = LexOrdering::from([sort_expr.clone()]);
+ let prefix = vec![sort_expr];
+
+ let runtime = Arc::new(RuntimeEnv::default());
+ let metrics = ExecutionPlanMetricsSet::new();
+
+ let k = 5;
+ let mut topk = TopK::try_new(
+ 0,
+ Arc::clone(&schema),
+ prefix,
+ full_expr,
+ k,
+ 8192,
+ runtime,
+ &metrics,
+ Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
+ DynamicFilterPhysicalExpr::new(vec![], lit(true)),
+ )))),
+ )?;
+
+ // Insert a large batch (100,000 rows) with values 1..=100_000.
+ // Only the smallest 5 values (1..=5) will end up in the heap.
+ let large_values: Vec<i32> = (1..=100_000).collect();
+ let array1: ArrayRef = Arc::new(Int32Array::from(large_values));
+ let batch1 = RecordBatch::try_new(Arc::clone(&schema), vec![array1])?;
+ topk.insert_batch(batch1)?;
+
+ // After the first batch, store has 1 batch — compaction should
+ // not trigger (guard: store.len() <= 1).
+ assert_eq!(
+ topk.heap.store.len(),
+ 1,
+ "should have 1 batch before second insert"
+ );
+
+ // Insert a second batch with a couple of values that replace
+ // some heap entries (e.g. values 2 and 3 appear again, but won't
+ // displace 1-5; use values smaller than current min-of-max to
+ // force replacements). Actually, heap already has 1..=5. Inserting
+ // values like [1, 2] would just be duplicates that don't replace.
+ // Instead, insert values that do NOT replace, to keep the 5 rows
+ // pointing at the first batch. The point is that we now have 2
+ // batches and the first is huge, so compaction should fire.
+ let array2: ArrayRef = Arc::new(Int32Array::from(vec![200_000, 300_000]));
Review Comment:
I do not think this test currently exercises compaction. The second batch
contains [200_000, 300_000], which produces no replacements: the sort is
ascending with k = 5, and the heap already holds 1..=5 from the first batch.
Since TopK::insert_batch only calls maybe_compact() when replacements > 0,
the store stays at length 1 after the first insert, and the later assertion
passes without proving that compaction happened.
Could you please make the second batch produce at least one replacement
while keeping multiple stored batches, or add an assertion on the
pre-compaction state that proves two batches existed before compaction?
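The control flow described above can be sketched with a toy model. `ToyTopK` below is a hypothetical stand-in, not DataFusion's `TopK`: it only mirrors the behaviour stated in this comment (a batch is retained and `maybe_compact` is called only when `replacements > 0`), it omits the memory-size guard entirely, and the `compactions` counter is an illustrative addition.

```rust
/// Toy model only; all names here are hypothetical, not DataFusion's API.
struct ToyTopK {
    k: usize,
    heap: Vec<(i32, usize)>, // (value, batch id), kept sorted ascending
    store: Vec<usize>,       // ids of batches currently retained
    next_id: usize,
    compactions: usize,      // how many times compaction actually ran
}

impl ToyTopK {
    fn new(k: usize) -> Self {
        ToyTopK { k, heap: Vec::new(), store: Vec::new(), next_id: 0, compactions: 0 }
    }

    fn insert_batch(&mut self, values: &[i32]) {
        let id = self.next_id;
        self.next_id += 1;
        let mut replacements = 0;
        for &v in values {
            if self.heap.len() < self.k {
                self.heap.push((v, id));
            } else if v < self.heap[self.heap.len() - 1].0 {
                // New value beats the current largest entry: replace it.
                self.heap.pop();
                self.heap.push((v, id));
            } else {
                continue;
            }
            replacements += 1;
            self.heap.sort();
        }
        // Mirrors the behaviour described in the review: a batch with no
        // replacements is neither stored nor followed by a compaction check.
        if replacements > 0 {
            self.store.push(id);
            self.maybe_compact();
        }
    }

    fn maybe_compact(&mut self) {
        if self.store.len() <= 1 {
            return;
        }
        // Copy the heap rows into one fresh batch (size guard omitted).
        let id = self.next_id;
        self.next_id += 1;
        for entry in &mut self.heap {
            entry.1 = id;
        }
        self.store = vec![id];
        self.compactions += 1;
    }
}

fn main() {
    // Second batch has no replacements: store length is 1, but compaction never ran.
    let mut a = ToyTopK::new(5);
    a.insert_batch(&(1..=100_000).collect::<Vec<i32>>());
    a.insert_batch(&[200_000, 300_000]);
    println!("no replacement: store={} compactions={}", a.store.len(), a.compactions);

    // Second batch replaces one heap entry: store length is 1 because compaction ran.
    let mut b = ToyTopK::new(5);
    b.insert_batch(&(1..=100_000).collect::<Vec<i32>>());
    b.insert_batch(&[0]);
    println!("one replacement: store={} compactions={}", b.store.len(), b.compactions);
}
```

In both cases the final store length is 1, so an assertion on the length alone cannot tell the two apart; only the second case actually goes through compaction.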
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]