2010YOUY01 commented on code in PR #18799:
URL: https://github.com/apache/datafusion/pull/18799#discussion_r2544086681
##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -1198,4 +1201,52 @@ mod tests {
Ok(())
}
+
+ /// This test verifies that the dynamic filter is marked as complete after
+ /// TopK processing finishes.
+ #[tokio::test]
+ async fn test_topk_marks_filter_complete() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int32, false)]));
+
+ let sort_expr = PhysicalSortExpr {
+ expr: col("a", schema.as_ref())?,
+ options: SortOptions::default(),
+ };
+
+ let full_expr = LexOrdering::from([sort_expr.clone()]);
+ let prefix = vec![sort_expr];
+
+ // Create a dummy runtime environment and metrics
+ let runtime = Arc::new(RuntimeEnv::default());
+ let metrics = ExecutionPlanMetricsSet::new();
+
+ // Create a dynamic filter that we'll check for completion
+ let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![],
lit(true)));
+ let dynamic_filter_clone = Arc::clone(&dynamic_filter);
+
+ // Create a TopK instance
+ let mut topk = TopK::try_new(
+ 0,
+ Arc::clone(&schema),
+ prefix,
+ full_expr,
+ 2,
+ 10,
+ runtime,
+ &metrics,
+ Arc::new(RwLock::new(TopKDynamicFilters::new(dynamic_filter))),
+ )?;
+
+ let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(1),
Some(2)]));
+ let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array])?;
+ topk.insert_batch(batch)?;
+
Review Comment:
Should we also add an assertion for the 'in progress' state here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]