This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 03a6a9ffc Remove the schema checking when creating `CrossJoinExec` (#4432)
03a6a9ffc is described below
commit 03a6a9ffc8d652b28f7b4e56297ab119510c3362
Author: Remzi Yang <[email protected]>
AuthorDate: Fri Dec 9 19:50:28 2022 +0800
Remove the schema checking when creating `CrossJoinExec` (#4432)
* remove
Signed-off-by: remzi <[email protected]>
* clean
Signed-off-by: remzi <[email protected]>
Signed-off-by: remzi <[email protected]>
---
.../core/src/physical_optimizer/join_selection.rs | 2 +-
.../core/src/physical_plan/joins/cross_join.rs | 39 +++++++++-------------
datafusion/core/src/physical_plan/planner.rs | 2 +-
3 files changed, 17 insertions(+), 26 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 7b9873c65..428cd8de3 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -258,7 +258,7 @@ impl PhysicalOptimizerRule for JoinSelection {
let right = cross_join.right();
if should_swap_join_order(&**left, &**right) {
let new_join =
- CrossJoinExec::try_new(Arc::clone(right), Arc::clone(left))?;
+ CrossJoinExec::new(Arc::clone(right), Arc::clone(left));
// TODO avoid adding ProjectionExec again and again, only adding Final Projection
let proj = ProjectionExec::try_new(
swap_reverting_projection(&left.schema(), &right.schema()),
diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
index 170153e07..1b43b6096 100644
--- a/datafusion/core/src/physical_plan/joins/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -39,8 +39,8 @@ use log::debug;
use std::time::Instant;
use super::utils::{
- adjust_right_output_partitioning, check_join_is_valid,
- cross_join_equivalence_properties, OnceAsync, OnceFut,
+ adjust_right_output_partitioning, cross_join_equivalence_properties, OnceAsync,
+ OnceFut,
};
/// Data of the left side
@@ -61,34 +61,25 @@ pub struct CrossJoinExec {
}
impl CrossJoinExec {
- /// Tries to create a new [CrossJoinExec].
- /// # Error
- /// This function errors when left and right schema's can't be combined
- pub fn try_new(
- left: Arc<dyn ExecutionPlan>,
- right: Arc<dyn ExecutionPlan>,
- ) -> Result<Self> {
- let left_schema = left.schema();
- let right_schema = right.schema();
- check_join_is_valid(&left_schema, &right_schema, &[])?;
-
- let left_schema = left.schema();
- let left_fields = left_schema.fields().iter();
- let right_schema = right.schema();
-
- let right_fields = right_schema.fields().iter();
-
+ /// Create a new [CrossJoinExec].
+ pub fn new(left: Arc<dyn ExecutionPlan>, right: Arc<dyn ExecutionPlan>) -> Self {
// left then right
- let all_columns = left_fields.chain(right_fields).cloned().collect();
+ let all_columns = {
+ let left_schema = left.schema();
+ let right_schema = right.schema();
+ let left_fields = left_schema.fields().iter();
+ let right_fields = right_schema.fields().iter();
+ left_fields.chain(right_fields).cloned().collect()
+ };
let schema = Arc::new(Schema::new(all_columns));
- Ok(CrossJoinExec {
+ CrossJoinExec {
left,
right,
schema,
left_fut: Default::default(),
- })
+ }
}
/// left (build) side which gets loaded in memory
@@ -156,10 +147,10 @@ impl ExecutionPlan for CrossJoinExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(CrossJoinExec::try_new(
+ Ok(Arc::new(CrossJoinExec::new(
children[0].clone(),
children[1].clone(),
- )?))
+ )))
}
fn required_input_distribution(&self) -> Vec<Distribution> {
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index bbfa1b6e1..5b39d71f0 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -994,7 +994,7 @@ impl DefaultPhysicalPlanner {
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
let left = self.create_initial_plan(left, session_state).await?;
let right = self.create_initial_plan(right, session_state).await?;
- Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
+ Ok(Arc::new(CrossJoinExec::new(left, right)))
}
LogicalPlan::Subquery(_) => todo!(),
LogicalPlan::EmptyRelation(EmptyRelation {