alamb opened a new issue, #8043:
URL: https://github.com/apache/arrow-datafusion/issues/8043

   While testing https://github.com/apache/arrow-datafusion/pull/8006 with our 
internal test suite, one of our tests fails because there are no sort 
expressions in a sort preserving repartition.
   
   The input plan looks like:
   ```
   2023-11-02T15:58:06.601675Z TRACE log: Optimized physical plan by 
CombinePartialFinalAggregate:
   OutputRequirementExec
     SortExec: expr=[time@1 ASC NULLS LAST]
       CoalescePartitionsExec
         ProjectionExec: expr=[cpu as iox::measurement, time@0 as time, 
(selector_last(sum_idle,time)@1).[value] as last, 
(selector_last(sum_system,time)@2).[value] as last_1]
           AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], 
aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)], 
ordering_mode=Sorted
             SortPreservingRepartitionExec: partitioning=Hash([time@0], 16), 
input_partitions=16, sort_exprs=time@0 ASC NULLS LAST
               AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 
0) as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)]
                 RepartitionExec: partitioning=RoundRobinBatch(16), 
input_partitions=1
                   SortExec: expr=[time@0 ASC NULLS LAST]
                     CoalescePartitionsExec
                       ProjectionExec: expr=[time@0 as time, 
SUM(cpu.usage_idle)@1 as sum_idle, SUM(cpu.usage_system)@2 as sum_system]
                         AggregateExec: mode=FinalPartitioned, gby=[time@0 as 
time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                           RepartitionExec: partitioning=Hash([time@0], 16), 
input_partitions=16
                             AggregateExec: mode=Partial, 
gby=[date_bin(10000000000, time@0, 0) as time], aggr=[SUM(cpu.usage_idle), 
SUM(cpu.usage_system)]
                               RepartitionExec: 
partitioning=RoundRobinBatch(16), input_partitions=1
                                 ProjectionExec: expr=[time@1 as time, 
usage_idle@2 as usage_idle, usage_system@3 as usage_system]
                                   FilterExec: date_bin(10000000000, time@1, 0) 
<= 1698940686290451000 AND time@1 <= 1698940686290451000 AND cpu@0 = cpu-total
                                     ParquetExec: file_groups={1 group: 
[[2/8/0649f0e8b1abed092a356ec6181369fcf585431d1cc0694a0cc4ab45cf78b49d/0c5ac9b2-f6d4-4004-9036-15412da47647.parquet]]},
 projection=[cpu, time, usage_idle, usage_system], 
predicate=date_bin(10000000000, time@2, 0) <= 1698940686290451000 AND time@2 <= 
1698940686290451000 AND cpu@0 = cpu-total, pruning_predicate=time_min@0 <= 
1698940686290451000 AND cpu_min@1 <= cpu-total AND cpu-total <= cpu_max@2
   ```
   
   But then after EnforceSorting the `SortPreservingRepartitionExec` (marked 
with `---->` below) seems to have no sort exprs anymore:
   ```
   2023-11-02T15:58:06.605925Z TRACE log: Optimized physical plan by 
EnforceSorting:
   OutputRequirementExec
     SortPreservingMergeExec: [time@1 ASC NULLS LAST] 
       SortExec: expr=[time@1 ASC NULLS LAST]
         ProjectionExec: expr=[cpu as iox::measurement, time@0 as time, 
(selector_last(sum_idle,time)@1).[value] as last, 
(selector_last(sum_system,time)@2).[value] as last_1]
           AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], 
aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)]
       ----> SortPreservingRepartitionExec: partitioning=Hash([time@0], 16), 
input_partitions=16 
               AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 
0) as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)]
                 RepartitionExec: partitioning=RoundRobinBatch(16), 
input_partitions=16
                   ProjectionExec: expr=[time@0 as time, SUM(cpu.usage_idle)@1 
as sum_idle, SUM(cpu.usage_system)@2 as sum_system]
                     AggregateExec: mode=FinalPartitioned, gby=[time@0 as 
time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                       RepartitionExec: partitioning=Hash([time@0], 16), 
input_partitions=16
                         AggregateExec: mode=Partial, 
gby=[date_bin(10000000000, time@0, 0) as time], aggr=[SUM(cpu.usage_idle), 
SUM(cpu.usage_system)]
                           RepartitionExec: partitioning=RoundRobinBatch(16), 
input_partitions=1
                             ProjectionExec: expr=[time@1 as time, usage_idle@2 
as usage_idle, usage_system@3 as usage_system]
                               FilterExec: date_bin(10000000000, time@1, 0) <= 
1698940686290451000 AND time@1 <= 1698940686290451000 AND cpu@0 = cpu-total
                                 ParquetExec: file_groups={1 group: 
[[2/8/0649f0e8b1abed092a356ec6181369fcf585431d1cc0694a0cc4ab45cf78b49d/0c5ac9b2-f6d4-4004-9036-15412da47647.parquet]]},
 projection=[cpu, time, usage_idle, usage_system], 
predicate=date_bin(10000000000, time@2, 0) <= 1698940686290451000 AND time@2 <= 
1698940686290451000 AND cpu@0 = cpu-total, pruning_predicate=time_min@0 <= 
1698940686290451000 AND cpu_min@1 <= cpu-total AND cpu-total <= cpu_max@2
   ```
   
   This then causes a failure during execution, in the streaming merge:
   
   ```
   Internal error: Sort expressions cannot be empty for streaming merge
   ```
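
   To make the condition easier to spot in trace logs, here is a minimal sketch that scans the text of a displayed plan for a `SortPreservingRepartitionExec` node with no `sort_exprs=` attribute. It is not part of DataFusion: the helper name `repartitions_missing_sort_exprs` and the regex are assumptions for illustration, and the two plans are trimmed excerpts of the ones above. It only inspects the printed plan, not the real `ExecutionPlan` tree.

```python
import re

# Hypothetical helper (not a DataFusion API): works on the printed plan text only.
# Assumes every operator name in the plan ends in "Exec", as in the logs above.
OPERATOR = re.compile(r"\b(\w+Exec)\b")


def repartitions_missing_sort_exprs(plan_text):
    """Return the text of each SortPreservingRepartitionExec node without sort_exprs."""
    # Collapse whitespace so operator descriptions wrapped across lines are rejoined.
    flat = " ".join(plan_text.split())
    matches = list(OPERATOR.finditer(flat))
    offenders = []
    for i, m in enumerate(matches):
        if m.group(1) != "SortPreservingRepartitionExec":
            continue
        # A node's description runs until the next operator name (or end of text).
        end = matches[i + 1].start() if i + 1 < len(matches) else len(flat)
        node = flat[m.start():end].strip()
        if "sort_exprs=" not in node:
            offenders.append(node)
    return offenders


# Trimmed excerpt of the plan before EnforceSorting.
before_plan = """
AggregateExec: mode=FinalPartitioned, gby=[time@0 as time]
  SortPreservingRepartitionExec: partitioning=Hash([time@0], 16),
input_partitions=16, sort_exprs=time@0 ASC NULLS LAST
    AggregateExec: mode=Partial
"""

# Trimmed excerpt of the plan after EnforceSorting.
after_plan = """
AggregateExec: mode=FinalPartitioned, gby=[time@0 as time]
  SortPreservingRepartitionExec: partitioning=Hash([time@0], 16),
input_partitions=16
    AggregateExec: mode=Partial
"""

missing_before = repartitions_missing_sort_exprs(before_plan)
missing_after = repartitions_missing_sort_exprs(after_plan)
print(missing_before)
print(missing_after)
```

   Run against the two excerpts, the check reports nothing for the plan before `EnforceSorting` and flags the single repartition node in the plan after it, which matches the node marked with `---->`.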
   
   _Originally posted by @alamb in 
https://github.com/apache/arrow-datafusion/issues/8006#issuecomment-1791024449_
               

