Gentoli opened a new issue, #3681:
URL: https://github.com/apache/arrow-datafusion/issues/3681
**Describe the bug**
Filters are not pushed down when a DataFrame is registered as a table.
**To Reproduce**
This is not isolated to joins; a join is only used here to make the issue visible.
Steps to reproduce the behavior:
``` rust
use datafusion::error::Result;
use datafusion::logical_plan::JoinType;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("test", "./test.csv", CsvReadOptions::new()).await?;
    let right = ctx.read_csv("./test2.csv", CsvReadOptions::new()).await?;
    let df = ctx
        .table("test")?
        .join(right, JoinType::Inner, &["name_1"], &["name_2"], None)?;

    // Filtering the DataFrame handle directly: the predicate is pushed down.
    println!("works");
    df.filter(col("name_1").eq(lit("andrew")))?
        .explain(false, false)?
        .show()
        .await?;

    // Registering the same DataFrame as a table and filtering through
    // `ctx.table(..)`: the predicate stays above the join.
    ctx.register_table("table_alias", df.clone())?;
    println!("doesn't work");
    ctx.table("table_alias")?
        .filter(col("name_1").eq(lit("andrew")))?
        .explain(false, false)?
        .show()
        .await?;
    Ok(())
}
```
<details>
<summary>Output + Test files</summary>

``` log
works
logical_plan
  Inner Join: #test.name_1 = #?table?.name_2
    Filter: #test.name_1 = Utf8("andrew")
      TableScan: test projection=[name_1, num], partial_filters=[#test.name_1 = Utf8("andrew")]
    Filter: #?table?.name_2 = Utf8("andrew")
      TableScan: ?table? projection=[name_2, id], partial_filters=[#?table?.name_2 = Utf8("andrew")]
physical_plan
  CoalesceBatchesExec: target_batch_size=4096
    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "name_1", index: 0 }, Column { name: "name_2", index: 0 })]
      CoalesceBatchesExec: target_batch_size=4096
        RepartitionExec: partitioning=Hash([Column { name: "name_1", index: 0 }], 12)
          CoalesceBatchesExec: target_batch_size=4096
            FilterExec: name_1@0 = andrew
              RepartitionExec: partitioning=RoundRobinBatch(12)
                CsvExec: files=[<>/test.csv], has_header=true, limit=None, projection=[name_1, num]
      CoalesceBatchesExec: target_batch_size=4096
        RepartitionExec: partitioning=Hash([Column { name: "name_2", index: 0 }], 12)
          CoalesceBatchesExec: target_batch_size=4096
            FilterExec: name_2@0 = andrew
              RepartitionExec: partitioning=RoundRobinBatch(12)
                CsvExec: files=[<>/test2.csv], has_header=true, limit=None, projection=[name_2, id]

doesn't work
logical_plan
  Filter: #table_alias.name_1 = Utf8("andrew")
    TableScan: table_alias projection=[name_1, num, name_2, id]
physical_plan
  CoalesceBatchesExec: target_batch_size=4096
    FilterExec: name_1@0 = andrew
      ProjectionExec: expr=[name_1@0 as name_1, num@1 as num, name_2@2 as name_2, id@3 as id]
        CoalesceBatchesExec: target_batch_size=4096
          CoalesceBatchesExec: target_batch_size=4096
            HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "name_1", index: 0 }, Column { name: "name_2", index: 0 })]
              CoalesceBatchesExec: target_batch_size=4096
                CoalesceBatchesExec: target_batch_size=4096
                  RepartitionExec: partitioning=Hash([Column { name: "name_1", index: 0 }], 12)
                    CoalesceBatchesExec: target_batch_size=4096
                      RepartitionExec: partitioning=RoundRobinBatch(12)
                        RepartitionExec: partitioning=RoundRobinBatch(12)
                          CsvExec: files=[<>/test.csv], has_header=true, limit=None, projection=[name_1, num]
              CoalesceBatchesExec: target_batch_size=4096
                CoalesceBatchesExec: target_batch_size=4096
                  RepartitionExec: partitioning=Hash([Column { name: "name_2", index: 0 }], 12)
                    CoalesceBatchesExec: target_batch_size=4096
                      RepartitionExec: partitioning=RoundRobinBatch(12)
                        RepartitionExec: partitioning=RoundRobinBatch(12)
                          CsvExec: files=[<>/test2.csv], has_header=true, limit=None, projection=[name_2, id]
```
test.csv
``` csv
name_1,num
andrew,100
jorge,200
andy,150
paul,300
```
test2.csv
``` csv
name_2,id
andrew,1
jorge,2
andy,3
paul,4
```
</details>
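
As a side note for anyone digging into this: `DataFrame::explain` also takes a `verbose` flag, which prints the plan after each optimizer pass. A minimal sketch, assuming the same `ctx` as the repro above; it should make visible that the filter-pushdown rule never changes the plan for the registered table:

``` rust
// Verbose explain: shows the intermediate plan after each optimizer rule.
ctx.table("table_alias")?
    .filter(col("name_1").eq(lit("andrew")))?
    .explain(true, false)? // verbose = true, analyze = false
    .show()
    .await?;
```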
**Expected behavior**
With a join, the filter should be pushed below the join so rows are filtered before they are joined, exactly as in the "works" plan above.
**Additional context**
It looks like the `TableProvider` implementation for `DataFrame` [tries to support
filtering](https://github.com/apache/arrow-datafusion/blob/0b204c608ca6a00b279c2aeb731a40c5d07e3357/datafusion/core/src/dataframe.rs#L804-L807),
but it inherits the trait's default `supports_filter_pushdown`, which answers
`Unsupported`, so the optimizer never passes any filters down to the scan.
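
For context, a minimal sketch of what an override might look like, assuming the `TableProviderFilterPushDown` enum from `datafusion_expr` and that the method signature matches the linked revision; this is an illustration of the idea, not the project's actual fix:

``` rust
use datafusion::error::Result;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};

// Hypothetical override inside `impl TableProvider for DataFrame`.
// The trait's default body returns `Unsupported`, which is why no
// filters reach `scan` in the second case above.
fn supports_filter_pushdown(
    &self,
    _filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
    // `Inexact` lets the optimizer push the predicate into the scan
    // while still re-applying it above, which stays correct for an
    // arbitrary DataFrame plan.
    Ok(TableProviderFilterPushDown::Inexact)
}
```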