Copilot commented on code in PR #22450:
URL: https://github.com/apache/datafusion/pull/22450#discussion_r3286139005


##########
datafusion/datasource-parquet/src/opener/mod.rs:
##########
@@ -1234,16 +1255,38 @@ impl RowGroupsPrunedParquetOpen {
 
         let files_ranges_pruned_statistics =
             prepared.file_metrics.files_ranges_pruned_statistics.clone();
+
+        // Build a dynamic row-group pruner only when both halves are useful:
+        //   1) the scan has a predicate (so there is something to evaluate),
+        //   2) there is at least one pending run that could be skipped.
+        // The pruner consults the predicate's `snapshot_generation` so the
+        // cost is one rebuild per dynamic-filter update, not per RG check.
+        let row_group_pruner = match (&prepared.predicate, 
pending_runs.is_empty()) {
+            (Some(predicate), false) => Some(RowGroupPruner::new(
+                Arc::clone(predicate),
+                Arc::clone(&prepared.physical_file_schema),
+                Arc::clone(reader_metadata.metadata()),
+                prepared.file_metrics.predicate_evaluation_errors.clone(),

Review Comment:
   `RowGroupPruner::new` takes a `predicate_creation_errors` counter (used by 
`build_pruning_predicate`), but this call site passes 
`prepared.file_metrics.predicate_evaluation_errors`. This will misattribute 
pruning-predicate construction failures as evaluation failures, and also makes 
the metric semantics inconsistent with the existing static row-group pruning 
path (which uses `prepared.predicate_creation_errors` for construction and 
`file_metrics.predicate_evaluation_errors` for evaluation). Consider passing 
`prepared.predicate_creation_errors.clone()` here, and (if you also want to 
track `PruningPredicate::prune` errors) add a separate counter for evaluation 
errors instead of reusing the creation counter.
   



##########
datafusion/datasource-parquet/src/push_decoder.rs:
##########
@@ -94,6 +101,118 @@ impl DecoderBuilderConfig<'_> {
     }
 }
 
+/// A decoder queued to run after the current one finishes, together with
+/// the metadata needed to decide whether it should be skipped.
+///
+/// We keep the row group indices alongside the decoder so that, between
+/// runs, the stream state can apply runtime decisions — e.g. dynamic
+/// `TopK`-driven RG-level pruning — by inspecting which row groups a
+/// pending run covers without having to crack the decoder open.
+#[derive(Debug)]
+pub(crate) struct PendingDecoderRun {
+    pub(crate) decoder: ParquetPushDecoder,
+    /// Row group indices this decoder will scan.
+    pub(crate) row_group_indices: Vec<usize>,
+}
+
+/// Runtime row-group pruner driven by a dynamic predicate (e.g. the
+/// threshold expression a `TopK` operator pushes down).
+///
+/// Mirrors the [`FilePruner`](datafusion_pruning::FilePruner) pattern at
+/// the row-group level: tracks the predicate's `snapshot_generation` so it
+/// only rebuilds the [`PruningPredicate`] when the dynamic filter has
+/// actually moved, then evaluates the cached predicate against the
+/// statistics of the requested row groups.
+pub(crate) struct RowGroupPruner {
+    predicate: Arc<dyn PhysicalExpr>,
+    arrow_schema: SchemaRef,
+    parquet_metadata: Arc<ParquetMetaData>,
+    /// Last predicate `snapshot_generation` we built `pruning_predicate` for.
+    last_generation: Option<u64>,
+    /// Cached pruning predicate. `None` means we couldn't build one for the
+    /// current generation (e.g. the predicate has no analyzable bounds);
+    /// in that case we conservatively don't prune.
+    pruning_predicate: Option<Arc<PruningPredicate>>,
+    /// Metric for pruning-predicate construction failures.
+    predicate_creation_errors: Count,
+}
+
+impl RowGroupPruner {
+    pub(crate) fn new(
+        predicate: Arc<dyn PhysicalExpr>,
+        arrow_schema: SchemaRef,
+        parquet_metadata: Arc<ParquetMetaData>,
+        predicate_creation_errors: Count,
+    ) -> Self {
+        Self {
+            predicate,
+            arrow_schema,
+            parquet_metadata,
+            last_generation: None,
+            pruning_predicate: None,
+            predicate_creation_errors,
+        }
+    }
+
+    /// Returns `true` when the statistics for `row_group_indices` prove that
+    /// every requested row group can be skipped under the current value of
+    /// the dynamic predicate.
+    ///
+    /// On any error (predicate construction, statistics evaluation) the
+    /// pruner conservatively returns `false` and logs the failure, so a
+    /// flaky pruning path never silently drops data.
+    pub(crate) fn should_prune(&mut self, row_group_indices: &[usize]) -> bool 
{
+        if row_group_indices.is_empty() {
+            return false;
+        }
+
+        // Refresh the cached `PruningPredicate` if the dynamic filter has
+        // moved since we last evaluated it.
+        let new_generation = snapshot_generation(&self.predicate);
+        if self.last_generation != Some(new_generation) {
+            self.pruning_predicate = build_pruning_predicate(
+                Arc::clone(&self.predicate),
+                &self.arrow_schema,
+                &self.predicate_creation_errors,
+            );
+            self.last_generation = Some(new_generation);
+        }
+
+        let Some(pp) = self.pruning_predicate.as_ref() else {
+            return false;
+        };
+
+        let row_group_metadatas = row_group_indices
+            .iter()
+            .map(|&i| self.parquet_metadata.row_group(i))
+            .collect::<Vec<_>>();
+        let stats = RowGroupPruningStatistics {
+            parquet_schema: 
self.parquet_metadata.file_metadata().schema_descr(),
+            row_group_metadatas,
+            arrow_schema: self.arrow_schema.as_ref(),
+            // Match the existing static row-group pruning behavior: when a
+            // statistic's null count is missing, treat it as zero. This is
+            // sound for runtime pruning because the predicate only needs to
+            // prove a row group *cannot* contain matching rows.
+            missing_null_counts_as_zero: true,
+        };
+
+        match pp.prune(&stats) {
+            // `prune` returns `false` per container that the predicate proves
+            // cannot contain matching rows. We can skip the run only when
+            // every requested row group is in that state.
+            Ok(values) => values.iter().all(|&keep| !keep),
+            Err(e) => {
+                debug!(
+                    "Ignoring error building runtime row-group pruning 
predicate: {e}"
+                );
+                self.predicate_creation_errors.add(1);
+                false
+            }

Review Comment:
   `RowGroupPruner::should_prune` logs "Ignoring error building runtime 
row-group pruning predicate" and increments `predicate_creation_errors` when 
`pp.prune(&stats)` returns `Err`. At this point the pruning predicate is 
already built; the error is from evaluating it against row-group statistics. 
This both makes the log message misleading and conflates predicate *creation* 
vs predicate *evaluation* errors. Consider logging this as an evaluation error 
and incrementing the existing `predicate_evaluation_errors` metric (keeping 
`predicate_creation_errors` only for `build_pruning_predicate` failures).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to