wangxiaoying opened a new issue, #2271:
URL: https://github.com/apache/arrow-datafusion/issues/2271
**Describe the bug**
Currently, the result of a join can be incorrect when the join predicate contains an `OR`, because parts of the predicate are pushed down below the join as filters.
**To Reproduce**
```rust
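use std::sync::Arc;

use datafusion::arrow::array::{Int64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;

fn main() {
    let ctx = SessionContext::new();

    // One five-row batch with columns (test_int: nullable Int64, str: Utf8).
    let id64_array = Int64Array::from(vec![Some(1), Some(2), Some(0), Some(3), Some(4)]);
    let str_array = StringArray::from(vec!["1", "2", "3", "4", "5"]);
    let schema = Schema::new(vec![
        Field::new("test_int", DataType::Int64, true),
        Field::new("str", DataType::Utf8, false),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![Arc::new(id64_array), Arc::new(str_array)],
    )
    .unwrap();

    // Register the batch as `t1`; the query self-joins it under the alias `tmp`.
    let db1 = MemTable::try_new(batch.schema(), vec![vec![batch]]).unwrap();
    ctx.register_table("t1", Arc::new(db1)).unwrap();

    // AND binds tighter than OR, so the ON clause means
    // (t1.test_int = tmp.test_int AND t1.str = '3') OR tmp.str = '1000'.
    let sql = "select * from t1 inner join t1 tmp \
               on t1.test_int = tmp.test_int and t1.str = '3' OR tmp.str = '1000'";

    let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
    let df = rt.block_on(ctx.sql(sql)).unwrap();

    // Print the plans, then the first rows, then the total row count.
    rt.block_on(df.limit(5).unwrap().explain(false, false).unwrap().show())
        .unwrap();
    rt.block_on(df.limit(5).unwrap().show()).unwrap();
    let num_rows = rt
        .block_on(df.collect())
        .unwrap()
        .into_iter()
        .map(|rb| rb.num_rows())
        .sum::<usize>();
    println!("Final # rows: {}", num_rows);
}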
```
The actual result is:
```
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                           |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: 5                                                                                                                                       |
|               |   Projection: #t1.test_int, #t1.str, #tmp.test_int, #tmp.str                                                                                   |
|               |     Inner Join: #t1.test_int = #tmp.test_int                                                                                                   |
|               |       Filter: #t1.str = Utf8("3")                                                                                                              |
|               |         TableScan: t1 projection=Some([0, 1])                                                                                                  |
|               |       Filter: #tmp.str = Utf8("1000")                                                                                                          |
|               |         SubqueryAlias: tmp                                                                                                                     |
|               |           TableScan: t1 projection=Some([0, 1])                                                                                                |
| physical_plan | GlobalLimitExec: limit=5                                                                                                                       |
|               |   CoalescePartitionsExec                                                                                                                       |
|               |     LocalLimitExec: limit=5                                                                                                                    |
|               |       ProjectionExec: expr=[test_int@0 as test_int, str@1 as str, test_int@2 as test_int, str@3 as str]                                        |
|               |         CoalesceBatchesExec: target_batch_size=4096                                                                                            |
|               |           HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "test_int", index: 0 }, Column { name: "test_int", index: 0 })] |
|               |             CoalesceBatchesExec: target_batch_size=4096                                                                                        |
|               |               RepartitionExec: partitioning=Hash([Column { name: "test_int", index: 0 }], 8)                                                   |
|               |                 CoalesceBatchesExec: target_batch_size=4096                                                                                    |
|               |                   FilterExec: str@1 = 3                                                                                                        |
|               |                     RepartitionExec: partitioning=RoundRobinBatch(8)                                                                           |
|               |                       MemoryExec: partitions=1, partition_sizes=[1]                                                                            |
|               |             CoalesceBatchesExec: target_batch_size=4096                                                                                        |
|               |               RepartitionExec: partitioning=Hash([Column { name: "test_int", index: 0 }], 8)                                                   |
|               |                 CoalesceBatchesExec: target_batch_size=4096                                                                                    |
|               |                   FilterExec: str@1 = 1000                                                                                                     |
|               |                     RepartitionExec: partitioning=RoundRobinBatch(8)                                                                           |
|               |                       MemoryExec: partitions=1, partition_sizes=[1]                                                                            |
|               |                                                                                                                                                |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------+
++
++
Final # rows: 0
```
**Expected behavior**
The result should look like this:
```
+---------------+-------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                  |
+---------------+-------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: 5                                                                                              |
|               |   Projection: #t1.test_int, #t1.str, #tmp.test_int, #tmp.str                                          |
|               |     Filter: #t1.test_int = #tmp.test_int AND #t1.str = Utf8("3") OR #tmp.str = Utf8("1000")           |
|               |       CrossJoin:                                                                                      |
|               |         TableScan: t1 projection=Some([0, 1])                                                         |
|               |         SubqueryAlias: tmp                                                                            |
|               |           TableScan: t1 projection=Some([0, 1])                                                       |
| physical_plan | GlobalLimitExec: limit=5                                                                              |
|               |   CoalescePartitionsExec                                                                              |
|               |     ProjectionExec: expr=[test_int@0 as test_int, str@1 as str, test_int@2 as test_int, str@3 as str] |
|               |       CoalesceBatchesExec: target_batch_size=4096                                                     |
|               |         FilterExec: test_int@0 = test_int@2 AND str@1 = 3 OR str@3 = 1000                             |
|               |           CrossJoinExec                                                                               |
|               |             RepartitionExec: partitioning=RoundRobinBatch(8)                                          |
|               |               MemoryExec: partitions=1, partition_sizes=[1]                                           |
|               |             RepartitionExec: partitioning=RoundRobinBatch(8)                                          |
|               |               MemoryExec: partitions=1, partition_sizes=[1]                                           |
|               |                                                                                                       |
+---------------+-------------------------------------------------------------------------------------------------------+
+----------+-----+----------+-----+
| test_int | str | test_int | str |
+----------+-----+----------+-----+
| 0 | 3 | 0 | 3 |
+----------+-----+----------+-----+
Final # rows: 1
```
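Because `AND` binds more tightly than `OR`, the `ON` clause means `(t1.test_int = tmp.test_int AND t1.str = '3') OR tmp.str = '1000'`. Both comparisons sit under the `OR`, so neither `Filter: #t1.str = Utf8("3")` nor `Filter: #tmp.str = Utf8("1000")` may be pushed down below the join; pushing them down effectively turns the `OR` into an `AND`.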
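To illustrate the rule, here is a minimal sketch, assuming a simplified expression tree: split the join condition into its top-level `AND` conjuncts and push down only the conjuncts that reference a single input. The `Expr` enum and the `col`, `lit`, `condition`, `split_conjunction`, `tables` and `pushable_to` functions are hypothetical and written for this example; they are not DataFusion's own types or optimizer code.

```rust
// Hypothetical, simplified predicate tree for illustration only;
// this is NOT DataFusion's `Expr` type or its optimizer code.
#[allow(dead_code)] // `name` and literal values are only carried for `Debug` output
#[derive(Debug)]
enum Expr {
    Column { table: &'static str, name: &'static str },
    Literal(&'static str),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

fn col(table: &'static str, name: &'static str) -> Box<Expr> {
    Box::new(Expr::Column { table, name })
}

fn lit(value: &'static str) -> Box<Expr> {
    Box::new(Expr::Literal(value))
}

/// Builds `(t1.test_int = tmp.test_int AND t1.str = '3') <root> tmp.str = '1000'`,
/// where `<root>` is OR (SQL precedence) or AND (what the pushed-down plan assumes).
fn condition(root_is_or: bool) -> Expr {
    let left = Box::new(Expr::And(
        Box::new(Expr::Eq(col("t1", "test_int"), col("tmp", "test_int"))),
        Box::new(Expr::Eq(col("t1", "str"), lit("3"))),
    ));
    let right = Box::new(Expr::Eq(col("tmp", "str"), lit("1000")));
    if root_is_or {
        Expr::Or(left, right)
    } else {
        Expr::And(left, right)
    }
}

/// Splits a predicate into its top-level AND conjuncts.
/// An OR node is kept whole: nothing beneath it can be split off.
fn split_conjunction<'a>(e: &'a Expr, out: &mut Vec<&'a Expr>) {
    match e {
        Expr::And(l, r) => {
            split_conjunction(l, out);
            split_conjunction(r, out);
        }
        other => out.push(other),
    }
}

/// Collects the distinct tables an expression refers to.
fn tables(e: &Expr, out: &mut Vec<&'static str>) {
    match e {
        Expr::Column { table, .. } => {
            if !out.contains(table) {
                out.push(*table);
            }
        }
        Expr::Literal(_) => {}
        Expr::Eq(l, r) | Expr::And(l, r) | Expr::Or(l, r) => {
            tables(l, out);
            tables(r, out);
        }
    }
}

/// Returns the conjuncts that reference only `side`; only these may be
/// pushed below the join as filters on that input.
fn pushable_to<'a>(on: &'a Expr, side: &str) -> Vec<&'a Expr> {
    let mut conjuncts = Vec::new();
    split_conjunction(on, &mut conjuncts);
    conjuncts
        .into_iter()
        .filter(|c| {
            let mut refs = Vec::new();
            tables(c, &mut refs);
            refs.len() == 1 && refs[0] == side
        })
        .collect()
}

fn main() {
    for root_is_or in [true, false] {
        let on = condition(root_is_or);
        println!("root is OR: {}", root_is_or);
        println!("  push to t1:  {:?}", pushable_to(&on, "t1"));
        println!("  push to tmp: {:?}", pushable_to(&on, "tmp"));
    }
}
```

With the `OR` at the root, `pushable_to` returns nothing for either input. Only under the all-`AND` reading does it return `t1.str = '3'` for `t1` and `tmp.str = '1000'` for `tmp`, which are the two filters that appear below the `Inner Join` in the incorrect plan.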
I created a similar table in PostgreSQL and ran a similar query; it returns the correct result:
<img width="1082" alt="image"
src="https://user-images.githubusercontent.com/5569610/163919342-c8d23585-03f2-4ce2-a5ac-e043c56282b1.png">
**Additional context**
These two queries should output the same result:
q1: `select * from t1 cross join t1 tmp where t1.test_int = tmp.test_int and t1.str = '3' OR tmp.str = '1000'`
q2: `select * from t1 inner join t1 tmp on t1.test_int = tmp.test_int and t1.str = '3' OR tmp.str = '1000'`
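As a sanity check on the row count both queries should return, the sketch below evaluates the condition directly over the 5 × 5 cross product of the reproduction's data with a naive nested-loop join: once with SQL precedence, and once with every operator read as `AND` (the effect of the pushed-down filters). The `Row` alias, the `T1` constant and `nested_loop_join` are illustrative assumptions, not DataFusion code; plain two-valued logic is used because the data contains no NULLs.

```rust
/// One row of `t1` from the reproduction: (test_int, str).
/// The data contains no NULLs, so two-valued boolean logic is enough here.
type Row = (i64, &'static str);

const T1: [Row; 5] = [(1, "1"), (2, "2"), (0, "3"), (3, "4"), (4, "5")];

/// Naive nested-loop join of `t1` with itself (aliased `tmp`):
/// keeps every pair from the cross product for which `on` holds.
fn nested_loop_join(on: impl Fn(Row, Row) -> bool) -> Vec<(Row, Row)> {
    let mut out = Vec::new();
    for &t1 in T1.iter() {
        for &tmp in T1.iter() {
            if on(t1, tmp) {
                out.push((t1, tmp));
            }
        }
    }
    out
}

fn main() {
    // SQL precedence: (t1.test_int = tmp.test_int AND t1.str = '3') OR tmp.str = '1000'
    let correct =
        nested_loop_join(|t1, tmp| (t1.0 == tmp.0 && t1.1 == "3") || tmp.1 == "1000");
    // Pushing both filters below the join: every operator effectively becomes AND.
    let pushed_down =
        nested_loop_join(|t1, tmp| t1.0 == tmp.0 && t1.1 == "3" && tmp.1 == "1000");
    println!("correct:     {} row(s): {:?}", correct.len(), correct);
    println!("pushed down: {} row(s)", pushed_down.len());
}
```

It reports one row for the correct reading, `(0, "3")` paired with `(0, "3")`, and zero rows for the all-`AND` reading, which matches `Final # rows: 1` in the expected output and `Final # rows: 0` in the actual output.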
--
This is an automated message from the Apache Git Service.