This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 65e368250 feat(datafusion): Add LIMIT pushdown support (#2006)
65e368250 is described below

commit 65e368250c1e363c3c63de06248e4bc40ad37c58
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Jan 9 11:37:44 2026 +0800

    feat(datafusion): Add LIMIT pushdown support (#2006)
    
    Implement LIMIT pushdown to optimize queries with LIMIT clauses by
    stopping data processing once the limit is reached. This reduces
    unnecessary I/O and computation for queries that only need a subset of
    rows.
    
    Changes:
    - Add limit field to IcebergTableScan to track row limit
    - Apply limit at stream level by filtering/slicing record batches
    - Update IcebergTableProvider and IcebergStaticTableProvider to pass
      the limit parameter to the scan
    - Add comprehensive tests for limit pushdown functionality
    
    ## Which issue does this PR close?
    
    
    - Closes #.
    
    ## What changes are included in this PR?
    
    
    ## Are these changes tested?
    
    Co-authored-by: Claude Sonnet 4.5 <[email protected]>
---
 .../datafusion/src/physical_plan/scan.rs           | 30 ++++++-
 crates/integrations/datafusion/src/table/mod.rs    | 98 +++++++++++++++++++++-
 2 files changed, 125 insertions(+), 3 deletions(-)

diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index be92e93d2..d627b6a63 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -51,6 +51,8 @@ pub struct IcebergTableScan {
     projection: Option<Vec<String>>,
     /// Filters to apply to the table scan
     predicates: Option<Predicate>,
+    /// Optional limit on the number of rows to return
+    limit: Option<usize>,
 }
 
 impl IcebergTableScan {
@@ -61,6 +63,7 @@ impl IcebergTableScan {
         schema: ArrowSchemaRef,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
+        limit: Option<usize>,
     ) -> Self {
         let output_schema = match projection {
             None => schema.clone(),
@@ -76,6 +79,7 @@ impl IcebergTableScan {
             plan_properties,
             projection,
             predicates,
+            limit,
         }
     }
 
@@ -95,6 +99,10 @@ impl IcebergTableScan {
         self.predicates.as_ref()
     }
 
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
     /// Computes [`PlanProperties`] used in query optimization.
     fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
         // TODO:
@@ -146,9 +154,29 @@ impl ExecutionPlan for IcebergTableScan {
         );
         let stream = futures::stream::once(fut).try_flatten();
 
+        // Apply limit if specified
+        let limited_stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> =
+            if let Some(limit) = self.limit {
+                let mut remaining = limit;
+                Box::pin(stream.try_filter_map(move |batch| {
+                    futures::future::ready(if remaining == 0 {
+                        Ok(None)
+                    } else if batch.num_rows() <= remaining {
+                        remaining -= batch.num_rows();
+                        Ok(Some(batch))
+                    } else {
+                        let limited_batch = batch.slice(0, remaining);
+                        remaining = 0;
+                        Ok(Some(limited_batch))
+                    })
+                }))
+            } else {
+                Box::pin(stream)
+            };
+
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             self.schema(),
-            stream,
+            limited_stream,
         )))
     }
 }
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index ad616542a..ae87342fa 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -127,7 +127,7 @@ impl TableProvider for IcebergTableProvider {
         _state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
-        _limit: Option<usize>,
+        limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         // Load fresh table metadata from catalog
         let table = self
@@ -143,6 +143,7 @@ impl TableProvider for IcebergTableProvider {
             self.schema.clone(),
             projection,
             filters,
+            limit,
         )))
     }
 
@@ -311,7 +312,7 @@ impl TableProvider for IcebergStaticTableProvider {
         _state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
-        _limit: Option<usize>,
+        limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         // Use cached table (no refresh)
         Ok(Arc::new(IcebergTableScan::new(
@@ -320,6 +321,7 @@ impl TableProvider for IcebergStaticTableProvider {
             self.schema.clone(),
             projection,
             filters,
+            limit,
         )))
     }
 
@@ -774,4 +776,96 @@ mod tests {
             "Plan should contain SortExec when fanout is disabled"
         );
     }
+
+    #[tokio::test]
+    async fn test_limit_pushdown_static_provider() {
+        use datafusion::datasource::TableProvider;
+
+        let table = get_test_table_from_metadata_file().await;
+        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
+            .await
+            .unwrap();
+
+        let ctx = SessionContext::new();
+        let state = ctx.state();
+
+        // Test scan with limit
+        let scan_plan = table_provider
+            .scan(&state, None, &[], Some(10))
+            .await
+            .unwrap();
+
+        // Verify that the scan plan is an IcebergTableScan
+        let iceberg_scan = scan_plan
+            .as_any()
+            .downcast_ref::<IcebergTableScan>()
+            .expect("Expected IcebergTableScan");
+
+        // Verify the limit is set
+        assert_eq!(
+            iceberg_scan.limit(),
+            Some(10),
+            "Limit should be set to 10 in the scan plan"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_limit_pushdown_catalog_backed_provider() {
+        use datafusion::datasource::TableProvider;
+
+        let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await;
+
+        let provider =
+            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone())
+                .await
+                .unwrap();
+
+        let ctx = SessionContext::new();
+        let state = ctx.state();
+
+        // Test scan with limit
+        let scan_plan = provider.scan(&state, None, &[], Some(5)).await.unwrap();
+
+        // Verify that the scan plan is an IcebergTableScan
+        let iceberg_scan = scan_plan
+            .as_any()
+            .downcast_ref::<IcebergTableScan>()
+            .expect("Expected IcebergTableScan");
+
+        // Verify the limit is set
+        assert_eq!(
+            iceberg_scan.limit(),
+            Some(5),
+            "Limit should be set to 5 in the scan plan"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_no_limit_pushdown() {
+        use datafusion::datasource::TableProvider;
+
+        let table = get_test_table_from_metadata_file().await;
+        let table_provider = IcebergStaticTableProvider::try_new_from_table(table.clone())
+            .await
+            .unwrap();
+
+        let ctx = SessionContext::new();
+        let state = ctx.state();
+
+        // Test scan without limit
+        let scan_plan = table_provider.scan(&state, None, &[], None).await.unwrap();
+
+        // Verify that the scan plan is an IcebergTableScan
+        let iceberg_scan = scan_plan
+            .as_any()
+            .downcast_ref::<IcebergTableScan>()
+            .expect("Expected IcebergTableScan");
+
+        // Verify the limit is None
+        assert_eq!(
+            iceberg_scan.limit(),
+            None,
+            "Limit should be None when not specified"
+        );
+    }
 }

Reply via email to