QuakeWang commented on code in PR #224:
URL: https://github.com/apache/paimon-rust/pull/224#discussion_r3045590275
##########
crates/integrations/datafusion/src/physical_plan/scan.rs:
##########
@@ -16,75 +16,307 @@
// under the License.
use std::any::Any;
+use std::pin::Pin;
use std::sync::Arc;
+use std::task::{Context, Poll};
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::stats::Precision;
use datafusion::common::Statistics;
use datafusion::error::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
-use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning,
PlanProperties};
-use futures::{StreamExt, TryStreamExt};
+use datafusion::physical_plan::{
+ DisplayAs, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream,
+};
+use futures::{Stream, StreamExt, TryStreamExt};
use paimon::spec::Predicate;
-use paimon::table::Table;
+use paimon::table::{prune_splits_by_limit_hint, Table};
use paimon::DataSplit;
use crate::error::to_datafusion_error;
+use crate::filter_pushdown::ScanFetchMode;
+
+#[derive(Debug)]
+pub(crate) struct PartitionPlan {
+ planned_partitions: Vec<Arc<[DataSplit]>>,
+ partition_fetches: Vec<Option<usize>>,
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct PaimonTableScanSpec {
+ pub table: Table,
+ pub projected_columns: Option<Vec<String>>,
+ pub pushed_predicate: Option<Predicate>,
+ pub full_splits: Arc<[DataSplit]>,
+ pub target_partitions: usize,
+ pub scan_fetch_mode: ScanFetchMode,
+ pub limit_hint: Option<usize>,
+ pub fetch: Option<usize>,
+}
+
+fn round_robin_buckets<T>(items: Vec<T>, num_buckets: usize) -> Vec<Vec<T>> {
+ let mut buckets: Vec<Vec<T>> = (0..num_buckets).map(|_|
Vec::new()).collect();
+ for (index, item) in items.into_iter().enumerate() {
+ buckets[index % num_buckets].push(item);
+ }
+ buckets
+}
+
+fn empty_partition_plan() -> PartitionPlan {
+ PartitionPlan {
+ planned_partitions: vec![Arc::from(Vec::new())],
+ partition_fetches: vec![None],
+ }
+}
+
+fn build_round_robin_plan(splits: Vec<DataSplit>, target_partitions: usize) ->
PartitionPlan {
+ if splits.is_empty() {
+ return empty_partition_plan();
+ }
+
+ let num_partitions = splits.len().min(target_partitions.max(1));
+ let buckets = round_robin_buckets(splits, num_partitions);
+
+ PartitionPlan {
+ planned_partitions: buckets.into_iter().map(Arc::from).collect(),
+ partition_fetches: vec![None; num_partitions],
+ }
+}
+
+fn build_exact_fetch_plan(
+ splits: Vec<DataSplit>,
+ target_partitions: usize,
+ fetch: usize,
+) -> PartitionPlan {
+ if splits.is_empty() {
+ return empty_partition_plan();
+ }
+
+ if target_partitions <= 1 {
+ return PartitionPlan {
+ planned_partitions: vec![Arc::from(splits)],
+ partition_fetches: vec![Some(fetch)],
+ };
+ }
+
+ let mut prefix_splits = Vec::new();
+ let mut prefix_rows = Vec::new();
+ let mut remaining = fetch;
+ let mut tail_start = None;
+
+ for (index, split) in splits.iter().enumerate() {
+ match split.merged_row_count() {
+ Some(count) if count >= 0 && (count as usize) <= remaining => {
+ prefix_splits.push(split.clone());
+ prefix_rows.push(count as usize);
+ remaining -= count as usize;
+ if remaining == 0 {
+ break;
+ }
+ }
+ _ => {
+ tail_start = Some(index);
+ break;
+ }
+ }
+ }
+
+ if tail_start.is_none() {
+ let num_partitions = prefix_splits.len().min(target_partitions.max(1));
+ let split_buckets = round_robin_buckets(prefix_splits, num_partitions);
+ let row_buckets = round_robin_buckets(prefix_rows, num_partitions);
+
+ return PartitionPlan {
+ planned_partitions:
split_buckets.into_iter().map(Arc::from).collect(),
+ partition_fetches: row_buckets
+ .into_iter()
+ .map(|rows| Some(rows.into_iter().sum()))
+ .collect(),
+ };
+ }
+
+ let tail_start = tail_start.expect("tail_start checked above");
+ let tail_fetch = remaining;
+
+ if prefix_splits.is_empty() {
+ return PartitionPlan {
+ planned_partitions: vec![Arc::from(splits)],
+ partition_fetches: vec![Some(fetch)],
+ };
+ }
+
+ let prefix_partition_count = prefix_splits
+ .len()
+ .min(target_partitions.saturating_sub(1).max(1));
+ let split_buckets = round_robin_buckets(prefix_splits,
prefix_partition_count);
+ let row_buckets = round_robin_buckets(prefix_rows, prefix_partition_count);
+
+ let mut planned_partitions: Vec<Arc<[DataSplit]>> =
+ split_buckets.into_iter().map(Arc::from).collect();
+ let mut partition_fetches: Vec<Option<usize>> = row_buckets
+ .into_iter()
+ .map(|rows| Some(rows.into_iter().sum()))
+ .collect();
+
+ planned_partitions.push(Arc::from(splits[tail_start..].to_vec()));
+ partition_fetches.push(Some(tail_fetch));
+
+ PartitionPlan {
+ planned_partitions,
+ partition_fetches,
+ }
+}
+
+pub(crate) fn build_partition_plan(
+ full_splits: &[DataSplit],
+ target_partitions: usize,
+ limit_hint: Option<usize>,
+ fetch: Option<usize>,
+) -> PartitionPlan {
+ if let Some(fetch) = fetch {
+ let selected_splits =
prune_splits_by_limit_hint(full_splits.iter().cloned(), Some(fetch));
Review Comment:
I think this is a reasonable direction longer-term, and that is also why I
moved the shared split-pruning rule into `table_scan` as
`prune_splits_by_limit_hint(...)`.
For this PR, I kept the full physical fetch contract in the DataFusion layer
because it also needs DataFusion-specific behavior (`with_fetch()`,
repartitioned scan planning, per-partition fetch budgets, and exact stream
truncation). Moving all of that into `table_scan` would expand the scope beyond
issue #220.
If we later want a cleaner core abstraction for fetch-aware split planning,
I’d be happy to follow up on that separately.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]