QuakeWang commented on code in PR #224:
URL: https://github.com/apache/paimon-rust/pull/224#discussion_r3045038162


##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -16,75 +16,307 @@
 // under the License.
 
 use std::any::Any;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::stats::Precision;
 use datafusion::common::Statistics;
 use datafusion::error::Result as DFResult;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
-use futures::{StreamExt, TryStreamExt};
+use datafusion::physical_plan::{
+    DisplayAs, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
+};
+use futures::{Stream, StreamExt, TryStreamExt};
 use paimon::spec::Predicate;
-use paimon::table::Table;
+use paimon::table::{prune_splits_by_limit_hint, Table};
 use paimon::DataSplit;
 
 use crate::error::to_datafusion_error;
+use crate::filter_pushdown::ScanFetchMode;
+
+#[derive(Debug)]
+pub(crate) struct PartitionPlan {
+    planned_partitions: Vec<Arc<[DataSplit]>>,
+    partition_fetches: Vec<Option<usize>>,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct PaimonTableScanSpec {
+    pub table: Table,
+    pub projected_columns: Option<Vec<String>>,
+    pub pushed_predicate: Option<Predicate>,
+    pub full_splits: Arc<[DataSplit]>,
+    pub target_partitions: usize,
+    pub scan_fetch_mode: ScanFetchMode,
+    pub limit_hint: Option<usize>,
+    pub fetch: Option<usize>,
+}
+
+fn round_robin_buckets<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> {
+    let mut buckets: Vec<Vec<T>> = (0..num_buckets).map(|_| Vec::new()).collect();
+    for (index, item) in items.into_iter().enumerate() {
+        buckets[index % num_buckets].push(item);
+    }
+    buckets
+}
+
+fn empty_partition_plan() -> PartitionPlan {
+    PartitionPlan {
+        planned_partitions: vec![Arc::from(Vec::new())],
+        partition_fetches: vec![None],
+    }
+}
+
+fn build_round_robin_plan(splits: Vec<DataSplit>, target_partitions: usize) -> PartitionPlan {
+    if splits.is_empty() {
+        return empty_partition_plan();
+    }
+
+    let num_partitions = splits.len().min(target_partitions.max(1));
+    let buckets = round_robin_buckets(splits, num_partitions);
+
+    PartitionPlan {
+        planned_partitions: buckets.into_iter().map(Arc::from).collect(),
+        partition_fetches: vec![None; num_partitions],
+    }
+}
+
+fn build_exact_fetch_plan(
+    splits: Vec<DataSplit>,
+    target_partitions: usize,
+    fetch: usize,
+) -> PartitionPlan {
+    if splits.is_empty() {
+        return empty_partition_plan();
+    }
+
+    if target_partitions <= 1 {
+        return PartitionPlan {
+            planned_partitions: vec![Arc::from(splits)],
+            partition_fetches: vec![Some(fetch)],
+        };
+    }
+
+    let mut prefix_splits = Vec::new();
+    let mut prefix_rows = Vec::new();
+    let mut remaining = fetch;
+    let mut tail_start = None;
+
+    for (index, split) in splits.iter().enumerate() {
+        match split.merged_row_count() {
+            Some(count) if count >= 0 && (count as usize) <= remaining => {
+                prefix_splits.push(split.clone());
+                prefix_rows.push(count as usize);
+                remaining -= count as usize;
+                if remaining == 0 {
+                    break;
+                }
+            }
+            _ => {
+                tail_start = Some(index);
+                break;
+            }
+        }
+    }
+
+    if tail_start.is_none() {
+        let num_partitions = prefix_splits.len().min(target_partitions.max(1));
+        let split_buckets = round_robin_buckets(prefix_splits, num_partitions);
+        let row_buckets = round_robin_buckets(prefix_rows, num_partitions);
+
+        return PartitionPlan {
+            planned_partitions: split_buckets.into_iter().map(Arc::from).collect(),
+            partition_fetches: row_buckets
+                .into_iter()
+                .map(|rows| Some(rows.into_iter().sum()))
+                .collect(),
+        };
+    }
+
+    let tail_start = tail_start.expect("tail_start checked above");
+    let tail_fetch = remaining;
+
+    if prefix_splits.is_empty() {
+        return PartitionPlan {
+            planned_partitions: vec![Arc::from(splits)],
+            partition_fetches: vec![Some(fetch)],
+        };
+    }
+
+    let prefix_partition_count = prefix_splits
+        .len()
+        .min(target_partitions.saturating_sub(1).max(1));
+    let split_buckets = round_robin_buckets(prefix_splits, prefix_partition_count);
+    let row_buckets = round_robin_buckets(prefix_rows, prefix_partition_count);
+
+    let mut planned_partitions: Vec<Arc<[DataSplit]>> =
+        split_buckets.into_iter().map(Arc::from).collect();
+    let mut partition_fetches: Vec<Option<usize>> = row_buckets
+        .into_iter()
+        .map(|rows| Some(rows.into_iter().sum()))
+        .collect();
+
+    planned_partitions.push(Arc::from(splits[tail_start..].to_vec()));
+    partition_fetches.push(Some(tail_fetch));
+
+    PartitionPlan {
+        planned_partitions,
+        partition_fetches,
+    }
+}
+
+pub(crate) fn build_partition_plan(
+    full_splits: &[DataSplit],
+    target_partitions: usize,
+    limit_hint: Option<usize>,
+    fetch: Option<usize>,
+) -> PartitionPlan {
+    if let Some(fetch) = fetch {
+        let selected_splits = prune_splits_by_limit_hint(full_splits.iter().cloned(), Some(fetch));

Review Comment:
   I did not switch this to `read_builder.with_limit(...)` here because the 
provider stage needs to retain the full split plan so that later physical 
`with_fetch()` calls can rebuild the partition plan from it. For `OFFSET + 
LIMIT`, DataFusion may push `skip + fetch` down as the fetch value, so 
narrowing the provider scan too early would lose the extra rows budgeted for 
the skip. I added a short comment here to make that intent explicit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to