mingmwang commented on code in PR #4455:
URL: https://github.com/apache/arrow-datafusion/pull/4455#discussion_r1037112819
##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -116,14 +150,38 @@ impl ExecutionPlan for WindowAggExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- // because we can have repartitioning using the partition keys
- // this would be either 1 or more than 1 depending on the presense of
- // repartitioning
- self.input.output_partitioning()
+ // Although WindowAggExec does not change the output partitioning from the input, but can not return the output partitioning
+ // from the input directly, need to adjust the column index to align with the new schema.
+ let window_expr_len = self.window_expr.len();
+ let input_partitioning = self.input.output_partitioning();
+ match input_partitioning {
+ Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size),
+ Partitioning::UnknownPartitioning(size) => {
+ Partitioning::UnknownPartitioning(size)
+ }
+ Partitioning::Hash(exprs, size) => {
+ let new_exprs = exprs
+ .into_iter()
+ .map(|expr| {
+ expr.transform_down(
Review Comment:
> Thank you for fixing this bug! I agree that this helps with
@mustafasrepo's example, but I also agree with @metesynnada that it does not
change the fact that the `output_ordering` and `output_partitioning` APIs do
not work inside `create_initial_plan` (do not work in the specific sense that
they may return non-final values, which can be inconsistent with other APIs
like `required_output_ordering`).
>
> So things boil down to whether we expect them to work (or not) while
inside `create_initial_plan`. @mingmwang, what do you think? Should we expect
them to return reliable values (e.g. consistent with
`required_input_ordering()` API), or should we not expect this behavior?
Based on the current design, we cannot expect them to return fixed,
reliable values. The physical plan tree is adjusted and changed dynamically
during planning, so the **real** output partitioning and output ordering are
not final until the plan has passed the Enforcement rule.
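
To illustrate why the values are not final, here is a minimal sketch. It is a toy model with stand-in types (`Plan`, `Partitioning`, and `enforce` are all hypothetical and much simpler than DataFusion's real API). It only shows that the same window node can report a different output partitioning before and after an enforcement pass inserts a repartition below it.

```rust
// Toy model only: these types are illustrative stand-ins, not DataFusion's API.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    Unknown(usize),
    Hash(Vec<String>, usize),
}

#[derive(Debug, Clone)]
enum Plan {
    Scan { partitions: usize },
    Repartition { input: Box<Plan>, keys: Vec<String>, partitions: usize },
    WindowAgg { input: Box<Plan>, partition_by: Vec<String> },
}

impl Plan {
    /// Reports the partitioning of the plan *as it currently stands*.
    fn output_partitioning(&self) -> Partitioning {
        match self {
            Plan::Scan { partitions } => Partitioning::Unknown(*partitions),
            Plan::Repartition { keys, partitions, .. } => {
                Partitioning::Hash(keys.clone(), *partitions)
            }
            // The window operator passes its input's partitioning through.
            Plan::WindowAgg { input, .. } => input.output_partitioning(),
        }
    }
}

/// Hypothetical enforcement pass: inserts a hash repartition below a window
/// operator whenever its input is not already hashed on the partition keys.
fn enforce(plan: Plan, target_partitions: usize) -> Plan {
    match plan {
        Plan::WindowAgg { input, partition_by } => {
            let input = enforce(*input, target_partitions);
            let satisfied = match input.output_partitioning() {
                Partitioning::Hash(keys, _) => keys == partition_by,
                _ => false,
            };
            let input = if satisfied {
                input
            } else {
                Plan::Repartition {
                    input: Box::new(input),
                    keys: partition_by.clone(),
                    partitions: target_partitions,
                }
            };
            Plan::WindowAgg { input: Box::new(input), partition_by }
        }
        Plan::Repartition { input, keys, partitions } => Plan::Repartition {
            input: Box::new(enforce(*input, target_partitions)),
            keys,
            partitions,
        },
        other => other,
    }
}

fn main() {
    let initial = Plan::WindowAgg {
        input: Box::new(Plan::Scan { partitions: 1 }),
        partition_by: vec!["a".to_string()],
    };
    // Value seen while the initial plan is still being built.
    println!("before enforcement: {:?}", initial.output_partitioning());
    let enforced = enforce(initial, 8);
    // Value seen once the enforcement pass has rewritten the tree.
    println!("after enforcement:  {:?}", enforced.output_partitioning());
}
```

In this sketch, any code that reads `output_partitioning()` before `enforce` runs sees the scan's unknown partitioning rather than the hash partitioning the final plan ends up with.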
For your use cases, I think that if you have specific physical operators
with requirements on ordering and partitioning, you should just specify those
requirements through the methods `required_input_ordering()` and
`required_input_distribution()`. And if you have rules that need to work with
the output partitioning and output ordering, you need to add those rules after
the Enforcement rule.
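
A rough sketch of both suggestions, again as a toy model: the `Operator` trait, `Distribution` enum, `MyWindowOp`, and `runs_after_enforcement` below are hypothetical, and the method signatures are simplified compared with the real `ExecutionPlan` trait. Only the method names `required_input_ordering` and `required_input_distribution` come from the discussion above.

```rust
// Toy model only: simplified stand-ins, not DataFusion's real trait or types.
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    Unspecified,
    HashPartitioned(Vec<String>),
}

trait Operator {
    /// What the operator needs from its input; by default it needs nothing.
    fn required_input_distribution(&self) -> Distribution {
        Distribution::Unspecified
    }
    fn required_input_ordering(&self) -> Option<Vec<String>> {
        None
    }
}

/// Hypothetical operator that states its needs instead of inspecting
/// its input's (possibly non-final) output partitioning or ordering.
struct MyWindowOp {
    partition_by: Vec<String>,
    order_by: Vec<String>,
}

impl Operator for MyWindowOp {
    fn required_input_distribution(&self) -> Distribution {
        Distribution::HashPartitioned(self.partition_by.clone())
    }

    fn required_input_ordering(&self) -> Option<Vec<String>> {
        // Sorted by the partition keys first, then by the ORDER BY columns.
        Some(
            self.partition_by
                .iter()
                .chain(self.order_by.iter())
                .cloned()
                .collect(),
        )
    }
}

/// Checks that a rule which reads output partitioning or ordering is
/// scheduled after the rule named "enforcement" (a made-up name here).
fn runs_after_enforcement(rules: &[&str], rule: &str) -> bool {
    let enforcement = rules.iter().position(|r| *r == "enforcement");
    let custom = rules.iter().position(|r| *r == rule);
    match (enforcement, custom) {
        (Some(e), Some(c)) => e < c,
        _ => false,
    }
}

fn main() {
    let op = MyWindowOp {
        partition_by: vec!["a".to_string()],
        order_by: vec!["ts".to_string()],
    };
    println!("distribution: {:?}", op.required_input_distribution());
    println!("ordering:     {:?}", op.required_input_ordering());

    let rules = ["enforcement", "my_partition_aware_rule"];
    println!(
        "rule placed correctly: {}",
        runs_after_enforcement(&rules, "my_partition_aware_rule")
    );
}
```

The design point is that the operator only declares what it needs, and the enforcement pass is responsible for making the input satisfy it. Rules that read the resulting partitioning or ordering are placed later in the list.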