adriangb commented on code in PR #18868:
URL: https://github.com/apache/datafusion/pull/18868#discussion_r2669633441


##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -153,6 +218,68 @@ impl RowGroupAccessPlanFilter {
         }
     }
 
+    /// Identifies row groups that are fully matched by the predicate.
+    ///
+    /// This optimization checks whether all rows in a row group satisfy the predicate
+    /// by inverting the predicate and checking if it prunes the row group. If the
+    /// inverted predicate prunes a row group, it means no rows match the inverted
+    /// predicate, which implies all rows match the original predicate.
+    ///
+    /// Note: This optimization is relatively inexpensive for a limited number of row groups.
+    fn identify_fully_matched_row_groups(
+        &mut self,
+        candidate_row_group_indices: &[usize],
+        arrow_schema: &Schema,
+        parquet_schema: &SchemaDescriptor,
+        groups: &[RowGroupMetaData],
+        predicate: &PruningPredicate,
+        metrics: &ParquetFileMetrics,
+    ) {
+        if candidate_row_group_indices.is_empty() {
+            return;
+        }
+
+        // Use NotExpr to create the inverted predicate
+        let inverted_expr = Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr())));
+
+        // Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0)
+        // before building the pruning predicate
+        let simplifier = PhysicalExprSimplifier::new(arrow_schema);

Review Comment:
   I wonder if we could short-circuit this by adding a `not()` method to
`PruningPredicate` (or something similar) that takes an existing
`PruningPredicate` and negates it by cloning the inner expression `Arc`.
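
To make the inversion trick from the doc comment concrete, here is a minimal, self-contained sketch. It does not use DataFusion's `PruningPredicate`; `Pred`, `Stats`, `can_prune`, and `is_fully_matched` are hypothetical names, the column is assumed to be a non-null `i64`, and the statistics are assumed to be exact.

```rust
/// Toy predicate over a single i64 column (hypothetical, not a DataFusion type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Pred {
    /// column >= k
    GtEq(i64),
    /// column < k
    Lt(i64),
}

impl Pred {
    /// Logical negation: NOT(c >= k) is c < k, and vice versa.
    /// Assumes the column contains no NULLs.
    pub fn not(self) -> Pred {
        match self {
            Pred::GtEq(k) => Pred::Lt(k),
            Pred::Lt(k) => Pred::GtEq(k),
        }
    }
}

/// Min/max statistics for one row group (assumed exact).
#[derive(Debug, Clone, Copy)]
pub struct Stats {
    pub min: i64,
    pub max: i64,
}

/// True when the statistics prove that no row can satisfy `pred`.
pub fn can_prune(pred: Pred, stats: &Stats) -> bool {
    match pred {
        Pred::GtEq(k) => stats.max < k,
        Pred::Lt(k) => stats.min >= k,
    }
}

/// A row group is fully matched when the negated predicate prunes it.
pub fn is_fully_matched(pred: Pred, stats: &Stats) -> bool {
    can_prune(pred.not(), stats)
}
```

With `s >= 50`, a row group with `min: 76, max: 101` is fully matched, one with `min: 6, max: 70` is only partially matched, and one with `min: 7, max: 40` is pruned outright.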



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -152,6 +152,8 @@ pub struct FileScanConfig {
     /// The maximum number of records to read from this plan. If `None`,
     /// all records after filtering are returned.
     pub limit: Option<usize>,
+    /// Whether the scan's limit is order sensitive
+    pub preserve_order: bool,

Review Comment:
   Seeing this in isolation is a bit confusing. I think adding 2-3 lines of
docstring summarizing the explanation from the PR description would be
helpful.



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -76,6 +76,8 @@ pub(super) struct ParquetOpener {
     pub batch_size: usize,
     /// Optional limit on the number of rows to read
     pub limit: Option<usize>,
+    /// If should keep the output rows in order
+    pub preserve_order: bool,

Review Comment:
   Does this need to be `pub`? Is it `pub` only for tests?



##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -70,6 +79,109 @@ impl RowGroupAccessPlanFilter {
         self.access_plan
     }
 
+    /// Returns the is_fully_matched vector
+    pub fn is_fully_matched(&self) -> &Vec<bool> {
+        &self.is_fully_matched
+    }
+
+    /// Prunes the access plan based on the limit and fully contained row groups.
+    ///
+    /// The pruning works by leveraging the concept of fully matched row groups. Consider a query like:
+    /// `WHERE species LIKE 'Alpine%' AND s >= 50 LIMIT N`
+    ///
+    /// After initial filtering, row groups can be classified into three states:
+    ///
+    /// 1. Not Matching / Pruned
+    /// 2. Partially Matching (Row Group/Page contains some matches)
+    /// 3. Fully Matching (Entire range is within predicate)
+    ///
+    /// +-----------------------------------------------------------------------+
+    /// |                            NOT MATCHING                               |
+    /// |  Partition 1                                                          |
+    /// |  +-----------------------------------+-----------------------------+  |
+    /// |  | SPECIES (min: 'B...',max: 'S...') | S (min: 7, max: 133)        |  |
+    /// |  +-----------------------------------+-----------------------------+  |
+    /// |  | Snow Vole                         | 7                           |  |
+    /// |  | Brown Bear                        | 133                         |  |
+    /// |  | Gray Wolf                         | 82                          |  |
+    /// |  +-----------------------------------+-----------------------------+  |
+    /// +-----------------------------------------------------------------------+
+    ///
+    /// +-----------------------------------------------------------------------+
+    /// |                          PARTIALLY MATCHING                           |
+    /// |  Partition 2                             Partition 4                  |
+    /// |  +------------------+--------------+     +------------------+-------+ |
+    /// |  | SPECIES          | S            |     | SPECIES          | S     | |
+    /// |  | (min:A, max:R)   |(min:6,max:70)|     | (min:A, max:P)   |[4-51] | |
+    /// |  +------------------+--------------+     +------------------+-------+ |
+    /// |  | Lynx             | 71           |     | Europ. Mole      | 4     | |
+    /// |  | Red Fox          | 40           |     | Polecat          | 16    | |
+    /// |  | Alpine Bat       | 6            |     | Alpine Ibex      | 97    | |
+    /// |  +------------------+--------------+     +------------------+-------+ |
+    /// +-----------------------------------------------------------------------+
+    ///
+    /// +-----------------------------------------------------------------------+
+    /// |                           FULLY MATCHING                              |
+    /// |  Partition 3                                                          |
+    /// |  +-----------------------------------+-----------------------------+  |
+    /// |  | SPECIES (min: 'A...',max: 'A...') | S (min: 76, max: 101)       |  |

Review Comment:
   Is this `min: 'Alpine', max: 'Alpine'`? Because `min: 'Alp', max: 'Alp'`
would not guarantee an exhaustive match for `LIKE 'Alpine%'`.
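
A small sketch of why the abbreviated bounds matter. It assumes exact (untruncated) min/max statistics and byte-wise string ordering; `prefix_fully_matches` is a hypothetical helper, not something in the PR. Strings sharing a prefix form a contiguous range in lexicographic order, so a container is guaranteed to match `LIKE '<prefix>%'` in full only when both bounds start with the whole prefix.

```rust
/// Returns true when min/max statistics alone prove that every value in a
/// container matches `LIKE '<prefix>%'`.
///
/// Assumption: `min` and `max` are exact bounds, not truncated statistics.
pub fn prefix_fully_matches(min: &str, max: &str, prefix: &str) -> bool {
    // If both bounds start with `prefix`, every value between them does too,
    // because strings sharing a prefix are contiguous in lexicographic order.
    min.starts_with(prefix) && max.starts_with(prefix)
}
```

For example, `prefix_fully_matches("Alpine Bat", "Alpine Ibex", "Alpine")` holds, while bounds of `"Alp"` and `"Alp"` do not prove a full match for the prefix `"Alpine"`.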



##########
datafusion/physical-optimizer/src/limit_pushdown.rs:
##########
@@ -337,4 +362,37 @@ fn add_global_limit(
     Arc::new(GlobalLimitExec::new(pushdown_plan, skip, fetch)) as _
 }
 
+/// Helper function to handle DataSourceExec preserve_order setting
+fn ensure_preserve_order_if_needed(
+    plan: Arc<dyn ExecutionPlan>,
+    order_sensitive: bool,
+) -> Arc<dyn ExecutionPlan> {
+    if !order_sensitive {
+        return plan;
+    }
+
+    let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
+        return plan;
+    };

Review Comment:
   It saddens me every time I see a downcast. Maybe you avoided this because
it means more code churn, but could we record in the `FileScanConfig` whether
an ordering was pushed down into it and, if so, set the `preserve_order` flag?
That is distinct from the case where the scan has an ordering that upstream
operators don't require.



##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -283,6 +287,12 @@ impl FileScanConfigBuilder {
         self
     }
 
+    /// Set whether the limit should be order-sensitive.

Review Comment:
   Same here: a slightly fuller docstring would help.



##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -70,6 +79,109 @@ impl RowGroupAccessPlanFilter {
         self.access_plan
     }
 
+    /// Returns the is_fully_matched vector
+    pub fn is_fully_matched(&self) -> &Vec<bool> {
+        &self.is_fully_matched
+    }
+
+    /// Prunes the access plan based on the limit and fully contained row groups.
+    ///
+    /// The pruning works by leveraging the concept of fully matched row groups. Consider a query like:
+    /// `WHERE species LIKE 'Alpine%' AND s >= 50 LIMIT N`
+    ///
+    /// After initial filtering, row groups can be classified into three states:
+    ///
+    /// 1. Not Matching / Pruned
+    /// 2. Partially Matching (Row Group/Page contains some matches)
+    /// 3. Fully Matching (Entire range is within predicate)
+    ///
+    /// +-----------------------------------------------------------------------+
+    /// |                            NOT MATCHING                               |
+    /// |  Partition 1                                                          |

Review Comment:
   I think we use the term `container` in DataFusion. I understand the
Snowflake paper uses partition / micro-partition, but I think it is better to
stick to `container` in our codebase.
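
For readers following the doc comment quoted above, here is a rough sketch of how limit-based pruning over fully matched containers could work. This is an assumption about the general strategy, not the PR's implementation: `RowGroupInfo` and `prune_by_limit` are hypothetical, and the shortcut is taken only when output order does not need to be preserved.

```rust
/// Hypothetical per-row-group summary (not a DataFusion type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RowGroupInfo {
    /// Number of rows in the row group.
    pub num_rows: usize,
    /// Whether the row group survived statistics-based pruning.
    pub should_scan: bool,
    /// Whether every row is known to satisfy the predicate.
    pub fully_matched: bool,
}

/// Returns, per row group, whether it should still be scanned.
///
/// If fully matched row groups alone contain at least `limit` rows, only
/// those are kept; otherwise the existing scan decisions are returned.
pub fn prune_by_limit(
    groups: &[RowGroupInfo],
    limit: usize,
    preserve_order: bool,
) -> Vec<bool> {
    let current: Vec<bool> = groups.iter().map(|g| g.should_scan).collect();
    if preserve_order {
        // Skipping partially matching groups could change which rows come
        // first, so leave the plan untouched when order matters.
        return current;
    }

    let mut keep = vec![false; groups.len()];
    let mut rows_seen = 0usize;
    for (i, g) in groups.iter().enumerate() {
        if g.should_scan && g.fully_matched {
            keep[i] = true;
            rows_seen += g.num_rows;
            if rows_seen >= limit {
                // Enough guaranteed matches: the other groups can be skipped.
                return keep;
            }
        }
    }
    // Not enough guaranteed rows, so fall back to the current plan.
    current
}
```

The design choice worth noting is the fallback: when the fully matched containers cannot satisfy the limit on their own, nothing is pruned, so the sketch never drops rows that the query might need.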



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

