crepererum commented on issue #5970:
URL: 
https://github.com/apache/arrow-datafusion/issues/5970#issuecomment-1507189056

   So the plan is seemingly right but actually wrong:
   
   The two `SortExec`s in the above plan have two partitions each (= 4 in 
total). They are 
[hash-partitioned](https://github.com/apache/arrow-datafusion/blob/fcd8b899e2a62f798413c536f47078289ece9d05/datafusion/core/src/physical_plan/mod.rs#L471-L473)
 because this is what you get from the group-by operation.
   
   The `UnionExec` thinks it is "partition aware" (whatever that means):
   
   
https://github.com/apache/arrow-datafusion/blob/fcd8b899e2a62f798413c536f47078289ece9d05/datafusion/core/src/physical_plan/union.rs#L157-L168
   
   As a consequence, it only emits 2 output partitions (instead of 4) and 
concats the input data:
   
   
https://github.com/apache/arrow-datafusion/blob/fcd8b899e2a62f798413c536f47078289ece9d05/datafusion/core/src/physical_plan/union.rs#L276-L292
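
   To make the concatenation problem concrete, here is a minimal toy sketch. It is not DataFusion's API: `Partition`, `partition_aware_union`, and `is_sorted` are hypothetical names, and a partition is modeled as a plain `Vec<i64>`. It assumes each input has two individually sorted partitions, like the two `SortExec`s above.

```rust
// Toy model only: a partition is a Vec<i64>, an operator's output is a Vec of
// partitions. None of these names exist in DataFusion; they are illustrative.
type Partition = Vec<i64>;

/// Models the "partition aware" behavior described above: output partition `i`
/// is the concatenation of partition `i` of every input.
/// Assumes all inputs have the same number of partitions.
fn partition_aware_union(inputs: &[Vec<Partition>]) -> Vec<Partition> {
    let n = inputs[0].len();
    (0..n)
        .map(|i| {
            inputs
                .iter()
                .flat_map(|input| input[i].iter().copied())
                .collect::<Partition>()
        })
        .collect()
}

/// Returns true if the partition is in non-decreasing order.
fn is_sorted(p: &[i64]) -> bool {
    p.windows(2).all(|w| w[0] <= w[1])
}

fn main() {
    // Two inputs, two sorted partitions each (= 4 in total).
    let left = vec![vec![1, 4], vec![2, 6]];
    let right = vec![vec![0, 3], vec![5, 7]];

    // Only 2 output partitions come out, and neither is sorted anymore.
    let out = partition_aware_union(&[left, right]);
    for (i, p) in out.iter().enumerate() {
        println!("partition {i}: {p:?} sorted={}", is_sorted(p));
    }
}
```

   Every input partition is sorted on its own, but concatenating partition `i` of the first input with partition `i` of the second yields an output partition that is not.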
   
   Hence, it no longer preserves sorting. However, the `EnforceSorting` optimizer 
pass thinks it can push `SortExec` safely through `UnionExec`:
   
   ```text
   [2023-04-13T15:26:06Z TRACE datafusion::physical_plan::planner] Optimized 
physical plan by EnforceDistribution:
       SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
         SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
           UnionExec
             ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]
               AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as 
Int64(0), t@1 as t], aggr=[]
                 RepartitionExec: partitioning=Hash([Column { name: "Int64(0)", 
index: 0 }, Column { name: "t", index: 1 }], 2), input_partitions=2
                   AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], 
aggr=[]
                     RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
                       ProjectionExec: expr=[column1@0 as t]
                         ValuesExec
             ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]
               AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as 
Int64(1), t@1 as t], aggr=[]
                 RepartitionExec: partitioning=Hash([Column { name: "Int64(1)", 
index: 0 }, Column { name: "t", index: 1 }], 2), input_partitions=2
                   AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], 
aggr=[]
                     RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
                       ProjectionExec: expr=[column1@0 as t]
                         ValuesExec
   
   
   [2023-04-13T15:26:06Z TRACE datafusion::physical_plan::planner] Optimized 
physical plan by EnforceSorting:
       SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
         UnionExec
           SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
             ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]
               AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as 
Int64(0), t@1 as t], aggr=[]
                 RepartitionExec: partitioning=Hash([Column { name: "Int64(0)", 
index: 0 }, Column { name: "t", index: 1 }], 2), input_partitions=2
                   AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], 
aggr=[]
                     RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
                       ProjectionExec: expr=[column1@0 as t]
                         ValuesExec
           SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
             ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]
               AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as 
Int64(1), t@1 as t], aggr=[]
                 RepartitionExec: partitioning=Hash([Column { name: "Int64(1)", 
index: 0 }, Column { name: "t", index: 1 }], 2), input_partitions=2
                   AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], 
aggr=[]
                     RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
                       ProjectionExec: expr=[column1@0 as t]
                         ValuesExec
   ```
   
   Now we could either blame `EnforceSorting` for performing the wrong 
optimization or `UnionExec` for having a life of its own while seemingly 
being a really simple node (who would have expected that it concats input 
batches from different input partitions?!). I would argue that `UnionExec` 
should NEVER modify its inputs but just be a plain, simple node that forwards 
its inputs w/o messing up sorting (or any other property).
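
   As a rough sketch of what "plain, simple" could look like, the toy model below forwards every input partition unchanged and then runs a k-way merge as a stand-in for `SortPreservingMergeExec`. This is not DataFusion code: `plain_union` and `merge_sorted` are hypothetical names, and partitions are modeled as `Vec<i64>`.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Toy model only: a partition is a Vec<i64>. Names are illustrative and
// do not correspond to DataFusion's API.
type Partition = Vec<i64>;

/// Plain union: forward every partition of every input unchanged,
/// so N inputs with K partitions each yield N * K output partitions.
fn plain_union(inputs: Vec<Vec<Partition>>) -> Vec<Partition> {
    inputs.into_iter().flatten().collect()
}

/// Toy stand-in for a sort-preserving merge: k-way merge that is only
/// correct if every input partition is already sorted.
fn merge_sorted(partitions: &[Partition]) -> Vec<i64> {
    // Min-heap of (value, partition index, offset within that partition).
    let mut heap = BinaryHeap::new();
    for (p, part) in partitions.iter().enumerate() {
        if let Some(&v) = part.first() {
            heap.push(Reverse((v, p, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, p, i))) = heap.pop() {
        out.push(v);
        if let Some(&next) = partitions[p].get(i + 1) {
            heap.push(Reverse((next, p, i + 1)));
        }
    }
    out
}

fn main() {
    let left = vec![vec![1, 4], vec![2, 6]];
    let right = vec![vec![0, 3], vec![5, 7]];

    // All 4 sorted partitions are forwarded, so the merge sees sorted inputs.
    let forwarded = plain_union(vec![left, right]);
    println!("{} partitions, merged: {:?}", forwarded.len(), merge_sorted(&forwarded));
}
```

   Because each forwarded partition keeps its original order, the merge step's precondition still holds after the union.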
   
   CC @mustafasrepo who authored #5661 (= making `EnforceSorting` smarter) and 
@mingmwang who taught `UnionExec` to be "partition aware" in #4043.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
