This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 65e368250 feat(datafusion): Add LIMIT pushdown support (#2006)
65e368250 is described below
commit 65e368250c1e363c3c63de06248e4bc40ad37c58
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Jan 9 11:37:44 2026 +0800
feat(datafusion): Add LIMIT pushdown support (#2006)
Implement LIMIT pushdown to optimize queries with LIMIT clauses by
stopping data processing once the limit is reached. This reduces
unnecessary I/O and computation for queries that only need a subset of
rows.
Changes:
- Add limit field to IcebergTableScan to track row limit
- Apply limit at stream level by filtering/slicing record batches
- Update IcebergTableProvider and IcebergStaticTableProvider to pass
limit parameter to scan
- Add comprehensive tests for limit pushdown functionality
## Which issue does this PR close?
- No associated issue; LIMIT pushdown was implemented directly in PR #2006.
## What changes are included in this PR?
LIMIT pushdown support in the DataFusion integration (see the Changes list above).
## Are these changes tested?
Yes — unit tests covering limit pushdown for both the static and catalog-backed table providers are included in this PR.
Co-authored-by: Claude Sonnet 4.5 <[email protected]>
---
.../datafusion/src/physical_plan/scan.rs | 30 ++++++-
crates/integrations/datafusion/src/table/mod.rs | 98 +++++++++++++++++++++-
2 files changed, 125 insertions(+), 3 deletions(-)
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index be92e93d2..d627b6a63 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -51,6 +51,8 @@ pub struct IcebergTableScan {
projection: Option<Vec<String>>,
/// Filters to apply to the table scan
predicates: Option<Predicate>,
+ /// Optional limit on the number of rows to return
+ limit: Option<usize>,
}
impl IcebergTableScan {
@@ -61,6 +63,7 @@ impl IcebergTableScan {
schema: ArrowSchemaRef,
projection: Option<&Vec<usize>>,
filters: &[Expr],
+ limit: Option<usize>,
) -> Self {
let output_schema = match projection {
None => schema.clone(),
@@ -76,6 +79,7 @@ impl IcebergTableScan {
plan_properties,
projection,
predicates,
+ limit,
}
}
@@ -95,6 +99,10 @@ impl IcebergTableScan {
self.predicates.as_ref()
}
+ pub fn limit(&self) -> Option<usize> {
+ self.limit
+ }
+
/// Computes [`PlanProperties`] used in query optimization.
fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
// TODO:
@@ -146,9 +154,29 @@ impl ExecutionPlan for IcebergTableScan {
);
let stream = futures::stream::once(fut).try_flatten();
+ // Apply limit if specified
+ let limited_stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> =
+ if let Some(limit) = self.limit {
+ let mut remaining = limit;
+ Box::pin(stream.try_filter_map(move |batch| {
+ futures::future::ready(if remaining == 0 {
+ Ok(None)
+ } else if batch.num_rows() <= remaining {
+ remaining -= batch.num_rows();
+ Ok(Some(batch))
+ } else {
+ let limited_batch = batch.slice(0, remaining);
+ remaining = 0;
+ Ok(Some(limited_batch))
+ })
+ }))
+ } else {
+ Box::pin(stream)
+ };
+
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
- stream,
+ limited_stream,
)))
}
}
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index ad616542a..ae87342fa 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -127,7 +127,7 @@ impl TableProvider for IcebergTableProvider {
_state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
- _limit: Option<usize>,
+ limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
// Load fresh table metadata from catalog
let table = self
@@ -143,6 +143,7 @@ impl TableProvider for IcebergTableProvider {
self.schema.clone(),
projection,
filters,
+ limit,
)))
}
@@ -311,7 +312,7 @@ impl TableProvider for IcebergStaticTableProvider {
_state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
- _limit: Option<usize>,
+ limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
// Use cached table (no refresh)
Ok(Arc::new(IcebergTableScan::new(
@@ -320,6 +321,7 @@ impl TableProvider for IcebergStaticTableProvider {
self.schema.clone(),
projection,
filters,
+ limit,
)))
}
@@ -774,4 +776,96 @@ mod tests {
"Plan should contain SortExec when fanout is disabled"
);
}
+
+ #[tokio::test]
+ async fn test_limit_pushdown_static_provider() {
+ use datafusion::datasource::TableProvider;
+
+ let table = get_test_table_from_metadata_file().await;
+ let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
+ .await
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ let state = ctx.state();
+
+ // Test scan with limit
+ let scan_plan = table_provider
+ .scan(&state, None, &[], Some(10))
+ .await
+ .unwrap();
+
+ // Verify that the scan plan is an IcebergTableScan
+ let iceberg_scan = scan_plan
+ .as_any()
+ .downcast_ref::<IcebergTableScan>()
+ .expect("Expected IcebergTableScan");
+
+ // Verify the limit is set
+ assert_eq!(
+ iceberg_scan.limit(),
+ Some(10),
+ "Limit should be set to 10 in the scan plan"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_limit_pushdown_catalog_backed_provider() {
+ use datafusion::datasource::TableProvider;
+
+ let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;
+
+ let provider =
+ IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
+ .await
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ let state = ctx.state();
+
+ // Test scan with limit
+ let scan_plan = provider.scan(&state, None, &[], Some(5)).await.unwrap();
+
+ // Verify that the scan plan is an IcebergTableScan
+ let iceberg_scan = scan_plan
+ .as_any()
+ .downcast_ref::<IcebergTableScan>()
+ .expect("Expected IcebergTableScan");
+
+ // Verify the limit is set
+ assert_eq!(
+ iceberg_scan.limit(),
+ Some(5),
+ "Limit should be set to 5 in the scan plan"
+ );
+ }
+
+ #[tokio::test]
+ async fn test_no_limit_pushdown() {
+ use datafusion::datasource::TableProvider;
+
+ let table = get_test_table_from_metadata_file().await;
+ let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
+ .await
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ let state = ctx.state();
+
+ // Test scan without limit
+ let scan_plan = table_provider.scan(&state, None, &[], None).await.unwrap();
+
+ // Verify that the scan plan is an IcebergTableScan
+ let iceberg_scan = scan_plan
+ .as_any()
+ .downcast_ref::<IcebergTableScan>()
+ .expect("Expected IcebergTableScan");
+
+ // Verify the limit is None
+ assert_eq!(
+ iceberg_scan.limit(),
+ None,
+ "Limit should be None when not specified"
+ );
+ }
}