berkaysynnada opened a new issue, #6492:
URL: https://github.com/apache/arrow-datafusion/issues/6492
### Describe the bug
`SELECT SUM(c1) OVER(ORDER BY c1) as sum1 INTO new_table FROM annotated_data_infinite`
runs without problems, but
`SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite`
fails with the error **Plan("Mismatch between schema and batches")**. The only
difference is that the second query does not alias the window expression.
The reason is that in `MemTable::try_new()`, the schema and the partitions'
schemas don't match. I traced the cause: the schema, which is created from the
input LogicalPlan, has field names produced by
[display_name()](https://github.com/apache/arrow-datafusion/blob/c7bfe15b4940ebff39f466212ccf32e891db7243/datafusion/expr/src/expr_schema.rs#L283),
which writes the whole expression (function plus window specification). However,
when there is no alias, the fields of the partitions' RecordBatches are named by
[physical_name()](https://github.com/apache/arrow-datafusion/blob/c7bfe15b4940ebff39f466212ccf32e891db7243/datafusion/core/src/physical_plan/planner.rs#L1601),
which writes only the function part of the expression.
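To make the failure mode concrete, here is a minimal, self-contained sketch that does not use Arrow at all. `SimpleField` and `contains_all` are hypothetical stand-ins that imitate a strict check in which every attribute of a field, including its name, must be equal; the two name strings are only illustrative of the `display_name()` versus `physical_name()` styles, not their exact output.

```rust
/// Hypothetical stand-in for an Arrow field: just a name and a type tag.
#[derive(Debug, PartialEq)]
struct SimpleField {
    name: String,
    data_type: &'static str,
}

/// Imitates a strict check: every pair of fields must be fully equal,
/// name included, and the field counts must agree.
fn contains_all(schema: &[SimpleField], batch: &[SimpleField]) -> bool {
    schema.len() == batch.len() && schema.iter().zip(batch).all(|(s, b)| s == b)
}

fn main() {
    // Illustrative logical-plan name: function plus window specification.
    let schema = vec![SimpleField {
        name: "SUM(t.c1) ORDER BY [t.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"
            .to_string(),
        data_type: "Int64",
    }];
    // Illustrative physical name: function part only (no alias given).
    let batch = vec![SimpleField {
        name: "SUM(t.c1)".to_string(),
        data_type: "Int64",
    }];
    // The types agree but the names differ, so the strict check rejects the batch.
    println!("schemas match: {}", contains_all(&schema, &batch));
}
```

With an alias such as `sum1`, both sides would carry the same name and the check would pass, which matches the behavior of the first query.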
### To Reproduce
`SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite`
### Expected behavior
I see two possible approaches:
1) Make `create_window_expr()` name the window expression with `display_name()`
when constructing it. However, this would require many changes in tests, and the
exec lines would become very long, for example:
`ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW@3 as MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW,
SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING@4 as SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING, MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2
ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW@2 as MIN(aggregate_test_100.c9) ORDER BY
[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST]
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]`
Perhaps the `Display` implementations could be changed to keep these lines shorter?
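As a rough illustration of approach 1, the sketch below uses hypothetical `display_name` and `physical_name` helpers that only imitate the two naming styles; they are not the DataFusion functions of the same names. It shows that if the physical window expression were named with the same full string the logical plan uses, a strict name comparison would pass, at the cost of much longer names in plan output.

```rust
/// Hypothetical: imitates a name made of the function plus the window spec.
fn display_name(func: &str, window_spec: &str) -> String {
    format!("{func} {window_spec}")
}

/// Hypothetical: imitates a name made of the function part only.
fn physical_name(func: &str) -> String {
    func.to_string()
}

fn main() {
    let func = "SUM(t.c1)";
    let spec = "ORDER BY [t.c1 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW";
    let logical_name = display_name(func, spec);

    // Current behavior: the unaliased physical name differs from the logical one.
    assert_ne!(logical_name, physical_name(func));

    // Approach 1: name the physical window expression with the display name too.
    assert_eq!(logical_name, display_name(func, spec));

    // The trade-off: much longer names in plan output.
    println!(
        "{} vs {} characters",
        logical_name.len(),
        physical_name(func).len()
    );
}
```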
2) `MemTable::try_new()` matches fields using the `contains()` function
implemented for `Schema`, which in turn uses the `contains()` implemented for
`Field`. That checks equality of every element of the `Field` struct. For the
name element alone, we could relax the check to something like
`schema_field.name().starts_with(partition_field.name())`. If we prefer not to
change the `contains()` function, we could instead write a specialized function
such as:
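```rust
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;

/// Returns true if every batch in `partitions` is compatible with `schema`:
/// the same number of fields, equal data types, and each schema field name
/// starting with the corresponding batch field name.
fn validate_partitions_with_schema(
    schema: &SchemaRef,
    partitions: &[Vec<RecordBatch>],
) -> bool {
    partitions.iter().flatten().all(|batch| {
        let batch_schema = batch.schema();
        // Field counts must agree before comparing fields pairwise.
        batch_schema.fields().len() == schema.fields().len()
            && schema
                .fields()
                .iter()
                .zip(batch_schema.fields().iter())
                .all(|(schema_field, batch_field)| {
                    // Relaxed prefix match on the name; exact match on the type.
                    schema_field.name().starts_with(batch_field.name())
                        && schema_field.data_type() == batch_field.data_type()
                })
    })
}
```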
But this approach also does not seem solid to me.
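One concrete way a prefix check like the one in approach 2 can go wrong is a false positive: a prefix test cannot distinguish "the same expression with its window specification dropped" from "a different name that merely starts with the same characters". The hypothetical `name_compatible` below isolates only the name comparison from the function above, and the column names are made up for illustration.

```rust
/// Hypothetical helper isolating the name part of the relaxed check.
fn name_compatible(schema_name: &str, batch_name: &str) -> bool {
    schema_name.starts_with(batch_name)
}

fn main() {
    // Intended case: the full display name begins with the physical name.
    let intended = name_compatible("SUM(t.c1) ORDER BY [t.c1 ASC NULLS LAST]", "SUM(t.c1)");
    // Unintended case: accepted only because "c10" starts with "c1".
    let false_positive = name_compatible("c10", "c1");
    // An empty batch field name would match any schema field name.
    let empty_name = name_compatible("anything", "");
    // prints "true true true"
    println!("{intended} {false_positive} {empty_name}");
}
```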
### Additional context
Any advice is welcome. I will fix the issue once we reach common ground.