QuakeWang commented on code in PR #224:
URL: https://github.com/apache/paimon-rust/pull/224#discussion_r3049091476


##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -16,75 +16,307 @@
 // under the License.
 
 use std::any::Any;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::stats::Precision;
 use datafusion::common::Statistics;
 use datafusion::error::Result as DFResult;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
-use futures::{StreamExt, TryStreamExt};
+use datafusion::physical_plan::{
+    DisplayAs, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
+};
+use futures::{Stream, StreamExt, TryStreamExt};
 use paimon::spec::Predicate;
-use paimon::table::Table;
+use paimon::table::{prune_splits_by_limit_hint, Table};
 use paimon::DataSplit;
 
 use crate::error::to_datafusion_error;
+use crate::filter_pushdown::ScanFetchMode;
+
+#[derive(Debug)]
+pub(crate) struct PartitionPlan {
+    planned_partitions: Vec<Arc<[DataSplit]>>,
+    partition_fetches: Vec<Option<usize>>,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct PaimonTableScanSpec {
+    pub table: Table,
+    pub projected_columns: Option<Vec<String>>,
+    pub pushed_predicate: Option<Predicate>,
+    pub full_splits: Arc<[DataSplit]>,
+    pub target_partitions: usize,
+    pub scan_fetch_mode: ScanFetchMode,
+    pub limit_hint: Option<usize>,
+    pub fetch: Option<usize>,
+}
+
+fn round_robin_buckets<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> {
+    let mut buckets: Vec<Vec<T>> = (0..num_buckets).map(|_| Vec::new()).collect();
+    for (index, item) in items.into_iter().enumerate() {
+        buckets[index % num_buckets].push(item);
+    }
+    buckets
+}
+
+fn empty_partition_plan() -> PartitionPlan {
+    PartitionPlan {
+        planned_partitions: vec![Arc::from(Vec::new())],
+        partition_fetches: vec![None],
+    }
+}
+
+fn build_round_robin_plan(splits: Vec<DataSplit>, target_partitions: usize) -> PartitionPlan {
+    if splits.is_empty() {
+        return empty_partition_plan();
+    }
+
+    let num_partitions = splits.len().min(target_partitions.max(1));
+    let buckets = round_robin_buckets(splits, num_partitions);
+
+    PartitionPlan {
+        planned_partitions: buckets.into_iter().map(Arc::from).collect(),
+        partition_fetches: vec![None; num_partitions],
+    }
+}
+
+fn build_exact_fetch_plan(
+    splits: Vec<DataSplit>,
+    target_partitions: usize,
+    fetch: usize,
+) -> PartitionPlan {
+    if splits.is_empty() {
+        return empty_partition_plan();
+    }
+
+    if target_partitions <= 1 {
+        return PartitionPlan {
+            planned_partitions: vec![Arc::from(splits)],
+            partition_fetches: vec![Some(fetch)],
+        };
+    }
+
+    let mut prefix_splits = Vec::new();
+    let mut prefix_rows = Vec::new();
+    let mut remaining = fetch;
+    let mut tail_start = None;
+
+    for (index, split) in splits.iter().enumerate() {
+        match split.merged_row_count() {
+            Some(count) if count >= 0 && (count as usize) <= remaining => {
+                prefix_splits.push(split.clone());
+                prefix_rows.push(count as usize);
+                remaining -= count as usize;
+                if remaining == 0 {
+                    break;
+                }
+            }
+            _ => {
+                tail_start = Some(index);
+                break;
+            }
+        }
+    }
+
+    if tail_start.is_none() {
+        let num_partitions = prefix_splits.len().min(target_partitions.max(1));
+        let split_buckets = round_robin_buckets(prefix_splits, num_partitions);
+        let row_buckets = round_robin_buckets(prefix_rows, num_partitions);
+
+        return PartitionPlan {
+            planned_partitions: split_buckets.into_iter().map(Arc::from).collect(),
+            partition_fetches: row_buckets
+                .into_iter()
+                .map(|rows| Some(rows.into_iter().sum()))
+                .collect(),
+        };
+    }
+
+    let tail_start = tail_start.expect("tail_start checked above");
+    let tail_fetch = remaining;
+
+    if prefix_splits.is_empty() {
+        return PartitionPlan {
+            planned_partitions: vec![Arc::from(splits)],
+            partition_fetches: vec![Some(fetch)],
+        };
+    }
+
+    let prefix_partition_count = prefix_splits
+        .len()
+        .min(target_partitions.saturating_sub(1).max(1));
+    let split_buckets = round_robin_buckets(prefix_splits, prefix_partition_count);
+    let row_buckets = round_robin_buckets(prefix_rows, prefix_partition_count);
+
+    let mut planned_partitions: Vec<Arc<[DataSplit]>> =
+        split_buckets.into_iter().map(Arc::from).collect();
+    let mut partition_fetches: Vec<Option<usize>> = row_buckets
+        .into_iter()
+        .map(|rows| Some(rows.into_iter().sum()))
+        .collect();
+
+    planned_partitions.push(Arc::from(splits[tail_start..].to_vec()));
+    partition_fetches.push(Some(tail_fetch));
+
+    PartitionPlan {
+        planned_partitions,
+        partition_fetches,
+    }
+}
+
+pub(crate) fn build_partition_plan(
+    full_splits: &[DataSplit],
+    target_partitions: usize,
+    limit_hint: Option<usize>,
+    fetch: Option<usize>,
+) -> PartitionPlan {
+    if let Some(fetch) = fetch {
+        let selected_splits = prune_splits_by_limit_hint(full_splits.iter().cloned(), Some(fetch));

Review Comment:
   I aligned the latest commit with this direction. The PR no longer introduces 
a DataFusion-only fetch contract: `TableProvider::scan()` now only passes a 
conservative limit hint via `read_builder.with_limit(...)` when there are no 
filters or all pushed filters are `Exact`, and `PaimonTableScan` no longer 
carries `with_fetch()` / per-partition fetch budgets / exact stream truncation.
   
   I also updated the plan tests to assert scan-side `limit=...` without 
scan-side `fetch=...` for `OFFSET + LIMIT`. So the remaining change is just 
core-owned limit-hint pruning reused by DataFusion, not a standalone fetch 
optimization feature. If we want true fetch optimization later, I agree it 
should be introduced from `table_scan` first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to