alamb commented on code in PR #13133:
URL: https://github.com/apache/datafusion/pull/13133#discussion_r1821387276
##########
datafusion/execution/src/memory_pool/pool.rs:
##########
@@ -58,7 +58,9 @@ impl MemoryPool for UnboundedMemoryPool {
#[derive(Debug)]
pub struct GreedyMemoryPool {
pool_size: usize,
+ pool_size_per_consumer: HashMap<String, usize>,
Review Comment:
Could we please document what `pool_size_per_consumer` means? Specifically,
what happens when the pool size is exceeded for that consumer?
I see the docs on `pub fn with_memory_limit_per_consumer(`, but I think
we should also document the semantics on the pool itself.
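Something along these lines could work as a starting point (a sketch only; the
actual overflow semantics need to match what the pool implements):
```rust
use std::collections::HashMap; // or hashbrown::HashMap, whichever pool.rs already uses

#[derive(Debug)]
pub struct GreedyMemoryPool {
    /// Maximum number of bytes this pool will hand out in total.
    pool_size: usize,
    /// Per-consumer byte limits, keyed by consumer name.
    ///
    /// TODO (hypothetical wording): document here whether an allocation that
    /// would push a named consumer past its limit fails immediately, and how
    /// consumers without an entry interact with the shared `pool_size`.
    pool_size_per_consumer: HashMap<String, usize>,
}
```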
##########
datafusion/physical-plan/benches/spm.rs:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow::record_batch::RecordBatch;
+use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::expressions::col;
+use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_plan::memory::MemoryExec;
+use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+use datafusion_physical_plan::{collect, ExecutionPlan};
+
+use criterion::async_executor::FuturesExecutor;
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+
+fn generate_spm_for_round_robin_tie_breaker(
Review Comment:
There are already some SortPreservingMerge benchmarks in
https://github.com/apache/datafusion/blob/223bb02fce886b47dc1ac81e2eda2bd3c6d60c3e/datafusion/core/benches/sort.rs#L161,
so I recommend consolidating the benchmarks there rather than adding a new
target.
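For example, the case could be registered in the existing benchmark binary,
roughly like this (a sketch, assuming criterion's async support and that the
two `generate_*` helpers from this PR move along with it):
```rust
use std::sync::Arc;

use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, Criterion};
use datafusion_physical_plan::{collect, ExecutionPlan};

// Sketch: run the round-robin tie-breaker plan to completion once per
// iteration; the two generate_* helpers are the ones added in this PR.
fn bench_spm_round_robin_tie_breaker(c: &mut Criterion) {
    c.bench_function("sort_preserving_merge: round robin tie breaker", |b| {
        b.to_async(FuturesExecutor).iter(|| async {
            let ctx = generate_task_ctx_for_round_robin_tie_breaker().unwrap();
            let spm: Arc<dyn ExecutionPlan> =
                generate_spm_for_round_robin_tie_breaker().unwrap();
            black_box(collect(spm, ctx).await.unwrap());
        })
    });
}
```
The function would then be added to the `criterion_group!` that sort.rs
already declares.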
##########
datafusion/physical-plan/src/sorts/sort_preserving_merge.rs:
##########
@@ -326,18 +329,77 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
+ use arrow_array::Int64Array;
use arrow_schema::SchemaRef;
use datafusion_common::{assert_batches_eq, assert_contains, DataFusionError};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::config::SessionConfig;
+ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use futures::{FutureExt, Stream, StreamExt};
+ use hashbrown::HashMap;
use tokio::time::timeout;
+ fn generate_task_ctx_for_round_robin_tie_breaker() -> Result<Arc<TaskContext>> {
+ let mut pool_per_consumer = HashMap::new();
+ // Bytes from 660_000 to 30_000_000 (or even more) are all valid limits
+ pool_per_consumer.insert("RepartitionExec[0]".to_string(), 10_000_000);
+ pool_per_consumer.insert("RepartitionExec[1]".to_string(), 10_000_000);
+
+ let runtime = RuntimeEnvBuilder::new()
+ // Arbitrary large number for the total memory limit; we only care about RepartitionExec here
+ .with_memory_limit_per_consumer(2_000_000_000, 1.0, pool_per_consumer)
+ .build_arc()?;
+ let config = SessionConfig::new();
+ let task_ctx = TaskContext::default()
+ .with_runtime(runtime)
+ .with_session_config(config);
+ Ok(Arc::new(task_ctx))
+ }
+ fn generate_spm_for_round_robin_tie_breaker() -> Result<Arc<SortPreservingMergeExec>> {
+ let target_batch_size = 12500;
+ let row_size = 12500;
+ let a: ArrayRef = Arc::new(Int32Array::from(vec![1; row_size]));
+ let b: ArrayRef = Arc::new(StringArray::from_iter(vec![Some("a"); row_size]));
+ let c: ArrayRef = Arc::new(Int64Array::from_iter(vec![0; row_size]));
+ let rb = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
+
+ let rbs = (0..1024).map(|_| rb.clone()).collect::<Vec<_>>();
+
+ let schema = rb.schema();
+ let sort = vec![
+ PhysicalSortExpr {
+ expr: col("b", &schema).unwrap(),
+ options: Default::default(),
+ },
+ PhysicalSortExpr {
+ expr: col("c", &schema).unwrap(),
+ options: Default::default(),
+ },
+ ];
+
+ let exec = MemoryExec::try_new(&[rbs], schema, None).unwrap();
+ let repartition_exec = RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(2))?;
+ let coalesce_batches_exec = CoalesceBatchesExec::new(Arc::new(repartition_exec), target_batch_size);
+ let spm = SortPreservingMergeExec::new(sort, Arc::new(coalesce_batches_exec));
+ Ok(Arc::new(spm))
+ }
+
+ #[tokio::test(flavor = "multi_thread")]
+ async fn test_round_robin_tie_breaker() -> Result<()> {
+ let task_ctx = generate_task_ctx_for_round_robin_tie_breaker()?;
Review Comment:
I didn't understand what this test is testing. It makes and runs a plan, but
doesn't seem to verify:
1. The results
2. That the inputs are evenly polled

I ran the test without the changes in this PR and it failed like this:
```
called `Result::unwrap()` on an `Err` value: External(ResourcesExhausted("Additional allocation failed with top memory consumers (across reservations) as: RepartitionExec[1] consumed 9970224 bytes, SortPreservingMergeExec[0] consumed 771524 bytes, RepartitionExec[0] consumed 216744 bytes. Error: Failed to allocate additional 216744 bytes for RepartitionExec[1] with 9970224 bytes already allocated for this reservation - 29776 bytes remain available for the total pool"))
thread 'sorts::sort_preserving_merge::tests::test_round_robin_tie_breaker' panicked at datafusion/physical-plan/src/sorts/sort_preserving_merge.rs:398:55:
called `Result::unwrap()` on an `Err` value: External(ResourcesExhausted("Additional allocation failed with top memory consumers (across reservations) as: RepartitionExec[1] consumed 9970224 bytes, SortPreservingMergeExec[0] consumed 771524 bytes, RepartitionExec[0] consumed 216744 bytes. Error: Failed to allocate additional 216744 bytes for RepartitionExec[1] with 9970224 bytes already allocated for this reservation - 29776 bytes remain available for the total pool"))
stack backtrace:
   0: rust_begin_unwind
             at /rustc/f6e511eec7342f59a25f7c0534f1dbea00d01b14/library/std/src/panicking.rs:662:5
   1: core::panicking::panic_fmt
             at /rustc/f6e511eec7342f59a25f7c0534f1dbea00d01b14/library/core/src/panicking.rs:74:14
   2: core::result::unwrap_failed
             at /rustc/f6e511eec7342f59a25f7c0534f1dbea00d01b14/library/core/src/result.rs:1677:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/f6e511eec7342f59a25f7c0534f1dbea00d01b14/library/core/src/result.rs:1102:23
   4: datafusion_physical_plan::sorts::sort_preserving_merge::tests::test_round_robin_tie_breaker::{{closure}}
             at ./src/sorts/sort_preserving_merge.rs:398:26
   5: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/f6e511eec7342f59a25f7c0534f1dbea00d01b14/library/core/src/future/future.rs:123:9
   6: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
             at /Users/andrewlamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.0/src/runtime/park.rs:281:63
   7: tokio::runtime::coop::with_budget
             at /Users/andrewlamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.0/src/runtime/coop.rs:107:5
   8: tokio::runtime::coop::budget
             at /Users/andrewlamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.0/src/runtime/coop.rs:73:5
   9: tokio::runtime::park::CachedParkThread::block_on
             at /Users/andrewlamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.0/src/runtime/park.rs:281:31
  10: tokio::runtime::context::blocking::BlockingRegionGuard::block_on
             at /Users/andrewlamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.0/src/runtime/context/blocking.rs:66:9
  11: tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}}
             at /Users/andrewlamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.0/src/runtime/scheduler/multi_thread/mod.rs:87:13
  12: tokio::runtime::context::runtime::enter_runtime
             at /Users/andrewlamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.0/src/runtime/context/runtime.rs:65:16
  13: tokio::runtime::scheduler::multi_thread::MultiThread::block_on
             at /Users/andrewlamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.0/src/runtime/scheduler/multi_thread/mod.rs:86:9
  14: tokio::runtime::runtime::Runtime::block_on_inner
             at /Users/andrewlamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.0/src/runtime/runtime.rs:370:45
  15: tokio::runtime::runtime::Runtime::block_on
             at /Users/andrewlamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.41.0/src/runtime/runtime.rs:342:13
  16: datafusion_physical_plan::sorts::sort_preserving_merge::tests::test_round_robin_tie_breaker
             at ./src/sorts/sort_preserving_merge.rs:399:9
  17: datafusion_physical_plan::sorts::sort_preserving_merge::tests::test_round_robin_tie_breaker::{{closure}}
             at ./src/sorts/sort_preserving_merge.rs:395:48
  18: core::ops::function::FnOnce::call_once
             at /rustc/f6e511eec7342f59a25f7c0534f1dbea00d01b14/library/core/src/ops/function.rs:250:5
  19: core::ops::function::FnOnce::call_once
             at /rustc/f6e511eec7342f59a25f7c0534f1dbea00d01b14/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
error: test failed, to rerun pass `--lib`
error: 1 target failed:
    `--lib`
```
So it seems like it is verifying that the RepartitionExec is not buffering
too much data.

Perhaps we can add some comments that explain that, so that people in the
future, if they make a change and this test fails on them, can figure out
whether they need to update the constants or change their code.
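For example, something along these lines at the top of the test (the wording
is mine, inferred from the failure above, so treat it as a sketch):
```rust
// NOTE (sketch): this test verifies the round-robin tie-breaker indirectly,
// via per-consumer memory limits, rather than by checking query results:
//
// * Each RepartitionExec output is capped at 10 MB (see
//   generate_task_ctx_for_round_robin_tie_breaker).
// * Without round-robin tie-breaking, the merge keeps polling the same tied
//   input, so batches for the other partition pile up inside RepartitionExec
//   until the per-consumer limit is exceeded and the query errors.
// * With round-robin tie-breaking, both inputs are drained evenly and the
//   query completes within the limit.
//
// If this test fails after a change, first check whether buffering actually
// regressed; if not, the memory-limit constants may just need retuning.
```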
##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -327,16 +388,96 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
self.loser_tree_adjusted = true;
}
- /// Attempts to update the loser tree, following winner replacement, if possible
+ /// Resets the poll count by incrementing the reset epoch.
+ fn reset_poll_counts(&mut self) {
+ self.current_reset_epoch += 1;
+ }
+
+ /// Handles tie-breaking logic during the adjustment of the loser tree.
+ ///
+ /// When comparing elements from multiple partitions in the `update_loser_tree` process,
+ /// a tie can occur between the current winner and a challenger. This function is invoked
+ /// when such a tie needs to be resolved according to the round-robin tie-breaker mode.
+ ///
+ /// If round-robin tie-breaking is not yet active, it is enabled and the poll counts for
+ /// all elements are reset. The function then compares the poll counts of the current
+ /// winner and the challenger:
+ /// - If the winner remains at the top after the final comparison, its poll count is incremented.
+ /// - If the challenger has a lower poll count than the current winner, the challenger becomes the new winner.
+ /// - If the poll counts are equal but the challenger's index is smaller, the challenger is preferred.
+ ///
+ /// # Parameters
+ /// - `cmp_node`: The index of the comparison node in the loser tree where the tie-breaking is happening.
+ /// - `winner`: A mutable reference to the current winner, which may be updated based on the tie-breaking result.
+ /// - `challenger`: The index of the challenger being compared against the winner.
+ ///
+ /// This function ensures fair selection among elements with equal values, aiming to
+ /// balance the polling across different partitions.
+ #[inline]
+ fn handle_tie(&mut self, cmp_node: usize, winner: &mut usize, challenger: usize) {
+ if !self.round_robin_tie_breaker_mode {
+ self.round_robin_tie_breaker_mode = true;
+ // Reset poll count for tie-breaker
+ self.reset_poll_counts();
+ }
+ // Update poll count if the winner survives in the final match
+ if *winner == self.loser_tree[0] {
+ self.update_poll_count_on_the_same_value(*winner);
+ if self.is_poll_count_gt(*winner, challenger) {
+ self.update_winner(cmp_node, winner, challenger);
+ }
+ } else if challenger < *winner {
+ // If the winner doesn't survive the final match, the value has changed.
+ // The poll counts are outdated (because the value advanced) but have not been cleaned up yet at this point.
+ // Since the values are equal, we choose the smaller index.
+ self.update_winner(cmp_node, winner, challenger);
+ }
+ }
+
+ /// Updates the loser tree to reflect the new winner after the previous winner is consumed.
+ /// This function adjusts the tree by comparing the current winner with challengers from
+ /// other partitions.
+ ///
+ /// If a tie occurs at the final level, the tie-breaker logic will be applied to ensure
+ /// fair selection among equal elements.
fn update_loser_tree(&mut self) {
+ // Start with the current winner
let mut winner = self.loser_tree[0];
- // Replace overall winner by walking tree of losers
+
+ // Find the leaf node index of the winner in the loser tree.
let mut cmp_node = self.lt_leaf_node_index(winner);
+
+ // Traverse up the tree to adjust comparisons until reaching the root.
while cmp_node != 0 {
let challenger = self.loser_tree[cmp_node];
- if self.is_gt(winner, challenger) {
- self.loser_tree[cmp_node] = winner;
- winner = challenger;
+ // If we're at the final comparison (cmp_node == 1)
+ if cmp_node == 1 {
+ match (&self.cursors[winner], &self.cursors[challenger]) {
Review Comment:
My understanding is that while this adds code to the hot inner loop (the loser
tree update), it is only invoked when there is a tie, so unless there are many
ties this will not impact performance.
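For readers following along: the hunk is cut off at the `match`, but the shape
is presumably something like this sketch (the exact arms are an assumption on
my part), which is why the extra work only happens on an actual tie:
```rust
// Sketch (assumed): only live cursors that compare equal take the
// tie-breaker path; everything else falls through to the normal
// is_gt comparison, so non-tied inputs pay no extra cost.
match (&self.cursors[winner], &self.cursors[challenger]) {
    (Some(ahead), Some(behind))
        if ahead.cmp(behind) == std::cmp::Ordering::Equal =>
    {
        self.handle_tie(cmp_node, &mut winner, challenger);
    }
    _ => {
        if self.is_gt(winner, challenger) {
            self.loser_tree[cmp_node] = winner;
            winner = challenger;
        }
    }
}
```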
##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -327,16 +388,96 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
self.loser_tree_adjusted = true;
}
- /// Attempts to update the loser tree, following winner replacement, if possible
+ /// Resets the poll count by incrementing the reset epoch.
+ fn reset_poll_counts(&mut self) {
+ self.current_reset_epoch += 1;
+ }
+
+ /// Handles tie-breaking logic during the adjustment of the loser tree.
+ ///
+ /// When comparing elements from multiple partitions in the `update_loser_tree` process,
+ /// a tie can occur between the current winner and a challenger. This function is invoked
+ /// when such a tie needs to be resolved according to the round-robin tie-breaker mode.
+ ///
+ /// If round-robin tie-breaking is not yet active, it is enabled and the poll counts for
+ /// all elements are reset. The function then compares the poll counts of the current
+ /// winner and the challenger:
+ /// - If the winner remains at the top after the final comparison, its poll count is incremented.
+ /// - If the challenger has a lower poll count than the current winner, the challenger becomes the new winner.
+ /// - If the poll counts are equal but the challenger's index is smaller, the challenger is preferred.
+ ///
+ /// # Parameters
+ /// - `cmp_node`: The index of the comparison node in the loser tree where the tie-breaking is happening.
+ /// - `winner`: A mutable reference to the current winner, which may be updated based on the tie-breaking result.
+ /// - `challenger`: The index of the challenger being compared against the winner.
+ ///
+ /// This function ensures fair selection among elements with equal values, aiming to
+ /// balance the polling across different partitions.
+ #[inline]
+ fn handle_tie(&mut self, cmp_node: usize, winner: &mut usize, challenger: usize) {
+ if !self.round_robin_tie_breaker_mode {
+ self.round_robin_tie_breaker_mode = true;
+ // Reset poll count for tie-breaker
+ self.reset_poll_counts();
+ }
+ // Update poll count if the winner survives in the final match
+ if *winner == self.loser_tree[0] {
+ self.update_poll_count_on_the_same_value(*winner);
+ if self.is_poll_count_gt(*winner, challenger) {
+ self.update_winner(cmp_node, winner, challenger);
+ }
+ } else if challenger < *winner {
+ // If the winner doesn't survive the final match, the value has changed.
+ // The poll counts are outdated (because the value advanced) but have not been cleaned up yet at this point.
+ // Since the values are equal, we choose the smaller index.
+ self.update_winner(cmp_node, winner, challenger);
+ }
+ }
+
+ /// Updates the loser tree to reflect the new winner after the previous winner is consumed.
+ /// This function adjusts the tree by comparing the current winner with challengers from
+ /// other partitions.
+ ///
+ /// If a tie occurs at the final level, the tie-breaker logic will be applied to ensure
+ /// fair selection among equal elements.
fn update_loser_tree(&mut self) {
+ // Start with the current winner
let mut winner = self.loser_tree[0];
- // Replace overall winner by walking tree of losers
+
+ // Find the leaf node index of the winner in the loser tree.
let mut cmp_node = self.lt_leaf_node_index(winner);
+
+ // Traverse up the tree to adjust comparisons until reaching the root.
while cmp_node != 0 {
let challenger = self.loser_tree[cmp_node];
- if self.is_gt(winner, challenger) {
- self.loser_tree[cmp_node] = winner;
- winner = challenger;
+ // If we're at the final comparison (cmp_node == 1)
Review Comment:
I wonder if there is some way to encapsulate this `1 == final` somehow. As a
follow-on PR maybe we can wrap it like
```rust
struct LoserTree {
inner: Vec<usize>
}
```
And add some methods like `is_final_comparison` or something 🤔
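For instance, the follow-on could look something like this (a sketch of the
suggestion, not code in this PR):
```rust
/// Newtype over the loser tree indices so the magic numbers live in one place.
struct LoserTree {
    inner: Vec<usize>,
}

impl LoserTree {
    /// Index 0 holds the overall winner of the tournament.
    fn winner(&self) -> usize {
        self.inner[0]
    }

    /// Index 1 is the last comparison before the root, i.e. the final
    /// match in which the overall winner was decided.
    fn is_final_comparison(&self, cmp_node: usize) -> bool {
        cmp_node == 1
    }
}
```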
##########
datafusion/physical-plan/src/sorts/sort_preserving_merge.rs:
##########
@@ -326,18 +329,77 @@ mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
+ use arrow_array::Int64Array;
use arrow_schema::SchemaRef;
use datafusion_common::{assert_batches_eq, assert_contains, DataFusionError};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::config::SessionConfig;
+ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use futures::{FutureExt, Stream, StreamExt};
+ use hashbrown::HashMap;
use tokio::time::timeout;
+ fn generate_task_ctx_for_round_robin_tie_breaker() -> Result<Arc<TaskContext>> {
+ let mut pool_per_consumer = HashMap::new();
+ // Bytes from 660_000 to 30_000_000 (or even more) are all valid limits
+ pool_per_consumer.insert("RepartitionExec[0]".to_string(), 10_000_000);
+ pool_per_consumer.insert("RepartitionExec[1]".to_string(), 10_000_000);
+
+ let runtime = RuntimeEnvBuilder::new()
+ // Arbitrary large number for the total memory limit; we only care about RepartitionExec here
+ .with_memory_limit_per_consumer(2_000_000_000, 1.0, pool_per_consumer)
+ .build_arc()?;
+ let config = SessionConfig::new();
+ let task_ctx = TaskContext::default()
+ .with_runtime(runtime)
+ .with_session_config(config);
+ Ok(Arc::new(task_ctx))
+ }
+ fn generate_spm_for_round_robin_tie_breaker() -> Result<Arc<SortPreservingMergeExec>> {
+ let target_batch_size = 12500;
Review Comment:
Why are 12500 rows per batch needed? I am curious why this isn't the default
of 8K.
##########
datafusion/physical-plan/src/sorts/merge.rs:
##########
@@ -97,6 +97,23 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {
/// Cursors for each input partition. `None` means the input is exhausted
cursors: Vec<Option<Cursor<C>>>,
+ /// Flag indicating whether we are in the mode of round-robin
Review Comment:
Would it be possible to add some more long-form documentation on the
rationale / design of this polling mechanism? I think having the rationale in
the code would make this easier to understand/read for future readers.

In particular, perhaps we can adapt the (very nice) content in the
`Rationale` section into the comments of `SortPreservingMergeStream`
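Roughly along these lines, perhaps (my paraphrase of the rationale as I
understand it from this PR's test; the PR description's `Rationale` section
is the authoritative text):
```rust
/// # Round-robin tie-breaking (rationale, sketch)
///
/// When several inputs are tied on the sort key, the default tie-breaker
/// (smallest partition index wins) keeps draining the same input while the
/// tied siblings sit idle. Upstream operators such as `RepartitionExec`
/// then keep buffering batches for the unpolled partitions, so memory use
/// grows with the length of the tie run.
///
/// In round-robin tie-breaker mode we instead track how often each tied
/// input has been polled (reset via an epoch counter whenever a new tie
/// run starts) and prefer the input with the smaller poll count. This
/// spreads consumption evenly across tied inputs and bounds the upstream
/// buffering.
pub(crate) struct SortPreservingMergeStream<C: CursorValues> { /* ... */ }
```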
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]