martin-g commented on code in PR #1789:
URL:
https://github.com/apache/datafusion-ballista/pull/1789#discussion_r3324499703
##########
ballista/scheduler/src/state/execution_stage.rs:
##########
@@ -1058,18 +1058,29 @@ impl StageOutput {
}
/// returns vector of partition locations
/// which is compatible with ShuffleReader vector format
+ ///
+ /// `output_partition_count` is the number of expected
+ /// output partition number
pub fn partition_locations(
mut self,
- output_partitions: usize,
+ output_partition_count: usize,
) -> Vec<Vec<PartitionLocation>> {
let mut partition_locations = Vec::new();
- for i in 0..output_partitions {
+ for i in 0..output_partition_count {
let p = self.partition_locations.remove(&i).unwrap_or_default();
partition_locations.push(p);
}
partition_locations
}
+
+ /// returns vector of partition locations
+ /// which is compatible with ShuffleReader vector format
+ /// supporting broadcast shuffle read.
+ /// All partitions are merged into one
+ pub fn partition_locations_broadcast(self) -> Vec<Vec<PartitionLocation>> {
+ self.partition_locations.into_values().collect()
Review Comment:
For broadcast it should be just one partition, no?
```suggestion
let all_locations: Vec<PartitionLocation> =
self.partition_locations.into_values().flatten().collect();
vec![all_locations]
```
##########
ballista/scheduler/src/state/aqe/execution_plan/dynamic_join.rs:
##########
@@ -0,0 +1,537 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::{
+ arrow::compute::SortOptions,
+ common::{JoinType, NullEquality, Result, exec_err, internal_err},
+ physical_expr_common::physical_expr::fmt_sql,
+ physical_plan::{
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
PlanProperties,
+ joins::{
+ HashJoinExec, HashJoinExecBuilder, JoinOn, PartitionMode,
SortMergeJoinExec,
+ utils::JoinFilter,
+ },
+ },
+};
+use std::sync::Arc;
+
+use crate::state::aqe::execution_plan::ExchangeExec;
+
+/// has children of this join been
+/// repartitioned
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum JoinInputState {
+ /// All inputs has been repartitioned
+ /// which means this join can be resolved
+ Repartitioned,
+ /// State on join inputs is unknown
+ Unknown,
+}
+
+// SortMergeJoinExec::try_new(left, right, on, filter, join_type,
sort_options, null_equality )
+// HashJoinExec::try_new (left, right, on, filter, join_type, projection,
partition_mode, null_equality, null_aware )
+#[derive(Debug)]
+pub struct DynamicJoinSelectionExec {
+ pub left: Arc<dyn ExecutionPlan>,
Review Comment:
The fields could be private.
##########
ballista/scheduler/src/state/aqe/execution_plan/dynamic_join.rs:
##########
@@ -0,0 +1,537 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::{
+ arrow::compute::SortOptions,
+ common::{JoinType, NullEquality, Result, exec_err, internal_err},
+ physical_expr_common::physical_expr::fmt_sql,
+ physical_plan::{
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
PlanProperties,
+ joins::{
+ HashJoinExec, HashJoinExecBuilder, JoinOn, PartitionMode,
SortMergeJoinExec,
+ utils::JoinFilter,
+ },
+ },
+};
+use std::sync::Arc;
+
+use crate::state::aqe::execution_plan::ExchangeExec;
+
+/// has children of this join been
+/// repartitioned
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum JoinInputState {
+ /// All inputs has been repartitioned
+ /// which means this join can be resolved
+ Repartitioned,
+ /// State on join inputs is unknown
+ Unknown,
+}
+
+// SortMergeJoinExec::try_new(left, right, on, filter, join_type,
sort_options, null_equality )
+// HashJoinExec::try_new (left, right, on, filter, join_type, projection,
partition_mode, null_equality, null_aware )
+#[derive(Debug)]
+pub struct DynamicJoinSelectionExec {
+ pub left: Arc<dyn ExecutionPlan>,
+ pub right: Arc<dyn ExecutionPlan>,
+ pub on: JoinOn,
+ pub filter: Option<JoinFilter>,
+ pub join_type: JoinType,
+ pub projection: Option<Vec<usize>>,
+ pub null_equality: NullEquality,
+ pub properties: Arc<PlanProperties>,
+ pub selection_state: JoinInputState,
+ pub null_aware: bool,
+}
+
+impl ExecutionPlan for DynamicJoinSelectionExec {
+ fn name(&self) -> &str {
+ "DynamicJoinSelectionExec"
+ }
+
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn properties(&self) ->
&std::sync::Arc<datafusion::physical_plan::PlanProperties> {
+ &self.properties
+ }
+
+ fn children(&self) -> Vec<&std::sync::Arc<dyn ExecutionPlan>> {
+ vec![&self.left, &self.right]
+ }
+
+ fn with_new_children(
+ self: std::sync::Arc<Self>,
+ children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
+ ) -> datafusion::error::Result<std::sync::Arc<dyn ExecutionPlan>> {
+ if children.len() != 2 {
+ return internal_err!(
+ "DynamicJoinSelectionExec expects 2 children, got {}",
+ children.len()
+ );
+ }
+ Ok(Arc::new(DynamicJoinSelectionExec {
+ left: Arc::clone(&children[0]),
+ right: Arc::clone(&children[1]),
+ on: self.on.clone(),
+ filter: self.filter.clone(),
+ join_type: self.join_type,
+ projection: self.projection.clone(),
+ null_equality: self.null_equality,
+ properties: Arc::clone(&self.properties),
+ selection_state: self.selection_state.clone(),
+ null_aware: self.null_aware,
+ }))
+ }
+
+ fn execute(
+ &self,
+ _partition: usize,
+ _context: std::sync::Arc<datafusion::execution::TaskContext>,
+ ) ->
datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
+ exec_err!(
+ "Operator should not be executed; it should have been replaced by
an optimizer rule."
+ )
+ }
+}
+
+impl DisplayAs for DynamicJoinSelectionExec {
+ fn fmt_as(
+ &self,
+ t: datafusion::physical_plan::DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let on = self
+ .on
+ .iter()
+ .map(|(c1, c2)| format!("({c1}, {c2})"))
+ .collect::<Vec<String>>()
+ .join(", ");
+ let display_null_equality =
+ if self.null_equality == NullEquality::NullEqualsNull {
+ ", NullsEqual: true"
+ } else {
+ ""
+ };
+ write!(
+ f,
+ "{}: join_type={:?}, on=[{}]{}{}",
+ Self::static_name(),
+ self.join_type,
+ on,
+ self.filter.as_ref().map_or_else(
+ || "".to_string(),
+ |f| format!(", filter={}", f.expression())
+ ),
+ display_null_equality,
+ )?;
+
+ write!(
+ f,
+ " repartitioned={}",
+ matches!(self.selection_state,
JoinInputState::Repartitioned)
+ )?;
+
+ Ok(())
+ }
+ DisplayFormatType::TreeRender => {
+ let on = self
+ .on
+ .iter()
+ .map(|(c1, c2)| {
+ format!("({} = {})", fmt_sql(c1.as_ref()),
fmt_sql(c2.as_ref()))
+ })
+ .collect::<Vec<String>>()
+ .join(", ");
+
+ if self.join_type != JoinType::Inner {
+ writeln!(f, "join_type={:?}", self.join_type)?;
+ }
+ writeln!(f, "on={on}")?;
+
+ if self.null_equality == NullEquality::NullEqualsNull {
+ writeln!(f, "NullsEqual: true")?;
+ }
+
+ writeln!(
+ f,
+ " repartitioned={}",
+ matches!(self.selection_state,
JoinInputState::Repartitioned)
+ )?;
+
+ Ok(())
+ }
+ }
+ }
+}
+
+pub enum JoinSelectionAction {
+ Repartition(Arc<DynamicJoinSelectionExec>),
+ CollectLeft(Arc<HashJoinExec>),
+ LateCollectLeft(Arc<HashJoinExec>),
+ Hash(Arc<HashJoinExec>),
+ Sort(Arc<SortMergeJoinExec>),
+}
+
+impl DynamicJoinSelectionExec {
+ // this is required, as we do not want to implement
+ // ExecutionPlan::required_input_distribution as other
+ // datafusion planners are going to add repartition
+ pub(crate) fn _required_input_distribution(&self) -> Vec<Distribution> {
Review Comment:
Why is the name prefixed with `_`? To distinguish it from
`ExecutionPlan::required_input_distribution`?
Maybe rename it to `input_distribution()`?
##########
ballista/scheduler/src/state/aqe/planner.rs:
##########
@@ -210,25 +255,34 @@ impl AdaptivePlanner {
&mut self,
stage_id: usize,
) -> common::Result<Vec<Vec<PartitionLocation>>> {
- let output_partition_count = self
- .runnable_stage_cache
- .get(&stage_id)
- .ok_or(datafusion::error::DataFusionError::Execution(
+ let stage = self.runnable_stage_cache.get(&stage_id).ok_or(
+ datafusion::error::DataFusionError::Execution(
"Can't find active cache resolve".into(),
- ))?
- .output_partitioning()
- .partition_count();
-
- let stage_output = self
- .runnable_stage_output
- .remove(&stage_id)
- .ok_or(datafusion::error::DataFusionError::Execution(
- "Can't find active stage to update resolve".into(),
- ))?
- .partition_locations(output_partition_count);
-
+ ),
+ )?;
+ let is_broadcast = stage
+ .as_any()
+ .downcast_ref::<ExchangeExec>()
+ .map(|e| e.broadcast)
+ .unwrap_or(false);
+
+ let output_partition_count =
stage.output_partitioning().partition_count();
+ let stage_output = if is_broadcast {
Review Comment:
The logic in the `then` and `else` branches is almost the same. It could be
simplified by extracting the common logic before the `if`.
##########
ballista/core/src/execution_plans/chaos_exec.rs:
##########
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// ChaosExec is a physical execution plan node for robustness/chaos testing.
+// It wraps a single child node, preserving its schema and partitioning, and
+// randomly injects faults according to a configurable failure_probability
+// (in [0.0, 1.0]) and fault_type:
+//
+// "transient" — returns a recoverable IoError on the first batch
+// "fatal" — returns a non-recoverable Execution error on the first
batch
+// "panic" — panics on the first batch
+// "delay" — sleeps 1 ms before every batch
+// "delay:N" — sleeps N ms before every batch
+//
+// ChaosExec is inserted into query plans by physical optimizer rule, which
+// probabilistically wraps leaf nodes.
+
+use datafusion::common::{DataFusionError, Result, Statistics, internal_err};
+use datafusion::config::ConfigOptions;
+use datafusion::execution::TaskContext;
+use datafusion::physical_plan::execution_plan::CardinalityEffect;
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{
+ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+ SendableRecordBatchStream,
+};
+use futures::StreamExt;
+use rand::rngs::StdRng;
+use rand::{RngExt, SeedableRng};
+use std::any::Any;
+use std::sync::Arc;
+
+/// Physical execution plan node that randomly injects failures for
chaos/robustness testing.
+#[derive(Debug, Clone)]
+pub struct ChaosExec {
+ input: Arc<dyn ExecutionPlan>,
+ // a probability this node will fail when run
+ failure_probability: f64,
+ // controls what kind of fault is injected: "transient", "fatal", "panic",
or "delay"
+ fault_type: String,
+ // optional RNG seed; None means thread-local non-deterministic RNG
+ seed: Option<u64>,
+}
+
+impl ChaosExec {
+ /// Creates a new `ChaosExec` wrapping `input`.
+ ///
+ /// `failure_probability` must be in `[0.0, 1.0]`.
+ /// `fault_type` must be one of `"transient"`, `"fatal"`, `"panic"`,
`"delay"`, or `"delay:N"`
+ /// where N is a positive integer number of milliseconds.
+ pub fn new(
+ input: Arc<dyn ExecutionPlan>,
+ failure_probability: f64,
+ fault_type: &str,
+ seed: Option<u64>,
+ ) -> Result<Self> {
+ if !(0.0..=1.0).contains(&failure_probability) {
+ return internal_err!(
+ "ChaosExec failure_probability must be in [0.0, 1.0], got
{failure_probability}"
+ );
+ }
+ match fault_type {
+ "transient" | "fatal" | "panic" | "delay" => {}
+ other if other.starts_with("delay:") => {
+ let ms_str = &other["delay:".len()..];
+ if ms_str.parse::<u64>().is_err() {
Review Comment:
This would also accept `delay:0`. Is this OK?
`std::thread::sleep(Duration::from_millis(0))` behaves a bit differently on Unix
and Windows.
##########
ballista/scheduler/src/state/aqe/adapter.rs:
##########
@@ -89,12 +90,18 @@ impl BallistaAdapter {
new_partitioning,
)?
}
- None => ShuffleReaderExec::try_new(
+ (None, false) => ShuffleReaderExec::try_new(
stage_id,
partitions,
schema,
partitioning,
)?,
+ (_, true) => ShuffleReaderExec::try_new_broadcast(
+ stage_id,
+ exchange.shuffle_partitions_flattened(),
+ schema,
+ exchange.input().output_partitioning().partition_count(),
Review Comment:
```suggestion
partitions.len(),
```
?!
##########
ballista/scheduler/src/state/aqe/planner.rs:
##########
@@ -118,16 +124,55 @@ impl AdaptivePlanner {
///
/// # Returns
/// A new instance of `AdaptivePlanner` or an error if the initialization
fails.
- pub fn try_new(
+ #[cfg(test)]
+ pub fn try_from_plan(
session_config: &SessionConfig,
plan: Arc<dyn ExecutionPlan>,
job_name: String,
) -> common::Result<Self> {
+ let plan_id_generator = Arc::new(AtomicUsize::new(0));
Self::try_new_with_optimizers(
session_config,
plan,
job_name,
- Self::default_optimizers(),
+ Self::default_optimizers(plan_id_generator),
+ )
+ }
+
+ /// Creates a new `AdaptivePlanner` with default physical optimizer rules.
+ ///
+ /// # Arguments
+ ///
+ /// * `ctx` - The session context
+ /// * `logical_plan` - The logical plan for the job.
+ /// * `job_name` - The name of the job.
+ ///
+ /// # Returns
+ /// A new instance of `AdaptivePlanner` or an error if the initialization
fails.
+ pub async fn try_new(
+ ctx: &SessionContext,
+ logical_plan: &LogicalPlan,
+ job_name: String,
+ ) -> common::Result<Self> {
+ // session state with very limited set of optimizers.
+ // this optimizer set will be executed only once, before
+ // running standard set of optimizers, which will
+ // after each stage.
+ let state = Self::create_session_state(
+ ctx.state().config(),
+ Self::plan_preparation_optimizers(),
+ );
+
+ let plan = state.create_physical_plan(logical_plan).await?;
+ let plan = handle_explain_plan(&job_name, ctx, logical_plan, plan)
Review Comment:
https://github.com/apache/datafusion-ballista/pull/1789/changes#diff-4930b650031f2682787dc93a3211bf2c8edc3ff2ca88854d5a7714e390836f76L151
was passing `job_id` instead of `job_name`
##########
ballista/scheduler/src/state/aqe/execution_plan/exchange.rs:
##########
@@ -171,6 +215,14 @@ impl ExchangeExec {
self.shuffle_partitions.lock().clone()
}
+ /// Flattens partition locations into single vector,
+ /// this method is usually used when we want to collect partitions
+ /// to form a broadcast join
+ pub(crate) fn shuffle_partitions_flattened(&self) ->
Vec<PartitionLocation> {
+ let partitions =
self.shuffle_partitions.lock().clone().unwrap_or_default();
Review Comment:
Is it possible for `self.shuffle_partitions` to be `None`? In that case the
result will be an empty Vec. Is this OK for a broadcast join?
##########
ballista/core/src/serde/mod.rs:
##########
@@ -532,6 +532,15 @@ impl PhysicalExtensionCodec for
BallistaPhysicalExtensionCodec {
};
Ok(Arc::new(exec))
}
+ PhysicalPlanType::ChaosExec(chaos_exec) => {
+ let input = inputs[0].clone();
Review Comment:
```suggestion
let input = inputs.first().cloned().ok_or_else(|| {
DataFusionError::Internal(
"ChaosExec requires exactly one input
child".to_string(),
)
})?;
```
##########
ballista/scheduler/src/state/aqe/execution_plan/dynamic_join.rs:
##########
@@ -0,0 +1,537 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::{
+ arrow::compute::SortOptions,
+ common::{JoinType, NullEquality, Result, exec_err, internal_err},
+ physical_expr_common::physical_expr::fmt_sql,
+ physical_plan::{
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
PlanProperties,
+ joins::{
+ HashJoinExec, HashJoinExecBuilder, JoinOn, PartitionMode,
SortMergeJoinExec,
+ utils::JoinFilter,
+ },
+ },
+};
+use std::sync::Arc;
+
+use crate::state::aqe::execution_plan::ExchangeExec;
+
+/// has children of this join been
+/// repartitioned
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum JoinInputState {
+ /// All inputs has been repartitioned
+ /// which means this join can be resolved
+ Repartitioned,
+ /// State on join inputs is unknown
+ Unknown,
+}
+
+// SortMergeJoinExec::try_new(left, right, on, filter, join_type,
sort_options, null_equality )
+// HashJoinExec::try_new (left, right, on, filter, join_type, projection,
partition_mode, null_equality, null_aware )
+#[derive(Debug)]
+pub struct DynamicJoinSelectionExec {
+ pub left: Arc<dyn ExecutionPlan>,
+ pub right: Arc<dyn ExecutionPlan>,
+ pub on: JoinOn,
+ pub filter: Option<JoinFilter>,
+ pub join_type: JoinType,
+ pub projection: Option<Vec<usize>>,
+ pub null_equality: NullEquality,
+ pub properties: Arc<PlanProperties>,
+ pub selection_state: JoinInputState,
+ pub null_aware: bool,
+}
+
+impl ExecutionPlan for DynamicJoinSelectionExec {
+ fn name(&self) -> &str {
+ "DynamicJoinSelectionExec"
+ }
+
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn properties(&self) ->
&std::sync::Arc<datafusion::physical_plan::PlanProperties> {
+ &self.properties
+ }
+
+ fn children(&self) -> Vec<&std::sync::Arc<dyn ExecutionPlan>> {
+ vec![&self.left, &self.right]
+ }
+
+ fn with_new_children(
+ self: std::sync::Arc<Self>,
+ children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
+ ) -> datafusion::error::Result<std::sync::Arc<dyn ExecutionPlan>> {
+ if children.len() != 2 {
+ return internal_err!(
+ "DynamicJoinSelectionExec expects 2 children, got {}",
+ children.len()
+ );
+ }
+ Ok(Arc::new(DynamicJoinSelectionExec {
+ left: Arc::clone(&children[0]),
+ right: Arc::clone(&children[1]),
+ on: self.on.clone(),
+ filter: self.filter.clone(),
+ join_type: self.join_type,
+ projection: self.projection.clone(),
+ null_equality: self.null_equality,
+ properties: Arc::clone(&self.properties),
+ selection_state: self.selection_state.clone(),
+ null_aware: self.null_aware,
+ }))
+ }
+
+ fn execute(
+ &self,
+ _partition: usize,
+ _context: std::sync::Arc<datafusion::execution::TaskContext>,
+ ) ->
datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
+ exec_err!(
+ "Operator should not be executed; it should have been replaced by
an optimizer rule."
+ )
+ }
+}
+
+impl DisplayAs for DynamicJoinSelectionExec {
+ fn fmt_as(
+ &self,
+ t: datafusion::physical_plan::DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let on = self
+ .on
+ .iter()
+ .map(|(c1, c2)| format!("({c1}, {c2})"))
+ .collect::<Vec<String>>()
+ .join(", ");
+ let display_null_equality =
+ if self.null_equality == NullEquality::NullEqualsNull {
+ ", NullsEqual: true"
+ } else {
+ ""
+ };
+ write!(
+ f,
+ "{}: join_type={:?}, on=[{}]{}{}",
+ Self::static_name(),
+ self.join_type,
+ on,
+ self.filter.as_ref().map_or_else(
+ || "".to_string(),
+ |f| format!(", filter={}", f.expression())
+ ),
+ display_null_equality,
+ )?;
+
+ write!(
+ f,
+ " repartitioned={}",
+ matches!(self.selection_state,
JoinInputState::Repartitioned)
+ )?;
+
+ Ok(())
+ }
+ DisplayFormatType::TreeRender => {
+ let on = self
+ .on
+ .iter()
+ .map(|(c1, c2)| {
+ format!("({} = {})", fmt_sql(c1.as_ref()),
fmt_sql(c2.as_ref()))
+ })
+ .collect::<Vec<String>>()
+ .join(", ");
+
+ if self.join_type != JoinType::Inner {
+ writeln!(f, "join_type={:?}", self.join_type)?;
+ }
+ writeln!(f, "on={on}")?;
+
+ if self.null_equality == NullEquality::NullEqualsNull {
+ writeln!(f, "NullsEqual: true")?;
+ }
+
+ writeln!(
+ f,
+ " repartitioned={}",
+ matches!(self.selection_state,
JoinInputState::Repartitioned)
+ )?;
+
+ Ok(())
+ }
+ }
+ }
+}
+
+pub enum JoinSelectionAction {
+ Repartition(Arc<DynamicJoinSelectionExec>),
+ CollectLeft(Arc<HashJoinExec>),
+ LateCollectLeft(Arc<HashJoinExec>),
+ Hash(Arc<HashJoinExec>),
+ Sort(Arc<SortMergeJoinExec>),
+}
+
+impl DynamicJoinSelectionExec {
+ // this is required, as we do not want to implement
+ // ExecutionPlan::required_input_distribution as other
+ // datafusion planners are going to add repartition
+ pub(crate) fn _required_input_distribution(&self) -> Vec<Distribution> {
+ let (left_expr, right_expr) = self
+ .on
+ .iter()
+ .map(|(l, r)| (Arc::clone(l), Arc::clone(r)))
+ .unzip();
+ vec![
+ Distribution::HashPartitioned(left_expr),
+ Distribution::HashPartitioned(right_expr),
+ ]
+ }
+
+ pub(crate) fn to_actual_join(
+ &self,
+ config: &datafusion::config::ConfigOptions,
+ ) -> Result<JoinSelectionAction> {
+ let prefer_hash_join = config.optimizer.prefer_hash_join;
+ let threshold_collect_left_join_bytes =
+ config.optimizer.hash_join_single_partition_threshold;
+ let threshold_collect_left_join_rows =
+ config.optimizer.hash_join_single_partition_threshold_rows;
+
+ let partition_mode = if Self::supports_collect_by_thresholds(
+ self.left.as_ref(),
+ threshold_collect_left_join_bytes,
+ threshold_collect_left_join_rows,
+ ) || Self::supports_collect_by_thresholds(
+ self.right.as_ref(),
Review Comment:
Please confirm that `right` should be used here.
It is strange that both `left` and `right` lead to `CollectLeft`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]