wangxiaoying opened a new issue, #2271:
URL: https://github.com/apache/arrow-datafusion/issues/2271

   **Describe the bug**
   Currently, the join result can be incorrect when the join predicate contains an `OR`, because the branches of the `OR` are pushed down as separate filters on each side of the join.
   
   **To Reproduce**
   ```rust
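   // Import paths assume the DataFusion release current when this issue was
   // filed; the `tokio` crate is also required.
   use std::sync::Arc;

   use datafusion::arrow::array::{Int64Array, StringArray};
   use datafusion::arrow::datatypes::{DataType, Field, Schema};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::datasource::MemTable;
   use datafusion::prelude::SessionContext;

   fn main() {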
       let ctx = SessionContext::new();
   
       let id64_array = Int64Array::from(vec![Some(1), Some(2), Some(0), Some(3), Some(4)]);
       let str_array = StringArray::from(vec!["1", "2", "3", "4", "5"]);
       let schema = Schema::new(vec![
           Field::new("test_int", DataType::Int64, true),
           Field::new("str", DataType::Utf8, false),
       ]);
   
       let batch = RecordBatch::try_new(
           Arc::new(schema),
           vec![Arc::new(id64_array), Arc::new(str_array)],
       )
       .unwrap();
   
       let db1 = MemTable::try_new(batch.schema(), vec![vec![batch]]).unwrap();
       ctx.register_table("t1", Arc::new(db1)).unwrap();
   
       let sql = "select * from t1 inner join t1 tmp on t1.test_int = tmp.test_int and t1.str = '3' OR tmp.str = '1000'";
   
       let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
       let df = rt.block_on(ctx.sql(sql)).unwrap();
       rt.block_on(df.limit(5).unwrap().explain(false, false).unwrap().show())
           .unwrap();
       rt.block_on(df.limit(5).unwrap().show()).unwrap();
       let num_rows = rt
           .block_on(df.collect())
           .unwrap()
           .into_iter()
           .map(|rb| rb.num_rows())
           .sum::<usize>();
       println!("Final # rows: {}", num_rows);
   }
   ```
   
   The result is:
   ```
   +---------------+------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                                                           |
   +---------------+------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Limit: 5                                                                                                                                       |
   |               |   Projection: #t1.test_int, #t1.str, #tmp.test_int, #tmp.str                                                                                   |
   |               |     Inner Join: #t1.test_int = #tmp.test_int                                                                                                   |
   |               |       Filter: #t1.str = Utf8("3")                                                                                                              |
   |               |         TableScan: t1 projection=Some([0, 1])                                                                                                  |
   |               |       Filter: #tmp.str = Utf8("1000")                                                                                                          |
   |               |         SubqueryAlias: tmp                                                                                                                     |
   |               |           TableScan: t1 projection=Some([0, 1])                                                                                                |
   | physical_plan | GlobalLimitExec: limit=5                                                                                                                       |
   |               |   CoalescePartitionsExec                                                                                                                       |
   |               |     LocalLimitExec: limit=5                                                                                                                    |
   |               |       ProjectionExec: expr=[test_int@0 as test_int, str@1 as str, test_int@2 as test_int, str@3 as str]                                        |
   |               |         CoalesceBatchesExec: target_batch_size=4096                                                                                            |
   |               |           HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "test_int", index: 0 }, Column { name: "test_int", index: 0 })] |
   |               |             CoalesceBatchesExec: target_batch_size=4096                                                                                        |
   |               |               RepartitionExec: partitioning=Hash([Column { name: "test_int", index: 0 }], 8)                                                   |
   |               |                 CoalesceBatchesExec: target_batch_size=4096                                                                                    |
   |               |                   FilterExec: str@1 = 3                                                                                                        |
   |               |                     RepartitionExec: partitioning=RoundRobinBatch(8)                                                                           |
   |               |                       MemoryExec: partitions=1, partition_sizes=[1]                                                                            |
   |               |             CoalesceBatchesExec: target_batch_size=4096                                                                                        |
   |               |               RepartitionExec: partitioning=Hash([Column { name: "test_int", index: 0 }], 8)                                                   |
   |               |                 CoalesceBatchesExec: target_batch_size=4096                                                                                    |
   |               |                   FilterExec: str@1 = 1000                                                                                                     |
   |               |                     RepartitionExec: partitioning=RoundRobinBatch(8)                                                                           |
   |               |                       MemoryExec: partitions=1, partition_sizes=[1]                                                                            |
   |               |                                                                                                                                                |
   +---------------+------------------------------------------------------------------------------------------------------------------------------------------------+
   ++
   ++
   Final # rows: 0
   ```
   
   **Expected behavior**
   
   The result should look like this:
   ```
   +---------------+-------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                  |
   +---------------+-------------------------------------------------------------------------------------------------------+
   | logical_plan  | Limit: 5                                                                                              |
   |               |   Projection: #t1.test_int, #t1.str, #tmp.test_int, #tmp.str                                          |
   |               |     Filter: #t1.test_int = #tmp.test_int AND #t1.str = Utf8("3") OR #tmp.str = Utf8("1000")           |
   |               |       CrossJoin:                                                                                      |
   |               |         TableScan: t1 projection=Some([0, 1])                                                         |
   |               |         SubqueryAlias: tmp                                                                            |
   |               |           TableScan: t1 projection=Some([0, 1])                                                       |
   | physical_plan | GlobalLimitExec: limit=5                                                                              |
   |               |   CoalescePartitionsExec                                                                              |
   |               |     ProjectionExec: expr=[test_int@0 as test_int, str@1 as str, test_int@2 as test_int, str@3 as str] |
   |               |       CoalesceBatchesExec: target_batch_size=4096                                                     |
   |               |         FilterExec: test_int@0 = test_int@2 AND str@1 = 3 OR str@3 = 1000                             |
   |               |           CrossJoinExec                                                                               |
   |               |             RepartitionExec: partitioning=RoundRobinBatch(8)                                          |
   |               |               MemoryExec: partitions=1, partition_sizes=[1]                                           |
   |               |             RepartitionExec: partitioning=RoundRobinBatch(8)                                          |
   |               |               MemoryExec: partitions=1, partition_sizes=[1]                                           |
   |               |                                                                                                       |
   +---------------+-------------------------------------------------------------------------------------------------------+
   +----------+-----+----------+-----+
   | test_int | str | test_int | str |
   +----------+-----+----------+-----+
   | 0        | 3   | 0        | 3   |
   +----------+-----+----------+-----+
   Final # rows: 1
   ```
   
   Here, the predicate `Filter: #t1.str = Utf8("3")` should not be pushed down, because it is part of an `OR` expression.
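
To illustrate why the `OR` blocks the pushdown, here is a minimal, self-contained sketch of the usual rule: only the top-level `AND` conjuncts of a predicate are candidates for pushdown, and a conjunct can move to one side of the join only if it references that side alone. The `Expr` type and the `split_conjunction` / `pushable_to` helpers are hypothetical names made up for this example; this is not DataFusion's optimizer code.

```rust
use std::collections::BTreeSet;

/// A toy boolean expression tree (hypothetical; not DataFusion's `Expr`).
enum Expr {
    /// A leaf predicate plus the table aliases its columns refer to.
    Pred { text: &'static str, tables: &'static [&'static str] },
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

impl Expr {
    /// Collect every table alias referenced anywhere in this expression.
    fn tables(&self) -> BTreeSet<&'static str> {
        match self {
            Expr::Pred { tables, .. } => tables.iter().copied().collect(),
            Expr::And(l, r) | Expr::Or(l, r) => {
                let mut set = l.tables();
                set.extend(r.tables());
                set
            }
        }
    }

    /// Render the expression as SQL-like text.
    fn render(&self) -> String {
        match self {
            Expr::Pred { text, .. } => text.to_string(),
            Expr::And(l, r) => format!("({} AND {})", l.render(), r.render()),
            Expr::Or(l, r) => format!("({} OR {})", l.render(), r.render()),
        }
    }
}

/// Split on top-level ANDs only; an OR is kept as one indivisible conjunct.
fn split_conjunction<'a>(expr: &'a Expr, out: &mut Vec<&'a Expr>) {
    match expr {
        Expr::And(l, r) => {
            split_conjunction(l, out);
            split_conjunction(r, out);
        }
        other => out.push(other),
    }
}

/// Conjuncts that reference only `table` and so may be filtered before the join.
fn pushable_to(expr: &Expr, table: &str) -> Vec<String> {
    let mut conjuncts = Vec::new();
    split_conjunction(expr, &mut conjuncts);
    conjuncts
        .into_iter()
        .filter(|c| {
            let t = c.tables();
            t.len() == 1 && t.contains(table)
        })
        .map(|c| c.render())
        .collect()
}

fn pred(text: &'static str, tables: &'static [&'static str]) -> Expr {
    Expr::Pred { text, tables }
}

/// The predicate from this issue: `(a AND b) OR c`, since AND binds tighter.
fn issue_predicate() -> Expr {
    Expr::Or(
        Box::new(Expr::And(
            Box::new(pred("t1.test_int = tmp.test_int", &["t1", "tmp"])),
            Box::new(pred("t1.str = '3'", &["t1"])),
        )),
        Box::new(pred("tmp.str = '1000'", &["tmp"])),
    )
}

/// For contrast: the same leaves combined with AND only.
fn all_and_predicate() -> Expr {
    Expr::And(
        Box::new(Expr::And(
            Box::new(pred("t1.test_int = tmp.test_int", &["t1", "tmp"])),
            Box::new(pred("t1.str = '3'", &["t1"])),
        )),
        Box::new(pred("tmp.str = '1000'", &["tmp"])),
    )
}

fn main() {
    println!("predicate: {}", issue_predicate().render());
    println!("OR form, pushable to t1: {:?}", pushable_to(&issue_predicate(), "t1"));
    println!("AND form, pushable to t1: {:?}", pushable_to(&all_and_predicate(), "t1"));
}
```

Under this rule, the predicate from the issue is a single conjunct that references both `t1` and `tmp`, so nothing can be pushed to either side; only the all-`AND` variant yields per-table filters.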
   
   I created a similar table in Postgres and ran a similar query; here is the correct result:
   <img width="1082" alt="image" src="https://user-images.githubusercontent.com/5569610/163919342-c8d23585-03f2-4ce2-a5ac-e043c56282b1.png">
   
   **Additional context**
   These two queries should output the same result:
   q1: `select * from t1 cross join t1 tmp where t1.test_int = tmp.test_int and t1.str = '3' OR tmp.str = '1000'`
   q2: `select * from t1 inner join t1 tmp on t1.test_int = tmp.test_int and t1.str = '3' OR tmp.str = '1000'`
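
As a sanity check on the expected row counts, the following standalone sketch evaluates the predicate over the sample rows from the reproduction without using DataFusion at all. It compares the reference semantics (filter the full cross product, as in q1) with what the pushed-down plan effectively computes. The function names are hypothetical and exist only for this illustration.

```rust
/// One row of the sample table `t1` from the reproduction: (test_int, str).
type Row = (Option<i64>, &'static str);

fn sample_rows() -> Vec<Row> {
    vec![
        (Some(1), "1"),
        (Some(2), "2"),
        (Some(0), "3"),
        (Some(3), "4"),
        (Some(4), "5"),
    ]
}

/// `(l.test_int = r.test_int AND l.str = '3') OR r.str = '1000'`
fn predicate(l: &Row, r: &Row) -> bool {
    // A comparison involving NULL is never true in SQL; the sample data has
    // no NULLs, but the column is nullable, so the check mirrors that.
    let ints_equal = matches!((l.0, r.0), (Some(a), Some(b)) if a == b);
    (ints_equal && l.1 == "3") || r.1 == "1000"
}

/// Reference semantics: apply the whole predicate to the cross product.
fn reference_join(rows: &[Row]) -> Vec<(Row, Row)> {
    let mut out = Vec::new();
    for l in rows {
        for r in rows {
            if predicate(l, r) {
                out.push((*l, *r));
            }
        }
    }
    out
}

/// What the reported plan computes: each OR branch becomes a filter on its
/// own input, followed by an equi-join on `test_int`.
fn pushed_down_join(rows: &[Row]) -> Vec<(Row, Row)> {
    let left: Vec<&Row> = rows.iter().filter(|l| l.1 == "3").collect();
    let right: Vec<&Row> = rows.iter().filter(|r| r.1 == "1000").collect();
    let mut out = Vec::new();
    for l in &left {
        for r in &right {
            if matches!((l.0, r.0), (Some(a), Some(b)) if a == b) {
                out.push((**l, **r));
            }
        }
    }
    out
}

fn main() {
    let rows = sample_rows();
    println!("reference:   {} row(s)", reference_join(&rows).len());
    println!("pushed down: {} row(s)", pushed_down_join(&rows).len());
}
```

On the sample data the reference evaluation keeps the single pair where both sides are `(0, "3")`, while the pushed-down version filters the right side to nothing and returns no rows, matching the counts reported above.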

