This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e57dd22720 [MINOR]: Move joinside struct to common (#7908)
e57dd22720 is described below
commit e57dd22720249dc4e4e0c9666596df27ea8c7968
Author: Mustafa Akur <[email protected]>
AuthorDate: Mon Oct 23 16:15:38 2023 +0300
[MINOR]: Move joinside struct to common (#7908)
* Move joinside struct to common
* Update documentation
---
datafusion/common/src/join_type.rs | 31 +++++++++++++++++++++-
datafusion/common/src/lib.rs | 2 +-
.../core/src/physical_optimizer/sort_pushdown.rs | 6 +++--
datafusion/physical-plan/src/joins/hash_join.rs | 15 ++++-------
.../physical-plan/src/joins/hash_join_utils.rs | 6 ++---
.../physical-plan/src/joins/nested_loop_join.rs | 5 ++--
.../physical-plan/src/joins/sort_merge_join.rs | 4 +--
.../physical-plan/src/joins/symmetric_hash_join.rs | 6 +++--
datafusion/physical-plan/src/joins/utils.rs | 31 +---------------------
datafusion/proto/src/physical_plan/from_proto.rs | 3 +--
datafusion/proto/src/physical_plan/to_proto.rs | 3 +--
11 files changed, 54 insertions(+), 58 deletions(-)
diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs
index 8d4657f1dc..0a00a57ba4 100644
--- a/datafusion/common/src/join_type.rs
+++ b/datafusion/common/src/join_type.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! [`JoinType`] and [`JoinConstraint`]
+//! Defines the [`JoinType`], [`JoinConstraint`] and [`JoinSide`] types.
use std::{
fmt::{self, Display, Formatter},
@@ -95,3 +95,32 @@ pub enum JoinConstraint {
/// Join USING
Using,
}
+
+// Human-readable rendering: `JoinSide::Left` -> "left", `JoinSide::Right` -> "right".
+impl Display for JoinSide {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let label = match self {
+            JoinSide::Left => "left",
+            JoinSide::Right => "right",
+        };
+        f.write_str(label)
+    }
+}
+
+/// Identifies one input of a two-input join: the left side or the right side.
+///
+/// Used, for example, by physical join operators' column indices to record
+/// which input a column (or an intermediate value) refers to.
+///
+/// `Eq` and `Hash` are derived in addition to `PartialEq` so that the side can
+/// be used as a key in hashed collections (`HashMap`/`HashSet`).
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum JoinSide {
+    /// Left side of the join
+    Left,
+    /// Right side of the join
+    Right,
+}
+
+impl JoinSide {
+    /// Returns the opposite side of the join: `Left` maps to `Right` and
+    /// `Right` maps to `Left`.
+    pub fn negate(&self) -> Self {
+        match *self {
+            Self::Right => Self::Left,
+            Self::Left => Self::Right,
+        }
+    }
+}
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index a939cf73dc..53c3cfddff 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -57,7 +57,7 @@ pub use functional_dependencies::{
aggregate_functional_dependencies, get_target_functional_dependencies,
Constraint,
Constraints, Dependency, FunctionalDependence, FunctionalDependencies,
};
-pub use join_type::{JoinConstraint, JoinType};
+pub use join_type::{JoinConstraint, JoinSide, JoinType};
pub use scalar::{ScalarType, ScalarValue};
pub use schema_reference::{OwnedSchemaReference, SchemaReference};
pub use stats::{ColumnStatistics, Statistics};
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index bb991115ec..9b81ad3efb 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -21,7 +21,7 @@ use crate::physical_optimizer::utils::{
add_sort_above, is_limit, is_sort_preserving_merge, is_union, is_window,
};
use crate::physical_plan::filter::FilterExec;
-use crate::physical_plan::joins::utils::{calculate_join_output_ordering, JoinSide};
+use crate::physical_plan::joins::utils::calculate_join_output_ordering;
use crate::physical_plan::joins::{HashJoinExec, SortMergeJoinExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
@@ -29,7 +29,9 @@ use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
-use datafusion_common::{plan_datafusion_err, plan_err, DataFusionError, Result};
+use datafusion_common::{
+ plan_datafusion_err, plan_err, DataFusionError, JoinSide, Result,
+};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 4fd8882c01..2ffa1f61a2 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -27,7 +27,7 @@ use std::{any::Any, usize, vec};
use crate::joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices,
build_batch_from_indices,
calculate_join_output_ordering,
combine_join_ordering_equivalence_properties,
- get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide,
+ get_final_indices_from_bit_map, need_produce_result_in_final,
};
use crate::DisplayAs;
use crate::{
@@ -64,7 +64,7 @@ use arrow::util::bit_util;
use arrow_array::cast::downcast_array;
use arrow_schema::ArrowError;
use datafusion_common::{
- exec_err, internal_err, plan_err, DataFusionError, JoinType, Result,
+ exec_err, internal_err, plan_err, DataFusionError, JoinSide, JoinType, Result,
};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
@@ -1069,14 +1069,9 @@ mod tests {
use hashbrown::raw::RawTable;
use crate::{
- common,
- expressions::Column,
- hash_utils::create_hashes,
- joins::{hash_join::build_equal_condition_join_indices, utils::JoinSide},
- memory::MemoryExec,
- repartition::RepartitionExec,
- test::build_table_i32,
- test::exec::MockExec,
+ common, expressions::Column, hash_utils::create_hashes,
+ joins::hash_join::build_equal_condition_join_indices, memory::MemoryExec,
+ repartition::RepartitionExec, test::build_table_i32, test::exec::MockExec,
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
diff --git a/datafusion/physical-plan/src/joins/hash_join_utils.rs b/datafusion/physical-plan/src/joins/hash_join_utils.rs
index 525c1a7145..3a2a85c727 100644
--- a/datafusion/physical-plan/src/joins/hash_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/hash_join_utils.rs
@@ -24,14 +24,14 @@ use std::ops::IndexMut;
use std::sync::Arc;
use std::{fmt, usize};
-use crate::joins::utils::{JoinFilter, JoinSide};
+use crate::joins::utils::JoinFilter;
use arrow::compute::concat_batches;
use arrow::datatypes::{ArrowNativeType, SchemaRef};
use arrow_array::builder::BooleanBufferBuilder;
use arrow_array::{ArrowPrimitiveType, NativeAdapter, PrimitiveArray,
RecordBatch};
use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{DataFusionError, JoinSide, Result, ScalarValue};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::intervals::{Interval, IntervalBound};
use datafusion_physical_expr::utils::collect_columns;
@@ -732,7 +732,7 @@ pub mod tests {
use crate::{
expressions::Column,
expressions::PhysicalSortExpr,
- joins::utils::{ColumnIndex, JoinFilter, JoinSide},
+ joins::utils::{ColumnIndex, JoinFilter},
};
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema};
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 944efb47f4..25cb374e94 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -31,7 +31,7 @@ use crate::joins::utils::{
estimate_join_statistics, get_anti_indices, get_anti_u64_indices,
get_final_indices_from_bit_map, get_semi_indices, get_semi_u64_indices,
partitioned_join_output_partitioning, BuildProbeJoinMetrics, ColumnIndex,
JoinFilter,
- JoinSide, OnceAsync, OnceFut,
+ OnceAsync, OnceFut,
};
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::{
@@ -45,7 +45,7 @@ use arrow::array::{
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
-use datafusion_common::{exec_err, DataFusionError, Result, Statistics};
+use datafusion_common::{exec_err, DataFusionError, JoinSide, Result, Statistics};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_expr::JoinType;
@@ -743,7 +743,6 @@ mod tests {
use std::sync::Arc;
use super::*;
- use crate::joins::utils::JoinSide;
use crate::{
common, expressions::Column, memory::MemoryExec,
repartition::RepartitionExec,
test::build_table_i32,
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 61ac864bf2..98fe751b22 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -34,7 +34,7 @@ use crate::expressions::{Column, PhysicalSortExpr};
use crate::joins::utils::{
build_join_schema, calculate_join_output_ordering, check_join_is_valid,
combine_join_equivalence_properties,
combine_join_ordering_equivalence_properties,
- estimate_join_statistics, partitioned_join_output_partitioning, JoinOn, JoinSide,
+ estimate_join_statistics, partitioned_join_output_partitioning, JoinOn,
};
use crate::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use crate::{
@@ -49,7 +49,7 @@ use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::{
- internal_err, not_impl_err, plan_err, DataFusionError, JoinType, Result,
+ internal_err, not_impl_err, plan_err, DataFusionError, JoinSide, JoinType, Result,
};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 64128b6e3a..3450331133 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -43,7 +43,7 @@ use crate::joins::hash_join_utils::{
use crate::joins::utils::{
build_batch_from_indices, build_join_schema, check_join_is_valid,
combine_join_equivalence_properties, partitioned_join_output_partitioning,
- prepare_sorted_exprs, ColumnIndex, JoinFilter, JoinOn, JoinSide,
+ prepare_sorted_exprs, ColumnIndex, JoinFilter, JoinOn,
};
use crate::{
expressions::{Column, PhysicalSortExpr},
@@ -58,7 +58,9 @@ use arrow::compute::concat_batches;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::utils::bisect;
-use datafusion_common::{internal_err, plan_err, DataFusionError, JoinType, Result};
+use datafusion_common::{
+ internal_err, plan_err, DataFusionError, JoinSide, JoinType, Result,
+};
use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::intervals::ExprIntervalGraph;
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index c523a8fe81..afde986c0b 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -18,7 +18,6 @@
//! Join related functionality used both on logical and physical plans
use std::collections::HashSet;
-use std::fmt::{Display, Formatter};
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -42,7 +41,7 @@ use datafusion_common::cast::as_boolean_array;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{
- exec_err, plan_datafusion_err, plan_err, DataFusionError, JoinType, Result,
+ exec_err, plan_datafusion_err, plan_err, DataFusionError, JoinSide, JoinType, Result,
SharedResult,
};
use datafusion_physical_expr::expressions::Column;
@@ -456,34 +455,6 @@ pub fn combine_join_ordering_equivalence_properties(
Ok(new_properties)
}
-impl Display for JoinSide {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- match self {
- JoinSide::Left => write!(f, "left"),
- JoinSide::Right => write!(f, "right"),
- }
- }
-}
-
-/// Used in ColumnIndex to distinguish which side the index is for
-#[derive(Debug, Clone, Copy, PartialEq)]
-pub enum JoinSide {
- /// Left side of the join
- Left,
- /// Right side of the join
- Right,
-}
-
-impl JoinSide {
- /// Inverse the join side
- pub fn negate(&self) -> Self {
- match self {
- JoinSide::Left => JoinSide::Right,
- JoinSide::Right => JoinSide::Left,
- }
- }
-}
-
/// Information about the index and placement (left or right) of the columns
#[derive(Debug, Clone)]
pub struct ColumnIndex {
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index cdc772f71d..a956eded90 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -35,13 +35,12 @@ use datafusion::physical_plan::expressions::{
Literal, NegativeExpr, NotExpr, TryCastExpr,
};
use datafusion::physical_plan::expressions::{GetFieldAccessExpr,
GetIndexedFieldExpr};
-use datafusion::physical_plan::joins::utils::JoinSide;
use datafusion::physical_plan::windows::create_window_expr;
use datafusion::physical_plan::{
functions, ColumnStatistics, Partitioning, PhysicalExpr, Statistics,
WindowExpr,
};
use datafusion_common::stats::Precision;
-use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
+use datafusion_common::{not_impl_err, DataFusionError, JoinSide, Result, ScalarValue};
use crate::common::proto_error;
use crate::convert_required;
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 466b99b684..114baab6cc 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -44,14 +44,13 @@ use datafusion::physical_plan::expressions::{
Regr, RegrType, RowNumber, Stddev, StddevPop, Sum, TryCastExpr, Variance,
VariancePop, WindowShift,
};
-use datafusion::physical_plan::joins::utils::JoinSide;
use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion::physical_plan::windows::{BuiltInWindowExpr,
PlainAggregateWindowExpr};
use datafusion::physical_plan::{
AggregateExpr, ColumnStatistics, PhysicalExpr, Statistics, WindowExpr,
};
use datafusion_common::{
- internal_err, not_impl_err, stats::Precision, DataFusionError, Result,
+ internal_err, not_impl_err, stats::Precision, DataFusionError, JoinSide, Result,
};
impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode {