alamb commented on code in PR #18868:
URL: https://github.com/apache/datafusion/pull/18868#discussion_r2566459585


##########
datafusion/optimizer/src/optimize_projections/mod.rs:
##########
@@ -269,15 +270,11 @@ fn optimize_projections(
                 Some(projection) => indices.into_mapped_indices(|idx| 
projection[idx]),
                 None => indices.into_inner(),
             };
-            return TableScan::try_new(
-                table_name,
-                source,
-                Some(projection),
-                filters,
-                fetch,
-            )
-            .map(LogicalPlan::TableScan)
-            .map(Transformed::yes);
+            let mut new_scan =

Review Comment:
   I worry people will start to miss these fields. I wonder if we should make a 
`TableScanBuilder` or something (as a follow on PR) and deprecate 
`TableScan::try_new` to make it less likely that these fields will get forgotten



##########
datafusion/optimizer/src/push_down_limit.rs:
##########
@@ -1131,4 +1146,39 @@ mod test {
         "
         )
     }
+
+    fn has_fetch_order_sensitive_scan(plan: &LogicalPlan) -> bool {
+        let mut found = false;
+        plan.apply(|node| {
+            if let LogicalPlan::TableScan(scan) = node {
+                if scan.fetch_order_sensitive {
+                    found = true;
+                    return Ok(TreeNodeRecursion::Stop);
+                }
+            }
+            Ok(TreeNodeRecursion::Continue)
+        })
+        .expect("plan traversal");
+        found
+    }
+
+    #[test]
+    fn limit_push_down_sort_marks_scans_order_sensitive() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(table_scan)

Review Comment:
   I recommend:
   1. A negativee case
   2. A more complex test like `SELECT COUNT(*) c FROM t GROUP BY x ORDER BY c`



##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -135,13 +186,56 @@ impl RowGroupAccessPlanFilter {
         // try to prune the row groups in a single call
         match predicate.prune(&pruning_stats) {
             Ok(values) => {
-                // values[i] is false means the predicate could not be true 
for row group i
+                let mut fully_contained_candidates_original_idx: Vec<usize> = 
Vec::new();
                 for (idx, &value) in 
row_group_indexes.iter().zip(values.iter()) {
                     if !value {
                         self.access_plan.skip(*idx);
                         metrics.row_groups_pruned_statistics.add_pruned(1);
                     } else {
                         metrics.row_groups_pruned_statistics.add_matched(1);
+                        fully_contained_candidates_original_idx.push(*idx);
+                    }
+                }
+
+                // Note: this part of code shouldn't be expensive with a 
limited number of row groups

Review Comment:
   would it be possible to extract this logic into a function? This one is 
already complicated enough



##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -46,13 +48,19 @@ use parquet::{
 pub struct RowGroupAccessPlanFilter {
     /// which row groups should be accessed
     access_plan: ParquetAccessPlan,
+    /// which row groups are fully contained within the pruning predicate
+    is_fully_matched: Vec<bool>,

Review Comment:
   I recommend also defining "full contained" (aka it is known for sure that 
*ALL* rows within the RowGoup pass the filter -- or conversely that no rows are 
filtered)



##########
datafusion/optimizer/src/push_down_limit.rs:
##########
@@ -124,6 +124,9 @@ impl OptimizerRule for PushDownLimit {
                 })),
 
             LogicalPlan::Sort(mut sort) => {
+                let marked_input =

Review Comment:
   Since this optimizer is already doing a top-down traversal, I think we can 
avoid this call (which will rewrite all inputs a second time) and just set a 
flag on on the `PushDownLimit` structure itself and reuse the existing tree 
rewrite



##########
datafusion/catalog/src/table.rs:
##########
@@ -397,6 +398,25 @@ impl<'a> ScanArgs<'a> {
     pub fn limit(&self) -> Option<usize> {
         self.limit
     }
+
+    /// Set whether the scan's limit should be order-sensitive.
+    ///
+    /// If specified, the scan should return the limited rows in a specific 
order.
+    /// Or we can leverage limit pruning to optimize the scan.
+    ///
+    /// # Arguments
+    /// * `order_sensitive` - Whether the scan's limit should be 
order-sensitive
+    pub fn with_limit_order_sensitive(mut self, order_sensitive: bool) -> Self 
{

Review Comment:
   I was pretty confused by this name for awhile. After reading more of the PR 
I undersand now that this flag basically means that the order of the rows in 
the file is important for the query. When the order is not required for the 
query, it gives the datasource the freedom to reorder the rows if there is some 
way that is more efficient to apply filters / limits
   
   I think this is a nice general concept (it doesn't only apply to limits -- 
it could be used to parallelize decode within a CSV reader, for example)
   
   What do you think about renaming this field / other references to 
`preserve_order` instead of `limit_order_sensitive` . 
   
   The semantics would be that the scan must preserve the order of rows  
(basically the same practical effect of this flag now)
   
   



##########
datafusion/optimizer/src/push_down_limit.rs:
##########
@@ -124,6 +124,9 @@ impl OptimizerRule for PushDownLimit {
                 })),
 
             LogicalPlan::Sort(mut sort) => {
+                let marked_input =
+                    
mark_fetch_order_sensitive(Arc::unwrap_or_clone(sort.input))?;
+                sort.input = Arc::new(marked_input);

Review Comment:
   this is probably a conservative choice, but I think it will over eagerly 
mark some inputs -- for example  I think it will mark this scan, even when it 
doesn't matter 
   
   ```sql
   SELECT COUNT(*) as cnt, x 
   FROM t 
   GROUP BY x 
   ORDER BY cnt DESC
   LIMIT 10;
   ```
   
   I think if you used a flag approach, you could probably avoid this (by 
resetting the flag when you hit an aggregate, for example)



##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -135,13 +186,56 @@ impl RowGroupAccessPlanFilter {
         // try to prune the row groups in a single call
         match predicate.prune(&pruning_stats) {
             Ok(values) => {
-                // values[i] is false means the predicate could not be true 
for row group i
+                let mut fully_contained_candidates_original_idx: Vec<usize> = 
Vec::new();
                 for (idx, &value) in 
row_group_indexes.iter().zip(values.iter()) {
                     if !value {
                         self.access_plan.skip(*idx);
                         metrics.row_groups_pruned_statistics.add_pruned(1);
                     } else {
                         metrics.row_groups_pruned_statistics.add_matched(1);
+                        fully_contained_candidates_original_idx.push(*idx);
+                    }
+                }
+
+                // Note: this part of code shouldn't be expensive with a 
limited number of row groups
+                // If we do find it's expensive, we can consider optimizing it 
further.
+                if !fully_contained_candidates_original_idx.is_empty() {
+                    // Use NotExpr to create the inverted predicate
+                    let inverted_expr =
+                        
Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr())));
+                    // Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 
0)
+                    // before building the pruning predicate
+                    let mut simplifier = 
PhysicalExprSimplifier::new(arrow_schema);
+                    let inverted_expr = 
simplifier.simplify(inverted_expr).unwrap();
+                    if let Ok(inverted_predicate) = PruningPredicate::try_new(
+                        inverted_expr,
+                        Arc::clone(predicate.schema()),
+                    ) {
+                        let inverted_pruning_stats = RowGroupPruningStatistics 
{
+                            parquet_schema,
+                            row_group_metadatas: 
fully_contained_candidates_original_idx
+                                .iter()
+                                .map(|&i| &groups[i])
+                                .collect::<Vec<_>>(),
+                            arrow_schema,
+                        };
+
+                        if let Ok(inverted_values) =
+                            inverted_predicate.prune(&inverted_pruning_stats)
+                        {
+                            for (i, &original_row_group_idx) in
+                                
fully_contained_candidates_original_idx.iter().enumerate()
+                            {
+                                // If the inverted predicate *also* prunes 
this row group (meaning inverted_values[i] is false),

Review Comment:
   This is a very clever idea -- basically if `not(predicate)` would prune the 
predicate it means that `predicate` evaluated to (known) `true` for all rows
   
   I tried thinking of counter examples, but I couldn't come up with any and 
the reasoning is sound



##########
datafusion/expr/src/logical_plan/plan.rs:
##########
@@ -2652,6 +2652,9 @@ pub struct TableScan {
     pub filters: Vec<Expr>,
     /// Optional number of rows to read
     pub fetch: Option<usize>,
+    /// If the fetch is order-sensitive, it'll be true.

Review Comment:
   I think this field should have a consistent name with the option on 
`ScanArgs`
   
   I recommend calling it `preserve_order` (see above)



##########
datafusion/physical-expr/src/simplifier/not.rs:
##########
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Simplify NOT expressions in physical expressions
+//!
+//! This module provides optimizations for NOT expressions such as:
+//! - Double negation elimination: NOT(NOT(expr)) -> expr
+//! - NOT with binary comparisons: NOT(a = b) -> a != b
+//! - NOT with IN expressions: NOT(a IN (list)) -> a NOT IN (list)
+//! - De Morgan's laws: NOT(A AND B) -> NOT A OR NOT B
+//! - Constant folding: NOT(TRUE) -> FALSE, NOT(FALSE) -> TRUE
+
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+use datafusion_common::{tree_node::Transformed, Result, ScalarValue};
+use datafusion_expr::Operator;
+
+use crate::expressions::{lit, BinaryExpr, Literal, NotExpr};
+use crate::PhysicalExpr;
+
+/// Attempts to simplify NOT expressions

Review Comment:
   I recommend we pull this code into its own PR for easier review



##########
datafusion/physical-expr/src/simplifier/mod.rs:
##########
@@ -56,6 +57,11 @@ impl<'a> TreeNodeRewriter for PhysicalExprSimplifier<'a> {
     type Node = Arc<dyn PhysicalExpr>;
 
     fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
+        // Apply NOT expression simplification first

Review Comment:
   I think we should pull the NOT simplification into its own PR



##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -70,6 +78,49 @@ impl RowGroupAccessPlanFilter {
         self.access_plan
     }
 
+    /// Returns the is_fully_matched vector
+    pub fn is_fully_matched(&self) -> &Vec<bool> {
+        &self.is_fully_matched
+    }
+
+    /// Prunes the access plan based on the limit and fully contained row 
groups.

Review Comment:
   I think we should explain the rationale here and leave a link to the paper 
-- I think you can adapt the very nice description on 
https://github.com/apache/datafusion/issues/18860
   
   Basically:
   1. Why is this optimization important: (can reduce the number of RowGroups 
needed with a known limit)
   2. Why is it correct (the rationale on 
https://github.com/apache/datafusion/issues/18860)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to