This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ea3b965dd Memory reservation & metrics for cross join (#5339)
ea3b965dd is described below

commit ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12
Author: Eduard Karacharov <[email protected]>
AuthorDate: Tue Feb 28 17:33:07 2023 +0300

    Memory reservation & metrics for cross join (#5339)
    
    * Memory reservation & metrics for cross join
    
    * memory_limit test & removed fixed error msg from test_overallocation
---
 datafusion/core/src/physical_plan/common.rs        |   4 +
 .../core/src/physical_plan/joins/cross_join.rs     | 209 +++++++++++++++------
 datafusion/core/src/physical_plan/joins/utils.rs   |  63 +++++++
 .../core/src/physical_plan/repartition/mod.rs      |   5 +-
 datafusion/core/src/test/mod.rs                    |  15 +-
 datafusion/core/tests/memory_limit.rs              |  10 +
 6 files changed, 246 insertions(+), 60 deletions(-)

diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index caf9b5ebc..68f888210 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -20,6 +20,7 @@
 use super::{RecordBatchStream, SendableRecordBatchStream};
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
+use crate::execution::memory_pool::MemoryReservation;
 use crate::physical_plan::metrics::MemTrackingMetrics;
 use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, 
Statistics};
 use arrow::datatypes::{Schema, SchemaRef};
@@ -28,6 +29,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion_physical_expr::PhysicalSortExpr;
 use futures::{Future, Stream, StreamExt, TryStreamExt};
 use log::debug;
+use parking_lot::Mutex;
 use pin_project_lite::pin_project;
 use std::fs;
 use std::fs::{metadata, File};
@@ -37,6 +39,8 @@ use std::task::{Context, Poll};
 use tokio::sync::mpsc;
 use tokio::task::JoinHandle;
 
+pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
+
 /// Stream of record batches
 pub struct SizedRecordBatchStream {
     schema: SchemaRef,
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 522cebe0c..0523a2e35 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -26,6 +26,9 @@ use arrow::datatypes::{Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 
 use crate::execution::context::TaskContext;
+use crate::execution::memory_pool::MemoryConsumer;
+use crate::physical_plan::common::SharedMemoryReservation;
+use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use crate::physical_plan::{
     coalesce_batches::concat_batches, 
coalesce_partitions::CoalescePartitionsExec,
     ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
@@ -35,12 +38,11 @@ use crate::physical_plan::{
 use crate::{error::Result, scalar::ScalarValue};
 use async_trait::async_trait;
 use datafusion_common::DataFusionError;
-use log::debug;
-use std::time::Instant;
+use parking_lot::Mutex;
 
 use super::utils::{
-    adjust_right_output_partitioning, cross_join_equivalence_properties, 
OnceAsync,
-    OnceFut,
+    adjust_right_output_partitioning, cross_join_equivalence_properties,
+    BuildProbeJoinMetrics, OnceAsync, OnceFut,
 };
 
 /// Data of the left side
@@ -58,6 +60,8 @@ pub struct CrossJoinExec {
     schema: SchemaRef,
     /// Build-side data
     left_fut: OnceAsync<JoinLeftData>,
+    /// Execution plan metrics
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl CrossJoinExec {
@@ -79,6 +83,7 @@ impl CrossJoinExec {
             right,
             schema,
             left_fut: Default::default(),
+            metrics: ExecutionPlanMetricsSet::default(),
         }
     }
 
@@ -97,9 +102,9 @@ impl CrossJoinExec {
 async fn load_left_input(
     left: Arc<dyn ExecutionPlan>,
     context: Arc<TaskContext>,
+    metrics: BuildProbeJoinMetrics,
+    reservation: SharedMemoryReservation,
 ) -> Result<JoinLeftData> {
-    let start = Instant::now();
-
     // merge all left parts into a single stream
     let merge = {
         if left.output_partitioning().partition_count() != 1 {
@@ -111,22 +116,28 @@ async fn load_left_input(
     let stream = merge.execute(0, context)?;
 
     // Load all batches and count the rows
-    let (batches, num_rows) = stream
-        .try_fold((Vec::new(), 0usize), |mut acc, batch| async {
-            acc.1 += batch.num_rows();
-            acc.0.push(batch);
-            Ok(acc)
-        })
+    let (batches, num_rows, _, _) = stream
+        .try_fold(
+            (Vec::new(), 0usize, metrics, reservation),
+            |mut acc, batch| async {
+                let batch_size = batch.get_array_memory_size();
+                // Reserve memory for incoming batch
+                acc.3.lock().try_grow(batch_size)?;
+                // Update metrics
+                acc.2.build_mem_used.add(batch_size);
+                acc.2.build_input_batches.add(1);
+                acc.2.build_input_rows.add(batch.num_rows());
+                // Update rowcount
+                acc.1 += batch.num_rows();
+                // Push batch to output
+                acc.0.push(batch);
+                Ok(acc)
+            },
+        )
         .await?;
 
     let merged_batch = concat_batches(&left.schema(), &batches, num_rows)?;
 
-    debug!(
-        "Built build-side of cross join containing {} rows in {} ms",
-        num_rows,
-        start.elapsed().as_millis()
-    );
-
     Ok(merged_batch)
 }
 
@@ -143,6 +154,10 @@ impl ExecutionPlan for CrossJoinExec {
         vec![self.left.clone(), self.right.clone()]
     }
 
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     /// Specifies whether this plan generates an infinite stream of records.
     /// If the plan does not support pipelining, but it its input(s) are
     /// infinite, returns an error to indicate this.    
@@ -205,9 +220,20 @@ impl ExecutionPlan for CrossJoinExec {
     ) -> Result<SendableRecordBatchStream> {
         let stream = self.right.execute(partition, context.clone())?;
 
-        let left_fut = self
-            .left_fut
-            .once(|| load_left_input(self.left.clone(), context));
+        let join_metrics = BuildProbeJoinMetrics::new(partition, 
&self.metrics);
+        let reservation = Arc::new(Mutex::new(
+            MemoryConsumer::new(format!("CrossJoinStream[{partition}]"))
+                .register(context.memory_pool()),
+        ));
+
+        let left_fut = self.left_fut.once(|| {
+            load_left_input(
+                self.left.clone(),
+                context,
+                join_metrics.clone(),
+                reservation.clone(),
+            )
+        });
 
         Ok(Box::pin(CrossJoinStream {
             schema: self.schema.clone(),
@@ -215,11 +241,8 @@ impl ExecutionPlan for CrossJoinExec {
             right: stream,
             right_batch: Arc::new(parking_lot::Mutex::new(None)),
             left_index: 0,
-            num_input_batches: 0,
-            num_input_rows: 0,
-            num_output_batches: 0,
-            num_output_rows: 0,
-            join_time: 0,
+            join_metrics,
+            reservation,
         }))
     }
 
@@ -321,16 +344,10 @@ struct CrossJoinStream {
     left_index: usize,
     /// Current batch being processed from the right side
     right_batch: Arc<parking_lot::Mutex<Option<RecordBatch>>>,
-    /// number of input batches
-    num_input_batches: usize,
-    /// number of input rows
-    num_input_rows: usize,
-    /// number of batches produced
-    num_output_batches: usize,
-    /// number of rows produced
-    num_output_rows: usize,
-    /// total time for joining probe-side batches to the build-side batches
-    join_time: usize,
+    /// join execution metrics
+    join_metrics: BuildProbeJoinMetrics,
+    /// memory reservation
+    reservation: SharedMemoryReservation,
 }
 
 impl RecordBatchStream for CrossJoinStream {
@@ -385,28 +402,30 @@ impl CrossJoinStream {
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Result<RecordBatch>>> {
+        let build_timer = self.join_metrics.build_time.timer();
         let left_data = match ready!(self.left_fut.get(cx)) {
             Ok(left_data) => left_data,
             Err(e) => return Poll::Ready(Some(Err(e))),
         };
+        build_timer.done();
 
         if left_data.num_rows() == 0 {
             return Poll::Ready(None);
         }
 
         if self.left_index > 0 && self.left_index < left_data.num_rows() {
-            let start = Instant::now();
+            let join_timer = self.join_metrics.join_time.timer();
             let right_batch = {
                 let right_batch = self.right_batch.lock();
                 right_batch.clone().unwrap()
             };
             let result =
                 build_batch(self.left_index, &right_batch, left_data, 
&self.schema);
-            self.num_input_rows += right_batch.num_rows();
+            self.join_metrics.input_rows.add(right_batch.num_rows());
             if let Ok(ref batch) = result {
-                self.join_time += start.elapsed().as_millis() as usize;
-                self.num_output_batches += 1;
-                self.num_output_rows += batch.num_rows();
+                join_timer.done();
+                self.join_metrics.output_batches.add(1);
+                self.join_metrics.output_rows.add(batch.num_rows());
             }
             self.left_index += 1;
             return Poll::Ready(Some(result));
@@ -416,15 +435,15 @@ impl CrossJoinStream {
             .poll_next_unpin(cx)
             .map(|maybe_batch| match maybe_batch {
                 Some(Ok(batch)) => {
-                    let start = Instant::now();
+                    let join_timer = self.join_metrics.join_time.timer();
                     let result =
                         build_batch(self.left_index, &batch, left_data, 
&self.schema);
-                    self.num_input_batches += 1;
-                    self.num_input_rows += batch.num_rows();
+                    self.join_metrics.input_batches.add(1);
+                    self.join_metrics.input_rows.add(batch.num_rows());
                     if let Ok(ref batch) = result {
-                        self.join_time += start.elapsed().as_millis() as usize;
-                        self.num_output_batches += 1;
-                        self.num_output_rows += batch.num_rows();
+                        join_timer.done();
+                        self.join_metrics.output_batches.add(1);
+                        self.join_metrics.output_rows.add(batch.num_rows());
                     }
                     self.left_index = 1;
 
@@ -434,15 +453,7 @@ impl CrossJoinStream {
                     Some(result)
                 }
                 other => {
-                    debug!(
-                        "Processed {} probe-side input batches containing {} 
rows and \
-                        produced {} output batches containing {} rows in {} 
ms",
-                        self.num_input_batches,
-                        self.num_input_rows,
-                        self.num_output_batches,
-                        self.num_output_rows,
-                        self.join_time
-                    );
+                    self.reservation.lock().free();
                     other
                 }
             })
@@ -452,6 +463,26 @@ impl CrossJoinStream {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::assert_batches_sorted_eq;
+    use crate::common::assert_contains;
+    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+    use crate::physical_plan::common;
+    use crate::prelude::{SessionConfig, SessionContext};
+    use crate::test::{build_table_scan_i32, columns};
+
+    async fn join_collect(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+    ) -> Result<(Vec<String>, Vec<RecordBatch>)> {
+        let join = CrossJoinExec::new(left, right);
+        let columns_header = columns(&join.schema());
+
+        let stream = join.execute(0, context)?;
+        let batches = common::collect(stream).await?;
+
+        Ok((columns_header, batches))
+    }
 
     #[tokio::test]
     async fn test_stats_cartesian_product() {
@@ -589,4 +620,70 @@ mod tests {
 
         assert_eq!(result, expected);
     }
+
+    #[tokio::test]
+    async fn test_join() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+
+        let left = build_table_scan_i32(
+            ("a1", &vec![1, 2, 3]),
+            ("b1", &vec![4, 5, 6]),
+            ("c1", &vec![7, 8, 9]),
+        );
+        let right = build_table_scan_i32(
+            ("a2", &vec![10, 11]),
+            ("b2", &vec![12, 13]),
+            ("c2", &vec![14, 15]),
+        );
+
+        let (columns, batches) = join_collect(left, right, task_ctx).await?;
+
+        assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
+        let expected = vec![
+            "+----+----+----+----+----+----+",
+            "| a1 | b1 | c1 | a2 | b2 | c2 |",
+            "+----+----+----+----+----+----+",
+            "| 1  | 4  | 7  | 10 | 12 | 14 |",
+            "| 1  | 4  | 7  | 11 | 13 | 15 |",
+            "| 2  | 5  | 8  | 10 | 12 | 14 |",
+            "| 2  | 5  | 8  | 11 | 13 | 15 |",
+            "| 3  | 6  | 9  | 10 | 12 | 14 |",
+            "| 3  | 6  | 9  | 11 | 13 | 15 |",
+            "+----+----+----+----+----+----+",
+        ];
+
+        assert_batches_sorted_eq!(expected, &batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_overallocation() -> Result<()> {
+        let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
+        let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+        let session_ctx =
+            SessionContext::with_config_rt(SessionConfig::default(), runtime);
+        let task_ctx = session_ctx.task_ctx();
+
+        let left = build_table_scan_i32(
+            ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+            ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+            ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+        );
+        let right = build_table_scan_i32(
+            ("a2", &vec![10, 11]),
+            ("b2", &vec![12, 13]),
+            ("c2", &vec![14, 15]),
+        );
+
+        let err = join_collect(left, right, task_ctx).await.unwrap_err();
+
+        assert_contains!(
+            err.to_string(),
+            "External error: Resources exhausted: Failed to allocate 
additional"
+        );
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs
index b01483f56..2f0267315 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -20,6 +20,7 @@
 use crate::error::{DataFusionError, Result, SharedResult};
 use crate::logical_expr::JoinType;
 use crate::physical_plan::expressions::Column;
+use crate::physical_plan::metrics::{self, ExecutionPlanMetricsSet, 
MetricBuilder};
 use crate::physical_plan::SchemaRef;
 use arrow::array::{
     new_null_array, Array, BooleanBufferBuilder, PrimitiveArray, UInt32Array,
@@ -968,6 +969,68 @@ pub(crate) fn get_semi_u64_indices(
         .collect::<UInt64Array>()
 }
 
+/// Metrics for build & probe joins
+#[derive(Clone, Debug)]
+pub(crate) struct BuildProbeJoinMetrics {
+    /// Total time for collecting build-side of join
+    pub(crate) build_time: metrics::Time,
+    /// Number of batches consumed by build-side
+    pub(crate) build_input_batches: metrics::Count,
+    /// Number of rows consumed by build-side
+    pub(crate) build_input_rows: metrics::Count,
+    /// Memory used by build-side in bytes
+    pub(crate) build_mem_used: metrics::Gauge,
+    /// Total time for joining probe-side batches to the build-side batches
+    pub(crate) join_time: metrics::Time,
+    /// Number of batches consumed by probe-side of this operator
+    pub(crate) input_batches: metrics::Count,
+    /// Number of rows consumed by probe-side this operator
+    pub(crate) input_rows: metrics::Count,
+    /// Number of batches produced by this operator
+    pub(crate) output_batches: metrics::Count,
+    /// Number of rows produced by this operator
+    pub(crate) output_rows: metrics::Count,
+}
+
+impl BuildProbeJoinMetrics {
+    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        let join_time = MetricBuilder::new(metrics).subset_time("join_time", 
partition);
+
+        let build_time = MetricBuilder::new(metrics).subset_time("build_time", 
partition);
+
+        let build_input_batches =
+            MetricBuilder::new(metrics).counter("build_input_batches", 
partition);
+
+        let build_input_rows =
+            MetricBuilder::new(metrics).counter("build_input_rows", partition);
+
+        let build_mem_used =
+            MetricBuilder::new(metrics).gauge("build_mem_used", partition);
+
+        let input_batches =
+            MetricBuilder::new(metrics).counter("input_batches", partition);
+
+        let input_rows = MetricBuilder::new(metrics).counter("input_rows", 
partition);
+
+        let output_batches =
+            MetricBuilder::new(metrics).counter("output_batches", partition);
+
+        let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
+        Self {
+            build_time,
+            build_input_batches,
+            build_input_rows,
+            build_mem_used,
+            join_time,
+            input_batches,
+            input_rows,
+            output_batches,
+            output_rows,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs
index 8a7ce3ce2..7f13418d2 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -24,7 +24,7 @@ use std::task::{Context, Poll};
 use std::{any::Any, vec};
 
 use crate::error::{DataFusionError, Result};
-use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use crate::execution::memory_pool::MemoryConsumer;
 use crate::physical_plan::hash_utils::create_hashes;
 use crate::physical_plan::repartition::distributor_channels::channels;
 use crate::physical_plan::{
@@ -37,7 +37,7 @@ use log::debug;
 
 use self::distributor_channels::{DistributionReceiver, DistributionSender};
 
-use super::common::{AbortOnDropMany, AbortOnDropSingle};
+use super::common::{AbortOnDropMany, AbortOnDropSingle, 
SharedMemoryReservation};
 use super::expressions::PhysicalSortExpr;
 use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use super::{RecordBatchStream, SendableRecordBatchStream};
@@ -53,7 +53,6 @@ use tokio::task::JoinHandle;
 mod distributor_channels;
 
 type MaybeBatch = Option<Result<RecordBatch>>;
-type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
 
 /// Inner state of [`RepartitionExec`].
 #[derive(Debug)]
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 9dccde8a7..f7113123c 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -26,6 +26,8 @@ use crate::error::Result;
 use crate::from_slice::FromSlice;
 use crate::logical_expr::LogicalPlan;
 use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
+use crate::physical_plan::memory::MemoryExec;
+use crate::physical_plan::ExecutionPlan;
 use crate::test::object_store::local_unpartitioned_file;
 use crate::test_util::{aggr_test_schema, arrow_test_data};
 use array::ArrayRef;
@@ -207,7 +209,7 @@ pub fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) {
     assert_eq!(actual, expected);
 }
 
-/// returns a table with 3 columns of i32 in memory
+/// returns record batch with 3 columns of i32 in memory
 pub fn build_table_i32(
     a: (&str, &Vec<i32>),
     b: (&str, &Vec<i32>),
@@ -230,6 +232,17 @@ pub fn build_table_i32(
     .unwrap()
 }
 
+/// returns memory table scan wrapped around record batch with 3 columns of i32
+pub fn build_table_scan_i32(
+    a: (&str, &Vec<i32>),
+    b: (&str, &Vec<i32>),
+    c: (&str, &Vec<i32>),
+) -> Arc<dyn ExecutionPlan> {
+    let batch = build_table_i32(a, b, c);
+    let schema = batch.schema();
+    Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap())
+}
+
 /// Returns the column names on the schema
 pub fn columns(schema: &Schema) -> Vec<String> {
     schema.fields().iter().map(|f| f.name().clone()).collect()
diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
index 170f55903..75e26485d 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -74,6 +74,16 @@ async fn group_by_hash() {
     .await
 }
 
+#[tokio::test]
+async fn cross_join() {
+    run_limit_test(
+        "select t1.* from t t1 CROSS JOIN t t2",
+        "Resources exhausted: Failed to allocate additional",
+        1_000,
+    )
+    .await
+}
+
 /// 50 byte memory limit
 const MEMORY_FRACTION: f64 = 0.95;
 

Reply via email to