This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 387e20cc58 Improve `HashJoinExecBuilder` to save state from previous
fields (#20276)
387e20cc58 is described below
commit 387e20cc58ae91da3902b58438a0684998d7b45b
Author: Albert Skalt <[email protected]>
AuthorDate: Wed Feb 25 01:15:34 2026 +0300
Improve `HashJoinExecBuilder` to save state from previous fields (#20276)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
Closes #20270
Prior to this patch, a HashJoinExecBuilder constructed from an existing node
reset some fields of the node, e.g. dynamic filters and metrics. This
significantly reduced the usage scope of the builder.
## What changes are included in this PR?
This patch improves the implementation. Now a builder created from an
existing node preserves all fields unless they have been explicitly
updated. The builder also tracks a flag indicating whether it must
recompute the plan properties.
Co-authored-by: Andrew Lamb <[email protected]>
---
.../physical-optimizer/src/enforce_distribution.rs | 79 ++--
.../physical-optimizer/src/join_selection.rs | 11 +-
datafusion/physical-plan/src/execution_plan.rs | 17 +-
.../physical-plan/src/joins/hash_join/exec.rs | 396 +++++++++++----------
4 files changed, 258 insertions(+), 245 deletions(-)
diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs
b/datafusion/physical-optimizer/src/enforce_distribution.rs
index 2d8fbe1cfe..d23a699f71 100644
--- a/datafusion/physical-optimizer/src/enforce_distribution.rs
+++ b/datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -49,7 +49,7 @@ use datafusion_physical_plan::aggregates::{
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::joins::{
- CrossJoinExec, HashJoinExec, HashJoinExecBuilder, PartitionMode,
SortMergeJoinExec,
+ CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
};
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -286,18 +286,15 @@ pub fn adjust_input_keys_ordering(
) -> Result<Transformed<PlanWithKeyRequirements>> {
let plan = Arc::clone(&requirements.plan);
- if let Some(HashJoinExec {
- left,
- right,
- on,
- filter,
- join_type,
- projection,
- mode,
- null_equality,
- null_aware,
- ..
- }) = plan.as_any().downcast_ref::<HashJoinExec>()
+ if let Some(
+ exec @ HashJoinExec {
+ left,
+ on,
+ join_type,
+ mode,
+ ..
+ },
+ ) = plan.as_any().downcast_ref::<HashJoinExec>()
{
match mode {
PartitionMode::Partitioned => {
@@ -305,20 +302,10 @@ pub fn adjust_input_keys_ordering(
Vec<(PhysicalExprRef, PhysicalExprRef)>,
Vec<SortOptions>,
)| {
- HashJoinExecBuilder::new(
- Arc::clone(left),
- Arc::clone(right),
- new_conditions.0,
- *join_type,
- )
- .with_filter(filter.clone())
- // TODO: although projection is not used in the join here,
because projection pushdown is after enforce_distribution. Maybe we need to
handle it later. Same as filter.
- .with_projection_ref(projection.clone())
- .with_partition_mode(PartitionMode::Partitioned)
- .with_null_equality(*null_equality)
- .with_null_aware(*null_aware)
- .build()
- .map(|e| Arc::new(e) as _)
+ exec.builder()
+ .with_partition_mode(PartitionMode::Partitioned)
+ .with_on(new_conditions.0)
+ .build_exec()
};
return reorder_partitioned_join_keys(
requirements,
@@ -612,18 +599,15 @@ pub fn reorder_join_keys_to_inputs(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan_any = plan.as_any();
- if let Some(HashJoinExec {
- left,
- right,
- on,
- filter,
- join_type,
- projection,
- mode,
- null_equality,
- null_aware,
- ..
- }) = plan_any.downcast_ref::<HashJoinExec>()
+ if let Some(
+ exec @ HashJoinExec {
+ left,
+ right,
+ on,
+ mode,
+ ..
+ },
+ ) = plan_any.downcast_ref::<HashJoinExec>()
{
if *mode == PartitionMode::Partitioned {
let (join_keys, positions) = reorder_current_join_keys(
@@ -639,20 +623,11 @@ pub fn reorder_join_keys_to_inputs(
right_keys,
} = join_keys;
let new_join_on = new_join_conditions(&left_keys, &right_keys);
- return Ok(Arc::new(
- HashJoinExecBuilder::new(
- Arc::clone(left),
- Arc::clone(right),
- new_join_on,
- *join_type,
- )
- .with_filter(filter.clone())
- .with_projection_ref(projection.clone())
+ return exec
+ .builder()
.with_partition_mode(PartitionMode::Partitioned)
- .with_null_equality(*null_equality)
- .with_null_aware(*null_aware)
- .build()?,
- ));
+ .with_on(new_join_on)
+ .build_exec();
}
}
} else if let Some(SortMergeJoinExec {
diff --git a/datafusion/physical-optimizer/src/join_selection.rs
b/datafusion/physical-optimizer/src/join_selection.rs
index 02ef378d70..29bbc8e108 100644
--- a/datafusion/physical-optimizer/src/join_selection.rs
+++ b/datafusion/physical-optimizer/src/join_selection.rs
@@ -34,7 +34,7 @@ use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::joins::utils::ColumnIndex;
use datafusion_physical_plan::joins::{
- CrossJoinExec, HashJoinExec, HashJoinExecBuilder, NestedLoopJoinExec,
PartitionMode,
+ CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
StreamJoinPartitionMode, SymmetricHashJoinExec,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
@@ -192,14 +192,16 @@ pub(crate) fn try_collect_left(
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
} else {
Ok(Some(Arc::new(
- HashJoinExecBuilder::from(hash_join)
+ hash_join
+ .builder()
.with_partition_mode(PartitionMode::CollectLeft)
.build()?,
)))
}
}
(true, false) => Ok(Some(Arc::new(
- HashJoinExecBuilder::from(hash_join)
+ hash_join
+ .builder()
.with_partition_mode(PartitionMode::CollectLeft)
.build()?,
))),
@@ -243,7 +245,8 @@ pub(crate) fn partitioned_hash_join(
};
Ok(Arc::new(
- HashJoinExecBuilder::from(hash_join)
+ hash_join
+ .builder()
.with_partition_mode(partition_mode)
.build()?,
))
diff --git a/datafusion/physical-plan/src/execution_plan.rs
b/datafusion/physical-plan/src/execution_plan.rs
index adb3c3af55..681a1345d8 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -25,6 +25,7 @@ pub use crate::ordering::InputOrderMode;
use crate::sort_pushdown::SortOrderPushdownResult;
pub use crate::stream::EmptyRecordBatchStream;
+use arrow_schema::Schema;
pub use datafusion_common::hash_utils;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
pub use datafusion_common::utils::project_schema;
@@ -38,7 +39,7 @@ pub use datafusion_physical_expr::{
use std::any::Any;
use std::fmt::Debug;
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::display::DisplayableExecutionPlan;
@@ -1478,6 +1479,20 @@ pub enum CardinalityEffect {
GreaterEqual,
}
+/// Can be used in contexts where properties have not yet been initialized
properly.
+pub(crate) fn stub_properties() -> Arc<PlanProperties> {
+ static STUB_PROPERTIES: LazyLock<Arc<PlanProperties>> = LazyLock::new(|| {
+ Arc::new(PlanProperties::new(
+ EquivalenceProperties::new(Arc::new(Schema::empty())),
+ Partitioning::UnknownPartitioning(1),
+ EmissionType::Final,
+ Boundedness::Bounded,
+ ))
+ });
+
+ Arc::clone(&STUB_PROPERTIES)
+}
+
#[cfg(test)]
mod tests {
use std::any::Any;
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 1d54c89491..eda7e93eff 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -23,9 +23,7 @@ use std::sync::{Arc, OnceLock};
use std::{any::Any, vec};
use crate::ExecutionPlanProperties;
-use crate::execution_plan::{
- EmissionType, boundedness_from_children, has_same_children_properties,
-};
+use crate::execution_plan::{EmissionType, boundedness_from_children,
stub_properties};
use crate::filter_pushdown::{
ChildFilterDescription, ChildPushdownResult, FilterDescription,
FilterPushdownPhase,
FilterPushdownPropagation,
@@ -69,7 +67,7 @@ use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
-use arrow_schema::DataType;
+use arrow_schema::{DataType, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::{
@@ -251,21 +249,20 @@ impl JoinLeftData {
}
/// Helps to build [`HashJoinExec`].
+///
+/// Builder can be created from an existing [`HashJoinExec`] using
[`From::from`].
+/// In this case, all its fields are inherited. If a field that affects the
node's
+/// properties is modified, they will be automatically recomputed during the
build.
+///
+/// # Adding setters
+///
+/// When adding a new setter, it is necessary to ensure that the
`preserve_properties`
+/// flag is set to false if modifying the field requires a recomputation of
the plan's
+/// properties.
+///
pub struct HashJoinExecBuilder {
- left: Arc<dyn ExecutionPlan>,
- right: Arc<dyn ExecutionPlan>,
- on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
- join_type: JoinType,
- filter: Option<JoinFilter>,
- projection: Option<ProjectionRef>,
- partition_mode: PartitionMode,
- null_equality: NullEquality,
- null_aware: bool,
- /// Maximum number of rows to return
- ///
- /// If the operator produces `< fetch` rows, it returns all available rows.
- /// If it produces `>= fetch` rows, it returns exactly `fetch` rows and
stops early.
- fetch: Option<usize>,
+ exec: HashJoinExec,
+ preserve_properties: bool,
}
impl HashJoinExecBuilder {
@@ -277,19 +274,39 @@ impl HashJoinExecBuilder {
join_type: JoinType,
) -> Self {
Self {
- left,
- right,
- on,
- filter: None,
- projection: None,
- partition_mode: PartitionMode::Auto,
- join_type,
- null_equality: NullEquality::NullEqualsNothing,
- null_aware: false,
- fetch: None,
+ exec: HashJoinExec {
+ left,
+ right,
+ on,
+ filter: None,
+ join_type,
+ left_fut: Default::default(),
+ random_state: HASH_JOIN_SEED,
+ mode: PartitionMode::Auto,
+ fetch: None,
+ metrics: ExecutionPlanMetricsSet::new(),
+ projection: None,
+ column_indices: vec![],
+ null_equality: NullEquality::NullEqualsNothing,
+ null_aware: false,
+ dynamic_filter: None,
+ // Will be computed at when plan will be built.
+ cache: stub_properties(),
+ join_schema: Arc::new(Schema::empty()),
+ },
+ // As `exec` is initialized with stub properties,
+ // they will be properly computed when plan will be built.
+ preserve_properties: false,
}
}
+ /// Set join type.
+ pub fn with_type(mut self, join_type: JoinType) -> Self {
+ self.exec.join_type = join_type;
+ self.preserve_properties = false;
+ self
+ }
+
/// Set projection from the vector.
pub fn with_projection(self, projection: Option<Vec<usize>>) -> Self {
self.with_projection_ref(projection.map(Into::into))
@@ -297,54 +314,131 @@ impl HashJoinExecBuilder {
/// Set projection from the shared reference.
pub fn with_projection_ref(mut self, projection: Option<ProjectionRef>) ->
Self {
- self.projection = projection;
+ self.exec.projection = projection;
+ self.preserve_properties = false;
self
}
/// Set optional filter.
pub fn with_filter(mut self, filter: Option<JoinFilter>) -> Self {
- self.filter = filter;
+ self.exec.filter = filter;
+ self
+ }
+
+ /// Set expressions to join on.
+ pub fn with_on(mut self, on: Vec<(PhysicalExprRef, PhysicalExprRef)>) ->
Self {
+ self.exec.on = on;
+ self.preserve_properties = false;
self
}
/// Set partition mode.
pub fn with_partition_mode(mut self, mode: PartitionMode) -> Self {
- self.partition_mode = mode;
+ self.exec.mode = mode;
+ self.preserve_properties = false;
self
}
/// Set null equality property.
pub fn with_null_equality(mut self, null_equality: NullEquality) -> Self {
- self.null_equality = null_equality;
+ self.exec.null_equality = null_equality;
self
}
/// Set null aware property.
pub fn with_null_aware(mut self, null_aware: bool) -> Self {
- self.null_aware = null_aware;
+ self.exec.null_aware = null_aware;
self
}
- /// Set fetch limit.
+ /// Set fetch property.
pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
- self.fetch = fetch;
+ self.exec.fetch = fetch;
+ self
+ }
+
+ /// Require to recompute plan properties.
+ pub fn recompute_properties(mut self) -> Self {
+ self.preserve_properties = false;
self
}
+ /// Replace children.
+ pub fn with_new_children(
+ mut self,
+ mut children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Self> {
+ assert_or_internal_err!(
+ children.len() == 2,
+ "wrong number of children passed into `HashJoinExecBuilder`"
+ );
+ self.exec.right = children.swap_remove(1);
+ self.exec.left = children.swap_remove(0);
+ self.preserve_properties = false;
+ Ok(self)
+ }
+
+ /// Reset runtime state.
+ pub fn reset_state(mut self) -> Self {
+ self.exec.left_fut = Default::default();
+ self.exec.dynamic_filter = None;
+ self.exec.metrics = ExecutionPlanMetricsSet::new();
+ self
+ }
+
+ /// Build result as a dyn execution plan.
+ pub fn build_exec(self) -> Result<Arc<dyn ExecutionPlan>> {
+ self.build().map(|p| Arc::new(p) as _)
+ }
+
/// Build resulting execution plan.
pub fn build(self) -> Result<HashJoinExec> {
let Self {
+ exec,
+ preserve_properties,
+ } = self;
+
+ // Validate null_aware flag
+ if exec.null_aware {
+ let join_type = exec.join_type();
+ if !matches!(join_type, JoinType::LeftAnti) {
+ return plan_err!(
+ "null_aware can only be true for LeftAnti joins, got
{join_type}"
+ );
+ }
+ let on = exec.on();
+ if on.len() != 1 {
+ return plan_err!(
+ "null_aware anti join only supports single column join
key, got {} columns",
+ on.len()
+ );
+ }
+ }
+
+ if preserve_properties {
+ return Ok(exec);
+ }
+
+ let HashJoinExec {
left,
right,
on,
- join_type,
filter,
+ join_type,
+ left_fut,
+ random_state,
+ mode,
+ metrics,
projection,
- partition_mode,
null_equality,
null_aware,
+ dynamic_filter,
fetch,
- } = self;
+ // Recomputed.
+ join_schema: _,
+ column_indices: _,
+ cache: _,
+ } = exec;
let left_schema = left.schema();
let right_schema = right.schema();
@@ -353,30 +447,12 @@ impl HashJoinExecBuilder {
}
check_join_is_valid(&left_schema, &right_schema, &on)?;
-
- // Validate null_aware flag
- if null_aware {
- if join_type != JoinType::LeftAnti {
- return plan_err!(
- "null_aware can only be true for LeftAnti joins, got
{join_type}"
- );
- }
- if on.len() != 1 {
- return plan_err!(
- "null_aware anti join only supports single column join
key, got {} columns",
- on.len()
- );
- }
- }
-
let (join_schema, column_indices) =
build_join_schema(&left_schema, &right_schema, &join_type);
- let random_state = HASH_JOIN_SEED;
-
let join_schema = Arc::new(join_schema);
- // check if the projection is valid
+ // Check if the projection is valid.
can_project(&join_schema, projection.as_deref())?;
let cache = HashJoinExec::compute_properties(
@@ -385,13 +461,10 @@ impl HashJoinExecBuilder {
&join_schema,
join_type,
&on,
- partition_mode,
+ mode,
projection.as_deref(),
)?;
- // Initialize both dynamic filter and bounds accumulator to None
- // They will be set later if dynamic filtering is enabled
-
Ok(HashJoinExec {
left,
right,
@@ -399,34 +472,49 @@ impl HashJoinExecBuilder {
filter,
join_type,
join_schema,
- left_fut: Default::default(),
+ left_fut,
random_state,
- mode: partition_mode,
- metrics: ExecutionPlanMetricsSet::new(),
+ mode,
+ metrics,
projection,
column_indices,
null_equality,
null_aware,
cache: Arc::new(cache),
- dynamic_filter: None,
+ dynamic_filter,
fetch,
})
}
+
+ fn with_dynamic_filter(mut self, filter:
Option<HashJoinExecDynamicFilter>) -> Self {
+ self.exec.dynamic_filter = filter;
+ self
+ }
}
impl From<&HashJoinExec> for HashJoinExecBuilder {
fn from(exec: &HashJoinExec) -> Self {
Self {
- left: Arc::clone(exec.left()),
- right: Arc::clone(exec.right()),
- on: exec.on.clone(),
- join_type: exec.join_type,
- filter: exec.filter.clone(),
- projection: exec.projection.clone(),
- partition_mode: exec.mode,
- null_equality: exec.null_equality,
- null_aware: exec.null_aware,
- fetch: exec.fetch,
+ exec: HashJoinExec {
+ left: Arc::clone(exec.left()),
+ right: Arc::clone(exec.right()),
+ on: exec.on.clone(),
+ filter: exec.filter.clone(),
+ join_type: exec.join_type,
+ join_schema: Arc::clone(&exec.join_schema),
+ left_fut: Arc::clone(&exec.left_fut),
+ random_state: exec.random_state.clone(),
+ mode: exec.mode,
+ metrics: exec.metrics.clone(),
+ projection: exec.projection.clone(),
+ column_indices: exec.column_indices.clone(),
+ null_equality: exec.null_equality,
+ null_aware: exec.null_aware,
+ cache: Arc::clone(&exec.cache),
+ dynamic_filter: exec.dynamic_filter.clone(),
+ fetch: exec.fetch,
+ },
+ preserve_properties: true,
}
}
}
@@ -706,7 +794,7 @@ impl EmbeddedProjection for HashJoinExec {
}
impl HashJoinExec {
- /// Tries to create a new [HashJoinExec].
+ /// Tries to create a new [`HashJoinExec`].
///
/// # Error
/// This function errors when it is not possible to join the left and
right sides on keys `on`.
@@ -731,6 +819,15 @@ impl HashJoinExec {
.build()
}
+ /// Create a builder based on the existing [`HashJoinExec`].
+ ///
+ /// Returned builder preserves all existing fields. If a field requiring
properties
+ /// recomputation is modified, this will be done automatically during the
node build.
+ ///
+ pub fn builder(&self) -> HashJoinExecBuilder {
+ self.into()
+ }
+
fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
// Extract the right-side keys (probe side keys) from the `on` clauses
// Dynamic filter will be created from build side values (left side)
and applied to probe side (right side)
@@ -844,9 +941,7 @@ impl HashJoinExec {
can_project(&self.schema(), projection.as_deref())?;
let projection =
combine_projections(projection.as_ref(),
self.projection.as_ref())?;
- HashJoinExecBuilder::from(self)
- .with_projection_ref(projection)
- .build()
+ self.builder().with_projection_ref(projection).build()
}
/// This function creates the cache object that stores the plan properties
such as schema, equivalence properties, ordering, partitioning, etc.
@@ -952,27 +1047,25 @@ impl HashJoinExec {
) -> Result<Arc<dyn ExecutionPlan>> {
let left = self.left();
let right = self.right();
- let new_join = HashJoinExecBuilder::new(
- Arc::clone(right),
- Arc::clone(left),
- self.on()
- .iter()
- .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
- .collect(),
- self.join_type().swap(),
- )
- .with_filter(self.filter().map(JoinFilter::swap))
- .with_projection(swap_join_projection(
- left.schema().fields().len(),
- right.schema().fields().len(),
- self.projection.as_deref(),
- self.join_type(),
- ))
- .with_partition_mode(partition_mode)
- .with_null_equality(self.null_equality())
- .with_null_aware(self.null_aware)
- .with_fetch(self.fetch)
- .build()?;
+ let new_join = self
+ .builder()
+ .with_type(self.join_type.swap())
+ .with_new_children(vec![Arc::clone(right), Arc::clone(left)])?
+ .with_on(
+ self.on()
+ .iter()
+ .map(|(l, r)| (Arc::clone(r), Arc::clone(l)))
+ .collect(),
+ )
+ .with_filter(self.filter().map(JoinFilter::swap))
+ .with_projection(swap_join_projection(
+ left.schema().fields().len(),
+ right.schema().fields().len(),
+ self.projection.as_deref(),
+ self.join_type(),
+ ))
+ .with_partition_mode(partition_mode)
+ .build()?;
// In case of anti / semi joins or if there is embedded projection in
HashJoinExec, output column order is preserved, no need to add projection again
if matches!(
self.join_type(),
@@ -1148,64 +1241,11 @@ impl ExecutionPlan for HashJoinExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- let cache = if has_same_children_properties(&self, &children)? {
- Arc::clone(&self.cache)
- } else {
- Arc::new(Self::compute_properties(
- &children[0],
- &children[1],
- &self.join_schema,
- self.join_type,
- &self.on,
- self.mode,
- self.projection.as_deref(),
- )?)
- };
-
- Ok(Arc::new(HashJoinExec {
- left: Arc::clone(&children[0]),
- right: Arc::clone(&children[1]),
- on: self.on.clone(),
- filter: self.filter.clone(),
- join_type: self.join_type,
- join_schema: Arc::clone(&self.join_schema),
- left_fut: Arc::clone(&self.left_fut),
- random_state: self.random_state.clone(),
- mode: self.mode,
- metrics: ExecutionPlanMetricsSet::new(),
- projection: self.projection.clone(),
- column_indices: self.column_indices.clone(),
- null_equality: self.null_equality,
- null_aware: self.null_aware,
- cache,
- // Keep the dynamic filter, bounds accumulator will be reset
- dynamic_filter: self.dynamic_filter.clone(),
- fetch: self.fetch,
- }))
+ self.builder().with_new_children(children)?.build_exec()
}
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(HashJoinExec {
- left: Arc::clone(&self.left),
- right: Arc::clone(&self.right),
- on: self.on.clone(),
- filter: self.filter.clone(),
- join_type: self.join_type,
- join_schema: Arc::clone(&self.join_schema),
- // Reset the left_fut to allow re-execution
- left_fut: Arc::new(OnceAsync::default()),
- random_state: self.random_state.clone(),
- mode: self.mode,
- metrics: ExecutionPlanMetricsSet::new(),
- projection: self.projection.clone(),
- column_indices: self.column_indices.clone(),
- null_equality: self.null_equality,
- null_aware: self.null_aware,
- cache: Arc::clone(&self.cache),
- // Reset dynamic filter and bounds accumulator to initial state
- dynamic_filter: None,
- fetch: self.fetch,
- }))
+ self.builder().reset_state().build_exec()
}
fn execute(
@@ -1423,22 +1463,17 @@ impl ExecutionPlan for HashJoinExec {
&schema,
self.filter(),
)? {
- Ok(Some(Arc::new(
- HashJoinExecBuilder::new(
+ self.builder()
+ .with_new_children(vec![
Arc::new(projected_left_child),
Arc::new(projected_right_child),
- join_on,
- *self.join_type(),
- )
+ ])?
+ .with_on(join_on)
.with_filter(join_filter)
// Returned early if projection is not None
.with_projection(None)
- .with_partition_mode(*self.partition_mode())
- .with_null_equality(self.null_equality)
- .with_null_aware(self.null_aware)
- .with_fetch(self.fetch)
- .build()?,
- )))
+ .build_exec()
+ .map(Some)
} else {
try_embed_projection(projection, self)
}
@@ -1584,29 +1619,14 @@ impl ExecutionPlan for HashJoinExec {
Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
{
// We successfully pushed down our self filter - we need to
make a new node with the dynamic filter
- let new_node = Arc::new(HashJoinExec {
- left: Arc::clone(&self.left),
- right: Arc::clone(&self.right),
- on: self.on.clone(),
- filter: self.filter.clone(),
- join_type: self.join_type,
- join_schema: Arc::clone(&self.join_schema),
- left_fut: Arc::clone(&self.left_fut),
- random_state: self.random_state.clone(),
- mode: self.mode,
- metrics: ExecutionPlanMetricsSet::new(),
- projection: self.projection.clone(),
- column_indices: self.column_indices.clone(),
- null_equality: self.null_equality,
- null_aware: self.null_aware,
- cache: Arc::clone(&self.cache),
- dynamic_filter: Some(HashJoinExecDynamicFilter {
+ let new_node = self
+ .builder()
+ .with_dynamic_filter(Some(HashJoinExecDynamicFilter {
filter: dynamic_filter,
build_accumulator: OnceLock::new(),
- }),
- fetch: self.fetch,
- });
- result = result.with_updated_node(new_node as Arc<dyn
ExecutionPlan>);
+ }))
+ .build_exec()?;
+ result = result.with_updated_node(new_node);
}
}
Ok(result)
@@ -1624,7 +1644,7 @@ impl ExecutionPlan for HashJoinExec {
}
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn
ExecutionPlan>> {
- HashJoinExecBuilder::from(self)
+ self.builder()
.with_fetch(limit)
.build()
.ok()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]