Github user zuyu commented on a diff in the pull request:
https://github.com/apache/incubator-quickstep/pull/49#discussion_r69974697
--- Diff: query_optimizer/ExecutionGenerator.cpp ---
@@ -1639,5 +1641,101 @@ void ExecutionGenerator::convertTableGenerator(
temporary_relation_info_vec_.emplace_back(tablegen_index,
output_relation);
}
+void ExecutionGenerator::convertWindowAggregate(
+ const P::WindowAggregatePtr &physical_plan) {
+ // Create window_aggregation_operation_state proto.
+ const QueryContext::window_aggregation_state_id window_aggr_state_index =
+ query_context_proto_->window_aggregation_states_size();
+ S::WindowAggregationOperationState *window_aggr_state_proto =
+ query_context_proto_->add_window_aggregation_states();
+
+ // Get input.
+ const CatalogRelationInfo *input_relation_info =
+ findRelationInfoOutputByPhysical(physical_plan->input());
+
window_aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+
+ // Get window aggregate function expression.
+ const E::AliasPtr &named_window_aggregate_expression =
+ physical_plan->window_aggregate_expression();
+ const E::WindowAggregateFunctionPtr &window_aggregate_function =
+ std::static_pointer_cast<const E::WindowAggregateFunction>(
+ named_window_aggregate_expression->expression());
+
+ // Set the AggregateFunction.
+ window_aggr_state_proto->mutable_function()->CopyFrom(
+ window_aggregate_function->window_aggregate().getProto());
+
+ // Set the arguments.
+ for (const E::ScalarPtr &argument :
window_aggregate_function->arguments()) {
+ unique_ptr<const Scalar>
concretized_argument(argument->concretize(attribute_substitution_map_));
+
window_aggr_state_proto->add_arguments()->CopyFrom(concretized_argument->getProto());
+ }
+
+ // Set partition keys.
+ const E::WindowInfo window_info =
window_aggregate_function->window_info();
+ for (const E::ScalarPtr &partition_by_attribute
+ : window_info.partition_by_attributes) {
+ unique_ptr<const Scalar> concretized_partition_by_attribute(
+ partition_by_attribute->concretize(attribute_substitution_map_));
+ window_aggr_state_proto->add_partition_by_attributes()
+ ->CopyFrom(concretized_partition_by_attribute->getProto());
+ }
+
+ // Set window frame info.
+ if (window_info.frame_info == nullptr) {
+ // If the frame is not specified, use the default setting:
+ // 1. If ORDER BY key is specified, use cumulative aggregation:
+ // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
+ // 2. If ORDER BY key is not specified either, use the whole
partition:
+ // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
+ window_aggr_state_proto->set_is_row(true); /* frame mode: ROWS
*/
+ window_aggr_state_proto->set_num_preceding(-1); /* UNBOUNDED
PRECEDING */
+ if (window_info.order_by_attributes.empty()) {
+ window_aggr_state_proto->set_num_following(-1); /* UNBOUNDED
FOLLOWING */
+ } else {
+ window_aggr_state_proto->set_num_following(0); /* CURRENT ROW */
+ }
+ } else {
+ const E::WindowFrameInfo* window_frame_info = window_info.frame_info;
--- End diff --
Code style: `const E::WindowFrameInfo *window_frame_info`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---