JingsongLi commented on code in PR #224:
URL: https://github.com/apache/paimon-rust/pull/224#discussion_r3044089366


##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -16,75 +16,307 @@
 // under the License.
 
 use std::any::Any;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::stats::Precision;
 use datafusion::common::Statistics;
 use datafusion::error::Result as DFResult;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
-use futures::{StreamExt, TryStreamExt};
+use datafusion::physical_plan::{
+    DisplayAs, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
+};
+use futures::{Stream, StreamExt, TryStreamExt};
 use paimon::spec::Predicate;
-use paimon::table::Table;
+use paimon::table::{prune_splits_by_limit_hint, Table};
 use paimon::DataSplit;
 
 use crate::error::to_datafusion_error;
+use crate::filter_pushdown::ScanFetchMode;
+
+#[derive(Debug)]
+pub(crate) struct PartitionPlan {
+    planned_partitions: Vec<Arc<[DataSplit]>>,
+    partition_fetches: Vec<Option<usize>>,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct PaimonTableScanSpec {
+    pub table: Table,
+    pub projected_columns: Option<Vec<String>>,
+    pub pushed_predicate: Option<Predicate>,
+    pub full_splits: Arc<[DataSplit]>,
+    pub target_partitions: usize,
+    pub scan_fetch_mode: ScanFetchMode,
+    pub limit_hint: Option<usize>,
+    pub fetch: Option<usize>,
+}
+
+fn round_robin_buckets<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> {
+    let mut buckets: Vec<Vec<T>> = (0..num_buckets).map(|_| Vec::new()).collect();
+    for (index, item) in items.into_iter().enumerate() {
+        buckets[index % num_buckets].push(item);
+    }
+    buckets
+}
+
+fn empty_partition_plan() -> PartitionPlan {
+    PartitionPlan {
+        planned_partitions: vec![Arc::from(Vec::new())],
+        partition_fetches: vec![None],
+    }
+}
+
+fn build_round_robin_plan(splits: Vec<DataSplit>, target_partitions: usize) -> PartitionPlan {
+    if splits.is_empty() {
+        return empty_partition_plan();
+    }
+
+    let num_partitions = splits.len().min(target_partitions.max(1));
+    let buckets = round_robin_buckets(splits, num_partitions);
+
+    PartitionPlan {
+        planned_partitions: buckets.into_iter().map(Arc::from).collect(),
+        partition_fetches: vec![None; num_partitions],
+    }
+}
+
+fn build_exact_fetch_plan(
+    splits: Vec<DataSplit>,
+    target_partitions: usize,
+    fetch: usize,
+) -> PartitionPlan {
+    if splits.is_empty() {
+        return empty_partition_plan();
+    }
+
+    if target_partitions <= 1 {
+        return PartitionPlan {
+            planned_partitions: vec![Arc::from(splits)],
+            partition_fetches: vec![Some(fetch)],
+        };
+    }
+
+    let mut prefix_splits = Vec::new();
+    let mut prefix_rows = Vec::new();
+    let mut remaining = fetch;
+    let mut tail_start = None;
+
+    for (index, split) in splits.iter().enumerate() {
+        match split.merged_row_count() {
+            Some(count) if count >= 0 && (count as usize) <= remaining => {
+                prefix_splits.push(split.clone());
+                prefix_rows.push(count as usize);
+                remaining -= count as usize;
+                if remaining == 0 {
+                    break;
+                }
+            }
+            _ => {
+                tail_start = Some(index);
+                break;
+            }
+        }
+    }
+
+    if tail_start.is_none() {
+        let num_partitions = prefix_splits.len().min(target_partitions.max(1));
+        let split_buckets = round_robin_buckets(prefix_splits, num_partitions);
+        let row_buckets = round_robin_buckets(prefix_rows, num_partitions);
+
+        return PartitionPlan {
+            planned_partitions: split_buckets.into_iter().map(Arc::from).collect(),
+            partition_fetches: row_buckets
+                .into_iter()
+                .map(|rows| Some(rows.into_iter().sum()))
+                .collect(),
+        };
+    }
+
+    let tail_start = tail_start.expect("tail_start checked above");

Review Comment:
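   Just use `unwrap()` here: the `tail_start.is_none()` branch above already returns, so `tail_start` is always `Some` at this point.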



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to