alamb opened a new issue, #8043: URL: https://github.com/apache/arrow-datafusion/issues/8043
While testing https://github.com/apache/arrow-datafusion/pull/8006 with our internal test suite, one of our tests fail because there are no sort expressions in a sort preserving repartition The input plan looks like: ``` 2023-11-02T15:58:06.601675Z TRACE log: Optimized physical plan by CombinePartialFinalAggregate: OutputRequirementExec SortExec: expr=[time@1 ASC NULLS LAST] CoalescePartitionsExec ProjectionExec: expr=[cpu as iox::measurement, time@0 as time, (selector_last(sum_idle,time)@1).[value] as last, (selector_last(sum_system,time)@2).[value] as last_1] AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)], ordering_mode=Sorted SortPreservingRepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16, sort_exprs=time@0 ASC NULLS LAST AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)] RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1 SortExec: expr=[time@0 ASC NULLS LAST] CoalescePartitionsExec ProjectionExec: expr=[time@0 as time, SUM(cpu.usage_idle)@1 as sum_idle, SUM(cpu.usage_system)@2 as sum_system] AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)] RepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16 AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)] RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1 ProjectionExec: expr=[time@1 as time, usage_idle@2 as usage_idle, usage_system@3 as usage_system] FilterExec: date_bin(10000000000, time@1, 0) <= 1698940686290451000 AND time@1 <= 1698940686290451000 AND cpu@0 = cpu-total ParquetExec: file_groups={1 group: [[2/8/0649f0e8b1abed092a356ec6181369fcf585431d1cc0694a0cc4ab45cf78b49d/0c5ac9b2-f6d4-4004-9036-15412da47647.parquet]]}, projection=[cpu, time, usage_idle, usage_system], predicate=date_bin(10000000000, time@2, 0) <= 1698940686290451000 AND time@2 <= 1698940686290451000 AND cpu@0 = cpu-total, pruning_predicate=time_min@0 <= 1698940686290451000 AND cpu_min@1 <= cpu-total AND cpu-total <= cpu_max@2 ``` But then after EnforceSorting the `SortPreservingMergeExec` seems to have to sort exprs anymore: ``` 2023-11-02T15:58:06.605925Z TRACE log: Optimized physical plan by EnforceSorting: OutputRequirementExec SortPreservingMergeExec: [time@1 ASC NULLS LAST] SortExec: expr=[time@1 ASC NULLS LAST] ProjectionExec: expr=[cpu as iox::measurement, time@0 as time, (selector_last(sum_idle,time)@1).[value] as last, (selector_last(sum_system,time)@2).[value] as last_1] AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)] ----> SortPreservingRepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16 AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)] RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=16 ProjectionExec: expr=[time@0 as time, SUM(cpu.usage_idle)@1 as sum_idle, SUM(cpu.usage_system)@2 as sum_system] AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)] RepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16 AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)] RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1 ProjectionExec: expr=[time@1 as time, usage_idle@2 as usage_idle, usage_system@3 as usage_system] FilterExec: date_bin(10000000000, time@1, 0) <= 1698940686290451000 AND time@1 <= 1698940686290451000 AND cpu@0 = cpu-total ParquetExec: file_groups={1 group: [[2/8/0649f0e8b1abed092a356ec6181369fcf585431d1cc0694a0cc4ab45cf78b49d/0c5ac9b2-f6d4-4004-9036-15412da47647.parquet]]}, projection=[cpu, time, usage_idle, usage_system], predicate=date_bin(10000000000, time@2, 0) <= 1698940686290451000 AND time@2 <= 1698940686290451000 AND cpu@0 = cpu-total, pruning_predicate=time_min@0 <= 1698940686290451000 AND cpu_min@1 <= cpu-total AND cpu-total <= cpu_max@2 ``` This then causes a failure during execution / streaming merge ``` Internal error: Sort expressions cannot be empty for streaming merge ``` _Originally posted by @alamb in https://github.com/apache/arrow-datafusion/issues/8006#issuecomment-1791024449_ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
