crepererum commented on issue #5970:
URL: 
https://github.com/apache/arrow-datafusion/issues/5970#issuecomment-1506882436

   I couldn't reproduce this with datafusion-cli, but I think it largely 
depends on the plan and on the target partitions / CPU count. So here's a 
test case that reproduces the issue on my system as well:
   
   
   ```rust
   /// Minimal reproducer for issue #5970.
   ///
   /// Runs a `UNION ALL` of two grouped CTEs with a global `ORDER BY 1, 2`,
   /// using two target partitions and `repartition_sorts` enabled. The test
   /// first pins the physical plan (a `SortPreservingMergeExec` on top of a
   /// `UnionExec` whose inputs are each sorted by a `SortExec`), then executes
   /// the query repeatedly and checks that the rows come back fully sorted.
   #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   async fn test_issue5970_mini() -> Result<()> {
       let config = SessionConfig::new()
           .with_target_partitions(2)
           .with_repartition_sorts(true);
       let ctx = SessionContext::with_config(config);
       let sql = "
   WITH
       m0(t) AS (
           VALUES (0), (1), (2)),
   
       m1(t) AS (
           VALUES (0), (1)),
   
       u AS (
           SELECT 0 as m, t FROM m0 GROUP BY 1, 2), 
   
       v AS (
           SELECT 1 as m, t FROM m1 GROUP BY 1, 2)
   SELECT * FROM u
   UNION ALL
   SELECT * FROM v
   ORDER BY 1, 2;
       ";
   
       // check phys. plan
       let dataframe = ctx.sql(sql).await.unwrap();
       let plan = dataframe.into_optimized_plan().unwrap();
       let plan = ctx.state().create_physical_plan(&plan).await.unwrap();
       // Every entry must be a single-line literal: `actual` below is built with
       // `lines()`, so an expected entry containing a newline could never match.
       let expected = vec![
           "SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
           "  UnionExec",
           "    SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
           "      ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]",
           "        AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]",
           "          CoalesceBatchesExec: target_batch_size=8192",
           "            RepartitionExec: partitioning=Hash([Column { name: \"Int64(0)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), input_partitions=2",
           "              AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
           "                RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
           "                  ProjectionExec: expr=[column1@0 as t]",
           "                    ValuesExec",
           "    SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]",
           "      ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
           "        AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]",
           "          CoalesceBatchesExec: target_batch_size=8192",
           "            RepartitionExec: partitioning=Hash([Column { name: \"Int64(1)\", index: 0 }, Column { name: \"t\", index: 1 }], 2), input_partitions=2",
           "              AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
           "                RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
           "                  ProjectionExec: expr=[column1@0 as t]",
           "                    ValuesExec",
       ];
       let formatted = displayable(plan.as_ref()).indent().to_string();
       let actual: Vec<&str> = formatted.trim().lines().collect();
       assert_eq!(
           expected, actual,
           "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
       );
   
       // sometimes it "just works", so run the query several times to give the
       // intermittent mis-ordering a chance to show up
       for i in 0..10 {
           println!("run: {i}");
           let actual = execute_to_batches(&ctx, sql).await;
           let expected = vec![
               "+---+---+",
               "| m | t |",
               "+---+---+",
               "| 0 | 0 |",
               "| 0 | 1 |",
               "| 0 | 2 |",
               "| 1 | 0 |",
               "| 1 | 1 |",
               "+---+---+",
           ];
           assert_batches_eq!(expected, &actual);
       }
       Ok(())
   }
   ```
   
   results in:
   
   ```text
   expected:
   
   [
       "+---+---+",
       "| m | t |",
       "+---+---+",
       "| 0 | 0 |",
       "| 0 | 1 |",
       "| 0 | 2 |",
       "| 1 | 0 |",
       "| 1 | 1 |",
       "+---+---+",
   ]
   actual:
   
   [
       "+---+---+",
       "| m | t |",
       "+---+---+",
       "| 0 | 0 |",
       "| 0 | 2 |",
       "| 1 | 0 |",
       "| 0 | 1 |",
       "| 1 | 1 |",
       "+---+---+",
   ]
   ```
   
   To me, the physical plan looks fine. I guess either `SortExec` or 
`SortPreservingMergeExec` is buggy. The issue was bisected to #5661, which 
makes use of `SortPreservingMergeExec`, so my current hypothesis is that this 
is the buggy one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to