mertak-synnada commented on code in PR #14411:
URL: https://github.com/apache/datafusion/pull/14411#discussion_r1946078413


##########
datafusion/physical-plan/src/repartition/on_demand_repartition.rs:
##########
@@ -0,0 +1,1362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file implements the [`OnDemandRepartitionExec`]  operator, which maps 
N input
+//! partitions to M output partitions based on a partitioning scheme, 
optionally
+//! maintaining the order of the input rows in the output. The operator is 
similar to the [`RepartitionExec`]
+//! operator, but it doesn't distribute the data to the output streams until 
the downstreams request the data.
+//!
+//! [`RepartitionExec`]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, vec};
+
+use super::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use super::{
+    DisplayAs, ExecutionPlanProperties, MaybeBatch, RecordBatchStream,
+    RepartitionExecBase, SendableRecordBatchStream,
+};
+use crate::common::SharedMemoryReservation;
+use crate::execution_plan::CardinalityEffect;
+use crate::metrics::{self, BaselineMetrics, MetricBuilder};
+use crate::projection::{all_columns, make_with_child, ProjectionExec};
+use crate::repartition::distributor_channels::{
+    DistributionReceiver, DistributionSender,
+};
+use crate::repartition::RepartitionExecStateBuilder;
+use crate::sorts::streaming_merge::StreamingMergeBuilder;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, 
Statistics};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use async_channel::{Receiver, Sender};
+
+use datafusion_common::{internal_datafusion_err, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::TaskContext;
+
+use datafusion_common::HashMap;
+use futures::stream::Stream;
+use futures::{ready, FutureExt, StreamExt, TryStreamExt};
+use log::trace;
+use parking_lot::Mutex;
+
+type PartitionChannels = (Vec<Sender<usize>>, Vec<Receiver<usize>>);
+
+/// The OnDemandRepartitionExec operator repartitions the input data based on 
a push-based model.
+/// It is similar to the RepartitionExec operator, but it doesn't distribute 
the data to the output
+/// partitions until the output partitions request the data.
+///
+/// When polling, the operator sends the output partition number to the one 
partition channel, then the prefetch buffer will distribute the data based on 
the order of the partition number.
+/// Each input steams has a prefetch buffer(channel) to distribute the data to 
the output partitions.
+///
+/// The following diagram illustrates the data flow of the 
OnDemandRepartitionExec operator with 3 output partitions for the input stream 
1:
+/// ```text
+///         /\                     /\                     /\
+///         ││                     ││                     ││
+///         ││                     ││                     ││
+///         ││                     ││                     ││
+/// ┌───────┴┴────────┐    ┌───────┴┴────────┐    ┌───────┴┴────────┐
+/// │     Stream      │    │     Stream      │    │     Stream      │
+/// │       (1)       │    │       (2)       │    │       (3)       │
+/// └────────┬────────┘    └───────┬─────────┘    └────────┬────────┘
+///          │                     │                       │    / \
+///          │                     │                       │    | |
+///          │                     │                       │    | |
+///          └────────────────┐    │    ┌──────────────────┘    | |
+///                           │    │    │                       | |
+///                           ▼    ▼    ▼                       | |
+///                       ┌─────────────────┐                   | |
+///  Send the partition   │ partion channel │                   | |
+///  number when polling  │                 │                   | |
+///                       └────────┬────────┘                   | |
+///                                │                            | |
+///                                │                            | |
+///                                │  Get the partition number  | |
+///                                ▼  then send data            | |
+///                       ┌─────────────────┐                   | |
+///                       │ Prefetch Buffer │───────────────────┘ |
+///                       │       (1)       │─────────────────────┘
+///                       └─────────────────┘ Distribute data to the output 
partitions
+///
+/// ```text
+
+#[derive(Debug, Clone)]
+pub struct OnDemandRepartitionExec {
+    base: RepartitionExecBase,
+    /// Channel to send partition number to the downstream task
+    partition_channels: Arc<tokio::sync::OnceCell<Mutex<PartitionChannels>>>,
+}
+
+impl OnDemandRepartitionExec {
+    /// Input execution plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.base.input
+    }
+
+    /// Partitioning scheme to use
+    pub fn partitioning(&self) -> &Partitioning {
+        &self.base.cache.partitioning
+    }
+
+    /// Get preserve_order flag of the RepartitionExecutor
+    /// `true` means `SortPreservingRepartitionExec`, `false` means 
`OnDemandRepartitionExec`
+    pub fn preserve_order(&self) -> bool {
+        self.base.preserve_order
+    }
+
+    /// Specify if this reparititoning operation should preserve the order of
+    /// rows from its input when producing output. Preserving order is more
+    /// expensive at runtime, so should only be set if the output of this
+    /// operator can take advantage of it.
+    ///
+    /// If the input is not ordered, or has only one partition, this is a no 
op,
+    /// and the node remains a `OnDemandRepartitionExec`.
+    pub fn with_preserve_order(mut self) -> Self {
+        self.base = self.base.with_preserve_order();
+        self
+    }
+
+    /// Get name used to display this Exec
+    pub fn name(&self) -> &str {
+        "OnDemandRepartitionExec"
+    }
+}
+
+impl DisplayAs for OnDemandRepartitionExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "{}: partitioning={}, input_partitions={}",
+                    self.name(),
+                    self.partitioning(),
+                    self.base.input.output_partitioning().partition_count()
+                )?;
+
+                if self.base.preserve_order {
+                    write!(f, ", preserve_order=true")?;
+                }
+
+                if let Some(sort_exprs) = self.base.sort_exprs() {
+                    write!(f, ", sort_exprs={}", sort_exprs.clone())?;
+                }
+                Ok(())
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for OnDemandRepartitionExec {
+    fn name(&self) -> &'static str {
+        "OnDemandRepartitionExec"
+    }
+
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.base.cache
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.base.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut repartition = OnDemandRepartitionExec::try_new(
+            children.swap_remove(0),
+            self.partitioning().clone(),
+        )?;
+        if self.base.preserve_order {
+            repartition = repartition.with_preserve_order();
+        }
+        Ok(Arc::new(repartition))
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        RepartitionExecBase::maintains_input_order_helper(
+            self.input(),
+            self.base.preserve_order,
+        )
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        trace!(
+            "Start {}::execute for partition: {}",
+            self.name(),
+            partition
+        );
+
+        let lazy_state = Arc::clone(&self.base.state);
+        let partition_channels = Arc::clone(&self.partition_channels);
+        let input = Arc::clone(&self.base.input);
+        let partitioning = self.partitioning().clone();
+        let metrics = self.base.metrics.clone();
+        let preserve_order = self.base.preserve_order;
+        let name = self.name().to_owned();
+        let schema = self.schema();
+        let schema_captured = Arc::clone(&schema);
+
+        // Get existing ordering to use for merging
+        let sort_exprs = self.base.sort_exprs().cloned().unwrap_or_default();
+
+        let stream = futures::stream::once(async move {
+            let num_input_partitions = 
input.output_partitioning().partition_count();
+            let input_captured = Arc::clone(&input);
+            let metrics_captured = metrics.clone();
+            let name_captured = name.clone();
+            let context_captured = Arc::clone(&context);
+            let partition_channels = partition_channels
+                .get_or_init(|| async move {
+                    let (txs, rxs) = if preserve_order {
+                        (0..num_input_partitions)
+                            .map(|_| async_channel::unbounded())
+                            .unzip::<_, _, Vec<_>, Vec<_>>()
+                    } else {
+                        let (tx, rx) = async_channel::unbounded();
+                        (vec![tx], vec![rx])
+                    };
+                    Mutex::new((txs, rxs))
+                })
+                .await;
+            let (partition_txs, partition_rxs) = {
+                let channel = partition_channels.lock();
+                (channel.0.clone(), channel.1.clone())
+            };
+
+            let state = lazy_state
+                .get_or_init(|| async move {
+                    Mutex::new(
+                        RepartitionExecStateBuilder::new()
+                            .enable_pull_based(true)
+                            .partition_receivers(partition_rxs.clone())
+                            .build(
+                                input_captured,
+                                partitioning.clone(),
+                                metrics_captured,
+                                preserve_order,
+                                name_captured,
+                                context_captured,
+                            ),
+                    )
+                })
+                .await;
+
+            // lock scope
+            let (mut rx, reservation, abort_helper) = {
+                // lock mutexes
+                let mut state = state.lock();
+
+                // now return stream for the specified *output* partition 
which will
+                // read from the channel
+                let (_tx, rx, reservation) = state
+                    .channels
+                    .remove(&partition)
+                    .expect("partition not used yet");
+
+                (rx, reservation, Arc::clone(&state.abort_helper))
+            };
+
+            trace!(
+                "Before returning stream in {}::execute for partition: {}",
+                name,
+                partition
+            );
+
+            if preserve_order {
+                // Store streams from all the input partitions:
+                let input_streams = rx
+                    .into_iter()
+                    .enumerate()
+                    .map(|(i, receiver)| {
+                        // sender should be partition-wise
+                        Box::pin(OnDemandPerPartitionStream {
+                            schema: Arc::clone(&schema_captured),
+                            receiver,
+                            _drop_helper: Arc::clone(&abort_helper),
+                            reservation: Arc::clone(&reservation),
+                            sender: partition_txs[i].clone(),
+                            partition,
+                            is_requested: false,
+                        }) as SendableRecordBatchStream
+                    })
+                    .collect::<Vec<_>>();
+                // Note that receiver size (`rx.len()`) and 
`num_input_partitions` are same.
+
+                // Merge streams (while preserving ordering) coming from
+                // input partitions to this partition:
+                let fetch = None;
+                let merge_reservation =
+                    MemoryConsumer::new(format!("{}[Merge {partition}]", name))
+                        .register(context.memory_pool());
+                StreamingMergeBuilder::new()
+                    .with_streams(input_streams)
+                    .with_schema(schema_captured)
+                    .with_expressions(&sort_exprs)
+                    .with_metrics(BaselineMetrics::new(&metrics, partition))
+                    .with_batch_size(context.session_config().batch_size())
+                    .with_fetch(fetch)
+                    .with_reservation(merge_reservation)
+                    .build()
+            } else {
+                Ok(Box::pin(OnDemandRepartitionStream {
+                    num_input_partitions,
+                    num_input_partitions_processed: 0,
+                    schema: input.schema(),
+                    input: rx.swap_remove(0),
+                    _drop_helper: abort_helper,
+                    reservation,
+                    sender: partition_txs[0].clone(),
+                    partition,
+                    is_requested: false,
+                }) as SendableRecordBatchStream)
+            }
+        })
+        .try_flatten();
+        let stream = RecordBatchStreamAdapter::new(schema, stream);
+        Ok(Box::pin(stream))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.base.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        self.base.input.statistics()
+    }
+
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        CardinalityEffect::Equal
+    }
+
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // If the projection does not narrow the schema, we should not try to 
push it down.
+        if projection.expr().len() >= 
projection.input().schema().fields().len() {
+            return Ok(None);
+        }
+
+        // If pushdown is not beneficial or applicable, break it.
+        if projection.benefits_from_input_partitioning()[0]
+            || !all_columns(projection.expr())
+        {
+            return Ok(None);
+        }
+
+        let new_projection = make_with_child(projection, self.input())?;
+
+        Ok(Some(Arc::new(OnDemandRepartitionExec::try_new(
+            new_projection,
+            self.partitioning().clone(),
+        )?)))
+    }
+}
+
+impl OnDemandRepartitionExec {
+    /// Create a new RepartitionExec, that produces output `partitioning`, and
+    /// does not preserve the order of the input (see 
[`Self::with_preserve_order`]
+    /// for more details)
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        partitioning: Partitioning,
+    ) -> Result<Self> {
+        let preserve_order = false;
+        let cache = RepartitionExecBase::compute_properties(
+            &input,
+            partitioning.clone(),
+            preserve_order,
+        );
+        Ok(OnDemandRepartitionExec {
+            base: RepartitionExecBase {
+                input,
+                state: Default::default(),
+                metrics: ExecutionPlanMetricsSet::new(),
+                preserve_order,
+                cache,
+            },
+            partition_channels: Default::default(),
+        })
+    }
+
+    async fn process_input(
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        buffer_tx: Sender<RecordBatch>,
+        context: Arc<TaskContext>,
+        fetch_time: metrics::Time,
+        send_buffer_time: metrics::Time,
+    ) -> Result<()> {
+        let timer = fetch_time.timer();
+        let mut stream = input.execute(partition, context).map_err(|e| {
+            internal_datafusion_err!(
+                "Error executing input partition {} for on demand 
repartitioning: {}",
+                partition,
+                e
+            )
+        })?;
+        timer.done();
+
+        loop {
+            let timer = fetch_time.timer();
+            let batch = stream.next().await;
+            timer.done();
+
+            // send the batch to the buffer channel
+            if let Some(batch) = batch {
+                let timer = send_buffer_time.timer();
+                buffer_tx.send(batch?).await.map_err(|e| {
+                    internal_datafusion_err!(
+                        "Error sending batch to buffer channel for partition 
{}: {}",
+                        partition,
+                        e
+                    )
+                })?;
+                timer.done();
+            } else {
+                break;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Pulls data from the specified input plan, feeding it to the
+    /// output partitions based on the desired partitioning
+    ///
+    /// txs hold the output sending channels for each output partition
+    pub(crate) async fn pull_from_input(
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        mut output_channels: HashMap<
+            usize,
+            (DistributionSender<MaybeBatch>, SharedMemoryReservation),
+        >,
+        partitioning: Partitioning,
+        output_partition_rx: Receiver<usize>,
+        metrics: OnDemandRepartitionMetrics,
+        context: Arc<TaskContext>,
+    ) -> Result<()> {
+        // execute the child operator in a separate task
+        let (buffer_tx, buffer_rx) = async_channel::bounded::<RecordBatch>(2);
+        let processing_task = SpawnedTask::spawn(Self::process_input(
+            Arc::clone(&input),
+            partition,
+            buffer_tx,
+            Arc::clone(&context),
+            metrics.fetch_time.clone(),
+            metrics.send_buffer_time.clone(),
+        ));
+
+        // While there are still outputs to send to, keep pulling inputs
+        let mut batches_until_yield = partitioning.partition_count();
+        while !output_channels.is_empty() {
+            // When the input is done, break the loop
+            let batch = match buffer_rx.recv().await {
+                Ok(batch) => batch,
+                _ => break,
+            };
+
+            // Get the partition number from the output partition
+            let partition = output_partition_rx.recv().await.map_err(|e| {
+                internal_datafusion_err!(
+                    "Error receiving partition number from output partition: 
{}",
+                    e
+                )
+            })?;

Review Comment:
   ```suggestion
           // When the input is done, break the loop
           while !output_channels.is_empty() {
              // Fetch the batch from the buffer, ideally this should reduce 
the time gap between the requester and the input stream
               let batch = match buffer_rx.recv().await {
                   Ok(batch) => batch,
                   _ => break,
               };
   
               // Wait until a partition is requested, then get the output 
partition information 
               let partition = output_partition_rx.recv().await.map_err(|e| {
                   internal_datafusion_err!(
                       "Error receiving partition number from output partition: 
{}",
                       e
                   )
               })?;
   ```



##########
datafusion/physical-plan/src/repartition/on_demand_repartition.rs:
##########
@@ -0,0 +1,1362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file implements the [`OnDemandRepartitionExec`]  operator, which maps 
N input
+//! partitions to M output partitions based on a partitioning scheme, 
optionally
+//! maintaining the order of the input rows in the output. The operator is 
similar to the [`RepartitionExec`]
+//! operator, but it doesn't distribute the data to the output streams until 
the downstreams request the data.
+//!
+//! [`RepartitionExec`]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, vec};
+
+use super::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use super::{
+    DisplayAs, ExecutionPlanProperties, MaybeBatch, RecordBatchStream,
+    RepartitionExecBase, SendableRecordBatchStream,
+};
+use crate::common::SharedMemoryReservation;
+use crate::execution_plan::CardinalityEffect;
+use crate::metrics::{self, BaselineMetrics, MetricBuilder};
+use crate::projection::{all_columns, make_with_child, ProjectionExec};
+use crate::repartition::distributor_channels::{
+    DistributionReceiver, DistributionSender,
+};
+use crate::repartition::RepartitionExecStateBuilder;
+use crate::sorts::streaming_merge::StreamingMergeBuilder;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, 
Statistics};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use async_channel::{Receiver, Sender};
+
+use datafusion_common::{internal_datafusion_err, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::TaskContext;
+
+use datafusion_common::HashMap;
+use futures::stream::Stream;
+use futures::{ready, FutureExt, StreamExt, TryStreamExt};
+use log::trace;
+use parking_lot::Mutex;
+
+type PartitionChannels = (Vec<Sender<usize>>, Vec<Receiver<usize>>);
+
+/// The OnDemandRepartitionExec operator repartitions the input data based on 
a push-based model.
+/// It is similar to the RepartitionExec operator, but it doesn't distribute 
the data to the output
+/// partitions until the output partitions request the data.
+///
+/// When polling, the operator sends the output partition number to the one 
partition channel, then the prefetch buffer will distribute the data based on 
the order of the partition number.
+/// Each input steams has a prefetch buffer(channel) to distribute the data to 
the output partitions.
+///
+/// The following diagram illustrates the data flow of the 
OnDemandRepartitionExec operator with 3 output partitions for the input stream 
1:
+/// ```text
+///         /\                     /\                     /\
+///         ││                     ││                     ││
+///         ││                     ││                     ││
+///         ││                     ││                     ││
+/// ┌───────┴┴────────┐    ┌───────┴┴────────┐    ┌───────┴┴────────┐
+/// │     Stream      │    │     Stream      │    │     Stream      │
+/// │       (1)       │    │       (2)       │    │       (3)       │
+/// └────────┬────────┘    └───────┬─────────┘    └────────┬────────┘
+///          │                     │                       │    / \
+///          │                     │                       │    | |
+///          │                     │                       │    | |
+///          └────────────────┐    │    ┌──────────────────┘    | |
+///                           │    │    │                       | |
+///                           ▼    ▼    ▼                       | |
+///                       ┌─────────────────┐                   | |
+///  Send the partition   │ partion channel │                   | |
+///  number when polling  │                 │                   | |
+///                       └────────┬────────┘                   | |
+///                                │                            | |
+///                                │                            | |
+///                                │  Get the partition number  | |
+///                                ▼  then send data            | |
+///                       ┌─────────────────┐                   | |
+///                       │ Prefetch Buffer │───────────────────┘ |
+///                       │       (1)       │─────────────────────┘
+///                       └─────────────────┘ Distribute data to the output 
partitions
+///
+/// ```text
+
+#[derive(Debug, Clone)]
+pub struct OnDemandRepartitionExec {
+    base: RepartitionExecBase,
+    /// Channel to send partition number to the downstream task
+    partition_channels: Arc<tokio::sync::OnceCell<Mutex<PartitionChannels>>>,
+}
+
+impl OnDemandRepartitionExec {
+    /// Input execution plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.base.input
+    }
+
+    /// Partitioning scheme to use
+    pub fn partitioning(&self) -> &Partitioning {
+        &self.base.cache.partitioning
+    }
+
+    /// Get preserve_order flag of the RepartitionExecutor
+    /// `true` means `SortPreservingRepartitionExec`, `false` means 
`OnDemandRepartitionExec`
+    pub fn preserve_order(&self) -> bool {
+        self.base.preserve_order
+    }
+
+    /// Specify if this reparititoning operation should preserve the order of
+    /// rows from its input when producing output. Preserving order is more
+    /// expensive at runtime, so should only be set if the output of this
+    /// operator can take advantage of it.
+    ///
+    /// If the input is not ordered, or has only one partition, this is a no 
op,
+    /// and the node remains a `OnDemandRepartitionExec`.
+    pub fn with_preserve_order(mut self) -> Self {
+        self.base = self.base.with_preserve_order();
+        self
+    }
+
+    /// Get name used to display this Exec
+    pub fn name(&self) -> &str {
+        "OnDemandRepartitionExec"
+    }
+}
+
+impl DisplayAs for OnDemandRepartitionExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "{}: partitioning={}, input_partitions={}",
+                    self.name(),
+                    self.partitioning(),
+                    self.base.input.output_partitioning().partition_count()
+                )?;
+
+                if self.base.preserve_order {
+                    write!(f, ", preserve_order=true")?;
+                }
+
+                if let Some(sort_exprs) = self.base.sort_exprs() {
+                    write!(f, ", sort_exprs={}", sort_exprs.clone())?;
+                }
+                Ok(())
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for OnDemandRepartitionExec {
+    fn name(&self) -> &'static str {
+        "OnDemandRepartitionExec"
+    }
+
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.base.cache
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.base.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut repartition = OnDemandRepartitionExec::try_new(
+            children.swap_remove(0),
+            self.partitioning().clone(),
+        )?;
+        if self.base.preserve_order {
+            repartition = repartition.with_preserve_order();
+        }
+        Ok(Arc::new(repartition))
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        RepartitionExecBase::maintains_input_order_helper(
+            self.input(),
+            self.base.preserve_order,
+        )
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        trace!(
+            "Start {}::execute for partition: {}",
+            self.name(),
+            partition
+        );
+
+        let lazy_state = Arc::clone(&self.base.state);
+        let partition_channels = Arc::clone(&self.partition_channels);
+        let input = Arc::clone(&self.base.input);
+        let partitioning = self.partitioning().clone();
+        let metrics = self.base.metrics.clone();
+        let preserve_order = self.base.preserve_order;
+        let name = self.name().to_owned();
+        let schema = self.schema();
+        let schema_captured = Arc::clone(&schema);
+
+        // Get existing ordering to use for merging
+        let sort_exprs = self.base.sort_exprs().cloned().unwrap_or_default();
+
+        let stream = futures::stream::once(async move {
+            let num_input_partitions = 
input.output_partitioning().partition_count();
+            let input_captured = Arc::clone(&input);
+            let metrics_captured = metrics.clone();
+            let name_captured = name.clone();
+            let context_captured = Arc::clone(&context);
+            let partition_channels = partition_channels
+                .get_or_init(|| async move {
+                    let (txs, rxs) = if preserve_order {
+                        (0..num_input_partitions)
+                            .map(|_| async_channel::unbounded())
+                            .unzip::<_, _, Vec<_>, Vec<_>>()
+                    } else {
+                        let (tx, rx) = async_channel::unbounded();
+                        (vec![tx], vec![rx])
+                    };
+                    Mutex::new((txs, rxs))
+                })
+                .await;
+            let (partition_txs, partition_rxs) = {
+                let channel = partition_channels.lock();
+                (channel.0.clone(), channel.1.clone())
+            };
+
+            let state = lazy_state
+                .get_or_init(|| async move {
+                    Mutex::new(
+                        RepartitionExecStateBuilder::new()
+                            .enable_pull_based(true)
+                            .partition_receivers(partition_rxs.clone())
+                            .build(
+                                input_captured,
+                                partitioning.clone(),
+                                metrics_captured,
+                                preserve_order,
+                                name_captured,
+                                context_captured,
+                            ),
+                    )
+                })
+                .await;
+
+            // lock scope
+            let (mut rx, reservation, abort_helper) = {
+                // lock mutexes
+                let mut state = state.lock();
+
+                // now return stream for the specified *output* partition 
which will
+                // read from the channel
+                let (_tx, rx, reservation) = state
+                    .channels
+                    .remove(&partition)
+                    .expect("partition not used yet");
+
+                (rx, reservation, Arc::clone(&state.abort_helper))
+            };
+
+            trace!(
+                "Before returning stream in {}::execute for partition: {}",
+                name,
+                partition
+            );
+
+            if preserve_order {
+                // Store streams from all the input partitions:
+                let input_streams = rx
+                    .into_iter()
+                    .enumerate()
+                    .map(|(i, receiver)| {
+                        // sender should be partition-wise
+                        Box::pin(OnDemandPerPartitionStream {
+                            schema: Arc::clone(&schema_captured),
+                            receiver,
+                            _drop_helper: Arc::clone(&abort_helper),
+                            reservation: Arc::clone(&reservation),
+                            sender: partition_txs[i].clone(),
+                            partition,
+                            is_requested: false,
+                        }) as SendableRecordBatchStream
+                    })
+                    .collect::<Vec<_>>();
+                // Note that receiver size (`rx.len()`) and 
`num_input_partitions` are same.
+
+                // Merge streams (while preserving ordering) coming from
+                // input partitions to this partition:
+                let fetch = None;
+                let merge_reservation =
+                    MemoryConsumer::new(format!("{}[Merge {partition}]", name))
+                        .register(context.memory_pool());
+                StreamingMergeBuilder::new()
+                    .with_streams(input_streams)
+                    .with_schema(schema_captured)
+                    .with_expressions(&sort_exprs)
+                    .with_metrics(BaselineMetrics::new(&metrics, partition))
+                    .with_batch_size(context.session_config().batch_size())
+                    .with_fetch(fetch)
+                    .with_reservation(merge_reservation)
+                    .build()
+            } else {
+                Ok(Box::pin(OnDemandRepartitionStream {
+                    num_input_partitions,
+                    num_input_partitions_processed: 0,
+                    schema: input.schema(),
+                    input: rx.swap_remove(0),
+                    _drop_helper: abort_helper,
+                    reservation,
+                    sender: partition_txs[0].clone(),
+                    partition,
+                    is_requested: false,
+                }) as SendableRecordBatchStream)
+            }
+        })
+        .try_flatten();
+        let stream = RecordBatchStreamAdapter::new(schema, stream);
+        Ok(Box::pin(stream))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.base.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        self.base.input.statistics()
+    }
+
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        CardinalityEffect::Equal
+    }
+
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // If the projection does not narrow the schema, we should not try to 
push it down.
+        if projection.expr().len() >= 
projection.input().schema().fields().len() {
+            return Ok(None);
+        }
+
+        // If pushdown is not beneficial or applicable, break it.
+        if projection.benefits_from_input_partitioning()[0]
+            || !all_columns(projection.expr())
+        {
+            return Ok(None);
+        }
+
+        let new_projection = make_with_child(projection, self.input())?;
+
+        Ok(Some(Arc::new(OnDemandRepartitionExec::try_new(
+            new_projection,
+            self.partitioning().clone(),
+        )?)))
+    }
+}
+
+impl OnDemandRepartitionExec {
+    /// Create a new RepartitionExec, that produces output `partitioning`, and
+    /// does not preserve the order of the input (see 
[`Self::with_preserve_order`]
+    /// for more details)
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        partitioning: Partitioning,
+    ) -> Result<Self> {
+        let preserve_order = false;
+        let cache = RepartitionExecBase::compute_properties(
+            &input,
+            partitioning.clone(),
+            preserve_order,
+        );
+        Ok(OnDemandRepartitionExec {
+            base: RepartitionExecBase {
+                input,
+                state: Default::default(),
+                metrics: ExecutionPlanMetricsSet::new(),
+                preserve_order,
+                cache,
+            },
+            partition_channels: Default::default(),
+        })
+    }
+
+    async fn process_input(
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        buffer_tx: Sender<RecordBatch>,
+        context: Arc<TaskContext>,
+        fetch_time: metrics::Time,
+        send_buffer_time: metrics::Time,
+    ) -> Result<()> {
+        let timer = fetch_time.timer();
+        let mut stream = input.execute(partition, context).map_err(|e| {
+            internal_datafusion_err!(
+                "Error executing input partition {} for on demand 
repartitioning: {}",
+                partition,
+                e
+            )
+        })?;
+        timer.done();
+
+        loop {
+            let timer = fetch_time.timer();
+            let batch = stream.next().await;
+            timer.done();
+
+            // send the batch to the buffer channel
+            if let Some(batch) = batch {
+                let timer = send_buffer_time.timer();
+                buffer_tx.send(batch?).await.map_err(|e| {
+                    internal_datafusion_err!(
+                        "Error sending batch to buffer channel for partition 
{}: {}",
+                        partition,
+                        e
+                    )
+                })?;
+                timer.done();
+            } else {
+                break;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Pulls data from the specified input plan, feeding it to the
+    /// output partitions based on the desired partitioning
+    ///
+    /// txs hold the output sending channels for each output partition
+    pub(crate) async fn pull_from_input(
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,

Review Comment:
   Maybe renaming this variable as `input_partition` or something else helps on 
readability since we also have the `output_partition` channel



##########
datafusion/physical-plan/src/repartition/on_demand_repartition.rs:
##########
@@ -0,0 +1,1362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file implements the [`OnDemandRepartitionExec`]  operator, which maps 
N input
+//! partitions to M output partitions based on a partitioning scheme, 
optionally
+//! maintaining the order of the input rows in the output. The operator is 
similar to the [`RepartitionExec`]
+//! operator, but it doesn't distribute the data to the output streams until 
the downstreams request the data.
+//!
+//! [`RepartitionExec`]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, vec};
+
+use super::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use super::{
+    DisplayAs, ExecutionPlanProperties, MaybeBatch, RecordBatchStream,
+    RepartitionExecBase, SendableRecordBatchStream,
+};
+use crate::common::SharedMemoryReservation;
+use crate::execution_plan::CardinalityEffect;
+use crate::metrics::{self, BaselineMetrics, MetricBuilder};
+use crate::projection::{all_columns, make_with_child, ProjectionExec};
+use crate::repartition::distributor_channels::{
+    DistributionReceiver, DistributionSender,
+};
+use crate::repartition::RepartitionExecStateBuilder;
+use crate::sorts::streaming_merge::StreamingMergeBuilder;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, 
Statistics};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use async_channel::{Receiver, Sender};
+
+use datafusion_common::{internal_datafusion_err, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::TaskContext;
+
+use datafusion_common::HashMap;
+use futures::stream::Stream;
+use futures::{ready, FutureExt, StreamExt, TryStreamExt};
+use log::trace;
+use parking_lot::Mutex;
+
+type PartitionChannels = (Vec<Sender<usize>>, Vec<Receiver<usize>>);
+
+/// The OnDemandRepartitionExec operator repartitions the input data based on 
a push-based model.
+/// It is similar to the RepartitionExec operator, but it doesn't distribute 
the data to the output
+/// partitions until the output partitions request the data.
+///
+/// When polling, the operator sends the output partition number to the one 
partition channel, then the prefetch buffer will distribute the data based on 
the order of the partition number.
+/// Each input steams has a prefetch buffer(channel) to distribute the data to 
the output partitions.
+///
+/// The following diagram illustrates the data flow of the 
OnDemandRepartitionExec operator with 3 output partitions for the input stream 
1:
+/// ```text
+///         /\                     /\                     /\
+///         ││                     ││                     ││
+///         ││                     ││                     ││
+///         ││                     ││                     ││
+/// ┌───────┴┴────────┐    ┌───────┴┴────────┐    ┌───────┴┴────────┐
+/// │     Stream      │    │     Stream      │    │     Stream      │
+/// │       (1)       │    │       (2)       │    │       (3)       │
+/// └────────┬────────┘    └───────┬─────────┘    └────────┬────────┘
+///          │                     │                       │    / \
+///          │                     │                       │    | |
+///          │                     │                       │    | |
+///          └────────────────┐    │    ┌──────────────────┘    | |
+///                           │    │    │                       | |
+///                           ▼    ▼    ▼                       | |
+///                       ┌─────────────────┐                   | |
+///  Send the partition   │ partion channel │                   | |
+///  number when polling  │                 │                   | |
+///                       └────────┬────────┘                   | |
+///                                │                            | |
+///                                │                            | |
+///                                │  Get the partition number  | |
+///                                ▼  then send data            | |
+///                       ┌─────────────────┐                   | |
+///                       │ Prefetch Buffer │───────────────────┘ |
+///                       │       (1)       │─────────────────────┘
+///                       └─────────────────┘ Distribute data to the output 
partitions
+///
+/// ```text
+
+#[derive(Debug, Clone)]
+pub struct OnDemandRepartitionExec {
+    base: RepartitionExecBase,
+    /// Channel to send partition number to the downstream task
+    partition_channels: Arc<tokio::sync::OnceCell<Mutex<PartitionChannels>>>,
+}
+
+impl OnDemandRepartitionExec {
+    /// Input execution plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.base.input
+    }
+
+    /// Partitioning scheme to use
+    pub fn partitioning(&self) -> &Partitioning {
+        &self.base.cache.partitioning
+    }
+
+    /// Get preserve_order flag of the RepartitionExecutor
+    /// `true` means `SortPreservingRepartitionExec`, `false` means 
`OnDemandRepartitionExec`
+    pub fn preserve_order(&self) -> bool {
+        self.base.preserve_order
+    }
+
+    /// Specify if this reparititoning operation should preserve the order of
+    /// rows from its input when producing output. Preserving order is more
+    /// expensive at runtime, so should only be set if the output of this
+    /// operator can take advantage of it.
+    ///
+    /// If the input is not ordered, or has only one partition, this is a no 
op,
+    /// and the node remains a `OnDemandRepartitionExec`.
+    pub fn with_preserve_order(mut self) -> Self {
+        self.base = self.base.with_preserve_order();
+        self
+    }
+
+    /// Get name used to display this Exec
+    pub fn name(&self) -> &str {
+        "OnDemandRepartitionExec"
+    }
+}
+
+impl DisplayAs for OnDemandRepartitionExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "{}: partitioning={}, input_partitions={}",
+                    self.name(),
+                    self.partitioning(),
+                    self.base.input.output_partitioning().partition_count()
+                )?;
+
+                if self.base.preserve_order {
+                    write!(f, ", preserve_order=true")?;
+                }
+
+                if let Some(sort_exprs) = self.base.sort_exprs() {
+                    write!(f, ", sort_exprs={}", sort_exprs.clone())?;
+                }
+                Ok(())
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for OnDemandRepartitionExec {
+    fn name(&self) -> &'static str {
+        "OnDemandRepartitionExec"
+    }
+
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.base.cache
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.base.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut repartition = OnDemandRepartitionExec::try_new(
+            children.swap_remove(0),
+            self.partitioning().clone(),
+        )?;
+        if self.base.preserve_order {
+            repartition = repartition.with_preserve_order();
+        }
+        Ok(Arc::new(repartition))
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        RepartitionExecBase::maintains_input_order_helper(
+            self.input(),
+            self.base.preserve_order,
+        )
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        trace!(
+            "Start {}::execute for partition: {}",
+            self.name(),
+            partition
+        );
+
+        let lazy_state = Arc::clone(&self.base.state);
+        let partition_channels = Arc::clone(&self.partition_channels);
+        let input = Arc::clone(&self.base.input);
+        let partitioning = self.partitioning().clone();
+        let metrics = self.base.metrics.clone();
+        let preserve_order = self.base.preserve_order;
+        let name = self.name().to_owned();
+        let schema = self.schema();
+        let schema_captured = Arc::clone(&schema);
+
+        // Get existing ordering to use for merging
+        let sort_exprs = self.base.sort_exprs().cloned().unwrap_or_default();
+
+        let stream = futures::stream::once(async move {
+            let num_input_partitions = 
input.output_partitioning().partition_count();
+            let input_captured = Arc::clone(&input);
+            let metrics_captured = metrics.clone();
+            let name_captured = name.clone();
+            let context_captured = Arc::clone(&context);
+            let partition_channels = partition_channels
+                .get_or_init(|| async move {
+                    let (txs, rxs) = if preserve_order {
+                        (0..num_input_partitions)
+                            .map(|_| async_channel::unbounded())
+                            .unzip::<_, _, Vec<_>, Vec<_>>()
+                    } else {
+                        let (tx, rx) = async_channel::unbounded();
+                        (vec![tx], vec![rx])
+                    };
+                    Mutex::new((txs, rxs))
+                })
+                .await;
+            let (partition_txs, partition_rxs) = {
+                let channel = partition_channels.lock();
+                (channel.0.clone(), channel.1.clone())
+            };
+
+            let state = lazy_state
+                .get_or_init(|| async move {
+                    Mutex::new(
+                        RepartitionExecStateBuilder::new()
+                            .enable_pull_based(true)
+                            .partition_receivers(partition_rxs.clone())
+                            .build(
+                                input_captured,
+                                partitioning.clone(),
+                                metrics_captured,
+                                preserve_order,
+                                name_captured,
+                                context_captured,
+                            ),
+                    )
+                })
+                .await;
+
+            // lock scope
+            let (mut rx, reservation, abort_helper) = {
+                // lock mutexes
+                let mut state = state.lock();
+
+                // now return stream for the specified *output* partition 
which will
+                // read from the channel
+                let (_tx, rx, reservation) = state
+                    .channels
+                    .remove(&partition)
+                    .expect("partition not used yet");
+
+                (rx, reservation, Arc::clone(&state.abort_helper))
+            };
+
+            trace!(
+                "Before returning stream in {}::execute for partition: {}",
+                name,
+                partition
+            );
+
+            if preserve_order {
+                // Store streams from all the input partitions:
+                let input_streams = rx
+                    .into_iter()
+                    .enumerate()
+                    .map(|(i, receiver)| {
+                        // sender should be partition-wise
+                        Box::pin(OnDemandPerPartitionStream {
+                            schema: Arc::clone(&schema_captured),
+                            receiver,
+                            _drop_helper: Arc::clone(&abort_helper),
+                            reservation: Arc::clone(&reservation),
+                            sender: partition_txs[i].clone(),
+                            partition,
+                            is_requested: false,
+                        }) as SendableRecordBatchStream
+                    })
+                    .collect::<Vec<_>>();
+                // Note that receiver size (`rx.len()`) and 
`num_input_partitions` are same.
+
+                // Merge streams (while preserving ordering) coming from
+                // input partitions to this partition:
+                let fetch = None;
+                let merge_reservation =
+                    MemoryConsumer::new(format!("{}[Merge {partition}]", name))
+                        .register(context.memory_pool());
+                StreamingMergeBuilder::new()
+                    .with_streams(input_streams)
+                    .with_schema(schema_captured)
+                    .with_expressions(&sort_exprs)
+                    .with_metrics(BaselineMetrics::new(&metrics, partition))
+                    .with_batch_size(context.session_config().batch_size())
+                    .with_fetch(fetch)
+                    .with_reservation(merge_reservation)
+                    .build()
+            } else {
+                Ok(Box::pin(OnDemandRepartitionStream {
+                    num_input_partitions,
+                    num_input_partitions_processed: 0,
+                    schema: input.schema(),
+                    input: rx.swap_remove(0),
+                    _drop_helper: abort_helper,
+                    reservation,
+                    sender: partition_txs[0].clone(),
+                    partition,
+                    is_requested: false,
+                }) as SendableRecordBatchStream)
+            }
+        })
+        .try_flatten();
+        let stream = RecordBatchStreamAdapter::new(schema, stream);
+        Ok(Box::pin(stream))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.base.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        self.base.input.statistics()
+    }
+
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        CardinalityEffect::Equal
+    }
+
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // If the projection does not narrow the schema, we should not try to 
push it down.
+        if projection.expr().len() >= 
projection.input().schema().fields().len() {
+            return Ok(None);
+        }
+
+        // If pushdown is not beneficial or applicable, break it.
+        if projection.benefits_from_input_partitioning()[0]
+            || !all_columns(projection.expr())
+        {
+            return Ok(None);
+        }
+
+        let new_projection = make_with_child(projection, self.input())?;
+
+        Ok(Some(Arc::new(OnDemandRepartitionExec::try_new(
+            new_projection,
+            self.partitioning().clone(),
+        )?)))
+    }
+}
+
+impl OnDemandRepartitionExec {
+    /// Create a new RepartitionExec, that produces output `partitioning`, and
+    /// does not preserve the order of the input (see 
[`Self::with_preserve_order`]
+    /// for more details)
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        partitioning: Partitioning,
+    ) -> Result<Self> {
+        let preserve_order = false;
+        let cache = RepartitionExecBase::compute_properties(
+            &input,
+            partitioning.clone(),
+            preserve_order,
+        );
+        Ok(OnDemandRepartitionExec {
+            base: RepartitionExecBase {
+                input,
+                state: Default::default(),
+                metrics: ExecutionPlanMetricsSet::new(),
+                preserve_order,
+                cache,
+            },
+            partition_channels: Default::default(),
+        })
+    }
+
+    async fn process_input(
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        buffer_tx: Sender<RecordBatch>,
+        context: Arc<TaskContext>,
+        fetch_time: metrics::Time,
+        send_buffer_time: metrics::Time,
+    ) -> Result<()> {
+        let timer = fetch_time.timer();
+        let mut stream = input.execute(partition, context).map_err(|e| {
+            internal_datafusion_err!(
+                "Error executing input partition {} for on demand 
repartitioning: {}",
+                partition,
+                e
+            )
+        })?;
+        timer.done();
+
+        loop {
+            let timer = fetch_time.timer();
+            let batch = stream.next().await;
+            timer.done();
+
+            // send the batch to the buffer channel
+            if let Some(batch) = batch {
+                let timer = send_buffer_time.timer();
+                buffer_tx.send(batch?).await.map_err(|e| {
+                    internal_datafusion_err!(
+                        "Error sending batch to buffer channel for partition 
{}: {}",
+                        partition,
+                        e
+                    )
+                })?;
+                timer.done();
+            } else {
+                break;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Pulls data from the specified input plan, feeding it to the
+    /// output partitions based on the desired partitioning
+    ///
+    /// txs hold the output sending channels for each output partition
+    pub(crate) async fn pull_from_input(
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        mut output_channels: HashMap<
+            usize,
+            (DistributionSender<MaybeBatch>, SharedMemoryReservation),
+        >,
+        partitioning: Partitioning,
+        output_partition_rx: Receiver<usize>,
+        metrics: OnDemandRepartitionMetrics,
+        context: Arc<TaskContext>,
+    ) -> Result<()> {
+        // execute the child operator in a separate task
+        let (buffer_tx, buffer_rx) = async_channel::bounded::<RecordBatch>(2);
+        let processing_task = SpawnedTask::spawn(Self::process_input(

Review Comment:
   ```suggestion
           // initialize buffer channel so that we can pre-fetch from input
           let (buffer_tx, buffer_rx) = 
async_channel::bounded::<RecordBatch>(2);
           // execute the child operator in a separate task
           // that pushes batches into buffer channel with limited capacity
           let processing_task = SpawnedTask::spawn(Self::process_input(
   ```



##########
datafusion/physical-plan/src/repartition/on_demand_repartition.rs:
##########
@@ -0,0 +1,1362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file implements the [`OnDemandRepartitionExec`]  operator, which maps 
N input
+//! partitions to M output partitions based on a partitioning scheme, 
optionally
+//! maintaining the order of the input rows in the output. The operator is 
similar to the [`RepartitionExec`]
+//! operator, but it doesn't distribute the data to the output streams until 
the downstreams request the data.
+//!
+//! [`RepartitionExec`]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, vec};
+
+use super::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use super::{
+    DisplayAs, ExecutionPlanProperties, MaybeBatch, RecordBatchStream,
+    RepartitionExecBase, SendableRecordBatchStream,
+};
+use crate::common::SharedMemoryReservation;
+use crate::execution_plan::CardinalityEffect;
+use crate::metrics::{self, BaselineMetrics, MetricBuilder};
+use crate::projection::{all_columns, make_with_child, ProjectionExec};
+use crate::repartition::distributor_channels::{
+    DistributionReceiver, DistributionSender,
+};
+use crate::repartition::RepartitionExecStateBuilder;
+use crate::sorts::streaming_merge::StreamingMergeBuilder;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, 
Statistics};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use async_channel::{Receiver, Sender};
+
+use datafusion_common::{internal_datafusion_err, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::TaskContext;
+
+use datafusion_common::HashMap;
+use futures::stream::Stream;
+use futures::{ready, FutureExt, StreamExt, TryStreamExt};
+use log::trace;
+use parking_lot::Mutex;
+
+type PartitionChannels = (Vec<Sender<usize>>, Vec<Receiver<usize>>);
+
+/// The OnDemandRepartitionExec operator repartitions the input data based on 
a push-based model.
+/// It is similar to the RepartitionExec operator, but it doesn't distribute 
the data to the output
+/// partitions until the output partitions request the data.
+///
+/// When polling, the operator sends the output partition number to the one 
partition channel, then the prefetch buffer will distribute the data based on 
the order of the partition number.
+/// Each input steams has a prefetch buffer(channel) to distribute the data to 
the output partitions.
+///
+/// The following diagram illustrates the data flow of the 
OnDemandRepartitionExec operator with 3 output partitions for the input stream 
1:
+/// ```text
+///         /\                     /\                     /\
+///         ││                     ││                     ││
+///         ││                     ││                     ││
+///         ││                     ││                     ││
+/// ┌───────┴┴────────┐    ┌───────┴┴────────┐    ┌───────┴┴────────┐
+/// │     Stream      │    │     Stream      │    │     Stream      │
+/// │       (1)       │    │       (2)       │    │       (3)       │
+/// └────────┬────────┘    └───────┬─────────┘    └────────┬────────┘
+///          │                     │                       │    / \
+///          │                     │                       │    | |
+///          │                     │                       │    | |
+///          └────────────────┐    │    ┌──────────────────┘    | |
+///                           │    │    │                       | |
+///                           ▼    ▼    ▼                       | |
+///                       ┌─────────────────┐                   | |
+///  Send the partition   │ partion channel │                   | |
+///  number when polling  │                 │                   | |
+///                       └────────┬────────┘                   | |
+///                                │                            | |
+///                                │                            | |
+///                                │  Get the partition number  | |
+///                                ▼  then send data            | |
+///                       ┌─────────────────┐                   | |
+///                       │ Prefetch Buffer │───────────────────┘ |
+///                       │       (1)       │─────────────────────┘
+///                       └─────────────────┘ Distribute data to the output 
partitions
+///
+/// ```text
+
+#[derive(Debug, Clone)]
+pub struct OnDemandRepartitionExec {
+    base: RepartitionExecBase,
+    /// Channel to send partition number to the downstream task
+    partition_channels: Arc<tokio::sync::OnceCell<Mutex<PartitionChannels>>>,
+}
+
+impl OnDemandRepartitionExec {
+    /// Input execution plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.base.input
+    }
+
+    /// Partitioning scheme to use
+    pub fn partitioning(&self) -> &Partitioning {
+        &self.base.cache.partitioning
+    }
+
+    /// Get preserve_order flag of the RepartitionExecutor
+    /// `true` means `SortPreservingRepartitionExec`, `false` means 
`OnDemandRepartitionExec`
+    pub fn preserve_order(&self) -> bool {
+        self.base.preserve_order
+    }
+
+    /// Specify if this reparititoning operation should preserve the order of
+    /// rows from its input when producing output. Preserving order is more
+    /// expensive at runtime, so should only be set if the output of this
+    /// operator can take advantage of it.
+    ///
+    /// If the input is not ordered, or has only one partition, this is a no 
op,
+    /// and the node remains a `OnDemandRepartitionExec`.
+    pub fn with_preserve_order(mut self) -> Self {
+        self.base = self.base.with_preserve_order();
+        self
+    }
+
+    /// Get name used to display this Exec
+    pub fn name(&self) -> &str {
+        "OnDemandRepartitionExec"
+    }
+}
+
+impl DisplayAs for OnDemandRepartitionExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "{}: partitioning={}, input_partitions={}",
+                    self.name(),
+                    self.partitioning(),
+                    self.base.input.output_partitioning().partition_count()
+                )?;
+
+                if self.base.preserve_order {
+                    write!(f, ", preserve_order=true")?;
+                }
+
+                if let Some(sort_exprs) = self.base.sort_exprs() {
+                    write!(f, ", sort_exprs={}", sort_exprs.clone())?;
+                }
+                Ok(())
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for OnDemandRepartitionExec {
+    fn name(&self) -> &'static str {
+        "OnDemandRepartitionExec"
+    }
+
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.base.cache
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.base.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut repartition = OnDemandRepartitionExec::try_new(
+            children.swap_remove(0),
+            self.partitioning().clone(),
+        )?;
+        if self.base.preserve_order {
+            repartition = repartition.with_preserve_order();
+        }
+        Ok(Arc::new(repartition))
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        RepartitionExecBase::maintains_input_order_helper(
+            self.input(),
+            self.base.preserve_order,
+        )
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        trace!(
+            "Start {}::execute for partition: {}",
+            self.name(),
+            partition
+        );
+
+        let lazy_state = Arc::clone(&self.base.state);
+        let partition_channels = Arc::clone(&self.partition_channels);
+        let input = Arc::clone(&self.base.input);
+        let partitioning = self.partitioning().clone();
+        let metrics = self.base.metrics.clone();
+        let preserve_order = self.base.preserve_order;
+        let name = self.name().to_owned();
+        let schema = self.schema();
+        let schema_captured = Arc::clone(&schema);
+
+        // Get existing ordering to use for merging
+        let sort_exprs = self.base.sort_exprs().cloned().unwrap_or_default();
+
+        let stream = futures::stream::once(async move {
+            let num_input_partitions = 
input.output_partitioning().partition_count();
+            let input_captured = Arc::clone(&input);
+            let metrics_captured = metrics.clone();
+            let name_captured = name.clone();
+            let context_captured = Arc::clone(&context);
+            let partition_channels = partition_channels
+                .get_or_init(|| async move {
+                    let (txs, rxs) = if preserve_order {
+                        (0..num_input_partitions)
+                            .map(|_| async_channel::unbounded())
+                            .unzip::<_, _, Vec<_>, Vec<_>>()
+                    } else {
+                        let (tx, rx) = async_channel::unbounded();
+                        (vec![tx], vec![rx])
+                    };
+                    Mutex::new((txs, rxs))
+                })
+                .await;
+            let (partition_txs, partition_rxs) = {
+                let channel = partition_channels.lock();
+                (channel.0.clone(), channel.1.clone())
+            };
+
+            let state = lazy_state
+                .get_or_init(|| async move {
+                    Mutex::new(
+                        RepartitionExecStateBuilder::new()
+                            .enable_pull_based(true)
+                            .partition_receivers(partition_rxs.clone())
+                            .build(
+                                input_captured,
+                                partitioning.clone(),
+                                metrics_captured,
+                                preserve_order,
+                                name_captured,
+                                context_captured,
+                            ),
+                    )
+                })
+                .await;
+
+            // lock scope
+            let (mut rx, reservation, abort_helper) = {
+                // lock mutexes
+                let mut state = state.lock();
+
+                // now return stream for the specified *output* partition 
which will
+                // read from the channel
+                let (_tx, rx, reservation) = state
+                    .channels
+                    .remove(&partition)
+                    .expect("partition not used yet");
+
+                (rx, reservation, Arc::clone(&state.abort_helper))
+            };
+
+            trace!(
+                "Before returning stream in {}::execute for partition: {}",
+                name,
+                partition
+            );
+
+            if preserve_order {
+                // Store streams from all the input partitions:
+                let input_streams = rx
+                    .into_iter()
+                    .enumerate()
+                    .map(|(i, receiver)| {
+                        // sender should be partition-wise
+                        Box::pin(OnDemandPerPartitionStream {
+                            schema: Arc::clone(&schema_captured),
+                            receiver,
+                            _drop_helper: Arc::clone(&abort_helper),
+                            reservation: Arc::clone(&reservation),
+                            sender: partition_txs[i].clone(),
+                            partition,
+                            is_requested: false,
+                        }) as SendableRecordBatchStream
+                    })
+                    .collect::<Vec<_>>();
+                // Note that receiver size (`rx.len()`) and 
`num_input_partitions` are same.
+
+                // Merge streams (while preserving ordering) coming from
+                // input partitions to this partition:
+                let fetch = None;
+                let merge_reservation =
+                    MemoryConsumer::new(format!("{}[Merge {partition}]", name))
+                        .register(context.memory_pool());
+                StreamingMergeBuilder::new()
+                    .with_streams(input_streams)
+                    .with_schema(schema_captured)
+                    .with_expressions(&sort_exprs)
+                    .with_metrics(BaselineMetrics::new(&metrics, partition))
+                    .with_batch_size(context.session_config().batch_size())
+                    .with_fetch(fetch)
+                    .with_reservation(merge_reservation)
+                    .build()
+            } else {
+                Ok(Box::pin(OnDemandRepartitionStream {
+                    num_input_partitions,
+                    num_input_partitions_processed: 0,
+                    schema: input.schema(),
+                    input: rx.swap_remove(0),
+                    _drop_helper: abort_helper,
+                    reservation,
+                    sender: partition_txs[0].clone(),
+                    partition,
+                    is_requested: false,
+                }) as SendableRecordBatchStream)
+            }
+        })
+        .try_flatten();
+        let stream = RecordBatchStreamAdapter::new(schema, stream);
+        Ok(Box::pin(stream))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.base.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        self.base.input.statistics()
+    }
+
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        CardinalityEffect::Equal
+    }
+
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // If the projection does not narrow the schema, we should not try to 
push it down.
+        if projection.expr().len() >= 
projection.input().schema().fields().len() {
+            return Ok(None);
+        }
+
+        // If pushdown is not beneficial or applicable, break it.
+        if projection.benefits_from_input_partitioning()[0]
+            || !all_columns(projection.expr())
+        {
+            return Ok(None);
+        }
+
+        let new_projection = make_with_child(projection, self.input())?;
+
+        Ok(Some(Arc::new(OnDemandRepartitionExec::try_new(
+            new_projection,
+            self.partitioning().clone(),
+        )?)))
+    }
+}
+
+impl OnDemandRepartitionExec {
+    /// Create a new RepartitionExec, that produces output `partitioning`, and
+    /// does not preserve the order of the input (see 
[`Self::with_preserve_order`]
+    /// for more details)
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        partitioning: Partitioning,
+    ) -> Result<Self> {
+        let preserve_order = false;
+        let cache = RepartitionExecBase::compute_properties(
+            &input,
+            partitioning.clone(),
+            preserve_order,
+        );
+        Ok(OnDemandRepartitionExec {
+            base: RepartitionExecBase {
+                input,
+                state: Default::default(),
+                metrics: ExecutionPlanMetricsSet::new(),
+                preserve_order,
+                cache,
+            },
+            partition_channels: Default::default(),
+        })
+    }
+
+    async fn process_input(

Review Comment:
   ```suggestion
       // Executes the input plan and poll stream into the buffer, records 
fetch_time and buffer_time metrics
       async fn process_input(
   ```



##########
datafusion/physical-plan/src/repartition/on_demand_repartition.rs:
##########
@@ -0,0 +1,1362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file implements the [`OnDemandRepartitionExec`]  operator, which maps 
N input
+//! partitions to M output partitions based on a partitioning scheme, 
optionally
+//! maintaining the order of the input rows in the output. The operator is 
similar to the [`RepartitionExec`]
+//! operator, but it doesn't distribute the data to the output streams until 
the downstreams request the data.
+//!
+//! [`RepartitionExec`]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, vec};
+
+use super::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use super::{
+    DisplayAs, ExecutionPlanProperties, MaybeBatch, RecordBatchStream,
+    RepartitionExecBase, SendableRecordBatchStream,
+};
+use crate::common::SharedMemoryReservation;
+use crate::execution_plan::CardinalityEffect;
+use crate::metrics::{self, BaselineMetrics, MetricBuilder};
+use crate::projection::{all_columns, make_with_child, ProjectionExec};
+use crate::repartition::distributor_channels::{
+    DistributionReceiver, DistributionSender,
+};
+use crate::repartition::RepartitionExecStateBuilder;
+use crate::sorts::streaming_merge::StreamingMergeBuilder;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, 
Statistics};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use async_channel::{Receiver, Sender};
+
+use datafusion_common::{internal_datafusion_err, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::TaskContext;
+
+use datafusion_common::HashMap;
+use futures::stream::Stream;
+use futures::{ready, FutureExt, StreamExt, TryStreamExt};
+use log::trace;
+use parking_lot::Mutex;
+
+type PartitionChannels = (Vec<Sender<usize>>, Vec<Receiver<usize>>);
+
+/// The OnDemandRepartitionExec operator repartitions the input data based on 
a push-based model.
+/// It is similar to the RepartitionExec operator, but it doesn't distribute 
the data to the output
+/// partitions until the output partitions request the data.
+///
+/// When polling, the operator sends the output partition number to the one 
partition channel, then the prefetch buffer will distribute the data based on 
the order of the partition number.
+/// Each input steams has a prefetch buffer(channel) to distribute the data to 
the output partitions.
+///
+/// The following diagram illustrates the data flow of the 
OnDemandRepartitionExec operator with 3 output partitions for the input stream 
1:
+/// ```text
+///         /\                     /\                     /\
+///         ││                     ││                     ││
+///         ││                     ││                     ││
+///         ││                     ││                     ││
+/// ┌───────┴┴────────┐    ┌───────┴┴────────┐    ┌───────┴┴────────┐
+/// │     Stream      │    │     Stream      │    │     Stream      │
+/// │       (1)       │    │       (2)       │    │       (3)       │
+/// └────────┬────────┘    └───────┬─────────┘    └────────┬────────┘
+///          │                     │                       │    / \
+///          │                     │                       │    | |
+///          │                     │                       │    | |
+///          └────────────────┐    │    ┌──────────────────┘    | |
+///                           │    │    │                       | |
+///                           ▼    ▼    ▼                       | |
+///                       ┌─────────────────┐                   | |
+///  Send the partition   │ partion channel │                   | |
+///  number when polling  │                 │                   | |
+///                       └────────┬────────┘                   | |
+///                                │                            | |
+///                                │                            | |
+///                                │  Get the partition number  | |
+///                                ▼  then send data            | |
+///                       ┌─────────────────┐                   | |
+///                       │ Prefetch Buffer │───────────────────┘ |
+///                       │       (1)       │─────────────────────┘
+///                       └─────────────────┘ Distribute data to the output 
partitions
+///
+/// ```text
+
+#[derive(Debug, Clone)]
+pub struct OnDemandRepartitionExec {
+    base: RepartitionExecBase,
+    /// Channel to send partition number to the downstream task
+    partition_channels: Arc<tokio::sync::OnceCell<Mutex<PartitionChannels>>>,
+}
+
+impl OnDemandRepartitionExec {
+    /// Input execution plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.base.input
+    }
+
+    /// Partitioning scheme to use
+    pub fn partitioning(&self) -> &Partitioning {
+        &self.base.cache.partitioning
+    }
+
+    /// Get preserve_order flag of the RepartitionExecutor
+    /// `true` means `SortPreservingRepartitionExec`, `false` means 
`OnDemandRepartitionExec`
+    pub fn preserve_order(&self) -> bool {
+        self.base.preserve_order
+    }
+
+    /// Specify if this reparititoning operation should preserve the order of
+    /// rows from its input when producing output. Preserving order is more
+    /// expensive at runtime, so should only be set if the output of this
+    /// operator can take advantage of it.
+    ///
+    /// If the input is not ordered, or has only one partition, this is a no 
op,
+    /// and the node remains a `OnDemandRepartitionExec`.
+    pub fn with_preserve_order(mut self) -> Self {
+        self.base = self.base.with_preserve_order();
+        self
+    }
+
+    /// Get name used to display this Exec
+    pub fn name(&self) -> &str {
+        "OnDemandRepartitionExec"
+    }
+}
+
+impl DisplayAs for OnDemandRepartitionExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "{}: partitioning={}, input_partitions={}",
+                    self.name(),
+                    self.partitioning(),
+                    self.base.input.output_partitioning().partition_count()
+                )?;
+
+                if self.base.preserve_order {
+                    write!(f, ", preserve_order=true")?;
+                }
+
+                if let Some(sort_exprs) = self.base.sort_exprs() {
+                    write!(f, ", sort_exprs={}", sort_exprs.clone())?;
+                }
+                Ok(())
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for OnDemandRepartitionExec {
+    fn name(&self) -> &'static str {
+        "OnDemandRepartitionExec"
+    }
+
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.base.cache
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.base.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut repartition = OnDemandRepartitionExec::try_new(
+            children.swap_remove(0),
+            self.partitioning().clone(),
+        )?;
+        if self.base.preserve_order {
+            repartition = repartition.with_preserve_order();
+        }
+        Ok(Arc::new(repartition))
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        RepartitionExecBase::maintains_input_order_helper(
+            self.input(),
+            self.base.preserve_order,
+        )
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        trace!(
+            "Start {}::execute for partition: {}",
+            self.name(),
+            partition
+        );
+
+        let lazy_state = Arc::clone(&self.base.state);
+        let partition_channels = Arc::clone(&self.partition_channels);
+        let input = Arc::clone(&self.base.input);
+        let partitioning = self.partitioning().clone();
+        let metrics = self.base.metrics.clone();
+        let preserve_order = self.base.preserve_order;
+        let name = self.name().to_owned();
+        let schema = self.schema();
+        let schema_captured = Arc::clone(&schema);
+
+        // Get existing ordering to use for merging
+        let sort_exprs = self.base.sort_exprs().cloned().unwrap_or_default();
+
+        let stream = futures::stream::once(async move {
+            let num_input_partitions = 
input.output_partitioning().partition_count();
+            let input_captured = Arc::clone(&input);
+            let metrics_captured = metrics.clone();
+            let name_captured = name.clone();
+            let context_captured = Arc::clone(&context);
+            let partition_channels = partition_channels
+                .get_or_init(|| async move {
+                    let (txs, rxs) = if preserve_order {
+                        (0..num_input_partitions)
+                            .map(|_| async_channel::unbounded())
+                            .unzip::<_, _, Vec<_>, Vec<_>>()
+                    } else {
+                        let (tx, rx) = async_channel::unbounded();
+                        (vec![tx], vec![rx])
+                    };
+                    Mutex::new((txs, rxs))
+                })
+                .await;
+            let (partition_txs, partition_rxs) = {
+                let channel = partition_channels.lock();
+                (channel.0.clone(), channel.1.clone())
+            };
+
+            let state = lazy_state
+                .get_or_init(|| async move {
+                    Mutex::new(
+                        RepartitionExecStateBuilder::new()
+                            .enable_pull_based(true)
+                            .partition_receivers(partition_rxs.clone())
+                            .build(
+                                input_captured,
+                                partitioning.clone(),
+                                metrics_captured,
+                                preserve_order,
+                                name_captured,
+                                context_captured,
+                            ),
+                    )
+                })
+                .await;
+
+            // lock scope
+            let (mut rx, reservation, abort_helper) = {
+                // lock mutexes
+                let mut state = state.lock();
+
+                // now return stream for the specified *output* partition 
which will
+                // read from the channel
+                let (_tx, rx, reservation) = state
+                    .channels
+                    .remove(&partition)
+                    .expect("partition not used yet");
+
+                (rx, reservation, Arc::clone(&state.abort_helper))
+            };
+
+            trace!(
+                "Before returning stream in {}::execute for partition: {}",
+                name,
+                partition
+            );
+
+            if preserve_order {
+                // Store streams from all the input partitions:
+                let input_streams = rx
+                    .into_iter()
+                    .enumerate()
+                    .map(|(i, receiver)| {
+                        // sender should be partition-wise
+                        Box::pin(OnDemandPerPartitionStream {
+                            schema: Arc::clone(&schema_captured),
+                            receiver,
+                            _drop_helper: Arc::clone(&abort_helper),
+                            reservation: Arc::clone(&reservation),
+                            sender: partition_txs[i].clone(),
+                            partition,
+                            is_requested: false,
+                        }) as SendableRecordBatchStream
+                    })
+                    .collect::<Vec<_>>();
+                // Note that receiver size (`rx.len()`) and 
`num_input_partitions` are same.
+
+                // Merge streams (while preserving ordering) coming from
+                // input partitions to this partition:
+                let fetch = None;
+                let merge_reservation =
+                    MemoryConsumer::new(format!("{}[Merge {partition}]", name))
+                        .register(context.memory_pool());
+                StreamingMergeBuilder::new()
+                    .with_streams(input_streams)
+                    .with_schema(schema_captured)
+                    .with_expressions(&sort_exprs)
+                    .with_metrics(BaselineMetrics::new(&metrics, partition))
+                    .with_batch_size(context.session_config().batch_size())
+                    .with_fetch(fetch)
+                    .with_reservation(merge_reservation)
+                    .build()
+            } else {
+                Ok(Box::pin(OnDemandRepartitionStream {
+                    num_input_partitions,
+                    num_input_partitions_processed: 0,
+                    schema: input.schema(),
+                    input: rx.swap_remove(0),
+                    _drop_helper: abort_helper,
+                    reservation,
+                    sender: partition_txs[0].clone(),
+                    partition,
+                    is_requested: false,
+                }) as SendableRecordBatchStream)
+            }
+        })
+        .try_flatten();
+        let stream = RecordBatchStreamAdapter::new(schema, stream);
+        Ok(Box::pin(stream))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.base.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        self.base.input.statistics()
+    }
+
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        CardinalityEffect::Equal
+    }
+
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // If the projection does not narrow the schema, we should not try to 
push it down.
+        if projection.expr().len() >= 
projection.input().schema().fields().len() {
+            return Ok(None);
+        }
+
+        // If pushdown is not beneficial or applicable, break it.
+        if projection.benefits_from_input_partitioning()[0]
+            || !all_columns(projection.expr())
+        {
+            return Ok(None);
+        }
+
+        let new_projection = make_with_child(projection, self.input())?;
+
+        Ok(Some(Arc::new(OnDemandRepartitionExec::try_new(
+            new_projection,
+            self.partitioning().clone(),
+        )?)))
+    }
+}
+
+impl OnDemandRepartitionExec {
+    /// Create a new RepartitionExec, that produces output `partitioning`, and
+    /// does not preserve the order of the input (see 
[`Self::with_preserve_order`]
+    /// for more details)
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        partitioning: Partitioning,
+    ) -> Result<Self> {
+        let preserve_order = false;
+        let cache = RepartitionExecBase::compute_properties(
+            &input,
+            partitioning.clone(),
+            preserve_order,
+        );
+        Ok(OnDemandRepartitionExec {
+            base: RepartitionExecBase {
+                input,
+                state: Default::default(),
+                metrics: ExecutionPlanMetricsSet::new(),
+                preserve_order,
+                cache,
+            },
+            partition_channels: Default::default(),
+        })
+    }
+
+    async fn process_input(
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        buffer_tx: Sender<RecordBatch>,
+        context: Arc<TaskContext>,
+        fetch_time: metrics::Time,
+        send_buffer_time: metrics::Time,
+    ) -> Result<()> {
+        let timer = fetch_time.timer();
+        let mut stream = input.execute(partition, context).map_err(|e| {
+            internal_datafusion_err!(
+                "Error executing input partition {} for on demand 
repartitioning: {}",
+                partition,
+                e
+            )
+        })?;
+        timer.done();
+
+        loop {
+            let timer = fetch_time.timer();
+            let batch = stream.next().await;
+            timer.done();
+
+            // send the batch to the buffer channel
+            if let Some(batch) = batch {
+                let timer = send_buffer_time.timer();
+                buffer_tx.send(batch?).await.map_err(|e| {
+                    internal_datafusion_err!(
+                        "Error sending batch to buffer channel for partition 
{}: {}",
+                        partition,
+                        e
+                    )
+                })?;
+                timer.done();
+            } else {
+                break;
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Pulls data from the specified input plan, feeding it to the
+    /// output partitions based on the desired partitioning
+    ///
+    /// txs hold the output sending channels for each output partition
+    pub(crate) async fn pull_from_input(
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        mut output_channels: HashMap<
+            usize,
+            (DistributionSender<MaybeBatch>, SharedMemoryReservation),
+        >,
+        partitioning: Partitioning,
+        output_partition_rx: Receiver<usize>,
+        metrics: OnDemandRepartitionMetrics,
+        context: Arc<TaskContext>,
+    ) -> Result<()> {
+        // execute the child operator in a separate task
+        let (buffer_tx, buffer_rx) = async_channel::bounded::<RecordBatch>(2);
+        let processing_task = SpawnedTask::spawn(Self::process_input(
+            Arc::clone(&input),
+            partition,
+            buffer_tx,
+            Arc::clone(&context),
+            metrics.fetch_time.clone(),
+            metrics.send_buffer_time.clone(),
+        ));
+
+        // While there are still outputs to send to, keep pulling inputs
+        let mut batches_until_yield = partitioning.partition_count();
+        while !output_channels.is_empty() {
+            // When the input is done, break the loop
+            let batch = match buffer_rx.recv().await {
+                Ok(batch) => batch,
+                _ => break,
+            };
+
+            // Get the partition number from the output partition
+            let partition = output_partition_rx.recv().await.map_err(|e| {
+                internal_datafusion_err!(
+                    "Error receiving partition number from output partition: 
{}",
+                    e
+                )
+            })?;
+
+            let size = batch.get_array_memory_size();
+
+            let timer = metrics.send_time[partition].timer();
+            // if there is still a receiver, send to it
+            if let Some((tx, reservation)) = 
output_channels.get_mut(&partition) {
+                reservation.lock().try_grow(size)?;
+
+                if tx.send(Some(Ok(batch))).await.is_err() {
+                    // If the other end has hung up, it was an early shutdown 
(e.g. LIMIT)
+                    reservation.lock().shrink(size);
+                    output_channels.remove(&partition);
+                }
+            }
+            timer.done();
+
+            // If the input stream is endless, we may spin forever and
+            // never yield back to tokio.  See
+            // https://github.com/apache/datafusion/issues/5278.
+            //
+            // However, yielding on every batch causes a bottleneck
+            // when running with multiple cores. See
+            // https://github.com/apache/datafusion/issues/6290
+            //
+            // Thus, heuristically yield after producing num_partition
+            // batches
+            if batches_until_yield == 0 {
+                tokio::task::yield_now().await;
+                batches_until_yield = partitioning.partition_count();
+            } else {
+                batches_until_yield -= 1;
+            }
+        }
+
+        processing_task.join().await.map_err(|e| {
+            internal_datafusion_err!("Error waiting for processing task to 
finish: {}", e)
+        })??;
+        Ok(())
+    }
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct OnDemandRepartitionMetrics {
+    /// Time in nanos to execute child operator and fetch batches
+    fetch_time: metrics::Time,
+    /// Time in nanos for sending resulting batches to buffer channels.
+    send_buffer_time: metrics::Time,
+    /// Time in nanos for sending resulting batches to channels.
+    ///
+    /// One metric per output partition.
+    send_time: Vec<metrics::Time>,
+}
+
+impl OnDemandRepartitionMetrics {
+    pub fn new(
+        input_partition: usize,
+        num_output_partitions: usize,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> Self {
+        // Time in nanos to execute child operator and fetch batches
+        let fetch_time =
+            MetricBuilder::new(metrics).subset_time("fetch_time", 
input_partition);
+
+        // Time in nanos for sending resulting batches to channels
+        let send_time = (0..num_output_partitions)
+            .map(|output_partition| {
+                let label =
+                    metrics::Label::new("outputPartition", 
output_partition.to_string());
+                MetricBuilder::new(metrics)
+                    .with_label(label)
+                    .subset_time("send_time", input_partition)
+            })
+            .collect();
+
+        // Time in nanos for sending resulting batches to buffer channels
+        let send_buffer_time =
+            MetricBuilder::new(metrics).subset_time("send_buffer_time", 
input_partition);
+        Self {
+            fetch_time,
+            send_time,
+            send_buffer_time,
+        }
+    }
+}
+
+/// This struct converts a receiver to a stream.
+/// Receiver receives data on an SPSC channel.
+struct OnDemandPerPartitionStream {
+    /// Schema wrapped by Arc
+    schema: SchemaRef,
+
+    /// channel containing the repartitioned batches
+    receiver: DistributionReceiver<MaybeBatch>,
+
+    /// Handle to ensure background tasks are killed when no longer needed.
+    _drop_helper: Arc<Vec<SpawnedTask<()>>>,
+
+    /// Memory reservation.
+    reservation: SharedMemoryReservation,
+
+    /// Sender to send partititon number to the receiver
+    sender: Sender<usize>,
+
+    /// Partition number
+    partition: usize,
+
+    /// Sender State
+    is_requested: bool,
+}
+
+impl Stream for OnDemandPerPartitionStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if !self.is_requested && !self.sender.is_closed() {
+            self.sender.try_send(self.partition).map_err(|e| {
+                internal_datafusion_err!(
+                    "Error sending partition number to the receiver for 
partition {}: {}",
+                    self.partition,
+                    e
+                )
+            })?;
+            self.is_requested = true;
+        }
+
+        let result = ready!(self.receiver.recv().poll_unpin(cx));
+        self.is_requested = false;

Review Comment:
   Do you think we can create a test case for the `is_requested` logic? I'm not 
sure if we can deterministically create a race-condition but it would be really 
better if we could protect the behavior with a unit test.
   
   And maybe we can put more for docs what is it really do



##########
datafusion/physical-plan/src/repartition/on_demand_repartition.rs:
##########
@@ -0,0 +1,1362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This file implements the [`OnDemandRepartitionExec`]  operator, which maps 
N input
+//! partitions to M output partitions based on a partitioning scheme, 
optionally
+//! maintaining the order of the input rows in the output. The operator is 
similar to the [`RepartitionExec`]
+//! operator, but it doesn't distribute the data to the output streams until 
the downstreams request the data.
+//!
+//! [`RepartitionExec`]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/repartition/struct.RepartitionExec.html
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, vec};
+
+use super::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use super::{
+    DisplayAs, ExecutionPlanProperties, MaybeBatch, RecordBatchStream,
+    RepartitionExecBase, SendableRecordBatchStream,
+};
+use crate::common::SharedMemoryReservation;
+use crate::execution_plan::CardinalityEffect;
+use crate::metrics::{self, BaselineMetrics, MetricBuilder};
+use crate::projection::{all_columns, make_with_child, ProjectionExec};
+use crate::repartition::distributor_channels::{
+    DistributionReceiver, DistributionSender,
+};
+use crate::repartition::RepartitionExecStateBuilder;
+use crate::sorts::streaming_merge::StreamingMergeBuilder;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, 
Statistics};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use async_channel::{Receiver, Sender};
+
+use datafusion_common::{internal_datafusion_err, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::TaskContext;
+
+use datafusion_common::HashMap;
+use futures::stream::Stream;
+use futures::{ready, FutureExt, StreamExt, TryStreamExt};
+use log::trace;
+use parking_lot::Mutex;
+
+type PartitionChannels = (Vec<Sender<usize>>, Vec<Receiver<usize>>);
+
+/// The OnDemandRepartitionExec operator repartitions the input data based on 
a push-based model.
+/// It is similar to the RepartitionExec operator, but it doesn't distribute 
the data to the output
+/// partitions until the output partitions request the data.
+///
+/// When polling, the operator sends the output partition number to the one 
partition channel, then the prefetch buffer will distribute the data based on 
the order of the partition number.
+/// Each input steams has a prefetch buffer(channel) to distribute the data to 
the output partitions.
+///
+/// The following diagram illustrates the data flow of the 
OnDemandRepartitionExec operator with 3 output partitions for the input stream 
1:
+/// ```text
+///         /\                     /\                     /\
+///         ││                     ││                     ││
+///         ││                     ││                     ││
+///         ││                     ││                     ││
+/// ┌───────┴┴────────┐    ┌───────┴┴────────┐    ┌───────┴┴────────┐
+/// │     Stream      │    │     Stream      │    │     Stream      │
+/// │       (1)       │    │       (2)       │    │       (3)       │
+/// └────────┬────────┘    └───────┬─────────┘    └────────┬────────┘
+///          │                     │                       │    / \
+///          │                     │                       │    | |
+///          │                     │                       │    | |
+///          └────────────────┐    │    ┌──────────────────┘    | |
+///                           │    │    │                       | |
+///                           ▼    ▼    ▼                       | |
+///                       ┌─────────────────┐                   | |
+///  Send the partition   │ partion channel │                   | |
+///  number when polling  │                 │                   | |
+///                       └────────┬────────┘                   | |
+///                                │                            | |
+///                                │                            | |
+///                                │  Get the partition number  | |
+///                                ▼  then send data            | |
+///                       ┌─────────────────┐                   | |
+///                       │ Prefetch Buffer │───────────────────┘ |
+///                       │       (1)       │─────────────────────┘
+///                       └─────────────────┘ Distribute data to the output 
partitions
+///
+/// ```text
+
+#[derive(Debug, Clone)]
+pub struct OnDemandRepartitionExec {
+    base: RepartitionExecBase,
+    /// Channel to send partition number to the downstream task
+    partition_channels: Arc<tokio::sync::OnceCell<Mutex<PartitionChannels>>>,
+}
+
+impl OnDemandRepartitionExec {
+    /// Input execution plan
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        &self.base.input
+    }
+
+    /// Partitioning scheme to use
+    pub fn partitioning(&self) -> &Partitioning {
+        &self.base.cache.partitioning
+    }
+
+    /// Get preserve_order flag of the RepartitionExecutor
+    /// `true` means `SortPreservingRepartitionExec`, `false` means 
`OnDemandRepartitionExec`
+    pub fn preserve_order(&self) -> bool {
+        self.base.preserve_order
+    }
+
+    /// Specify if this reparititoning operation should preserve the order of
+    /// rows from its input when producing output. Preserving order is more
+    /// expensive at runtime, so should only be set if the output of this
+    /// operator can take advantage of it.
+    ///
+    /// If the input is not ordered, or has only one partition, this is a no 
op,
+    /// and the node remains a `OnDemandRepartitionExec`.
+    pub fn with_preserve_order(mut self) -> Self {
+        self.base = self.base.with_preserve_order();
+        self
+    }
+
+    /// Get name used to display this Exec
+    pub fn name(&self) -> &str {
+        "OnDemandRepartitionExec"
+    }
+}
+
+impl DisplayAs for OnDemandRepartitionExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "{}: partitioning={}, input_partitions={}",
+                    self.name(),
+                    self.partitioning(),
+                    self.base.input.output_partitioning().partition_count()
+                )?;
+
+                if self.base.preserve_order {
+                    write!(f, ", preserve_order=true")?;
+                }
+
+                if let Some(sort_exprs) = self.base.sort_exprs() {
+                    write!(f, ", sort_exprs={}", sort_exprs.clone())?;
+                }
+                Ok(())
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for OnDemandRepartitionExec {
+    fn name(&self) -> &'static str {
+        "OnDemandRepartitionExec"
+    }
+
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.base.cache
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.base.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut repartition = OnDemandRepartitionExec::try_new(
+            children.swap_remove(0),
+            self.partitioning().clone(),
+        )?;
+        if self.base.preserve_order {
+            repartition = repartition.with_preserve_order();
+        }
+        Ok(Arc::new(repartition))
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        RepartitionExecBase::maintains_input_order_helper(
+            self.input(),
+            self.base.preserve_order,
+        )
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        trace!(
+            "Start {}::execute for partition: {}",
+            self.name(),
+            partition
+        );
+
+        let lazy_state = Arc::clone(&self.base.state);
+        let partition_channels = Arc::clone(&self.partition_channels);
+        let input = Arc::clone(&self.base.input);
+        let partitioning = self.partitioning().clone();
+        let metrics = self.base.metrics.clone();
+        let preserve_order = self.base.preserve_order;
+        let name = self.name().to_owned();
+        let schema = self.schema();
+        let schema_captured = Arc::clone(&schema);
+
+        // Get existing ordering to use for merging
+        let sort_exprs = self.base.sort_exprs().cloned().unwrap_or_default();
+
+        let stream = futures::stream::once(async move {
+            let num_input_partitions = 
input.output_partitioning().partition_count();
+            let input_captured = Arc::clone(&input);
+            let metrics_captured = metrics.clone();
+            let name_captured = name.clone();
+            let context_captured = Arc::clone(&context);
+            let partition_channels = partition_channels
+                .get_or_init(|| async move {
+                    let (txs, rxs) = if preserve_order {
+                        (0..num_input_partitions)
+                            .map(|_| async_channel::unbounded())
+                            .unzip::<_, _, Vec<_>, Vec<_>>()
+                    } else {
+                        let (tx, rx) = async_channel::unbounded();
+                        (vec![tx], vec![rx])
+                    };
+                    Mutex::new((txs, rxs))
+                })
+                .await;
+            let (partition_txs, partition_rxs) = {
+                let channel = partition_channels.lock();
+                (channel.0.clone(), channel.1.clone())
+            };
+
+            let state = lazy_state
+                .get_or_init(|| async move {
+                    Mutex::new(
+                        RepartitionExecStateBuilder::new()
+                            .enable_pull_based(true)
+                            .partition_receivers(partition_rxs.clone())
+                            .build(
+                                input_captured,
+                                partitioning.clone(),
+                                metrics_captured,
+                                preserve_order,
+                                name_captured,
+                                context_captured,
+                            ),
+                    )
+                })
+                .await;
+
+            // lock scope
+            let (mut rx, reservation, abort_helper) = {
+                // lock mutexes
+                let mut state = state.lock();
+
+                // now return stream for the specified *output* partition 
which will
+                // read from the channel
+                let (_tx, rx, reservation) = state
+                    .channels
+                    .remove(&partition)
+                    .expect("partition not used yet");
+
+                (rx, reservation, Arc::clone(&state.abort_helper))
+            };
+
+            trace!(
+                "Before returning stream in {}::execute for partition: {}",
+                name,
+                partition
+            );
+
+            if preserve_order {
+                // Store streams from all the input partitions:
+                let input_streams = rx
+                    .into_iter()
+                    .enumerate()
+                    .map(|(i, receiver)| {
+                        // sender should be partition-wise
+                        Box::pin(OnDemandPerPartitionStream {
+                            schema: Arc::clone(&schema_captured),
+                            receiver,
+                            _drop_helper: Arc::clone(&abort_helper),
+                            reservation: Arc::clone(&reservation),
+                            sender: partition_txs[i].clone(),
+                            partition,
+                            is_requested: false,
+                        }) as SendableRecordBatchStream
+                    })
+                    .collect::<Vec<_>>();
+                // Note that receiver size (`rx.len()`) and 
`num_input_partitions` are same.
+
+                // Merge streams (while preserving ordering) coming from
+                // input partitions to this partition:
+                let fetch = None;
+                let merge_reservation =
+                    MemoryConsumer::new(format!("{}[Merge {partition}]", name))
+                        .register(context.memory_pool());
+                StreamingMergeBuilder::new()
+                    .with_streams(input_streams)
+                    .with_schema(schema_captured)
+                    .with_expressions(&sort_exprs)
+                    .with_metrics(BaselineMetrics::new(&metrics, partition))
+                    .with_batch_size(context.session_config().batch_size())
+                    .with_fetch(fetch)
+                    .with_reservation(merge_reservation)
+                    .build()
+            } else {
+                Ok(Box::pin(OnDemandRepartitionStream {
+                    num_input_partitions,
+                    num_input_partitions_processed: 0,
+                    schema: input.schema(),
+                    input: rx.swap_remove(0),
+                    _drop_helper: abort_helper,
+                    reservation,
+                    sender: partition_txs[0].clone(),
+                    partition,
+                    is_requested: false,
+                }) as SendableRecordBatchStream)
+            }
+        })
+        .try_flatten();
+        let stream = RecordBatchStreamAdapter::new(schema, stream);
+        Ok(Box::pin(stream))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.base.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        self.base.input.statistics()
+    }
+
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        CardinalityEffect::Equal
+    }
+
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        // If the projection does not narrow the schema, we should not try to 
push it down.
+        if projection.expr().len() >= 
projection.input().schema().fields().len() {
+            return Ok(None);
+        }
+
+        // If pushdown is not beneficial or applicable, break it.
+        if projection.benefits_from_input_partitioning()[0]
+            || !all_columns(projection.expr())
+        {
+            return Ok(None);
+        }
+
+        let new_projection = make_with_child(projection, self.input())?;
+
+        Ok(Some(Arc::new(OnDemandRepartitionExec::try_new(
+            new_projection,
+            self.partitioning().clone(),
+        )?)))
+    }
+}
+
+impl OnDemandRepartitionExec {
+    /// Create a new RepartitionExec, that produces output `partitioning`, and
+    /// does not preserve the order of the input (see 
[`Self::with_preserve_order`]
+    /// for more details)
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        partitioning: Partitioning,
+    ) -> Result<Self> {
+        let preserve_order = false;
+        let cache = RepartitionExecBase::compute_properties(
+            &input,
+            partitioning.clone(),
+            preserve_order,
+        );
+        Ok(OnDemandRepartitionExec {
+            base: RepartitionExecBase {
+                input,
+                state: Default::default(),
+                metrics: ExecutionPlanMetricsSet::new(),
+                preserve_order,
+                cache,
+            },
+            partition_channels: Default::default(),
+        })
+    }
+
+    async fn process_input(
+        input: Arc<dyn ExecutionPlan>,
+        partition: usize,
+        buffer_tx: Sender<RecordBatch>,
+        context: Arc<TaskContext>,
+        fetch_time: metrics::Time,
+        send_buffer_time: metrics::Time,
+    ) -> Result<()> {
+        let timer = fetch_time.timer();
+        let mut stream = input.execute(partition, context).map_err(|e| {
+            internal_datafusion_err!(
+                "Error executing input partition {} for on demand 
repartitioning: {}",
+                partition,
+                e
+            )
+        })?;
+        timer.done();
+
+        loop {
+            let timer = fetch_time.timer();
+            let batch = stream.next().await;
+            timer.done();
+
+            // send the batch to the buffer channel
+            if let Some(batch) = batch {
+                let timer = send_buffer_time.timer();
+                buffer_tx.send(batch?).await.map_err(|e| {
+                    internal_datafusion_err!(
+                        "Error sending batch to buffer channel for partition 
{}: {}",
+                        partition,
+                        e
+                    )
+                })?;
+                timer.done();
+            } else {
+                break;
+            }

Review Comment:
   ```suggestion
               let Some(batch) = batch  else {
                   break;
               }
               let timer = send_buffer_time.timer();
               // Feed the buffer with batch, since the buffer channel has 
limited capacity
               // The process waits here until one is consumed
               buffer_tx.send(batch?).await.map_err(|e| {
                   internal_datafusion_err!(
                       "Error sending batch to buffer channel for partition {}: 
{}",
                       partition,
                       e
                   )
               })?;
               timer.done();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to