alamb commented on issue #8738: URL: https://github.com/apache/arrow-datafusion/issues/8738#issuecomment-1879003042
Ok, I have figured out what is going on here.

The original plan is here:
<details><summary>Details</summary>
<p>
```
SortPreservingMergeExec: [time@0 ASC NULLS LAST,tag_id@1 ASC NULLS LAST]
  SortExec: expr=[time@0 ASC NULLS LAST,tag_id@1 ASC NULLS LAST]
    ProjectionExec: expr=[timestamp@0 as time, tag_id@1 as tag_id, field@2 as field, value@3 as value]
      AggregateExec: mode=FinalPartitioned, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([timestamp@0, tag_id@1, field@2, value@3], 16), input_partitions=32
            AggregateExec: mode=Partial, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[] <****** This is the aggregate that is hitting the problem
              UnionExec
                ProjectionExec: expr=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, CAST(value@3 AS Utf8) as value]
                  AggregateExec: mode=FinalPartitioned, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]
                    CoalesceBatchesExec: target_batch_size=8192
                      RepartitionExec: partitioning=Hash([timestamp@0, tag_id@1, field@2, value@3], 16), input_partitions=32
                        AggregateExec: mode=Partial, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]
                          UnionExec
                            ProjectionExec: expr=[time@2 as timestamp, tag_id@0 as tag_id, active_power as field, f5@1 as value]
                              ProjectionExec: expr=[CAST(column2@1 AS Dictionary(Int32, Utf8)) as tag_id, CAST(column3@2 AS Float64) as f5, CAST(column4@3 AS Timestamp(Nanosecond, None)) as time]
                                RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                                  CoalesceBatchesExec: target_batch_size=8192
                                    FilterExec: column4@3 >= 1701783995000000000 AND column4@3 < 1704289595000000000 AND CAST(column3@2 AS Float64) IS NOT NULL AND CAST(column1@0 AS Dictionary(Int32, Utf8)) = active AND CAST(column2@1 AS Dictionary(Int32, Utf8)) = 1000
                                      ValuesExec
                            ProjectionExec: expr=[time@2 as timestamp, tag_id@0 as tag_id, f1 as field, f1@1 as value]
                              ProjectionExec: expr=[CAST(column1@0 AS Dictionary(Int32, Utf8)) as tag_id, CAST(column2@1 AS Float64) as f1, CAST(column6@2 AS Timestamp(Nanosecond, None)) as time]
                                RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                                  CoalesceBatchesExec: target_batch_size=8192
                                    FilterExec: column6@2 >= 1701783995000000000 AND column6@2 < 1704289595000000000 AND CAST(column2@1 AS Float64) IS NOT NULL AND CAST(column1@0 AS Dictionary(Int32, Utf8)) = 1000
                                      ProjectionExec: expr=[column1@0 as column1, column2@1 as column2, column6@5 as column6]
                                        ValuesExec
                ProjectionExec: expr=[time@2 as timestamp, tag_id@0 as tag_id, f2 as field, f2@1 as value]
                  ProjectionExec: expr=[CAST(column1@0 AS Dictionary(Int32, Utf8)) as tag_id, column3@1 as f2, CAST(column6@2 AS Timestamp(Nanosecond, None)) as time]
                    RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                      CoalesceBatchesExec: target_batch_size=8192
                        FilterExec: column6@2 >= 1701783995000000000 AND column6@2 < 1704289595000000000 AND column3@1 IS NOT NULL AND CAST(column1@0 AS Dictionary(Int32, Utf8)) = 1000
                          ProjectionExec: expr=[column1@0 as column1, column3@2 as column3, column6@5 as column6]
                            ValuesExec

2 rows in set. Query took 0.116 seconds.
```
</p>
</details>
The `AggregateExec` that hits the issue is reading from a `UnionExec` that looks like this:
```
UnionExec
  ProjectionExec: expr=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, CAST(value@3 AS Utf8) as value]
    AggregateExec: mode=Final, gby=[timestamp@0 as timestamp, tag_id@1 as tag_id, field@2 as field, value@3 as value], aggr=[]
      CoalescePartitionsExec
        ... <snip> ...
  ProjectionExec: expr=[time@2 as timestamp, tag_id@0 as tag_id, f2 as field, f2@1 as value]
    ProjectionExec: expr=[CAST(column1@0 AS Dictionary(Int32, Utf8)) as tag_id, column3@1 as f2, CAST(column6@2 AS Timestamp(Nanosecond, None)) as time]
      CoalesceBatchesExec: target_batch_size=8192
        FilterExec: column6@2 >= 1701783995000000000 AND column6@2 < 1704289595000000000 AND column3@1 IS NOT NULL AND CAST(column1@0 AS Dictionary(Int32, Utf8)) = 1000
          ProjectionExec: expr=[column1@0 as column1, column3@2 as column3, column6@5 as column6]
            ValuesExec
```
The problem here is that after https://github.com/apache/arrow-datafusion/pull/8291 the inputs to the `UnionExec` produce different schemas (see the sketch after this list):

* The first input now goes through an `AggregateExec`, which produces a `tag_id` column that is `Utf8` (the materialized form of the dictionary-encoded column).
* The second input has no `AggregateExec` and passes the `tag_id` column through still `Dictionary(Int32, Utf8)` encoded.
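Here is a minimal sketch, using arrow-rs directly rather than the actual DataFusion code path, of why the aggregated input comes out as `Utf8`: materializing (here, casting) a `Dictionary(Int32, Utf8)` array yields its plain `Utf8` values. The array contents are made up for illustration.

```rust
use arrow::array::{Array, DictionaryArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Int32Type};

fn main() {
    // tag_id as produced by the pass-through input: dictionary encoded
    let dict: DictionaryArray<Int32Type> =
        vec!["1000", "1000", "2000"].into_iter().collect();
    assert_eq!(
        dict.data_type(),
        &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
    );

    // tag_id after the aggregate: the dictionary values are materialized
    // into a plain Utf8 column
    let materialized = cast(&dict, &DataType::Utf8).unwrap();
    assert_eq!(materialized.data_type(), &DataType::Utf8);
}
```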
So the `UnionExec` code reports that it will produce `tag_id` as a `Utf8` column, but some of the batches contain `Utf8` and some contain `Dictionary(Int32, Utf8)`, which is what causes the runtime error.
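To illustrate the kind of failure this produces, here is a minimal sketch (again plain arrow-rs, not the actual error site in DataFusion) where a batch whose column is still dictionary encoded fails validation against a schema that declares plain `Utf8`:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, DictionaryArray};
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    // The schema the UnionExec reports: tag_id is plain Utf8
    let schema = Arc::new(Schema::new(vec![Field::new(
        "tag_id",
        DataType::Utf8,
        true,
    )]));

    // A batch from the pass-through input: tag_id is still dictionary encoded
    let dict: DictionaryArray<Int32Type> = vec!["1000", "2000"].into_iter().collect();
    let columns: Vec<ArrayRef> = vec![Arc::new(dict)];

    // try_new validates column types against the schema and errors out,
    // because Dictionary(Int32, Utf8) does not match the declared Utf8
    let err = RecordBatch::try_new(schema, columns).unwrap_err();
    println!("{err}");
}
```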