milenkovicm commented on code in PR #1684:
URL:
https://github.com/apache/datafusion-ballista/pull/1684#discussion_r3252526074
##########
ballista/core/src/config.rs:
##########
@@ -178,7 +193,38 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String,
ConfigEntry>> = LazyLock::new(||
ConfigEntry::new(BALLISTA_CLIENT_IO_RETRY_WAIT_TIME_MS.to_string(),
"Wait time in milliseconds between IO retries in the
Ballista client.".to_string(),
DataType::UInt64,
- Some(3000.to_string()))
+ Some(3000.to_string())),
+ ConfigEntry::new(BALLISTA_COALESCE_ENABLED.to_string(),
+ "Enables the AQE coalesce-shuffle-partitions rule. \
+ Disabled by default — opt in when fewer/larger \
+ downstream tasks matter more than
parallelism.".to_string(),
+ DataType::Boolean,
+ Some(false.to_string())),
+ ConfigEntry::new(
+ BALLISTA_COALESCE_TARGET_PARTITION_BYTES.to_string(),
+ "Target post-coalesce partition byte size in bytes. Mirrors
Spark's \
+ advisoryPartitionSizeInBytes."
+ .to_string(),
+ DataType::UInt64,
+ Some((64 * 1024 * 1024_usize).to_string()),
+ ),
+ ConfigEntry::new(
+ BALLISTA_COALESCE_SMALL_PARTITION_FACTOR.to_string(),
+ "Small-partition merge factor (Spark legacy). Stored as Utf8
because \
+ BallistaConfig::parse_value does not support Float64; parsed back
via \
+ f64::from_str in the accessor."
+ .to_string(),
+ DataType::Utf8,
Review Comment:
should it be float?
##########
ballista/scheduler/src/state/aqe/planner.rs:
##########
@@ -296,8 +298,13 @@ impl AdaptivePlanner {
let (stage_ids, shuffle_writers) = stages
.into_iter()
.map(|plan| {
- // TODO: we need to find input stages for given stage
- // thus result should change
+ // Run the coalesce rule per-stage: the root of `plan`
is
+ // the stage's wrapper exchange, so the rule's walker
sees
+ // only this stage's input exchanges as the alignment
+ // group. This avoids cross-stage gluing and stale
state
+ // that would arise if the rule walked the entire
residual
+ // plan in `default_optimizers()`.
+ let plan = CoalescePartitionsRule.optimize(plan,
config)?;
Review Comment:
Current AQE approach with centralised rules execution has its downsides, it
probably bends use of rules in a way which is not really intended and making
rules overcomplicated.
We had an issue before where running a rule detached from other rules
creates invalid plan as rules which could "fix" the plan do not run, you
probably remember.
A corner case I see is when coalesce partition produce single partition
output. In that case rest of the plan could probably be re-optimised due to
fact there is single partition now. We would miss that opertunity with this
approach, as the fact there is single partition may be catched in next "full"
optimizer pass (next stage generatio)
having said that, at the moment i dont have better proposal
##########
ballista/core/src/config.rs:
##########
@@ -86,6 +86,21 @@ pub const
BALLISTA_SHUFFLE_SORT_BASED_MEMORY_LIMIT_PER_TASK_BYTES: &str =
pub const BALLISTA_BROADCAST_JOIN_THRESHOLD_BYTES: &str =
"ballista.optimizer.broadcast_join_threshold_bytes";
+/// Configuration key to enable AQE coalesce-shuffle-partitions rule.
+/// Disabled by default — opt in when the workload benefits from larger
+/// downstream tasks more than from preserved parallelism.
+pub const BALLISTA_COALESCE_ENABLED: &str = "ballista.coalesce.enabled";
Review Comment:
can we prefix configuration values with `ballista.planner.`
(`ballista.planner.coalesce...`)?
##########
ballista/core/src/config.rs:
##########
@@ -178,7 +193,38 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String,
ConfigEntry>> = LazyLock::new(||
ConfigEntry::new(BALLISTA_CLIENT_IO_RETRY_WAIT_TIME_MS.to_string(),
"Wait time in milliseconds between IO retries in the
Ballista client.".to_string(),
DataType::UInt64,
- Some(3000.to_string()))
+ Some(3000.to_string())),
+ ConfigEntry::new(BALLISTA_COALESCE_ENABLED.to_string(),
+ "Enables the AQE coalesce-shuffle-partitions rule. \
+ Disabled by default — opt in when fewer/larger \
+ downstream tasks matter more than
parallelism.".to_string(),
+ DataType::Boolean,
+ Some(false.to_string())),
+ ConfigEntry::new(
+ BALLISTA_COALESCE_TARGET_PARTITION_BYTES.to_string(),
+ "Target post-coalesce partition byte size in bytes. Mirrors
Spark's \
+ advisoryPartitionSizeInBytes."
+ .to_string(),
+ DataType::UInt64,
+ Some((64 * 1024 * 1024_usize).to_string()),
+ ),
+ ConfigEntry::new(
+ BALLISTA_COALESCE_SMALL_PARTITION_FACTOR.to_string(),
+ "Small-partition merge factor (Spark legacy). Stored as Utf8
because \
+ BallistaConfig::parse_value does not support Float64; parsed back
via \
+ f64::from_str in the accessor."
+ .to_string(),
+ DataType::Utf8,
+ Some("0.2".to_string()),
+ ),
+ ConfigEntry::new(
+ BALLISTA_COALESCE_MERGED_PARTITION_FACTOR.to_string(),
+ "Merged-partition early-flush factor (Spark legacy). Stored as
Utf8 — \
+ see small_partition_factor for rationale."
+ .to_string(),
+ DataType::Utf8,
Review Comment:
should it be float
##########
ballista/core/src/config.rs:
##########
@@ -383,6 +429,36 @@ impl BallistaConfig {
self.get_usize_setting(BALLISTA_BROADCAST_JOIN_THRESHOLD_BYTES)
}
+ /// Returns whether the AQE coalesce-shuffle-partitions rule is enabled.
+ pub fn coalesce_enabled(&self) -> bool {
+ self.get_bool_setting(BALLISTA_COALESCE_ENABLED)
+ }
+
+ /// Returns the target post-coalesce partition byte size in bytes
+ /// (Spark's `advisoryPartitionSizeInBytes`).
+ pub fn coalesce_target_partition_bytes(&self) -> u64 {
+ self.get_usize_setting(BALLISTA_COALESCE_TARGET_PARTITION_BYTES) as u64
+ }
+
+ /// Returns the small-partition merge factor (Spark legacy).
+ ///
+ /// Stored as Utf8 in CONFIG_ENTRIES because BallistaConfig::parse_value
does not
+ /// support DataType::Float64. Falls back to 0.2 if the stored string
fails to parse.
+ pub fn coalesce_small_partition_factor(&self) -> f64 {
+ self.get_string_setting(BALLISTA_COALESCE_SMALL_PARTITION_FACTOR)
Review Comment:
maybe we should extract get_flot_setting?
##########
ballista/core/src/config.rs:
##########
@@ -178,7 +193,38 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String,
ConfigEntry>> = LazyLock::new(||
ConfigEntry::new(BALLISTA_CLIENT_IO_RETRY_WAIT_TIME_MS.to_string(),
"Wait time in milliseconds between IO retries in the
Ballista client.".to_string(),
DataType::UInt64,
- Some(3000.to_string()))
+ Some(3000.to_string())),
+ ConfigEntry::new(BALLISTA_COALESCE_ENABLED.to_string(),
+ "Enables the AQE coalesce-shuffle-partitions rule. \
+ Disabled by default — opt in when fewer/larger \
+ downstream tasks matter more than
parallelism.".to_string(),
+ DataType::Boolean,
+ Some(false.to_string())),
+ ConfigEntry::new(
+ BALLISTA_COALESCE_TARGET_PARTITION_BYTES.to_string(),
+ "Target post-coalesce partition byte size in bytes. Mirrors
Spark's \
+ advisoryPartitionSizeInBytes."
+ .to_string(),
+ DataType::UInt64,
+ Some((64 * 1024 * 1024_usize).to_string()),
+ ),
+ ConfigEntry::new(
+ BALLISTA_COALESCE_SMALL_PARTITION_FACTOR.to_string(),
+ "Small-partition merge factor (Spark legacy). Stored as Utf8
because \
+ BallistaConfig::parse_value does not support Float64; parsed back
via \
+ f64::from_str in the accessor."
+ .to_string(),
+ DataType::Utf8,
Review Comment:
probably not
##########
ballista/scheduler/src/state/aqe/optimizer_rule/coalesce_partitions.rs:
##########
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! AQE rule that coalesces shuffle partitions after upstream stages finalize.
+//!
+//! [`CoalescePartitionsRule`] runs once per `replan_stages()` pass on a
+//! stage subtree whose root is either an [`ExchangeExec`] (intermediate
+//! stage) or an [`AdaptiveDatafusionExec`] (final stage). The rule walks the
+//! subtree, collects every leaf [`ExchangeExec`] — the resolved upstream
+//! shuffles feeding this stage — and decides whether to coalesce.
+//!
+//! # The alignment group
+//!
+//! Every leaf `ExchangeExec` in a single stage subtree forms one **alignment
+//! group**. Why one group, not one decision per leaf?
+//!
+//! - Hash-partitioned joins (`HashJoinExec(Partitioned)`, `SortMergeJoinExec`)
+//! require their two inputs to have the *same partition count* and to be
+//! hash-partitioned on the join key. If we coalesced left to `K=4` and
+//! right to `K=2`, DataFusion's `EnforceDistribution` would either reject
+//! the plan or insert remediation repartitions that undo the optimization.
+//! - Both join legs read shuffle output from upstream stages that wrote
+//! `M` partitions using the *same* hash function on the *same* key
+//! (that's what made them joinable in the first place). So upstream
+//! partition `i` of the left and upstream partition `i` of the right
+//! hold rows that must meet at downstream partition `f(i)`. Coalescing
+//! them with the *same* mapping `i → group(i)` keeps that meeting point
+//! consistent; coalescing them with different mappings scatters it.
+//!
+//! Practically: we treat all leaf Exchanges as a single workload, sum their
+//! per-partition byte counts element-wise, bin-pack the summed sizes once,
+//! and attach the *same* `CoalescePlan` to every leaf. Joins with two leaves
+//! and chains of joins with three or more leaves all go through the same
+//! code path — there is no per-leaf decision.
+//!
+//! Concretely for `[25; 8]` bytes per partition on both sides of a join:
+//! summed `[50; 8]`, bin-pack at target `200` produces `K=2` (4 upstream
+//! partitions per group), both leaves get `coalesce=2 of 8`, the downstream
+//! join runs with 2 partitions on each side, hash buckets stay aligned.
+//!
+//! # Default off
+//!
+//! `ballista.coalesce.enabled` is `false` by default. The rule is an opt-in
+//! trade — coalescing reduces task overhead and IPC cost, but at the price
+//! of less downstream parallelism. Users who want the trade explicitly turn
+//! the rule on. When off, the rule short-circuits at the first statement
+//! of `optimize()` and the plan flows through untouched.
+//!
+//! Conceptually:
+//! - `coalesce.enabled=false` (default) ≈ Spark's `parallelismFirst=true`
+//! outcome — partitions preserved, no packing.
+//! - `coalesce.enabled=true` (opt-in) ≈ Spark's `parallelismFirst=false`
+//! outcome — pack toward the advisory target, accept fewer/larger tasks.
+//!
+//! # Algorithm
+//!
+//! 1. Find leaf `ExchangeExec`s — the alignment group. If empty, this
+//! stage reads from scans and has nothing to coalesce.
+//! 2. All leaves share the upstream partition count `M` (the writer side).
+//! 3. Sum per-partition byte sizes element-wise across the group to get
+//! combined work per upstream index.
+//! 4. Bin-pack the summed sizes into `K` buckets near
+//! `target_partition_bytes` (Spark's `advisoryPartitionSizeInBytes`,
+//! 64 MB by default) using `split_size_list_by_target_size`.
+//! 5. If `K >= M` or `K <= 1`, the rewrite is degenerate and is skipped.
+//! 6. Otherwise, attach a shared [`CoalescePlan`] (with `K` partition
+//! groups) to every leaf `ExchangeExec` via `set_coalesce(..)`. The
+//! adapter consumes that decision when it builds the downstream
+//! `ShuffleReaderExec`s.
+//!
+//! # Carrier semantics
+//!
+//! The `CoalescePlan` lives on the upstream `ExchangeExec`; the rule does
+//! not rewrite the plan tree. Idempotency is structural — `set_coalesce`
+//! overwrites the slot with an equivalent plan on re-entry, and the
+//! bin-pack is a pure function of the resolved byte sizes, so the second
+//! pass produces the same decision.
+//!
Review Comment:
maybe we should note that it will binpack neighboring partitions only
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]