alamb commented on code in PR #15301:
URL: https://github.com/apache/datafusion/pull/15301#discussion_r2012937304


##########
datafusion/core/src/datasource/physical_plan/parquet.rs:
##########
@@ -1847,6 +1848,28 @@ mod tests {
         writer.close().unwrap();
     }
 
+    fn write_file_null(file: &String) {

Review Comment:
   I don't understand what `null` means in this context. The data appears to have two columns and one row with a null and a string



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -644,10 +738,122 @@ impl RecordBatchStore {
     }
 }
 
+/// Pushdown of dynamic filters from TopK operators is used to speed up queries
+/// such as `SELECT * FROM table ORDER BY col DESC LIMIT 10` by pushing down the
+/// threshold values for the sort columns to the data source.
+/// That is, the TopK operator will keep track of the top 10 values for the sort,
+/// and before a new file is opened its statistics will be checked against the
+/// threshold values to determine if the file can be skipped, and predicate pushdown
+/// will use these to skip rows during the scan.
+/// 
+/// For example, imagine this data is written by multiple sources with clock skew,
+/// network delays, etc., and nothing special is done to guarantee
+/// perfect sorting by `timestamp` (i.e. you naively write out the data to Parquet, maybe do some compaction, etc.).
+/// The point is that 99% of yesterday's files have a `timestamp` smaller than 99% of today's files,
+/// but there may be a couple of seconds of overlap between files.
+/// To be concrete, let's say this is our data:
+//
+// | file | min | max |
+// |------|-----|-----|
+// | 1    | 1   | 10  |
+// | 2    | 9   | 19  |
+// | 3    | 20  | 31  |
+// | 4    | 30  | 35  |
+//
+// Ideally a [`TableProvider`] is able to use file level stats or other methods to roughly order the files
+// within each partition / file group such that we start with the newest / largest `timestamp`s.
+// If this is not possible the optimization still works but is less efficient and harder to visualize,
+// so for this example let's assume that we process 1 file at a time and we started with file 4.
+// After processing file 4 let's say we have 10 values in our TopK heap, the smallest of which is 30.
+// The TopK operator will then push the filter `timestamp > 30` down the tree of [`ExecutionPlan`]s,
+// and if the data source supports dynamic filter pushdown it will accept a reference to this [`DynamicFilterSource`]
+// and, when it goes to open file 3, it will ask the [`DynamicFilterSource`] for the current filters.
+// Since file 3 may contain values larger than 30 we cannot skip it entirely,
+// but scanning it may still be more efficient due to page pruning and other optimizations.
+// Once we get to file 2, however, we can skip it entirely because we know that all values in file 2 are smaller than 30.
+// The same goes for file 1.
+// So this optimization just saved us 50% of the work of scanning the data (2 of the 4 files).
+struct TopKDynamicFilterSource {
+    /// The TopK heap that provides the current filters
+    heap: Arc<RwLock<TopKHeap>>,
+    /// The sort expressions used to create the TopK
+    expr: Arc<[PhysicalSortExpr]>,
+}
+
+impl std::fmt::Debug for TopKDynamicFilterSource {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TopKDynamicFilterSource")
+            .field("expr", &self.expr)
+            .finish()
+    }
+}
+
+impl DynamicFilterSource for TopKDynamicFilterSource {
+    fn current_filters(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+        let heap_guard = self.heap.read().map_err(|_| {
+            DataFusionError::Internal(
+                "Failed to acquire read lock on TopK heap".to_string(),
+            )
+        })?;
+
+        // Get the threshold values for all sort expressions
+        let Some(thresholds) = heap_guard.get_threshold_values(&self.expr)? else {
+            return Ok(vec![]); // No thresholds available yet
+        };
+
+        // Create filter expressions for each threshold
+        let mut filters: Vec<Arc<dyn PhysicalExpr>> =
+            Vec::with_capacity(thresholds.len());
+
+        for threshold in thresholds {
+            // Create the appropriate operator based on sort order
+            let op = if threshold.sort_options.descending {
+                // For descending sort, we want col > threshold (exclude smaller values)
+                Operator::Gt
+            } else {
+                // For ascending sort, we want col < threshold (exclude larger values)
+                Operator::Lt
+            };
+
+            let comparison = Arc::new(BinaryExpr::new(

Review Comment:
   this is interesting -- it basically takes a snapshot of the current threshold values and then returns a `col < <value>` type expression
   
   However, that means that as the values in the TopK heap are updated, the expression is not updated. As the filter is currently used for pruning on file open, this is probably ok (as I think the code will effectively create a new predicate with updated thresholds for each file)
   
   I wonder if you considered actually changing the values of the *thresholds* over time?
   
   That might look like
   
   ```rust
   /// Returns the current minimum value stored in the topK heap
   struct TopKValue {
     heap: Arc<Mutex<TopKHeap>>
   }
   
   impl PhysicalExpr for TopKValue {
   ...
   }
   ```
   
   And then we would add a predicate during PredicatePushdown that looks like
   
   ```
   sort_col < TopKValue(..)
   ```
   
   The value of `TopKValue()` would change over the course of the plan execution as the TopK heap is updated
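   A minimal, std-only sketch of the difference described above, under stated assumptions: the TopK heap is reduced to a single shared `Option<i64>` threshold for an `ORDER BY ... DESC` query, and `SnapshotFilter`, `LiveFilter`, `passes` and `snapshot_vs_live` are hypothetical names, not DataFusion APIs. The snapshot copies the threshold when it is built (as `current_filters` does), while the live filter re-reads it on every evaluation, in the spirit of the proposed `TopKValue`.

```rust
use std::sync::{Arc, RwLock};

/// Returns true if `value` could still enter a `DESC` TopK whose current
/// threshold (smallest retained value) is `threshold`. With no threshold yet
/// (heap not full), every value passes.
fn passes(threshold: Option<i64>, value: i64) -> bool {
    threshold.map_or(true, |t| value > t)
}

/// Snapshot approach: the threshold is copied once, when the filter is built.
struct SnapshotFilter {
    threshold: Option<i64>,
}

impl SnapshotFilter {
    fn evaluate(&self, value: i64) -> bool {
        passes(self.threshold, value)
    }
}

/// Live approach (hypothetical, like `TopKValue`): the threshold is re-read
/// from the shared heap state on every evaluation.
struct LiveFilter {
    threshold: Arc<RwLock<Option<i64>>>,
}

impl LiveFilter {
    fn evaluate(&self, value: i64) -> bool {
        // The read guard is dropped at the end of this statement.
        let current = *self.threshold.read().unwrap();
        passes(current, value)
    }
}

/// Builds both filters at threshold 30, lets the "heap" raise the threshold
/// to 33, then evaluates both filters against the value 32.
fn snapshot_vs_live() -> (bool, bool) {
    let shared = Arc::new(RwLock::new(Some(30)));
    let snapshot = SnapshotFilter { threshold: *shared.read().unwrap() };
    let live = LiveFilter { threshold: Arc::clone(&shared) };

    // The TopK operator sees larger values and raises its threshold.
    *shared.write().unwrap() = Some(33);

    (snapshot.evaluate(32), live.evaluate(32))
}
```

   Here `snapshot_vs_live()` returns `(true, false)`: after the threshold moves from 30 to 33, the value 32 still passes the stale snapshot but is rejected by the live filter. For pruning at file open either form works if, as suggested above, a fresh snapshot is taken per file.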
   
   
   



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -587,4 +578,17 @@ impl FileSource for ParquetSource {
             }
         }
     }
+
+    fn supports_dynamic_filter_pushdown(&self) -> bool {
+        true
+    }

Review Comment:
   I agree -- that would be a nicer interface -- or return a specific Enum perhaps 🤔 
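   A sketch of the enum idea, assuming hypothetical names throughout: `DynamicFilterSupport` and its variants, the `DynamicFilterCapable` trait standing in for `FileSource`, and the two example sources do not exist in DataFusion. The point is that an enum can say more than yes/no (for example, whether a source can only prune whole files or can also filter rows), while a boolean answer stays easy to derive.

```rust
/// Hypothetical replacement for `supports_dynamic_filter_pushdown() -> bool`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DynamicFilterSupport {
    /// The source ignores dynamic filters entirely.
    Unsupported,
    /// The source can use dynamic filters to skip whole files (statistics pruning).
    FilePruning,
    /// The source can also apply them row by row during the scan.
    FilePruningAndRowFiltering,
}

/// Hypothetical minimal trait standing in for `FileSource`.
trait DynamicFilterCapable {
    fn dynamic_filter_support(&self) -> DynamicFilterSupport {
        // Conservative default: sources must opt in explicitly.
        DynamicFilterSupport::Unsupported
    }
}

struct CsvLikeSource;
struct ParquetLikeSource;

impl DynamicFilterCapable for CsvLikeSource {}

impl DynamicFilterCapable for ParquetLikeSource {
    fn dynamic_filter_support(&self) -> DynamicFilterSupport {
        DynamicFilterSupport::FilePruningAndRowFiltering
    }
}

/// Callers that only need the old boolean can still get it.
fn supports_any(source: &dyn DynamicFilterCapable) -> bool {
    source.dynamic_filter_support() != DynamicFilterSupport::Unsupported
}
```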



##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -259,6 +261,8 @@ pub struct ParquetSource {
     pub(crate) metrics: ExecutionPlanMetricsSet,
     /// Optional predicate for row filtering during parquet scan
     pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
+    /// Dynamic filters for row filtering during parquet scan
+    pub(crate) dynamic_filters: Vec<Arc<dyn DynamicFilterSource>>,

Review Comment:
   again I would suggest including `source` in the name to distinguish it from the filters themselves (it is a minor point)
   
   ```suggestion
       pub(crate) dynamic_filters_source: Vec<Arc<dyn DynamicFilterSource>>,
   ```



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -54,6 +60,8 @@ pub(super) struct ParquetOpener {
     pub limit: Option<usize>,
     /// Optional predicate to apply during the scan
     pub predicate: Option<Arc<dyn PhysicalExpr>>,
+    /// Optional sources for dynamic filters (e.g. joins, top-k filters)
+    pub dynamic_filters: Vec<Arc<dyn DynamicFilterSource>>,

Review Comment:
   ```suggestion
       pub dynamic_filter_source: Vec<Arc<dyn DynamicFilterSource>>,
   ```



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -102,25 +110,52 @@ impl FileOpener for ParquetOpener {
 
         let batch_size = self.batch_size;
 
+        let dynamic_filters = self
+            .dynamic_filters
+            .iter()
+            .map(|f| f.current_filters())
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+        // Collect dynamic_filters into a single predicate by reducing with AND
+        let dynamic_predicate = dynamic_filters.into_iter().reduce(|a, b| {
+            Arc::new(BinaryExpr::new(a, datafusion_expr::Operator::And, b))
+        });
+        let enable_page_index = should_enable_page_index(
+            self.enable_page_index,
+            &self.page_pruning_predicate,
+            dynamic_predicate.is_some(),
+        );
+        let predicate = self.predicate.clone();
+        let predicate = match (predicate, dynamic_predicate) {

Review Comment:
   It would be nice to make a `conjunction` function for physical exprs similar to https://docs.rs/datafusion-expr/46.0.1/src/datafusion_expr/utils.rs.html#1285
   
   Perhaps in the same module as https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.split_conjunction.html
   
   (we can do this as a follow on / PR in parallel)
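   To illustrate the shape such a helper could take, here is a sketch over a deliberately tiny, hypothetical `Expr` type standing in for `Arc<dyn PhysicalExpr>`; a real version would build `BinaryExpr`s with `Operator::And`, as the `reduce` in the diff does. `conjunction` returns `None` for an empty input, which would let the opener's `match (predicate, dynamic_predicate)` collapse into one call; `combine` is a hypothetical name for that step.

```rust
use std::sync::Arc;

/// Hypothetical, heavily simplified stand-in for `Arc<dyn PhysicalExpr>`.
#[derive(Debug, PartialEq)]
enum Expr {
    Column(String),
    And(Arc<Expr>, Arc<Expr>),
}

/// ANDs all predicates together (left-nested), or returns `None` if there are none.
fn conjunction(exprs: impl IntoIterator<Item = Arc<Expr>>) -> Option<Arc<Expr>> {
    exprs.into_iter().reduce(|acc, e| Arc::new(Expr::And(acc, e)))
}

/// How an opener could merge its static predicate with any dynamic filters.
fn combine(static_pred: Option<Arc<Expr>>, dynamic: Vec<Arc<Expr>>) -> Option<Arc<Expr>> {
    conjunction(static_pred.into_iter().chain(dynamic))
}
```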



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -295,3 +350,104 @@ fn create_initial_plan(
     // default to scanning all row groups
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
+
+/// Build a pruning predicate from a predicate expression.
+/// Returns None if the predicate cannot be converted to a pruning
+/// predicate or if the resulting pruning predicate is always true.
+/// If there is an error creating the pruning predicate it is recorded by incrementing
+/// the `predicate_creation_errors` counter.
+pub(crate) fn build_pruning_predicate(
+    predicate: Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+    predicate_creation_errors: &Count,
+) -> Option<Arc<PruningPredicate>> {
+    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
+        Ok(pruning_predicate) => {
+            if !pruning_predicate.always_true() {
+                return Some(Arc::new(pruning_predicate));
+            }
+        }
+        Err(e) => {
+            debug!("Could not create pruning predicate for: {e}");
+            predicate_creation_errors.add(1);
+        }
+    }
+    None
+}
+
+/// Build a page pruning filter ([`PagePruningAccessPlanFilter`]) from a predicate expression.
+pub(crate) fn build_page_pruning_predicate(
+    predicate: &Arc<dyn PhysicalExpr>,
+    file_schema: &SchemaRef,
+) -> Arc<PagePruningAccessPlanFilter> {
+    Arc::new(PagePruningAccessPlanFilter::new(
+        predicate,
+        Arc::clone(file_schema),
+    ))
+}
+
+/// A visitor for a PhysicalExpr that collects all column references to determine which columns are needed to evaluate the expression.
+struct FilterSchemaBuilder<'schema> {
+    filter_schema_fields: BTreeSet<Arc<Field>>,
+    file_schema: &'schema Schema,
+    table_schema: &'schema Schema,
+}
+
+impl<'schema> FilterSchemaBuilder<'schema> {
+    fn new(file_schema: &'schema Schema, table_schema: &'schema Schema) -> Self {
+        Self {
+            filter_schema_fields: BTreeSet::new(),
+            file_schema,
+            table_schema,
+        }
+    }
+
+    fn sort_fields(
+        fields: &mut Vec<Arc<Field>>,
+        table_schema: &Schema,
+        file_schema: &Schema,
+    ) {
+        fields.sort_by_key(|f| f.name().to_string());
+        fields.dedup_by_key(|f| f.name().to_string());
+        fields.sort_by_key(|f| {
+            let table_schema_index =
+                table_schema.index_of(f.name()).unwrap_or(usize::MAX);
+            let file_schema_index = file_schema.index_of(f.name()).unwrap_or(usize::MAX);
+            (table_schema_index, file_schema_index)
+        });
+    }
+
+    fn build(self) -> SchemaRef {
+        let mut fields = self.filter_schema_fields.into_iter().collect::<Vec<_>>();
+        FilterSchemaBuilder::sort_fields(
+            &mut fields,
+            self.table_schema,
+            self.file_schema,
+        );
+        Arc::new(Schema::new(fields))
+    }
+}
+
+impl TreeNodeRewriter for FilterSchemaBuilder<'_> {
+    type Node = Arc<dyn PhysicalExpr>;
+
+    fn f_down(&mut self, node: Arc<dyn PhysicalExpr>) -> Result<Transformed<Self::Node>> {
+        if let Some(column) = node.as_any().downcast_ref::<Column>() {
+            if let Ok(field) = self.table_schema.field_with_name(column.name()) {
+                self.filter_schema_fields.insert(Arc::new(field.clone()));
+            } else if let Ok(field) = self.file_schema.field_with_name(column.name()) {
+                self.filter_schema_fields.insert(Arc::new(field.clone()));
+            } else {
+                // If it's not in either schema it must be a partition column

Review Comment:
   that is fascinating -- if the code really does push filters on partitioning columns down, it seems to me like a better approach would be to avoid pushing filters on partitioning columns into the parquet scan at all.
   
   If they are never going to be evaluated anyway, there is no need to filter them out here 🤔 
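   A sketch of the suggested alternative: split filters before they reach the Parquet scan so that only filters on file columns are pushed into it. The `Filter` type (a name plus the columns it references) and `split_by_partition_columns` are hypothetical simplifications; in DataFusion the referenced columns would come from walking the `PhysicalExpr`, much as `FilterSchemaBuilder` does. Any filter touching a partition column is kept out of the scan.

```rust
use std::collections::HashSet;

/// Hypothetical filter representation: a label plus the columns it references.
#[derive(Debug, Clone, PartialEq)]
struct Filter {
    name: &'static str,
    columns: Vec<&'static str>,
}

/// Returns `(pushed_into_scan, kept_above_scan)`: a filter is pushed only if
/// none of its columns is a partition column.
fn split_by_partition_columns(
    filters: Vec<Filter>,
    partition_cols: &HashSet<&str>,
) -> (Vec<Filter>, Vec<Filter>) {
    filters
        .into_iter()
        .partition(|f| f.columns.iter().all(|c| !partition_cols.contains(c)))
}
```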



##########
datafusion/physical-plan/src/topk/mod.rs:
##########
@@ -163,26 +187,32 @@ impl TopK {
         // TODO make this algorithmically better?:
         // Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`)
         //       this avoids some work and also might be better vectorizable.
-        let mut batch_entry = self.heap.register_batch(batch);
+        let mut heap = self.heap.try_write().map_err(|_| {
+            DataFusionError::Internal(
+                "Failed to acquire write lock on TopK heap".to_string(),

Review Comment:
   I think a poisoned lock is not a scenario that happens often in practice, so it does not need a lot of special handling
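   Related to the lock handling: the diff uses `try_write`, which in the standard library fails not only when the lock is poisoned (`TryLockError::Poisoned`) but also with `TryLockError::WouldBlock` whenever another guard is currently held, for example by a reader calling `current_filters`. The sketch below (hypothetical helper names, with the heap reduced to a `Vec<i64>`) contrasts that with a blocking `write()` that simply recovers the data from a poisoned lock via `PoisonError::into_inner`.

```rust
use std::sync::{PoisonError, RwLock, TryLockError};

/// Blocks until the write lock is free; if a previous holder panicked,
/// takes the guard anyway instead of returning an error.
fn write_blocking(lock: &RwLock<Vec<i64>>, value: i64) {
    let mut guard = lock.write().unwrap_or_else(PoisonError::into_inner);
    guard.push(value);
}

/// Reports why a non-blocking attempt failed; `WouldBlock` only means
/// some other guard is held right now.
fn try_write_outcome(lock: &RwLock<Vec<i64>>) -> &'static str {
    match lock.try_write() {
        Ok(_) => "acquired",
        Err(TryLockError::WouldBlock) => "would block",
        Err(TryLockError::Poisoned(_)) => "poisoned",
    }
}
```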



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1067,35 +1067,53 @@ impl ExecutionPlan for SortExec {
     ) -> Result<SendableRecordBatchStream> {
         trace!("Start SortExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
 
-        let mut input = self.input.execute(partition, Arc::clone(&context))?;
-
-        let execution_options = &context.session_config().options().execution;
-
-        trace!("End SortExec's input.execute for partition: {}", partition);
-
         let sort_satisfied = self
             .input
             .equivalence_properties()
             .ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));
 
+        let input_exec = Arc::clone(&self.input);
+
+        let execution_options = &context.session_config().options().execution;
+
+        trace!("End SortExec's input.execute for partition: {}", partition);
+
         match (sort_satisfied, self.fetch.as_ref()) {
-            (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
-                input,
-                0,
-                Some(*fetch),
-                BaselineMetrics::new(&self.metrics_set, partition),
-            ))),
-            (true, None) => Ok(input),
+            (true, Some(fetch)) => {
+                let input = input_exec.execute(partition, Arc::clone(&context))?;
+                Ok(Box::pin(LimitStream::new(
+                    input,
+                    0,
+                    Some(*fetch),
+                    BaselineMetrics::new(&self.metrics_set, partition),
+                )))
+            }
+            (true, None) => self.input.execute(partition, Arc::clone(&context)),
             (false, Some(fetch)) => {
+                let schema = input_exec.schema();
                 let mut topk = TopK::try_new(
                     partition,
-                    input.schema(),
+                    schema,
                     self.expr.clone(),
                     *fetch,
                     context.session_config().batch_size(),
                     context.runtime_env(),
                     &self.metrics_set,
                 )?;
+                let input_exec = if context
+                    .session_config()
+                    .options()
+                    .optimizer
+                    .enable_dynamic_filter_pushdown
+                    && input_exec.supports_dynamic_filter_pushdown()
+                {
+                    input_exec
+                        .push_down_dynamic_filter(topk.dynamic_filter_source())?

Review Comment:
   I still don't understand why we have to push the dynamic filter *source* down at execution.
   
   Why can't the `TopK` node introduce the dynamic `Arc<dyn PhysicalExpr>` during the PushdownFilters pass 🤔 
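   One way to picture the alternative raised in this question: create the shared state once at planning time and hand clones to both the TopK operator and the scan, instead of pushing a `DynamicFilterSource` down at execution. This is a std-only illustration with hypothetical names (`DynamicThreshold`, `FileStats`, `files_to_scan`), not DataFusion's API, and it reuses the max statistics from the `TopKDynamicFilterSource` doc comment in this PR (files 3, 2 and 1 remaining after file 4 has produced a threshold of 30).

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical shared handle: created once during planning and cloned
/// into both the TopK operator and the scan.
#[derive(Clone, Default)]
struct DynamicThreshold(Arc<RwLock<Option<i64>>>);

impl DynamicThreshold {
    /// Called by the TopK side whenever its smallest retained value changes.
    fn update(&self, threshold: i64) {
        *self.0.write().unwrap() = Some(threshold);
    }

    /// Called by the scan side; `None` means the heap is not full yet.
    fn current(&self) -> Option<i64> {
        *self.0.read().unwrap()
    }
}

/// Per-file statistics (only the max matters for a `DESC` TopK).
struct FileStats {
    name: &'static str,
    max: i64,
}

/// Scan side for `ORDER BY ts DESC LIMIT k`: a file is skipped when even its
/// largest value cannot beat the current threshold, i.e. when `max <= threshold`.
fn files_to_scan(files: &[FileStats], threshold: &DynamicThreshold) -> Vec<&'static str> {
    let current = threshold.current();
    files
        .iter()
        .filter(|f| current.map_or(true, |t| f.max > t))
        .map(|f| f.name)
        .collect()
}
```

   Before the TopK has a threshold every remaining file is scanned; once the TopK side calls `update(30)` through its clone of the handle, only file 3 (max 31) still needs to be opened.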



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

