This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new ff8618a87 Consolidate physical join code into
`datafusion/core/src/physical_plan/joins` (#3942)
ff8618a87 is described below
commit ff8618a8720b72c581d5e5530f8027b70ce43439
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Oct 24 17:24:52 2022 -0400
Consolidate physical join code into
`datafusion/core/src/physical_plan/joins` (#3942)
* Consolidate physical join code into
`datafusion/core/src/physical_plan/joins`
* Update
* Update `use` paths
* Add RAT
---
.../src/physical_optimizer/coalesce_batches.rs | 5 ++-
.../physical_optimizer/hash_build_probe_order.rs | 9 ++---
.../src/physical_plan/{ => joins}/cross_join.rs | 19 +++++------
.../src/physical_plan/{ => joins}/hash_join.rs | 34 +++++++------------
datafusion/core/src/physical_plan/joins/mod.rs | 38 ++++++++++++++++++++++
.../physical_plan/{ => joins}/sort_merge_join.rs | 8 +++--
.../{join_utils.rs => joins/utils.rs} | 2 +-
datafusion/core/src/physical_plan/mod.rs | 5 +--
datafusion/core/src/physical_plan/planner.rs | 8 ++---
datafusion/core/tests/join_fuzz.rs | 3 +-
10 files changed, 77 insertions(+), 54 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 51d56d28d..9f7e22dbb 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -22,9 +22,8 @@ use crate::{
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
- coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
- hash_join::HashJoinExec, repartition::RepartitionExec,
- with_new_children_if_necessary,
+ coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
+ repartition::RepartitionExec, with_new_children_if_necessary,
},
};
use std::sync::Arc;
diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
index 66dfc6e69..6817001d3 100644
--- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
+++ b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
@@ -22,10 +22,11 @@ use arrow::datatypes::Schema;
use crate::execution::context::SessionConfig;
use crate::logical_expr::JoinType;
-use crate::physical_plan::cross_join::CrossJoinExec;
use crate::physical_plan::expressions::Column;
-use crate::physical_plan::hash_join::HashJoinExec;
-use crate::physical_plan::join_utils::{ColumnIndex, JoinFilter, JoinSide};
+use crate::physical_plan::joins::{
+ utils::{ColumnIndex, JoinFilter, JoinSide},
+ CrossJoinExec, HashJoinExec,
+};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
@@ -197,7 +198,7 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {
mod tests {
use crate::{
physical_plan::{
- displayable, hash_join::PartitionMode, ColumnStatistics, Statistics,
+ displayable, joins::PartitionMode, ColumnStatistics, Statistics,
},
test::exec::StatisticsExec,
};
diff --git a/datafusion/core/src/physical_plan/cross_join.rs
b/datafusion/core/src/physical_plan/joins/cross_join.rs
similarity index 97%
rename from datafusion/core/src/physical_plan/cross_join.rs
rename to datafusion/core/src/physical_plan/joins/cross_join.rs
index e3f25fc56..7a35116a4 100644
--- a/datafusion/core/src/physical_plan/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -26,22 +26,19 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
-use super::expressions::PhysicalSortExpr;
-use super::{
- coalesce_partitions::CoalescePartitionsExec, join_utils::check_join_is_valid,
- ColumnStatistics, Statistics,
+use crate::execution::context::TaskContext;
+use crate::physical_plan::{
+ coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec,
+ ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+ SendableRecordBatchStream, Statistics,
};
use crate::{error::Result, scalar::ScalarValue};
use async_trait::async_trait;
+use datafusion_physical_expr::PhysicalSortExpr;
+use log::debug;
use std::time::Instant;
-use super::{
- coalesce_batches::concat_batches, DisplayFormatType, ExecutionPlan, Partitioning,
- RecordBatchStream, SendableRecordBatchStream,
-};
-use crate::execution::context::TaskContext;
-use crate::physical_plan::join_utils::{OnceAsync, OnceFut};
-use log::debug;
+use super::utils::{check_join_is_valid, OnceAsync, OnceFut};
/// Data of the left side
type JoinLeftData = RecordBatch;
diff --git a/datafusion/core/src/physical_plan/hash_join.rs
b/datafusion/core/src/physical_plan/joins/hash_join.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/hash_join.rs
rename to datafusion/core/src/physical_plan/joins/hash_join.rs
index 972f432cc..ff036b78b 100644
--- a/datafusion/core/src/physical_plan/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -55,33 +55,32 @@ use arrow::array::{
use hashbrown::raw::RawTable;
-use super::{
+use crate::physical_plan::{
+ coalesce_batches::concat_batches,
coalesce_partitions::CoalescePartitionsExec,
+ expressions::Column,
expressions::PhysicalSortExpr,
- join_utils::{
+ hash_utils::create_hashes,
+ joins::utils::{
build_join_schema, check_join_is_valid, estimate_join_statistics,
ColumnIndex,
JoinFilter, JoinOn, JoinSide,
},
-};
-use super::{
- expressions::Column,
metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+ DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
+ SendableRecordBatchStream, Statistics,
};
-use super::{hash_utils::create_hashes, Statistics};
+
use crate::error::{DataFusionError, Result};
use crate::logical_expr::JoinType;
-use super::{
- DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
- SendableRecordBatchStream,
-};
use crate::arrow::array::BooleanBufferBuilder;
use crate::arrow::datatypes::TimeUnit;
use crate::execution::context::TaskContext;
-use crate::physical_plan::coalesce_batches::concat_batches;
-use crate::physical_plan::PhysicalExpr;
-use crate::physical_plan::join_utils::{OnceAsync, OnceFut};
+use super::{
+ utils::{OnceAsync, OnceFut},
+ PartitionMode,
+};
use log::debug;
use std::cmp;
use std::fmt;
@@ -182,15 +181,6 @@ impl HashJoinMetrics {
}
}
-#[derive(Clone, Copy, Debug, PartialEq, Eq)]
-/// Partitioning mode to use for hash join
-pub enum PartitionMode {
- /// Left/right children are partitioned using the left and right keys
- Partitioned,
- /// Left side will collected into one partition
- CollectLeft,
-}
-
impl HashJoinExec {
/// Tries to create a new [HashJoinExec].
/// # Error
diff --git a/datafusion/core/src/physical_plan/joins/mod.rs
b/datafusion/core/src/physical_plan/joins/mod.rs
new file mode 100644
index 000000000..ae8f943af
--- /dev/null
+++ b/datafusion/core/src/physical_plan/joins/mod.rs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DataFusion Join implementations
+
+mod cross_join;
+mod hash_join;
+mod sort_merge_join;
+pub mod utils;
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+/// Partitioning mode to use for hash join
+pub enum PartitionMode {
+ /// Left/right children are partitioned using the left and right keys
+ Partitioned,
+ /// Left side will be collected into one partition
+ CollectLeft,
+}
+
+pub use cross_join::CrossJoinExec;
+pub use hash_join::HashJoinExec;
+
+// Note: SortMergeJoin is not used in plans yet
+pub use sort_merge_join::SortMergeJoinExec;
diff --git a/datafusion/core/src/physical_plan/sort_merge_join.rs
b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/sort_merge_join.rs
rename to datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 29da01a14..3de712745 100644
--- a/datafusion/core/src/physical_plan/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -42,7 +42,9 @@ use crate::logical_expr::JoinType;
use crate::physical_plan::common::combine_batches;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::join_utils::{build_join_schema, check_join_is_valid, JoinOn};
+use crate::physical_plan::joins::utils::{
+ build_join_schema, check_join_is_valid, JoinOn,
+};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder,
MetricsSet};
use crate::physical_plan::{
metrics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
@@ -1198,9 +1200,9 @@ mod tests {
use crate::error::Result;
use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
- use crate::physical_plan::join_utils::JoinOn;
+ use crate::physical_plan::joins::utils::JoinOn;
+ use crate::physical_plan::joins::SortMergeJoinExec;
use crate::physical_plan::memory::MemoryExec;
- use crate::physical_plan::sort_merge_join::SortMergeJoinExec;
use crate::physical_plan::{common, ExecutionPlan};
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{build_table_i32, columns};
diff --git a/datafusion/core/src/physical_plan/join_utils.rs
b/datafusion/core/src/physical_plan/joins/utils.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/join_utils.rs
rename to datafusion/core/src/physical_plan/joins/utils.rs
index 4ce72ccc2..f937dc1c4 100644
--- a/datafusion/core/src/physical_plan/join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -33,7 +33,7 @@ use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll};
-use super::{ColumnStatistics, ExecutionPlan, Statistics};
+use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
/// The on clause of the join, as vector of (left, right) columns.
pub type JoinOn = Vec<(Column, Column)>;
diff --git a/datafusion/core/src/physical_plan/mod.rs
b/datafusion/core/src/physical_plan/mod.rs
index b2f6c40f5..9e36c3ec8 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -520,22 +520,19 @@ pub mod analyze;
pub mod coalesce_batches;
pub mod coalesce_partitions;
pub mod common;
-pub mod cross_join;
pub mod display;
pub mod empty;
pub mod explain;
pub mod file_format;
pub mod filter;
-pub mod hash_join;
pub mod hash_utils;
-pub mod join_utils;
+pub mod joins;
pub mod limit;
pub mod memory;
pub mod metrics;
pub mod planner;
pub mod projection;
pub mod repartition;
-pub mod sort_merge_join;
pub mod sorts;
pub mod stream;
pub mod udaf;
diff --git a/datafusion/core/src/physical_plan/planner.rs
b/datafusion/core/src/physical_plan/planner.rs
index 4a8399af9..c209fd150 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -20,7 +20,7 @@
use super::analyze::AnalyzeExec;
use super::sorts::sort_preserving_merge::SortPreservingMergeExec;
use super::{
- aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec,
+ aggregates, empty::EmptyExec, joins::PartitionMode, udaf, union::UnionExec,
values::ValuesExec, windows,
};
use crate::config::{OPT_EXPLAIN_LOGICAL_PLAN_ONLY,
OPT_EXPLAIN_PHYSICAL_PLAN_ONLY};
@@ -39,17 +39,17 @@ use crate::logical_expr::{Limit, Values};
use crate::physical_expr::create_physical_expr;
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode,
PhysicalGroupBy};
-use crate::physical_plan::cross_join::CrossJoinExec;
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
use crate::physical_plan::filter::FilterExec;
-use crate::physical_plan::hash_join::HashJoinExec;
+use crate::physical_plan::joins::CrossJoinExec;
+use crate::physical_plan::joins::HashJoinExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::WindowAggExec;
-use crate::physical_plan::{join_utils, Partitioning};
+use crate::physical_plan::{joins::utils as join_utils, Partitioning};
use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr,
WindowExpr};
use crate::{
error::{DataFusionError, Result},
diff --git a/datafusion/core/tests/join_fuzz.rs
b/datafusion/core/tests/join_fuzz.rs
index 9e402896c..c5111a075 100644
--- a/datafusion/core/tests/join_fuzz.rs
+++ b/datafusion/core/tests/join_fuzz.rs
@@ -26,9 +26,8 @@ use rand::{Rng, SeedableRng};
use datafusion::physical_plan::collect;
use datafusion::physical_plan::expressions::Column;
-use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
+use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
use datafusion::physical_plan::memory::MemoryExec;
-use datafusion::physical_plan::sort_merge_join::SortMergeJoinExec;
use datafusion_expr::JoinType;
use datafusion::prelude::{SessionConfig, SessionContext};