ozankabak commented on code in PR #4455:
URL: https://github.com/apache/arrow-datafusion/pull/4455#discussion_r1037145029
##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -116,14 +150,38 @@ impl ExecutionPlan for WindowAggExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- // because we can have repartitioning using the partition keys
- // this would be either 1 or more than 1 depending on the presense of
- // repartitioning
- self.input.output_partitioning()
+ // Although WindowAggExec does not change the output partitioning from the input, but can not return the output partitioning
+ // from the input directly, need to adjust the column index to align with the new schema.
+ let window_expr_len = self.window_expr.len();
+ let input_partitioning = self.input.output_partitioning();
+ match input_partitioning {
+ Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size),
+ Partitioning::UnknownPartitioning(size) => {
+ Partitioning::UnknownPartitioning(size)
+ }
+ Partitioning::Hash(exprs, size) => {
+ let new_exprs = exprs
+ .into_iter()
+ .map(|expr| {
+ expr.transform_down(
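The comment in the hunk explains why the input partitioning cannot be passed through unchanged. Hash partitioning refers to columns by position, so those positions have to be remapped to the new output schema. Below is a minimal, self-contained sketch of that remapping. It uses hypothetical, simplified `Expr` and `Partitioning` types, which are not DataFusion's actual API. It also assumes the window expression columns are placed *before* the input columns in the output schema, which is what an offset of `window_expr_len` suggests. The truncated `transform_down(` call in the hunk is not reproduced here.

```rust
/// Hypothetical, simplified stand-in for a physical expression tree.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// A column reference by name and position in the schema.
    Column { name: String, index: usize },
    /// An opaque leaf that carries no column reference.
    Literal(i64),
    /// A binary node whose children may contain column references.
    Binary(Box<Expr>, Box<Expr>),
}

/// Simplified mirror of the three partitioning variants in the hunk.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    RoundRobinBatch(usize),
    UnknownPartitioning(usize),
    Hash(Vec<Expr>, usize),
}

/// Recursively shifts every column index by `offset`.
/// Assumption: `offset` window columns are prepended to the input columns.
fn shift_columns(expr: Expr, offset: usize) -> Expr {
    match expr {
        Expr::Column { name, index } => Expr::Column { name, index: index + offset },
        Expr::Literal(v) => Expr::Literal(v),
        Expr::Binary(l, r) => Expr::Binary(
            Box::new(shift_columns(*l, offset)),
            Box::new(shift_columns(*r, offset)),
        ),
    }
}

/// Only hash partitioning carries column references, so only it is rewritten.
fn adjust_partitioning(input: Partitioning, window_expr_len: usize) -> Partitioning {
    match input {
        Partitioning::RoundRobinBatch(size) => Partitioning::RoundRobinBatch(size),
        Partitioning::UnknownPartitioning(size) => Partitioning::UnknownPartitioning(size),
        Partitioning::Hash(exprs, size) => Partitioning::Hash(
            exprs
                .into_iter()
                .map(|e| shift_columns(e, window_expr_len))
                .collect(),
            size,
        ),
    }
}

fn main() {
    // Input is hash-partitioned on `a` (index 0); two window columns are prepended.
    let input = Partitioning::Hash(vec![Expr::Column { name: "a".into(), index: 0 }], 4);
    println!("{:?}", adjust_partitioning(input, 2));
}
```

Round-robin and unknown partitioning carry no column references, so in this sketch they pass through unchanged, matching the first two arms of the hunk.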
Review Comment:
Thanks for clarifying, much appreciated. @mustafasrepo will provide a
pointer to our relevant code so that you can comment.
To give you a little more context beforehand: we are implementing a
pipelineable version of `WindowAggExec`, but whether the physical plan can use
this more efficient version depends on the output ordering of its input
operator. My understanding of your suggestion is that we should handle this in
a new rule that replaces the pipeline-breaking operator with the new
pipelineable one.
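A rule like the one described above could look roughly like the following sketch. Everything in it is hypothetical: `Plan`, `SortKey`, `PipelinedWindowAgg`, and `use_pipelined_window` are illustrative names, not DataFusion types. The ordering check is also simplified to "the input's ordering starts with the sort keys the window operator requires". A real rule would also need to account for partition keys, ordering equivalences, and distribution requirements.

```rust
/// Hypothetical sort key: column name plus direction.
#[derive(Debug, Clone, PartialEq)]
struct SortKey {
    column: String,
    descending: bool,
}

/// Hypothetical, minimal plan tree.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Leaf that advertises the ordering of its output (possibly empty).
    Scan { output_ordering: Vec<SortKey> },
    /// Pipeline-breaking window operator: buffers its whole input.
    WindowAgg { required_ordering: Vec<SortKey>, input: Box<Plan> },
    /// Pipelineable window operator: only valid if the input is already ordered.
    PipelinedWindowAgg { required_ordering: Vec<SortKey>, input: Box<Plan> },
}

/// Window operators do not reorder rows, so they pass their input's ordering through.
fn output_ordering(plan: &Plan) -> &[SortKey] {
    match plan {
        Plan::Scan { output_ordering: ord } => ord.as_slice(),
        Plan::WindowAgg { input, .. } | Plan::PipelinedWindowAgg { input, .. } => {
            output_ordering(input)
        }
    }
}

/// Bottom-up rewrite: swap in the pipelineable operator when the
/// (simplified) ordering requirement is already satisfied by the input.
fn use_pipelined_window(plan: Plan) -> Plan {
    match plan {
        Plan::WindowAgg { required_ordering, input } => {
            // Rewrite children first so the input's ordering is final.
            let input = use_pipelined_window(*input);
            if output_ordering(&input).starts_with(&required_ordering) {
                Plan::PipelinedWindowAgg { required_ordering, input: Box::new(input) }
            } else {
                Plan::WindowAgg { required_ordering, input: Box::new(input) }
            }
        }
        Plan::PipelinedWindowAgg { required_ordering, input } => Plan::PipelinedWindowAgg {
            required_ordering,
            input: Box::new(use_pipelined_window(*input)),
        },
        scan @ Plan::Scan { .. } => scan,
    }
}

/// Convenience constructor for an ascending sort key.
fn key(column: &str) -> SortKey {
    SortKey { column: column.to_string(), descending: false }
}

fn main() {
    let plan = Plan::WindowAgg {
        required_ordering: vec![key("a")],
        input: Box::new(Plan::Scan { output_ordering: vec![key("a"), key("b")] }),
    };
    println!("{:?}", use_pipelined_window(plan));
}
```

Because children are rewritten before their parent, each window operator is checked against its input's final ordering. Stacked window operators can therefore each be converted as long as the ordering they need survives below them.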