crepererum commented on issue #5970: URL: https://github.com/apache/arrow-datafusion/issues/5970#issuecomment-1507189056
So the plan looks right but is actually wrong: the two `SortExec`s in the above plan have two partitions each (= 4 in total). They are [hash-partitioned](https://github.com/apache/arrow-datafusion/blob/fcd8b899e2a62f798413c536f47078289ece9d05/datafusion/core/src/physical_plan/mod.rs#L471-L473) because this is what you get from the group-by operation. The `UnionExec` thinks it is "partition aware" (whatever that means):

https://github.com/apache/arrow-datafusion/blob/fcd8b899e2a62f798413c536f47078289ece9d05/datafusion/core/src/physical_plan/union.rs#L157-L168

As a consequence, it only emits 2 output partitions (instead of 4) and concats the input data:

https://github.com/apache/arrow-datafusion/blob/fcd8b899e2a62f798413c536f47078289ece9d05/datafusion/core/src/physical_plan/union.rs#L276-L292

Hence, it no longer preserves sorting. However, the `EnforceSorting` optimizer pass thinks it can safely push `SortExec` through `UnionExec`:

```text
[2023-04-13T15:26:06Z TRACE datafusion::physical_plan::planner] Optimized physical plan by EnforceDistribution:
SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
  SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
    UnionExec
      ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]
        AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]
          RepartitionExec: partitioning=Hash([Column { name: "Int64(0)", index: 0 }, Column { name: "t", index: 1 }], 2), input_partitions=2
            AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]
              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
                ProjectionExec: expr=[column1@0 as t]
                  ValuesExec
      ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]
        AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]
          RepartitionExec: partitioning=Hash([Column { name: "Int64(1)", index: 0 }, Column { name: "t", index: 1 }], 2), input_partitions=2
            AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]
              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
                ProjectionExec: expr=[column1@0 as t]
                  ValuesExec

[2023-04-13T15:26:06Z TRACE datafusion::physical_plan::planner] Optimized physical plan by EnforceSorting:
SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
  UnionExec
    SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
      ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]
        AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]
          RepartitionExec: partitioning=Hash([Column { name: "Int64(0)", index: 0 }, Column { name: "t", index: 1 }], 2), input_partitions=2
            AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]
              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
                ProjectionExec: expr=[column1@0 as t]
                  ValuesExec
    SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
      ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]
        AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]
          RepartitionExec: partitioning=Hash([Column { name: "Int64(1)", index: 0 }, Column { name: "t", index: 1 }], 2), input_partitions=2
            AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]
              RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
                ProjectionExec: expr=[column1@0 as t]
                  ValuesExec
```

Now we could either blame `EnforceSorting` for performing the wrong optimization, or `UnionExec` for having a life of its own while seemingly being a really simple node (who would have expected that it concats input batches from different input partitions?!). I would argue that `UnionExec` should NEVER modify its inputs but just be a plain, simple node that forwards its inputs without messing up sorting (or any other property).

CC @mustafasrepo who authored #5661 (= making `EnforceSorting` smarter) and @mingmwang who taught `UnionExec` to be "partition aware" in #4043.
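To make the failure mode concrete, here is a toy model in plain Rust (standard library only, no DataFusion types). Everything in it is an assumption made for illustration: partitions are modeled as `Vec<i64>`, the "partition aware" union is modeled as a simple concatenation of partition `i` from each input (the real node combines streams, so the actual interleaving may differ), and the names `union_partition_aware`, `union_plain`, `merge`, and `example_inputs` are hypothetical.

```rust
// Toy model only: a partition is a Vec of sort keys, not a record batch stream.
type Partition = Vec<i64>;

/// Hypothetical "partition aware" union: output partition i combines
/// partition i of every input. Concatenation is an assumed simplification.
fn union_partition_aware(inputs: &[Vec<Partition>]) -> Vec<Partition> {
    let n = inputs.first().map_or(0, |input| input.len());
    (0..n)
        .map(|i| {
            inputs
                .iter()
                .flat_map(|input| input[i].iter().copied())
                .collect::<Partition>()
        })
        .collect()
}

/// Plain union: forward every input partition unchanged.
fn union_plain(inputs: &[Vec<Partition>]) -> Vec<Partition> {
    inputs.iter().flat_map(|input| input.iter().cloned()).collect()
}

/// True if the keys are in non-decreasing order.
fn is_sorted(keys: &[i64]) -> bool {
    keys.windows(2).all(|w| w[0] <= w[1])
}

/// Naive k-way merge. Like a sort-preserving merge, it only produces sorted
/// output if every input partition is already sorted.
fn merge(partitions: &[Partition]) -> Vec<i64> {
    let mut cursors = vec![0usize; partitions.len()];
    let mut out = Vec::new();
    loop {
        // Pick the partition whose current head is smallest.
        let mut best: Option<usize> = None;
        for (i, p) in partitions.iter().enumerate() {
            if cursors[i] < p.len() {
                match best {
                    Some(b) if partitions[b][cursors[b]] <= p[cursors[i]] => {}
                    _ => best = Some(i),
                }
            }
        }
        match best {
            Some(i) => {
                out.push(partitions[i][cursors[i]]);
                cursors[i] += 1;
            }
            None => return out,
        }
    }
}

/// Two inputs with two individually sorted partitions each (made-up data).
fn example_inputs() -> Vec<Vec<Partition>> {
    vec![
        vec![vec![1, 4], vec![2, 6]],
        vec![vec![0, 3], vec![5, 7]],
    ]
}
```

With `example_inputs()`, `union_plain` yields 4 partitions that are each still sorted, so `merge` returns a sorted result. `union_partition_aware` yields only 2 partitions whose contents are no longer sorted, and `merge` over them silently produces unsorted output, which mirrors what happens once `SortExec` is pushed below the union.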
