http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp deleted file mode 100644 index fb9a9d6..0000000 --- a/query_execution/QueryExecutionTypedefs.hpp +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_ - -#include <cstddef> -#include <unordered_map> -#include <vector> - -#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED -#include "threading/ThreadIDBasedMap.hpp" - -#include "tmb/address.h" -#include "tmb/id_typedefs.h" -#include "tmb/message_style.h" -#include "tmb/pure_memory_message_bus.h" -#include "tmb/tagged_message.h" - -namespace quickstep { - -/** \addtogroup QueryExecution - * @{ - */ - -typedef tmb::Address Address; -typedef tmb::AnnotatedMessage AnnotatedMessage; -typedef tmb::MessageBus MessageBus; -typedef tmb::MessageStyle MessageStyle; -typedef tmb::Priority Priority; -typedef tmb::PureMemoryMessageBus<false> MessageBusImpl; -typedef tmb::TaggedMessage TaggedMessage; -typedef tmb::client_id client_id; -typedef tmb::message_type_id message_type_id; - -using ClientIDMap = ThreadIDBasedMap<client_id, - 'C', - 'l', - 'i', - 'e', - 'n', - 't', - 'I', - 'D', - 'M', - 'a', - 'p'>; - -// We sort the following message types in the order of a life cycle of a query. -enum QueryExecutionMessageType : message_type_id { - kAdmitRequestMessage = 0, // Requesting a query (or queries) to be admitted, from - // the main thread to Foreman. - kWorkOrderMessage, // From Foreman to Worker. - kWorkOrderCompleteMessage, // From Worker to Foreman. - kCatalogRelationNewBlockMessage, // From InsertDestination to Foreman. - kDataPipelineMessage, // From InsertDestination or some WorkOrders to Foreman. - kWorkOrderFeedbackMessage, // From some WorkOrders to Foreman on behalf of - // their corresponding RelationalOperators. - kRebuildWorkOrderMessage, // From Foreman to Worker. - kRebuildWorkOrderCompleteMessage, // From Worker to Foreman. - kWorkloadCompletionMessage, // From Foreman to main thread. - kPoisonMessage, // From the main thread to Foreman and Workers. - -#ifdef QUICKSTEP_DISTRIBUTED - kShiftbossRegistrationMessage, // From Shiftboss to Foreman. - kShiftbossRegistrationResponseMessage, // From Foreman to Shiftboss, or from - // Shiftboss to Worker. - kQueryInitiateMessage, // From Foreman to Shiftboss. - kQueryInitiateResponseMessage, // From Shiftboss to Foreman. - - kInitiateRebuildMessage, // From Foreman to Shiftboss. - kInitiateRebuildResponseMessage, // From Shiftboss to Foreman. - - kQueryTeardownMessage, // From Foreman to Shiftboss. - - kSaveQueryResultMessage, // From Foreman to Shiftboss. - kSaveQueryResultResponseMessage, // From Shiftboss to Foreman. - - // From Foreman to CLI. - kQueryExecutionSuccessMessage, - - // BlockLocator related messages, sorted in a life cycle of StorageManager - // with a unique block domain. - kBlockDomainRegistrationMessage, // From Worker to BlockLocator. - kBlockDomainRegistrationResponseMessage, // From BlockLocator to Worker. - kAddBlockLocationMessage, // From StorageManager to BlockLocator. - kDeleteBlockLocationMessage, // From StorageManager to BlockLocator. - kLocateBlockMessage, // From StorageManager to BlockLocator. - kLocateBlockResponseMessage, // From BlockLocator to StorageManager. - kGetPeerDomainNetworkAddressesMessage, // From StorageManager to BlockLocator. - kGetPeerDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager. - kBlockDomainUnregistrationMessage, // From StorageManager to BlockLocator. -#endif -}; - -// WorkOrder profiling data structures. -// Profiling record for an individual work order. -struct WorkOrderTimeEntry { - std::size_t worker_id; - std::size_t operator_id; - std::size_t start_time; // Epoch time measured in microseconds - std::size_t end_time; // Epoch time measured in microseconds -}; -// Key = query ID. -// Value = vector of work order profiling records. -typedef std::unordered_map<std::size_t, std::vector<WorkOrderTimeEntry>> WorkOrderTimeRecorder; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_TYPEDEFS_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryExecutionUtil.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp deleted file mode 100644 index b41965c..0000000 --- a/query_execution/QueryExecutionUtil.hpp +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_ - -#include <cstddef> -#include <memory> -#include <utility> - -#include "query_execution/AdmitRequestMessage.hpp" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "utility/Macros.hpp" - -#include "glog/logging.h" - -#include "tmb/address.h" -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" -#include "tmb/message_style.h" -#include "tmb/tagged_message.h" - -namespace quickstep { - -class QueryHandle; - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief A static class for reusable methods in query_execution module. - **/ -class QueryExecutionUtil { - public: - /** - * @brief Send a TMB message to a single receiver. - * - * @param bus A pointer to the TMB. - * @param sender_id The client ID of the sender. - * @param receiver_id The client ID of the receiver. - * @param tagged_message A moved-from reference to the tagged message. - * - * @return A status code indicating the result of the message getting sent. - * The caller should ensure that the status is SendStatus::kOK. - **/ - static tmb::MessageBus::SendStatus SendTMBMessage( - tmb::MessageBus *bus, - tmb::client_id sender_id, - tmb::client_id receiver_id, - tmb::TaggedMessage &&tagged_message) { // NOLINT(whitespace/operators) - tmb::Address receiver_address; - receiver_address.AddRecipient(receiver_id); - - tmb::MessageStyle single_receiver_style; - return bus->Send(sender_id, - receiver_address, - single_receiver_style, - std::move(tagged_message)); - } - - /** - * @brief Construct and send an AdmitRequestMessage from a given sender to a - * given recipient. - * - * @param sender_id The TMB client ID of the sender. - * @param receiver_id The TMB client ID of the receiver. - * @param query_handle The QueryHandle used in the AdmitRequestMessage. - * @param bus A pointer to the TMB. - * @param tagged_message A moved from reference to the tagged message. - * - * @return A status code indicating the result of the message delivery. - * The caller should ensure that the status is SendStatus::kOK. - **/ - static tmb::MessageBus::SendStatus ConstructAndSendAdmitRequestMessage( - const tmb::client_id sender_id, - const tmb::client_id receiver_id, - QueryHandle *query_handle, - tmb::MessageBus *bus) { - std::unique_ptr<AdmitRequestMessage> request_message( - new AdmitRequestMessage(query_handle)); - const std::size_t size_of_request_msg = sizeof(*request_message); - tmb::TaggedMessage admit_tagged_message( - request_message.release(), size_of_request_msg, kAdmitRequestMessage); - - return QueryExecutionUtil::SendTMBMessage( - bus, sender_id, receiver_id, std::move(admit_tagged_message)); - } - - /** - * @brief Receive a query completion message. - * - * @param receiver_id The TMB client ID of the receiver thread. - * @param bus A pointer to the TMB. - * - * @note Right now the query completion message is of no interest to the - * caller. In the future, if this message needs to be fetched, make this - * function return the TaggedMessage. - **/ - static void ReceiveQueryCompletionMessage(const tmb::client_id receiver_id, - tmb::MessageBus *bus) { - const tmb::AnnotatedMessage annotated_msg = - bus->Receive(receiver_id, 0, true); - const tmb::TaggedMessage &tagged_message = annotated_msg.tagged_message; - DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type()); - } - - static void BroadcastMessage(const tmb::client_id sender_id, - const tmb::Address &addresses, - tmb::TaggedMessage &&tagged_message, // NOLINT(whitespace/operators) - tmb::MessageBus *bus) { - // The sender broadcasts the given message to all 'addresses'. - tmb::MessageStyle style; - style.Broadcast(true); - - const tmb::MessageBus::SendStatus send_status = - bus->Send(sender_id, addresses, style, std::move(tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK); - } - - static void BroadcastPoisonMessage(const tmb::client_id sender_id, tmb::MessageBus *bus) { - // Terminate all threads. - // The sender thread broadcasts poison message to the workers and foreman. - // Each worker dies after receiving poison message. The order of workers' - // death is irrelavant. - tmb::MessageStyle style; - style.Broadcast(true); - tmb::Address address; - address.All(true); - tmb::TaggedMessage poison_tagged_message(kPoisonMessage); - - DLOG(INFO) << "TMB client ID " << sender_id - << " broadcast PoisonMessage (typed '" << kPoisonMessage << "') to all"; - const tmb::MessageBus::SendStatus send_status = bus->Send( - sender_id, address, style, std::move(poison_tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK); - } - - private: - /** - * @brief Constructor. Made private to avoid instantiation. - **/ - QueryExecutionUtil(); - - DISALLOW_COPY_AND_ASSIGN(QueryExecutionUtil); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_UTIL_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp deleted file mode 100644 index 8e37da8..0000000 --- a/query_execution/QueryManagerBase.cpp +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#include "query_execution/QueryManagerBase.hpp" - -#include <memory> -#include <utility> -#include <vector> - -#include "catalog/CatalogTypedefs.hpp" -#include "query_execution/QueryContext.hpp" -#include "query_optimizer/QueryHandle.hpp" -#include "query_optimizer/QueryPlan.hpp" -#include "relational_operators/WorkOrder.hpp" -#include "storage/StorageBlockInfo.hpp" - -#include "glog/logging.h" - -using std::pair; - -namespace quickstep { - -QueryManagerBase::QueryManagerBase(QueryHandle *query_handle) - : query_handle_(DCHECK_NOTNULL(query_handle)), - query_id_(query_handle->query_id()), - query_dag_(DCHECK_NOTNULL( - DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())), - num_operators_in_dag_(query_dag_->size()), - output_consumers_(num_operators_in_dag_), - blocking_dependencies_(num_operators_in_dag_), - query_exec_state_(new QueryExecutionState(num_operators_in_dag_)) { - for (dag_node_index node_index = 0; - node_index < num_operators_in_dag_; - ++node_index) { - const QueryContext::insert_destination_id insert_destination_index = - query_dag_->getNodePayload(node_index).getInsertDestinationID(); - if (insert_destination_index != QueryContext::kInvalidInsertDestinationId) { - // Rebuild is necessary whenever InsertDestination is present. - query_exec_state_->setRebuildRequired(node_index); - query_exec_state_->setRebuildStatus(node_index, 0, false); - } - - for (const pair<dag_node_index, bool> &dependent_link : - query_dag_->getDependents(node_index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (!query_dag_->getLinkMetadata(node_index, dependent_op_index)) { - // The link is not a pipeline-breaker. Streaming of blocks is possible - // between these two operators. - output_consumers_[node_index].push_back(dependent_op_index); - } else { - // The link is a pipeline-breaker. Streaming of blocks is not possible - // between these two operators. - blocking_dependencies_[dependent_op_index].push_back(node_index); - } - } - } -} - -QueryManagerBase::QueryStatusCode QueryManagerBase::queryStatus( - const dag_node_index op_index) { - // As kQueryExecuted takes precedence over kOperatorExecuted, we first check - // whether the query has finished its execution. - if (query_exec_state_->hasQueryExecutionFinished()) { - return QueryStatusCode::kQueryExecuted; - } - - if (query_exec_state_->hasExecutionFinished(op_index)) { - return QueryStatusCode::kOperatorExecuted; - } - - return QueryStatusCode::kNone; -} - -void QueryManagerBase::processFeedbackMessage( - const dag_node_index op_index, const WorkOrder::FeedbackMessage &msg) { - RelationalOperator *op = - query_dag_->getNodePayloadMutable(op_index); - op->receiveFeedbackMessage(msg); -} - -void QueryManagerBase::processWorkOrderCompleteMessage( - const dag_node_index op_index) { - query_exec_state_->decrementNumQueuedWorkOrders(op_index); - - // Check if new work orders are available and fetch them if so. - fetchNormalWorkOrders(op_index); - - if (checkRebuildRequired(op_index)) { - if (checkNormalExecutionOver(op_index)) { - if (!checkRebuildInitiated(op_index)) { - if (initiateRebuild(op_index)) { - // Rebuild initiated and completed right away. - markOperatorFinished(op_index); - } else { - // Rebuild under progress. - } - } else if (checkRebuildOver(op_index)) { - // Rebuild was under progress and now it is over. - markOperatorFinished(op_index); - } - } else { - // Normal execution under progress for this operator. - } - } else if (checkOperatorExecutionOver(op_index)) { - // Rebuild not required for this operator and its normal execution is - // complete. - markOperatorFinished(op_index); - } - - for (const pair<dag_node_index, bool> &dependent_link : - query_dag_->getDependents(op_index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - // Process the dependent operator (of the operator whose WorkOrder - // was just executed) for which all the dependencies have been met. - processOperator(dependent_op_index, true); - } - } -} - -void QueryManagerBase::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index) { - query_exec_state_->decrementNumRebuildWorkOrders(op_index); - - if (checkRebuildOver(op_index)) { - markOperatorFinished(op_index); - - for (const pair<dag_node_index, bool> &dependent_link : - query_dag_->getDependents(op_index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - processOperator(dependent_op_index, true); - } - } - } -} - -void QueryManagerBase::processOperator(const dag_node_index index, - const bool recursively_check_dependents) { - if (fetchNormalWorkOrders(index)) { - // Fetched work orders. Return to wait for the generated work orders to - // execute, and skip the execution-finished checks. - return; - } - - if (checkNormalExecutionOver(index)) { - if (checkRebuildRequired(index)) { - if (!checkRebuildInitiated(index)) { - // Rebuild hasn't started, initiate it. - if (initiateRebuild(index)) { - // Rebuild initiated and completed right away. - markOperatorFinished(index); - } else { - // Rebuild WorkOrders have been generated. - return; - } - } else if (checkRebuildOver(index)) { - // Rebuild had been initiated and it is over. - markOperatorFinished(index); - } - } else { - // Rebuild is not required and normal execution over, mark finished. - markOperatorFinished(index); - } - // If we reach here, that means the operator has been marked as finished. - if (recursively_check_dependents) { - for (const pair<dag_node_index, bool> &dependent_link : - query_dag_->getDependents(index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - processOperator(dependent_op_index, true); - } - } - } - } -} - -void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index, - const block_id block, - const relation_id rel_id) { - for (const dag_node_index consumer_index : - output_consumers_[op_index]) { - // Feed the streamed block to the consumer. Note that 'output_consumers_' - // only contain those dependents of operator with index = op_index which are - // eligible to receive streamed input. - query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id); - // Because of the streamed input just fed, check if there are any new - // WorkOrders available and if so, fetch them. - fetchNormalWorkOrders(consumer_index); - } -} - -void QueryManagerBase::markOperatorFinished(const dag_node_index index) { - query_exec_state_->setExecutionFinished(index); - - RelationalOperator *op = query_dag_->getNodePayloadMutable(index); - op->updateCatalogOnCompletion(); - - const relation_id output_rel = op->getOutputRelationID(); - for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) { - const dag_node_index dependent_op_index = dependent_link.first; - RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index); - // Signal dependent operator that current operator is done feeding input blocks. - if (output_rel >= 0) { - dependent_op->doneFeedingInputBlocks(output_rel); - } - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - dependent_op->informAllBlockingDependenciesMet(); - } - } -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp deleted file mode 100644 index a274742..0000000 --- a/query_execution/QueryManagerBase.hpp +++ /dev/null @@ -1,321 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_ - -#include <cstddef> -#include <memory> -#include <vector> - -#include "catalog/CatalogTypedefs.hpp" -#include "query_execution/QueryExecutionState.hpp" -#include "relational_operators/RelationalOperator.hpp" -#include "relational_operators/WorkOrder.hpp" -#include "storage/StorageBlockInfo.hpp" -#include "utility/DAG.hpp" -#include "utility/Macros.hpp" - -namespace quickstep { - -class QueryHandle; - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief A base class that manages the execution of a query including - * generation of new work orders, and keeping track of the query - * exection state. - **/ -class QueryManagerBase { - public: - typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index; - - /** - * @brief Return codes for queryStatus() function. - * - * @note When both operator and query get executed, kQueryExecuted takes - * precedence over kOperatorExecuted. - **/ - enum class QueryStatusCode { - kOperatorExecuted = 0, // An operator in the query finished execution. - kQueryExecuted, // The query got executed. - kNone // None of the above. - }; - - /** - * @brief Constructor. - * - * @param query_handle The QueryHandle object for this query. - **/ - explicit QueryManagerBase(QueryHandle *query_handle); - - /** - * @brief Virtual destructor. - **/ - virtual ~QueryManagerBase() {} - - /** - * @brief Get the query handle. - **/ - const QueryHandle* query_handle() const { - return query_handle_; - } - - /** - * @brief Get the QueryExecutionState for this query. - **/ - inline const QueryExecutionState& getQueryExecutionState() const { - return *query_exec_state_; - } - - /** - * @brief Process the received WorkOrder complete message. - * - * @param op_index The index of the specified operator node in the query DAG - * for the completed WorkOrder. - **/ - void processWorkOrderCompleteMessage(const dag_node_index op_index); - - /** - * @brief Process the received RebuildWorkOrder complete message. - * - * @param op_index The index of the specified operator node in the query DAG - * for the completed RebuildWorkOrder. - **/ - void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index); - - /** - * @brief Process the received data pipeline message. - * - * @param op_index The index of the specified operator node in the query DAG - * for the pipelining block. - * @param block The block id. - * @param rel_id The ID of the relation that produced 'block'. - **/ - void processDataPipelineMessage(const dag_node_index op_index, - const block_id block, - const relation_id rel_id); - - /** - * @brief Fetch all work orders currently available in relational operator and - * store them internally. - * - * @param index The index of the relational operator to be processed in the - * query plan DAG. - * - * @return Whether any work order was generated by op. - **/ - virtual bool fetchNormalWorkOrders(const dag_node_index index) = 0; - - /** - * @brief Process the received work order feedback message and notify - * relational operator. - * - * @param op_index The index of the specified operator node in the query DAG - * for the feedback message. - * @param message Feedback message from work order. - **/ - void processFeedbackMessage(const dag_node_index op_index, - const WorkOrder::FeedbackMessage &message); - - /** - * @brief Get the query status after processing an incoming message. - * - * @param op_index The index of the specified operator node in the query DAG - * for the incoming message. - * - * @return QueryStatusCode as determined after the message is processed. - **/ - QueryStatusCode queryStatus(const dag_node_index op_index); - - protected: - /** - * @brief Process a current relational operator: Get its workorders and store - * them in the WorkOrdersContainer for this query. If the operator can - * be marked as done, do so. - * - * @param index The index of the relational operator to be processed in the - * query plan DAG. - * @param recursively_check_dependents If an operator is done, should we - * call processOperator on its dependents recursively. - **/ - void processOperator(const dag_node_index index, - const bool recursively_check_dependents); - - /** - * @brief This function does the following things: - * 1. Mark the given relational operator as "done". - * 2. For all the dependents of this operator, check if all of their - * blocking dependencies are met. If so inform them that the blocking - * dependencies are met. - * 3. Check if the given operator is done producing output. If it's - * done, inform the dependents that they won't receive input anymore - * from the given operator. - * - * @param index The index of the given relational operator in the DAG. - **/ - void markOperatorFinished(const dag_node_index index); - - /** - * @brief Check if all the dependencies of the node at specified index have - * finished their execution. - * - * @note This function's true return value is a pre-requisite for calling - * getRebuildWorkOrders() - * - * @param node_index The index of the specified node in the query DAG. - * - * @return True if all the dependencies have finished their execution. False - * otherwise. - **/ - inline bool checkAllDependenciesMet(const dag_node_index node_index) const { - for (const dag_node_index dependency_index : - query_dag_->getDependencies(node_index)) { - // If at least one of the dependencies is not met, return false. - if (!query_exec_state_->hasExecutionFinished(dependency_index)) { - return false; - } - } - return true; - } - - /** - * @brief Check if all the blocking dependencies of the node at specified - * index have finished their execution. - * - * @note A blocking dependency is the one which is pipeline breaker. Output of - * a dependency can't be streamed to its dependent if the link between - * them is pipeline breaker. - * - * @param node_index The index of the specified node in the query DAG. - * - * @return True if all the blocking dependencies have finished their - * execution. False otherwise. - **/ - inline bool checkAllBlockingDependenciesMet( - const dag_node_index node_index) const { - for (const dag_node_index blocking_dependency_index : - blocking_dependencies_[node_index]) { - if (!query_exec_state_->hasExecutionFinished( - blocking_dependency_index)) { - return false; - } - } - return true; - } - - /** - * @brief Check if the execution of the given operator is over. - * - * @param index The index of the given operator in the DAG. - * - * @return True if the execution of the given operator is over, false - * otherwise. - **/ - inline bool checkOperatorExecutionOver(const dag_node_index index) const { - return this->checkNormalExecutionOver(index) && - (!checkRebuildRequired(index) || this->checkRebuildOver(index)); - } - - /** - * @brief Check if the rebuild operation is required for a given operator. - * - * @param index The index of the given operator in the DAG. - * - * @return True if the rebuild operation is required, false otherwise. - **/ - inline bool checkRebuildRequired(const dag_node_index index) const { - return query_exec_state_->isRebuildRequired(index); - } - - /** - * @brief Check if the rebuild operation for a given operator has been - * initiated. - * - * @param index The index of the given operator in the DAG. - * - * @return True if the rebuild operation has been initiated, false otherwise. - **/ - inline bool checkRebuildInitiated(const dag_node_index index) const { - return query_exec_state_->hasRebuildInitiated(index); - } - - const QueryHandle *query_handle_; - - const std::size_t query_id_; - - DAG<RelationalOperator, bool> *query_dag_; // Owned by 'query_handle_'. - const dag_node_index num_operators_in_dag_; - - // For all nodes, store their receiving dependents. - std::vector<std::vector<dag_node_index>> output_consumers_; - - // For all nodes, store their pipeline breaking dependencies (if any). - std::vector<std::vector<dag_node_index>> blocking_dependencies_; - - std::unique_ptr<QueryExecutionState> query_exec_state_; - - private: - /** - * @brief Check if the given operator's normal execution is over. - * - * @note The conditions for a given operator's normal execution to get over: - * 1. All of its normal (i.e. non rebuild) WorkOrders have finished - * execution. - * 2. The operator is done generating work orders. - * 3. All of the dependencies of the given operator have been met. - * - * @param index The index of the given operator in the DAG. - * - * @return True if the normal execution of the given operator is over, false - * otherwise. - **/ - virtual bool checkNormalExecutionOver(const dag_node_index index) const = 0; - - /** - * @brief Initiate the rebuild process for partially filled blocks generated - * during the execution of the given operator. - * - * @param index The index of the given operator in the DAG. - * - * @return True if the rebuild is over immediately, i.e. the operator didn't - * generate any rebuild WorkOrders, false otherwise. - **/ - virtual bool initiateRebuild(const dag_node_index index) = 0; - - /** - * @brief Check if the rebuild operation for a given operator is over. - * - * @param index The index of the given operator in the DAG. - * - * @return True if the rebuild operation is over, false otherwise. - **/ - virtual bool checkRebuildOver(const dag_node_index index) const = 0; - - DISALLOW_COPY_AND_ASSIGN(QueryManagerBase); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_BASE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp deleted file mode 100644 index 7d45933..0000000 --- a/query_execution/QueryManagerDistributed.cpp +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#include "query_execution/QueryManagerDistributed.hpp" - -#include <cstddef> -#include <cstdlib> -#include <memory> -#include <utility> - -#include "query_execution/QueryContext.hpp" -#include "query_execution/QueryExecutionMessages.pb.h" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/QueryExecutionUtil.hpp" -#include "query_execution/ShiftbossDirectory.hpp" -#include "query_execution/WorkOrderProtosContainer.hpp" -#include "relational_operators/RelationalOperator.hpp" -#include "relational_operators/WorkOrder.pb.h" -#include "utility/DAG.hpp" - -#include "glog/logging.h" - -#include "tmb/id_typedefs.h" - -using std::free; -using std::malloc; -using std::move; -using std::size_t; -using std::unique_ptr; - -namespace quickstep { - -QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle, - const ShiftbossDirectory *shiftboss_directory, - const tmb::client_id foreman_client_id, - tmb::MessageBus *bus) - : QueryManagerBase(query_handle), - shiftboss_directory_(shiftboss_directory), - foreman_client_id_(foreman_client_id), - bus_(bus), - normal_workorder_protos_container_( - new WorkOrderProtosContainer(num_operators_in_dag_)) { - // Collect all the workorders from all the relational operators in the DAG. - for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) { - if (checkAllBlockingDependenciesMet(index)) { - query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet(); - processOperator(index, false); - } - } -} - -serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessage( - const dag_node_index start_operator_index) { - // Default policy: Operator with lowest index first. - size_t num_operators_checked = 0; - for (dag_node_index index = start_operator_index; - num_operators_checked < num_operators_in_dag_; - index = (index + 1) % num_operators_in_dag_, ++num_operators_checked) { - if (query_exec_state_->hasExecutionFinished(index)) { - continue; - } - unique_ptr<serialization::WorkOrder> work_order_proto( - normal_workorder_protos_container_->getWorkOrderProto(index)); - if (work_order_proto != nullptr) { - query_exec_state_->incrementNumQueuedWorkOrders(index); - - unique_ptr<serialization::WorkOrderMessage> message_proto(new serialization::WorkOrderMessage); - message_proto->set_query_id(query_id_); - message_proto->set_operator_index(index); - message_proto->mutable_work_order()->MergeFrom(*work_order_proto); - - return message_proto.release(); - } - } - // No normal WorkOrder protos available right now. - return nullptr; -} - -bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) { - bool generated_new_workorder_protos = false; - if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) { - // Do not fetch any work units until all blocking dependencies are met. - // The releational operator is not aware of blocking dependencies for - // uncorrelated scalar queries. - if (!checkAllBlockingDependenciesMet(index)) { - return false; - } - const size_t num_pending_workorder_protos_before = - normal_workorder_protos_container_->getNumWorkOrderProtos(index); - const bool done_generation = - query_dag_->getNodePayloadMutable(index) - ->getAllWorkOrderProtos(normal_workorder_protos_container_.get()); - if (done_generation) { - query_exec_state_->setDoneGenerationWorkOrders(index); - } - - // TODO(shoban): It would be a good check to see if operator is making - // useful progress, i.e., the operator either generates work orders to - // execute or still has pending work orders executing. However, this will not - // work if Foreman polls operators without feeding data. This check can be - // enabled, if Foreman is refactored to call getAllWorkOrders() only when - // pending work orders are completed or new input blocks feed. - - generated_new_workorder_protos = - (num_pending_workorder_protos_before < - normal_workorder_protos_container_->getNumWorkOrderProtos(index)); - } - return generated_new_workorder_protos; -} - -void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index, - const std::size_t num_rebuild_work_orders) { - // TODO(zuyu): Multiple Shiftbosses support. - query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true); - - if (num_rebuild_work_orders != 0u) { - // Wait for the rebuild work orders to finish. - return; - } - - // No needs for rebuilds. - markOperatorFinished(op_index); - - for (const std::pair<dag_node_index, bool> &dependent_link : - query_dag_->getDependents(op_index)) { - const dag_node_index dependent_op_index = dependent_link.first; - if (checkAllBlockingDependenciesMet(dependent_op_index)) { - processOperator(dependent_op_index, true); - } - } -} - -bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) { - DCHECK(checkRebuildRequired(index)); - DCHECK(!checkRebuildInitiated(index)); - - const RelationalOperator &op = query_dag_->getNodePayload(index); - DCHECK_NE(op.getInsertDestinationID(), QueryContext::kInvalidInsertDestinationId); - - serialization::InitiateRebuildMessage proto; - proto.set_query_id(query_id_); - proto.set_operator_index(index); - proto.set_insert_destination_index(op.getInsertDestinationID()); - proto.set_relation_id(op.getOutputRelationID()); - - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage tagged_msg(static_cast<const void *>(proto_bytes), - proto_length, - kInitiateRebuildMessage); - free(proto_bytes); - - LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage - << "') to Shiftboss"; - // TODO(zuyu): Multiple workers support. - QueryExecutionUtil::SendTMBMessage(bus_, - foreman_client_id_, - shiftboss_directory_->getClientId(0), - move(tagged_msg)); - - // The negative value indicates that the number of rebuild work orders is to be - // determined. - query_exec_state_->setRebuildStatus(index, -1, true); - - // Wait for Shiftbosses to report the number of rebuild work orders. - return false; -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp deleted file mode 100644 index e609ab8..0000000 --- a/query_execution/QueryManagerDistributed.hpp +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_ - -#include <cstddef> -#include <memory> - -#include "query_execution/QueryExecutionState.hpp" -#include "query_execution/QueryManagerBase.hpp" -#include "query_execution/WorkOrderProtosContainer.hpp" -#include "utility/Macros.hpp" - -#include "tmb/id_typedefs.h" - -namespace tmb { class MessageBus; } - -namespace quickstep { - -class QueryHandle; -class ShiftbossDirectory; - -namespace serialization { class WorkOrderMessage; } - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief A class that manages the execution of a query including generation - * of new work orders, keeping track of the query exection state. - **/ -class QueryManagerDistributed final : public QueryManagerBase { - public: - /** - * @brief Constructor. - * - * @param query_handle The QueryHandle object for this query. - * @param shiftboss_directory The ShiftbossDirectory to use. - * @param foreman_client_id The TMB client ID of the foreman thread. - * @param bus The TMB used for communication. - **/ - QueryManagerDistributed(QueryHandle *query_handle, - const ShiftbossDirectory *shiftboss_directory, - const tmb::client_id foreman_client_id, - tmb::MessageBus *bus); - - ~QueryManagerDistributed() override {} - - bool fetchNormalWorkOrders(const dag_node_index index) override; - - /** - * @brief Process the initiate rebuild work order response message. - * - * @param op_index The index of the specified operator node in the query DAG - * for initiating the rebuild work order. - * @param num_rebuild_work_orders The number of the rebuild work orders - * generated for the operator indexed by 'op_index'. - **/ - void processInitiateRebuildResponseMessage(const dag_node_index op_index, - const std::size_t num_rebuild_work_orders); - - /** - * @brief Get the next normal workorder to be excuted, wrapped in a - * WorkOrderMessage proto. - * - * @param start_operator_index Begin the search for the schedulable WorkOrder - * with the operator at this index. - * - * @return A pointer to the WorkOrderMessage proto. If there is no WorkOrder - * to be executed, return NULL. - **/ - serialization::WorkOrderMessage* getNextWorkOrderMessage( - const dag_node_index start_operator_index); - - private: - bool checkNormalExecutionOver(const dag_node_index index) const override { - return (checkAllDependenciesMet(index) && - !normal_workorder_protos_container_->hasWorkOrderProto(index) && - query_exec_state_->getNumQueuedWorkOrders(index) == 0 && - query_exec_state_->hasDoneGenerationWorkOrders(index)); - } - - bool initiateRebuild(const dag_node_index index) override; - - bool checkRebuildOver(const dag_node_index index) const override { - return query_exec_state_->hasRebuildInitiated(index) && - (query_exec_state_->getNumRebuildWorkOrders(index) == 0); - } - - const ShiftbossDirectory *shiftboss_directory_; - - const tmb::client_id foreman_client_id_; - tmb::MessageBus *bus_; - - std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_; - - DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerSingleNode.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp deleted file mode 100644 index 237796f..0000000 --- a/query_execution/QueryManagerSingleNode.cpp +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#include "query_execution/QueryManagerSingleNode.hpp" - -#include <cstddef> -#include <memory> -#include <utility> -#include <vector> - -#include "catalog/CatalogTypedefs.hpp" -#include "query_execution/WorkerMessage.hpp" -#include "query_optimizer/QueryHandle.hpp" -#include "relational_operators/RebuildWorkOrder.hpp" -#include "relational_operators/RelationalOperator.hpp" -#include "storage/InsertDestination.hpp" -#include "storage/StorageBlock.hpp" -#include "utility/DAG.hpp" - -#include "glog/logging.h" - -#include "tmb/id_typedefs.h" - -namespace quickstep { - -class WorkOrder; - -QueryManagerSingleNode::QueryManagerSingleNode( - const tmb::client_id foreman_client_id, - const std::size_t num_numa_nodes, - QueryHandle *query_handle, - CatalogDatabaseLite *catalog_database, - StorageManager *storage_manager, - tmb::MessageBus *bus) - : QueryManagerBase(query_handle), - foreman_client_id_(foreman_client_id), - storage_manager_(DCHECK_NOTNULL(storage_manager)), - bus_(DCHECK_NOTNULL(bus)), - query_context_(new QueryContext(query_handle->getQueryContextProto(), - *catalog_database, - storage_manager_, - foreman_client_id_, - bus_)), - workorders_container_( - new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)) { - // Collect all the workorders from all the relational operators in the DAG. - for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) { - if (checkAllBlockingDependenciesMet(index)) { - query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet(); - processOperator(index, false); - } - } -} - -WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage( - const dag_node_index start_operator_index, const numa_node_id numa_node) { - // Default policy: Operator with lowest index first. - WorkOrder *work_order = nullptr; - size_t num_operators_checked = 0; - for (dag_node_index index = start_operator_index; - num_operators_checked < num_operators_in_dag_; - index = (index + 1) % num_operators_in_dag_, ++num_operators_checked) { - if (query_exec_state_->hasExecutionFinished(index)) { - continue; - } - if (numa_node != kAnyNUMANodeID) { - // First try to get a normal WorkOrder from the specified NUMA node. - work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node); - if (work_order != nullptr) { - // A WorkOrder found on the given NUMA node. - query_exec_state_->incrementNumQueuedWorkOrders(index); - return WorkerMessage::WorkOrderMessage(work_order, index); - } else { - // Normal workorder not found on this node. Look for a rebuild workorder - // on this NUMA node. - work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node); - if (work_order != nullptr) { - return WorkerMessage::RebuildWorkOrderMessage(work_order, index); - } - } - } - // Either no workorder found on the given NUMA node, or numa_node is - // 'kAnyNUMANodeID'. - // Try to get a normal WorkOrder from other NUMA nodes. - work_order = workorders_container_->getNormalWorkOrder(index); - if (work_order != nullptr) { - query_exec_state_->incrementNumQueuedWorkOrders(index); - return WorkerMessage::WorkOrderMessage(work_order, index); - } else { - // Normal WorkOrder not found, look for a RebuildWorkOrder. - work_order = workorders_container_->getRebuildWorkOrder(index); - if (work_order != nullptr) { - return WorkerMessage::RebuildWorkOrderMessage(work_order, index); - } - } - } - // No WorkOrders available right now. - return nullptr; -} - -bool QueryManagerSingleNode::fetchNormalWorkOrders(const dag_node_index index) { - bool generated_new_workorders = false; - if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) { - // Do not fetch any work units until all blocking dependencies are met. - // The releational operator is not aware of blocking dependencies for - // uncorrelated scalar queries. - if (!checkAllBlockingDependenciesMet(index)) { - return false; - } - const size_t num_pending_workorders_before = - workorders_container_->getNumNormalWorkOrders(index); - const bool done_generation = - query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(), - query_context_.get(), - storage_manager_, - foreman_client_id_, - bus_); - if (done_generation) { - query_exec_state_->setDoneGenerationWorkOrders(index); - } - - // TODO(shoban): It would be a good check to see if operator is making - // useful progress, i.e., the operator either generates work orders to - // execute or still has pending work orders executing. However, this will not - // work if Foreman polls operators without feeding data. This check can be - // enabled, if Foreman is refactored to call getAllWorkOrders() only when - // pending work orders are completed or new input blocks feed. - - generated_new_workorders = - (num_pending_workorders_before < - workorders_container_->getNumNormalWorkOrders(index)); - } - return generated_new_workorders; -} - -bool QueryManagerSingleNode::initiateRebuild(const dag_node_index index) { - DCHECK(!workorders_container_->hasRebuildWorkOrder(index)); - DCHECK(checkRebuildRequired(index)); - DCHECK(!checkRebuildInitiated(index)); - - getRebuildWorkOrders(index, workorders_container_.get()); - - query_exec_state_->setRebuildStatus( - index, workorders_container_->getNumRebuildWorkOrders(index), true); - - return (query_exec_state_->getNumRebuildWorkOrders(index) == 0); -} - -void QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index, - WorkOrdersContainer *container) { - const RelationalOperator &op = query_dag_->getNodePayload(index); - const QueryContext::insert_destination_id insert_destination_index = op.getInsertDestinationID(); - - if (insert_destination_index == QueryContext::kInvalidInsertDestinationId) { - return; - } - - std::vector<MutableBlockReference> partially_filled_block_refs; - - DCHECK(query_context_ != nullptr); - InsertDestination *insert_destination = query_context_->getInsertDestination(insert_destination_index); - DCHECK(insert_destination != nullptr); - - insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs); - - for (std::vector<MutableBlockReference>::size_type i = 0; - i < partially_filled_block_refs.size(); - ++i) { - container->addRebuildWorkOrder( - new RebuildWorkOrder(query_id_, - std::move(partially_filled_block_refs[i]), - index, - op.getOutputRelationID(), - foreman_client_id_, - bus_), - index); - } -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryManagerSingleNode.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp deleted file mode 100644 index dd044a5..0000000 --- a/query_execution/QueryManagerSingleNode.hpp +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_ - -#include <cstddef> -#include <memory> - -#include "catalog/CatalogTypedefs.hpp" -#include "query_execution/QueryContext.hpp" -#include "query_execution/QueryExecutionState.hpp" -#include "query_execution/QueryManagerBase.hpp" -#include "query_execution/WorkOrdersContainer.hpp" -#include "utility/Macros.hpp" - -#include "tmb/id_typedefs.h" - -namespace tmb { class MessageBus; } - -namespace quickstep { - -class CatalogDatabaseLite; -class QueryHandle; -class StorageManager; -class WorkerMessage; - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief A class that manages the execution of a query including generation - * of new work orders, keeping track of the query exection state. - **/ -class QueryManagerSingleNode final : public QueryManagerBase { - public: - /** - * @brief Constructor. - * - * @param foreman_client_id The TMB client ID of the foreman thread. - * @param num_numa_nodes The number of NUMA nodes used by the system. - * @param query_handle The QueryHandle object for this query. - * @param catalog_database The CatalogDatabse used by the query. - * @param storage_manager The StorageManager used by the query. - * @param bus The TMB used for communication. - **/ - QueryManagerSingleNode(const tmb::client_id foreman_client_id, - const std::size_t num_numa_nodes, - QueryHandle *query_handle, - CatalogDatabaseLite *catalog_database, - StorageManager *storage_manager, - tmb::MessageBus *bus); - - ~QueryManagerSingleNode() override {} - - bool fetchNormalWorkOrders(const dag_node_index index) override; - - /** - * @brief Get the next workorder to be excuted, wrapped in a WorkerMessage. - * - * @param start_operator_index Begin the search for the schedulable WorkOrder - * with the operator at this index. - * @param numa_node The next WorkOrder should preferably have its input(s) - * from this numa_node. This is a hint and not a binding requirement. - * - * @return A pointer to the WorkerMessage. If there's no WorkOrder to be - * executed, return NULL. - **/ - WorkerMessage* getNextWorkerMessage( - const dag_node_index start_operator_index, - const numa_node_id node_id = kAnyNUMANodeID); - - /** - * @brief Get a pointer to the QueryContext. - **/ - inline QueryContext* getQueryContextMutable() { - return query_context_.get(); - } - - private: - bool checkNormalExecutionOver(const dag_node_index index) const override { - return (checkAllDependenciesMet(index) && - !workorders_container_->hasNormalWorkOrder(index) && - query_exec_state_->getNumQueuedWorkOrders(index) == 0 && - query_exec_state_->hasDoneGenerationWorkOrders(index)); - } - - bool initiateRebuild(const dag_node_index index) override; - - bool checkRebuildOver(const dag_node_index index) const override { - return query_exec_state_->hasRebuildInitiated(index) && - !workorders_container_->hasRebuildWorkOrder(index) && - (query_exec_state_->getNumRebuildWorkOrders(index) == 0); - } - - /** - * @brief Get the rebuild WorkOrders for an operator. - * - * @note This function should be called only once, when all the normal - * WorkOrders generated by an operator finish their execution. - * - * @param index The index of the operator in the query plan DAG. - * @param container A pointer to a WorkOrdersContainer to be used to store the - * generated WorkOrders. - **/ - void getRebuildWorkOrders(const dag_node_index index, - WorkOrdersContainer *container); - - const tmb::client_id foreman_client_id_; - - StorageManager *storage_manager_; - tmb::MessageBus *bus_; - - std::unique_ptr<QueryContext> query_context_; - - std::unique_ptr<WorkOrdersContainer> workorders_container_; - - DISALLOW_COPY_AND_ASSIGN(QueryManagerSingleNode); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/README.md ---------------------------------------------------------------------- diff --git a/query_execution/README.md b/query_execution/README.md deleted file mode 100644 index 12e0f57..0000000 --- a/query_execution/README.md +++ /dev/null @@ -1,149 +0,0 @@ -# An Overview of Quickstep's Execution Engine - -## Types of threads -There are two kinds of threads in Quickstep - Foreman and Worker. The foreman -thread controls the query execution progress, finds schedulable work (called as -WorkOrder) and assigns (or schedules) it for execution to the Worker threads. -The Worker threads receive the WorkOrders and execute them. After execution they -send a completion message (or response message) back to Foreman. - -## High level functionality of Foreman -Foreman requests all the RelationalOperators in the physical query plan -represented as a DAG to give any schedulable work (in the form of WorkOrders). -While doing so, Foreman has to respect dependencies between operators. There are -two kinds of dependencies between operators - pipeline breaking (or blocking) -and pipeline non-breaking (or non-blocking). In the first case, the output of -the producer operator can't be pipelined to the consumer operator. In the second -case, the Foreman will facilitate the pipelining of the intermediate output -produced by the producer operator to the consumer operator. - -## Messages in execution engine - -### WorkerMessage -There are multiple types of WorkerMessage, each of which indicates the purpose -of the message. - -Foreman -> Worker : WorkerMessage which consists of the following things -- A pointer to the WorkOrder to be executed. The WorkOrder could be a normal -WorkOrder or a rebuild WorkOrder. A normal WorkOrder involves the invocation of -WorkOrder::execute() method which is overriden by all of the RelationalOperator -classes. A rebuild WorkOrder has one StorageBlock as input and calls a -rebuild() method on the block. More details about rebuild() can be found in the -storage module. -- The index of the relational operator in the query plan DAG that produced the -WorkOrder. - -### ForemanMessage -Multiple senders are possible for this message. There are multiple types of -ForemanMessages, each of which indicates the purpose of the message. - -Worker -> Foreman : ForemanMessage of types WorkOrderCompletion and -RebuildCompletion are sent after a Worker finishes executing a respective type -of WorkOrder. This message helps the Foreman track the progress of individual -operators as well as the whole query. - -Some relational operators and InsertDestination -> Foreman : ForemanMessage of -types DataPipeline and WorkOrdersAvailable. InsertDestination first determines -when an output block of a relational operator gets full. Once a block is full, -it streams the unique block ID of the filled block along with the index of the -relational operator that produced the block to Foreman with the message type -DataPipeline. Some operators which modify the block in place also send similar -messages to Foreman. - -### FeedbackMessage -This message is sent from Workers to the Foreman during a WorkOrder execution. - -In certain operators, e.g. TextScan (used for bulk loading data from text files) -and Sort, there is a communication between the relational operator and its -WorkOrders. In such cases, when a WorkOrder is under execution on a Worker -thread, a FeedbackMessage is sent from the WorkOrder via the Worker to Foreman. -Foreman relays this message to the relational operator that produced the sender -WorkOrder. The relational operator uses this message to update its internal -state to potentially generate newer WorkOrders. - -### PoisonMessage -This message is used to terminate a thread (i.e., Foreman and Worker), typically -when shutting down the Quickstep process. - -## How does the Foreman react after receiving various messages? -### WorkOrder completion message -* Update the book-keeping of pending WorkOrders per Worker and per operator. -* Fetch new WorkOrders if available for the operator of whose WorkOrder was -just executed. -* Update the state of an operator - the possible options are: - - Normal WorkOrders are still under execution - - All normal WorkOrders have finished execution and rebuild WorkOrders are yet - to be generated. - - All normal WorkOrders have finished execution, rebuild WorkOrders have been - generated and issued to Workers. - - All normal and rebuild WorkOrders have been executed AND all the dependency - operators for the given operator have finished execution, therefore the given - operator has finished its execution. -* Fetch the WorkOrders from the dependents of the given operator. - -### Rebuild WorkOrder completion message -* Update the book-keeping of pending WorkOrders per Worker and per operator. -* If all the rebuild WorkOrders have finished their execution, try to fetch the -WorkOrders of the dependent operators of the operator whose rebuild WorkOrder -was just executed. - -### Data pipeline message -* Find the consumer operators (i.e. operators which have a non -pipeline-breaking link) of the producer operator. -* Stream the block ID to the eligible consumer operators. -* Fetch new WorkOrders from these consumer operators which may have become -available because of the streaming of data. - -### WorkOrder available message -* Fetch new WorkOrders that may have become available. - -### Feedback message -* Relay the feedback message to a specified relational operator. The recipient -operator is specified in the header of the message. - -## Example -We look at a sample query to better describe the flow of messages - - -SELECT R.a, S.b from R, S where R.a = S.a and R.c < 20; - -This is an equi-join query which can be implemented using a hash join. We assume -that S is a larger relation and the build relation is the output of the -selection on R. - -The query execution plan involves the following operators: -* SelectOperator to filter R based on predicate R.c < 20 (We call the output as -R') -* BuildHashOperator to construct a hash table on R' -* HashJoinOperator to probe the hash table, where the probe relation is S -* DestroyHashTableOperator to destroy the hash table after the join is done -* Multiple DropTableOperators to destroy the temporaray relations produced as -output. - -R has two blocks with IDs as 1 and 2. S has two blocks with IDs as 3 and 4. -We assume that the SelectOperator produces one filled block and one partially -filled block as output. Note that in the query plan DAG, the link between -SelectOperator and BuildHashOperator allows streaming of data. The -HashJoinOperator's WorkOrder can't be generated unless all of the -BuildHashOperator's WorkOrders have finished their execution. The execution is -assumed to be performed by a single Worker thread. - -The following table describes the message exchange that happens during the -query excution. We primarily focus on three operators - Select, BuildHash and -HashJoin (probe). - -| Sender | Receiver | Message | Message Description | -|:-----------------:|----------|---------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | SelectWorkOrder on block 1. | -| InsertDestination | Foreman | ForemanMessage of type kDataPipeline | SelectWorkOrder on block 1 produced one fully filled block as output. The output block ID as pipelined from the InsertDestination to Foreman. Foreman relays this block ID to BuildHashOperator, which generates a WorkOrder which is ready to be scheduled. | -| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | SelectWorkOrder on block 1 completed. | -| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | SelectWorkOrder on block 2. | -| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | SelectWorkOrder on block 2 completed. As a result of this execution, a partially filled block of output was produced. | -| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | BuildHashWorkOrder on the fully filled block of R' | -| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | BuildHashWorkOrder execution complete. | -| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | BuildHashWorkOrder on the partially filled block of R' | -| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | BuildHashWorkOrder execution complete. | -| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | HashJoinWorkOrder for block 3 from S | -| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | HashJoinWorkOrder execution complete. | -| Foreman | Worker | WorkerMessage of type kWorkOrderMessage | HashJoinWorkOrder for block 4 from S | -| Worker | Foreman | ForemanMessage of type kWorkOrderCompletion | HashJoinWorkOrder execution complete. | - http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp deleted file mode 100644 index ed4bade..0000000 --- a/query_execution/Shiftboss.cpp +++ /dev/null @@ -1,400 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#include "query_execution/Shiftboss.hpp" - -#include <cstddef> -#include <cstdlib> -#include <memory> -#include <string> -#include <unordered_map> -#include <utility> -#include <vector> - -#include "catalog/CatalogTypedefs.hpp" -#include "query_execution/QueryContext.hpp" -#include "query_execution/QueryExecutionMessages.pb.h" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/QueryExecutionUtil.hpp" -#include "query_execution/WorkerMessage.hpp" -#include "relational_operators/RebuildWorkOrder.hpp" -#include "relational_operators/WorkOrderFactory.hpp" -#include "storage/InsertDestination.hpp" -#include "storage/StorageBlock.hpp" -#include "storage/StorageManager.hpp" -#include "threading/ThreadUtil.hpp" - -#include "glog/logging.h" - -#include "tmb/address.h" -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" -#include "tmb/message_style.h" -#include "tmb/tagged_message.h" - -using std::free; -using std::malloc; -using std::move; -using std::size_t; -using std::string; -using std::unique_ptr; -using std::vector; - -using tmb::AnnotatedMessage; -using tmb::MessageBus; -using tmb::TaggedMessage; - -namespace quickstep { - -class WorkOrder; - -void Shiftboss::run() { - if (cpu_id_ >= 0) { - // We can pin the shiftboss thread to a CPU if specified. - ThreadUtil::BindToCPU(cpu_id_); - } - - processShiftbossRegistrationResponseMessage(); - - for (;;) { - // Receive() is a blocking call, causing this thread to sleep until next - // message is received. - AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true)); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') received the typed '" << annotated_message.tagged_message.message_type() - << "' message from client " << annotated_message.sender; - switch (annotated_message.tagged_message.message_type()) { - case kQueryInitiateMessage: { - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::QueryInitiateMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context()); - break; - } - case kWorkOrderMessage: { - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::WorkOrderMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - const std::size_t query_id = proto.query_id(); - DCHECK_EQ(1u, query_contexts_.count(query_id)); - - WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(), - shiftboss_index_, - &database_cache_, - query_contexts_[query_id].get(), - storage_manager_, - shiftboss_client_id_, - bus_); - - unique_ptr<WorkerMessage> worker_message( - WorkerMessage::WorkOrderMessage(work_order, proto.operator_index())); - - TaggedMessage worker_tagged_message(worker_message.get(), - sizeof(*worker_message), - kWorkOrderMessage); - - const size_t worker_index = getSchedulableWorker(); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage - << "') from Foreman to worker " << worker_index; - - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - workers_->getClientID(worker_index), - move(worker_tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK); - break; - } - case kInitiateRebuildMessage: { - // Construct rebuild work orders, and send back their number to - // 'ForemanDistributed'. - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::InitiateRebuildMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processInitiateRebuildMessage(proto.query_id(), - proto.operator_index(), - proto.insert_destination_index(), - proto.relation_id()); - break; - } - case kCatalogRelationNewBlockMessage: // Fall through. - case kDataPipelineMessage: - case kWorkOrderFeedbackMessage: - case kWorkOrderCompleteMessage: - case kRebuildWorkOrderCompleteMessage: { - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') forwarded typed '" << annotated_message.tagged_message.message_type() - << "' message from Worker with TMB client ID '" << annotated_message.sender - << "' to Foreman with TMB client ID " << foreman_client_id_; - - DCHECK_NE(foreman_client_id_, tmb::kClientIdNone); - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - foreman_client_id_, - move(annotated_message.tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK); - break; - } - case kQueryTeardownMessage: { - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::QueryTeardownMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - query_contexts_.erase(proto.query_id()); - break; - } - case kSaveQueryResultMessage: { - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::SaveQueryResultMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - for (int i = 0; i < proto.blocks_size(); ++i) { - storage_manager_->saveBlockOrBlob(proto.blocks(i)); - } - - // Clean up query execution states, i.e., QueryContext. - query_contexts_.erase(proto.query_id()); - - serialization::SaveQueryResultResponseMessage proto_response; - proto_response.set_query_id(proto.query_id()); - proto_response.set_relation_id(proto.relation_id()); - proto_response.set_cli_id(proto.cli_id()); - proto_response.set_shiftboss_index(shiftboss_index_); - - const size_t proto_response_length = proto_response.ByteSize(); - char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length)); - CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length)); - - TaggedMessage message_response(static_cast<const void*>(proto_response_bytes), - proto_response_length, - kSaveQueryResultResponseMessage); - free(proto_response_bytes); - - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage - << "') to Foreman with TMB client ID " << foreman_client_id_; - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - foreman_client_id_, - move(message_response)); - CHECK(send_status == MessageBus::SendStatus::kOK); - break; - } - case kPoisonMessage: { - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') forwarded PoisonMessage (typed '" << kPoisonMessage - << "') from Foreman to all workers"; - - tmb::MessageStyle broadcast_style; - broadcast_style.Broadcast(true); - - const MessageBus::SendStatus send_status = - bus_->Send(shiftboss_client_id_, - worker_addresses_, - broadcast_style, - move(annotated_message.tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK); - return; - } - default: { - LOG(FATAL) << "Unknown TMB message type"; - } - } - } -} - -size_t Shiftboss::getSchedulableWorker() { - const size_t num_workers = workers_->getNumWorkers(); - - size_t curr_worker = start_worker_index_; - for (;;) { - if (workers_->getNumQueuedWorkOrders(curr_worker) < max_msgs_per_worker_) { - start_worker_index_ = (curr_worker + 1) % num_workers; - // TODO(zuyu): workers_->incrementNumQueuedWorkOrders(curr_worker); - // But we need a WorkOrder queue first. - return curr_worker; - } - - curr_worker = (curr_worker + 1) % num_workers; - } -} - -void Shiftboss::registerWithForeman() { - tmb::Address all_addresses; - all_addresses.All(true); - - tmb::MessageStyle style; - - serialization::ShiftbossRegistrationMessage proto; - proto.set_work_order_capacity(getWorkOrderCapacity()); - - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kShiftbossRegistrationMessage); - free(proto_bytes); - - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage - << "') to all"; - tmb::MessageBus::SendStatus send_status = - bus_->Send(shiftboss_client_id_, all_addresses, style, move(message)); - DCHECK(send_status == tmb::MessageBus::SendStatus::kOK); -} - -void Shiftboss::processShiftbossRegistrationResponseMessage() { - AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true)); - const TaggedMessage &tagged_message = annotated_message.tagged_message; - DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type()); - - foreman_client_id_ = annotated_message.sender; - DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ - << "') received the typed '" << kShiftbossRegistrationResponseMessage - << "' message from ForemanDistributed with client " << foreman_client_id_; - - serialization::ShiftbossRegistrationResponseMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - shiftboss_index_ = proto.shiftboss_index(); - - // Forward this message to Workers regarding <shiftboss_index_>. - QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_, - worker_addresses_, - move(annotated_message.tagged_message), - bus_); -} - -void Shiftboss::processQueryInitiateMessage( - const std::size_t query_id, - const serialization::CatalogDatabase &catalog_database_cache_proto, - const serialization::QueryContext &query_context_proto) { - database_cache_.update(catalog_database_cache_proto); - - auto query_context = std::make_unique<QueryContext>( - query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_, bus_); - query_contexts_.emplace(query_id, move(query_context)); - - serialization::QueryInitiateResponseMessage proto; - proto.set_query_id(query_id); - - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message_response(static_cast<const void*>(proto_bytes), - proto_length, - kQueryInitiateResponseMessage); - free(proto_bytes); - - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage - << "') to Foreman with TMB client ID " << foreman_client_id_; - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - foreman_client_id_, - move(message_response)); - CHECK(send_status == MessageBus::SendStatus::kOK); -} - -void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, - const std::size_t op_index, - const QueryContext::insert_destination_id dest_index, - const relation_id rel_id) { - DCHECK_NE(foreman_client_id_, tmb::kClientIdNone); - - DCHECK_EQ(1u, query_contexts_.count(query_id)); - InsertDestination *insert_destination = query_contexts_[query_id]->getInsertDestination(dest_index); - DCHECK(insert_destination != nullptr); - - vector<MutableBlockReference> partially_filled_block_refs; - insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs); - - serialization::InitiateRebuildResponseMessage proto; - proto.set_query_id(query_id); - proto.set_operator_index(op_index); - proto.set_num_rebuild_work_orders(partially_filled_block_refs.size()); - proto.set_shiftboss_index(shiftboss_index_); - - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message_response(static_cast<const void*>(proto_bytes), - proto_length, - kInitiateRebuildResponseMessage); - free(proto_bytes); - - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage - << "') to Foreman with TMB client ID " << foreman_client_id_; - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - foreman_client_id_, - move(message_response)); - CHECK(send_status == MessageBus::SendStatus::kOK); - - for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) { - // NOTE(zuyu): Worker releases the memory after the execution of - // RebuildWorkOrder on the Worker. - WorkOrder *rebuild_work_order = - new RebuildWorkOrder(query_id, - move(partially_filled_block_refs[i]), - op_index, - rel_id, - shiftboss_client_id_, - bus_); - - unique_ptr<WorkerMessage> worker_message( - WorkerMessage::RebuildWorkOrderMessage(rebuild_work_order, op_index)); - - TaggedMessage worker_tagged_message(worker_message.get(), - sizeof(*worker_message), - kRebuildWorkOrderMessage); - - const size_t worker_index = getSchedulableWorker(); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage - << "') to worker " << worker_index; - - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - workers_->getClientID(worker_index), - move(worker_tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK); - } -} - -} // namespace quickstep