This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7c25bd079f Adjustment of HashJoinExec APIs to Preserve Probe Side
Order (#6858)
7c25bd079f is described below
commit 7c25bd079f9e407ba27ec679efb04eff6b4b4995
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Fri Jul 7 09:39:48 2023 +0300
Adjustment of HashJoinExec APIs to Preserve Probe Side Order (#6858)
* Implementation and tests
* Update
datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
Co-authored-by: Mustafa Akur
<[email protected]>
* Visit comments
* Update hash_join.rs
---------
Co-authored-by: Mustafa Akur
<[email protected]>
---
.../src/physical_optimizer/sort_enforcement.rs | 53 ++++++++++-
.../core/src/physical_optimizer/sort_pushdown.rs | 19 +++-
.../core/src/physical_plan/joins/hash_join.rs | 43 ++++++++-
datafusion/core/src/physical_plan/joins/utils.rs | 59 +++++++++++-
.../test_files/join_disable_repartition_joins.slt | 103 +++++++++++++++++++++
5 files changed, 265 insertions(+), 12 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 719c152841..dc1f27738d 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -973,8 +973,8 @@ mod tests {
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::filter::FilterExec;
- use crate::physical_plan::joins::utils::JoinOn;
- use crate::physical_plan::joins::SortMergeJoinExec;
+ use crate::physical_plan::joins::utils::{JoinFilter, JoinOn};
+ use crate::physical_plan::joins::{HashJoinExec, PartitionMode,
SortMergeJoinExec};
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::repartition::RepartitionExec;
@@ -1699,6 +1699,37 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_remove_unnecessary_sort5() -> Result<()> {
+ let left_schema = create_test_schema2()?;
+ let right_schema = create_test_schema3()?;
+ let left_input = memory_exec(&left_schema);
+ let parquet_sort_exprs = vec![sort_expr("a", &right_schema)];
+ let right_input = parquet_exec_sorted(&right_schema,
parquet_sort_exprs);
+
+ let on = vec![(
+ Column::new_with_schema("col_a", &left_schema)?,
+ Column::new_with_schema("c", &right_schema)?,
+ )];
+ let join = hash_join_exec(left_input, right_input, on, None,
&JoinType::Inner)?;
+ let physical_plan = sort_exec(vec![sort_expr("a", &join.schema())],
join);
+
+ let expected_input = vec![
+ "SortExec: expr=[a@2 ASC]",
+ " HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_a@0,
c@2)]",
+ " MemoryExec: partitions=0, partition_sizes=[]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[a@0 ASC]",
+ ];
+
+ let expected_optimized = vec![
+ "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_a@0,
c@2)]",
+ " MemoryExec: partitions=0, partition_sizes=[]",
+ " ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], output_ordering=[a@0 ASC]",
+ ];
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
+ Ok(())
+ }
+
#[tokio::test]
async fn test_remove_unnecessary_spm1() -> Result<()> {
let schema = create_test_schema()?;
@@ -2759,6 +2790,24 @@ mod tests {
Arc::new(SortExec::new(sort_exprs, input))
}
+ fn hash_join_exec(
+ left: Arc<dyn ExecutionPlan>,
+ right: Arc<dyn ExecutionPlan>,
+ on: JoinOn,
+ filter: Option<JoinFilter>,
+ join_type: &JoinType,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(HashJoinExec::try_new(
+ left,
+ right,
+ on,
+ filter,
+ join_type,
+ PartitionMode::Partitioned,
+ true,
+ )?))
+ }
+
fn sort_preserving_merge_exec(
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index 20a5038b7a..06260eb4e1 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -17,7 +17,7 @@
use crate::physical_optimizer::utils::{add_sort_above, is_limit, is_union,
is_window};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::utils::JoinSide;
-use crate::physical_plan::joins::SortMergeJoinExec;
+use crate::physical_plan::joins::{HashJoinExec, SortMergeJoinExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
@@ -263,16 +263,25 @@ fn pushdown_requirement_to_children(
// TODO: Add support for Projection push down
|| plan.as_any().is::<ProjectionExec>()
|| is_limit(plan)
+ || plan.as_any().is::<HashJoinExec>()
{
// If the current plan is a leaf node or cannot maintain any of the
input orderings, we cannot push down the requirements.
// For RepartitionExec, we always choose not to push down the sort
requirements even though RepartitionExec(input_partition=1) could maintain the
input ordering.
// Pushing down is not beneficial
Ok(None)
} else {
- Ok(Some(vec![
- parent_required.map(|elem| elem.to_vec());
- plan.children().len()
- ]))
+ Ok(Some(
+ maintains_input_order
+ .iter()
+ .map(|flag| {
+ if *flag {
+ parent_required.map(|elem| elem.to_vec())
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<_>>(),
+ ))
}
// TODO: Add support for Projection push down
}
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs
b/datafusion/core/src/physical_plan/joins/hash_join.rs
index d258e7529a..47403a70f4 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -58,7 +58,8 @@ use datafusion_execution::memory_pool::MemoryReservation;
use crate::physical_plan::joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices,
build_batch_from_indices,
- get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide,
+ calculate_hash_join_output_order, get_final_indices_from_bit_map,
+ need_produce_result_in_final, JoinSide,
};
use crate::physical_plan::DisplayAs;
use crate::physical_plan::{
@@ -115,6 +116,8 @@ pub struct HashJoinExec {
left_fut: OnceAsync<JoinLeftData>,
/// Shares the `RandomState` for the hashing algorithm
random_state: RandomState,
+ /// Output order
+ output_order: Option<Vec<PhysicalSortExpr>>,
/// Partitioning mode to use
pub(crate) mode: PartitionMode,
/// Execution metrics
@@ -153,6 +156,13 @@ impl HashJoinExec {
let random_state = RandomState::with_seeds(0, 0, 0, 0);
+ let output_order = calculate_hash_join_output_order(
+ join_type,
+ left.output_ordering(),
+ right.output_ordering(),
+ left.schema().fields().len(),
+ )?;
+
Ok(HashJoinExec {
left,
right,
@@ -166,6 +176,7 @@ impl HashJoinExec {
metrics: ExecutionPlanMetricsSet::new(),
column_indices,
null_equals_null,
+ output_order,
})
}
@@ -328,10 +339,34 @@ impl ExecutionPlan for HashJoinExec {
}
}
- // TODO Output ordering might be kept for some cases.
- // For example if it is inner join then the stream side order can be kept
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ self.output_order.as_deref()
+ }
+
+ // For [JoinType::Inner] and [JoinType::RightSemi] in hash joins, the
probe phase begins by
+ // applying the hash function to convert the join key(s) of each row into
a hash value from the
+ // probe side table in the order they're arranged. The hash value is used
to look up corresponding
+ // entries in the hash table that was constructed from the build side
table during the build phase.
+ //
+ // Because of the immediate generation of result rows once a match is
found,
+ // the output of the join tends to follow the order in which the rows were
read from
+ // the probe side table. This is simply due to the sequence in which the
rows were processed.
+ // Hence, it appears that the hash join is preserving the order of the
probe side.
+ //
+ // Meanwhile, in the case of a [JoinType::RightAnti] hash join,
+ // the unmatched rows from the probe side are also kept in order.
+ // This is because the **`RightAnti`** join is designed to return rows
from the right
+ // (probe side) table that have no match in the left (build side) table.
Because the rows
+ // are processed sequentially in the probe phase, and unmatched rows are
directly output
+ // as results, these results tend to retain the order of the probe side
table.
+ fn maintains_input_order(&self) -> Vec<bool> {
+ vec![
+ false,
+ matches!(
+ self.join_type,
+ JoinType::Inner | JoinType::RightAnti | JoinType::RightSemi
+ ),
+ ]
}
fn equivalence_properties(&self) -> EquivalenceProperties {
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs
b/datafusion/core/src/physical_plan/joins/utils.rs
index 627bdeebc5..d4be2cfe10 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -40,7 +40,7 @@ use datafusion_common::cast::as_boolean_array;
use datafusion_common::{ScalarValue, SharedResult};
use datafusion_common::tree_node::{Transformed, TreeNode};
-use datafusion_physical_expr::{EquivalentClass, PhysicalExpr};
+use datafusion_physical_expr::{EquivalentClass, PhysicalExpr,
PhysicalSortExpr};
use datafusion_common::JoinType;
use datafusion_common::{DataFusionError, Result};
@@ -147,6 +147,63 @@ pub fn adjust_right_output_partitioning(
}
}
+fn adjust_right_order(
+ right_order: &[PhysicalSortExpr],
+ left_columns_len: usize,
+) -> Result<Vec<PhysicalSortExpr>> {
+ right_order
+ .iter()
+ .map(|sort_expr| {
+ let expr = sort_expr.expr.clone();
+ let adjusted = expr.transform_up(&|expr| {
+ Ok(
+ if let Some(column) =
expr.as_any().downcast_ref::<Column>() {
+ let new_col =
+ Column::new(column.name(), column.index() +
left_columns_len);
+ Transformed::Yes(Arc::new(new_col))
+ } else {
+ Transformed::No(expr)
+ },
+ )
+ })?;
+ Ok(PhysicalSortExpr {
+ expr: adjusted,
+ options: sort_expr.options,
+ })
+ })
+ .collect::<Result<Vec<_>>>()
+}
+
+/// Calculate the output order for hash join.
+pub fn calculate_hash_join_output_order(
+ join_type: &JoinType,
+ maybe_left_order: Option<&[PhysicalSortExpr]>,
+ maybe_right_order: Option<&[PhysicalSortExpr]>,
+ left_len: usize,
+) -> Result<Option<Vec<PhysicalSortExpr>>> {
+ match maybe_right_order {
+ Some(right_order) => {
+ let result = match join_type {
+ JoinType::Inner => {
+ // We modify the indices of the right order columns
because their
+ // columns are appended to the right side of the left
schema.
+ let mut adjusted_right_order =
+ adjust_right_order(right_order, left_len)?;
+ if let Some(left_order) = maybe_left_order {
+ adjusted_right_order.extend_from_slice(left_order);
+ }
+ Some(adjusted_right_order)
+ }
+ JoinType::RightAnti | JoinType::RightSemi =>
Some(right_order.to_vec()),
+ _ => None,
+ };
+
+ Ok(result)
+ }
+ None => Ok(None),
+ }
+}
+
/// Combine the Equivalence Properties for Join Node
pub fn combine_join_equivalence_properties(
join_type: JoinType,
diff --git
a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
index 5f680fcae7..fcc6d665c6 100644
---
a/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
+++
b/datafusion/core/tests/sqllogictests/test_files/join_disable_repartition_joins.slt
@@ -25,6 +25,109 @@ set datafusion.optimizer.repartition_joins = false;
include ./join.slt
+statement ok
+CREATE EXTERNAL TABLE annotated_data (
+ a0 INTEGER,
+ a INTEGER,
+ b INTEGER,
+ c INTEGER,
+ d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC, c ASC)
+LOCATION 'tests/data/window_2.csv';
+
+query TT
+EXPLAIN SELECT t2.a
+ FROM annotated_data as t1
+ INNER JOIN annotated_data as t2
+ ON t1.c = t2.c ORDER BY t2.a
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+--Sort: t2.a ASC NULLS LAST, fetch=5
+----Projection: t2.a
+------Inner Join: t1.c = t2.c
+--------SubqueryAlias: t1
+----------TableScan: annotated_data projection=[c]
+--------SubqueryAlias: t2
+----------TableScan: annotated_data projection=[a, c]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+--ProjectionExec: expr=[a@1 as a]
+----CoalesceBatchesExec: target_batch_size=8192
+------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(c@0, c@1)]
+--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c],
has_header=true
+--------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c],
output_ordering=[a@0 ASC NULLS LAST], has_header=true
+
+# preserve_inner_join
+query III nosort
+SELECT t1.a, t2.a as a2, t2.b
+ FROM annotated_data as t1
+ INNER JOIN annotated_data as t2
+ ON t1.d = t2.d ORDER BY a2, t2.b
+ LIMIT 5
+----
+1 0 0
+1 0 0
+1 0 0
+1 0 0
+1 0 0
+
+query TT
+EXPLAIN SELECT t2.a as a2, t2.b
+ FROM annotated_data as t1
+ RIGHT SEMI JOIN annotated_data as t2
+ ON t1.d = t2.d AND t1.c = t2.c
+ WHERE t2.d = 3
+ ORDER BY a2, t2.b
+LIMIT 10
+----
+logical_plan
+Limit: skip=0, fetch=10
+--Sort: a2 ASC NULLS LAST, t2.b ASC NULLS LAST, fetch=10
+----Projection: t2.a AS a2, t2.b
+------RightSemi Join: t1.d = t2.d, t1.c = t2.c
+--------SubqueryAlias: t1
+----------TableScan: annotated_data projection=[c, d]
+--------SubqueryAlias: t2
+----------Filter: annotated_data.d = Int32(3)
+------------TableScan: annotated_data projection=[a, b, c, d],
partial_filters=[annotated_data.d = Int32(3)]
+physical_plan
+GlobalLimitExec: skip=0, fetch=10
+--SortPreservingMergeExec: [a2@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10
+----ProjectionExec: expr=[a@0 as a2, b@1 as b]
+------CoalesceBatchesExec: target_batch_size=8192
+--------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3),
(c@0, c@2)]
+----------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d],
has_header=true
+----------CoalesceBatchesExec: target_batch_size=8192
+------------FilterExec: d@3 = 3
+--------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+----------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b,
c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS
LAST], has_header=true
+
+# preserve_right_semi_join
+query II nosort
+SELECT t2.a as a2, t2.b
+ FROM annotated_data as t1
+ RIGHT SEMI JOIN annotated_data as t2
+ ON t1.d = t2.d AND t1.c = t2.c
+ WHERE t2.d = 3
+ ORDER BY a2, t2.b
+LIMIT 10
+----
+0 0
+0 0
+0 0
+0 1
+0 1
+0 1
+0 1
+0 1
+1 2
+1 2
+
# turn on repartition_joins
statement ok
set datafusion.optimizer.repartition_joins = true;