mustafasrepo commented on code in PR #4691:
URL: https://github.com/apache/arrow-datafusion/pull/4691#discussion_r1056221676
##########
datafusion/core/src/physical_plan/windows/window_agg_exec.rs:
##########
@@ -368,16 +397,58 @@ impl WindowAggStream {
let batch = concat_batches(&self.input.schema(), &self.batches)?;
- // calculate window cols
- let mut columns = compute_window_aggregates(&self.window_expr, &batch)
- .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+ let partition_by_sort_keys = self
+ .partition_by_sort_keys
+ .iter()
+ .map(|elem| elem.evaluate_to_sort_column(&batch))
+ .collect::<Result<Vec<_>>>()?;
+ let partition_points =
+ self.evaluate_partition_points(batch.num_rows(), &partition_by_sort_keys)?;
+
+ let mut partition_results = vec![];
+ // Calculate window cols
+ for partition_point in partition_points {
+ let length = partition_point.end - partition_point.start;
+ partition_results.push(
+ compute_window_aggregates(
+ &self.window_expr,
+ &batch.slice(partition_point.start, length),
+ )
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))?,
+ )
+ }
+ let mut columns = transpose(partition_results)
Review Comment:
Yes, that is the case. Previously, partition boundaries were calculated in the
`evaluate` method of each `window_expr`. That implementation repeated the same
work for every window expression inside `WindowAggExec`, even though the
partition points are the same for all of them. Computing them once here also
simplifies the body of the `evaluate` method.
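
To illustrate the idea outside of DataFusion, here is a minimal, std-only sketch. All names in it (`partition_points`, `WindowFn`, `running_sum`, `partition_max`, `evaluate_all`) are hypothetical stand-ins rather than the actual API in this PR, and plain slices replace `RecordBatch`. It assumes the input is already sorted by the partition key, computes the partition ranges once, evaluates every window expression on each slice, and then transposes the partition-major results into one output column per expression.

```rust
use std::ops::Range;

/// Find contiguous runs of equal keys in an already-sorted slice.
/// Hypothetical stand-in for `evaluate_partition_points`.
fn partition_points(keys: &[i32]) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    for i in 1..=keys.len() {
        // Close the current run at the end of input or when the key changes.
        if i == keys.len() || keys[i] != keys[start] {
            ranges.push(start..i);
            start = i;
        }
    }
    ranges
}

/// Turn partition-major results (`[partition][expr]`) into
/// expression-major results (`[expr][partition]`).
fn transpose<T>(rows: Vec<Vec<T>>) -> Vec<Vec<T>> {
    let width = rows.first().map_or(0, |r| r.len());
    let mut cols: Vec<Vec<T>> = (0..width).map(|_| Vec::new()).collect();
    for row in rows {
        for (col, item) in cols.iter_mut().zip(row) {
            col.push(item);
        }
    }
    cols
}

/// A toy "window expression": maps one partition's values to one output
/// value per row. It no longer needs to know about partition boundaries.
type WindowFn = fn(&[f64]) -> Vec<f64>;

fn running_sum(values: &[f64]) -> Vec<f64> {
    let mut acc = 0.0;
    values
        .iter()
        .map(|v| {
            acc += v;
            acc
        })
        .collect()
}

fn partition_max(values: &[f64]) -> Vec<f64> {
    let max = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
    vec![max; values.len()]
}

/// Compute partition ranges once, then evaluate every expression per slice.
fn evaluate_all(keys: &[i32], values: &[f64], exprs: &[WindowFn]) -> Vec<Vec<f64>> {
    let points = partition_points(keys); // shared by all expressions
    let mut partition_results = Vec::new();
    for range in points {
        let slice = &values[range];
        partition_results.push(exprs.iter().map(|f| f(slice)).collect::<Vec<_>>());
    }
    // Stitch each expression's per-partition chunks into one output column.
    transpose(partition_results)
        .into_iter()
        .map(|chunks| chunks.concat())
        .collect()
}
```

Calling `evaluate_all` with two expressions scans the keys for boundaries only once, whereas the previous approach would have done that scan once per expression.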
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.