adriangb commented on code in PR #22450:
URL: https://github.com/apache/datafusion/pull/22450#discussion_r3419902761


##########
datafusion/datasource-parquet/src/opener/mod.rs:
##########
@@ -1419,16 +1412,51 @@ impl RowGroupsPrunedParquetOpen {
 
         let files_ranges_pruned_statistics =
             prepared.file_metrics.files_ranges_pruned_statistics.clone();
+
+        // Build a dynamic row-group pruner only when all three conditions 
hold:
+        //   1) the scan has a predicate (so there is something to evaluate),
+        //   2) the predicate has at least one not-yet-complete dynamic filter
+        //      (`DynamicFilterTracking::Watching`) — static or 
already-complete
+        //      predicates were fully consumed by `prune_by_statistics` at file
+        //      open, so re-evaluating them per RG boundary would be wasted 
work,
+        //   3) there is at least one pending RG that could be skipped.
+        // The pruner subscribes once to every still-incomplete dynamic filter
+        // via the `DynamicFilterTracker` watch channel (#22460), so detecting
+        // a threshold change is a single atomic load — not a tree walk per
+        // RG check.
+        let row_group_pruner = match (&prepared.predicate, rg_plan.len() > 1) {
+            (Some(predicate), true)
+                if DynamicFilterTracking::classify(predicate)
+                    .contains_dynamic_filter() =>

Review Comment:
   I think `contains_dynamic_filter()` also returns `true` for `AllComplete`.
   
   I suggest we use `if matches!(DynamicFilterTracking::classify(predicate), DynamicFilterTracking::Watching(_))` or something like that.



##########
datafusion/datasource-parquet/src/push_decoder.rs:
##########
@@ -151,7 +305,72 @@ impl PushDecoderStreamState {
             if self.remaining_limit == Some(0) {
                 return None;
             }
-            match self.decoder.try_decode() {
+
+            // Step 1: drain a batch from the active reader if any.
+            if let Some(reader) = self.active_reader.as_mut() {
+                match reader.next() {
+                    Some(Ok(batch)) => {
+                        let mut timer = 
self.baseline_metrics.elapsed_compute().timer();
+                        self.copy_arrow_reader_metrics();
+                        let result = self.project_batch(&batch);
+                        timer.stop();
+                        drop(timer);
+                        return Some((result, self));
+                    }
+                    Some(Err(e)) => {
+                        return Some((Err(DataFusionError::from(e)), self));
+                    }
+                    None => {
+                        // Reader exhausted: drop and fall through to per-RG
+                        // boundary handling, then try_next_reader.
+                        self.active_reader = None;
+                    }
+                }
+            }
+
+            // Step 2: at RG boundary, drop pruned RGs from the head of the
+            // plan. Rebuild the decoder via `into_builder` so it skips the
+            // pruned RGs entirely. Buffered bytes for already-fetched RGs
+            // carry across the rebuild.
+            if !self.rg_plan.is_empty() {
+                let mut pruned_head = 0usize;
+                while let Some(next) = self.rg_plan.front() {
+                    let pruned = self
+                        .row_group_pruner
+                        .as_mut()
+                        .map(|p| p.should_prune(&[next.rg_index]))
+                        .unwrap_or(false);
+                    if pruned {
+                        pruned_head += 1;
+                        self.rg_plan.pop_front();
+                        self.row_groups_pruned_dynamic.add(1);
+                    } else {
+                        break;
+                    }
+                }
+                if pruned_head > 0 {
+                    if self.rg_plan.is_empty() {
+                        return None;
+                    }
+                    let decoder = self.decoder.take().expect("decoder 
present");
+                    let new_indices: Vec<usize> =
+                        self.rg_plan.iter().map(|e| e.rg_index).collect();
+                    let rebuilt = match decoder.into_builder() {
+                        Ok(b) => b.with_row_groups(new_indices).build(),
+                        Err(e) => Err(e),
+                    };

Review Comment:
   Do we need to adjust the `RowSelection` here? Or does that just work fine? 
Might be worth adding a test where there will be a row selection (e.g. every 
other page skipped because of a filter).



##########
datafusion/datasource-parquet/src/opener/mod.rs:
##########
@@ -1366,50 +1368,41 @@ impl RowGroupsPrunedParquetOpen {
                 &prepared.file_metrics,
             );
 
-            // Split into consecutive runs of row groups that share the same 
filter
-            // requirement. Fully matched row groups skip the RowFilter; 
others need it.
-            // Reverse the run order for reverse scans so the combined decoder 
stream
-            // preserves the requested global row group order.
-            let mut runs = 
access_plan.split_runs(row_filter_generator.has_row_filter());
-            if prepared.reverse_row_groups {
-                runs.reverse();
-            }
-            let run_count = runs.len();
-            let decoder_limit = prepared.limit.filter(|_| run_count == 1);
-            let remaining_limit = prepared.limit.filter(|_| run_count > 1);
-
+            // Build the prepared access plan first — `prepare_access_plan` may
+            // call `reorder_by_statistics` (for `sort_order_for_reorder`) and
+            // `reverse` (for `reverse_row_groups`), both of which mutate
+            // `row_group_indexes` to the physical scan order the decoder will
+            // actually read. We MUST build our `rg_plan` from this reordered
+            // list, otherwise our per-RG pruner check would consult the
+            // metadata of a different RG than the decoder is about to yield.
             let decoder_config = DecoderBuilderConfig {
                 projection_mask: decoder_projection.projection_mask(),
                 batch_size: prepared.batch_size,
                 arrow_reader_metrics: &arrow_reader_metrics,
                 force_filter_selections: prepared.force_filter_selections,
-                decoder_limit,
+                decoder_limit: prepared.limit,
             };
 
-            // Build a decoder per run.
-            let mut decoders = VecDeque::with_capacity(runs.len());
-            for run in runs {
-                let prepared_access_plan = 
prepare_access_plan(run.access_plan)?;
-                let mut builder =
-                    decoder_config.build(prepared_access_plan, 
reader_metadata.clone());
-                if run.needs_filter {
-                    if let Some(row_filter) = 
row_filter_generator.next_filter() {
-                        builder = builder.with_row_filter(row_filter);
-                    }
-                    if let Some(max_predicate_cache_size) =
-                        prepared.max_predicate_cache_size
-                    {
-                        builder = builder
-                            
.with_max_predicate_cache_size(max_predicate_cache_size);
-                    }
+            let prepared_access_plan = prepare_access_plan(access_plan)?;
+            let rg_plan: VecDeque<RgPlanEntry> = prepared_access_plan
+                .row_group_indexes
+                .iter()
+                .copied()
+                .map(|rg_index| RgPlanEntry { rg_index })
+                .collect();
+
+            let mut builder =
+                decoder_config.build(prepared_access_plan, 
reader_metadata.clone());
+            if let Some(row_filter) = row_filter_generator.next_filter() {
+                builder = builder.with_row_filter(row_filter);

Review Comment:
   `split_runs` used to build a different decoder for each row group when there 
were row groups that fully matched the filters, allowing those to skip applying 
the row filter. I think we're losing that now?



##########
datafusion/datasource-parquet/src/opener/mod.rs:
##########
@@ -1419,16 +1412,51 @@ impl RowGroupsPrunedParquetOpen {
 
         let files_ranges_pruned_statistics =
             prepared.file_metrics.files_ranges_pruned_statistics.clone();
+
+        // Build a dynamic row-group pruner only when all three conditions 
hold:
+        //   1) the scan has a predicate (so there is something to evaluate),
+        //   2) the predicate has at least one not-yet-complete dynamic filter
+        //      (`DynamicFilterTracking::Watching`) — static or 
already-complete
+        //      predicates were fully consumed by `prune_by_statistics` at file
+        //      open, so re-evaluating them per RG boundary would be wasted 
work,
+        //   3) there is at least one pending RG that could be skipped.
+        // The pruner subscribes once to every still-incomplete dynamic filter
+        // via the `DynamicFilterTracker` watch channel (#22460), so detecting
+        // a threshold change is a single atomic load — not a tree walk per
+        // RG check.
+        let row_group_pruner = match (&prepared.predicate, rg_plan.len() > 1) {
+            (Some(predicate), true)
+                if DynamicFilterTracking::classify(predicate)
+                    .contains_dynamic_filter() =>
+            {
+                Some(RowGroupPruner::new(
+                    Arc::clone(predicate),
+                    Arc::clone(&prepared.physical_file_schema),
+                    Arc::clone(reader_metadata.metadata()),
+                    prepared.predicate_creation_errors.clone(),
+                    prepared.file_metrics.predicate_evaluation_errors.clone(),
+                ))
+            }
+            _ => None,
+        };
+        let row_groups_pruned_dynamic = prepared
+            .file_metrics
+            .row_groups_pruned_dynamic_filter
+            .clone();
+
         let stream = PushDecoderStreamState {
-            decoder,
-            pending_decoders,
-            remaining_limit,
+            decoder: Some(decoder),
+            active_reader: None,
+            rg_plan,
+            remaining_limit: None,

Review Comment:
   `remaining_limit` seems like dead code to me; I don't see it actually being 
used anywhere.



##########
datafusion/core/tests/parquet/dynamic_row_group_pruning.rs:
##########
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! End-to-end test for **runtime row-group pruning** driven by a TopK
+//! `SortExec`'s `DynamicFilterPhysicalExpr`.
+//!
+//! A 5-row-group parquet file is constructed with disjoint statistics on
+//! the sort column (`v`): row group `i` contains values
+//! `[i*100, (i+1)*100)`. The query `ORDER BY v DESC LIMIT 5` fills the
+//! TopK heap from the row group with the largest values; the threshold
+//! then proves the remaining row groups cannot contribute. The runtime
+//! `RowGroupPruner` in the parquet scan must observe the tightened
+//! threshold and increment `row_groups_pruned_dynamic_filter`.
+//!
+//! We assert a property (`pruned >= 1`) rather than an exact count
+//! because batch-arrival timing affects how soon the TopK heap fills,
+//! and we don't want this test to become flaky.
+
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array, RecordBatch};
+use arrow_schema::{DataType, Field, Schema};
+
+use crate::parquet::Unit::RowGroup;
+use crate::parquet::{ContextWithParquet, Scenario};
+
+/// Build five `RecordBatch`es whose `v` column ranges are disjoint:
+/// batch `i` carries `v` values `[i*100, (i+1)*100)`. When written with
+/// `max_row_group_row_count = 100` each batch lands in its own row group.
+fn build_five_disjoint_batches(schema: &Arc<Schema>) -> Vec<RecordBatch> {
+    (0..5i64)
+        .map(|rg| {
+            let base = rg * 100;
+            let values: Vec<i64> = (base..base + 100).collect();
+            let col: ArrayRef = Arc::new(Int64Array::from(values));
+            RecordBatch::try_new(Arc::clone(schema), vec![col]).unwrap()
+        })
+        .collect()
+}
+
+/// Build five `RecordBatch`es in *descending* value order: batch 0 holds
+/// `v ∈ [400, 500)`, batch 4 holds `v ∈ [0, 100)`. The physical row-group
+/// order on disk therefore does **not** match the order a `ORDER BY v ASC`
+/// query wants — sort-pushdown's `reorder_by_statistics` must rearrange
+/// the access plan so the scan reads RG 4 first, then RG 3, etc.
+fn build_five_disjoint_batches_desc(schema: &Arc<Schema>) -> Vec<RecordBatch> {
+    (0..5i64)
+        .map(|rg| {
+            let base = (4 - rg) * 100;
+            let values: Vec<i64> = (base..base + 100).collect();
+            let col: ArrayRef = Arc::new(Int64Array::from(values));
+            RecordBatch::try_new(Arc::clone(schema), vec![col]).unwrap()
+        })
+        .collect()
+}
+
+/// `ORDER BY v DESC LIMIT 5` against a 5-RG file with disjoint per-RG
+/// stats must trigger runtime RG pruning: the first RG read fills the
+/// heap, and the tightened threshold proves every other RG unreachable.
+#[tokio::test]
+async fn dynamic_rg_pruning_metric_fires_for_topk_descending_limit() {
+    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, 
false)]));
+    let batches = build_five_disjoint_batches(&schema);
+
+    // `with_custom_data` honors the custom schema + batches and ignores
+    // `Scenario`. `Unit::RowGroup(100)` enables `pushdown_filters`, which
+    // is required for the TopK dynamic filter to reach the parquet scan.
+    let mut ctx = ContextWithParquet::with_custom_data(
+        Scenario::Int,
+        RowGroup(100),
+        Arc::clone(&schema),
+        batches,
+    )
+    .await;
+
+    let output = ctx.query("SELECT v FROM t ORDER BY v DESC LIMIT 5").await;
+
+    assert_eq!(output.result_rows, 5, "query must return LIMIT rows",);
+
+    let pruned = output
+        .row_groups_pruned_dynamic_filter()
+        .expect("`row_groups_pruned_dynamic_filter` metric must be 
registered");
+    assert!(
+        pruned >= 1,
+        "dynamic RG pruner must skip at least one row group; \
+         pruned={pruned}\n{}",
+        output.description(),
+    );
+}
+
+/// Regression for the rg_plan / `reorder_by_statistics` ordering bug.
+///
+/// When `sort_order_for_reorder` is set on the parquet scan,
+/// `prepare_access_plan` calls
+/// [`PreparedAccessPlan::reorder_by_statistics`], which rearranges
+/// `row_group_indexes` so the decoder reads row groups in stats-optimal
+/// order (smallest-min first for ASC, etc.). The stream's per-RG plan
+/// (`rg_plan`) — which the runtime pruner walks one entry at a time —
+/// **must use this reordered list**, not the access plan's natural
+/// (index-ascending) order. Otherwise the pruner would consult the
+/// metadata of RG K while the decoder is actually about to yield RG K',
+/// silently producing wrong results.
+///
+/// This test makes the failure visible:
+///
+/// - File is written with RGs in *descending* `v` order (RG 0 has the
+///   largest values, RG 4 has the smallest).
+/// - Query is `ORDER BY v ASC LIMIT 5`, so sort-pushdown reorders the
+///   access plan to read RG 4 first, then RG 3, etc.
+/// - The smallest five values (which form the entire correct LIMIT
+///   answer) live in RG 4 alone. After they are emitted, the TopK
+///   threshold tightens enough that the per-RG pruner skips every other
+///   RG.
+///
+/// Without the fix, `rg_plan` would be `[0, 1, 2, 3, 4]` while the
+/// decoder reads `[4, 3, 2, 1, 0]`. The first yielded reader (for RG 4
+/// in the decoder) would be tracked as if it were RG 0, the pruner
+/// would check RG 1's stats (id range 300..400) against a threshold
+/// already tightened to `v < 5`, prune RG 1 (because nothing in
+/// 300..400 can satisfy `v < 5`), and then the rebuild via
+/// `into_builder` would scan a row group whose data does not match its
+/// expected metadata. The query would return fewer than five rows or
+/// the wrong rows.
+#[tokio::test]
+async fn dynamic_rg_pruning_handles_sort_pushdown_reorder() {
+    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, 
false)]));
+    let batches = build_five_disjoint_batches_desc(&schema);
+
+    let mut ctx = ContextWithParquet::with_custom_data(
+        Scenario::Int,
+        RowGroup(100),
+        Arc::clone(&schema),
+        batches,
+    )
+    .await;
+
+    let output = ctx.query("SELECT v FROM t ORDER BY v ASC LIMIT 5").await;
+
+    // Correctness — the five smallest values in the file are 0..=4.
+    // If `rg_plan` is misaligned with the decoder's read order, the
+    // pruner consults the wrong RG's stats and the result row count or
+    // values would drift.
+    assert_eq!(output.result_rows, 5, "query must return LIMIT rows");
+    let formatted = output.pretty_results();
+    for v in 0..=4i64 {
+        assert!(
+            formatted.contains(&format!("| {v} ")),
+            "output must contain the smallest value {v}; got:\n{formatted}",
+        );
+    }
+
+    // Behavior — the per-RG pruner must engage. We don't pin the exact
+    // count (batch-arrival timing affects how soon the heap fills); we
+    // only require that at least one row group is skipped at runtime.
+    let pruned = output
+        .row_groups_pruned_dynamic_filter()
+        .expect("`row_groups_pruned_dynamic_filter` metric must be 
registered");
+    assert!(
+        pruned >= 1,
+        "with `sort_order_for_reorder` active and a tight TopK, the \
+         runtime pruner must skip at least one row group; pruned={pruned}\n{}",
+        output.description(),
+    );
+}
+
+/// A query without ORDER BY does not produce a TopK and therefore no
+/// `DynamicFilterPhysicalExpr` reaches the scan. The runtime pruner must
+/// stay quiet — the metric should be 0.
+#[tokio::test]
+async fn dynamic_rg_pruning_metric_quiet_without_topk() {
+    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, 
false)]));
+    let batches = build_five_disjoint_batches(&schema);
+
+    let mut ctx = ContextWithParquet::with_custom_data(
+        Scenario::Int,
+        RowGroup(100),
+        Arc::clone(&schema),
+        batches,
+    )
+    .await;
+
+    // Plain `SELECT *` — no sort, no limit, no dynamic filter.
+    let output = ctx.query("SELECT v FROM t").await;
+    assert_eq!(output.result_rows, 500);
+
+    let pruned = output.row_groups_pruned_dynamic_filter().unwrap_or(0);
+    assert_eq!(
+        pruned,
+        0,
+        "without TopK there is no dynamic filter, so the runtime pruner \
+         must not fire; pruned={pruned}\n{}",
+        output.description(),
+    );
+}

Review Comment:
   I think it's worth adding a test that would trigger a `RowFilter`, e.g. 
one that selects half of the pages or every other page.



##########
datafusion/sqllogictest/test_files/dynamic_row_group_pruning.slt:
##########
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# End-to-end SLT for **dynamic row-group pruning** driven by a TopK
+# `SortExec`'s `DynamicFilterPhysicalExpr`.
+#
+# Builds a 5-row-group parquet file with disjoint per-RG ranges of `v`:
+#   RG 0: 0..3,  RG 1: 3..6,  RG 2: 6..9,  RG 3: 9..12,  RG 4: 12..15
+# `ORDER BY v DESC LIMIT 3` fills the TopK heap from the row group with
+# the largest values; the tightened threshold then proves every other
+# row group unreachable. The opener splits decoder runs per row group

Review Comment:
   I think this is outdated: the opener no longer splits decoder runs per row group.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to