Gentoli opened a new issue, #3681:
URL: https://github.com/apache/arrow-datafusion/issues/3681

   **Describe the bug**
   Filters are not pushed down when a DataFrame is registered as a table.
   
   **To Reproduce**
   This is not specific to joins; a join is used here only to illustrate the issue.
   
   Steps to reproduce the behaviour:
   
   ``` rust
   let ctx = SessionContext::new();
   
   ctx.register_csv("test", "./test.csv", CsvReadOptions::new()).await.unwrap();
   let right = ctx.read_csv("./test2.csv", CsvReadOptions::new()).await.unwrap();
   
   let df = ctx.table("test").unwrap()
       .join(right, JoinType::Inner, &["name_1"], &["name_2"], None)
       .unwrap();
   
   println!("works");
   df.filter(col("name_1").eq(lit("andrew"))).unwrap()
       .explain(false, false).unwrap().show().await.unwrap();
   
   ctx.register_table("table_alias", df.clone()).unwrap();
   
   println!("don't work");
   ctx.table("table_alias").unwrap()
       .filter(col("name_1").eq(lit("andrew"))).unwrap()
       .explain(false, false).unwrap().show().await.unwrap();
   
   ```
   <details>
     <summary>Output + Test files</summary>
   
   
     ``` log
   works
   +---------------+------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                                               |
   +---------------+------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Inner Join: #test.name_1 = #?table?.name_2                                                                                         |
   |               |   Filter: #test.name_1 = Utf8("andrew")                                                                                            |
   |               |     TableScan: test projection=[name_1, num], partial_filters=[#test.name_1 = Utf8("andrew")]                                      |
   |               |   Filter: #?table?.name_2 = Utf8("andrew")                                                                                         |
   |               |     TableScan: ?table? projection=[name_2, id], partial_filters=[#?table?.name_2 = Utf8("andrew")]                                 |
   | physical_plan | CoalesceBatchesExec: target_batch_size=4096                                                                                        |
   |               |   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "name_1", index: 0 }, Column { name: "name_2", index: 0 })] |
   |               |     CoalesceBatchesExec: target_batch_size=4096                                                                                    |
   |               |       RepartitionExec: partitioning=Hash([Column { name: "name_1", index: 0 }], 12)                                                |
   |               |         CoalesceBatchesExec: target_batch_size=4096                                                                                |
   |               |           FilterExec: name_1@0 = andrew                                                                                            |
   |               |             RepartitionExec: partitioning=RoundRobinBatch(12)                                                                      |
   |               |               CsvExec: files=[<>/test.csv], has_header=true, limit=None, projection=[name_1, num]                                  |
   |               |     CoalesceBatchesExec: target_batch_size=4096                                                                                    |
   |               |       RepartitionExec: partitioning=Hash([Column { name: "name_2", index: 0 }], 12)                                                |
   |               |         CoalesceBatchesExec: target_batch_size=4096                                                                                |
   |               |           FilterExec: name_2@0 = andrew                                                                                            |
   |               |             RepartitionExec: partitioning=RoundRobinBatch(12)                                                                      |
   |               |               CsvExec: files=[<>/test2.csv], has_header=true, limit=None, projection=[name_2, id]                                  |
   |               |                                                                                                                                    |
   +---------------+------------------------------------------------------------------------------------------------------------------------------------+
   don't work
   +---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                                                       |
   +---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Filter: #table_alias.name_1 = Utf8("andrew")                                                                                               |
   |               |   TableScan: table_alias projection=[name_1, num, name_2, id]                                                                              |
   | physical_plan | CoalesceBatchesExec: target_batch_size=4096                                                                                                |
   |               |   FilterExec: name_1@0 = andrew                                                                                                            |
   |               |     ProjectionExec: expr=[name_1@0 as name_1, num@1 as num, name_2@2 as name_2, id@3 as id]                                                |
   |               |       CoalesceBatchesExec: target_batch_size=4096                                                                                          |
   |               |         CoalesceBatchesExec: target_batch_size=4096                                                                                        |
   |               |           HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "name_1", index: 0 }, Column { name: "name_2", index: 0 })] |
   |               |             CoalesceBatchesExec: target_batch_size=4096                                                                                    |
   |               |               CoalesceBatchesExec: target_batch_size=4096                                                                                  |
   |               |                 RepartitionExec: partitioning=Hash([Column { name: "name_1", index: 0 }], 12)                                              |
   |               |                   CoalesceBatchesExec: target_batch_size=4096                                                                              |
   |               |                     RepartitionExec: partitioning=RoundRobinBatch(12)                                                                      |
   |               |                       RepartitionExec: partitioning=RoundRobinBatch(12)                                                                    |
   |               |                         CsvExec: files=[<>/test.csv], has_header=true, limit=None, projection=[name_1, num]                                |
   |               |             CoalesceBatchesExec: target_batch_size=4096                                                                                    |
   |               |               CoalesceBatchesExec: target_batch_size=4096                                                                                  |
   |               |                 RepartitionExec: partitioning=Hash([Column { name: "name_2", index: 0 }], 12)                                              |
   |               |                   CoalesceBatchesExec: target_batch_size=4096                                                                              |
   |               |                     RepartitionExec: partitioning=RoundRobinBatch(12)                                                                      |
   |               |                       RepartitionExec: partitioning=RoundRobinBatch(12)                                                                    |
   |               |                         CsvExec: files=[<>/test2.csv], has_header=true, limit=None, projection=[name_2, id]                                |
   |               |                                                                                                                                            |
   +---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
     ```
   
   test.csv
   ``` csv
   name_1,num
   andrew,100
   jorge,200
   andy,150
   paul,300
   ```
   test2.csv
   ``` csv
   name_2,id
   andrew,1
   jorge,2
   andy,3
   paul,4
   ```
   </details>
   
   **Expected behavior**
   With a join, the filter should be applied before the join, as it is when the DataFrame is used directly.
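
To make the expected rewrite concrete, here is a minimal sketch of pushing a filter below an inner join. The `Plan` enum and the `scan`, `output_columns`, and `push_down` helpers are hypothetical stand-ins written for this illustration; they are not DataFusion's `LogicalPlan` or its optimizer. Unlike the "works" plan above, the sketch does not infer a matching filter for the other join side from the join key.

```rust
/// Toy logical plan; a hypothetical stand-in, not DataFusion's `LogicalPlan`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { table: String, columns: Vec<String> },
    Filter { column: String, value: String, input: Box<Plan> },
    Join { left: Box<Plan>, right: Box<Plan> },
}

/// Convenience constructor for a table scan.
fn scan(table: &str, columns: &[&str]) -> Plan {
    Plan::Scan {
        table: table.to_string(),
        columns: columns.iter().map(|c| c.to_string()).collect(),
    }
}

/// Column names produced by a plan node.
fn output_columns(plan: &Plan) -> Vec<String> {
    match plan {
        Plan::Scan { columns, .. } => columns.clone(),
        Plan::Filter { input, .. } => output_columns(input),
        Plan::Join { left, right } => {
            let mut all = output_columns(left);
            all.extend(output_columns(right));
            all
        }
    }
}

/// Moves an equality filter below a join, onto the side that owns the column.
fn push_down(plan: Plan) -> Plan {
    match plan {
        Plan::Filter { column, value, input } => match *input {
            Plan::Join { left, right } => {
                if output_columns(&left).contains(&column) {
                    // The column comes from the left input: filter it before joining.
                    Plan::Join {
                        left: Box::new(push_down(Plan::Filter { column, value, input: left })),
                        right: Box::new(push_down(*right)),
                    }
                } else if output_columns(&right).contains(&column) {
                    // The column comes from the right input.
                    Plan::Join {
                        left: Box::new(push_down(*left)),
                        right: Box::new(push_down(Plan::Filter { column, value, input: right })),
                    }
                } else {
                    // Unknown column: leave the filter above the join.
                    let join = push_down(Plan::Join { left, right });
                    Plan::Filter { column, value, input: Box::new(join) }
                }
            }
            other => Plan::Filter { column, value, input: Box::new(push_down(other)) },
        },
        Plan::Join { left, right } => Plan::Join {
            left: Box::new(push_down(*left)),
            right: Box::new(push_down(*right)),
        },
        scan_node => scan_node,
    }
}
```

Applied to a filter on `name_1` sitting above a join of `test` and `test2`, `push_down` returns a join whose left input is the filtered scan of `test`, which is the shape the issue expects for the registered table as well.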
   
   **Additional context**
   It seems that the `TableProvider` implementation for DataFrames [tries to support filtering](https://github.com/apache/arrow-datafusion/blob/0b204c608ca6a00b279c2aeb731a40c5d07e3357/datafusion/core/src/dataframe.rs#L804-L807). However, it does not override `supports_filter_pushdown`, and the default implementation responds with `Unsupported`, so no filters are ever passed down to it.
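
The suspected mechanism can be modelled with a simplified sketch. Everything below is a hypothetical stand-in written for illustration: `PushDown` loosely mirrors the variants of DataFusion's `TableProviderFilterPushDown`, while `Provider`, `DataFrameTable`, `PushdownTable`, and `filters_for_scan` are invented names, not the real API. It only shows how a default trait method that answers `Unsupported` keeps filters from reaching a provider that never overrides it.

```rust
/// Simplified stand-in for the provider's pushdown answer.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PushDown {
    Unsupported,
    Inexact,
}

/// Hypothetical, cut-down provider trait; not the real `TableProvider`.
trait Provider {
    /// Default answer: the provider cannot make use of the filter.
    fn supports_filter_pushdown(&self, _filter: &str) -> PushDown {
        PushDown::Unsupported
    }
}

/// Mimics the DataFrame-backed table: it inherits the default answer.
struct DataFrameTable;
impl Provider for DataFrameTable {}

/// Hypothetical fix: opt in by overriding the default.
struct PushdownTable;
impl Provider for PushdownTable {
    fn supports_filter_pushdown(&self, _filter: &str) -> PushDown {
        PushDown::Inexact
    }
}

/// Returns the filters a planner would hand to the provider's scan.
fn filters_for_scan<'a>(provider: &dyn Provider, filters: &[&'a str]) -> Vec<&'a str> {
    filters
        .iter()
        .copied()
        .filter(|f| provider.supports_filter_pushdown(f) != PushDown::Unsupported)
        .collect()
}
```

With a single filter such as `name_1 = 'andrew'`, `filters_for_scan(&DataFrameTable, ...)` yields an empty list, while `filters_for_scan(&PushdownTable, ...)` passes the filter through. If the analysis above is right, overriding `supports_filter_pushdown` in the DataFrame's provider would be one way to let the existing filtering code receive the predicate.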
   

