This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 411807609 Parquet limit pushdown (#5404) (#5416)
411807609 is described below

commit 411807609332c78ff1844a75eee6f71e94cedd55
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Feb 27 13:28:56 2023 +0000

    Parquet limit pushdown (#5404) (#5416)
---
 datafusion/core/src/physical_plan/file_format/parquet.rs | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 5bb03b4f4..e2d8cc94d 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -376,6 +376,7 @@ impl ExecutionPlan for ParquetExec {
             partition_index,
             projection: Arc::from(projection),
             batch_size: ctx.session_config().batch_size(),
+            limit: self.base_config.limit,
             predicate: self.predicate.clone(),
             pruning_predicate: self.pruning_predicate.clone(),
             page_pruning_predicate: self.page_pruning_predicate.clone(),
@@ -460,6 +461,7 @@ struct ParquetOpener {
     partition_index: usize,
     projection: Arc<[usize]>,
     batch_size: usize,
+    limit: Option<usize>,
     predicate: Option<Arc<Expr>>,
     pruning_predicate: Option<Arc<PruningPredicate>>,
     page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
@@ -500,6 +502,7 @@ impl FileOpener for ParquetOpener {
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
         let enable_page_index = self.enable_page_index;
+        let limit = self.limit;
 
         Ok(Box::pin(async move {
             let options = 
ArrowReaderOptions::new().with_page_index(enable_page_index);
@@ -562,6 +565,10 @@ impl FileOpener for ParquetOpener {
                 }
             }
 
+            if let Some(limit) = limit {
+                builder = builder.with_limit(limit)
+            }
+
             let stream = builder
                 .with_projection(mask)
                 .with_batch_size(batch_size)

Reply via email to