danielhumanmod commented on code in PR #1649:
URL:
https://github.com/apache/datafusion-ballista/pull/1649#discussion_r3199075305
##########
.gitignore:
##########
@@ -113,4 +113,6 @@ CLAUDE.md
.claude/
# git worktrees (local only)
-.worktrees/
\ No newline at end of file
+.worktrees/
+# ignore insta captures
+*-snap
Review Comment:
Is this change intended?
##########
ballista/scheduler/src/state/aqe/optimizer_rule/distributed_exchange.rs:
##########
@@ -25,82 +25,74 @@ use datafusion::physical_plan::{ExecutionPlan,
execution_plan};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
+enum ExchangeStatus {
+ None,
+ Resolved,
+ Unresolved,
+}
+
#[derive(Debug, Clone, Default)]
pub struct DistributedExchangeRule {
plan_id_generator: Arc<AtomicUsize>,
}
impl DistributedExchangeRule {
- pub fn plan_invalid(
- &self,
- execution_plan: Arc<dyn ExecutionPlan>,
- ) -> datafusion::error::Result<bool> {
- execution_plan
- .transform_up(|p| self.transform(p))
- .transformed()
- }
-
pub(crate) fn transform(
&self,
execution_plan: Arc<dyn ExecutionPlan>,
) -> datafusion::error::Result<Transformed<Arc<dyn ExecutionPlan>>> {
if let Some(coalesce) = execution_plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
- && coalesce
- .input()
- .as_any()
- .downcast_ref::<ExchangeExec>()
- .is_none()
{
- let exchange_exec = ExchangeExec::new(
- coalesce.input().clone(),
- None,
- self.plan_id_generator
- .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
- );
- Ok(Transformed::yes(
-
execution_plan.with_new_children(vec![Arc::new(exchange_exec)])?,
- ))
+ let input = coalesce.input();
Review Comment:
Love the refactor in this part, much easier to review
##########
ballista/scheduler/src/state/aqe/optimizer_rule/distributed_exchange.rs:
##########
@@ -138,3 +130,515 @@ impl PhysicalOptimizerRule for DistributedExchangeRule {
false
}
}
+
+/// Scans the subtree for the nearest `ExchangeExec` in each path and returns
the
+/// aggregate status. Stops recursing at `ExchangeExec` boundaries so that
only the
+/// shallowest exchange in each branch is considered.
+///
+/// Returns `Unresolved` as soon as any branch contains an unresolved exchange
+/// (short-circuits), `Resolved` if every branch that has an exchange has a
resolved
+/// one, and `None` if no exchange is found anywhere.
+fn find_exchange_status(plan: &Arc<dyn ExecutionPlan>) -> ExchangeStatus {
+ if let Some(exchange) = plan.as_any().downcast_ref::<ExchangeExec>() {
+ if exchange.shuffle_created() {
Review Comment:
I notice there is an `inactive_stage` in `ExchangeExec`; should we take it into
account as well?
##########
ballista/scheduler/src/state/aqe/test/alter_stages.rs:
##########
@@ -149,68 +146,182 @@ async fn should_propagate_empty_stage_and_remove() ->
datafusion::error::Result<
let stages = planner.runnable_stages()?.unwrap();
assert_eq!(1, stages.len());
assert_plan!(stages.first().unwrap().plan.as_ref(), @ r"
- ShuffleWriterExec: partitioning: None
+ SortShuffleWriterExec: partitioning=Hash([c0@0], 2)
EmptyExec
");
planner.finalise_stage_internal(1,
mock_partitions_with_statistics_no_data())?;
let stages = planner.runnable_stages()?;
- assert!(stages.is_none());
+ assert_plan!(planner.current_plan(), @ r"
+ AdaptiveDatafusionExec: is_final=true, plan_id=1, stage_id=2
+ EmptyExec
+ ");
+ assert!(stages.is_some());
Ok(())
}
+// This test covers join re-ordering after exchange statistic changed
+//
+// note: I was expecting join type change from `mode=Partitioned` to
+// `mode=CollectLeft` but it did not, to be investigated as a follow
+// up
#[tokio::test]
-async fn should_insert_new_stage() -> datafusion::error::Result<()> {
+async fn should_support_join_re_ordering() -> datafusion::error::Result<()> {
let ctx = mock_context();
- let join = create_plan_with_scan()?;
+ let join = create_plan_with_hash_join()?;
+
+ assert_plan!(join.as_ref(), @ r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(big_col@0,
big_col@0)]
+ RepartitionExec: partitioning=Hash([big_col@0], 2), input_partitions=2
+ MockPartitionedScan: num_partitions=2, statistics=[Rows=Exact(262144),
Bytes=Exact(2097152), [(Col[0]:)]]
+ RepartitionExec: partitioning=Hash([big_col@0], 2), input_partitions=2
+ MockPartitionedScan: num_partitions=2, statistics=[Rows=Exact(262144),
Bytes=Exact(2097152), [(Col[0]:)]]
+ ");
+
let mut planner =
AdaptivePlanner::try_new(ctx.state().config(), join,
"test_job".to_string())?;
assert_plan!(planner.current_plan(), @ r"
- AdaptiveDatafusionExec: is_final=false, plan_id=1, stage_id=pending
- CrossJoinExec
- CoalescePartitionsExec
- ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=0,
stage_id=pending, stage_resolved=false
- StatisticsExec: col_count=1, row_count=Inexact(262144)
- CooperativeExec
- MockPartitionedScan: num_partitions=2,
statistics=[Rows=Inexact(1024), Bytes=Inexact(8192), [(Col[0]:)]]
+ AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(big_col@0,
big_col@0)]
+ ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=0,
stage_id=pending, stage_resolved=false
+ MockPartitionedScan: num_partitions=2,
statistics=[Rows=Exact(262144), Bytes=Exact(2097152), [(Col[0]:)]]
+ ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=1,
stage_id=pending, stage_resolved=false
+ MockPartitionedScan: num_partitions=2,
statistics=[Rows=Exact(262144), Bytes=Exact(2097152), [(Col[0]:)]]
");
let stages = planner.runnable_stages()?.unwrap();
- assert_eq!(1, stages.len());
+ assert_eq!(2, stages.len());
- assert_plan!(stages[0].plan.as_ref(), @ r"
- SortShuffleWriterExec: partitioning=Hash([big_col@0], 2)
- StatisticsExec: col_count=1, row_count=Inexact(262144)
+ assert_plan!(planner.current_plan(), @ r"
+ AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(big_col@0,
big_col@0)]
+ ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=0,
stage_id=0, stage_resolved=false
+ MockPartitionedScan: num_partitions=2,
statistics=[Rows=Exact(262144), Bytes=Exact(2097152), [(Col[0]:)]]
+ ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=1,
stage_id=1, stage_resolved=false
+ MockPartitionedScan: num_partitions=2,
statistics=[Rows=Exact(262144), Bytes=Exact(2097152), [(Col[0]:)]]
");
+ // we finalize stage 1 with smaller number of rows, for test purposes,
+ // this should trigger join re-ordering
planner.finalise_stage_internal(0, big_statistics_exchange())?;
+ planner.finalise_stage_internal(1, small_statistics_exchange())?;
+
+ // join ordering changes as build side is bigger than probe side
+ // after exchange statistic updated.
+ assert_plan!(planner.current_plan(), @ r"
+ AdaptiveDatafusionExec: is_final=false, plan_id=2, stage_id=pending
+ ProjectionExec: expr=[big_col@1 as big_col, big_col@0 as big_col]
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(big_col@0,
big_col@0)]
+ ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=1,
stage_id=1, stage_resolved=true
+ CooperativeExec
+ MockPartitionedScan: num_partitions=2,
statistics=[Rows=Exact(262144), Bytes=Exact(2097152), [(Col[0]:)]]
+ ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=0,
stage_id=0, stage_resolved=true
+ CooperativeExec
+ MockPartitionedScan: num_partitions=2,
statistics=[Rows=Exact(262144), Bytes=Exact(2097152), [(Col[0]:)]]
+ ");
+
+ let stages = planner.runnable_stages()?.unwrap();
+ assert_eq!(1, stages.len());
+
+ assert_plan!(planner.current_plan(), @ r"
+ AdaptiveDatafusionExec: is_final=true, plan_id=2, stage_id=2
+ ProjectionExec: expr=[big_col@1 as big_col, big_col@0 as big_col]
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(big_col@0,
big_col@0)]
+ ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=1,
stage_id=1, stage_resolved=true
+ CooperativeExec
+ MockPartitionedScan: num_partitions=2,
statistics=[Rows=Exact(262144), Bytes=Exact(2097152), [(Col[0]:)]]
+ ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=0,
stage_id=0, stage_resolved=true
+ CooperativeExec
+ MockPartitionedScan: num_partitions=2,
statistics=[Rows=Exact(262144), Bytes=Exact(2097152), [(Col[0]:)]]
+ ");
+
+ planner.finalise_stage_internal(2, small_statistics_exchange())?;
+
+ assert_plan!(planner.current_plan(), @ r"
+ AdaptiveDatafusionExec: is_final=true, plan_id=2, stage_id=2
+ ProjectionExec: expr=[big_col@1 as big_col, big_col@0 as big_col]
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(big_col@0,
big_col@0)]
+ ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=1,
stage_id=1, stage_resolved=true
+ CooperativeExec
+ MockPartitionedScan: num_partitions=2,
statistics=[Rows=Exact(262144), Bytes=Exact(2097152), [(Col[0]:)]]
+ ExchangeExec: partitioning=Hash([big_col@0], 2), plan_id=0,
stage_id=0, stage_resolved=true
+ CooperativeExec
+ MockPartitionedScan: num_partitions=2,
statistics=[Rows=Exact(262144), Bytes=Exact(2097152), [(Col[0]:)]]
+ ");
+
+ Ok(())
+}
+
+// this test initially handled cross join where new repartition
+// is inserted due to new statistic information.
+// with lazy stage computation this plan is adjusted much earlier
+// hance no late exchange addition.
Review Comment:
NIT: typo, hance -> hence?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]