berkaysynnada opened a new issue, #6492:
URL: https://github.com/apache/arrow-datafusion/issues/6492

   ### Describe the bug
   
   `SELECT SUM(c1) OVER(ORDER BY c1) as sum1 INTO new_table FROM annotated_data_infinite` works, but the same query without an alias,
   `SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite`,
   fails with the error **Plan("Mismatch between schema and batches")**.
   
   The cause is that in `MemTable::try_new()`, the table schema does not match the schema of the partitions. I tracked this down. The table schema is created from the input `LogicalPlan`, and its field names are produced by [display_name()](https://github.com/apache/arrow-datafusion/blob/c7bfe15b4940ebff39f466212ccf32e891db7243/datafusion/expr/src/expr_schema.rs#L283), which writes out the whole expression (function plus window specification). However, when there is no alias, the fields of the partitions' `RecordBatch`es are named by [physical_name()](https://github.com/apache/arrow-datafusion/blob/c7bfe15b4940ebff39f466212ccf32e891db7243/datafusion/core/src/physical_plan/planner.rs#L1601), which writes only the function part of the expression.
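   A small std-only Rust model may make the mismatch concrete. It deliberately avoids DataFusion and Arrow: `WindowExprSketch`, `FieldSketch`, `short_name()`, `full_name()` and `fields_match_strictly()` are hypothetical names, and the name format is only assumed from the `ProjectionExec` output quoted below under the first approach. It is a sketch of the two naming schemes, not DataFusion's actual code.

   ```rust
   /// Hypothetical, simplified description of a window expression (not a DataFusion type).
   struct WindowExprSketch {
       func: String,
       arg: String,
       order_by: Vec<String>,
       frame: String,
   }

   impl WindowExprSketch {
       /// Models `physical_name()` without an alias: only the function part.
       fn short_name(&self) -> String {
           format!("{}({})", self.func, self.arg)
       }

       /// Models `display_name()`: function part plus ORDER BY list and window frame,
       /// following the format seen in the `ProjectionExec` output in this issue.
       fn full_name(&self) -> String {
           let mut name = self.short_name();
           if !self.order_by.is_empty() {
               name.push_str(&format!(" ORDER BY [{}]", self.order_by.join(", ")));
           }
           name.push(' ');
           name.push_str(&self.frame);
           name
       }
   }

   /// Hypothetical field model: a name and a type label.
   #[derive(Debug, Clone, PartialEq)]
   struct FieldSketch {
       name: String,
       data_type: String,
   }

   /// Strict check in the spirit of `Field` equality: every member must be equal.
   fn fields_match_strictly(schema: &[FieldSketch], batch: &[FieldSketch]) -> bool {
       schema.len() == batch.len() && schema.iter().zip(batch).all(|(s, b)| s == b)
   }
   ```

   In this model, a field named with `full_name()` compared against one named with `short_name()` fails the strict check even when the types agree. If both sides used `full_name()`, as the first approach below suggests, the strict comparison would pass.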
   
   
   ### To Reproduce
   
   `SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite`
   
   ### Expected behavior
   
   I see two possible approaches:
   
   1) Have `create_window_expr()` name the window expression with `display_name()` when constructing it. However, this requires many changes in tests, and the exec lines become very long, for example:
   `ProjectionExec: expr=[c2@0 as c2, MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as MAX(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@4 as SUM(aggregate_test_100.c9) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as MIN(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c9 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]`
   Maybe the `Display` implementations can be changed?
   
   2) `MemTable::try_new()` matches fields using `Schema::contains()`, which in turn uses `Field::contains()`. That checks one-to-one equality of every member of the `Field` struct. For the name alone, we could relax the equality to something like `schema_field.name().starts_with(partition_field.name())`. If we prefer not to change `contains()`, we could instead write a specialized function such as:
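   ```rust
   use arrow::datatypes::SchemaRef;
   use arrow::record_batch::RecordBatch;

   /// Returns true if every batch in `partitions` has as many fields as `schema`,
   /// with identical data types and names that the schema's names start with.
   fn validate_partitions_with_schema(schema: &SchemaRef, partitions: &[Vec<RecordBatch>]) -> bool {
       partitions.iter().flatten().all(|batch| {
           let batch_schema = batch.schema();
           // Field counts must agree before comparing fields pairwise.
           batch_schema.fields().len() == schema.fields().len()
               && schema
                   .fields()
                   .iter()
                   .zip(batch_schema.fields().iter())
                   .all(|(schema_field, partition_field)| {
                       // Relaxed name check: the logical (display) name may extend
                       // the physical name; data types must still be identical.
                       schema_field.name().starts_with(partition_field.name())
                           && schema_field.data_type() == partition_field.data_type()
                   })
       })
   }
   ```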
   But this approach also does not seem solid to me.
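   One reason a prefix rule is fragile: `starts_with` also accepts names that are genuinely different, as long as the batch name is a prefix of the schema name. The std-only sketch below isolates just the relaxed name/type rule from the function above. It models fields as hypothetical `(name, type)` string pairs instead of Arrow fields, and `relaxed_fields_match` is an illustrative name, not an existing API.

   ```rust
   /// Relaxed rule from the proposal: the schema name must start with the batch
   /// name, and the type labels must be equal. Fields are modeled as (name, type).
   fn relaxed_fields_match(schema: &[(&str, &str)], batch: &[(&str, &str)]) -> bool {
       schema.len() == batch.len()
           && schema
               .iter()
               .zip(batch)
               .all(|((s_name, s_type), (b_name, b_type))| {
                   s_name.starts_with(*b_name) && s_type == b_type
               })
   }
   ```

   With this rule, the long display-style name is accepted against the short physical name, which is the intended case. However, a schema field `c10` is also accepted against a batch field `c1` of the same type, which is the kind of false positive a name-prefix check admits. Only the type comparison still guards against mismatches.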
   
   ### Additional context
   
   Any advice is welcome. I will work on a fix once we reach a consensus.

