Added Vector Aggregation support in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e79b520e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e79b520e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e79b520e

Branch: refs/heads/trace
Commit: e79b520ec919fbe101ad72978c02216eeeeb6ca6
Parents: 8f094a1
Author: Zuyu Zhang <[email protected]>
Authored: Fri Aug 4 17:03:34 2017 -0500
Committer: Zuyu Zhang <[email protected]>
Committed: Thu Oct 12 11:44:44 2017 -0500

----------------------------------------------------------------------
 .../FinalizeAggregationOperator.cpp             | 31 ++++++++++----------
 .../InitializeAggregationOperator.cpp           | 23 +++++++--------
 relational_operators/WorkOrderFactory.cpp       |  2 --
 3 files changed, 27 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e79b520e/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 68d0ef4..92fc7f6 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -67,28 +67,29 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
   return true;
 }
 
-// TODO(quickstep-team) : Think about how the number of partitions could be
-// accessed in this function. Until then, we can't use partitioned aggregation
-// finalization with the distributed version.
 bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
   if (started_) {
     return true;
   }
 
   for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
-    proto->set_query_id(query_id_);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
-                        aggr_state_index_);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
-                        part_id);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
-                        0u);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
-                        output_destination_index_);
+    for (std::size_t state_part_id = 0;
+         state_part_id < aggr_state_num_partitions_;
+         ++state_part_id) {
+      serialization::WorkOrder *proto = new serialization::WorkOrder;
+      proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
+      proto->set_query_id(query_id_);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
+                          aggr_state_index_);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
+                          part_id);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
+                          state_part_id);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
+                          output_destination_index_);
 
-    container->addWorkOrderProto(proto, op_index_);
+      container->addWorkOrderProto(proto, op_index_);
+    }
   }
 
   started_ = true;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e79b520e/relational_operators/InitializeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index 39a6fb4..89dfd7e 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -64,26 +64,25 @@ bool InitializeAggregationOperator::getAllWorkOrders(
   return true;
 }
 
-// TODO(quickstep-team) : Think about how the number of partitions could be
-// accessed in this function. Until then, we can't use partitioned aggregation
-// initialization with the distributed version.
 bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  LOG(FATAL) << "Not supported";
-
   if (started_) {
     return true;
   }
 
   for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
-    proto->set_query_id(query_id_);
+    for (std::size_t state_part_id = 0;
+         state_part_id < aggr_state_num_init_partitions_;
+         ++state_part_id) {
+      serialization::WorkOrder *proto = new serialization::WorkOrder;
+      proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
+      proto->set_query_id(query_id_);
 
-    proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
-    proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
-    proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, 0u);
+      proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
+      proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
+      proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, state_part_id);
 
-    container->addWorkOrderProto(proto, op_index_);
+      container->addWorkOrderProto(proto, op_index_);
+    }
   }
   started_ = true;
   return true;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e79b520e/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 25cc81a..3a991bd 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -237,8 +237,6 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       LOG(INFO) << "Creating FinalizeAggregationWorkOrder (Partition " << part_id
                 << ") for Query " << query_id << " in Shiftboss " << shiftboss_index;
 
-      // TODO(quickstep-team): Handle inner-table partitioning in the distributed
-      // setting.
       return new FinalizeAggregationWorkOrder(
           query_id,
           part_id,
