Dandandan commented on code in PR #14411: URL: https://github.com/apache/datafusion/pull/14411#discussion_r1956072347
##########
datafusion/physical-plan/src/repartition/on_demand_repartition.rs:
##########
@@ -0,0 +1,1589 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file implements the [`OnDemandRepartitionExec`] operator, which maps N input
+//! partitions to M output partitions based on a partitioning scheme, optionally
+//! maintaining the order of the input rows in the output. The operator is similar to the [`RepartitionExec`]
+//! operator, but it doesn't distribute the data to the output streams until the downstreams request the data.
+//!
+//! [`RepartitionExec`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, vec};
+
+use super::distributor_channels::{on_demand_partition_aware_channels, tokio_channels};
+use super::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use super::{
+    DisplayAs, ExecutionPlanProperties, MaybeBatch, RecordBatchStream,
+    RepartitionExecBase, SendableRecordBatchStream,
+};
+use crate::common::SharedMemoryReservation;
+use crate::execution_plan::CardinalityEffect;
+use crate::metrics::{self, BaselineMetrics, MetricBuilder};
+use crate::projection::{all_columns, make_with_child, ProjectionExec};
+use crate::sorts::streaming_merge::StreamingMergeBuilder;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use async_channel::{Receiver, Sender, TrySendError};
+
+use datafusion_common::utils::transpose;
+use datafusion_common::DataFusionError;
+use datafusion_common::{internal_datafusion_err, internal_err, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::TaskContext;
+
+use datafusion_common::HashMap;
+use futures::stream::Stream;
+use futures::{ready, StreamExt, TryStreamExt};
+use log::trace;
+use parking_lot::Mutex;
+
+type PartitionChannels = (Vec<Sender<usize>>, Vec<Receiver<usize>>);
+
+/// The OnDemandRepartitionExec operator repartitions the input data based on a pull-based model.
+/// It is similar to the RepartitionExec operator, but it doesn't distribute the data to the output
+/// partitions until the output partitions request the data.
+///
+/// When polled, the operator sends the output partition number to the partition channel; the prefetch
+/// buffer then distributes the data based on the order in which the partition numbers arrive.
+/// Each input stream has a prefetch buffer (channel) to distribute the data to the output partitions.
+///
+/// The following diagram illustrates the data flow of the OnDemandRepartitionExec operator
+/// with 3 output partitions for input stream 1:
+/// ```text
+///         /\                       /\                       /\
+///         ││                       ││                       ││
+///         ││                       ││                       ││
+///         ││                       ││                       ││
+/// ┌───────┴┴────────┐      ┌───────┴┴────────┐      ┌───────┴┴────────┐
+/// │     Stream      │      │     Stream      │      │     Stream      │
+/// │       (1)       │      │       (2)       │      │       (3)       │
+/// └────────┬────────┘      └───────┬─────────┘      └────────┬────────┘
+///          │                       │                         │             /   \
+///          │                       │                         │             |   |
+///          │                       │                         │             |   |
+///          └───────────────────┐   │   ┌─────────────────────┘             |   |
+///                              │   │   │                                   |   |
+///                              ▼   ▼   ▼                                   |   |
+///                          ┌─────────────────┐                             |   |
+///     Send the partition   │partition channel│                             |   |
+///     number when polling  │                 │                             |   |
+///                          └────────┬────────┘                             |   |
+///                                   │                                      |   |
+///                                   │                                      |   |
+///                                   │  Get the partition number            |   |
+///                                   ▼  then send data                      |   |
+///                          ┌─────────────────┐                             |   |
+///                          │ Prefetch Buffer │─────────────────────────────┘   |
+///                          │       (1)       │─────────────────────────────────┘
+///                          └─────────────────┘  Distribute data to the output partitions
+///
+/// ```
+type OnDemandDistributionSender = tokio::sync::mpsc::UnboundedSender<MaybeBatch>;
+type OnDemandDistributionReceiver = tokio::sync::mpsc::UnboundedReceiver<MaybeBatch>;
+
+type OnDemandInputPartitionsToCurrentPartitionSender = Vec<OnDemandDistributionSender>;
+type OnDemandInputPartitionsToCurrentPartitionReceiver =
+    Vec<OnDemandDistributionReceiver>;
+/// Inner state of [`OnDemandRepartitionExec`].
+#[derive(Debug)]
+struct OnDemandRepartitionExecState {
+    /// Channels for sending batches from input partitions to output partitions.
+    /// Key is the partition number.
+    channels: HashMap<

Review Comment:
   Is a `HashMap` needed here? It could be a `Vec`?
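
The doc comment quoted above describes a pull-based handoff: an output partition announces its partition number on a shared "partition channel" when it is polled, and a per-input-partition prefetch task answers by sending the next batch to that output's dedicated channel. The sketch below is a minimal, hypothetical illustration of that pattern using only std threads and channels; `Batch`, `run_input_partition`, and the channel layout are stand-ins for this example, not DataFusion APIs.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Stand-in for `arrow::record_batch::RecordBatch`.
type Batch = Vec<i32>;

/// One input partition: it only hands a batch out after some output partition
/// has asked for data by sending its partition number on the shared `requests`
/// channel (the "partition channel" in the diagram above).
fn run_input_partition(
    input: Vec<Batch>,
    requests: Receiver<usize>,
    outputs: Vec<Sender<Option<Batch>>>,
) {
    for batch in input {
        // Block until an output partition requests data (pull-based).
        let Ok(output_id) = requests.recv() else { break };
        let _ = outputs[output_id].send(Some(batch));
    }
    // Tell every output partition that this input is exhausted.
    for tx in &outputs {
        let _ = tx.send(None);
    }
}

fn main() {
    let n_outputs = 3;
    let input: Vec<Batch> = (0..6).map(|i| vec![i, i + 1]).collect();

    // One dedicated data channel per output partition, plus one shared
    // request channel carrying the requesting partition's index.
    let (data_txs, data_rxs): (Vec<_>, Vec<_>) =
        (0..n_outputs).map(|_| channel::<Option<Batch>>()).unzip();
    let (req_tx, req_rx) = channel::<usize>();

    let producer = thread::spawn(move || run_input_partition(input, req_rx, data_txs));

    // Each output partition announces its number, then waits for its batch.
    let consumers: Vec<_> = data_rxs
        .into_iter()
        .enumerate()
        .map(|(id, rx)| {
            let req_tx = req_tx.clone();
            thread::spawn(move || loop {
                let _ = req_tx.send(id); // "output partition `id` wants a batch"
                match rx.recv() {
                    Ok(Some(batch)) => println!("output {id} got {batch:?}"),
                    _ => break, // `None` or closed channel: input exhausted
                }
            })
        })
        .collect();

    producer.join().unwrap();
    for c in consumers {
        c.join().unwrap();
    }
}
```

Judging from the imports and type aliases in the diff (`SpawnedTask`, tokio unbounded channels, `SharedMemoryReservation`), the operator in the PR does this with async tasks and memory-tracked channels rather than threads, but the request/response shape is the same.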
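On the review question itself: output partition numbers are contiguous indices `0..N` created up front, so a `Vec` indexed by partition number could in principle replace the `HashMap`. Below is a hypothetical sketch of the two shapes, with `ChannelPair` as a placeholder for whatever per-partition state the real struct holds; it is not DataFusion code.

```rust
use std::collections::HashMap;

/// Placeholder for whatever per-partition state the real struct keeps
/// (senders, receivers, memory reservation, ...).
struct ChannelPair;

/// Shape quoted in the diff: per-partition state keyed by partition number.
struct StateWithMap {
    channels: HashMap<usize, ChannelPair>,
}

/// Shape the review suggests: partition numbers are contiguous `0..N`,
/// so they can index a `Vec` directly.
struct StateWithVec {
    channels: Vec<ChannelPair>, // channels[partition]
}

fn main() {
    let n: usize = 3;
    let with_map = StateWithMap {
        channels: (0..n).map(|p| (p, ChannelPair)).collect(),
    };
    let with_vec = StateWithVec {
        channels: (0..n).map(|_| ChannelPair).collect(),
    };

    // Lookup by partition number works the same way in both shapes.
    assert!(with_map.channels.get(&1).is_some());
    assert!(with_vec.channels.get(1).is_some());
}
```

One reason similar repartition code sometimes keeps a map is to drop a partition's entry once that output finishes; with a `Vec`, that could be modeled as `Vec<Option<ChannelPair>>` at the cost of a slightly noisier access pattern.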