This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ea3b965dd Memory reservation & metrics for cross join (#5339)
ea3b965dd is described below
commit ea3b965dd4861f966a5ec1c4a65f748f0e5dcc12
Author: Eduard Karacharov <[email protected]>
AuthorDate: Tue Feb 28 17:33:07 2023 +0300
Memory reservation & metrics for cross join (#5339)
* Memory reservation & metrics for cross join
* memory_limit test & removed fixed error msg from test_overallocation
---
datafusion/core/src/physical_plan/common.rs | 4 +
.../core/src/physical_plan/joins/cross_join.rs | 209 +++++++++++++++------
datafusion/core/src/physical_plan/joins/utils.rs | 63 +++++++
.../core/src/physical_plan/repartition/mod.rs | 5 +-
datafusion/core/src/test/mod.rs | 15 +-
datafusion/core/tests/memory_limit.rs | 10 +
6 files changed, 246 insertions(+), 60 deletions(-)
diff --git a/datafusion/core/src/physical_plan/common.rs
b/datafusion/core/src/physical_plan/common.rs
index caf9b5ebc..68f888210 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -20,6 +20,7 @@
use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
+use crate::execution::memory_pool::MemoryReservation;
use crate::physical_plan::metrics::MemTrackingMetrics;
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan,
Statistics};
use arrow::datatypes::{Schema, SchemaRef};
@@ -28,6 +29,7 @@ use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::PhysicalSortExpr;
use futures::{Future, Stream, StreamExt, TryStreamExt};
use log::debug;
+use parking_lot::Mutex;
use pin_project_lite::pin_project;
use std::fs;
use std::fs::{metadata, File};
@@ -37,6 +39,8 @@ use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
+pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
+
/// Stream of record batches
pub struct SizedRecordBatchStream {
schema: SchemaRef,
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs
b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 522cebe0c..0523a2e35 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -26,6 +26,9 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use crate::execution::context::TaskContext;
+use crate::execution::memory_pool::MemoryConsumer;
+use crate::physical_plan::common::SharedMemoryReservation;
+use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
coalesce_batches::concat_batches,
coalesce_partitions::CoalescePartitionsExec,
ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
@@ -35,12 +38,11 @@ use crate::physical_plan::{
use crate::{error::Result, scalar::ScalarValue};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
-use log::debug;
-use std::time::Instant;
+use parking_lot::Mutex;
use super::utils::{
- adjust_right_output_partitioning, cross_join_equivalence_properties,
OnceAsync,
- OnceFut,
+ adjust_right_output_partitioning, cross_join_equivalence_properties,
+ BuildProbeJoinMetrics, OnceAsync, OnceFut,
};
/// Data of the left side
@@ -58,6 +60,8 @@ pub struct CrossJoinExec {
schema: SchemaRef,
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
+ /// Execution plan metrics
+ metrics: ExecutionPlanMetricsSet,
}
impl CrossJoinExec {
@@ -79,6 +83,7 @@ impl CrossJoinExec {
right,
schema,
left_fut: Default::default(),
+ metrics: ExecutionPlanMetricsSet::default(),
}
}
@@ -97,9 +102,9 @@ impl CrossJoinExec {
async fn load_left_input(
left: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
+ metrics: BuildProbeJoinMetrics,
+ reservation: SharedMemoryReservation,
) -> Result<JoinLeftData> {
- let start = Instant::now();
-
// merge all left parts into a single stream
let merge = {
if left.output_partitioning().partition_count() != 1 {
@@ -111,22 +116,28 @@ async fn load_left_input(
let stream = merge.execute(0, context)?;
// Load all batches and count the rows
- let (batches, num_rows) = stream
- .try_fold((Vec::new(), 0usize), |mut acc, batch| async {
- acc.1 += batch.num_rows();
- acc.0.push(batch);
- Ok(acc)
- })
+ let (batches, num_rows, _, _) = stream
+ .try_fold(
+ (Vec::new(), 0usize, metrics, reservation),
+ |mut acc, batch| async {
+ let batch_size = batch.get_array_memory_size();
+ // Reserve memory for incoming batch
+ acc.3.lock().try_grow(batch_size)?;
+ // Update metrics
+ acc.2.build_mem_used.add(batch_size);
+ acc.2.build_input_batches.add(1);
+ acc.2.build_input_rows.add(batch.num_rows());
+ // Update rowcount
+ acc.1 += batch.num_rows();
+ // Push batch to output
+ acc.0.push(batch);
+ Ok(acc)
+ },
+ )
.await?;
let merged_batch = concat_batches(&left.schema(), &batches, num_rows)?;
- debug!(
- "Built build-side of cross join containing {} rows in {} ms",
- num_rows,
- start.elapsed().as_millis()
- );
-
Ok(merged_batch)
}
@@ -143,6 +154,10 @@ impl ExecutionPlan for CrossJoinExec {
vec![self.left.clone(), self.right.clone()]
}
+ fn metrics(&self) -> Option<MetricsSet> {
+ Some(self.metrics.clone_inner())
+ }
+
/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but if its input(s) are
/// infinite, returns an error to indicate this.
@@ -205,9 +220,20 @@ impl ExecutionPlan for CrossJoinExec {
) -> Result<SendableRecordBatchStream> {
let stream = self.right.execute(partition, context.clone())?;
- let left_fut = self
- .left_fut
- .once(|| load_left_input(self.left.clone(), context));
+ let join_metrics = BuildProbeJoinMetrics::new(partition,
&self.metrics);
+ let reservation = Arc::new(Mutex::new(
+ MemoryConsumer::new(format!("CrossJoinStream[{partition}]"))
+ .register(context.memory_pool()),
+ ));
+
+ let left_fut = self.left_fut.once(|| {
+ load_left_input(
+ self.left.clone(),
+ context,
+ join_metrics.clone(),
+ reservation.clone(),
+ )
+ });
Ok(Box::pin(CrossJoinStream {
schema: self.schema.clone(),
@@ -215,11 +241,8 @@ impl ExecutionPlan for CrossJoinExec {
right: stream,
right_batch: Arc::new(parking_lot::Mutex::new(None)),
left_index: 0,
- num_input_batches: 0,
- num_input_rows: 0,
- num_output_batches: 0,
- num_output_rows: 0,
- join_time: 0,
+ join_metrics,
+ reservation,
}))
}
@@ -321,16 +344,10 @@ struct CrossJoinStream {
left_index: usize,
/// Current batch being processed from the right side
right_batch: Arc<parking_lot::Mutex<Option<RecordBatch>>>,
- /// number of input batches
- num_input_batches: usize,
- /// number of input rows
- num_input_rows: usize,
- /// number of batches produced
- num_output_batches: usize,
- /// number of rows produced
- num_output_rows: usize,
- /// total time for joining probe-side batches to the build-side batches
- join_time: usize,
+ /// join execution metrics
+ join_metrics: BuildProbeJoinMetrics,
+ /// memory reservation
+ reservation: SharedMemoryReservation,
}
impl RecordBatchStream for CrossJoinStream {
@@ -385,28 +402,30 @@ impl CrossJoinStream {
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<RecordBatch>>> {
+ let build_timer = self.join_metrics.build_time.timer();
let left_data = match ready!(self.left_fut.get(cx)) {
Ok(left_data) => left_data,
Err(e) => return Poll::Ready(Some(Err(e))),
};
+ build_timer.done();
if left_data.num_rows() == 0 {
return Poll::Ready(None);
}
if self.left_index > 0 && self.left_index < left_data.num_rows() {
- let start = Instant::now();
+ let join_timer = self.join_metrics.join_time.timer();
let right_batch = {
let right_batch = self.right_batch.lock();
right_batch.clone().unwrap()
};
let result =
build_batch(self.left_index, &right_batch, left_data,
&self.schema);
- self.num_input_rows += right_batch.num_rows();
+ self.join_metrics.input_rows.add(right_batch.num_rows());
if let Ok(ref batch) = result {
- self.join_time += start.elapsed().as_millis() as usize;
- self.num_output_batches += 1;
- self.num_output_rows += batch.num_rows();
+ join_timer.done();
+ self.join_metrics.output_batches.add(1);
+ self.join_metrics.output_rows.add(batch.num_rows());
}
self.left_index += 1;
return Poll::Ready(Some(result));
@@ -416,15 +435,15 @@ impl CrossJoinStream {
.poll_next_unpin(cx)
.map(|maybe_batch| match maybe_batch {
Some(Ok(batch)) => {
- let start = Instant::now();
+ let join_timer = self.join_metrics.join_time.timer();
let result =
build_batch(self.left_index, &batch, left_data,
&self.schema);
- self.num_input_batches += 1;
- self.num_input_rows += batch.num_rows();
+ self.join_metrics.input_batches.add(1);
+ self.join_metrics.input_rows.add(batch.num_rows());
if let Ok(ref batch) = result {
- self.join_time += start.elapsed().as_millis() as usize;
- self.num_output_batches += 1;
- self.num_output_rows += batch.num_rows();
+ join_timer.done();
+ self.join_metrics.output_batches.add(1);
+ self.join_metrics.output_rows.add(batch.num_rows());
}
self.left_index = 1;
@@ -434,15 +453,7 @@ impl CrossJoinStream {
Some(result)
}
other => {
- debug!(
- "Processed {} probe-side input batches containing {}
rows and \
- produced {} output batches containing {} rows in {}
ms",
- self.num_input_batches,
- self.num_input_rows,
- self.num_output_batches,
- self.num_output_rows,
- self.join_time
- );
+ self.reservation.lock().free();
other
}
})
@@ -452,6 +463,26 @@ impl CrossJoinStream {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::assert_batches_sorted_eq;
+ use crate::common::assert_contains;
+ use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+ use crate::physical_plan::common;
+ use crate::prelude::{SessionConfig, SessionContext};
+ use crate::test::{build_table_scan_i32, columns};
+
+ async fn join_collect(
+ left: Arc<dyn ExecutionPlan>,
+ right: Arc<dyn ExecutionPlan>,
+ context: Arc<TaskContext>,
+ ) -> Result<(Vec<String>, Vec<RecordBatch>)> {
+ let join = CrossJoinExec::new(left, right);
+ let columns_header = columns(&join.schema());
+
+ let stream = join.execute(0, context)?;
+ let batches = common::collect(stream).await?;
+
+ Ok((columns_header, batches))
+ }
#[tokio::test]
async fn test_stats_cartesian_product() {
@@ -589,4 +620,70 @@ mod tests {
assert_eq!(result, expected);
}
+
+ #[tokio::test]
+ async fn test_join() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+
+ let left = build_table_scan_i32(
+ ("a1", &vec![1, 2, 3]),
+ ("b1", &vec![4, 5, 6]),
+ ("c1", &vec![7, 8, 9]),
+ );
+ let right = build_table_scan_i32(
+ ("a2", &vec![10, 11]),
+ ("b2", &vec![12, 13]),
+ ("c2", &vec![14, 15]),
+ );
+
+ let (columns, batches) = join_collect(left, right, task_ctx).await?;
+
+ assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b2", "c2"]);
+ let expected = vec![
+ "+----+----+----+----+----+----+",
+ "| a1 | b1 | c1 | a2 | b2 | c2 |",
+ "+----+----+----+----+----+----+",
+ "| 1 | 4 | 7 | 10 | 12 | 14 |",
+ "| 1 | 4 | 7 | 11 | 13 | 15 |",
+ "| 2 | 5 | 8 | 10 | 12 | 14 |",
+ "| 2 | 5 | 8 | 11 | 13 | 15 |",
+ "| 3 | 6 | 9 | 10 | 12 | 14 |",
+ "| 3 | 6 | 9 | 11 | 13 | 15 |",
+ "+----+----+----+----+----+----+",
+ ];
+
+ assert_batches_sorted_eq!(expected, &batches);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_overallocation() -> Result<()> {
+ let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
+ let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+ let session_ctx =
+ SessionContext::with_config_rt(SessionConfig::default(), runtime);
+ let task_ctx = session_ctx.task_ctx();
+
+ let left = build_table_scan_i32(
+ ("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+ ("b1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+ ("c1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
+ );
+ let right = build_table_scan_i32(
+ ("a2", &vec![10, 11]),
+ ("b2", &vec![12, 13]),
+ ("c2", &vec![14, 15]),
+ );
+
+ let err = join_collect(left, right, task_ctx).await.unwrap_err();
+
+ assert_contains!(
+ err.to_string(),
+ "External error: Resources exhausted: Failed to allocate
additional"
+ );
+
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs
b/datafusion/core/src/physical_plan/joins/utils.rs
index b01483f56..2f0267315 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -20,6 +20,7 @@
use crate::error::{DataFusionError, Result, SharedResult};
use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
+use crate::physical_plan::metrics::{self, ExecutionPlanMetricsSet,
MetricBuilder};
use crate::physical_plan::SchemaRef;
use arrow::array::{
new_null_array, Array, BooleanBufferBuilder, PrimitiveArray, UInt32Array,
@@ -968,6 +969,68 @@ pub(crate) fn get_semi_u64_indices(
.collect::<UInt64Array>()
}
+/// Metrics for build & probe joins
+#[derive(Clone, Debug)]
+pub(crate) struct BuildProbeJoinMetrics {
+ /// Total time for collecting build-side of join
+ pub(crate) build_time: metrics::Time,
+ /// Number of batches consumed by build-side
+ pub(crate) build_input_batches: metrics::Count,
+ /// Number of rows consumed by build-side
+ pub(crate) build_input_rows: metrics::Count,
+ /// Memory used by build-side in bytes
+ pub(crate) build_mem_used: metrics::Gauge,
+ /// Total time for joining probe-side batches to the build-side batches
+ pub(crate) join_time: metrics::Time,
+ /// Number of batches consumed by probe-side of this operator
+ pub(crate) input_batches: metrics::Count,
+ /// Number of rows consumed by probe-side this operator
+ pub(crate) input_rows: metrics::Count,
+ /// Number of batches produced by this operator
+ pub(crate) output_batches: metrics::Count,
+ /// Number of rows produced by this operator
+ pub(crate) output_rows: metrics::Count,
+}
+
+impl BuildProbeJoinMetrics {
+ pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+ let join_time = MetricBuilder::new(metrics).subset_time("join_time",
partition);
+
+ let build_time = MetricBuilder::new(metrics).subset_time("build_time",
partition);
+
+ let build_input_batches =
+ MetricBuilder::new(metrics).counter("build_input_batches",
partition);
+
+ let build_input_rows =
+ MetricBuilder::new(metrics).counter("build_input_rows", partition);
+
+ let build_mem_used =
+ MetricBuilder::new(metrics).gauge("build_mem_used", partition);
+
+ let input_batches =
+ MetricBuilder::new(metrics).counter("input_batches", partition);
+
+ let input_rows = MetricBuilder::new(metrics).counter("input_rows",
partition);
+
+ let output_batches =
+ MetricBuilder::new(metrics).counter("output_batches", partition);
+
+ let output_rows = MetricBuilder::new(metrics).output_rows(partition);
+
+ Self {
+ build_time,
+ build_input_batches,
+ build_input_rows,
+ build_mem_used,
+ join_time,
+ input_batches,
+ input_rows,
+ output_batches,
+ output_rows,
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs
b/datafusion/core/src/physical_plan/repartition/mod.rs
index 8a7ce3ce2..7f13418d2 100644
--- a/datafusion/core/src/physical_plan/repartition/mod.rs
+++ b/datafusion/core/src/physical_plan/repartition/mod.rs
@@ -24,7 +24,7 @@ use std::task::{Context, Poll};
use std::{any::Any, vec};
use crate::error::{DataFusionError, Result};
-use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use crate::execution::memory_pool::MemoryConsumer;
use crate::physical_plan::hash_utils::create_hashes;
use crate::physical_plan::repartition::distributor_channels::channels;
use crate::physical_plan::{
@@ -37,7 +37,7 @@ use log::debug;
use self::distributor_channels::{DistributionReceiver, DistributionSender};
-use super::common::{AbortOnDropMany, AbortOnDropSingle};
+use super::common::{AbortOnDropMany, AbortOnDropSingle,
SharedMemoryReservation};
use super::expressions::PhysicalSortExpr;
use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::{RecordBatchStream, SendableRecordBatchStream};
@@ -53,7 +53,6 @@ use tokio::task::JoinHandle;
mod distributor_channels;
type MaybeBatch = Option<Result<RecordBatch>>;
-type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
/// Inner state of [`RepartitionExec`].
#[derive(Debug)]
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 9dccde8a7..f7113123c 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -26,6 +26,8 @@ use crate::error::Result;
use crate::from_slice::FromSlice;
use crate::logical_expr::LogicalPlan;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
+use crate::physical_plan::memory::MemoryExec;
+use crate::physical_plan::ExecutionPlan;
use crate::test::object_store::local_unpartitioned_file;
use crate::test_util::{aggr_test_schema, arrow_test_data};
use array::ArrayRef;
@@ -207,7 +209,7 @@ pub fn assert_fields_eq(plan: &LogicalPlan, expected:
Vec<&str>) {
assert_eq!(actual, expected);
}
-/// returns a table with 3 columns of i32 in memory
+/// returns record batch with 3 columns of i32 in memory
pub fn build_table_i32(
a: (&str, &Vec<i32>),
b: (&str, &Vec<i32>),
@@ -230,6 +232,17 @@ pub fn build_table_i32(
.unwrap()
}
+/// returns memory table scan wrapped around record batch with 3 columns of i32
+pub fn build_table_scan_i32(
+ a: (&str, &Vec<i32>),
+ b: (&str, &Vec<i32>),
+ c: (&str, &Vec<i32>),
+) -> Arc<dyn ExecutionPlan> {
+ let batch = build_table_i32(a, b, c);
+ let schema = batch.schema();
+ Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None).unwrap())
+}
+
/// Returns the column names on the schema
pub fn columns(schema: &Schema) -> Vec<String> {
schema.fields().iter().map(|f| f.name().clone()).collect()
diff --git a/datafusion/core/tests/memory_limit.rs
b/datafusion/core/tests/memory_limit.rs
index 170f55903..75e26485d 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -74,6 +74,16 @@ async fn group_by_hash() {
.await
}
+#[tokio::test]
+async fn cross_join() {
+ run_limit_test(
+ "select t1.* from t t1 CROSS JOIN t t2",
+ "Resources exhausted: Failed to allocate additional",
+ 1_000,
+ )
+ .await
+}
+
/// 50 byte memory limit
const MEMORY_FRACTION: f64 = 0.95;