crepererum commented on issue #5970:
URL:
https://github.com/apache/arrow-datafusion/issues/5970#issuecomment-1506882436
I couldn't get datafusion-cli to reproduce the issue, but I think it largely
depends on the plan and the target partitions / CPU count. So here is a code
test that reproduces the issue on my system as well:
```rust
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_issue5970_mini() -> Result<()> {
let config =
SessionConfig::new().with_target_partitions(2).with_repartition_sorts(true);
let ctx = SessionContext::with_config(config);
let sql = "
WITH
m0(t) AS (
VALUES (0), (1), (2)),
m1(t) AS (
VALUES (0), (1)),
u AS (
SELECT 0 as m, t FROM m0 GROUP BY 1, 2),
v AS (
SELECT 1 as m, t FROM m1 GROUP BY 1, 2)
SELECT * FROM u
UNION ALL
SELECT * FROM v
ORDER BY 1, 2;
";
// check phys. plan
let dataframe = ctx.sql(sql).await.unwrap();
let plan = dataframe.into_optimized_plan().unwrap();
let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
let expected = vec![
"SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
" UnionExec",
" SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
" ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]",
" AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as
Int64(0), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Column { name:
\"Int64(0)\", index: 0 }, Column { name: \"t\", index: 1 }], 2),
input_partitions=2",
" AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0
as t], aggr=[]",
" RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
" SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
" ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
" AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as
Int64(1), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([Column { name:
\"Int64(1)\", index: 0 }, Column { name: \"t\", index: 1 }], 2),
input_partitions=2",
" AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0
as t], aggr=[]",
" RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
];
let formatted = displayable(plan.as_ref()).indent().to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
assert_eq!(
expected, actual,
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
// sometimes it "just works"
for i in 0..10 {
println!("run: {i}");
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+---+---+",
"| m | t |",
"+---+---+",
"| 0 | 0 |",
"| 0 | 1 |",
"| 0 | 2 |",
"| 1 | 0 |",
"| 1 | 1 |",
"+---+---+",
];
assert_batches_eq!(expected, &actual);
}
Ok(())
}
```
results in:
```text
expected:
[
"+---+---+",
"| m | t |",
"+---+---+",
"| 0 | 0 |",
"| 0 | 1 |",
"| 0 | 2 |",
"| 1 | 0 |",
"| 1 | 1 |",
"+---+---+",
]
actual:
[
"+---+---+",
"| m | t |",
"+---+---+",
"| 0 | 0 |",
"| 0 | 2 |",
"| 1 | 0 |",
"| 0 | 1 |",
"| 1 | 1 |",
"+---+---+",
]
```
To me, the phys. plan looks fine. I guess either `SortExec` or
`SortPreservingMergeExec` is buggy. The issue was bisected to #5661, which
makes use of `SortPreservingMergeExec`, so my current hypothesis is that
`SortPreservingMergeExec` is the buggy one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]