This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6cca0f7f37 Fix bug when pushing projection under joins (#11333)
6cca0f7f37 is described below
commit 6cca0f7f3725406aef4deb1ff1bbe299867ce82c
Author: Jonah Gao <[email protected]>
AuthorDate: Wed Jul 10 05:08:22 2024 +0800
Fix bug when pushing projection under joins (#11333)
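* Fix bug in `ProjectionPushdown`: match join filter columns to projection columns by index (offset by the left child's field count for the right side) instead of by name
* Add ORDER BY to the new join.slt test query so its output order is deterministic
* Fix the same column-index mapping for join `on` expressions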
---
.../src/physical_optimizer/projection_pushdown.rs | 50 ++++++++++++-------
datafusion/sqllogictest/test_files/join.slt | 58 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 19 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 70524dfcea..3c2be59f75 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -46,7 +46,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
-use datafusion_common::{DataFusionError, JoinSide};
+use datafusion_common::{internal_err, JoinSide};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::{
utils::collect_columns, Partitioning, PhysicalExpr, PhysicalExprRef,
@@ -640,6 +640,7 @@ fn try_pushdown_through_hash_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
hash_join.on(),
+ hash_join.left().schema().fields().len(),
) else {
return Ok(None);
};
@@ -649,8 +650,7 @@ fn try_pushdown_through_hash_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
- hash_join.left(),
- hash_join.right(),
+ hash_join.left().schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
@@ -750,8 +750,7 @@ fn try_swapping_with_nested_loop_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
- nl_join.left(),
- nl_join.right(),
+ nl_join.left().schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
@@ -806,6 +805,7 @@ fn try_swapping_with_sort_merge_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
sm_join.on(),
+ sm_join.left().schema().fields().len(),
) else {
return Ok(None);
};
@@ -859,6 +859,7 @@ fn try_swapping_with_sym_hash_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
sym_join.on(),
+ sym_join.left().schema().fields().len(),
) else {
return Ok(None);
};
@@ -868,8 +869,7 @@ fn try_swapping_with_sym_hash_join(
&projection_as_columns[0..=far_right_left_col_ind as _],
&projection_as_columns[far_left_right_col_ind as _..],
filter,
- sym_join.left(),
- sym_join.right(),
+ sym_join.left().schema().fields().len(),
) {
Some(updated_filter) => Some(updated_filter),
None => return Ok(None),
@@ -1090,6 +1090,7 @@ fn update_join_on(
proj_left_exprs: &[(Column, String)],
proj_right_exprs: &[(Column, String)],
hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
+ left_field_size: usize,
) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
// TODO: Clippy wants the "map" call removed, but doing so generates
// a compilation error. Remove the clippy directive once this
@@ -1100,8 +1101,9 @@ fn update_join_on(
.map(|(left, right)| (left, right))
.unzip();
- let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs);
- let new_right_columns = new_columns_for_join_on(&right_idx,
proj_right_exprs);
+ let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs,
0);
+ let new_right_columns =
+ new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
match (new_left_columns, new_right_columns) {
(Some(left), Some(right)) =>
Some(left.into_iter().zip(right).collect()),
@@ -1112,9 +1114,14 @@ fn update_join_on(
/// This function generates a new set of columns to be used in a hash join
/// operation based on a set of equi-join conditions (`hash_join_on`) and a
/// list of projection expressions (`projection_exprs`).
+///
+/// Notes: Column indices in the projection expressions are based on the join
schema,
+/// whereas the join on expressions are based on the join child schema.
`column_index_offset`
+/// represents the offset between them.
fn new_columns_for_join_on(
hash_join_on: &[&PhysicalExprRef],
projection_exprs: &[(Column, String)],
+ column_index_offset: usize,
) -> Option<Vec<PhysicalExprRef>> {
let new_columns = hash_join_on
.iter()
@@ -1130,6 +1137,8 @@ fn new_columns_for_join_on(
.enumerate()
.find(|(_, (proj_column, _))| {
column.name() == proj_column.name()
+ && column.index() + column_index_offset
+ == proj_column.index()
})
.map(|(index, (_, alias))| Column::new(alias,
index));
if let Some(new_column) = new_column {
@@ -1138,10 +1147,10 @@ fn new_columns_for_join_on(
// If the column is not found in the projection
expressions,
// it means that the column is not projected. In
this case,
// we cannot push the projection down.
- Err(DataFusionError::Internal(format!(
+ internal_err!(
"Column {:?} not found in projection
expressions",
column
- )))
+ )
}
} else {
Ok(Transformed::no(expr))
@@ -1160,21 +1169,20 @@ fn update_join_filter(
projection_left_exprs: &[(Column, String)],
projection_right_exprs: &[(Column, String)],
join_filter: &JoinFilter,
- join_left: &Arc<dyn ExecutionPlan>,
- join_right: &Arc<dyn ExecutionPlan>,
+ left_field_size: usize,
) -> Option<JoinFilter> {
let mut new_left_indices = new_indices_for_join_filter(
join_filter,
JoinSide::Left,
projection_left_exprs,
- join_left.schema(),
+ 0,
)
.into_iter();
let mut new_right_indices = new_indices_for_join_filter(
join_filter,
JoinSide::Right,
projection_right_exprs,
- join_right.schema(),
+ left_field_size,
)
.into_iter();
@@ -1204,20 +1212,24 @@ fn update_join_filter(
/// This function determines and returns a vector of indices representing the
/// positions of columns in `projection_exprs` that are involved in
`join_filter`,
/// and correspond to a particular side (`join_side`) of the join operation.
+///
+/// Notes: Column indices in the projection expressions are based on the join
schema,
+/// whereas the join filter is based on the join child schema.
`column_index_offset`
+/// represents the offset between them.
fn new_indices_for_join_filter(
join_filter: &JoinFilter,
join_side: JoinSide,
projection_exprs: &[(Column, String)],
- join_child_schema: SchemaRef,
+ column_index_offset: usize,
) -> Vec<usize> {
join_filter
.column_indices()
.iter()
.filter(|col_idx| col_idx.side == join_side)
.filter_map(|col_idx| {
- projection_exprs.iter().position(|(col, _)| {
- col.name() == join_child_schema.fields()[col_idx.index].name()
- })
+ projection_exprs
+ .iter()
+ .position(|(col, _)| col_idx.index + column_index_offset ==
col.index())
})
.collect()
}
diff --git a/datafusion/sqllogictest/test_files/join.slt
b/datafusion/sqllogictest/test_files/join.slt
index 3c89109145..12cb8b3985 100644
--- a/datafusion/sqllogictest/test_files/join.slt
+++ b/datafusion/sqllogictest/test_files/join.slt
@@ -986,3 +986,61 @@ DROP TABLE employees
statement ok
DROP TABLE department
+
+
+# Test issue: https://github.com/apache/datafusion/issues/11269
+statement ok
+CREATE TABLE t1 (v0 BIGINT) AS VALUES (-503661263);
+
+statement ok
+CREATE TABLE t2 (v0 DOUBLE) AS VALUES (-1.663563947387);
+
+statement ok
+CREATE TABLE t3 (v0 DOUBLE) AS VALUES (0.05112015193508901);
+
+query RR
+SELECT t3.v0, t2.v0 FROM t1,t2,t3 WHERE t3.v0 >= t1.v0;
+----
+0.051120151935 -1.663563947387
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
+
+statement ok
+DROP TABLE t3;
+
+
+# Test issue: https://github.com/apache/datafusion/issues/11275
+statement ok
+CREATE TABLE t0 (v1 BOOLEAN) AS VALUES (false), (null);
+
+statement ok
+CREATE TABLE t1 (v1 BOOLEAN) AS VALUES (false), (null), (false);
+
+statement ok
+CREATE TABLE t2 (v1 BOOLEAN) AS VALUES (false), (true);
+
+query BB
+SELECT t2.v1, t1.v1 FROM t0, t1, t2 WHERE t2.v1 IS DISTINCT FROM t0.v1 ORDER
BY 1,2;
+----
+false false
+false false
+false NULL
+true false
+true false
+true false
+true false
+true NULL
+true NULL
+
+statement ok
+DROP TABLE t0;
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]