ozankabak commented on code in PR #4455:
URL: https://github.com/apache/arrow-datafusion/pull/4455#discussion_r1037054965
##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -73,13 +77,43 @@ impl WindowAggExec {
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = Arc::new(schema);
+ let window_expr_len = window_expr.len();
+ // Although WindowAggExec does not change the output ordering from the input, but can not return the output ordering
+ // from the input directly, need to adjust the column index to align with the new schema.
+ let output_ordering = input
+ .output_ordering()
+ .map(|sort_exprs| {
+ let new_sort_exprs: Result<Vec<PhysicalSortExpr>> = sort_exprs
+ .iter()
+ .map(|e| {
+ let new_expr = e.expr.clone().transform_down(&|e| match e
+ .as_any()
+ .downcast_ref::<Column>()
+ {
+ Some(col) => Ok(Some(Arc::new(Column::new(
+ col.name(),
+ window_expr_len + col.index(),
+ )))),
+ None => Ok(None),
+ });
Review Comment:
This is not very important, but it may improve readability and be a little
more idiomatic:
```rust
let new_expr = e.expr.clone().transform_down(&|e| {
    Ok(e.as_any().downcast_ref::<Column>().map(|col| {
        Arc::new(Column::new(
            col.name(),
            window_expr_len + col.index(),
        )) as Arc<dyn PhysicalExpr>
    }))
});
```
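To make the equivalence concrete, here is a minimal standalone sketch that compiles without DataFusion. The `PhysicalExpr` trait, `Column` struct, and `Literal` type are simplified stand-ins for the real ones, and `shift_with_match` / `shift_with_map` are hypothetical helper names used only for illustration. It shows that the `match` form and the `Option::map` form return the same result, and why the `as Arc<dyn PhysicalExpr>` cast is needed in the closure.

```rust
use std::any::Any;
use std::sync::Arc;

// Simplified stand-in for DataFusion's `PhysicalExpr` trait (assumption).
trait PhysicalExpr: Any {
    fn as_any(&self) -> &dyn Any;
}

// Simplified stand-in for DataFusion's `Column` expression (assumption).
#[derive(Debug, PartialEq)]
struct Column {
    name: String,
    index: usize,
}

impl PhysicalExpr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// A non-column expression, used to exercise the `None` path.
struct Literal;

impl PhysicalExpr for Literal {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

type Rewritten = Result<Option<Arc<dyn PhysicalExpr>>, String>;

// Original shape: an explicit `match` on the downcast result.
fn shift_with_match(e: &Arc<dyn PhysicalExpr>, offset: usize) -> Rewritten {
    match e.as_any().downcast_ref::<Column>() {
        Some(col) => Ok(Some(Arc::new(Column {
            name: col.name.clone(),
            index: offset + col.index,
        }))),
        None => Ok(None),
    }
}

// Suggested shape: `Option::map` replaces both arms. The closure must cast
// to the trait object explicitly, otherwise it yields `Arc<Column>`.
fn shift_with_map(e: &Arc<dyn PhysicalExpr>, offset: usize) -> Rewritten {
    Ok(e.as_any().downcast_ref::<Column>().map(|col| {
        Arc::new(Column {
            name: col.name.clone(),
            index: offset + col.index,
        }) as Arc<dyn PhysicalExpr>
    }))
}
```

Both helpers return `Ok(None)` for anything that is not a column, which is the signal the rewrite uses to leave a node unchanged.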
##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -116,14 +150,38 @@ impl ExecutionPlan for WindowAggExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- // because we can have repartitioning using the partition keys
- // this would be either 1 or more than 1 depending on the presense of
- // repartitioning
- self.input.output_partitioning()
+ // Although WindowAggExec does not change the output partitioning from the input, but can not return the output partitioning
+ // from the input directly, need to adjust the column index to align with the new schema.
+ let window_expr_len = self.window_expr.len();
+ let input_partitioning = self.input.output_partitioning();
+ match input_partitioning {
+ Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size),
+ Partitioning::UnknownPartitioning(size) => {
+ Partitioning::UnknownPartitioning(size)
+ }
+ Partitioning::Hash(exprs, size) => {
+ let new_exprs = exprs
+ .into_iter()
+ .map(|expr| {
+ expr.transform_down(
Review Comment:
In the same vein, it may be a good idea to use:
```rust
expr.transform_down(&|e| {
    Ok(e.as_any().downcast_ref::<Column>().map(|col| {
        Arc::new(Column::new(
            col.name(),
            window_expr_len + col.index(),
        )) as Arc<dyn PhysicalExpr>
    }))
})
```
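For context on the surrounding `match`, a rough standalone sketch of the partitioning adjustment might look like the following. The `Partitioning` enum here is a simplified stand-in in which `Hash` holds plain columns rather than full expression trees, and `shift_partitioning` is a hypothetical name; the real code rewrites each expression with `transform_down` as in the snippet above.

```rust
// Simplified stand-in for a column reference (assumption).
#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
    index: usize,
}

// Simplified stand-in for DataFusion's `Partitioning` (assumption):
// `Hash` carries plain columns instead of arbitrary physical expressions.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<Column>, usize),
    UnknownPartitioning(usize),
}

// Only the `Hash` variant refers to columns, so only it needs its
// indices shifted by the number of window expressions.
fn shift_partitioning(input: Partitioning, window_expr_len: usize) -> Partitioning {
    match input {
        Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size),
        Partitioning::UnknownPartitioning(size) => Partitioning::UnknownPartitioning(size),
        Partitioning::Hash(exprs, size) => {
            let new_exprs = exprs
                .into_iter()
                .map(|col| Column {
                    name: col.name,
                    index: window_expr_len + col.index,
                })
                .collect();
            Partitioning::Hash(new_exprs, size)
        }
    }
}
```

The partition count is carried through untouched in every arm; only the column indices inside `Hash` change.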