mingmwang commented on code in PR #4455:
URL: https://github.com/apache/arrow-datafusion/pull/4455#discussion_r1037112819


##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -116,14 +150,38 @@ impl ExecutionPlan for WindowAggExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        // because we can have repartitioning using the partition keys
-        // this would be either 1 or more than 1 depending on the presense of
-        // repartitioning
-        self.input.output_partitioning()
+        // Although WindowAggExec does not change the output partitioning from the input, but can not return the output partitioning
+        // from the input directly, need to adjust the column index to align with the new schema.
+        let window_expr_len = self.window_expr.len();
+        let input_partitioning = self.input.output_partitioning();
+        match input_partitioning {
+            Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size),
+            Partitioning::UnknownPartitioning(size) => {
+                Partitioning::UnknownPartitioning(size)
+            }
+            Partitioning::Hash(exprs, size) => {
+                let new_exprs = exprs
+                    .into_iter()
+                    .map(|expr| {
+                        expr.transform_down(

Review Comment:
   > Thank you for fixing this bug! I agree that this helps with 
@mustafasrepo's example, but I also agree with @metesynnada that it does not 
change the fact that the `output_ordering` and `output_partitioning` APIs do 
not work inside `create_initial_plan` (do not work in the specific sense that 
they may return non-final values, which can be inconsistent with other APIs 
like `required_output_ordering`).
   > 
   > So things boil down to whether we expect them to work (or not) while 
inside `create_initial_plan`. @mingmwang, what do you think? Should we expect 
them to return reliable values (e.g. consistent with 
`required_input_ordering()` API), or should we not expect this behavior?
   
   Based on the current design, we cannot expect them to return fixed, 
reliable values. The physical plan tree is adjusted and changed dynamically 
during planning, so the **real** output partitioning and output ordering are 
not fixed until the plan has passed the Enforcement rule.
   
   For your use cases, if you have specific physical operators that have 
requirements on ordering and partitioning, just specify those requirements 
through the methods `required_input_ordering()` and 
`required_input_distribution()`. And if you have rules that need to cooperate 
with the output partitioning and output ordering, add those rules after the 
Enforcement rule.
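
To illustrate the point, here is a minimal, self-contained sketch of the idea. It is a toy model, not DataFusion code: the `Plan` and `Distribution` types and the `enforce` function are hypothetical stand-ins, and only the method names `required_input_ordering`, `required_input_distribution`, `output_partitioning` and `output_ordering` mirror the APIs discussed here. The operator only declares what it needs; the output properties become reliable once the enforcement pass has inserted the missing repartition and sort steps.

```rust
// Toy model only: these types are hypothetical stand-ins, not DataFusion's API.
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    Unspecified,
    HashPartitioned(Vec<String>),
}

#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan,
    Repartition { keys: Vec<String>, input: Box<Plan> },
    Sort { keys: Vec<String>, input: Box<Plan> },
    Window { partition_by: Vec<String>, order_by: Vec<String>, input: Box<Plan> },
}

impl Plan {
    /// The distribution this operator requires from its input (declared, not assumed).
    fn required_input_distribution(&self) -> Distribution {
        match self {
            Plan::Window { partition_by, .. } => Distribution::HashPartitioned(partition_by.clone()),
            _ => Distribution::Unspecified,
        }
    }

    /// The ordering this operator requires from its input, if any.
    fn required_input_ordering(&self) -> Option<Vec<String>> {
        match self {
            Plan::Window { partition_by, order_by, .. } => {
                Some(partition_by.iter().chain(order_by.iter()).cloned().collect())
            }
            _ => None,
        }
    }

    /// Derived from the current children, so it changes whenever the tree is rewritten.
    fn output_partitioning(&self) -> Distribution {
        match self {
            Plan::Scan => Distribution::Unspecified,
            Plan::Repartition { keys, .. } => Distribution::HashPartitioned(keys.clone()),
            Plan::Sort { input, .. } | Plan::Window { input, .. } => input.output_partitioning(),
        }
    }

    /// Also derived from the current children.
    fn output_ordering(&self) -> Option<Vec<String>> {
        match self {
            Plan::Scan | Plan::Repartition { .. } => None,
            Plan::Sort { keys, .. } => Some(keys.clone()),
            Plan::Window { input, .. } => input.output_ordering(),
        }
    }
}

/// Hypothetical enforcement pass: inserts Repartition/Sort wherever a declared
/// requirement is not already satisfied by the child.
fn enforce(plan: Plan) -> Plan {
    let dist = plan.required_input_distribution();
    let ord = plan.required_input_ordering();
    match plan {
        Plan::Window { partition_by, order_by, input } => {
            let mut child = enforce(*input);
            if let Distribution::HashPartitioned(keys) = dist {
                if child.output_partitioning() != Distribution::HashPartitioned(keys.clone()) {
                    child = Plan::Repartition { keys, input: Box::new(child) };
                }
            }
            if let Some(keys) = ord {
                if child.output_ordering().as_ref() != Some(&keys) {
                    child = Plan::Sort { keys, input: Box::new(child) };
                }
            }
            Plan::Window { partition_by, order_by, input: Box::new(child) }
        }
        Plan::Repartition { keys, input } => Plan::Repartition { keys, input: Box::new(enforce(*input)) },
        Plan::Sort { keys, input } => Plan::Sort { keys, input: Box::new(enforce(*input)) },
        Plan::Scan => Plan::Scan,
    }
}

fn main() {
    let initial = Plan::Window {
        partition_by: vec!["a".to_string()],
        order_by: vec!["b".to_string()],
        input: Box::new(Plan::Scan),
    };
    // Before enforcement the reported output properties are not final.
    assert_eq!(initial.output_partitioning(), Distribution::Unspecified);
    assert_eq!(initial.output_ordering(), None);

    // After enforcement they match what the operator declared it needs.
    let enforced = enforce(initial);
    assert_eq!(enforced.output_partitioning(), Distribution::HashPartitioned(vec!["a".to_string()]));
    assert_eq!(enforced.output_ordering(), Some(vec!["a".to_string(), "b".to_string()]));
    println!("{:?}", enforced);
}
```

In this sketch, any rule that reads `output_partitioning()` or `output_ordering()` before `enforce` has run sees provisional values, which is why such rules belong after the enforcement step.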



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
