http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/BlockLocator.cpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp deleted file mode 100644 index 81684ba..0000000 --- a/query_execution/BlockLocator.cpp +++ /dev/null @@ -1,226 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- **/ - -#include "query_execution/BlockLocator.hpp" - -#include <cstdlib> -#include <string> -#include <utility> - -#include "query_execution/QueryExecutionMessages.pb.h" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/QueryExecutionUtil.hpp" -#include "storage/StorageBlockInfo.hpp" -#include "threading/ThreadUtil.hpp" - -#include "glog/logging.h" - -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" -#include "tmb/tagged_message.h" - -using std::free; -using std::malloc; -using std::move; - -using tmb::TaggedMessage; -using tmb::client_id; - -namespace quickstep { - -void BlockLocator::run() { - if (cpu_id_ >= 0) { - ThreadUtil::BindToCPU(cpu_id_); - } - - for (;;) { - // Receive() is a blocking call, causing this thread to sleep until next - // message is received. - const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true); - const TaggedMessage &tagged_message = annotated_message.tagged_message; - const client_id sender = annotated_message.sender; - LOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type() - << "' message from TMB Client " << sender; - switch (tagged_message.message_type()) { - case kBlockDomainRegistrationMessage: { - serialization::BlockDomainRegistrationMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processBlockDomainRegistrationMessage(sender, proto.domain_network_address()); - break; - } - case kAddBlockLocationMessage: { - serialization::BlockLocationMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - const block_id block = proto.block_id(); - const block_id_domain domain = proto.block_domain(); - - const auto result_block_locations = block_locations_[block].insert(domain); - const auto result_domain_blocks = domain_blocks_[domain].insert(block); - DCHECK_EQ(result_block_locations.second, result_domain_blocks.second); - - if 
(result_domain_blocks.second) { - LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " loaded in Domain " << domain; - } else { - LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " existed in Domain " << domain; - } - break; - } - case kDeleteBlockLocationMessage: { - serialization::BlockLocationMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - const block_id block = proto.block_id(); - const block_id_domain domain = proto.block_domain(); - - const auto cit = block_locations_[block].find(domain); - if (cit != block_locations_[block].end()) { - block_locations_[block].erase(domain); - domain_blocks_[domain].erase(block); - - LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " evicted in Domain " << domain; - } else { - LOG(INFO) << "Block " << BlockIdUtil::ToString(block) << " not found in Domain " << domain; - } - break; - } - case kLocateBlockMessage: { - serialization::BlockMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processLocateBlockMessage(sender, proto.block_id()); - break; - } - case kGetPeerDomainNetworkAddressesMessage: { - serialization::BlockMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processGetPeerDomainNetworkAddressesMessage(sender, proto.block_id()); - break; - } - case kBlockDomainUnregistrationMessage: { - serialization::BlockDomainMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - const block_id_domain domain = proto.block_domain(); - - domain_network_addresses_.erase(domain); - - for (const block_id block : domain_blocks_[domain]) { - block_locations_[block].erase(domain); - } - domain_blocks_.erase(domain); - - LOG(INFO) << "Unregistered Domain " << domain; - break; - } - case kPoisonMessage: { - return; - } - } - } -} - -void BlockLocator::processBlockDomainRegistrationMessage(const 
client_id receiver, - const std::string &network_address) { - DCHECK_LT(block_domain_, kMaxDomain); - - domain_network_addresses_.emplace(++block_domain_, network_address); - domain_blocks_[block_domain_]; - - serialization::BlockDomainMessage proto; - proto.set_block_domain(block_domain_); - - const int proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kBlockDomainRegistrationResponseMessage); - free(proto_bytes); - - LOG(INFO) << "BlockLocator (id '" << locator_client_id_ - << "') sent BlockDomainRegistrationResponseMessage (typed '" - << kBlockDomainRegistrationResponseMessage - << "') to Worker (id '" << receiver << "')"; - CHECK(tmb::MessageBus::SendStatus::kOK == - QueryExecutionUtil::SendTMBMessage(bus_, - locator_client_id_, - receiver, - move(message))); -} - -void BlockLocator::processLocateBlockMessage(const client_id receiver, - const block_id block) { - serialization::LocateBlockResponseMessage proto; - - for (const block_id_domain domain : block_locations_[block]) { - proto.add_block_domains(domain); - } - - const int proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kLocateBlockResponseMessage); - free(proto_bytes); - - LOG(INFO) << "BlockLocator (id '" << locator_client_id_ - << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage - << "') to StorageManager (id '" << receiver << "')"; - CHECK(tmb::MessageBus::SendStatus::kOK == - QueryExecutionUtil::SendTMBMessage(bus_, - locator_client_id_, - receiver, - move(message))); -} - -void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver, - const block_id block) { - 
serialization::GetPeerDomainNetworkAddressesResponseMessage proto; - - for (const block_id_domain domain : block_locations_[block]) { - proto.add_domain_network_addresses(domain_network_addresses_[domain]); - } - - const int proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kGetPeerDomainNetworkAddressesResponseMessage); - free(proto_bytes); - - LOG(INFO) << "BlockLocator (id '" << locator_client_id_ - << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '" - << kGetPeerDomainNetworkAddressesResponseMessage - << "') to StorageManager (id '" << receiver << "')"; - CHECK(tmb::MessageBus::SendStatus::kOK == - QueryExecutionUtil::SendTMBMessage(bus_, - locator_client_id_, - receiver, - move(message))); -} - -} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/BlockLocator.hpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp deleted file mode 100644 index a83a394..0000000 --- a/query_execution/BlockLocator.hpp +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_ - -#include <atomic> -#include <string> -#include <unordered_map> -#include <unordered_set> - -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "storage/StorageBlockInfo.hpp" -#include "storage/StorageConstants.hpp" -#include "threading/Thread.hpp" -#include "utility/Macros.hpp" - -#include "glog/logging.h" - -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" - -namespace quickstep { - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief A class for keeping trace of blocks loaded in a Worker's buffer pool - * in the distributed version. - **/ -class BlockLocator : public Thread { - public: - /** - * @brief Constructor. 
- * - * @param bus A pointer to the TMB. - * @param cpu_id The ID of the CPU to which the BlockLocator thread can be pinned. - * - * @note If cpu_id is not specified, BlockLocator thread can be possibly moved - * around on different CPUs by the OS. - **/ - BlockLocator(tmb::MessageBus *bus, - const int cpu_id = -1) - : bus_(DCHECK_NOTNULL(bus)), - cpu_id_(cpu_id), - block_domain_(0) { - locator_client_id_ = bus_->Connect(); - - bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainRegistrationMessage); - bus_->RegisterClientAsSender(locator_client_id_, kBlockDomainRegistrationResponseMessage); - - bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage); - bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage); - - bus_->RegisterClientAsReceiver(locator_client_id_, kLocateBlockMessage); - bus_->RegisterClientAsSender(locator_client_id_, kLocateBlockResponseMessage); - - bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage); - bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage); - - bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainUnregistrationMessage); - bus_->RegisterClientAsReceiver(locator_client_id_, kPoisonMessage); - } - - ~BlockLocator() override {} - - /** - * @brief Get the TMB client ID of BlockLocator thread. - * - * @return TMB client ID of BlockLocator thread. - **/ - tmb::client_id getBusClientID() const { - return locator_client_id_; - } - - protected: - void run() override; - - private: - void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address); - void processLocateBlockMessage(const tmb::client_id receiver, const block_id block); - void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block); - - tmb::MessageBus *bus_; - - // The ID of the CPU that the BlockLocator thread can optionally be pinned to. 
- const int cpu_id_; - - alignas(kCacheLineBytes) std::atomic<block_id_domain> block_domain_; - - // From a block domain to its network info in the ip:port format, i.e., - // "0.0.0.0:0". - std::unordered_map<block_id_domain, const std::string> domain_network_addresses_; - - // From a block to its domains. - std::unordered_map<block_id, std::unordered_set<block_id_domain>> block_locations_; - - // From a block domain to all blocks loaded in its buffer pool. - std::unordered_map<block_id_domain, std::unordered_set<block_id>> domain_blocks_; - - tmb::client_id locator_client_id_; - - DISALLOW_COPY_AND_ASSIGN(BlockLocator); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/BlockLocatorUtil.cpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocatorUtil.cpp b/query_execution/BlockLocatorUtil.cpp deleted file mode 100644 index d2d1e96..0000000 --- a/query_execution/BlockLocatorUtil.cpp +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- **/ - -#include "query_execution/BlockLocatorUtil.hpp" - -#include <cstdlib> -#include <string> -#include <utility> - -#include "query_execution/QueryExecutionMessages.pb.h" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "storage/StorageBlockInfo.hpp" - -#include "glog/logging.h" - -#include "tmb/address.h" -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" -#include "tmb/message_style.h" -#include "tmb/tagged_message.h" - -using tmb::TaggedMessage; -using tmb::MessageBus; -using tmb::client_id; - -namespace quickstep { -namespace block_locator { - -namespace S = ::quickstep::serialization; - -block_id_domain getBlockDomain(const std::string &network_address, - const client_id cli_id, - client_id *locator_client_id, - MessageBus *bus) { - tmb::Address address; - address.All(true); - // NOTE(zuyu): The singleton BlockLocator would need only one copy of the message. - tmb::MessageStyle style; - - S::BlockDomainRegistrationMessage proto; - proto.set_domain_network_address(network_address); - - const int proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(std::malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kBlockDomainRegistrationMessage); - std::free(proto_bytes); - - DLOG(INFO) << "Client (id '" << cli_id - << "') broadcasts BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage - << "') to BlockLocator."; - - CHECK(MessageBus::SendStatus::kOK == - bus->Send(cli_id, address, style, std::move(message))); - - const tmb::AnnotatedMessage annotated_message(bus->Receive(cli_id, 0, true)); - const TaggedMessage &tagged_message = annotated_message.tagged_message; - CHECK_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type()); - - *locator_client_id = annotated_message.sender; - - DLOG(INFO) << "Client (id '" << cli_id - << "') received 
BlockDomainRegistrationResponseMessage (typed '" - << kBlockDomainRegistrationResponseMessage - << "') from BlockLocator (id '" << *locator_client_id << "')."; - - S::BlockDomainMessage response_proto; - CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - return static_cast<block_id_domain>(response_proto.block_domain()); -} - -} // namespace block_locator -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/BlockLocatorUtil.hpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocatorUtil.hpp b/query_execution/BlockLocatorUtil.hpp deleted file mode 100644 index 74f65e4..0000000 --- a/query_execution/BlockLocatorUtil.hpp +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_ - -#include <string> - -#include "storage/StorageBlockInfo.hpp" - -#include "tmb/id_typedefs.h" - -namespace tmb { class MessageBus; } - -namespace quickstep { -namespace block_locator { - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief Broadcast to find BlockLocator to get a block domain for - * StorageManager with the given network address. - * - * @param network_address The network address of the StorageManager. - * @param cli_id The client ID of the block domain requester. - * @param locator_client_id The client ID of BlockLocator to set. - * @param bus A pointer to the TMB. - * - * @return The requested block domain. - **/ -block_id_domain getBlockDomain(const std::string &network_address, - const tmb::client_id cli_id, - tmb::client_id *locator_client_id, - tmb::MessageBus *bus); - -/** @} */ - -} // namespace block_locator -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt deleted file mode 100644 index 719d9f3..0000000 --- a/query_execution/CMakeLists.txt +++ /dev/null @@ -1,453 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -QS_PROTOBUF_GENERATE_CPP(queryexecution_QueryContext_proto_srcs queryexecution_QueryContext_proto_hdrs - QueryContext.proto) -QS_PROTOBUF_GENERATE_CPP(queryexecution_QueryExecutionMessages_proto_srcs - queryexecution_QueryExecutionMessages_proto_hdrs - QueryExecutionMessages.proto) - -if (BUILD_SHARED_LIBS) - set(GFLAGS_LIB_NAME gflags_nothreads-shared) -else() - set(GFLAGS_LIB_NAME gflags_nothreads-static) -endif() - -# Declare micro-libs: -add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp) -if (ENABLE_DISTRIBUTED) - add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp) - add_library(quickstep_queryexecution_BlockLocatorUtil BlockLocatorUtil.cpp BlockLocatorUtil.hpp) -endif(ENABLE_DISTRIBUTED) -add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp) -if (ENABLE_DISTRIBUTED) - add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp) -endif(ENABLE_DISTRIBUTED) -add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp) -add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp) -if (ENABLE_DISTRIBUTED) - add_library(quickstep_queryexecution_PolicyEnforcerDistributed PolicyEnforcerDistributed.cpp PolicyEnforcerDistributed.hpp) -endif(ENABLE_DISTRIBUTED) -add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp) -add_library(quickstep_queryexecution_QueryContext 
QueryContext.cpp QueryContext.hpp) -add_library(quickstep_queryexecution_QueryContext_proto - ${queryexecution_QueryContext_proto_srcs} - ${queryexecution_QueryContext_proto_hdrs}) -add_library(quickstep_queryexecution_QueryExecutionMessages_proto - ${queryexecution_QueryExecutionMessages_proto_srcs} - ${queryexecution_QueryExecutionMessages_proto_hdrs}) -add_library(quickstep_queryexecution_QueryExecutionState ../empty_src.cpp QueryExecutionState.hpp) -add_library(quickstep_queryexecution_QueryExecutionTypedefs ../empty_src.cpp QueryExecutionTypedefs.hpp) -add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryExecutionUtil.hpp) -add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp) -if (ENABLE_DISTRIBUTED) - add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp) -endif(ENABLE_DISTRIBUTED) -add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp) -if (ENABLE_DISTRIBUTED) - add_library(quickstep_queryexecution_Shiftboss Shiftboss.cpp Shiftboss.hpp) - add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp) -endif(ENABLE_DISTRIBUTED) -add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp) -add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp) -add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp) -add_library(quickstep_queryexecution_WorkerDirectory ../empty_src.cpp WorkerDirectory.hpp) -add_library(quickstep_queryexecution_WorkerMessage ../empty_src.cpp WorkerMessage.hpp) -add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp) - -# Link dependencies: -target_link_libraries(quickstep_queryexecution_AdmitRequestMessage - quickstep_utility_Macros) -if (ENABLE_DISTRIBUTED) - 
target_link_libraries(quickstep_queryexecution_BlockLocator - glog - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_QueryExecutionUtil - quickstep_storage_StorageBlockInfo - quickstep_storage_StorageConstants - quickstep_threading_Thread - quickstep_threading_ThreadUtil - quickstep_utility_Macros - tmb) - target_link_libraries(quickstep_queryexecution_BlockLocatorUtil - glog - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_storage_StorageBlockInfo - tmb) -endif(ENABLE_DISTRIBUTED) -target_link_libraries(quickstep_queryexecution_ForemanBase - glog - quickstep_queryexecution_PolicyEnforcerBase - quickstep_threading_Thread - quickstep_utility_Macros - tmb) -if (ENABLE_DISTRIBUTED) - target_link_libraries(quickstep_queryexecution_ForemanDistributed - glog - quickstep_catalog_CatalogDatabase - quickstep_catalog_CatalogRelation - quickstep_catalog_CatalogTypedefs - quickstep_catalog_Catalog_proto - quickstep_queryexecution_AdmitRequestMessage - quickstep_queryexecution_ForemanBase - quickstep_queryexecution_PolicyEnforcerBase - quickstep_queryexecution_PolicyEnforcerDistributed - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_QueryExecutionUtil - quickstep_queryexecution_ShiftbossDirectory - quickstep_threading_ThreadUtil - quickstep_utility_EqualsAnyConstant - quickstep_utility_Macros - tmb - ${GFLAGS_LIB_NAME}) -endif(ENABLE_DISTRIBUTED) -target_link_libraries(quickstep_queryexecution_ForemanSingleNode - glog - quickstep_queryexecution_AdmitRequestMessage - quickstep_queryexecution_ForemanBase - quickstep_queryexecution_PolicyEnforcerBase - quickstep_queryexecution_PolicyEnforcerSingleNode - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_QueryExecutionUtil - quickstep_queryexecution_WorkerDirectory - 
quickstep_queryexecution_WorkerMessage - quickstep_threading_ThreadUtil - quickstep_utility_EqualsAnyConstant - quickstep_utility_Macros - tmb - ${GFLAGS_LIB_NAME}) -target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase - glog - quickstep_catalog_CatalogDatabase - quickstep_catalog_CatalogRelation - quickstep_catalog_PartitionScheme - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionState - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_QueryManagerBase - quickstep_relationaloperators_WorkOrder - quickstep_storage_StorageBlockInfo - quickstep_utility_Macros - tmb - ${GFLAGS_LIB_NAME}) -if (ENABLE_DISTRIBUTED) - target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed - glog - quickstep_catalog_CatalogRelation - quickstep_catalog_Catalog_proto - quickstep_queryexecution_PolicyEnforcerBase - quickstep_queryexecution_QueryContext_proto - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionState - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_QueryExecutionUtil - quickstep_queryexecution_QueryManagerBase - quickstep_queryexecution_QueryManagerDistributed - quickstep_queryexecution_ShiftbossDirectory - quickstep_queryoptimizer_QueryHandle - quickstep_storage_StorageBlockInfo - quickstep_utility_Macros - tmb - ${GFLAGS_LIB_NAME}) -endif(ENABLE_DISTRIBUTED) -target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode - glog - quickstep_catalog_CatalogTypedefs - quickstep_queryexecution_PolicyEnforcerBase - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionState - quickstep_queryexecution_QueryManagerBase - quickstep_queryexecution_QueryManagerSingleNode - quickstep_queryexecution_WorkerDirectory - quickstep_queryexecution_WorkerMessage - quickstep_queryoptimizer_QueryHandle - quickstep_utility_Macros - tmb - ${GFLAGS_LIB_NAME}) 
-target_link_libraries(quickstep_queryexecution_QueryContext - glog - quickstep_catalog_CatalogDatabaseLite - quickstep_catalog_CatalogRelationSchema - quickstep_catalog_CatalogTypedefs - quickstep_expressions_ExpressionFactories - quickstep_expressions_predicate_Predicate - quickstep_expressions_scalar_Scalar - quickstep_expressions_tablegenerator_GeneratorFunctionFactory - quickstep_expressions_tablegenerator_GeneratorFunctionHandle - quickstep_expressions_tablegenerator_GeneratorFunction_proto - quickstep_queryexecution_QueryContext_proto - quickstep_storage_AggregationOperationState - quickstep_storage_HashTable - quickstep_storage_HashTableFactory - quickstep_storage_InsertDestination - quickstep_storage_InsertDestination_proto - quickstep_storage_WindowAggregationOperationState - quickstep_types_TypedValue - quickstep_types_containers_Tuple - quickstep_utility_Macros - quickstep_utility_SortConfiguration - quickstep_utility_lipfilter_LIPFilter - quickstep_utility_lipfilter_LIPFilterDeployment - quickstep_utility_lipfilter_LIPFilterFactory) -target_link_libraries(quickstep_queryexecution_QueryContext_proto - quickstep_expressions_Expressions_proto - quickstep_expressions_tablegenerator_GeneratorFunction_proto - quickstep_storage_AggregationOperationState_proto - quickstep_storage_HashTable_proto - quickstep_storage_InsertDestination_proto - quickstep_storage_WindowAggregationOperationState_proto - quickstep_types_containers_Tuple_proto - quickstep_utility_SortConfiguration_proto - quickstep_utility_lipfilter_LIPFilter_proto - ${PROTOBUF_LIBRARY}) -target_link_libraries(quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_catalog_Catalog_proto - quickstep_queryexecution_QueryContext_proto - quickstep_relationaloperators_WorkOrder_proto - ${PROTOBUF_LIBRARY}) -target_link_libraries(quickstep_queryexecution_QueryExecutionState - glog - quickstep_utility_Macros) -target_link_libraries(quickstep_queryexecution_QueryExecutionTypedefs - 
quickstep_threading_ThreadIDBasedMap - tmb) -target_link_libraries(quickstep_queryexecution_QueryExecutionUtil - quickstep_queryexecution_AdmitRequestMessage - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_utility_Macros - tmb) -target_link_libraries(quickstep_queryexecution_QueryManagerBase - quickstep_catalog_CatalogTypedefs - quickstep_queryexecution_QueryContext - quickstep_queryexecution_QueryExecutionState - quickstep_queryoptimizer_QueryHandle - quickstep_queryoptimizer_QueryPlan - quickstep_relationaloperators_RelationalOperator - quickstep_relationaloperators_WorkOrder - quickstep_storage_StorageBlockInfo - quickstep_utility_DAG - quickstep_utility_Macros) -if (ENABLE_DISTRIBUTED) - target_link_libraries(quickstep_queryexecution_QueryManagerDistributed - quickstep_queryexecution_QueryContext - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionState - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_QueryExecutionUtil - quickstep_queryexecution_QueryManagerBase - quickstep_queryexecution_ShiftbossDirectory - quickstep_queryexecution_WorkOrderProtosContainer - quickstep_relationaloperators_RelationalOperator - quickstep_relationaloperators_WorkOrder_proto - quickstep_utility_DAG - quickstep_utility_Macros - tmb) -endif(ENABLE_DISTRIBUTED) -target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode - quickstep_catalog_CatalogTypedefs - quickstep_queryexecution_QueryContext - quickstep_queryexecution_QueryExecutionState - quickstep_queryexecution_QueryManagerBase - quickstep_queryexecution_WorkOrdersContainer - quickstep_queryexecution_WorkerMessage - quickstep_queryoptimizer_QueryHandle - quickstep_relationaloperators_RebuildWorkOrder - quickstep_relationaloperators_RelationalOperator - quickstep_storage_InsertDestination - quickstep_storage_StorageBlock - quickstep_utility_DAG - quickstep_utility_Macros - tmb) -if (ENABLE_DISTRIBUTED) - 
target_link_libraries(quickstep_queryexecution_Shiftboss - glog - quickstep_catalog_CatalogDatabaseCache - quickstep_catalog_CatalogTypedefs - quickstep_queryexecution_QueryContext - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_QueryExecutionUtil - quickstep_queryexecution_WorkerDirectory - quickstep_queryexecution_WorkerMessage - quickstep_relationaloperators_RebuildWorkOrder - quickstep_relationaloperators_WorkOrderFactory - quickstep_storage_InsertDestination - quickstep_storage_StorageBlock - quickstep_storage_StorageManager - quickstep_threading_Thread - quickstep_threading_ThreadUtil - quickstep_utility_Macros - tmb) - target_link_libraries(quickstep_queryexecution_ShiftbossDirectory - quickstep_utility_Macros - tmb) -endif(ENABLE_DISTRIBUTED) -target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer - glog - quickstep_relationaloperators_WorkOrder_proto - quickstep_utility_Macros) -target_link_libraries(quickstep_queryexecution_WorkOrdersContainer - glog - quickstep_relationaloperators_WorkOrder - quickstep_utility_Macros - quickstep_utility_PtrVector) -target_link_libraries(quickstep_queryexecution_Worker - glog - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_QueryExecutionUtil - quickstep_queryexecution_WorkerMessage - quickstep_relationaloperators_WorkOrder - quickstep_threading_Thread - quickstep_threading_ThreadIDBasedMap - quickstep_threading_ThreadUtil - quickstep_utility_Macros - tmb) -target_link_libraries(quickstep_queryexecution_WorkerDirectory - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_utility_Macros) -target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy - quickstep_queryexecution_WorkerDirectory - quickstep_utility_Macros) - -# Module all-in-one library: -add_library(quickstep_queryexecution ../empty_src.cpp 
QueryExecutionModule.hpp) -target_link_libraries(quickstep_queryexecution - quickstep_queryexecution_AdmitRequestMessage - quickstep_queryexecution_ForemanBase - quickstep_queryexecution_ForemanSingleNode - quickstep_queryexecution_PolicyEnforcerBase - quickstep_queryexecution_PolicyEnforcerSingleNode - quickstep_queryexecution_QueryContext - quickstep_queryexecution_QueryContext_proto - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionState - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_QueryExecutionUtil - quickstep_queryexecution_QueryManagerBase - quickstep_queryexecution_QueryManagerSingleNode - quickstep_queryexecution_WorkOrderProtosContainer - quickstep_queryexecution_WorkOrdersContainer - quickstep_queryexecution_Worker - quickstep_queryexecution_WorkerDirectory - quickstep_queryexecution_WorkerMessage - quickstep_queryexecution_WorkerSelectionPolicy) -if (ENABLE_DISTRIBUTED) - target_link_libraries(quickstep_queryexecution - quickstep_queryexecution_BlockLocator - quickstep_queryexecution_BlockLocatorUtil - quickstep_queryexecution_ForemanDistributed - quickstep_queryexecution_PolicyEnforcerDistributed - quickstep_queryexecution_QueryManagerDistributed - quickstep_queryexecution_Shiftboss - quickstep_queryexecution_ShiftbossDirectory) -endif(ENABLE_DISTRIBUTED) - -# Tests: -if (ENABLE_DISTRIBUTED) - add_executable(BlockLocator_unittest - "${CMAKE_CURRENT_SOURCE_DIR}/tests/BlockLocator_unittest.cpp") - target_link_libraries(BlockLocator_unittest - ${GFLAGS_LIB_NAME} - glog - gtest - quickstep_catalog_CatalogAttribute - quickstep_catalog_CatalogRelation - quickstep_queryexecution_BlockLocator - quickstep_queryexecution_BlockLocatorUtil - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_QueryExecutionUtil - quickstep_storage_StorageBlob - quickstep_storage_StorageBlock - 
quickstep_storage_StorageBlockInfo - quickstep_storage_StorageConstants - quickstep_storage_StorageManager - quickstep_types_TypeFactory - quickstep_types_TypeID - tmb - ${LIBS}) - add_test(BlockLocator_unittest BlockLocator_unittest) -endif(ENABLE_DISTRIBUTED) - -add_executable(QueryManagerSingleNode_unittest - "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManagerSingleNode_unittest.cpp") -target_link_libraries(QueryManagerSingleNode_unittest - glog - gtest - gtest_main - quickstep_catalog_CatalogDatabase - quickstep_catalog_CatalogRelation - quickstep_catalog_CatalogTypedefs - quickstep_queryexecution_QueryContext - quickstep_queryexecution_QueryContext_proto - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionState - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_QueryManagerSingleNode - quickstep_queryexecution_WorkOrdersContainer - quickstep_queryexecution_WorkerDirectory - quickstep_queryexecution_WorkerMessage - quickstep_queryoptimizer_QueryHandle - quickstep_queryoptimizer_QueryPlan - quickstep_relationaloperators_RelationalOperator - quickstep_relationaloperators_WorkOrder - quickstep_storage_InsertDestination - quickstep_storage_InsertDestination_proto - quickstep_storage_StorageBlock - quickstep_storage_StorageBlockInfo - quickstep_storage_StorageManager - quickstep_utility_DAG - quickstep_utility_Macros - tmb) -add_test(QueryManagerSingleNode_unittest QueryManagerSingleNode_unittest) - -add_executable(WorkOrdersContainer_unittest - "${CMAKE_CURRENT_SOURCE_DIR}/tests/WorkOrdersContainer_unittest.cpp") -target_link_libraries(WorkOrdersContainer_unittest - glog - gtest - gtest_main - quickstep_queryexecution_WorkOrdersContainer - quickstep_relationaloperators_WorkOrder - quickstep_utility_Macros - quickstep_utility_PtrVector) -add_test(WorkOrdersContainer_unittest WorkOrdersContainer_unittest) - -add_executable(WorkerDirectory_unittest - 
"${CMAKE_CURRENT_SOURCE_DIR}/tests/WorkerDirectory_unittest.cpp") -target_link_libraries(WorkerDirectory_unittest - gtest - gtest_main - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_WorkerDirectory) -add_test(WorkerDirectory_unittest WorkerDirectory_unittest) - -add_executable(WorkerSelectionPolicy_unittest - "${CMAKE_CURRENT_SOURCE_DIR}/tests/WorkerSelectionPolicy_unittest.cpp") -target_link_libraries(WorkerSelectionPolicy_unittest - gtest - gtest_main - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_WorkerDirectory - quickstep_queryexecution_WorkerSelectionPolicy) -add_test(WorkerSelectionPolicy_unittest WorkerSelectionPolicy_unittest) - -file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/block_locator_test_data/) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/ForemanBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanBase.hpp b/query_execution/ForemanBase.hpp deleted file mode 100644 index ee6c7ce..0000000 --- a/query_execution/ForemanBase.hpp +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_ - -#include <cstdio> -#include <memory> -#include <vector> - -#include "query_execution/PolicyEnforcerBase.hpp" -#include "threading/Thread.hpp" -#include "utility/Macros.hpp" - -#include "glog/logging.h" - -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" - -namespace quickstep { - -struct WorkOrderTimeEntry; - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief A base class that Foreman implements. This class is used to derive - * for implementations for both the single-node and distributed versions. - **/ -class ForemanBase : public Thread { - public: - /** - * @brief Constructor. - * - * @param bus A pointer to the TMB. - * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned. - * - * @note If cpu_id is not specified, Foreman thread can be possibly moved - * around on different CPUs by the OS. - **/ - ForemanBase(tmb::MessageBus *bus, - const int cpu_id) - : bus_(DCHECK_NOTNULL(bus)), - cpu_id_(cpu_id) { - foreman_client_id_ = bus_->Connect(); - } - - ~ForemanBase() override {} - - /** - * @brief Print the results of profiling individual work orders for a given - * query. - * - * TODO(harshad) - Add the name of the operator to the output. - * - * @param query_id The ID of the query for which the results are to be printed. - * @param out The file stream. - **/ - virtual void printWorkOrderProfilingResults(const std::size_t query_id, - std::FILE *out) const = 0; - - /** - * @brief Get the results of profiling individual work orders for a given - * query. - * - * @param query_id The ID of the query for which the results are to be printed. - * @return A vector of records, each being a single profiling entry. 
- **/ - const std::vector<WorkOrderTimeEntry>& getWorkOrderProfilingResults( - const std::size_t query_id) const { - return policy_enforcer_->getProfilingResults(query_id); - } - - /** - * @brief Get the TMB client ID of Foreman thread. - * - * @return TMB client ID of foreman thread. - **/ - tmb::client_id getBusClientID() const { - return foreman_client_id_; - } - - protected: - void run() override = 0; - - tmb::MessageBus *bus_; - - tmb::client_id foreman_client_id_; - - // The ID of the CPU that the Foreman thread can optionally be pinned to. - const int cpu_id_; - - std::unique_ptr<PolicyEnforcerBase> policy_enforcer_; - - private: - DISALLOW_COPY_AND_ASSIGN(ForemanBase); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_BASE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp deleted file mode 100644 index d619657..0000000 --- a/query_execution/ForemanDistributed.cpp +++ /dev/null @@ -1,347 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -#include "query_execution/ForemanDistributed.hpp" - -#include <cstddef> -#include <cstdio> -#include <cstdlib> -#include <memory> -#include <unordered_map> -#include <unordered_set> -#include <utility> -#include <vector> - -#include "catalog/Catalog.pb.h" -#include "catalog/CatalogDatabase.hpp" -#include "catalog/CatalogRelation.hpp" -#include "catalog/CatalogTypedefs.hpp" -#include "query_execution/AdmitRequestMessage.hpp" -#include "query_execution/PolicyEnforcerBase.hpp" -#include "query_execution/PolicyEnforcerDistributed.hpp" -#include "query_execution/QueryExecutionMessages.pb.h" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/QueryExecutionUtil.hpp" -#include "query_execution/ShiftbossDirectory.hpp" -#include "threading/ThreadUtil.hpp" -#include "utility/EqualsAnyConstant.hpp" - -#include "glog/logging.h" - -#include "tmb/address.h" -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" -#include "tmb/message_style.h" -#include "tmb/tagged_message.h" - -using std::move; -using std::size_t; -using std::unique_ptr; -using std::vector; - -using tmb::AnnotatedMessage; -using tmb::MessageBus; -using tmb::TaggedMessage; -using tmb::client_id; - -namespace quickstep { - -namespace S = serialization; - -class QueryHandle; - -ForemanDistributed::ForemanDistributed( - MessageBus *bus, - CatalogDatabaseLite *catalog_database, - const int cpu_id) - : ForemanBase(bus, cpu_id), - catalog_database_(DCHECK_NOTNULL(catalog_database)) { - const std::vector<QueryExecutionMessageType> sender_message_types{ - kShiftbossRegistrationResponseMessage, - kQueryInitiateMessage, - kWorkOrderMessage, - kInitiateRebuildMessage, - kQueryTeardownMessage, - kSaveQueryResultMessage, - kQueryExecutionSuccessMessage, - kPoisonMessage}; - - for (const auto message_type : sender_message_types) { - bus_->RegisterClientAsSender(foreman_client_id_, message_type); - } - - const std::vector<QueryExecutionMessageType> receiver_message_types{ - 
kShiftbossRegistrationMessage, - kAdmitRequestMessage, - kQueryInitiateResponseMessage, - kCatalogRelationNewBlockMessage, - kDataPipelineMessage, - kInitiateRebuildResponseMessage, - kWorkOrderCompleteMessage, - kRebuildWorkOrderCompleteMessage, - kWorkOrderFeedbackMessage, - kSaveQueryResultResponseMessage, - kPoisonMessage}; - - for (const auto message_type : receiver_message_types) { - bus_->RegisterClientAsReceiver(foreman_client_id_, message_type); - } - - policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>( - foreman_client_id_, - catalog_database_, - &shiftboss_directory_, - bus_); -} - -void ForemanDistributed::run() { - if (cpu_id_ >= 0) { - // We can pin the foreman thread to a CPU if specified. - ThreadUtil::BindToCPU(cpu_id_); - } - - // Ensure that at least one Shiftboss to register. - if (shiftboss_directory_.empty()) { - const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true); - const TaggedMessage &tagged_message = annotated_message.tagged_message; - DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type()); - DLOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type() - << "' message from client " << annotated_message.sender; - - S::ShiftbossRegistrationMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity()); - DCHECK_EQ(1u, shiftboss_directory_.size()); - } - - // Event loop - for (;;) { - // Receive() causes this thread to sleep until next message is received. 
- const AnnotatedMessage annotated_message = - bus_->Receive(foreman_client_id_, 0, true); - const TaggedMessage &tagged_message = annotated_message.tagged_message; - const tmb::message_type_id message_type = tagged_message.message_type(); - DLOG(INFO) << "ForemanDistributed received typed '" << message_type - << "' message from client " << annotated_message.sender; - switch (message_type) { - case kShiftbossRegistrationMessage: { - S::ShiftbossRegistrationMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity()); - break; - } - case kAdmitRequestMessage: { - const AdmitRequestMessage *request_message = - static_cast<const AdmitRequestMessage*>(tagged_message.message()); - - const vector<QueryHandle *> &query_handles = request_message->getQueryHandles(); - DCHECK(!query_handles.empty()); - - bool all_queries_admitted = true; - if (query_handles.size() == 1u) { - all_queries_admitted = - policy_enforcer_->admitQuery(query_handles.front()); - } else { - all_queries_admitted = policy_enforcer_->admitQueries(query_handles); - } - if (!all_queries_admitted) { - LOG(WARNING) << "The scheduler could not admit all the queries"; - // TODO(harshad) - Inform the main thread about the failure. - } - break; - } - case kQueryInitiateResponseMessage: { - S::QueryInitiateResponseMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - CHECK(policy_enforcer_->existQuery(proto.query_id())); - break; - } - case kCatalogRelationNewBlockMessage: // Fall through - case kDataPipelineMessage: - case kRebuildWorkOrderCompleteMessage: - case kWorkOrderCompleteMessage: - case kWorkOrderFeedbackMessage: { - policy_enforcer_->processMessage(tagged_message); - break; - } - case kInitiateRebuildResponseMessage: { - // A unique case in the distributed version. 
- static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())-> - processInitiateRebuildResponseMessage(tagged_message); - break; - } - case kSaveQueryResultResponseMessage: { - S::SaveQueryResultResponseMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - const std::size_t query_id = proto.query_id(); - query_result_saved_shiftbosses_[query_id].insert(proto.shiftboss_index()); - - // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses. - if (query_result_saved_shiftbosses_[query_id].size() == shiftboss_directory_.size()) { - processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id()); - query_result_saved_shiftbosses_.erase(query_id); - } - break; - } - case kPoisonMessage: { - if (policy_enforcer_->hasQueries()) { - LOG(WARNING) << "ForemanDistributed thread exiting while some queries are " - "under execution or waiting to be admitted"; - } - - // Shutdown all Shiftbosses. - tmb::Address shiftboss_addresses; - for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) { - shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i)); - } - - tmb::MessageStyle broadcast_style; - broadcast_style.Broadcast(true); - - TaggedMessage poison_message(kPoisonMessage); - - const MessageBus::SendStatus send_status = - bus_->Send(foreman_client_id_, - shiftboss_addresses, - broadcast_style, - move(poison_message)); - DCHECK(send_status == MessageBus::SendStatus::kOK); - return; - } - default: - LOG(FATAL) << "Unknown message type to ForemanDistributed"; - } - - if (canCollectNewMessages(message_type)) { - vector<unique_ptr<S::WorkOrderMessage>> new_messages; - static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())-> - getWorkOrderProtoMessages(&new_messages); - dispatchWorkOrderMessages(new_messages); - } - } -} - -bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) { - return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type, - 
kCatalogRelationNewBlockMessage, - kWorkOrderFeedbackMessage) && - // TODO(zuyu): Multiple Shiftbosses support. - !shiftboss_directory_.hasReachedCapacity(0); -} - -void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) { - for (const auto &message : messages) { - DCHECK(message != nullptr); - // TODO(zuyu): Multiple Shiftbosses support. - sendWorkOrderMessage(0, *message); - shiftboss_directory_.incrementNumQueuedWorkOrders(0); - } -} - -void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index, - const S::WorkOrderMessage &proto) { - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kWorkOrderMessage); - free(proto_bytes); - - const client_id shiftboss_client_id = shiftboss_directory_.getClientId(shiftboss_index); - DLOG(INFO) << "ForemanDistributed sent WorkOrderMessage (typed '" << kWorkOrderMessage - << "') to Shiftboss with TMB client ID " << shiftboss_client_id; - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - foreman_client_id_, - shiftboss_client_id, - move(message)); - CHECK(send_status == MessageBus::SendStatus::kOK); -} - -void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id, - std::FILE *out) const { - const std::vector<WorkOrderTimeEntry> &recorded_times = - policy_enforcer_->getProfilingResults(query_id); - fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out); - for (const auto &workorder_entry : recorded_times) { - const std::size_t worker_id = workorder_entry.worker_id; - fprintf(out, - "%lu,%lu,%lu,%lu\n", - query_id, - worker_id, - workorder_entry.operator_id, // Operator ID. - workorder_entry.end_time - workorder_entry.start_time); // Time. 
- } -} - -void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shiftboss_client_id, - const std::size_t work_order_capacity) { - S::ShiftbossRegistrationResponseMessage proto; - proto.set_shiftboss_index(shiftboss_directory_.size()); - - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kShiftbossRegistrationResponseMessage); - free(proto_bytes); - - shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity); - - DLOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '" - << kShiftbossRegistrationResponseMessage - << "') to Shiftboss with TMB client id " << shiftboss_client_id; - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - foreman_client_id_, - shiftboss_client_id, - move(message)); - CHECK(send_status == MessageBus::SendStatus::kOK); -} - -void ForemanDistributed::processSaveQueryResultResponseMessage(const client_id cli_id, - const relation_id result_relation_id) { - S::QueryExecutionSuccessMessage proto; - proto.mutable_result_relation()->MergeFrom( - static_cast<CatalogDatabase*>(catalog_database_)->getRelationById(result_relation_id)->getProto()); - - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kQueryExecutionSuccessMessage); - free(proto_bytes); - - // Notify the CLI regarding the query result. 
- DLOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '" - << kQueryExecutionSuccessMessage - << "') to CLI with TMB client id " << cli_id; - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - foreman_client_id_, - cli_id, - move(message)); - CHECK(send_status == MessageBus::SendStatus::kOK); -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/ForemanDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp deleted file mode 100644 index ccdd0ae..0000000 --- a/query_execution/ForemanDistributed.hpp +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_ - -#include <cstddef> -#include <cstdio> -#include <memory> -#include <unordered_map> -#include <unordered_set> -#include <vector> - -#include "catalog/CatalogTypedefs.hpp" -#include "query_execution/ForemanBase.hpp" -#include "query_execution/ShiftbossDirectory.hpp" -#include "utility/Macros.hpp" - -#include "tmb/id_typedefs.h" - -namespace tmb { class MessageBus; } - -namespace quickstep { - -class CatalogDatabaseLite; - -namespace serialization { class WorkOrderMessage; } - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief The Foreman receives queries from the main thread, messages from the - * policy enforcer and dispatches the work to Shiftbosses. It also - * receives work completion messages from Shiftbosses. - **/ -class ForemanDistributed final : public ForemanBase { - public: - /** - * @brief Constructor. - * - * @param bus A pointer to the TMB. - * @param catalog_database The catalog database where this query is executed. - * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned. - * - * @note If cpu_id is not specified, Foreman thread can be possibly moved - * around on different CPUs by the OS. - **/ - ForemanDistributed(tmb::MessageBus *bus, - CatalogDatabaseLite *catalog_database, - const int cpu_id = -1); - - ~ForemanDistributed() override {} - - void printWorkOrderProfilingResults(const std::size_t query_id, - std::FILE *out) const override; - - protected: - void run() override; - - private: - /** - * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the - * worker threads. - * - * @param messages The messages to be dispatched. - **/ - void dispatchWorkOrderMessages( - const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages); - - /** - * @brief Send the given message to the specified worker. 
- * - * @param worker_index The logical index of the recipient worker in - * ShiftbossDirectory. - * @param proto The WorkOrderMessage to be sent. - **/ - void sendWorkOrderMessage(const std::size_t worker_index, - const serialization::WorkOrderMessage &proto); - - void processShiftbossRegistrationMessage(const tmb::client_id shiftboss_client_id, - const std::size_t work_order_capacity); - - void processSaveQueryResultResponseMessage(const tmb::client_id cli_id, - const relation_id result_relation_id); - - /** - * @brief Check if we can collect new messages from the PolicyEnforcer. - * - * @param message_type The type of the last received message. - **/ - bool canCollectNewMessages(const tmb::message_type_id message_type); - - ShiftbossDirectory shiftboss_directory_; - - CatalogDatabaseLite *catalog_database_; - - // From a query id to a set of Shiftbosses that save query result. - std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_; - - DISALLOW_COPY_AND_ASSIGN(ForemanDistributed); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/ForemanSingleNode.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp deleted file mode 100644 index 02799c7..0000000 --- a/query_execution/ForemanSingleNode.cpp +++ /dev/null @@ -1,257 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#include "query_execution/ForemanSingleNode.hpp" - -#include <cstddef> -#include <cstdio> -#include <memory> -#include <utility> -#include <vector> - -#include "query_execution/AdmitRequestMessage.hpp" -#include "query_execution/PolicyEnforcerBase.hpp" -#include "query_execution/PolicyEnforcerSingleNode.hpp" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/QueryExecutionUtil.hpp" -#include "query_execution/WorkerDirectory.hpp" -#include "query_execution/WorkerMessage.hpp" -#include "threading/ThreadUtil.hpp" -#include "utility/EqualsAnyConstant.hpp" -#include "utility/Macros.hpp" - -#include "gflags/gflags.h" -#include "glog/logging.h" - -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" -#include "tmb/tagged_message.h" - -using std::move; -using std::size_t; -using std::unique_ptr; -using std::vector; - -namespace quickstep { - -class QueryHandle; - -DEFINE_uint64(min_load_per_worker, 2, "The minimum load defined as the number " - "of pending work orders for the worker. 
This information is used " - "by the Foreman to assign work orders to worker threads"); - -ForemanSingleNode::ForemanSingleNode( - const tmb::client_id main_thread_client_id, - WorkerDirectory *worker_directory, - tmb::MessageBus *bus, - CatalogDatabaseLite *catalog_database, - StorageManager *storage_manager, - const int cpu_id, - const size_t num_numa_nodes) - : ForemanBase(bus, cpu_id), - main_thread_client_id_(main_thread_client_id), - worker_directory_(DCHECK_NOTNULL(worker_directory)), - catalog_database_(DCHECK_NOTNULL(catalog_database)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) { - const std::vector<QueryExecutionMessageType> sender_message_types{ - kPoisonMessage, - kRebuildWorkOrderMessage, - kWorkOrderMessage, - kWorkloadCompletionMessage}; - - for (const auto message_type : sender_message_types) { - bus_->RegisterClientAsSender(foreman_client_id_, message_type); - } - - const std::vector<QueryExecutionMessageType> receiver_message_types{ - kAdmitRequestMessage, - kCatalogRelationNewBlockMessage, - kDataPipelineMessage, - kPoisonMessage, - kRebuildWorkOrderCompleteMessage, - kWorkOrderFeedbackMessage, - kWorkOrderCompleteMessage}; - - for (const auto message_type : receiver_message_types) { - bus_->RegisterClientAsReceiver(foreman_client_id_, message_type); - } - - policy_enforcer_ = std::make_unique<PolicyEnforcerSingleNode>( - foreman_client_id_, - num_numa_nodes, - catalog_database_, - storage_manager_, - worker_directory_, - bus_); -} - -void ForemanSingleNode::run() { - if (cpu_id_ >= 0) { - // We can pin the foreman thread to a CPU if specified. - ThreadUtil::BindToCPU(cpu_id_); - } - - // Event loop - for (;;) { - // Receive() causes this thread to sleep until next message is received. 
- const AnnotatedMessage annotated_msg = - bus_->Receive(foreman_client_id_, 0, true); - const TaggedMessage &tagged_message = annotated_msg.tagged_message; - const tmb::message_type_id message_type = tagged_message.message_type(); - switch (message_type) { - case kCatalogRelationNewBlockMessage: // Fall through - case kDataPipelineMessage: - case kRebuildWorkOrderCompleteMessage: - case kWorkOrderCompleteMessage: - case kWorkOrderFeedbackMessage: { - policy_enforcer_->processMessage(tagged_message); - break; - } - - case kAdmitRequestMessage: { - const AdmitRequestMessage *msg = - static_cast<const AdmitRequestMessage *>(tagged_message.message()); - const vector<QueryHandle *> &query_handles = msg->getQueryHandles(); - - DCHECK(!query_handles.empty()); - bool all_queries_admitted = true; - if (query_handles.size() == 1u) { - all_queries_admitted = - policy_enforcer_->admitQuery(query_handles.front()); - } else { - all_queries_admitted = policy_enforcer_->admitQueries(query_handles); - } - if (!all_queries_admitted) { - LOG(WARNING) << "The scheduler could not admit all the queries"; - // TODO(harshad) - Inform the main thread about the failure. - } - break; - } - case kPoisonMessage: { - if (policy_enforcer_->hasQueries()) { - LOG(WARNING) << "Foreman thread exiting while some queries are " - "under execution or waiting to be admitted"; - } - return; - } - default: - LOG(FATAL) << "Unknown message type to Foreman"; - } - - if (canCollectNewMessages(message_type)) { - vector<unique_ptr<WorkerMessage>> new_messages; - static_cast<PolicyEnforcerSingleNode*>(policy_enforcer_.get())-> - getWorkerMessages(&new_messages); - dispatchWorkerMessages(new_messages); - } - - // We check again, as some queries may produce zero work orders and finish - // their execution. - if (!policy_enforcer_->hasQueries()) { - // Signal the main thread that there are no queries to be executed. - // Currently the message doesn't have any real content. 
- TaggedMessage completion_tagged_message(kWorkloadCompletionMessage); - DLOG(INFO) << "ForemanSingleNode sent WorkloadCompletionMessage (typed '" << kWorkloadCompletionMessage - << "') to CLI with TMB client ID " << main_thread_client_id_; - const tmb::MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage( - bus_, - foreman_client_id_, - main_thread_client_id_, - move(completion_tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK); - } - } -} - -bool ForemanSingleNode::canCollectNewMessages(const tmb::message_type_id message_type) { - if (QUICKSTEP_EQUALS_ANY_CONSTANT(message_type, - kCatalogRelationNewBlockMessage, - kWorkOrderFeedbackMessage)) { - return false; - } else if (worker_directory_->getLeastLoadedWorker().second <= - FLAGS_min_load_per_worker) { - // If the least loaded worker has only one pending work order, we should - // collect new messages and dispatch them. - return true; - } else { - return false; - } -} - -void ForemanSingleNode::dispatchWorkerMessages(const vector<unique_ptr<WorkerMessage>> &messages) { - for (const auto &message : messages) { - DCHECK(message != nullptr); - const int recipient_worker_thread_index = message->getRecipientHint(); - if (recipient_worker_thread_index != WorkerMessage::kInvalidRecipientIndexHint) { - sendWorkerMessage(static_cast<size_t>(recipient_worker_thread_index), - *message); - worker_directory_->incrementNumQueuedWorkOrders(recipient_worker_thread_index); - } else { - const size_t least_loaded_worker_thread_index = worker_directory_->getLeastLoadedWorker().first; - sendWorkerMessage(least_loaded_worker_thread_index, *message); - worker_directory_->incrementNumQueuedWorkOrders(least_loaded_worker_thread_index); - } - } -} - -void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index, - const WorkerMessage &message) { - tmb::message_type_id type; - if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) { - type = 
kRebuildWorkOrderMessage; - } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) { - type = kWorkOrderMessage; - } else { - FATAL_ERROR("Invalid WorkerMessageType"); - } - TaggedMessage worker_tagged_message(&message, sizeof(message), type); - - DLOG(INFO) << "ForemanSingleNode sent WorkOrderMessage (typed '" << type - << "') to Worker with TMB client ID " << worker_directory_->getClientID(worker_thread_index); - const tmb::MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - foreman_client_id_, - worker_directory_->getClientID(worker_thread_index), - move(worker_tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK); -} - -void ForemanSingleNode::printWorkOrderProfilingResults(const std::size_t query_id, - std::FILE *out) const { - // TODO(harshad) - Add the CPU core ID of the operator to the output. This - // will require modifying the WorkerDirectory to remember worker affinities. - // Until then, the users can refer to the worker_affinities provided to the - // cli to infer the CPU core ID where a given worker is pinned. - const std::vector<WorkOrderTimeEntry> &recorded_times = - policy_enforcer_->getProfilingResults(query_id); - fputs("Query ID,Worker ID,NUMA Socket,Operator ID,Time (microseconds)\n", out); - for (auto workorder_entry : recorded_times) { - const std::size_t worker_id = workorder_entry.worker_id; - fprintf(out, - "%lu,%lu,%d,%lu,%lu\n", - query_id, - worker_id, - worker_directory_->getNUMANode(worker_id), - workorder_entry.operator_id, // Operator ID. - workorder_entry.end_time - workorder_entry.start_time); // Time. 
- } -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/ForemanSingleNode.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanSingleNode.hpp b/query_execution/ForemanSingleNode.hpp deleted file mode 100644 index d2db51b..0000000 --- a/query_execution/ForemanSingleNode.hpp +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_ - -#include <cstddef> -#include <cstdio> -#include <memory> -#include <vector> - -#include "query_execution/ForemanBase.hpp" -#include "utility/Macros.hpp" - -#include "tmb/id_typedefs.h" - -namespace tmb { class MessageBus; } - -namespace quickstep { - -class CatalogDatabaseLite; -class StorageManager; -class WorkerDirectory; -class WorkerMessage; - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief The Foreman receives queries from the main thread, messages from the - * policy enforcer and dispatches the work to worker threads. It also - * receives work completion messages from workers. 
- **/
class ForemanSingleNode final : public ForemanBase {
 public:
  /**
   * @brief Constructor.
   *
   * @param main_thread_client_id TMB client ID of the main thread.
   * @param worker_directory The directory of worker threads.
   * @param bus The TMB instance to communicate over.
   * @param catalog_database The catalog database in which queries execute.
   * @param storage_manager The StorageManager to use.
   * @param cpu_id ID of the CPU the Foreman thread may be pinned to.
   * @param num_numa_nodes Number of NUMA nodes in the system.
   *
   * @note When cpu_id is left unspecified, the OS may move the Foreman thread
   *       between CPUs.
   **/
  ForemanSingleNode(const tmb::client_id main_thread_client_id,
                    WorkerDirectory *worker_directory,
                    tmb::MessageBus *bus,
                    CatalogDatabaseLite *catalog_database,
                    StorageManager *storage_manager,
                    const int cpu_id = -1,
                    const std::size_t num_numa_nodes = 1);

  ~ForemanSingleNode() override = default;

  void printWorkOrderProfilingResults(const std::size_t query_id,
                                      std::FILE *out) const override;

 protected:
  void run() override;

 private:
  /**
   * @brief Tell whether new messages may be collected from the PolicyEnforcer.
   *
   * @param message_type Type of the message that was received last.
   **/
  bool canCollectNewMessages(const tmb::message_type_id message_type);

  /**
   * @brief Hand schedulable WorkOrders, each wrapped in a WorkerMessage, over
   *        to the worker threads.
   *
   * @param messages The messages to dispatch.
   **/
  void dispatchWorkerMessages(
      const std::vector<std::unique_ptr<WorkerMessage>> &messages);

  /**
   * @brief Deliver one message to one particular worker.
   *
   * @param worker_thread_index Logical index of the receiving worker thread
   *        within the WorkerDirectory.
   * @param message The WorkerMessage to deliver.
   **/
  void sendWorkerMessage(const std::size_t worker_thread_index,
                         const WorkerMessage &message);

  const tmb::client_id main_thread_client_id_;

  WorkerDirectory *worker_directory_;

  CatalogDatabaseLite *catalog_database_;
  StorageManager *storage_manager_;

  DISALLOW_COPY_AND_ASSIGN(ForemanSingleNode);
};
- -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_