This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 002165b55 Implement right semi join and support in HashBuildProbeorder
(#3958)
002165b55 is described below
commit 002165b55beb3aeb15ad4d9955295af7ff045945
Author: Daniël Heres <[email protected]>
AuthorDate: Thu Oct 27 20:22:43 2022 +0200
Implement right semi join and support in HashBuildProbeorder (#3958)
* Implement right semi join
* Change error a bit
* protobuf
* protobuf
* protobuf
* Change column name to b2
* Rename everything
* Rename & fmt
* Change display to leftanti
* Fix last expected plan
* Commit generated file
* generated
---
benchmarks/expected-plans/q16.txt | 2 +-
benchmarks/expected-plans/q18.txt | 2 +-
benchmarks/expected-plans/q20.txt | 4 +-
benchmarks/expected-plans/q21.txt | 4 +-
benchmarks/expected-plans/q22.txt | 2 +-
benchmarks/expected-plans/q4.txt | 2 +-
.../physical_optimizer/hash_build_probe_order.rs | 11 +-
.../core/src/physical_plan/joins/hash_join.rs | 132 ++++++++++++++++++---
.../src/physical_plan/joins/sort_merge_join.rs | 35 ++++--
datafusion/core/src/physical_plan/joins/utils.rs | 27 ++++-
datafusion/core/tests/join_fuzz.rs | 4 +-
datafusion/core/tests/sql/subqueries.rs | 10 +-
datafusion/expr/src/logical_plan/builder.rs | 7 +-
datafusion/expr/src/logical_plan/plan.rs | 15 ++-
.../optimizer/src/decorrelate_where_exists.rs | 20 ++--
datafusion/optimizer/src/decorrelate_where_in.rs | 28 ++---
datafusion/optimizer/src/filter_push_down.rs | 7 +-
.../optimizer/src/subquery_filter_to_join.rs | 20 ++--
datafusion/optimizer/tests/integration-test.rs | 10 +-
datafusion/proto/proto/datafusion.proto | 5 +-
datafusion/proto/src/generated/pbjson.rs | 15 ++-
datafusion/proto/src/generated/prost.rs | 10 +-
datafusion/proto/src/logical_plan.rs | 10 +-
23 files changed, 266 insertions(+), 116 deletions(-)
diff --git a/benchmarks/expected-plans/q16.txt
b/benchmarks/expected-plans/q16.txt
index 11943cf24..b1efb18fa 100644
--- a/benchmarks/expected-plans/q16.txt
+++ b/benchmarks/expected-plans/q16.txt
@@ -3,7 +3,7 @@ Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS
LAST, part.p_type AS
Projection: group_alias_0 AS p_brand, group_alias_1 AS p_type,
group_alias_2 AS p_size, COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey)
Aggregate: groupBy=[[group_alias_0, group_alias_1, group_alias_2]],
aggr=[[COUNT(alias1)]]
Aggregate: groupBy=[[part.p_brand AS group_alias_0, part.p_type AS
group_alias_1, part.p_size AS group_alias_2, partsupp.ps_suppkey AS alias1]],
aggr=[[]]
- Anti Join: partsupp.ps_suppkey = __sq_1.s_suppkey
+ LeftAnti Join: partsupp.ps_suppkey = __sq_1.s_suppkey
Inner Join: partsupp.ps_partkey = part.p_partkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey]
Filter: part.p_brand != Utf8("Brand#45") AND part.p_type NOT
LIKE Utf8("MEDIUM POLISHED%") AND part.p_size IN ([Int32(49), Int32(14),
Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)])
diff --git a/benchmarks/expected-plans/q18.txt
b/benchmarks/expected-plans/q18.txt
index ebc22ea5d..ce0b20c20 100644
--- a/benchmarks/expected-plans/q18.txt
+++ b/benchmarks/expected-plans/q18.txt
@@ -1,7 +1,7 @@
Sort: orders.o_totalprice DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST
Projection: customer.c_name, customer.c_custkey, orders.o_orderkey,
orders.o_orderdate, orders.o_totalprice, SUM(lineitem.l_quantity)
Aggregate: groupBy=[[customer.c_name, customer.c_custkey,
orders.o_orderkey, orders.o_orderdate, orders.o_totalprice]],
aggr=[[SUM(lineitem.l_quantity)]]
- Semi Join: orders.o_orderkey = __sq_1.l_orderkey
+ LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey
Inner Join: orders.o_orderkey = lineitem.l_orderkey
Inner Join: customer.c_custkey = orders.o_custkey
TableScan: customer projection=[c_custkey, c_name]
diff --git a/benchmarks/expected-plans/q20.txt
b/benchmarks/expected-plans/q20.txt
index 6d3ef1f6c..e5398325e 100644
--- a/benchmarks/expected-plans/q20.txt
+++ b/benchmarks/expected-plans/q20.txt
@@ -1,6 +1,6 @@
Sort: supplier.s_name ASC NULLS LAST
Projection: supplier.s_name, supplier.s_address
- Semi Join: supplier.s_suppkey = __sq_2.ps_suppkey
+ LeftSemi Join: supplier.s_suppkey = __sq_2.ps_suppkey
Inner Join: supplier.s_nationkey = nation.n_nationkey
TableScan: supplier projection=[s_suppkey, s_name, s_address,
s_nationkey]
Filter: nation.n_name = Utf8("CANADA")
@@ -8,7 +8,7 @@ Sort: supplier.s_name ASC NULLS LAST
Projection: partsupp.ps_suppkey AS ps_suppkey, alias=__sq_2
Filter: CAST(partsupp.ps_availqty AS Decimal128(38, 17)) >
__sq_3.__value
Inner Join: partsupp.ps_partkey = __sq_3.l_partkey,
partsupp.ps_suppkey = __sq_3.l_suppkey
- Semi Join: partsupp.ps_partkey = __sq_1.p_partkey
+ LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey,
ps_availqty]
Projection: part.p_partkey AS p_partkey, alias=__sq_1
Filter: part.p_name LIKE Utf8("forest%")
diff --git a/benchmarks/expected-plans/q21.txt
b/benchmarks/expected-plans/q21.txt
index f5aa1dc84..397e0a8d8 100644
--- a/benchmarks/expected-plans/q21.txt
+++ b/benchmarks/expected-plans/q21.txt
@@ -1,8 +1,8 @@
Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS LAST
Projection: supplier.s_name, COUNT(UInt8(1)) AS numwait
Aggregate: groupBy=[[supplier.s_name]], aggr=[[COUNT(UInt8(1))]]
- Anti Join: l1.l_orderkey = l3.l_orderkey Filter: l3.l_suppkey !=
l1.l_suppkey
- Semi Join: l1.l_orderkey = l2.l_orderkey Filter: l2.l_suppkey !=
l1.l_suppkey
+ LeftAnti Join: l1.l_orderkey = l3.l_orderkey Filter: l3.l_suppkey !=
l1.l_suppkey
+ LeftSemi Join: l1.l_orderkey = l2.l_orderkey Filter: l2.l_suppkey !=
l1.l_suppkey
Inner Join: supplier.s_nationkey = nation.n_nationkey
Inner Join: l1.l_orderkey = orders.o_orderkey
Inner Join: supplier.s_suppkey = l1.l_suppkey
diff --git a/benchmarks/expected-plans/q22.txt
b/benchmarks/expected-plans/q22.txt
index 919372f04..b56c8ff96 100644
--- a/benchmarks/expected-plans/q22.txt
+++ b/benchmarks/expected-plans/q22.txt
@@ -5,7 +5,7 @@ Sort: custsale.cntrycode ASC NULLS LAST
Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode,
customer.c_acctbal, alias=custsale
Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) >
__sq_1.__value
CrossJoin:
- Anti Join: customer.c_custkey = orders.o_custkey
+ LeftAnti Join: customer.c_custkey = orders.o_custkey
Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN
([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"),
Utf8("17")])
TableScan: customer projection=[c_custkey, c_phone,
c_acctbal]
TableScan: orders projection=[o_custkey]
diff --git a/benchmarks/expected-plans/q4.txt b/benchmarks/expected-plans/q4.txt
index a4339732e..3610ae175 100644
--- a/benchmarks/expected-plans/q4.txt
+++ b/benchmarks/expected-plans/q4.txt
@@ -1,7 +1,7 @@
Sort: orders.o_orderpriority ASC NULLS LAST
Projection: orders.o_orderpriority, COUNT(UInt8(1)) AS order_count
Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[COUNT(UInt8(1))]]
- Semi Join: orders.o_orderkey = lineitem.l_orderkey
+ LeftSemi Join: orders.o_orderkey = lineitem.l_orderkey
Filter: orders.o_orderdate >= Date32("8582") AND orders.o_orderdate <
Date32("8674")
TableScan: orders projection=[o_orderkey, o_orderdate,
o_orderpriority]
Filter: lineitem.l_commitdate < lineitem.l_receiptdate
diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
index 6817001d3..014921046 100644
--- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
+++ b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
@@ -70,8 +70,13 @@ fn should_swap_join_order(left: &dyn ExecutionPlan, right:
&dyn ExecutionPlan) -
fn supports_swap(join_type: JoinType) -> bool {
match join_type {
- JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full =>
true,
- JoinType::Semi | JoinType::Anti => false,
+ JoinType::Inner
+ | JoinType::Left
+ | JoinType::Right
+ | JoinType::Full
+ | JoinType::LeftSemi
+ | JoinType::RightSemi => true,
+ JoinType::LeftAnti => false,
}
}
@@ -81,6 +86,8 @@ fn swap_join_type(join_type: JoinType) -> JoinType {
JoinType::Full => JoinType::Full,
JoinType::Left => JoinType::Right,
JoinType::Right => JoinType::Left,
+ JoinType::LeftSemi => JoinType::RightSemi,
+ JoinType::RightSemi => JoinType::LeftSemi,
_ => unreachable!(),
}
}
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs
b/datafusion/core/src/physical_plan/joins/hash_join.rs
index ff036b78b..b41c0df1e 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -653,7 +653,7 @@ fn build_batch(
(left_indices, right_indices)
};
- if matches!(join_type, JoinType::Semi | JoinType::Anti) {
+ if matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti) {
return Ok((
RecordBatch::new_empty(Arc::new(schema.clone())),
left_filtered_indices,
@@ -719,7 +719,7 @@ fn build_join_indexes(
let left = &left_data.0;
match join_type {
- JoinType::Inner | JoinType::Semi | JoinType::Anti => {
+ JoinType::Inner | JoinType::LeftSemi | JoinType::LeftAnti => {
// Using a buffer builder to avoid slower normal builder
let mut left_indices = UInt64BufferBuilder::new(0);
let mut right_indices = UInt32BufferBuilder::new(0);
@@ -765,6 +765,54 @@ fn build_join_indexes(
PrimitiveArray::<UInt32Type>::from(right),
))
}
+ JoinType::RightSemi => {
+ let mut left_indices = UInt64BufferBuilder::new(0);
+ let mut right_indices = UInt32BufferBuilder::new(0);
+
+ // Visit all of the right rows
+ for (row, hash_value) in hash_values.iter().enumerate() {
+ // Get the hash and find it in the build index
+
+ // For every item on the left and right we check if it matches
+ // This possibly contains rows with hash collisions,
+ // So we have to check here whether rows are equal or not
+ // We only produce one row if there is a match
+ if let Some((_, indices)) =
+ left.0.get(*hash_value, |(hash, _)| *hash_value == *hash)
+ {
+ for &i in indices {
+ // Check hash collisions
+ if equal_rows(
+ i as usize,
+ row,
+ &left_join_values,
+ &keys_values,
+ *null_equals_null,
+ )? {
+ left_indices.append(i);
+ right_indices.append(row as u32);
+ break;
+ }
+ }
+ }
+ }
+
+ let left = ArrayData::builder(DataType::UInt64)
+ .len(left_indices.len())
+ .add_buffer(left_indices.finish())
+ .build()
+ .unwrap();
+ let right = ArrayData::builder(DataType::UInt32)
+ .len(right_indices.len())
+ .add_buffer(right_indices.finish())
+ .build()
+ .unwrap();
+
+ Ok((
+ PrimitiveArray::<UInt64Type>::from(left),
+ PrimitiveArray::<UInt32Type>::from(right),
+ ))
+ }
JoinType::Left => {
let mut left_indices = UInt64Builder::with_capacity(0);
let mut right_indices = UInt32Builder::with_capacity(0);
@@ -853,7 +901,11 @@ fn apply_join_filter(
)?;
match join_type {
- JoinType::Inner | JoinType::Left | JoinType::Anti | JoinType::Semi => {
+ JoinType::Inner
+ | JoinType::Left
+ | JoinType::LeftAnti
+ | JoinType::LeftSemi
+ | JoinType::RightSemi => {
// For both INNER and LEFT joins, input arrays contains only
indices for matched data.
// Due to this fact it's correct to simply apply filter to
intermediate batch and return
// indices for left/right rows satisfying filter predicate
@@ -1280,14 +1332,19 @@ impl HashJoinStream {
let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
let num_rows = left_data.1.num_rows();
match self.join_type {
- JoinType::Left | JoinType::Full | JoinType::Semi |
JoinType::Anti => {
+ JoinType::Left
+ | JoinType::Full
+ | JoinType::LeftSemi
+ | JoinType::LeftAnti => {
let mut buffer = BooleanBufferBuilder::new(num_rows);
buffer.append_n(num_rows, false);
buffer
}
- JoinType::Inner | JoinType::Right =>
BooleanBufferBuilder::new(0),
+ JoinType::Inner | JoinType::Right | JoinType::RightSemi => {
+ BooleanBufferBuilder::new(0)
+ }
}
});
@@ -1318,13 +1375,13 @@ impl HashJoinStream {
match self.join_type {
JoinType::Left
| JoinType::Full
- | JoinType::Semi
- | JoinType::Anti => {
+ | JoinType::LeftSemi
+ | JoinType::LeftAnti => {
left_side.iter().flatten().for_each(|x| {
visited_left_side.set_bit(x as usize,
true);
});
}
- JoinType::Inner | JoinType::Right => {}
+ JoinType::Inner | JoinType::Right |
JoinType::RightSemi => {}
}
}
Some(result.map(|x| x.0))
@@ -1335,8 +1392,8 @@ impl HashJoinStream {
match self.join_type {
JoinType::Left
| JoinType::Full
- | JoinType::Semi
- | JoinType::Anti
+ | JoinType::LeftSemi
+ | JoinType::LeftAnti
if !self.is_exhausted =>
{
let result = produce_from_matched(
@@ -1344,7 +1401,7 @@ impl HashJoinStream {
&self.schema,
&self.column_indices,
left_data,
- self.join_type != JoinType::Semi,
+ self.join_type != JoinType::LeftSemi,
);
if let Ok(ref batch) = result {
self.join_metrics.input_batches.add(1);
@@ -1360,8 +1417,9 @@ impl HashJoinStream {
}
JoinType::Left
| JoinType::Full
- | JoinType::Semi
- | JoinType::Anti
+ | JoinType::LeftSemi
+ | JoinType::RightSemi
+ | JoinType::LeftAnti
| JoinType::Inner
| JoinType::Right => {}
}
@@ -2094,7 +2152,49 @@ mod tests {
Column::new_with_schema("b1", &right.schema())?,
)];
- let join = join(left, right, on, &JoinType::Semi, false)?;
+ let join = join(left, right, on, &JoinType::LeftSemi, false)?;
+
+ let columns = columns(&join.schema());
+ assert_eq!(columns, vec!["a1", "b1", "c1"]);
+
+ let stream = join.execute(0, task_ctx)?;
+ let batches = common::collect(stream).await?;
+
+ let expected = vec![
+ "+----+----+----+",
+ "| a1 | b1 | c1 |",
+ "+----+----+----+",
+ "| 1 | 4 | 7 |",
+ "| 2 | 5 | 8 |",
+ "| 2 | 5 | 8 |",
+ "+----+----+----+",
+ ];
+ assert_batches_sorted_eq!(expected, &batches);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn join_right_semi() -> Result<()> {
+ let session_ctx = SessionContext::new();
+ let task_ctx = session_ctx.task_ctx();
+ let left = build_table(
+ ("a2", &vec![10, 20, 30, 40]),
+ ("b2", &vec![4, 5, 6, 5]), // 5 is double on the left
+ ("c2", &vec![70, 80, 90, 100]),
+ );
+ let right = build_table(
+ ("a1", &vec![1, 2, 2, 3]),
+ ("b1", &vec![4, 5, 5, 7]), // 7 does not exist on the left
+ ("c1", &vec![7, 8, 8, 9]),
+ );
+
+ let on = vec![(
+ Column::new_with_schema("b2", &left.schema())?,
+ Column::new_with_schema("b1", &right.schema())?,
+ )];
+
+ let join = join(left, right, on, &JoinType::RightSemi, false)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1"]);
@@ -2135,7 +2235,7 @@ mod tests {
Column::new_with_schema("b1", &right.schema())?,
)];
- let join = join(left, right, on, &JoinType::Anti, false)?;
+ let join = join(left, right, on, &JoinType::LeftAnti, false)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["a1", "b1", "c1"]);
@@ -2196,7 +2296,7 @@ mod tests {
let filter =
JoinFilter::new(filter_expression, column_indices,
intermediate_schema);
- let join = join_with_filter(left, right, on, filter, &JoinType::Anti,
false)?;
+ let join = join_with_filter(left, right, on, filter,
&JoinType::LeftAnti, false)?;
let columns = columns(&join.schema());
assert_eq!(columns, vec!["col1", "col2", "col3"]);
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 3de712745..dfcab88c2 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -89,6 +89,12 @@ impl SortMergeJoinExec {
let left_schema = left.schema();
let right_schema = right.schema();
+ if join_type == JoinType::RightSemi {
+ return Err(DataFusionError::NotImplemented(
+ "SortMergeJoinExec does not support
JoinType::RightSemi".to_string(),
+ ));
+ }
+
check_join_is_valid(&left_schema, &right_schema, &on)?;
if sort_options.len() != on.len() {
return Err(DataFusionError::Plan(format!(
@@ -129,10 +135,11 @@ impl ExecutionPlan for SortMergeJoinExec {
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
match self.join_type {
- JoinType::Inner | JoinType::Left | JoinType::Semi | JoinType::Anti
=> {
- self.left.output_ordering()
- }
- JoinType::Right => self.right.output_ordering(),
+ JoinType::Inner
+ | JoinType::Left
+ | JoinType::LeftSemi
+ | JoinType::LeftAnti => self.left.output_ordering(),
+ JoinType::Right | JoinType::RightSemi =>
self.right.output_ordering(),
JoinType::Full => None,
}
}
@@ -173,14 +180,14 @@ impl ExecutionPlan for SortMergeJoinExec {
JoinType::Inner
| JoinType::Left
| JoinType::Full
- | JoinType::Anti
- | JoinType::Semi => (
+ | JoinType::LeftAnti
+ | JoinType::LeftSemi => (
self.left.clone(),
self.right.clone(),
self.on.iter().map(|on| on.0.clone()).collect(),
self.on.iter().map(|on| on.1.clone()).collect(),
),
- JoinType::Right => (
+ JoinType::Right | JoinType::RightSemi => (
self.right.clone(),
self.left.clone(),
self.on.iter().map(|on| on.1.clone()).collect(),
@@ -767,13 +774,17 @@ impl SMJStream {
Ordering::Less => {
if matches!(
self.join_type,
- JoinType::Left | JoinType::Right | JoinType::Full |
JoinType::Anti
+ JoinType::Left
+ | JoinType::Right
+ | JoinType::RightSemi
+ | JoinType::Full
+ | JoinType::LeftAnti
) {
join_streamed = !self.streamed_joined;
}
}
Ordering::Equal => {
- if matches!(self.join_type, JoinType::Semi) {
+ if matches!(self.join_type, JoinType::LeftSemi) {
join_streamed = !self.streamed_joined;
}
if matches!(
@@ -915,7 +926,7 @@ impl SMJStream {
let buffered_indices: UInt64Array =
chunk.buffered_indices.finish();
let mut buffered_columns =
- if matches!(self.join_type, JoinType::Semi | JoinType::Anti) {
+ if matches!(self.join_type, JoinType::LeftSemi |
JoinType::LeftAnti) {
vec![]
} else if let Some(buffered_idx) = chunk.buffered_batch_idx {
self.buffered_data.batches[buffered_idx]
@@ -1732,7 +1743,7 @@ mod tests {
Column::new_with_schema("b1", &right.schema())?,
)];
- let (_, batches) = join_collect(left, right, on,
JoinType::Anti).await?;
+ let (_, batches) = join_collect(left, right, on,
JoinType::LeftAnti).await?;
let expected = vec![
"+----+----+----+",
"| a1 | b1 | c1 |",
@@ -1763,7 +1774,7 @@ mod tests {
Column::new_with_schema("b1", &right.schema())?,
)];
- let (_, batches) = join_collect(left, right, on,
JoinType::Semi).await?;
+ let (_, batches) = join_collect(left, right, on,
JoinType::LeftSemi).await?;
let expected = vec![
"+----+----+----+",
"| a1 | b1 | c1 |",
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs
b/datafusion/core/src/physical_plan/joins/utils.rs
index f937dc1c4..1d1560478 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -169,8 +169,9 @@ fn output_join_field(old_field: &Field, join_type:
&JoinType, is_left: bool) ->
JoinType::Left => !is_left, // right input is padded with nulls
JoinType::Right => is_left, // left input is padded with nulls
JoinType::Full => true, // both inputs can be padded with nulls
- JoinType::Semi => false, // doesn't introduce nulls
- JoinType::Anti => false, // doesn't introduce nulls (or can it??)
+ JoinType::LeftSemi => false, // doesn't introduce nulls
+ JoinType::RightSemi => false, // doesn't introduce nulls
+ JoinType::LeftAnti => false, // doesn't introduce nulls (or can it??)
};
if force_nullable {
@@ -221,7 +222,7 @@ pub fn build_join_schema(
// left then right
left_fields.chain(right_fields).unzip()
}
- JoinType::Semi | JoinType::Anti => left
+ JoinType::LeftSemi | JoinType::LeftAnti => left
.fields()
.iter()
.cloned()
@@ -236,6 +237,21 @@ pub fn build_join_schema(
)
})
.unzip(),
+ JoinType::RightSemi => right
+ .fields()
+ .iter()
+ .cloned()
+ .enumerate()
+ .map(|(index, f)| {
+ (
+ f,
+ ColumnIndex {
+ index,
+ side: JoinSide::Right,
+ },
+ )
+ })
+ .unzip(),
};
(Schema::new(fields), column_indices)
@@ -394,8 +410,9 @@ fn estimate_join_cardinality(
})
}
- JoinType::Semi => None,
- JoinType::Anti => None,
+ JoinType::LeftSemi => None,
+ JoinType::LeftAnti => None,
+ JoinType::RightSemi => None,
}
}
diff --git a/datafusion/core/tests/join_fuzz.rs
b/datafusion/core/tests/join_fuzz.rs
index 8d4f31af5..1204b4428 100644
--- a/datafusion/core/tests/join_fuzz.rs
+++ b/datafusion/core/tests/join_fuzz.rs
@@ -78,7 +78,7 @@ async fn test_semi_join_1k() {
run_join_test(
make_staggered_batches(10000),
make_staggered_batches(10000),
- JoinType::Semi,
+ JoinType::LeftSemi,
)
.await
}
@@ -88,7 +88,7 @@ async fn test_anti_join_1k() {
run_join_test(
make_staggered_batches(10000),
make_staggered_batches(10000),
- JoinType::Anti,
+ JoinType::LeftAnti,
)
.await
}
diff --git a/datafusion/core/tests/sql/subqueries.rs
b/datafusion/core/tests/sql/subqueries.rs
index 8c77d860e..ed65d4391 100644
--- a/datafusion/core/tests/sql/subqueries.rs
+++ b/datafusion/core/tests/sql/subqueries.rs
@@ -94,7 +94,7 @@ where o_orderstatus in (
let plan = ctx.optimize(&plan).unwrap();
let actual = format!("{}", plan.display_indent());
let expected = r#"Projection: orders.o_orderkey
- Semi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey =
__sq_1.l_orderkey
+ LeftSemi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey
= __sq_1.l_orderkey
TableScan: orders projection=[o_orderkey, o_orderstatus]
Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS
l_orderkey, alias=__sq_1
TableScan: lineitem projection=[l_orderkey, l_linestatus]"#
@@ -205,7 +205,7 @@ async fn tpch_q4_correlated() -> Result<()> {
let expected = r#"Sort: orders.o_orderpriority ASC NULLS LAST
Projection: orders.o_orderpriority, COUNT(UInt8(1)) AS order_count
Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[COUNT(UInt8(1))]]
- Semi Join: orders.o_orderkey = lineitem.l_orderkey
+ LeftSemi Join: orders.o_orderkey = lineitem.l_orderkey
TableScan: orders projection=[o_orderkey, o_orderpriority]
Filter: lineitem.l_commitdate < lineitem.l_receiptdate
TableScan: lineitem projection=[l_orderkey, l_commitdate,
l_receiptdate]"#
@@ -322,7 +322,7 @@ order by s_name;
let actual = format!("{}", plan.display_indent());
let expected = r#"Sort: supplier.s_name ASC NULLS LAST
Projection: supplier.s_name, supplier.s_address
- Semi Join: supplier.s_suppkey = __sq_2.ps_suppkey
+ LeftSemi Join: supplier.s_suppkey = __sq_2.ps_suppkey
Inner Join: supplier.s_nationkey = nation.n_nationkey
TableScan: supplier projection=[s_suppkey, s_name, s_address,
s_nationkey]
Filter: nation.n_name = Utf8("CANADA")
@@ -330,7 +330,7 @@ order by s_name;
Projection: partsupp.ps_suppkey AS ps_suppkey, alias=__sq_2
Filter: CAST(partsupp.ps_availqty AS Decimal128(38, 17)) >
__sq_3.__value
Inner Join: partsupp.ps_partkey = __sq_3.l_partkey,
partsupp.ps_suppkey = __sq_3.l_suppkey
- Semi Join: partsupp.ps_partkey = __sq_1.p_partkey
+ LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey,
ps_availqty]
Projection: part.p_partkey AS p_partkey, alias=__sq_1
Filter: part.p_name LIKE Utf8("forest%")
@@ -386,7 +386,7 @@ order by cntrycode;"#;
Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode,
customer.c_acctbal, alias=custsale
Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) >
__sq_1.__value
CrossJoin:
- Anti Join: customer.c_custkey = orders.o_custkey
+ LeftAnti Join: customer.c_custkey = orders.o_custkey
Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN
([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"),
Utf8("17")])
TableScan: customer projection=[c_custkey, c_phone,
c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN
([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"),
Utf8("17")])]
TableScan: orders projection=[o_custkey]
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 631a64da6..829aa6682 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -743,7 +743,7 @@ impl LogicalPlanBuilder {
LogicalPlanBuilder::intersect_or_except(
left_plan,
right_plan,
- JoinType::Semi,
+ JoinType::LeftSemi,
is_all,
)
}
@@ -757,7 +757,7 @@ impl LogicalPlanBuilder {
LogicalPlanBuilder::intersect_or_except(
left_plan,
right_plan,
- JoinType::Anti,
+ JoinType::LeftAnti,
is_all,
)
}
@@ -823,10 +823,11 @@ pub fn build_join_schema(
// left then right
left_fields.chain(right_fields).cloned().collect()
}
- JoinType::Semi | JoinType::Anti => {
+ JoinType::LeftSemi | JoinType::LeftAnti => {
// Only use the left side for the schema
left.fields().clone()
}
+ JoinType::RightSemi => right.fields().clone(),
};
let mut metadata = left.metadata().clone();
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 284e8c9aa..38b3a789a 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -976,10 +976,12 @@ pub enum JoinType {
Right,
/// Full Join
Full,
- /// Semi Join
- Semi,
- /// Anti Join
- Anti,
+ /// Left Semi Join
+ LeftSemi,
+ /// Right Semi Join
+ RightSemi,
+ /// Left Anti Join
+ LeftAnti,
}
impl Display for JoinType {
@@ -989,8 +991,9 @@ impl Display for JoinType {
JoinType::Left => "Left",
JoinType::Right => "Right",
JoinType::Full => "Full",
- JoinType::Semi => "Semi",
- JoinType::Anti => "Anti",
+ JoinType::LeftSemi => "LeftSemi",
+ JoinType::RightSemi => "RightSemi",
+ JoinType::LeftAnti => "LeftAnti",
};
write!(f, "{}", join_type)
}
diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs
b/datafusion/optimizer/src/decorrelate_where_exists.rs
index 78e8fc32d..c8102debe 100644
--- a/datafusion/optimizer/src/decorrelate_where_exists.rs
+++ b/datafusion/optimizer/src/decorrelate_where_exists.rs
@@ -176,8 +176,8 @@ fn optimize_exists(
// join our sub query into the main plan
let join_type = match query_info.negated {
- true => JoinType::Anti,
- false => JoinType::Semi,
+ true => JoinType::LeftAnti,
+ false => JoinType::LeftSemi,
};
let mut new_plan = LogicalPlanBuilder::from(outer_input.clone()).join(
&subqry_plan,
@@ -230,8 +230,8 @@ mod tests {
.build()?;
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
- Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64,
c_name:Utf8]
- Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64,
c_name:Utf8]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64,
o_orderstatus:Utf8, o_totalprice:Float64;N]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8,
o_totalprice:Float64;N]"#;
@@ -266,9 +266,9 @@ mod tests {
.build()?;
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
- Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64,
c_name:Utf8]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
- Semi Join: orders.o_orderkey = lineitem.l_orderkey [o_orderkey:Int64,
o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
+ LeftSemi Join: orders.o_orderkey = lineitem.l_orderkey [o_orderkey:Int64,
o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64,
o_orderstatus:Utf8, o_totalprice:Float64;N]
TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64,
l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"#;
@@ -296,7 +296,7 @@ mod tests {
.build()?;
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
- Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64,
c_name:Utf8]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64,
o_orderstatus:Utf8, o_totalprice:Float64;N]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64,
o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
@@ -451,7 +451,7 @@ mod tests {
// Doesn't matter we projected an expression, just that we returned a
result
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
- Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64,
c_name:Utf8]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8,
o_totalprice:Float64;N]"#;
@@ -475,7 +475,7 @@ mod tests {
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
- Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64,
c_name:Utf8]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64,
o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
@@ -527,7 +527,7 @@ mod tests {
.build()?;
let expected = r#"Projection: test.c [c:UInt32]
- Semi Join: test.a = sq.a [a:UInt32, b:UInt32, c:UInt32]
+ LeftSemi Join: test.a = sq.a [a:UInt32, b:UInt32, c:UInt32]
TableScan: test [a:UInt32, b:UInt32, c:UInt32]
TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#;
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs
b/datafusion/optimizer/src/decorrelate_where_in.rs
index 052ed796a..fa0367c45 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -189,8 +189,8 @@ fn optimize_where_in(
// join our sub query into the main plan
let join_type = match query_info.negated {
- true => JoinType::Anti,
- false => JoinType::Semi,
+ true => JoinType::LeftAnti,
+ false => JoinType::LeftSemi,
};
let mut new_plan = LogicalPlanBuilder::from(outer_input.clone()).join(
&subqry_plan,
@@ -259,8 +259,8 @@ mod tests {
debug!("plan to optimize:\n{}", plan.display_indent());
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
- Semi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64,
c_name:Utf8]
- Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64,
c_name:Utf8]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64,
o_orderstatus:Utf8, o_totalprice:Float64;N]
@@ -297,10 +297,10 @@ mod tests {
.build()?;
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
- Semi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64,
c_name:Utf8]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
Projection: orders.o_custkey AS o_custkey, alias=__sq_2 [o_custkey:Int64]
- Semi Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64,
o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
+ LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64,
o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64,
o_orderstatus:Utf8, o_totalprice:Float64;N]
Projection: lineitem.l_orderkey AS l_orderkey, alias=__sq_1
[l_orderkey:Int64]
TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64,
l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64,
l_extendedprice:Float64]"#;
@@ -329,7 +329,7 @@ mod tests {
.build()?;
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
- Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64,
c_name:Utf8]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64,
o_orderstatus:Utf8, o_totalprice:Float64;N]
@@ -356,7 +356,7 @@ mod tests {
// Query will fail, but we can still transform the plan
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
- Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64,
c_name:Utf8]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64,
o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
@@ -382,7 +382,7 @@ mod tests {
.build()?;
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
- Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64,
c_name:Utf8]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64,
o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
@@ -408,7 +408,7 @@ mod tests {
.build()?;
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
- Semi Join: customer.c_custkey = __sq_1.o_custkey Filter: customer.c_custkey
!= orders.o_custkey [c_custkey:Int64, c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = __sq_1.o_custkey Filter:
customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64,
o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
@@ -585,7 +585,7 @@ mod tests {
let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
- Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64,
c_name:Utf8]
+ LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64,
c_name:Utf8]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64,
o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
@@ -641,7 +641,7 @@ mod tests {
.build()?;
let expected = r#"Projection: test.b [b:UInt32]
- Semi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32,
c:UInt32]
+ LeftSemi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32,
c:UInt32]
TableScan: test [a:UInt32, b:UInt32, c:UInt32]
Projection: sq.c AS c, sq.a AS a, alias=__sq_1 [c:UInt32, a:UInt32]
TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#;
@@ -660,7 +660,7 @@ mod tests {
.build()?;
let expected = r#"Projection: test.b [b:UInt32]
- Semi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]
+ LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]
TableScan: test [a:UInt32, b:UInt32, c:UInt32]
Projection: sq.c AS c, alias=__sq_1 [c:UInt32]
TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#;
@@ -679,7 +679,7 @@ mod tests {
.build()?;
let expected = r#"Projection: test.b [b:UInt32]
- Anti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]
+ LeftAnti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]
TableScan: test [a:UInt32, b:UInt32, c:UInt32]
Projection: sq.c AS c, alias=__sq_1 [c:UInt32]
TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#;
diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs
index 148ae6715..902b02817 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -176,7 +176,10 @@ fn lr_is_preserved(plan: &LogicalPlan) -> Result<(bool, bool)> {
JoinType::Full => Ok((false, false)),
// No columns from the right side of the join can be referenced in output
// predicates for semi/anti joins, so whether we specify t/f doesn't matter.
- JoinType::Semi | JoinType::Anti => Ok((true, false)),
+ JoinType::LeftSemi | JoinType::LeftAnti => Ok((true, false)),
+ // No columns from the left side of the join can be referenced in output
+ // predicates for semi/anti joins, so whether we specify t/f doesn't matter.
+ JoinType::RightSemi => Ok((false, true)),
},
LogicalPlan::CrossJoin(_) => Ok((true, true)),
_ => Err(DataFusionError::Internal(
@@ -195,7 +198,7 @@ fn on_lr_is_preserved(plan: &LogicalPlan) -> Result<(bool, bool)> {
JoinType::Left => Ok((false, true)),
JoinType::Right => Ok((true, false)),
JoinType::Full => Ok((false, false)),
- JoinType::Semi | JoinType::Anti => {
+ JoinType::LeftSemi | JoinType::LeftAnti | JoinType::RightSemi => {
// filter_push_down does not yet support SEMI/ANTI joins with join conditions
Ok((false, false))
}
diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs
index 29f51a42f..c1717da0d 100644
--- a/datafusion/optimizer/src/subquery_filter_to_join.rs
+++ b/datafusion/optimizer/src/subquery_filter_to_join.rs
@@ -116,9 +116,9 @@ impl OptimizerRule for SubqueryFilterToJoin {
};
let join_type = if *negated {
- JoinType::Anti
+ JoinType::LeftAnti
} else {
- JoinType::Semi
+ JoinType::LeftSemi
};
let schema = build_join_schema(
@@ -231,7 +231,7 @@ mod tests {
.build()?;
let expected = "Projection: test.b [b:UInt32]\
- \n Semi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
\n Projection: sq.c [c:UInt32]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
@@ -250,7 +250,7 @@ mod tests {
.build()?;
let expected = "Projection: test.b [b:UInt32]\
- \n Anti Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftAnti Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
\n Projection: sq.c [c:UInt32]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
@@ -272,8 +272,8 @@ mod tests {
.build()?;
let expected = "Projection: test.b [b:UInt32]\
- \n Semi Join: test.b = sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
- \n Semi Join: test.c = sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.b = sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
\n Projection: sq_1.c [c:UInt32]\
\n TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\
@@ -301,7 +301,7 @@ mod tests {
let expected = "Projection: test.b [b:UInt32]\
\n Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32,
b:UInt32, c:UInt32]\
- \n Semi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
\n Projection: sq.c [c:UInt32]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
@@ -381,10 +381,10 @@ mod tests {
.build()?;
let expected = "Projection: test.b [b:UInt32]\
- \n Semi Join: test.b = sq.a [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.b = sq.a [a:UInt32, b:UInt32, c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
\n Projection: sq.a [a:UInt32]\
- \n Semi Join: sq.a = sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: sq.a = sq_nested.c [a:UInt32, b:UInt32,
c:UInt32]\
\n TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
\n Projection: sq_nested.c [c:UInt32]\
\n TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]";
@@ -413,7 +413,7 @@ mod tests {
\n Subquery: [c:UInt32]\n Projection: sq_outer.c [c:UInt32]\
\n TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\
\n Projection: test.b, test.c, alias=wrapped [b:UInt32, c:UInt32]\
- \n Semi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\
+ \n LeftSemi Join: test.c = sq_inner.c [a:UInt32, b:UInt32,
c:UInt32]\
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
\n Projection: sq_inner.c [c:UInt32]\
\n TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]";
diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs
index 8003d590e..71a18b624 100644
--- a/datafusion/optimizer/tests/integration-test.rs
+++ b/datafusion/optimizer/tests/integration-test.rs
@@ -117,7 +117,7 @@ fn semi_join_with_join_filter() -> Result<()> {
AND test.col_uint32 != t2.col_uint32)";
let plan = test_sql(sql)?;
let expected = "Projection: test.col_utf8\
- \n Semi Join: test.col_int32 = t2.col_int32 Filter:
test.col_uint32 != t2.col_uint32\
+ \n LeftSemi Join: test.col_int32 = t2.col_int32 Filter:
test.col_uint32 != t2.col_uint32\
\n TableScan: test projection=[col_int32, col_uint32,
col_utf8]\
\n SubqueryAlias: t2\
\n TableScan: test projection=[col_int32, col_uint32,
col_utf8]";
@@ -133,7 +133,7 @@ fn anti_join_with_join_filter() -> Result<()> {
AND test.col_uint32 != t2.col_uint32)";
let plan = test_sql(sql)?;
let expected = "Projection: test.col_utf8\
- \n Anti Join: test.col_int32 = t2.col_int32 Filter:
test.col_uint32 != t2.col_uint32\
+ \n LeftAnti Join: test.col_int32 = t2.col_int32 Filter:
test.col_uint32 != t2.col_uint32\
\n TableScan: test projection=[col_int32, col_uint32,
col_utf8]\
\n SubqueryAlias: t2\
\n TableScan: test projection=[col_int32, col_uint32,
col_utf8]";
@@ -148,7 +148,7 @@ fn where_exists_distinct() -> Result<()> {
SELECT DISTINCT col_int32 FROM test t2 WHERE test.col_int32 =
t2.col_int32)";
let plan = test_sql(sql)?;
let expected = "Projection: test.col_int32\
- \n Semi Join: test.col_int32 = t2.col_int32\
+ \n LeftSemi Join: test.col_int32 = t2.col_int32\
\n TableScan: test projection=[col_int32]\
\n SubqueryAlias: t2\
\n TableScan: test projection=[col_int32]";
@@ -163,9 +163,9 @@ fn intersect() -> Result<()> {
INTERSECT SELECT col_int32, col_utf8 FROM test";
let plan = test_sql(sql)?;
let expected =
- "Semi Join: test.col_int32 = test.col_int32, test.col_utf8 =
test.col_utf8\
+ "LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 =
test.col_utf8\
\n Distinct:\
- \n Semi Join: test.col_int32 = test.col_int32, test.col_utf8 =
test.col_utf8\
+ \n LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 =
test.col_utf8\
\n Distinct:\
\n TableScan: test projection=[col_int32, col_utf8]\
\n TableScan: test projection=[col_int32, col_utf8]\
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index a878f285d..78b5669ed 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -230,8 +230,9 @@ enum JoinType {
LEFT = 1;
RIGHT = 2;
FULL = 3;
- SEMI = 4;
- ANTI = 5;
+ LEFTSEMI = 4;
+ LEFTANTI = 5;
+ RIGHTSEMI = 6;
}
enum JoinConstraint {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index e6b095b84..e8110b431 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6402,8 +6402,9 @@ impl serde::Serialize for JoinType {
Self::Left => "LEFT",
Self::Right => "RIGHT",
Self::Full => "FULL",
- Self::Semi => "SEMI",
- Self::Anti => "ANTI",
+ Self::Leftsemi => "LEFTSEMI",
+ Self::Leftanti => "LEFTANTI",
+ Self::Rightsemi => "RIGHTSEMI",
};
serializer.serialize_str(variant)
}
@@ -6419,8 +6420,9 @@ impl<'de> serde::Deserialize<'de> for JoinType {
"LEFT",
"RIGHT",
"FULL",
- "SEMI",
- "ANTI",
+ "LEFTSEMI",
+ "LEFTANTI",
+ "RIGHTSEMI",
];
struct GeneratedVisitor;
@@ -6467,8 +6469,9 @@ impl<'de> serde::Deserialize<'de> for JoinType {
"LEFT" => Ok(JoinType::Left),
"RIGHT" => Ok(JoinType::Right),
"FULL" => Ok(JoinType::Full),
- "SEMI" => Ok(JoinType::Semi),
- "ANTI" => Ok(JoinType::Anti),
+ "LEFTSEMI" => Ok(JoinType::Leftsemi),
+ "LEFTANTI" => Ok(JoinType::Leftanti),
+ "RIGHTSEMI" => Ok(JoinType::Rightsemi),
_ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 149edf020..cf6978faf 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1110,8 +1110,9 @@ pub enum JoinType {
Left = 1,
Right = 2,
Full = 3,
- Semi = 4,
- Anti = 5,
+ Leftsemi = 4,
+ Leftanti = 5,
+ Rightsemi = 6,
}
impl JoinType {
/// String value of the enum field names used in the ProtoBuf definition.
@@ -1124,8 +1125,9 @@ impl JoinType {
JoinType::Left => "LEFT",
JoinType::Right => "RIGHT",
JoinType::Full => "FULL",
- JoinType::Semi => "SEMI",
- JoinType::Anti => "ANTI",
+ JoinType::Leftsemi => "LEFTSEMI",
+ JoinType::Leftanti => "LEFTANTI",
+ JoinType::Rightsemi => "RIGHTSEMI",
}
}
}
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 278130b06..1f0e40895 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -239,8 +239,9 @@ impl From<protobuf::JoinType> for JoinType {
protobuf::JoinType::Left => JoinType::Left,
protobuf::JoinType::Right => JoinType::Right,
protobuf::JoinType::Full => JoinType::Full,
- protobuf::JoinType::Semi => JoinType::Semi,
- protobuf::JoinType::Anti => JoinType::Anti,
+ protobuf::JoinType::Leftsemi => JoinType::LeftSemi,
+ protobuf::JoinType::Rightsemi => JoinType::RightSemi,
+ protobuf::JoinType::Leftanti => JoinType::LeftAnti,
}
}
}
@@ -252,8 +253,9 @@ impl From<JoinType> for protobuf::JoinType {
JoinType::Left => protobuf::JoinType::Left,
JoinType::Right => protobuf::JoinType::Right,
JoinType::Full => protobuf::JoinType::Full,
- JoinType::Semi => protobuf::JoinType::Semi,
- JoinType::Anti => protobuf::JoinType::Anti,
+ JoinType::LeftSemi => protobuf::JoinType::Leftsemi,
+ JoinType::RightSemi => protobuf::JoinType::Rightsemi,
+ JoinType::LeftAnti => protobuf::JoinType::Leftanti,
}
}
}