milenkovicm commented on code in PR #1684:
URL: https://github.com/apache/datafusion-ballista/pull/1684#discussion_r3252526074


##########
ballista/core/src/config.rs:
##########
@@ -178,7 +193,38 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
         ConfigEntry::new(BALLISTA_CLIENT_IO_RETRY_WAIT_TIME_MS.to_string(),
                          "Wait time in milliseconds between IO retries in the Ballista client.".to_string(),
                          DataType::UInt64,
-                         Some(3000.to_string()))
+                         Some(3000.to_string())),
+        ConfigEntry::new(BALLISTA_COALESCE_ENABLED.to_string(),
+                         "Enables the AQE coalesce-shuffle-partitions rule. \
+                          Disabled by default — opt in when fewer/larger \
+                          downstream tasks matter more than parallelism.".to_string(),
+                         DataType::Boolean,
+                         Some(false.to_string())),
+        ConfigEntry::new(
+            BALLISTA_COALESCE_TARGET_PARTITION_BYTES.to_string(),
+            "Target post-coalesce partition byte size in bytes. Mirrors Spark's \
+             advisoryPartitionSizeInBytes."
+                .to_string(),
+            DataType::UInt64,
+            Some((64 * 1024 * 1024_usize).to_string()),
+        ),
+        ConfigEntry::new(
+            BALLISTA_COALESCE_SMALL_PARTITION_FACTOR.to_string(),
+            "Small-partition merge factor (Spark legacy). Stored as Utf8 because \
+             BallistaConfig::parse_value does not support Float64; parsed back via \
+             f64::from_str in the accessor."
+                .to_string(),
+            DataType::Utf8,

Review Comment:
   Should this be a float?



##########
ballista/scheduler/src/state/aqe/planner.rs:
##########
@@ -296,8 +298,13 @@ impl AdaptivePlanner {
                 let (stage_ids, shuffle_writers) = stages
                     .into_iter()
                     .map(|plan| {
-                        // TODO: we need to find input stages for given stage
-                        //       thus result should change
+                        // Run the coalesce rule per-stage: the root of `plan` is
+                        // the stage's wrapper exchange, so the rule's walker sees
+                        // only this stage's input exchanges as the alignment
+                        // group. This avoids cross-stage gluing and stale state
+                        // that would arise if the rule walked the entire residual
+                        // plan in `default_optimizers()`.
+                        let plan = CoalescePartitionsRule.optimize(plan, config)?;

Review Comment:
   The current AQE approach, with centralised rule execution, has its
   downsides: it probably bends the use of rules in a way that was not really
   intended, and it makes the rules overcomplicated.

   We had an issue before where running a rule detached from the other rules
   created an invalid plan, because the rules that could "fix" the plan did
   not run; you probably remember.

   A corner case I see is when the coalesce-partitions rule produces
   single-partition output. In that case the rest of the plan could probably
   be re-optimised, since there is now only one partition. We would miss that
   opportunity with this approach, as the single partition may not be caught
   until the next "full" optimizer pass (next stage generation).

   Having said that, I don't have a better proposal at the moment.



##########
ballista/core/src/config.rs:
##########
@@ -86,6 +86,21 @@ pub const BALLISTA_SHUFFLE_SORT_BASED_MEMORY_LIMIT_PER_TASK_BYTES: &str =
 pub const BALLISTA_BROADCAST_JOIN_THRESHOLD_BYTES: &str =
     "ballista.optimizer.broadcast_join_threshold_bytes";
 
+/// Configuration key to enable AQE coalesce-shuffle-partitions rule.
+/// Disabled by default — opt in when the workload benefits from larger
+/// downstream tasks more than from preserved parallelism.
+pub const BALLISTA_COALESCE_ENABLED: &str = "ballista.coalesce.enabled";

Review Comment:
   Can we prefix the configuration keys with `ballista.planner.`
   (`ballista.planner.coalesce...`)?
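
   For illustration, the rename could look like the sketch below. Only the
   `ballista.planner.` prefix comes from this comment; the full key string is
   an assumption, and the other coalesce keys would presumably follow the
   same pattern.

```rust
/// Configuration key to enable the AQE coalesce-shuffle-partitions rule.
/// Assumed spelling: the current key with the suggested `ballista.planner.` prefix.
pub const BALLISTA_COALESCE_ENABLED: &str = "ballista.planner.coalesce.enabled";
```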



##########
ballista/core/src/config.rs:
##########
@@ -178,7 +193,38 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
         ConfigEntry::new(BALLISTA_CLIENT_IO_RETRY_WAIT_TIME_MS.to_string(),
                          "Wait time in milliseconds between IO retries in the Ballista client.".to_string(),
                          DataType::UInt64,
-                         Some(3000.to_string()))
+                         Some(3000.to_string())),
+        ConfigEntry::new(BALLISTA_COALESCE_ENABLED.to_string(),
+                         "Enables the AQE coalesce-shuffle-partitions rule. \
+                          Disabled by default — opt in when fewer/larger \
+                          downstream tasks matter more than parallelism.".to_string(),
+                         DataType::Boolean,
+                         Some(false.to_string())),
+        ConfigEntry::new(
+            BALLISTA_COALESCE_TARGET_PARTITION_BYTES.to_string(),
+            "Target post-coalesce partition byte size in bytes. Mirrors Spark's \
+             advisoryPartitionSizeInBytes."
+                .to_string(),
+            DataType::UInt64,
+            Some((64 * 1024 * 1024_usize).to_string()),
+        ),
+        ConfigEntry::new(
+            BALLISTA_COALESCE_SMALL_PARTITION_FACTOR.to_string(),
+            "Small-partition merge factor (Spark legacy). Stored as Utf8 because \
+             BallistaConfig::parse_value does not support Float64; parsed back via \
+             f64::from_str in the accessor."
+                .to_string(),
+            DataType::Utf8,
+            Some("0.2".to_string()),
+        ),
+        ConfigEntry::new(
+            BALLISTA_COALESCE_MERGED_PARTITION_FACTOR.to_string(),
+            "Merged-partition early-flush factor (Spark legacy). Stored as Utf8 — \
+             see small_partition_factor for rationale."
+                .to_string(),
+            DataType::Utf8,

Review Comment:
   Should this be a float?



##########
ballista/core/src/config.rs:
##########
@@ -383,6 +429,36 @@ impl BallistaConfig {
         self.get_usize_setting(BALLISTA_BROADCAST_JOIN_THRESHOLD_BYTES)
     }
 
+    /// Returns whether the AQE coalesce-shuffle-partitions rule is enabled.
+    pub fn coalesce_enabled(&self) -> bool {
+        self.get_bool_setting(BALLISTA_COALESCE_ENABLED)
+    }
+
+    /// Returns the target post-coalesce partition byte size in bytes
+    /// (Spark's `advisoryPartitionSizeInBytes`).
+    pub fn coalesce_target_partition_bytes(&self) -> u64 {
+        self.get_usize_setting(BALLISTA_COALESCE_TARGET_PARTITION_BYTES) as u64
+    }
+
+    /// Returns the small-partition merge factor (Spark legacy).
+    ///
+    /// Stored as Utf8 in CONFIG_ENTRIES because BallistaConfig::parse_value does not
+    /// support DataType::Float64. Falls back to 0.2 if the stored string fails to parse.
+    pub fn coalesce_small_partition_factor(&self) -> f64 {
+        self.get_string_setting(BALLISTA_COALESCE_SMALL_PARTITION_FACTOR)

Review Comment:
   Maybe we should extract a `get_float_setting` helper?
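
   A minimal sketch of what such a helper might look like, written against a
   hypothetical `Settings` wrapper rather than the real `BallistaConfig`
   internals (the struct, its field, and the explicit `default` parameter are
   all assumptions made for illustration). It parses the stored string as
   `f64` and falls back to the default when the key is missing, unparsable,
   or not finite.

```rust
use std::collections::HashMap;
use std::str::FromStr;

/// Hypothetical stand-in for the string-valued settings map behind the config.
pub struct Settings {
    values: HashMap<String, String>,
}

impl Settings {
    pub fn new(values: HashMap<String, String>) -> Self {
        Self { values }
    }

    /// Hypothetical `get_float_setting`: reads `key`, parses it as `f64`,
    /// and returns `default` if the key is absent, fails to parse, or
    /// parses to NaN or an infinity.
    pub fn get_float_setting(&self, key: &str, default: f64) -> f64 {
        self.values
            .get(key)
            .and_then(|raw| f64::from_str(raw.trim()).ok())
            .filter(|value| value.is_finite())
            .unwrap_or(default)
    }
}
```

   With a helper along these lines, both factor accessors could share one
   parsing path instead of repeating the string handling in each accessor.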



##########
ballista/core/src/config.rs:
##########
@@ -178,7 +193,38 @@ static CONFIG_ENTRIES: LazyLock<HashMap<String, ConfigEntry>> = LazyLock::new(||
         ConfigEntry::new(BALLISTA_CLIENT_IO_RETRY_WAIT_TIME_MS.to_string(),
                          "Wait time in milliseconds between IO retries in the Ballista client.".to_string(),
                          DataType::UInt64,
-                         Some(3000.to_string()))
+                         Some(3000.to_string())),
+        ConfigEntry::new(BALLISTA_COALESCE_ENABLED.to_string(),
+                         "Enables the AQE coalesce-shuffle-partitions rule. \
+                          Disabled by default — opt in when fewer/larger \
+                          downstream tasks matter more than parallelism.".to_string(),
+                         DataType::Boolean,
+                         Some(false.to_string())),
+        ConfigEntry::new(
+            BALLISTA_COALESCE_TARGET_PARTITION_BYTES.to_string(),
+            "Target post-coalesce partition byte size in bytes. Mirrors Spark's \
+             advisoryPartitionSizeInBytes."
+                .to_string(),
+            DataType::UInt64,
+            Some((64 * 1024 * 1024_usize).to_string()),
+        ),
+        ConfigEntry::new(
+            BALLISTA_COALESCE_SMALL_PARTITION_FACTOR.to_string(),
+            "Small-partition merge factor (Spark legacy). Stored as Utf8 because \
+             BallistaConfig::parse_value does not support Float64; parsed back via \
+             f64::from_str in the accessor."
+                .to_string(),
+            DataType::Utf8,

Review Comment:
   probably not 



##########
ballista/scheduler/src/state/aqe/optimizer_rule/coalesce_partitions.rs:
##########
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! AQE rule that coalesces shuffle partitions after upstream stages finalize.
+//!
+//! [`CoalescePartitionsRule`] runs once per `replan_stages()` pass on a
+//! stage subtree whose root is either an [`ExchangeExec`] (intermediate
+//! stage) or an [`AdaptiveDatafusionExec`] (final stage). The rule walks the
+//! subtree, collects every leaf [`ExchangeExec`] — the resolved upstream
+//! shuffles feeding this stage — and decides whether to coalesce.
+//!
+//! # The alignment group
+//!
+//! Every leaf `ExchangeExec` in a single stage subtree forms one **alignment
+//! group**. Why one group, not one decision per leaf?
+//!
+//! - Hash-partitioned joins (`HashJoinExec(Partitioned)`, `SortMergeJoinExec`)
+//!   require their two inputs to have the *same partition count* and to be
+//!   hash-partitioned on the join key. If we coalesced left to `K=4` and
+//!   right to `K=2`, DataFusion's `EnforceDistribution` would either reject
+//!   the plan or insert remediation repartitions that undo the optimization.
+//! - Both join legs read shuffle output from upstream stages that wrote
+//!   `M` partitions using the *same* hash function on the *same* key
+//!   (that's what made them joinable in the first place). So upstream
+//!   partition `i` of the left and upstream partition `i` of the right
+//!   hold rows that must meet at downstream partition `f(i)`. Coalescing
+//!   them with the *same* mapping `i → group(i)` keeps that meeting point
+//!   consistent; coalescing them with different mappings scatters it.
+//!
+//! Practically: we treat all leaf Exchanges as a single workload, sum their
+//! per-partition byte counts element-wise, bin-pack the summed sizes once,
+//! and attach the *same* `CoalescePlan` to every leaf. Joins with two leaves
+//! and chains of joins with three or more leaves all go through the same
+//! code path — there is no per-leaf decision.
+//!
+//! Concretely for `[25; 8]` bytes per partition on both sides of a join:
+//! summed `[50; 8]`, bin-pack at target `200` produces `K=2` (4 upstream
+//! partitions per group), both leaves get `coalesce=2 of 8`, the downstream
+//! join runs with 2 partitions on each side, hash buckets stay aligned.
+//!
+//! # Default off
+//!
+//! `ballista.coalesce.enabled` is `false` by default. The rule is an opt-in
+//! trade — coalescing reduces task overhead and IPC cost, but at the price
+//! of less downstream parallelism. Users who want the trade explicitly turn
+//! the rule on. When off, the rule short-circuits at the first statement
+//! of `optimize()` and the plan flows through untouched.
+//!
+//! Conceptually:
+//!   - `coalesce.enabled=false` (default) ≈ Spark's `parallelismFirst=true`
+//!     outcome — partitions preserved, no packing.
+//!   - `coalesce.enabled=true` (opt-in) ≈ Spark's `parallelismFirst=false`
+//!     outcome — pack toward the advisory target, accept fewer/larger tasks.
+//!
+//! # Algorithm
+//!
+//!   1. Find leaf `ExchangeExec`s — the alignment group. If empty, this
+//!      stage reads from scans and has nothing to coalesce.
+//!   2. All leaves share the upstream partition count `M` (the writer side).
+//!   3. Sum per-partition byte sizes element-wise across the group to get
+//!      combined work per upstream index.
+//!   4. Bin-pack the summed sizes into `K` buckets near
+//!      `target_partition_bytes` (Spark's `advisoryPartitionSizeInBytes`,
+//!      64 MB by default) using `split_size_list_by_target_size`.
+//!   5. If `K >= M` or `K <= 1`, the rewrite is degenerate and is skipped.
+//!   6. Otherwise, attach a shared [`CoalescePlan`] (with `K` partition
+//!      groups) to every leaf `ExchangeExec` via `set_coalesce(..)`. The
+//!      adapter consumes that decision when it builds the downstream
+//!      `ShuffleReaderExec`s.
+//!
+//! # Carrier semantics
+//!
+//! The `CoalescePlan` lives on the upstream `ExchangeExec`; the rule does
+//! not rewrite the plan tree. Idempotency is structural — `set_coalesce`
+//! overwrites the slot with an equivalent plan on re-entry, and the
+//! bin-pack is a pure function of the resolved byte sizes, so the second
+//! pass produces the same decision.
+//!

Review Comment:
   Maybe we should note that it bin-packs neighboring partitions only.
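
   To make the point concrete, here is a rough sketch of neighbor-only
   packing. It is not the PR's `split_size_list_by_target_size`: the function
   names are hypothetical, it assumes a single greedy pass over upstream
   indices, and it ignores the small-partition and merged-partition factors.
   It sums sizes element-wise across the alignment group, then closes a group
   whenever adding the next neighbor would exceed the target.

```rust
use std::ops::Range;

/// Element-wise sum of per-partition byte sizes across all leaves.
/// Returns `None` if there are no leaves or they disagree on partition count.
pub fn sum_sizes(leaves: &[Vec<u64>]) -> Option<Vec<u64>> {
    let first = leaves.first()?;
    let m = first.len();
    if leaves.iter().any(|leaf| leaf.len() != m) {
        return None;
    }
    let mut total = vec![0u64; m];
    for leaf in leaves {
        for (sum, size) in total.iter_mut().zip(leaf) {
            *sum = sum.saturating_add(*size);
        }
    }
    Some(total)
}

/// Greedy packing of *contiguous* upstream indices into groups whose summed
/// size stays at or below `target` (a single oversized partition still gets
/// its own group). Partitions are never reordered.
pub fn pack_neighbors(sizes: &[u64], target: u64) -> Vec<Range<usize>> {
    let mut groups = Vec::new();
    let mut start = 0;
    let mut acc = 0u64;
    for (i, &size) in sizes.iter().enumerate() {
        // Close the current group if it is non-empty and would overflow.
        if i > start && acc.saturating_add(size) > target {
            groups.push(start..i);
            start = i;
            acc = 0;
        }
        acc = acc.saturating_add(size);
    }
    if start < sizes.len() {
        groups.push(start..sizes.len());
    }
    groups
}
```

   Under these assumptions, the module doc's `[25; 8]` example sums to
   `[50; 8]` and packs into two groups of four at target `200`. Sizes
   `[100, 150, 100, 150]` at the same target stay as four groups, even though
   pairing the two non-adjacent `100`s would give three, which is the
   behavior worth calling out in the docs.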


