Repository: incubator-quickstep Updated Branches: refs/heads/collision-free-agg 6986b2a3f -> 8a694213f
Updates Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8a694213 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8a694213 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8a694213 Branch: refs/heads/collision-free-agg Commit: 8a694213fd16082a886378b70effba45a2fc85cd Parents: 6986b2a Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Fri Feb 3 14:45:14 2017 -0600 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Fri Feb 3 14:45:14 2017 -0600 ---------------------------------------------------------------------- query_optimizer/CMakeLists.txt | 2 +- query_optimizer/ExecutionGenerator.cpp | 8 +- relational_operators/CMakeLists.txt | 10 +- .../FinalizeAggregationOperator.cpp | 8 +- .../InitializeAggregationOperator.cpp | 78 ++++++++++++ .../InitializeAggregationOperator.hpp | 122 +++++++++++++++++++ .../InitializeAggregationStateOperator.cpp | 68 ----------- .../InitializeAggregationStateOperator.hpp | 103 ---------------- relational_operators/WorkOrder.proto | 10 +- relational_operators/WorkOrderFactory.cpp | 13 ++ 10 files changed, 236 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/query_optimizer/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt index 7f75264..a755832 100644 --- a/query_optimizer/CMakeLists.txt +++ b/query_optimizer/CMakeLists.txt @@ -126,7 +126,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_relationaloperators_DropTableOperator quickstep_relationaloperators_FinalizeAggregationOperator quickstep_relationaloperators_HashJoinOperator - quickstep_relationaloperators_InitializeAggregationStateOperator + quickstep_relationaloperators_InitializeAggregationOperator quickstep_relationaloperators_InsertOperator quickstep_relationaloperators_NestedLoopsJoinOperator quickstep_relationaloperators_RelationalOperator http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index b2ce27b..04fb105 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -106,7 +106,7 @@ #include "relational_operators/DropTableOperator.hpp" #include "relational_operators/FinalizeAggregationOperator.hpp" #include "relational_operators/HashJoinOperator.hpp" -#include "relational_operators/InitializeAggregationStateOperator.hpp" +#include "relational_operators/InitializeAggregationOperator.hpp" #include "relational_operators/InsertOperator.hpp" #include "relational_operators/NestedLoopsJoinOperator.hpp" #include "relational_operators/RelationalOperator.hpp" @@ -1670,14 +1670,14 @@ void ExecutionGenerator::convertAggregate( } if (use_parallel_initialization) { - const QueryPlan::DAGNodeIndex initialize_aggregation_state_operator_index = + const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index = execution_plan_->addRelationalOperator( - new InitializeAggregationStateOperator( + new InitializeAggregationOperator( query_handle_->query_id(), aggr_state_index)); execution_plan_->addDirectDependency(aggregation_operator_index, - initialize_aggregation_state_operator_index, + initialize_aggregation_operator_index, true /* is_pipeline_breaker */); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index bd20059..df4114d 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -47,9 +47,9 @@ add_library(quickstep_relationaloperators_FinalizeAggregationOperator FinalizeAggregationOperator.cpp FinalizeAggregationOperator.hpp) add_library(quickstep_relationaloperators_HashJoinOperator HashJoinOperator.cpp HashJoinOperator.hpp) -add_library(quickstep_relationaloperators_InitializeAggregationStateOperator - InitializeAggregationStateOperator.cpp - InitializeAggregationStateOperator.hpp) +add_library(quickstep_relationaloperators_InitializeAggregationOperator + InitializeAggregationOperator.cpp + InitializeAggregationOperator.hpp) add_library(quickstep_relationaloperators_InsertOperator InsertOperator.cpp InsertOperator.hpp) add_library(quickstep_relationaloperators_NestedLoopsJoinOperator NestedLoopsJoinOperator.cpp @@ -257,7 +257,7 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator quickstep_utility_lipfilter_LIPFilterAdaptiveProber quickstep_utility_lipfilter_LIPFilterUtil tmb) -target_link_libraries(quickstep_relationaloperators_InitializeAggregationStateOperator +target_link_libraries(quickstep_relationaloperators_InitializeAggregationOperator glog quickstep_queryexecution_QueryContext quickstep_queryexecution_WorkOrderProtosContainer @@ -562,7 +562,7 @@ target_link_libraries(quickstep_relationaloperators quickstep_relationaloperators_DropTableOperator quickstep_relationaloperators_FinalizeAggregationOperator quickstep_relationaloperators_HashJoinOperator - quickstep_relationaloperators_InitializeAggregationStateOperator + quickstep_relationaloperators_InitializeAggregationOperator quickstep_relationaloperators_InsertOperator quickstep_relationaloperators_NestedLoopsJoinOperator quickstep_relationaloperators_RebuildWorkOrder http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/FinalizeAggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp index b66030b..72beb60 100644 --- a/relational_operators/FinalizeAggregationOperator.cpp +++ b/relational_operators/FinalizeAggregationOperator.cpp @@ -44,13 +44,13 @@ bool FinalizeAggregationOperator::getAllWorkOrders( AggregationOperationState *agg_state = query_context->getAggregationState(aggr_state_index_); DCHECK(agg_state != nullptr); - for (std::size_t partition_id = 0; - partition_id < agg_state->getNumPartitions(); - ++partition_id) { + for (std::size_t part_id = 0; + part_id < agg_state->getNumPartitions(); + ++part_id) { container->addNormalWorkOrder( new FinalizeAggregationWorkOrder( query_id_, - partition_id, + part_id, agg_state, query_context->getInsertDestination(output_destination_index_)), op_index_); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/InitializeAggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp new file mode 100644 index 0000000..3da719d --- /dev/null +++ b/relational_operators/InitializeAggregationOperator.cpp @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "relational_operators/InitializeAggregationOperator.hpp" + +#include <cstddef> + +#include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" +#include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" +#include "storage/AggregationOperationState.hpp" + +#include "tmb/id_typedefs.h" + +namespace quickstep { + +bool InitializeAggregationOperator::getAllWorkOrders( + WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) { + if (!started_) { + AggregationOperationState *agg_state = + query_context->getAggregationState(aggr_state_index_); + DCHECK(agg_state != nullptr); + + for (std::size_t part_id = 0; + part_id < agg_state->getNumInitializationPartitions(); + ++part_id) { + container->addNormalWorkOrder( + new InitializeAggregationWorkOrder(query_id_, + part_id, + agg_state), + op_index_); + } + started_ = true; + } + return true; +} + +bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + if (!started_) { + started_ = true; + + serialization::WorkOrder *proto = new serialization::WorkOrder; + proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION); + proto->set_query_id(query_id_); + proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, + aggr_state_index_); + + container->addWorkOrderProto(proto, op_index_); + } + return started_; +} + +void InitializeAggregationWorkOrder::execute() { + state_->initializeState(partition_id_); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/InitializeAggregationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/InitializeAggregationOperator.hpp b/relational_operators/InitializeAggregationOperator.hpp new file mode 100644 index 0000000..4ca3bd5 --- /dev/null +++ b/relational_operators/InitializeAggregationOperator.hpp @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_ +#define QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_ + +#include <string> + +#include "query_execution/QueryContext.hpp" +#include "relational_operators/RelationalOperator.hpp" +#include "relational_operators/WorkOrder.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" + +namespace tmb { class MessageBus; } + +namespace quickstep { + +class AggregationOperationState; +class StorageManager; +class WorkOrderProtosContainer; +class WorkOrdersContainer; + +namespace serialization { class WorkOrder; } + +/** \addtogroup RelationalOperators + * @{ + */ + +/** + * @brief An operator which initializes an AggregationOperationState. + **/ +class InitializeAggregationOperator : public RelationalOperator { + public: + /** + * @brief Constructor. + * + * @param query_id The ID of this query. + * @param aggr_state_index The index of the AggregationOperationState in QueryContext. + **/ + InitializeAggregationOperator(const std::size_t query_id, + const QueryContext::aggregation_state_id aggr_state_index) + : RelationalOperator(query_id), + aggr_state_index_(aggr_state_index), + started_(false) {} + + ~InitializeAggregationOperator() override {} + + std::string getName() const override { + return "InitializeAggregationStateOperator"; + } + + bool getAllWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) override; + + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + + private: + const QueryContext::aggregation_state_id aggr_state_index_; + bool started_; + + DISALLOW_COPY_AND_ASSIGN(InitializeAggregationOperator); +}; + +/** + * @brief A WorkOrder produced by InitializeAggregationOperator. + **/ +class InitializeAggregationWorkOrder : public WorkOrder { + public: + /** + * @brief Constructor. + * + * @param query_id The ID of the query to which this operator belongs. + * @param partition_id The partition ID for which the work order is issued. + * @param state The AggregationOperationState to be initialized. + */ + InitializeAggregationWorkOrder(const std::size_t query_id, + const std::size_t partition_id, + AggregationOperationState *state) + : WorkOrder(query_id), + partition_id_(partition_id), + state_(DCHECK_NOTNULL(state)) {} + + ~InitializeAggregationWorkOrder() override {} + + void execute() override; + + private: + const std::size_t partition_id_; + + AggregationOperationState *state_; + + DISALLOW_COPY_AND_ASSIGN(InitializeAggregationWorkOrder); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/InitializeAggregationStateOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/InitializeAggregationStateOperator.cpp b/relational_operators/InitializeAggregationStateOperator.cpp deleted file mode 100644 index b041aef..0000000 --- a/relational_operators/InitializeAggregationStateOperator.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#include "relational_operators/InitializeAggregationStateOperator.hpp" - -#include <vector> - -#include "query_execution/QueryContext.hpp" -#include "query_execution/WorkOrderProtosContainer.hpp" -#include "query_execution/WorkOrdersContainer.hpp" -#include "relational_operators/WorkOrder.pb.h" -#include "storage/AggregationOperationState.hpp" - -#include "tmb/id_typedefs.h" - -namespace quickstep { - -bool InitializeAggregationStateOperator::getAllWorkOrders( - WorkOrdersContainer *container, - QueryContext *query_context, - StorageManager *storage_manager, - const tmb::client_id scheduler_client_id, - tmb::MessageBus *bus) { - if (!started_) { - AggregationOperationState *agg_state = - query_context->getAggregationState(aggr_state_index_); - DCHECK(agg_state != nullptr); - - for (std::size_t part_id = 0; - part_id < agg_state->getNumInitializationPartitions(); - ++part_id) { - container->addNormalWorkOrder( - new InitializeAggregationStateWorkOrder(query_id_, - part_id, - agg_state), - op_index_); - } - started_ = true; - } - return true; -} - -bool InitializeAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { - // TODO - LOG(FATAL) << "Not implemented"; -} - -void InitializeAggregationStateWorkOrder::execute() { - state_->initializeState(partition_id_); -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/InitializeAggregationStateOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/InitializeAggregationStateOperator.hpp b/relational_operators/InitializeAggregationStateOperator.hpp deleted file mode 100644 index 10403b3..0000000 --- a/relational_operators/InitializeAggregationStateOperator.hpp +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_ -#define QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_ - -#include <string> - -#include "query_execution/QueryContext.hpp" -#include "relational_operators/RelationalOperator.hpp" -#include "relational_operators/WorkOrder.hpp" -#include "utility/Macros.hpp" - -#include "glog/logging.h" - -#include "tmb/id_typedefs.h" - -namespace tmb { class MessageBus; } - -namespace quickstep { - -class AggregationOperationState; -class StorageManager; -class WorkOrderProtosContainer; -class WorkOrdersContainer; - -namespace serialization { class WorkOrder; } - -/** \addtogroup RelationalOperators - * @{ - */ - -class InitializeAggregationStateOperator : public RelationalOperator { - public: - InitializeAggregationStateOperator(const std::size_t query_id, - const QueryContext::aggregation_state_id aggr_state_index) - : RelationalOperator(query_id), - aggr_state_index_(aggr_state_index), - started_(false) {} - - ~InitializeAggregationStateOperator() override {} - - std::string getName() const override { - return "InitializeAggregationStateOperator"; - } - - bool getAllWorkOrders(WorkOrdersContainer *container, - QueryContext *query_context, - StorageManager *storage_manager, - const tmb::client_id scheduler_client_id, - tmb::MessageBus *bus) override; - - bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; - - private: - const QueryContext::aggregation_state_id aggr_state_index_; - bool started_; - - DISALLOW_COPY_AND_ASSIGN(InitializeAggregationStateOperator); -}; - -class InitializeAggregationStateWorkOrder : public WorkOrder { - public: - InitializeAggregationStateWorkOrder(const std::size_t query_id, - const std::size_t partition_id, - AggregationOperationState *state) - : WorkOrder(query_id), - partition_id_(partition_id), - state_(DCHECK_NOTNULL(state)) {} - - ~InitializeAggregationStateWorkOrder() override {} - - void execute() override; - - private: - const std::size_t partition_id_; - - AggregationOperationState *state_; - - DISALLOW_COPY_AND_ASSIGN(InitializeAggregationStateWorkOrder); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/WorkOrder.proto ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto index 76753d2..83bb121 100644 --- a/relational_operators/WorkOrder.proto +++ b/relational_operators/WorkOrder.proto @@ -44,6 +44,7 @@ enum WorkOrderType { UPDATE = 20; WINDOW_AGGREGATION = 21; DESTROY_AGGREGATION_STATE = 22; + INITIALIZE_AGGREGATION = 23; } message WorkOrder { @@ -278,6 +279,13 @@ message WindowAggregationWorkOrder { message DestroyAggregationStateWorkOrder { extend WorkOrder { - optional uint32 aggr_state_index = 339; + optional uint32 aggr_state_index = 352; + } +} + +message InitializeAggregationWorkOrder { + extend WorkOrder { + // All required. + optional uint32 aggr_state_index = 368; } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8a694213/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index 5e8d03d..99bca7b 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -37,6 +37,7 @@ #include "relational_operators/DropTableOperator.hpp" #include "relational_operators/FinalizeAggregationOperator.hpp" #include "relational_operators/HashJoinOperator.hpp" +#include "relational_operators/InitializeAggregationOperator.hpp" #include "relational_operators/InsertOperator.hpp" #include "relational_operators/NestedLoopsJoinOperator.hpp" #include "relational_operators/SampleOperator.hpp" @@ -319,6 +320,13 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder LOG(FATAL) << "Unknown HashJoinWorkOrder Type in WorkOrderFactory::ReconstructFromProto"; } } + case serialization::INITIALIZE_AGGREGATION: { + LOG(INFO) << "Creating InitializeAggregationWorkOrder in Shiftboss " << shiftboss_index; + return new InitializeAggregationWorkOrder( + proto.query_id(), + query_context->getAggregationState(proto.GetExtension( + serialization::InitializeAggregationWorkOrder::aggr_state_index))); + } case serialization::INSERT: { LOG(INFO) << "Creating InsertWorkOrder in Shiftboss " << shiftboss_index; return new InsertWorkOrder( @@ -693,6 +701,11 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, proto.GetExtension(serialization::HashJoinWorkOrder::selection_index)) && proto.HasExtension(serialization::HashJoinWorkOrder::block_id); } + case serialization::INITIALIZE_AGGREGATION: { + return proto.HasExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index) && + query_context.isValidAggregationStateId( + proto.GetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index)); + } case serialization::INSERT: { return proto.HasExtension(serialization::InsertWorkOrder::insert_destination_index) && query_context.isValidInsertDestinationId(