zhuqi-lucas commented on code in PR #22450:
URL: https://github.com/apache/datafusion/pull/22450#discussion_r3421025319


##########
datafusion/datasource-parquet/src/push_decoder.rs:
##########
@@ -151,7 +305,72 @@ impl PushDecoderStreamState {
             if self.remaining_limit == Some(0) {
                 return None;
             }
-            match self.decoder.try_decode() {
+
+            // Step 1: drain a batch from the active reader if any.
+            if let Some(reader) = self.active_reader.as_mut() {
+                match reader.next() {
+                    Some(Ok(batch)) => {
+                        let mut timer = 
self.baseline_metrics.elapsed_compute().timer();
+                        self.copy_arrow_reader_metrics();
+                        let result = self.project_batch(&batch);
+                        timer.stop();
+                        drop(timer);
+                        return Some((result, self));
+                    }
+                    Some(Err(e)) => {
+                        return Some((Err(DataFusionError::from(e)), self));
+                    }
+                    None => {
+                        // Reader exhausted: drop and fall through to per-RG
+                        // boundary handling, then try_next_reader.
+                        self.active_reader = None;
+                    }
+                }
+            }
+
+            // Step 2: at RG boundary, drop pruned RGs from the head of the
+            // plan. Rebuild the decoder via `into_builder` so it skips the
+            // pruned RGs entirely. Buffered bytes for already-fetched RGs
+            // carry across the rebuild.
+            if !self.rg_plan.is_empty() {
+                let mut pruned_head = 0usize;
+                while let Some(next) = self.rg_plan.front() {
+                    let pruned = self
+                        .row_group_pruner
+                        .as_mut()
+                        .map(|p| p.should_prune(&[next.rg_index]))
+                        .unwrap_or(false);
+                    if pruned {
+                        pruned_head += 1;
+                        self.rg_plan.pop_front();
+                        self.row_groups_pruned_dynamic.add(1);
+                    } else {
+                        break;
+                    }
+                }
+                if pruned_head > 0 {
+                    if self.rg_plan.is_empty() {
+                        return None;
+                    }
+                    let decoder = self.decoder.take().expect("decoder 
present");
+                    let new_indices: Vec<usize> =
+                        self.rg_plan.iter().map(|e| e.rg_index).collect();
+                    let rebuilt = match decoder.into_builder() {
+                        Ok(b) => b.with_row_groups(new_indices).build(),
+                        Err(e) => Err(e),
+                    };

Review Comment:
   Added a test in the latest PR update, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to