andygrove opened a new issue, #2842:
URL: https://github.com/apache/arrow-datafusion/issues/2842

   **Describe the bug**
   We currently have incorrect behavior for Anti joins when there is a join 
filter. Here is a repro case demonstrated in Spark:
   
   ```scala
   scala> val df = spark.sql("""select * from (values (1,2,3), (3,4,5)) a
   | where not exists (select col1 from (values (1,2,3), (3,4,5)) b where 
b.col1 = a.col1 and b.col2 <> a.col2)
   | """)
   df: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 1 more field]
   
   scala> df.show
   +----+----+----+
   |col1|col2|col3|
   +----+----+----+
   |   1|   2|   3|
   |   3|   4|   5|
   +----+----+----+
   ```
   
   The following unit test can be added to `hash_join.rs` to attempt the same 
query.
   
   ```rust
       /// Regression test: anti join on `col1` with an extra join filter.
       ///
       /// Equivalent SQL:
       /// `SELECT * FROM a WHERE NOT EXISTS
       ///  (SELECT col1 FROM b WHERE b.col1 = a.col1 AND b.col2 <> a.col2)`
       /// where `a` and `b` contain the same two rows. Each row of `a` matches
       /// exactly one row of `b` on `col1`, and that row has the same `col2`,
       /// so the filter never passes and both rows of `a` must be returned.
       #[tokio::test]
       async fn join_anti_with_filter() -> Result<()> {
           let session_ctx = SessionContext::new();
           let task_ctx = session_ctx.task_ctx();
           let left = build_table(
               ("col1", &vec![1, 3]),
               ("col2", &vec![2, 4]),
               ("col3", &vec![3, 5]),
           );
           let right = left.clone();

           // join on col1
           let on = vec![(
               Column::new_with_schema("col1", &left.schema())?,
               Column::new_with_schema("col1", &right.schema())?,
           )];

           // build filter b.col2 <> a.col2; the intermediate batch holds
           // left.col2 at index 0 and right.col2 at index 1
           let column_indices = vec![
               ColumnIndex {
                   index: 1,
                   side: JoinSide::Left,
               },
               ColumnIndex {
                   index: 1,
                   side: JoinSide::Right,
               },
           ];
           let intermediate_schema = Schema::new(vec![
               Field::new("x", DataType::Int32, true),
               Field::new("y", DataType::Int32, true),
           ]);
           let filter_expression = Arc::new(BinaryExpr::new(
               Arc::new(Column::new("x", 0)),
               Operator::NotEq,
               Arc::new(Column::new("y", 1)),
           )) as Arc<dyn PhysicalExpr>;

           let filter =
               JoinFilter::new(filter_expression, column_indices, intermediate_schema);

           let join = join_with_filter(left, right, on, filter, &JoinType::Anti, false)?;

           let columns = columns(&join.schema());
           assert_eq!(columns, vec!["col1", "col2", "col3"]);

           let stream = join.execute(0, task_ctx)?;
           let batches = common::collect(stream).await?;

           // Must match the Arrow pretty-printer layout exactly: each column is
           // as wide as its widest cell (the 4-char header), values left-aligned.
           let expected = vec![
               "+------+------+------+",
               "| col1 | col2 | col3 |",
               "+------+------+------+",
               "| 1    | 2    | 3    |",
               "| 3    | 4    | 5    |",
               "+------+------+------+",
           ];
           assert_batches_sorted_eq!(expected, &batches);
           Ok(())
       }
   ```
   
   The test fails: the join produces an empty batch instead of the two expected rows.
   
   **To Reproduce**
   See above
   
   **Expected behavior**
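   The test should pass and return both rows, matching the Spark output above. For each row of `a`, the only row of `b` with the same `col1` also has the same `col2`. The filter `b.col2 <> a.col2` therefore never matches, and `NOT EXISTS` is true for every row.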
   
   **Additional context**
   None
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to