This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 002165b55 Implement right semi join and support in HashBuildProbeorder (#3958)
002165b55 is described below

commit 002165b55beb3aeb15ad4d9955295af7ff045945
Author: Daniël Heres <[email protected]>
AuthorDate: Thu Oct 27 20:22:43 2022 +0200

    Implement right semi join and support in HashBuildProbeorder (#3958)
    
    * Implement right semi join
    
    * Change error a bit
    
    * protobuf
    
    * protobuf
    
    * protobuf
    
    * Change column name to b2
    
    * Rename everything
    
    * Rename & fmt
    
    * Change display to leftanti
    
    * Fix last expected plan
    
    * Commit generated file
    
    * generated
---
 benchmarks/expected-plans/q16.txt                  |   2 +-
 benchmarks/expected-plans/q18.txt                  |   2 +-
 benchmarks/expected-plans/q20.txt                  |   4 +-
 benchmarks/expected-plans/q21.txt                  |   4 +-
 benchmarks/expected-plans/q22.txt                  |   2 +-
 benchmarks/expected-plans/q4.txt                   |   2 +-
 .../physical_optimizer/hash_build_probe_order.rs   |  11 +-
 .../core/src/physical_plan/joins/hash_join.rs      | 132 ++++++++++++++++++---
 .../src/physical_plan/joins/sort_merge_join.rs     |  35 ++++--
 datafusion/core/src/physical_plan/joins/utils.rs   |  27 ++++-
 datafusion/core/tests/join_fuzz.rs                 |   4 +-
 datafusion/core/tests/sql/subqueries.rs            |  10 +-
 datafusion/expr/src/logical_plan/builder.rs        |   7 +-
 datafusion/expr/src/logical_plan/plan.rs           |  15 ++-
 .../optimizer/src/decorrelate_where_exists.rs      |  20 ++--
 datafusion/optimizer/src/decorrelate_where_in.rs   |  28 ++---
 datafusion/optimizer/src/filter_push_down.rs       |   7 +-
 .../optimizer/src/subquery_filter_to_join.rs       |  20 ++--
 datafusion/optimizer/tests/integration-test.rs     |  10 +-
 datafusion/proto/proto/datafusion.proto            |   5 +-
 datafusion/proto/src/generated/pbjson.rs           |  15 ++-
 datafusion/proto/src/generated/prost.rs            |  10 +-
 datafusion/proto/src/logical_plan.rs               |  10 +-
 23 files changed, 266 insertions(+), 116 deletions(-)

diff --git a/benchmarks/expected-plans/q16.txt 
b/benchmarks/expected-plans/q16.txt
index 11943cf24..b1efb18fa 100644
--- a/benchmarks/expected-plans/q16.txt
+++ b/benchmarks/expected-plans/q16.txt
@@ -3,7 +3,7 @@ Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS 
LAST, part.p_type AS
     Projection: group_alias_0 AS p_brand, group_alias_1 AS p_type, 
group_alias_2 AS p_size, COUNT(alias1) AS COUNT(DISTINCT partsupp.ps_suppkey)
       Aggregate: groupBy=[[group_alias_0, group_alias_1, group_alias_2]], 
aggr=[[COUNT(alias1)]]
         Aggregate: groupBy=[[part.p_brand AS group_alias_0, part.p_type AS 
group_alias_1, part.p_size AS group_alias_2, partsupp.ps_suppkey AS alias1]], 
aggr=[[]]
-          Anti Join: partsupp.ps_suppkey = __sq_1.s_suppkey
+          LeftAnti Join: partsupp.ps_suppkey = __sq_1.s_suppkey
             Inner Join: partsupp.ps_partkey = part.p_partkey
               TableScan: partsupp projection=[ps_partkey, ps_suppkey]
               Filter: part.p_brand != Utf8("Brand#45") AND part.p_type NOT 
LIKE Utf8("MEDIUM POLISHED%") AND part.p_size IN ([Int32(49), Int32(14), 
Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)])
diff --git a/benchmarks/expected-plans/q18.txt 
b/benchmarks/expected-plans/q18.txt
index ebc22ea5d..ce0b20c20 100644
--- a/benchmarks/expected-plans/q18.txt
+++ b/benchmarks/expected-plans/q18.txt
@@ -1,7 +1,7 @@
 Sort: orders.o_totalprice DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST
   Projection: customer.c_name, customer.c_custkey, orders.o_orderkey, 
orders.o_orderdate, orders.o_totalprice, SUM(lineitem.l_quantity)
     Aggregate: groupBy=[[customer.c_name, customer.c_custkey, 
orders.o_orderkey, orders.o_orderdate, orders.o_totalprice]], 
aggr=[[SUM(lineitem.l_quantity)]]
-      Semi Join: orders.o_orderkey = __sq_1.l_orderkey
+      LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey
         Inner Join: orders.o_orderkey = lineitem.l_orderkey
           Inner Join: customer.c_custkey = orders.o_custkey
             TableScan: customer projection=[c_custkey, c_name]
diff --git a/benchmarks/expected-plans/q20.txt 
b/benchmarks/expected-plans/q20.txt
index 6d3ef1f6c..e5398325e 100644
--- a/benchmarks/expected-plans/q20.txt
+++ b/benchmarks/expected-plans/q20.txt
@@ -1,6 +1,6 @@
 Sort: supplier.s_name ASC NULLS LAST
   Projection: supplier.s_name, supplier.s_address
-    Semi Join: supplier.s_suppkey = __sq_2.ps_suppkey
+    LeftSemi Join: supplier.s_suppkey = __sq_2.ps_suppkey
       Inner Join: supplier.s_nationkey = nation.n_nationkey
         TableScan: supplier projection=[s_suppkey, s_name, s_address, 
s_nationkey]
         Filter: nation.n_name = Utf8("CANADA")
@@ -8,7 +8,7 @@ Sort: supplier.s_name ASC NULLS LAST
       Projection: partsupp.ps_suppkey AS ps_suppkey, alias=__sq_2
         Filter: CAST(partsupp.ps_availqty AS Decimal128(38, 17)) > 
__sq_3.__value
           Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, 
partsupp.ps_suppkey = __sq_3.l_suppkey
-            Semi Join: partsupp.ps_partkey = __sq_1.p_partkey
+            LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey
               TableScan: partsupp projection=[ps_partkey, ps_suppkey, 
ps_availqty]
               Projection: part.p_partkey AS p_partkey, alias=__sq_1
                 Filter: part.p_name LIKE Utf8("forest%")
diff --git a/benchmarks/expected-plans/q21.txt 
b/benchmarks/expected-plans/q21.txt
index f5aa1dc84..397e0a8d8 100644
--- a/benchmarks/expected-plans/q21.txt
+++ b/benchmarks/expected-plans/q21.txt
@@ -1,8 +1,8 @@
 Sort: numwait DESC NULLS FIRST, supplier.s_name ASC NULLS LAST
   Projection: supplier.s_name, COUNT(UInt8(1)) AS numwait
     Aggregate: groupBy=[[supplier.s_name]], aggr=[[COUNT(UInt8(1))]]
-      Anti Join: l1.l_orderkey = l3.l_orderkey Filter: l3.l_suppkey != 
l1.l_suppkey
-        Semi Join: l1.l_orderkey = l2.l_orderkey Filter: l2.l_suppkey != 
l1.l_suppkey
+      LeftAnti Join: l1.l_orderkey = l3.l_orderkey Filter: l3.l_suppkey != 
l1.l_suppkey
+        LeftSemi Join: l1.l_orderkey = l2.l_orderkey Filter: l2.l_suppkey != 
l1.l_suppkey
           Inner Join: supplier.s_nationkey = nation.n_nationkey
             Inner Join: l1.l_orderkey = orders.o_orderkey
               Inner Join: supplier.s_suppkey = l1.l_suppkey
diff --git a/benchmarks/expected-plans/q22.txt 
b/benchmarks/expected-plans/q22.txt
index 919372f04..b56c8ff96 100644
--- a/benchmarks/expected-plans/q22.txt
+++ b/benchmarks/expected-plans/q22.txt
@@ -5,7 +5,7 @@ Sort: custsale.cntrycode ASC NULLS LAST
         Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, 
customer.c_acctbal, alias=custsale
           Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > 
__sq_1.__value
             CrossJoin:
-              Anti Join: customer.c_custkey = orders.o_custkey
+              LeftAnti Join: customer.c_custkey = orders.o_custkey
                 Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN 
([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), 
Utf8("17")])
                   TableScan: customer projection=[c_custkey, c_phone, 
c_acctbal]
                 TableScan: orders projection=[o_custkey]
diff --git a/benchmarks/expected-plans/q4.txt b/benchmarks/expected-plans/q4.txt
index a4339732e..3610ae175 100644
--- a/benchmarks/expected-plans/q4.txt
+++ b/benchmarks/expected-plans/q4.txt
@@ -1,7 +1,7 @@
 Sort: orders.o_orderpriority ASC NULLS LAST
   Projection: orders.o_orderpriority, COUNT(UInt8(1)) AS order_count
     Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[COUNT(UInt8(1))]]
-      Semi Join: orders.o_orderkey = lineitem.l_orderkey
+      LeftSemi Join: orders.o_orderkey = lineitem.l_orderkey
         Filter: orders.o_orderdate >= Date32("8582") AND orders.o_orderdate < 
Date32("8674")
           TableScan: orders projection=[o_orderkey, o_orderdate, 
o_orderpriority]
         Filter: lineitem.l_commitdate < lineitem.l_receiptdate
diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs 
b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
index 6817001d3..014921046 100644
--- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
+++ b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
@@ -70,8 +70,13 @@ fn should_swap_join_order(left: &dyn ExecutionPlan, right: 
&dyn ExecutionPlan) -
 
 fn supports_swap(join_type: JoinType) -> bool {
     match join_type {
-        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
true,
-        JoinType::Semi | JoinType::Anti => false,
+        JoinType::Inner
+        | JoinType::Left
+        | JoinType::Right
+        | JoinType::Full
+        | JoinType::LeftSemi
+        | JoinType::RightSemi => true,
+        JoinType::LeftAnti => false,
     }
 }
 
@@ -81,6 +86,8 @@ fn swap_join_type(join_type: JoinType) -> JoinType {
         JoinType::Full => JoinType::Full,
         JoinType::Left => JoinType::Right,
         JoinType::Right => JoinType::Left,
+        JoinType::LeftSemi => JoinType::RightSemi,
+        JoinType::RightSemi => JoinType::LeftSemi,
         _ => unreachable!(),
     }
 }
diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs 
b/datafusion/core/src/physical_plan/joins/hash_join.rs
index ff036b78b..b41c0df1e 100644
--- a/datafusion/core/src/physical_plan/joins/hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/hash_join.rs
@@ -653,7 +653,7 @@ fn build_batch(
         (left_indices, right_indices)
     };
 
-    if matches!(join_type, JoinType::Semi | JoinType::Anti) {
+    if matches!(join_type, JoinType::LeftSemi | JoinType::LeftAnti) {
         return Ok((
             RecordBatch::new_empty(Arc::new(schema.clone())),
             left_filtered_indices,
@@ -719,7 +719,7 @@ fn build_join_indexes(
     let left = &left_data.0;
 
     match join_type {
-        JoinType::Inner | JoinType::Semi | JoinType::Anti => {
+        JoinType::Inner | JoinType::LeftSemi | JoinType::LeftAnti => {
             // Using a buffer builder to avoid slower normal builder
             let mut left_indices = UInt64BufferBuilder::new(0);
             let mut right_indices = UInt32BufferBuilder::new(0);
@@ -765,6 +765,54 @@ fn build_join_indexes(
                 PrimitiveArray::<UInt32Type>::from(right),
             ))
         }
+        JoinType::RightSemi => {
+            let mut left_indices = UInt64BufferBuilder::new(0);
+            let mut right_indices = UInt32BufferBuilder::new(0);
+
+            // Visit all of the right rows
+            for (row, hash_value) in hash_values.iter().enumerate() {
+                // Get the hash and find it in the build index
+
+                // For every item on the left and right we check if it matches
+                // This possibly contains rows with hash collisions,
+                // So we have to check here whether rows are equal or not
+                // We only produce one row if there is a match
+                if let Some((_, indices)) =
+                    left.0.get(*hash_value, |(hash, _)| *hash_value == *hash)
+                {
+                    for &i in indices {
+                        // Check hash collisions
+                        if equal_rows(
+                            i as usize,
+                            row,
+                            &left_join_values,
+                            &keys_values,
+                            *null_equals_null,
+                        )? {
+                            left_indices.append(i);
+                            right_indices.append(row as u32);
+                            break;
+                        }
+                    }
+                }
+            }
+
+            let left = ArrayData::builder(DataType::UInt64)
+                .len(left_indices.len())
+                .add_buffer(left_indices.finish())
+                .build()
+                .unwrap();
+            let right = ArrayData::builder(DataType::UInt32)
+                .len(right_indices.len())
+                .add_buffer(right_indices.finish())
+                .build()
+                .unwrap();
+
+            Ok((
+                PrimitiveArray::<UInt64Type>::from(left),
+                PrimitiveArray::<UInt32Type>::from(right),
+            ))
+        }
         JoinType::Left => {
             let mut left_indices = UInt64Builder::with_capacity(0);
             let mut right_indices = UInt32Builder::with_capacity(0);
@@ -853,7 +901,11 @@ fn apply_join_filter(
     )?;
 
     match join_type {
-        JoinType::Inner | JoinType::Left | JoinType::Anti | JoinType::Semi => {
+        JoinType::Inner
+        | JoinType::Left
+        | JoinType::LeftAnti
+        | JoinType::LeftSemi
+        | JoinType::RightSemi => {
             // For both INNER and LEFT joins, input arrays contains only 
indices for matched data.
             // Due to this fact it's correct to simply apply filter to 
intermediate batch and return
             // indices for left/right rows satisfying filter predicate
@@ -1280,14 +1332,19 @@ impl HashJoinStream {
         let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
             let num_rows = left_data.1.num_rows();
             match self.join_type {
-                JoinType::Left | JoinType::Full | JoinType::Semi | 
JoinType::Anti => {
+                JoinType::Left
+                | JoinType::Full
+                | JoinType::LeftSemi
+                | JoinType::LeftAnti => {
                     let mut buffer = BooleanBufferBuilder::new(num_rows);
 
                     buffer.append_n(num_rows, false);
 
                     buffer
                 }
-                JoinType::Inner | JoinType::Right => 
BooleanBufferBuilder::new(0),
+                JoinType::Inner | JoinType::Right | JoinType::RightSemi => {
+                    BooleanBufferBuilder::new(0)
+                }
             }
         });
 
@@ -1318,13 +1375,13 @@ impl HashJoinStream {
                         match self.join_type {
                             JoinType::Left
                             | JoinType::Full
-                            | JoinType::Semi
-                            | JoinType::Anti => {
+                            | JoinType::LeftSemi
+                            | JoinType::LeftAnti => {
                                 left_side.iter().flatten().for_each(|x| {
                                     visited_left_side.set_bit(x as usize, 
true);
                                 });
                             }
-                            JoinType::Inner | JoinType::Right => {}
+                            JoinType::Inner | JoinType::Right | 
JoinType::RightSemi => {}
                         }
                     }
                     Some(result.map(|x| x.0))
@@ -1335,8 +1392,8 @@ impl HashJoinStream {
                     match self.join_type {
                         JoinType::Left
                         | JoinType::Full
-                        | JoinType::Semi
-                        | JoinType::Anti
+                        | JoinType::LeftSemi
+                        | JoinType::LeftAnti
                             if !self.is_exhausted =>
                         {
                             let result = produce_from_matched(
@@ -1344,7 +1401,7 @@ impl HashJoinStream {
                                 &self.schema,
                                 &self.column_indices,
                                 left_data,
-                                self.join_type != JoinType::Semi,
+                                self.join_type != JoinType::LeftSemi,
                             );
                             if let Ok(ref batch) = result {
                                 self.join_metrics.input_batches.add(1);
@@ -1360,8 +1417,9 @@ impl HashJoinStream {
                         }
                         JoinType::Left
                         | JoinType::Full
-                        | JoinType::Semi
-                        | JoinType::Anti
+                        | JoinType::LeftSemi
+                        | JoinType::RightSemi
+                        | JoinType::LeftAnti
                         | JoinType::Inner
                         | JoinType::Right => {}
                     }
@@ -2094,7 +2152,49 @@ mod tests {
             Column::new_with_schema("b1", &right.schema())?,
         )];
 
-        let join = join(left, right, on, &JoinType::Semi, false)?;
+        let join = join(left, right, on, &JoinType::LeftSemi, false)?;
+
+        let columns = columns(&join.schema());
+        assert_eq!(columns, vec!["a1", "b1", "c1"]);
+
+        let stream = join.execute(0, task_ctx)?;
+        let batches = common::collect(stream).await?;
+
+        let expected = vec![
+            "+----+----+----+",
+            "| a1 | b1 | c1 |",
+            "+----+----+----+",
+            "| 1  | 4  | 7  |",
+            "| 2  | 5  | 8  |",
+            "| 2  | 5  | 8  |",
+            "+----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &batches);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn join_right_semi() -> Result<()> {
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let left = build_table(
+            ("a2", &vec![10, 20, 30, 40]),
+            ("b2", &vec![4, 5, 6, 5]), // 5 is double on the left
+            ("c2", &vec![70, 80, 90, 100]),
+        );
+        let right = build_table(
+            ("a1", &vec![1, 2, 2, 3]),
+            ("b1", &vec![4, 5, 5, 7]), // 7 does not exist on the left
+            ("c1", &vec![7, 8, 8, 9]),
+        );
+
+        let on = vec![(
+            Column::new_with_schema("b2", &left.schema())?,
+            Column::new_with_schema("b1", &right.schema())?,
+        )];
+
+        let join = join(left, right, on, &JoinType::RightSemi, false)?;
 
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b1", "c1"]);
@@ -2135,7 +2235,7 @@ mod tests {
             Column::new_with_schema("b1", &right.schema())?,
         )];
 
-        let join = join(left, right, on, &JoinType::Anti, false)?;
+        let join = join(left, right, on, &JoinType::LeftAnti, false)?;
 
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["a1", "b1", "c1"]);
@@ -2196,7 +2296,7 @@ mod tests {
         let filter =
             JoinFilter::new(filter_expression, column_indices, 
intermediate_schema);
 
-        let join = join_with_filter(left, right, on, filter, &JoinType::Anti, 
false)?;
+        let join = join_with_filter(left, right, on, filter, 
&JoinType::LeftAnti, false)?;
 
         let columns = columns(&join.schema());
         assert_eq!(columns, vec!["col1", "col2", "col3"]);
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs 
b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 3de712745..dfcab88c2 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -89,6 +89,12 @@ impl SortMergeJoinExec {
         let left_schema = left.schema();
         let right_schema = right.schema();
 
+        if join_type == JoinType::RightSemi {
+            return Err(DataFusionError::NotImplemented(
+                "SortMergeJoinExec does not support 
JoinType::RightSemi".to_string(),
+            ));
+        }
+
         check_join_is_valid(&left_schema, &right_schema, &on)?;
         if sort_options.len() != on.len() {
             return Err(DataFusionError::Plan(format!(
@@ -129,10 +135,11 @@ impl ExecutionPlan for SortMergeJoinExec {
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         match self.join_type {
-            JoinType::Inner | JoinType::Left | JoinType::Semi | JoinType::Anti 
=> {
-                self.left.output_ordering()
-            }
-            JoinType::Right => self.right.output_ordering(),
+            JoinType::Inner
+            | JoinType::Left
+            | JoinType::LeftSemi
+            | JoinType::LeftAnti => self.left.output_ordering(),
+            JoinType::Right | JoinType::RightSemi => 
self.right.output_ordering(),
             JoinType::Full => None,
         }
     }
@@ -173,14 +180,14 @@ impl ExecutionPlan for SortMergeJoinExec {
             JoinType::Inner
             | JoinType::Left
             | JoinType::Full
-            | JoinType::Anti
-            | JoinType::Semi => (
+            | JoinType::LeftAnti
+            | JoinType::LeftSemi => (
                 self.left.clone(),
                 self.right.clone(),
                 self.on.iter().map(|on| on.0.clone()).collect(),
                 self.on.iter().map(|on| on.1.clone()).collect(),
             ),
-            JoinType::Right => (
+            JoinType::Right | JoinType::RightSemi => (
                 self.right.clone(),
                 self.left.clone(),
                 self.on.iter().map(|on| on.1.clone()).collect(),
@@ -767,13 +774,17 @@ impl SMJStream {
             Ordering::Less => {
                 if matches!(
                     self.join_type,
-                    JoinType::Left | JoinType::Right | JoinType::Full | 
JoinType::Anti
+                    JoinType::Left
+                        | JoinType::Right
+                        | JoinType::RightSemi
+                        | JoinType::Full
+                        | JoinType::LeftAnti
                 ) {
                     join_streamed = !self.streamed_joined;
                 }
             }
             Ordering::Equal => {
-                if matches!(self.join_type, JoinType::Semi) {
+                if matches!(self.join_type, JoinType::LeftSemi) {
                     join_streamed = !self.streamed_joined;
                 }
                 if matches!(
@@ -915,7 +926,7 @@ impl SMJStream {
             let buffered_indices: UInt64Array = 
chunk.buffered_indices.finish();
 
             let mut buffered_columns =
-                if matches!(self.join_type, JoinType::Semi | JoinType::Anti) {
+                if matches!(self.join_type, JoinType::LeftSemi | 
JoinType::LeftAnti) {
                     vec![]
                 } else if let Some(buffered_idx) = chunk.buffered_batch_idx {
                     self.buffered_data.batches[buffered_idx]
@@ -1732,7 +1743,7 @@ mod tests {
             Column::new_with_schema("b1", &right.schema())?,
         )];
 
-        let (_, batches) = join_collect(left, right, on, 
JoinType::Anti).await?;
+        let (_, batches) = join_collect(left, right, on, 
JoinType::LeftAnti).await?;
         let expected = vec![
             "+----+----+----+",
             "| a1 | b1 | c1 |",
@@ -1763,7 +1774,7 @@ mod tests {
             Column::new_with_schema("b1", &right.schema())?,
         )];
 
-        let (_, batches) = join_collect(left, right, on, 
JoinType::Semi).await?;
+        let (_, batches) = join_collect(left, right, on, 
JoinType::LeftSemi).await?;
         let expected = vec![
             "+----+----+----+",
             "| a1 | b1 | c1 |",
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs 
b/datafusion/core/src/physical_plan/joins/utils.rs
index f937dc1c4..1d1560478 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -169,8 +169,9 @@ fn output_join_field(old_field: &Field, join_type: 
&JoinType, is_left: bool) ->
         JoinType::Left => !is_left, // right input is padded with nulls
         JoinType::Right => is_left, // left input is padded with nulls
         JoinType::Full => true,     // both inputs can be padded with nulls
-        JoinType::Semi => false,    // doesn't introduce nulls
-        JoinType::Anti => false,    // doesn't introduce nulls (or can it??)
+        JoinType::LeftSemi => false, // doesn't introduce nulls
+        JoinType::RightSemi => false, // doesn't introduce nulls
+        JoinType::LeftAnti => false, // doesn't introduce nulls (or can it??)
     };
 
     if force_nullable {
@@ -221,7 +222,7 @@ pub fn build_join_schema(
             // left then right
             left_fields.chain(right_fields).unzip()
         }
-        JoinType::Semi | JoinType::Anti => left
+        JoinType::LeftSemi | JoinType::LeftAnti => left
             .fields()
             .iter()
             .cloned()
@@ -236,6 +237,21 @@ pub fn build_join_schema(
                 )
             })
             .unzip(),
+        JoinType::RightSemi => right
+            .fields()
+            .iter()
+            .cloned()
+            .enumerate()
+            .map(|(index, f)| {
+                (
+                    f,
+                    ColumnIndex {
+                        index,
+                        side: JoinSide::Right,
+                    },
+                )
+            })
+            .unzip(),
     };
 
     (Schema::new(fields), column_indices)
@@ -394,8 +410,9 @@ fn estimate_join_cardinality(
             })
         }
 
-        JoinType::Semi => None,
-        JoinType::Anti => None,
+        JoinType::LeftSemi => None,
+        JoinType::LeftAnti => None,
+        JoinType::RightSemi => None,
     }
 }
 
diff --git a/datafusion/core/tests/join_fuzz.rs 
b/datafusion/core/tests/join_fuzz.rs
index 8d4f31af5..1204b4428 100644
--- a/datafusion/core/tests/join_fuzz.rs
+++ b/datafusion/core/tests/join_fuzz.rs
@@ -78,7 +78,7 @@ async fn test_semi_join_1k() {
     run_join_test(
         make_staggered_batches(10000),
         make_staggered_batches(10000),
-        JoinType::Semi,
+        JoinType::LeftSemi,
     )
     .await
 }
@@ -88,7 +88,7 @@ async fn test_anti_join_1k() {
     run_join_test(
         make_staggered_batches(10000),
         make_staggered_batches(10000),
-        JoinType::Anti,
+        JoinType::LeftAnti,
     )
     .await
 }
diff --git a/datafusion/core/tests/sql/subqueries.rs 
b/datafusion/core/tests/sql/subqueries.rs
index 8c77d860e..ed65d4391 100644
--- a/datafusion/core/tests/sql/subqueries.rs
+++ b/datafusion/core/tests/sql/subqueries.rs
@@ -94,7 +94,7 @@ where o_orderstatus in (
     let plan = ctx.optimize(&plan).unwrap();
     let actual = format!("{}", plan.display_indent());
     let expected = r#"Projection: orders.o_orderkey
-  Semi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey = 
__sq_1.l_orderkey
+  LeftSemi Join: orders.o_orderstatus = __sq_1.l_linestatus, orders.o_orderkey 
= __sq_1.l_orderkey
     TableScan: orders projection=[o_orderkey, o_orderstatus]
     Projection: lineitem.l_linestatus AS l_linestatus, lineitem.l_orderkey AS 
l_orderkey, alias=__sq_1
       TableScan: lineitem projection=[l_orderkey, l_linestatus]"#
@@ -205,7 +205,7 @@ async fn tpch_q4_correlated() -> Result<()> {
     let expected = r#"Sort: orders.o_orderpriority ASC NULLS LAST
   Projection: orders.o_orderpriority, COUNT(UInt8(1)) AS order_count
     Aggregate: groupBy=[[orders.o_orderpriority]], aggr=[[COUNT(UInt8(1))]]
-      Semi Join: orders.o_orderkey = lineitem.l_orderkey
+      LeftSemi Join: orders.o_orderkey = lineitem.l_orderkey
         TableScan: orders projection=[o_orderkey, o_orderpriority]
         Filter: lineitem.l_commitdate < lineitem.l_receiptdate
           TableScan: lineitem projection=[l_orderkey, l_commitdate, 
l_receiptdate]"#
@@ -322,7 +322,7 @@ order by s_name;
     let actual = format!("{}", plan.display_indent());
     let expected = r#"Sort: supplier.s_name ASC NULLS LAST
   Projection: supplier.s_name, supplier.s_address
-    Semi Join: supplier.s_suppkey = __sq_2.ps_suppkey
+    LeftSemi Join: supplier.s_suppkey = __sq_2.ps_suppkey
       Inner Join: supplier.s_nationkey = nation.n_nationkey
         TableScan: supplier projection=[s_suppkey, s_name, s_address, 
s_nationkey]
         Filter: nation.n_name = Utf8("CANADA")
@@ -330,7 +330,7 @@ order by s_name;
       Projection: partsupp.ps_suppkey AS ps_suppkey, alias=__sq_2
         Filter: CAST(partsupp.ps_availqty AS Decimal128(38, 17)) > 
__sq_3.__value
           Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, 
partsupp.ps_suppkey = __sq_3.l_suppkey
-            Semi Join: partsupp.ps_partkey = __sq_1.p_partkey
+            LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey
               TableScan: partsupp projection=[ps_partkey, ps_suppkey, 
ps_availqty]
               Projection: part.p_partkey AS p_partkey, alias=__sq_1
                 Filter: part.p_name LIKE Utf8("forest%")
@@ -386,7 +386,7 @@ order by cntrycode;"#;
         Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, 
customer.c_acctbal, alias=custsale
           Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > 
__sq_1.__value
             CrossJoin:
-              Anti Join: customer.c_custkey = orders.o_custkey
+              LeftAnti Join: customer.c_custkey = orders.o_custkey
                 Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN 
([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), 
Utf8("17")])
                   TableScan: customer projection=[c_custkey, c_phone, 
c_acctbal], partial_filters=[substr(customer.c_phone, Int64(1), Int64(2)) IN 
([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), 
Utf8("17")])]
                 TableScan: orders projection=[o_custkey]
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index 631a64da6..829aa6682 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -743,7 +743,7 @@ impl LogicalPlanBuilder {
         LogicalPlanBuilder::intersect_or_except(
             left_plan,
             right_plan,
-            JoinType::Semi,
+            JoinType::LeftSemi,
             is_all,
         )
     }
@@ -757,7 +757,7 @@ impl LogicalPlanBuilder {
         LogicalPlanBuilder::intersect_or_except(
             left_plan,
             right_plan,
-            JoinType::Anti,
+            JoinType::LeftAnti,
             is_all,
         )
     }
@@ -823,10 +823,11 @@ pub fn build_join_schema(
             // left then right
             left_fields.chain(right_fields).cloned().collect()
         }
-        JoinType::Semi | JoinType::Anti => {
+        JoinType::LeftSemi | JoinType::LeftAnti => {
             // Only use the left side for the schema
             left.fields().clone()
         }
+        JoinType::RightSemi => right.fields().clone(),
     };
 
     let mut metadata = left.metadata().clone();
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 284e8c9aa..38b3a789a 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -976,10 +976,12 @@ pub enum JoinType {
     Right,
     /// Full Join
     Full,
-    /// Semi Join
-    Semi,
-    /// Anti Join
-    Anti,
+    /// Left Semi Join
+    LeftSemi,
+    /// Right Semi Join
+    RightSemi,
+    /// Left Anti Join
+    LeftAnti,
 }
 
 impl Display for JoinType {
@@ -989,8 +991,9 @@ impl Display for JoinType {
             JoinType::Left => "Left",
             JoinType::Right => "Right",
             JoinType::Full => "Full",
-            JoinType::Semi => "Semi",
-            JoinType::Anti => "Anti",
+            JoinType::LeftSemi => "LeftSemi",
+            JoinType::RightSemi => "RightSemi",
+            JoinType::LeftAnti => "LeftAnti",
         };
         write!(f, "{}", join_type)
     }
diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs 
b/datafusion/optimizer/src/decorrelate_where_exists.rs
index 78e8fc32d..c8102debe 100644
--- a/datafusion/optimizer/src/decorrelate_where_exists.rs
+++ b/datafusion/optimizer/src/decorrelate_where_exists.rs
@@ -176,8 +176,8 @@ fn optimize_exists(
 
     // join our sub query into the main plan
     let join_type = match query_info.negated {
-        true => JoinType::Anti,
-        false => JoinType::Semi,
+        true => JoinType::LeftAnti,
+        false => JoinType::LeftSemi,
     };
     let mut new_plan = LogicalPlanBuilder::from(outer_input.clone()).join(
         &subqry_plan,
@@ -230,8 +230,8 @@ mod tests {
             .build()?;
 
         let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8]
-    Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8]
+  LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8]
+    LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8]
       TableScan: customer [c_custkey:Int64, c_name:Utf8]
       TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
     TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
@@ -266,9 +266,9 @@ mod tests {
             .build()?;
 
         let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8]
+  LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8]
     TableScan: customer [c_custkey:Int64, c_name:Utf8]
-    Semi Join: orders.o_orderkey = lineitem.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
+    LeftSemi Join: orders.o_orderkey = lineitem.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
       TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
       TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"#;
 
@@ -296,7 +296,7 @@ mod tests {
             .build()?;
 
         let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8]
+  LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8]
     TableScan: customer [c_custkey:Int64, c_name:Utf8]
     Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
       TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
@@ -451,7 +451,7 @@ mod tests {
 
         // Doesn't matter we projected an expression, just that we returned a result
         let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8]
+  LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8]
     TableScan: customer [c_custkey:Int64, c_name:Utf8]
     TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
 
@@ -475,7 +475,7 @@ mod tests {
 
         let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
   Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
-    Semi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8]
+    LeftSemi Join: customer.c_custkey = orders.o_custkey [c_custkey:Int64, c_name:Utf8]
       TableScan: customer [c_custkey:Int64, c_name:Utf8]
       TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
 
@@ -527,7 +527,7 @@ mod tests {
             .build()?;
 
         let expected = r#"Projection: test.c [c:UInt32]
-  Semi Join: test.a = sq.a [a:UInt32, b:UInt32, c:UInt32]
+  LeftSemi Join: test.a = sq.a [a:UInt32, b:UInt32, c:UInt32]
     TableScan: test [a:UInt32, b:UInt32, c:UInt32]
     TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#;
 
diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs
index 052ed796a..fa0367c45 100644
--- a/datafusion/optimizer/src/decorrelate_where_in.rs
+++ b/datafusion/optimizer/src/decorrelate_where_in.rs
@@ -189,8 +189,8 @@ fn optimize_where_in(
 
     // join our sub query into the main plan
     let join_type = match query_info.negated {
-        true => JoinType::Anti,
-        false => JoinType::Semi,
+        true => JoinType::LeftAnti,
+        false => JoinType::LeftSemi,
     };
     let mut new_plan = LogicalPlanBuilder::from(outer_input.clone()).join(
         &subqry_plan,
@@ -259,8 +259,8 @@ mod tests {
         debug!("plan to optimize:\n{}", plan.display_indent());
 
         let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Semi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
-    Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
+  LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
+    LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
       TableScan: customer [c_custkey:Int64, c_name:Utf8]
       Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
         TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
@@ -297,10 +297,10 @@ mod tests {
             .build()?;
 
         let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Semi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
+  LeftSemi Join: customer.c_custkey = __sq_2.o_custkey [c_custkey:Int64, c_name:Utf8]
     TableScan: customer [c_custkey:Int64, c_name:Utf8]
     Projection: orders.o_custkey AS o_custkey, alias=__sq_2 [o_custkey:Int64]
-      Semi Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
+      LeftSemi Join: orders.o_orderkey = __sq_1.l_orderkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
         TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
         Projection: lineitem.l_orderkey AS l_orderkey, alias=__sq_1 [l_orderkey:Int64]
           TableScan: lineitem [l_orderkey:Int64, l_partkey:Int64, l_suppkey:Int64, l_linenumber:Int32, l_quantity:Float64, l_extendedprice:Float64]"#;
@@ -329,7 +329,7 @@ mod tests {
             .build()?;
 
         let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
+  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
     TableScan: customer [c_custkey:Int64, c_name:Utf8]
     Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
       Filter: orders.o_orderkey = Int32(1) [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
@@ -356,7 +356,7 @@ mod tests {
 
         // Query will fail, but we can still transform the plan
         let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
+  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
     TableScan: customer [c_custkey:Int64, c_name:Utf8]
     Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
       Filter: customer.c_custkey = customer.c_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
@@ -382,7 +382,7 @@ mod tests {
             .build()?;
 
         let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
+  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
     TableScan: customer [c_custkey:Int64, c_name:Utf8]
     Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
       Filter: orders.o_custkey = orders.o_custkey [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
@@ -408,7 +408,7 @@ mod tests {
             .build()?;
 
         let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
-  Semi Join: customer.c_custkey = __sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8]
+  LeftSemi Join: customer.c_custkey = __sq_1.o_custkey Filter: customer.c_custkey != orders.o_custkey [c_custkey:Int64, c_name:Utf8]
     TableScan: customer [c_custkey:Int64, c_name:Utf8]
     Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
       TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
@@ -585,7 +585,7 @@ mod tests {
 
         let expected = r#"Projection: customer.c_custkey [c_custkey:Int64]
   Filter: customer.c_custkey = Int32(1) [c_custkey:Int64, c_name:Utf8]
-    Semi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
+    LeftSemi Join: customer.c_custkey = __sq_1.o_custkey [c_custkey:Int64, c_name:Utf8]
       TableScan: customer [c_custkey:Int64, c_name:Utf8]
       Projection: orders.o_custkey AS o_custkey, alias=__sq_1 [o_custkey:Int64]
         TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"#;
@@ -641,7 +641,7 @@ mod tests {
             .build()?;
 
         let expected = r#"Projection: test.b [b:UInt32]
-  Semi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]
+  LeftSemi Join: test.c = __sq_1.c, test.a = __sq_1.a [a:UInt32, b:UInt32, c:UInt32]
     TableScan: test [a:UInt32, b:UInt32, c:UInt32]
     Projection: sq.c AS c, sq.a AS a, alias=__sq_1 [c:UInt32, a:UInt32]
       TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#;
@@ -660,7 +660,7 @@ mod tests {
             .build()?;
 
         let expected = r#"Projection: test.b [b:UInt32]
-  Semi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]
+  LeftSemi Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]
     TableScan: test [a:UInt32, b:UInt32, c:UInt32]
     Projection: sq.c AS c, alias=__sq_1 [c:UInt32]
       TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#;
@@ -679,7 +679,7 @@ mod tests {
             .build()?;
 
         let expected = r#"Projection: test.b [b:UInt32]
-  Anti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]
+  LeftAnti Join: test.c = __sq_1.c [a:UInt32, b:UInt32, c:UInt32]
     TableScan: test [a:UInt32, b:UInt32, c:UInt32]
     Projection: sq.c AS c, alias=__sq_1 [c:UInt32]
       TableScan: sq [a:UInt32, b:UInt32, c:UInt32]"#;
diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs
index 148ae6715..902b02817 100644
--- a/datafusion/optimizer/src/filter_push_down.rs
+++ b/datafusion/optimizer/src/filter_push_down.rs
@@ -176,7 +176,10 @@ fn lr_is_preserved(plan: &LogicalPlan) -> Result<(bool, bool)> {
             JoinType::Full => Ok((false, false)),
             // No columns from the right side of the join can be referenced in output
             // predicates for semi/anti joins, so whether we specify t/f doesn't matter.
-            JoinType::Semi | JoinType::Anti => Ok((true, false)),
+            JoinType::LeftSemi | JoinType::LeftAnti => Ok((true, false)),
+            // No columns from the left side of the join can be referenced in output
+            // predicates for semi/anti joins, so whether we specify t/f doesn't matter.
+            JoinType::RightSemi => Ok((false, true)),
         },
         LogicalPlan::CrossJoin(_) => Ok((true, true)),
         _ => Err(DataFusionError::Internal(
@@ -195,7 +198,7 @@ fn on_lr_is_preserved(plan: &LogicalPlan) -> Result<(bool, bool)> {
             JoinType::Left => Ok((false, true)),
             JoinType::Right => Ok((true, false)),
             JoinType::Full => Ok((false, false)),
-            JoinType::Semi | JoinType::Anti => {
+            JoinType::LeftSemi | JoinType::LeftAnti | JoinType::RightSemi => {
                 // filter_push_down does not yet support SEMI/ANTI joins with join conditions
                 Ok((false, false))
             }
diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs
index 29f51a42f..c1717da0d 100644
--- a/datafusion/optimizer/src/subquery_filter_to_join.rs
+++ b/datafusion/optimizer/src/subquery_filter_to_join.rs
@@ -116,9 +116,9 @@ impl OptimizerRule for SubqueryFilterToJoin {
                             };
 
                             let join_type = if *negated {
-                                JoinType::Anti
+                                JoinType::LeftAnti
                             } else {
-                                JoinType::Semi
+                                JoinType::LeftSemi
                             };
 
                             let schema = build_join_schema(
@@ -231,7 +231,7 @@ mod tests {
             .build()?;
 
         let expected = "Projection: test.b [b:UInt32]\
-        \n  Semi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n  LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
         \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
         \n    Projection: sq.c [c:UInt32]\
         \n      TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
@@ -250,7 +250,7 @@ mod tests {
             .build()?;
 
         let expected = "Projection: test.b [b:UInt32]\
-        \n  Anti Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n  LeftAnti Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
         \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
         \n    Projection: sq.c [c:UInt32]\
         \n      TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
@@ -272,8 +272,8 @@ mod tests {
             .build()?;
 
         let expected = "Projection: test.b [b:UInt32]\
-        \n  Semi Join: test.b = sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Semi Join: test.c = sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n  LeftSemi Join: test.b = sq_2.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n    LeftSemi Join: test.c = sq_1.c [a:UInt32, b:UInt32, c:UInt32]\
         \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
         \n      Projection: sq_1.c [c:UInt32]\
         \n        TableScan: sq_1 [a:UInt32, b:UInt32, c:UInt32]\
@@ -301,7 +301,7 @@ mod tests {
 
         let expected = "Projection: test.b [b:UInt32]\
         \n  Filter: test.a = UInt32(1) AND test.b < UInt32(30) [a:UInt32, b:UInt32, c:UInt32]\
-        \n    Semi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n    LeftSemi Join: test.c = sq.c [a:UInt32, b:UInt32, c:UInt32]\
         \n      TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
         \n      Projection: sq.c [c:UInt32]\
         \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]";
@@ -381,10 +381,10 @@ mod tests {
             .build()?;
 
         let expected = "Projection: test.b [b:UInt32]\
-        \n  Semi Join: test.b = sq.a [a:UInt32, b:UInt32, c:UInt32]\
+        \n  LeftSemi Join: test.b = sq.a [a:UInt32, b:UInt32, c:UInt32]\
         \n    TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
         \n    Projection: sq.a [a:UInt32]\
-        \n      Semi Join: sq.a = sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n      LeftSemi Join: sq.a = sq_nested.c [a:UInt32, b:UInt32, c:UInt32]\
         \n        TableScan: sq [a:UInt32, b:UInt32, c:UInt32]\
         \n        Projection: sq_nested.c [c:UInt32]\
         \n          TableScan: sq_nested [a:UInt32, b:UInt32, c:UInt32]";
@@ -413,7 +413,7 @@ mod tests {
         \n    Subquery: [c:UInt32]\n      Projection: sq_outer.c [c:UInt32]\
         \n        TableScan: sq_outer [a:UInt32, b:UInt32, c:UInt32]\
         \n    Projection: test.b, test.c, alias=wrapped [b:UInt32, c:UInt32]\
-        \n      Semi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\
+        \n      LeftSemi Join: test.c = sq_inner.c [a:UInt32, b:UInt32, c:UInt32]\
         \n        TableScan: test [a:UInt32, b:UInt32, c:UInt32]\
         \n        Projection: sq_inner.c [c:UInt32]\
         \n          TableScan: sq_inner [a:UInt32, b:UInt32, c:UInt32]";
diff --git a/datafusion/optimizer/tests/integration-test.rs b/datafusion/optimizer/tests/integration-test.rs
index 8003d590e..71a18b624 100644
--- a/datafusion/optimizer/tests/integration-test.rs
+++ b/datafusion/optimizer/tests/integration-test.rs
@@ -117,7 +117,7 @@ fn semi_join_with_join_filter() -> Result<()> {
                AND test.col_uint32 != t2.col_uint32)";
     let plan = test_sql(sql)?;
     let expected = "Projection: test.col_utf8\
-                    \n  Semi Join: test.col_int32 = t2.col_int32 Filter: test.col_uint32 != t2.col_uint32\
+                    \n  LeftSemi Join: test.col_int32 = t2.col_int32 Filter: test.col_uint32 != t2.col_uint32\
                     \n    TableScan: test projection=[col_int32, col_uint32, col_utf8]\
                     \n    SubqueryAlias: t2\
                     \n      TableScan: test projection=[col_int32, col_uint32, col_utf8]";
@@ -133,7 +133,7 @@ fn anti_join_with_join_filter() -> Result<()> {
                AND test.col_uint32 != t2.col_uint32)";
     let plan = test_sql(sql)?;
     let expected = "Projection: test.col_utf8\
-                    \n  Anti Join: test.col_int32 = t2.col_int32 Filter: test.col_uint32 != t2.col_uint32\
+                    \n  LeftAnti Join: test.col_int32 = t2.col_int32 Filter: test.col_uint32 != t2.col_uint32\
                     \n    TableScan: test projection=[col_int32, col_uint32, col_utf8]\
                     \n    SubqueryAlias: t2\
                     \n      TableScan: test projection=[col_int32, col_uint32, col_utf8]";
@@ -148,7 +148,7 @@ fn where_exists_distinct() -> Result<()> {
               SELECT DISTINCT col_int32 FROM test t2 WHERE test.col_int32 = t2.col_int32)";
     let plan = test_sql(sql)?;
     let expected = "Projection: test.col_int32\
-                    \n  Semi Join: test.col_int32 = t2.col_int32\
+                    \n  LeftSemi Join: test.col_int32 = t2.col_int32\
                     \n    TableScan: test projection=[col_int32]\
                     \n    SubqueryAlias: t2\
                     \n      TableScan: test projection=[col_int32]";
@@ -163,9 +163,9 @@ fn intersect() -> Result<()> {
     INTERSECT SELECT col_int32, col_utf8 FROM test";
     let plan = test_sql(sql)?;
     let expected =
-        "Semi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\
+        "LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\
     \n  Distinct:\
-    \n    Semi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\
+    \n    LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\
     \n      Distinct:\
     \n        TableScan: test projection=[col_int32, col_utf8]\
     \n      TableScan: test projection=[col_int32, col_utf8]\
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index a878f285d..78b5669ed 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -230,8 +230,9 @@ enum JoinType {
   LEFT = 1;
   RIGHT = 2;
   FULL = 3;
-  SEMI = 4;
-  ANTI = 5;
+  LEFTSEMI = 4;
+  LEFTANTI = 5;
+  RIGHTSEMI = 6;
 }
 
 enum JoinConstraint {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index e6b095b84..e8110b431 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -6402,8 +6402,9 @@ impl serde::Serialize for JoinType {
             Self::Left => "LEFT",
             Self::Right => "RIGHT",
             Self::Full => "FULL",
-            Self::Semi => "SEMI",
-            Self::Anti => "ANTI",
+            Self::Leftsemi => "LEFTSEMI",
+            Self::Leftanti => "LEFTANTI",
+            Self::Rightsemi => "RIGHTSEMI",
         };
         serializer.serialize_str(variant)
     }
@@ -6419,8 +6420,9 @@ impl<'de> serde::Deserialize<'de> for JoinType {
             "LEFT",
             "RIGHT",
             "FULL",
-            "SEMI",
-            "ANTI",
+            "LEFTSEMI",
+            "LEFTANTI",
+            "RIGHTSEMI",
         ];
 
         struct GeneratedVisitor;
@@ -6467,8 +6469,9 @@ impl<'de> serde::Deserialize<'de> for JoinType {
                     "LEFT" => Ok(JoinType::Left),
                     "RIGHT" => Ok(JoinType::Right),
                     "FULL" => Ok(JoinType::Full),
-                    "SEMI" => Ok(JoinType::Semi),
-                    "ANTI" => Ok(JoinType::Anti),
+                    "LEFTSEMI" => Ok(JoinType::Leftsemi),
+                    "LEFTANTI" => Ok(JoinType::Leftanti),
+                    "RIGHTSEMI" => Ok(JoinType::Rightsemi),
                     _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
                 }
             }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 149edf020..cf6978faf 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1110,8 +1110,9 @@ pub enum JoinType {
     Left = 1,
     Right = 2,
     Full = 3,
-    Semi = 4,
-    Anti = 5,
+    Leftsemi = 4,
+    Leftanti = 5,
+    Rightsemi = 6,
 }
 impl JoinType {
     /// String value of the enum field names used in the ProtoBuf definition.
@@ -1124,8 +1125,9 @@ impl JoinType {
             JoinType::Left => "LEFT",
             JoinType::Right => "RIGHT",
             JoinType::Full => "FULL",
-            JoinType::Semi => "SEMI",
-            JoinType::Anti => "ANTI",
+            JoinType::Leftsemi => "LEFTSEMI",
+            JoinType::Leftanti => "LEFTANTI",
+            JoinType::Rightsemi => "RIGHTSEMI",
         }
     }
 }
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 278130b06..1f0e40895 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -239,8 +239,9 @@ impl From<protobuf::JoinType> for JoinType {
             protobuf::JoinType::Left => JoinType::Left,
             protobuf::JoinType::Right => JoinType::Right,
             protobuf::JoinType::Full => JoinType::Full,
-            protobuf::JoinType::Semi => JoinType::Semi,
-            protobuf::JoinType::Anti => JoinType::Anti,
+            protobuf::JoinType::Leftsemi => JoinType::LeftSemi,
+            protobuf::JoinType::Rightsemi => JoinType::RightSemi,
+            protobuf::JoinType::Leftanti => JoinType::LeftAnti,
         }
     }
 }
@@ -252,8 +253,9 @@ impl From<JoinType> for protobuf::JoinType {
             JoinType::Left => protobuf::JoinType::Left,
             JoinType::Right => protobuf::JoinType::Right,
             JoinType::Full => protobuf::JoinType::Full,
-            JoinType::Semi => protobuf::JoinType::Semi,
-            JoinType::Anti => protobuf::JoinType::Anti,
+            JoinType::LeftSemi => protobuf::JoinType::Leftsemi,
+            JoinType::RightSemi => protobuf::JoinType::Rightsemi,
+            JoinType::LeftAnti => protobuf::JoinType::Leftanti,
         }
     }
 }

Reply via email to