This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new ff8618a87 Consolidate physical join code into `datafusion/core/src/physical_plan/joins` (#3942)
ff8618a87 is described below

commit ff8618a8720b72c581d5e5530f8027b70ce43439
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Oct 24 17:24:52 2022 -0400

    Consolidate physical join code into `datafusion/core/src/physical_plan/joins` (#3942)
    
    * Consolidate physical join code into `datafusion/core/src/physical_plan/joins`
    
    * Update
    
    * Update `use` paths
    
    * Add RAT
---
 .../src/physical_optimizer/coalesce_batches.rs     |  5 ++-
 .../physical_optimizer/hash_build_probe_order.rs   |  9 ++---
 .../src/physical_plan/{ => joins}/cross_join.rs    | 19 +++++------
 .../src/physical_plan/{ => joins}/hash_join.rs     | 34 +++++++------------
 datafusion/core/src/physical_plan/joins/mod.rs     | 38 ++++++++++++++++++++++
 .../physical_plan/{ => joins}/sort_merge_join.rs   |  8 +++--
 .../{join_utils.rs => joins/utils.rs}              |  2 +-
 datafusion/core/src/physical_plan/mod.rs           |  5 +--
 datafusion/core/src/physical_plan/planner.rs       |  8 ++---
 datafusion/core/tests/join_fuzz.rs                 |  3 +-
 10 files changed, 77 insertions(+), 54 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 51d56d28d..9f7e22dbb 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -22,9 +22,8 @@ use crate::{
     error::Result,
     physical_optimizer::PhysicalOptimizerRule,
     physical_plan::{
-        coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
-        hash_join::HashJoinExec, repartition::RepartitionExec,
-        with_new_children_if_necessary,
+        coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
+        repartition::RepartitionExec, with_new_children_if_necessary,
     },
 };
 use std::sync::Arc;
diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
index 66dfc6e69..6817001d3 100644
--- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
+++ b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
@@ -22,10 +22,11 @@ use arrow::datatypes::Schema;
 
 use crate::execution::context::SessionConfig;
 use crate::logical_expr::JoinType;
-use crate::physical_plan::cross_join::CrossJoinExec;
 use crate::physical_plan::expressions::Column;
-use crate::physical_plan::hash_join::HashJoinExec;
-use crate::physical_plan::join_utils::{ColumnIndex, JoinFilter, JoinSide};
+use crate::physical_plan::joins::{
+    utils::{ColumnIndex, JoinFilter, JoinSide},
+    CrossJoinExec, HashJoinExec,
+};
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
 
@@ -197,7 +198,7 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {
 mod tests {
     use crate::{
         physical_plan::{
-            displayable, hash_join::PartitionMode, ColumnStatistics, Statistics,
+            displayable, joins::PartitionMode, ColumnStatistics, Statistics,
         },
         test::exec::StatisticsExec,
     };
diff --git a/datafusion/core/src/physical_plan/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs
similarity index 97%
rename from datafusion/core/src/physical_plan/cross_join.rs
rename to datafusion/core/src/physical_plan/joins/cross_join.rs
index e3f25fc56..7a35116a4 100644
--- a/datafusion/core/src/physical_plan/cross_join.rs
+++ b/datafusion/core/src/physical_plan/joins/cross_join.rs
@@ -26,22 +26,19 @@ use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
-use super::expressions::PhysicalSortExpr;
-use super::{
-    coalesce_partitions::CoalescePartitionsExec, join_utils::check_join_is_valid,
-    ColumnStatistics, Statistics,
+use crate::execution::context::TaskContext;
+use crate::physical_plan::{
+    coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec,
+    ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream, Statistics,
 };
 use crate::{error::Result, scalar::ScalarValue};
 use async_trait::async_trait;
+use datafusion_physical_expr::PhysicalSortExpr;
+use log::debug;
 use std::time::Instant;
 
-use super::{
-    coalesce_batches::concat_batches, DisplayFormatType, ExecutionPlan, Partitioning,
-    RecordBatchStream, SendableRecordBatchStream,
-};
-use crate::execution::context::TaskContext;
-use crate::physical_plan::join_utils::{OnceAsync, OnceFut};
-use log::debug;
+use super::utils::{check_join_is_valid, OnceAsync, OnceFut};
 
 /// Data of the left side
 type JoinLeftData = RecordBatch;
diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/hash_join.rs
rename to datafusion/core/src/physical_plan/joins/hash_join.rs
index 972f432cc..ff036b78b 100644
--- a/datafusion/core/src/physical_plan/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -55,33 +55,32 @@ use arrow::array::{
 
 use hashbrown::raw::RawTable;
 
-use super::{
+use crate::physical_plan::{
+    coalesce_batches::concat_batches,
     coalesce_partitions::CoalescePartitionsExec,
+    expressions::Column,
     expressions::PhysicalSortExpr,
-    join_utils::{
+    hash_utils::create_hashes,
+    joins::utils::{
         build_join_schema, check_join_is_valid, estimate_join_statistics, ColumnIndex,
         JoinFilter, JoinOn, JoinSide,
     },
-};
-use super::{
-    expressions::Column,
     metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
+    DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
+    SendableRecordBatchStream, Statistics,
 };
-use super::{hash_utils::create_hashes, Statistics};
+
 use crate::error::{DataFusionError, Result};
 use crate::logical_expr::JoinType;
 
-use super::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream,
-};
 use crate::arrow::array::BooleanBufferBuilder;
 use crate::arrow::datatypes::TimeUnit;
 use crate::execution::context::TaskContext;
-use crate::physical_plan::coalesce_batches::concat_batches;
-use crate::physical_plan::PhysicalExpr;
 
-use crate::physical_plan::join_utils::{OnceAsync, OnceFut};
+use super::{
+    utils::{OnceAsync, OnceFut},
+    PartitionMode,
+};
 use log::debug;
 use std::cmp;
 use std::fmt;
@@ -182,15 +181,6 @@ impl HashJoinMetrics {
     }
 }
 
-#[derive(Clone, Copy, Debug, PartialEq, Eq)]
-/// Partitioning mode to use for hash join
-pub enum PartitionMode {
-    /// Left/right children are partitioned using the left and right keys
-    Partitioned,
-    /// Left side will collected into one partition
-    CollectLeft,
-}
-
 impl HashJoinExec {
     /// Tries to create a new [HashJoinExec].
     /// # Error
diff --git a/datafusion/core/src/physical_plan/joins/mod.rs b/datafusion/core/src/physical_plan/joins/mod.rs
new file mode 100644
index 000000000..ae8f943af
--- /dev/null
+++ b/datafusion/core/src/physical_plan/joins/mod.rs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! DataFusion Join implementations
+
+mod cross_join;
+mod hash_join;
+mod sort_merge_join;
+pub mod utils;
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+/// Partitioning mode to use for hash join
+pub enum PartitionMode {
+    /// Left/right children are partitioned using the left and right keys
+    Partitioned,
+    /// Left side will collected into one partition
+    CollectLeft,
+}
+
+pub use cross_join::CrossJoinExec;
+pub use hash_join::HashJoinExec;
+
+// Note: SortMergeJoin is not used in plans yet
+pub use sort_merge_join::SortMergeJoinExec;
diff --git a/datafusion/core/src/physical_plan/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/sort_merge_join.rs
rename to datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 29da01a14..3de712745 100644
--- a/datafusion/core/src/physical_plan/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -42,7 +42,9 @@ use crate::logical_expr::JoinType;
 use crate::physical_plan::common::combine_batches;
 use crate::physical_plan::expressions::Column;
 use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::join_utils::{build_join_schema, check_join_is_valid, JoinOn};
+use crate::physical_plan::joins::utils::{
+    build_join_schema, check_join_is_valid, JoinOn,
+};
 use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use crate::physical_plan::{
     metrics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
@@ -1198,9 +1200,9 @@ mod tests {
     use crate::error::Result;
     use crate::logical_expr::JoinType;
     use crate::physical_plan::expressions::Column;
-    use crate::physical_plan::join_utils::JoinOn;
+    use crate::physical_plan::joins::utils::JoinOn;
+    use crate::physical_plan::joins::SortMergeJoinExec;
     use crate::physical_plan::memory::MemoryExec;
-    use crate::physical_plan::sort_merge_join::SortMergeJoinExec;
     use crate::physical_plan::{common, ExecutionPlan};
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::{build_table_i32, columns};
diff --git a/datafusion/core/src/physical_plan/join_utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/join_utils.rs
rename to datafusion/core/src/physical_plan/joins/utils.rs
index 4ce72ccc2..f937dc1c4 100644
--- a/datafusion/core/src/physical_plan/join_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -33,7 +33,7 @@ use std::future::Future;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use super::{ColumnStatistics, ExecutionPlan, Statistics};
+use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};
 
 /// The on clause of the join, as vector of (left, right) columns.
 pub type JoinOn = Vec<(Column, Column)>;
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index b2f6c40f5..9e36c3ec8 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -520,22 +520,19 @@ pub mod analyze;
 pub mod coalesce_batches;
 pub mod coalesce_partitions;
 pub mod common;
-pub mod cross_join;
 pub mod display;
 pub mod empty;
 pub mod explain;
 pub mod file_format;
 pub mod filter;
-pub mod hash_join;
 pub mod hash_utils;
-pub mod join_utils;
+pub mod joins;
 pub mod limit;
 pub mod memory;
 pub mod metrics;
 pub mod planner;
 pub mod projection;
 pub mod repartition;
-pub mod sort_merge_join;
 pub mod sorts;
 pub mod stream;
 pub mod udaf;
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 4a8399af9..c209fd150 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -20,7 +20,7 @@
 use super::analyze::AnalyzeExec;
 use super::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use super::{
-    aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec,
+    aggregates, empty::EmptyExec, joins::PartitionMode, udaf, union::UnionExec,
     values::ValuesExec, windows,
 };
 use crate::config::{OPT_EXPLAIN_LOGICAL_PLAN_ONLY, OPT_EXPLAIN_PHYSICAL_PLAN_ONLY};
@@ -39,17 +39,17 @@ use crate::logical_expr::{Limit, Values};
 use crate::physical_expr::create_physical_expr;
 use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
-use crate::physical_plan::cross_join::CrossJoinExec;
 use crate::physical_plan::explain::ExplainExec;
 use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
 use crate::physical_plan::filter::FilterExec;
-use crate::physical_plan::hash_join::HashJoinExec;
+use crate::physical_plan::joins::CrossJoinExec;
+use crate::physical_plan::joins::HashJoinExec;
 use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use crate::physical_plan::projection::ProjectionExec;
 use crate::physical_plan::repartition::RepartitionExec;
 use crate::physical_plan::sorts::sort::SortExec;
 use crate::physical_plan::windows::WindowAggExec;
-use crate::physical_plan::{join_utils, Partitioning};
+use crate::physical_plan::{joins::utils as join_utils, Partitioning};
 use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
 use crate::{
     error::{DataFusionError, Result},
diff --git a/datafusion/core/tests/join_fuzz.rs b/datafusion/core/tests/join_fuzz.rs
index 9e402896c..c5111a075 100644
--- a/datafusion/core/tests/join_fuzz.rs
+++ b/datafusion/core/tests/join_fuzz.rs
@@ -26,9 +26,8 @@ use rand::{Rng, SeedableRng};
 
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::expressions::Column;
-use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
+use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
 use datafusion::physical_plan::memory::MemoryExec;
-use datafusion::physical_plan::sort_merge_join::SortMergeJoinExec;
 use datafusion_expr::JoinType;
 
 use datafusion::prelude::{SessionConfig, SessionContext};

Reply via email to