alamb commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2021746142


##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1769,13 +1775,13 @@ mod tests {
         let sql = "select * from base_table where name='test02'";
         let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap();
         assert_eq!(batch.len(), 1);
-        insta::assert_snapshot!(batches_to_string(&batch),@r###"
-            +---------------------+----+--------+
-            | struct              | id | name   |
-            +---------------------+----+--------+
-            | {id: 4, name: aaa2} | 2  | test02 |
-            +---------------------+----+--------+
-        "###);
+        insta::assert_snapshot!(batches_to_string(&batch),@r"

Review Comment:
   Hopefully updating these expected output files wasn't terrible.



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1455,6 +1456,7 @@ mod tests {
             .await;
 
         // should have a pruning predicate
+        #[expect(deprecated)]

Review Comment:
   Maybe we can come up with some way to annotate on the plan that pruning will happen (even if the actual predicate isn't stored on the source itself) 🤔



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1976,4 +2066,231 @@ mod tests {
         assert_eq!(calls.len(), 2);
         assert_eq!(calls, vec![Some(123), Some(456)]);
     }
+
+    #[tokio::test]
+    async fn test_topk_predicate_pushdown() {

Review Comment:
   I recommend we put these in the core integration tests, as they run queries using a `SessionContext`.
   
   
https://github.com/apache/datafusion/blob/818e7390b650b4f2ba71e09f99ef0b39406bac0a/datafusion/core/tests/parquet_config.rs#L18
   
   (I am not sure why that is called `parquet_config.rs` -- seems like it should be 'parquet' 🤔)
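   
   For reference, a minimal sketch of what such a `SessionContext`-based integration test could look like (the table name, data path, and query are illustrative, not taken from the PR):
   
   ```rust
   use datafusion::error::Result;
   use datafusion::prelude::{ParquetReadOptions, SessionContext};
   
   #[tokio::test]
   async fn topk_query_through_session_context() -> Result<()> {
       let ctx = SessionContext::new();
       // Register a Parquet file with the context; the path is a placeholder.
       ctx.register_parquet("t", "tests/data/example.parquet", ParquetReadOptions::default())
           .await?;
       // Run a TopK-style query end to end through the SessionContext.
       let batches = ctx
           .sql("SELECT * FROM t ORDER BY ts DESC LIMIT 10")
           .await?
           .collect()
           .await?;
       assert!(batches.iter().map(|b| b.num_rows()).sum::<usize>() <= 10);
       Ok(())
   }
   ```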



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -186,6 +235,90 @@ impl TopK {
         Ok(())
     }
 
+    fn calculate_dynamic_filters(
+        thresholds: Vec<ColumnThreshold>,
+    ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+        // Create filter expressions for each threshold
+        let mut filters: Vec<Arc<dyn PhysicalExpr>> =

Review Comment:
   `col > 123` is implemented like `BinaryExpr(Column, Literal)`
   
   I really think the idea of `DynamicLiteral` or something is quite compelling.
   
   You could store 
   ```rust
   struct DynamicLiteral {
     literal: Arc<Mutex<Arc<ScalarValue>>>,
   }
   ```
   
   And follow the model of evaluation for [`Literal`](https://github.com/apache/datafusion/blob/main/datafusion/physical-expr/src/expressions/literal.rs)
   
   Except you would first clone the Arc before execution
   
   That would mean the `Mutex` is only held long enough to clone the `Arc`s, which would minimize contention.
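   
   To make that concrete, here is a minimal self-contained sketch of the model (the `ScalarValue` stand-in and the method names are illustrative only, not DataFusion's actual API):
   
   ```rust
   use std::sync::{Arc, Mutex};
   
   // Stand-in for DataFusion's `ScalarValue`, to keep the sketch self-contained.
   #[derive(Debug, Clone, PartialEq)]
   enum ScalarValue {
       Int64(Option<i64>),
   }
   
   #[derive(Debug)]
   struct DynamicLiteral {
       // The Mutex is held only long enough to clone or swap the inner Arc,
       // which keeps contention between the TopK updater and scan threads low.
       literal: Arc<Mutex<Arc<ScalarValue>>>,
   }
   
   impl DynamicLiteral {
       fn new(initial: ScalarValue) -> Self {
           Self {
               literal: Arc::new(Mutex::new(Arc::new(initial))),
           }
       }
   
       /// Called by TopK when the heap threshold tightens.
       fn update(&self, new_value: ScalarValue) {
           *self.literal.lock().unwrap() = Arc::new(new_value);
       }
   
       /// Called at evaluation time: take a snapshot of the current value by
       /// cloning the Arc under the lock, then evaluate like `Literal` would.
       fn current(&self) -> Arc<ScalarValue> {
           Arc::clone(&self.literal.lock().unwrap())
       }
   }
   
   fn main() {
       let lit = DynamicLiteral::new(ScalarValue::Int64(Some(30)));
       lit.update(ScalarValue::Int64(Some(42)));
       assert_eq!(*lit.current(), ScalarValue::Int64(Some(42)));
   }
   ```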
   



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -644,10 +836,101 @@ impl RecordBatchStore {
     }
 }
 
+/// Pushdown of dynamic filters from TopK operators is used to speed up queries
+/// such as `SELECT * FROM table ORDER BY col DESC LIMIT 10` by pushing down the
+/// threshold values for the sort columns to the data source.
+/// That is, the TopK operator keeps track of the top 10 values for the sort
+/// columns; before a new file is opened, its statistics are checked against the
+/// threshold values to determine if the file can be skipped, and predicate
+/// pushdown will use these values to skip rows during the scan.
+///
+/// For example, imagine this data gets created when multiple sources with clock
+/// skew, network delays, etc. are writing data and you don't do anything fancy
+/// to guarantee perfect sorting by `timestamp` (i.e. you naively write out the
+/// data to Parquet, maybe do some compaction, etc.).
+/// The point is that 99% of yesterday's files have a `timestamp` smaller than
+/// 99% of today's files, but there may be a couple of seconds of overlap
+/// between files.
+/// To be concrete, let's say this is our data:
+//
+// | file | min | max |
+// |------|-----|-----|
+// | 1    | 1   | 10  |
+// | 2    | 9   | 19  |
+// | 3    | 20  | 31  |
+// | 4    | 30  | 35  |
+//
+// Ideally a [`TableProvider`] is able to use file-level stats or other methods
+// to roughly order the files within each partition / file group such that we
+// start with the newest / largest `timestamp`s.
+// If this is not possible the optimization still works, but is less efficient
+// and harder to visualize, so for this example let's assume that we process one
+// file at a time and that we started with file 4.
+// After processing file 4, let's say we have 10 values in our TopK heap, the
+// smallest of which is 30.
+// The TopK operator will then push the filter `timestamp > 30` down the tree of
+// [`ExecutionPlan`]s, and if the data source supports dynamic filter pushdown it
+// will accept a reference to this [`DynamicPhysicalExprSource`]; when it goes to
+// open file 3 it will ask the [`DynamicPhysicalExprSource`] for the current
+// filters.
+// Since file 3 may contain values larger than 30 we cannot skip it entirely,
+// but scanning it may still be more efficient due to page pruning and other
+// optimizations.
+// Once we get to file 2, however, we can skip it entirely because we know that
+// all values in file 2 are smaller than 30.
+// The same goes for file 1.
+// So this optimization just saved us 50% of the work of scanning the data.
+#[derive(Debug, Clone)]
+struct TopKDynamicFilterSource {

Review Comment:
   Maybe we can put this in its own file: `topk/filter.rs` or `topk/dynamic_filter.rs`?
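   
   To illustrate the min/max skipping from the worked example in the doc comment above, here is a small self-contained sketch (`FileStats` and `can_skip` are hypothetical names, not part of the PR):
   
   ```rust
   // File-level statistics, mirroring the min/max table in the doc comment.
   #[allow(dead_code)] // `min` is kept to mirror the stats table, though only `max` is read
   struct FileStats {
       min: i64,
       max: i64,
   }
   
   /// With the pushed-down filter `timestamp > threshold`, a file whose max
   /// value is <= threshold cannot contain any qualifying row, so it can be
   /// skipped entirely.
   fn can_skip(file: &FileStats, threshold: i64) -> bool {
       file.max <= threshold
   }
   
   fn main() {
       let files = [
           FileStats { min: 1, max: 10 },  // file 1
           FileStats { min: 9, max: 19 },  // file 2
           FileStats { min: 20, max: 31 }, // file 3
           FileStats { min: 30, max: 35 }, // file 4
       ];
       let threshold = 30; // smallest value in the TopK heap after file 4
       let skipped: Vec<usize> = files
           .iter()
           .enumerate()
           .filter(|(_, f)| can_skip(f, threshold))
           .map(|(i, _)| i + 1)
           .collect();
       assert_eq!(skipped, vec![1, 2]); // half the files never need to be opened
   }
   ```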



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -173,11 +210,23 @@ impl TopK {
                 None | Some(_) => {
                     self.heap.add(&mut batch_entry, row, index);
                     self.metrics.row_replacements.add(1);
+                    need_to_update_dynamic_filters = true;
                 }
             }
         }
         self.heap.insert_batch_entry(batch_entry);
 
+        if need_to_update_dynamic_filters {
+            if let Some(filters) = self.filters.as_ref() {
+                if let Some(threasholds) = self.heap.get_threshold_values(&self.expr)? {

Review Comment:
   ```suggestion
                   if let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? {
   ```



##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1904,6 +1951,49 @@ mod tests {
         }
     }
 
+    struct DynamicFilterTestCase {
+        query: String,
+        path: String,
+    }
+
+    impl DynamicFilterTestCase {
+        fn new(query: String, path: String) -> Self {
+            Self { query, path }
+        }
+
+        async fn _run_query(&self, query: &str) -> Vec<RecordBatch> {

Review Comment:
   What is the reason to name it starting with `_` ? perhaps it could just be
   
   ```suggestion
           async fn run_query(&self, query: &str) -> Vec<RecordBatch> {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
