alamb commented on issue #8738:
URL: 
https://github.com/apache/arrow-datafusion/issues/8738#issuecomment-1879003042

   Ok, I have figured out what is going on here
   
   The original plan is here
   
   <details><summary>Details</summary>
   <p>
   
   ```
   SortPreservingMergeExec: [time@0 ASC NULLS LAST,tag_id@1 ASC NULLS LAST]
     SortExec: expr=[time@0 ASC NULLS LAST,tag_id@1 ASC NULLS LAST]
       ProjectionExec: expr=[timestamp@0 as time, tag_id@1 as tag_id, field@2 as field, value@3 as value]
         AggregateExec: mode=FinalPartitioned, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]
           CoalesceBatchesExec: target_batch_size=8192
             RepartitionExec: partitioning=Hash([timestamp@0, tag_id@1, field@2, value@3], 16), input_partitions=32
               AggregateExec: mode=Partial, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[] <****** This is the aggregate that is hitting the problem
                 UnionExec
                   ProjectionExec: expr=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, CAST(value@3 AS Utf8) as value]
                     AggregateExec: mode=FinalPartitioned, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]
                       CoalesceBatchesExec: target_batch_size=8192
                         RepartitionExec: partitioning=Hash([timestamp@0, tag_id@1, field@2, value@3], 16), input_partitions=32
                           AggregateExec: mode=Partial, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]
                             UnionExec
                               ProjectionExec: expr=[time@2 as timestamp, tag_id@0 as tag_id, active_power as field, f5@1 as value]
                                 ProjectionExec: expr=[CAST(column2@1 AS Dictionary(Int32, Utf8)) as tag_id, CAST(column3@2 AS Float64) as f5, CAST(column4@3 AS Timestamp(Nanosecond, None)) as time]
                                   RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                                     CoalesceBatchesExec: target_batch_size=8192
                                       FilterExec: column4@3 >= 1701783995000000000 AND column4@3 < 1704289595000000000 AND CAST(column3@2 AS Float64) IS NOT NULL AND CAST(column1@0 AS Dictionary(Int32, Utf8)) = active AND CAST(column2@1 AS Dictionary(Int32, Utf8)) = 1000
                                         ValuesExec
                               ProjectionExec: expr=[time@2 as timestamp, tag_id@0 as tag_id, f1 as field, f1@1 as value]
                                 ProjectionExec: expr=[CAST(column1@0 AS Dictionary(Int32, Utf8)) as tag_id, CAST(column2@1 AS Float64) as f1, CAST(column6@2 AS Timestamp(Nanosecond, None)) as time]
                                   RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                                     CoalesceBatchesExec: target_batch_size=8192
                                       FilterExec: column6@2 >= 1701783995000000000 AND column6@2 < 1704289595000000000 AND CAST(column2@1 AS Float64) IS NOT NULL AND CAST(column1@0 AS Dictionary(Int32, Utf8)) = 1000
                                         ProjectionExec: expr=[column1@0 as column1, column2@1 as column2, column6@5 as column6]
                                           ValuesExec
                   ProjectionExec: expr=[time@2 as timestamp, tag_id@0 as tag_id, f2 as field, f2@1 as value]
                     ProjectionExec: expr=[CAST(column1@0 AS Dictionary(Int32, Utf8)) as tag_id, column3@1 as f2, CAST(column6@2 AS Timestamp(Nanosecond, None)) as time]
                       RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                         CoalesceBatchesExec: target_batch_size=8192
                           FilterExec: column6@2 >= 1701783995000000000 AND column6@2 < 1704289595000000000 AND column3@1 IS NOT NULL AND CAST(column1@0 AS Dictionary(Int32, Utf8)) = 1000
                             ProjectionExec: expr=[column1@0 as column1, column3@2 as column3, column6@5 as column6]
                               ValuesExec
   2 rows in set. Query took 0.116 seconds.
   
   ```
   
   </p>
   </details> 
   
   The `AggregateExec` that hits the problem reads from a `UnionExec` that looks like this:
   
   ```
   UnionExec
     ProjectionExec: expr=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, CAST(value@3 AS Utf8) as value]
       AggregateExec: mode=Final, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]
         CoalescePartitionsExec
          ... <snip> ...
     ProjectionExec: expr=[time@2 as timestamp, tag_id@0 as tag_id, f2 as field, f2@1 as value]
       ProjectionExec: expr=[CAST(column1@0 AS Dictionary(Int32, Utf8)) as tag_id, column3@1 as f2, CAST(column6@2 AS Timestamp(Nanosecond, None)) as time]
         CoalesceBatchesExec: target_batch_size=8192
           FilterExec: column6@2 >= 1701783995000000000 AND column6@2 < 1704289595000000000 AND column3@1 IS NOT NULL AND CAST(column1@0 AS Dictionary(Int32, Utf8)) = 1000
             ProjectionExec: expr=[column1@0 as column1, column3@2 as column3, column6@5 as column6]
               ValuesExec
   ```
   
   The problem here is that after 
https://github.com/apache/arrow-datafusion/pull/8291 the inputs to the 
UnionExec produce different schemas. 
   * The first input has an `AggregateExec`, which now produces a `tag_id` column of type `Utf8` (the materialized form of the dictionary-encoded values).
   * The second input has no `AggregateExec` and passes the `tag_id` column through still encoded as `Dictionary(Int32, Utf8)`.
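
To make the difference between the two encodings concrete, here is a minimal Python sketch. It is not DataFusion or Arrow code: `DictionaryColumn`, `Utf8Column`, and `materialize` are hypothetical names used only for illustration. It models a dictionary-encoded column as integer keys into a list of distinct values, and shows that materializing it yields the same logical strings under a different physical type.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class DictionaryColumn:
    """Toy stand-in for a Dictionary(Int32, Utf8) column (hypothetical type)."""
    keys: List[int]      # one integer key per row
    values: List[str]    # distinct strings the keys point into
    data_type: str = "Dictionary(Int32, Utf8)"


@dataclass
class Utf8Column:
    """Toy stand-in for a plain Utf8 column (hypothetical type)."""
    values: List[str]    # one string per row
    data_type: str = "Utf8"


def materialize(col: DictionaryColumn) -> Utf8Column:
    # Replace every key with the string it refers to.
    return Utf8Column([col.values[k] for k in col.keys])


encoded = DictionaryColumn(keys=[0, 0, 1], values=["1000", "2000"])
plain = materialize(encoded)

# Same logical rows, but the two columns report different data types.
print(plain.values)
print(encoded.data_type, "vs", plain.data_type)
```

The rows are identical after materialization, yet a consumer that compares data types will treat the two columns as incompatible.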
   
   
   So the `UnionExec` code reports that it will produce `tag_id` as a `Utf8` column, but some of the batches contain `Utf8` and some contain `Dictionary(Int32, Utf8)`, which is what causes the runtime error.
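
The failure mode can be sketched in plain Python. This is a toy model, not the real `UnionExec`: `union_declared_schema` and `check_batch` are hypothetical helpers, and the sketch assumes, purely for illustration, that the union advertises the schema of its first input and that a downstream consumer validates each batch against it.

```python
from typing import Dict, List

Schema = Dict[str, str]  # column name -> data type name


def union_declared_schema(input_schemas: List[Schema]) -> Schema:
    # Simplifying assumption: the union advertises its first input's schema.
    return dict(input_schemas[0])


def check_batch(declared: Schema, batch_schema: Schema) -> None:
    """Stand-in for a downstream operator that trusts the declared schema."""
    for name, expected in declared.items():
        actual = batch_schema[name]
        if actual != expected:
            raise TypeError(f"column {name!r}: expected {expected}, got {actual}")


# First input: went through an aggregate, so tag_id is materialized.
aggregated_input = {"tag_id": "Utf8"}
# Second input: no aggregate, so tag_id is still dictionary encoded.
passthrough_input = {"tag_id": "Dictionary(Int32, Utf8)"}

declared = union_declared_schema([aggregated_input, passthrough_input])

errors = []
for batch_schema in (aggregated_input, passthrough_input):
    try:
        check_batch(declared, batch_schema)
    except TypeError as err:
        errors.append(str(err))

print(errors)
```

Only batches from the second input fail the check, which matches the observation that the error shows up at runtime rather than at planning time.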

