adriangb commented on code in PR #19761:
URL: https://github.com/apache/datafusion/pull/19761#discussion_r2690601539


##########
datafusion/physical-plan/src/buffer.rs:
##########
@@ -0,0 +1,626 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`BufferExec`] decouples production and consumption of messages by buffering the input in
+//! the background up to a certain capacity.
+
+use crate::execution_plan::{CardinalityEffect, SchedulingType};
+use crate::filter_pushdown::{
+    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
+    FilterPushdownPropagation,
+};
+use crate::projection::ProjectionExec;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, 
SortOrderPushdownResult,
+};
+use arrow::array::RecordBatch;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::{Result, Statistics, internal_err, plan_err};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_physical_expr_common::metrics::{
+    ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+};
+use datafusion_physical_expr_common::physical_expr::{
+    PhysicalExpr, is_dynamic_physical_expr,
+};
+use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
+use futures::{Stream, StreamExt, TryStreamExt};
+use pin_project_lite::pin_project;
+use std::any::Any;
+use std::fmt;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::task::{Context, Poll};
+use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+
+/// Decouples production and consumption of record batches with an internal 
queue per partition,
+/// eagerly filling up the capacity of the queues even before any message is 
requested.
+///
+/// ```text
+///             ┌───────────────────────────┐
+///             │        BufferExec         │
+///             │                           │
+///             │┌────── Partition 0 ──────┐│
+///             ││            ┌────┐ ┌────┐││       ┌────┐
+/// ──background poll────────▶│    │ │    ├┼┼───────▶    │
+///             ││            └────┘ └────┘││       └────┘
+///             │└─────────────────────────┘│
+///             │┌────── Partition 1 ──────┐│
+///             ││     ┌────┐ ┌────┐ ┌────┐││       ┌────┐
+/// ──background poll─▶│    │ │    │ │    ├┼┼───────▶    │
+///             ││     └────┘ └────┘ └────┘││       └────┘
+///             │└─────────────────────────┘│
+///             │                           │
+///             │           ...             │
+///             │                           │
+///             │┌────── Partition N ──────┐│
+///             ││                   ┌────┐││       ┌────┐
+/// ──background poll───────────────▶│    ├┼┼───────▶    │
+///             ││                   └────┘││       └────┘
+///             │└─────────────────────────┘│
+///             └───────────────────────────┘
+/// ```
+///
+/// The capacity is provided in bytes, and each buffered record batch is counted against it
+/// using the size reported by [RecordBatch::get_array_memory_size].
+///
+/// This is useful for operators that conditionally start polling one of their children only
+/// after the other child has finished: it allows some early work to be performed, accumulating
+/// batches in memory so that they can be served immediately when requested.
+#[derive(Debug, Clone)]
+pub struct BufferExec {
+    /// Child plan whose output partitions are buffered.
+    input: Arc<dyn ExecutionPlan>,
+    /// Plan properties: a copy of the input's properties with the scheduling type set to
+    /// [SchedulingType::Cooperative].
+    properties: PlanProperties,
+    /// Buffer capacity in bytes, handed to the buffered stream of each executed partition.
+    capacity: usize,
+    /// Metrics set where the `max_mem_used` and `max_queued` gauges are registered.
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl BufferExec {
+    /// Creates a [BufferExec] on top of `input` that buffers up to `capacity` bytes.
+    ///
+    /// The plan properties are copied from `input`, overriding the scheduling type with
+    /// [SchedulingType::Cooperative]. A fresh, empty metrics set is attached.
+    pub fn new(input: Arc<dyn ExecutionPlan>, capacity: usize) -> Self {
+        let metrics = ExecutionPlanMetricsSet::new();
+        let properties = PlanProperties::clone(input.properties())
+            .with_scheduling_type(SchedulingType::Cooperative);
+
+        Self {
+            input,
+            properties,
+            capacity,
+            metrics,
+        }
+    }
+
+    /// The child [ExecutionPlan] whose output is buffered by this [BufferExec].
+    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+        let Self { input, .. } = self;
+        input
+    }
+
+    /// The capacity, in bytes, that this [BufferExec] applies to each partition.
+    pub fn capacity(&self) -> usize {
+        let Self { capacity, .. } = self;
+        *capacity
+    }
+}
+
+impl DisplayAs for BufferExec {
+    /// Writes the textual representation of this node for the requested display format.
+    ///
+    /// All formats report the configured capacity (in bytes) under the `capacity` key:
+    /// `Default` and `Verbose` emit a single `BufferExec: capacity=<n>` entry without a
+    /// trailing newline, while `TreeRender` emits a `capacity=<n>` line.
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "BufferExec: capacity={}", self.capacity)
+            }
+            DisplayFormatType::TreeRender => {
+                // The printed value is the buffer capacity, not a batch size, so use the
+                // same key as the default rendering instead of `target_batch_size`.
+                writeln!(f, "capacity={}", self.capacity)
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for BufferExec {
+    fn name(&self) -> &str {
+        "BufferExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![true]
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        mut children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.len() != 1 {
+            return plan_err!("BufferExec can only have one child");
+        }
+        Ok(Arc::new(Self::new(children.swap_remove(0), self.capacity)))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let mem_reservation = 
MemoryConsumer::new(format!("BufferExec[{partition}]"))
+            .register(context.memory_pool());
+        let in_stream = self.input.execute(partition, context)?;
+
+        // Set up the metrics for the stream.
+        let curr_mem_in = Arc::new(AtomicUsize::new(0));
+        let curr_mem_out = Arc::clone(&curr_mem_in);
+        let mut max_mem_in = 0;
+        let max_mem = MetricBuilder::new(&self.metrics).gauge("max_mem_used", 
partition);
+
+        let curr_queued_in = Arc::new(AtomicUsize::new(0));
+        let curr_queued_out = Arc::clone(&curr_queued_in);
+        let mut max_queued_in = 0;
+        let max_queued = MetricBuilder::new(&self.metrics).gauge("max_queued", 
partition);
+
+        // Capture metrics when an element is queued on the stream.
+        let in_stream = in_stream.inspect_ok(move |v| {
+            let size = v.get_array_memory_size();
+            let curr_size = curr_mem_in.fetch_add(size, Ordering::Relaxed) + 
size;
+            if curr_size > max_mem_in {
+                max_mem_in = curr_size;
+                max_mem.set(max_mem_in);
+            }
+
+            let curr_queued = curr_queued_in.fetch_add(1, Ordering::Relaxed) + 
1;
+            if curr_queued > max_queued_in {
+                max_queued_in = curr_queued;
+                max_queued.set(max_queued_in);
+            }
+        });
+        // Buffer the input.
+        let out_stream =
+            MemoryBufferedStream::new(in_stream, self.capacity, 
mem_reservation);
+        // Update in the metrics that when an element gets out, some memory 
gets freed.
+        let out_stream = out_stream.inspect_ok(move |v| {
+            curr_mem_out.fetch_sub(v.get_array_memory_size(), 
Ordering::Relaxed);
+            curr_queued_out.fetch_sub(1, Ordering::Relaxed);
+        });
+
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema(),
+            out_stream,
+        )))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn partition_statistics(&self, partition: Option<usize>) -> 
Result<Statistics> {
+        self.input.partition_statistics(partition)
+    }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        self.input.supports_limit_pushdown()
+    }
+
+    fn cardinality_effect(&self) -> CardinalityEffect {
+        CardinalityEffect::Equal
+    }
+
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        match self.input.try_swapping_with_projection(projection)? {
+            Some(new_input) => Ok(Some(
+                Arc::new(self.clone()).with_new_children(vec![new_input])?,
+            )),
+            None => Ok(None),
+        }
+    }
+
+    fn gather_filters_for_pushdown(
+        &self,
+        _phase: FilterPushdownPhase,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> Result<FilterDescription> {
+        FilterDescription::from_children(parent_filters, &self.children())
+    }
+
+    fn handle_child_pushdown_result(
+        &self,
+        _phase: FilterPushdownPhase,
+        child_pushdown_result: ChildPushdownResult,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        // If there is a dynamic filter being pushed down through this node, 
we don't want to buffer,
+        // we prefer to give a chance to the dynamic filter to be populated 
with something rather
+        // than eagerly polling data with an empty dynamic filter.
+        let mut has_dynamic_filter = false;
+        for parent_filter in &child_pushdown_result.parent_filters {
+            if is_dynamic_physical_expr(&parent_filter.filter) {
+                has_dynamic_filter = true;
+            }
+        }
+        if has_dynamic_filter {
+            let mut result = 
FilterPushdownPropagation::if_all(child_pushdown_result);
+            result.updated_node = Some(Arc::clone(self.input()));
+            Ok(result)
+        } else {
+            Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
+        }

Review Comment:
   Yeah, it makes sense that it wouldn’t work from here. We’d need a different,
explicit API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to