Introduced QueryManager and its test (#152) Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d3725840 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d3725840 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d3725840
Branch: refs/heads/master Commit: d372584096651be14721ab1ffe1696dfdb70382b Parents: 0fd8c03 Author: Harshad Deshmukh <[email protected]> Authored: Fri Apr 15 17:36:15 2016 -0500 Committer: Zuyu ZHANG <[email protected]> Committed: Fri Apr 15 17:36:15 2016 -0500 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 58 +- query_execution/Foreman.cpp | 4 +- query_execution/QueryManager.cpp | 469 ++++++++++ query_execution/QueryManager.hpp | 371 ++++++++ query_execution/WorkOrdersContainer.hpp | 4 +- query_execution/WorkerMessage.hpp | 12 +- query_execution/tests/QueryManager_unittest.cpp | 933 +++++++++++++++++++ storage/InsertDestination.hpp | 1 + 8 files changed, 1842 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3725840/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index b682618..5887237 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -29,9 +29,10 @@ add_library(quickstep_queryexecution_QueryContext_proto add_library(quickstep_queryexecution_QueryExecutionMessages_proto ${queryexecution_QueryExecutionMessages_proto_srcs} ${queryexecution_QueryExecutionMessages_proto_hdrs}) -add_library(quickstep_queryexecution_QueryExecutionState ../empty_src.cpp QueryExecutionState.hpp) +add_library(quickstep_queryexecution_QueryExecutionState ../empty_src.cpp QueryExecutionState.hpp) add_library(quickstep_queryexecution_QueryExecutionTypedefs ../empty_src.cpp QueryExecutionTypedefs.hpp) add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryExecutionUtil.hpp) +add_library(quickstep_queryexecution_QueryManager QueryManager.cpp QueryManager.hpp) add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp 
WorkOrdersContainer.hpp) add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp) add_library(quickstep_queryexecution_WorkerDirectory ../empty_src.cpp WorkerDirectory.hpp) @@ -112,6 +113,26 @@ target_link_libraries(quickstep_queryexecution_QueryExecutionUtil quickstep_queryexecution_QueryExecutionTypedefs quickstep_utility_Macros tmb) +target_link_libraries(quickstep_queryexecution_QueryManager + quickstep_catalog_CatalogDatabase + quickstep_catalog_CatalogRelation + quickstep_catalog_CatalogTypedefs + quickstep_catalog_PartitionScheme + quickstep_queryexecution_QueryContext + quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionState + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_WorkOrdersContainer + quickstep_queryexecution_WorkerMessage + quickstep_queryoptimizer_QueryHandle + quickstep_relationaloperators_RebuildWorkOrder + quickstep_relationaloperators_RelationalOperator + quickstep_relationaloperators_WorkOrder + quickstep_storage_StorageBlock + quickstep_storage_StorageBlockInfo + quickstep_utility_DAG + quickstep_utility_Macros + tmb) target_link_libraries(quickstep_queryexecution_WorkOrdersContainer glog quickstep_relationaloperators_WorkOrder @@ -147,6 +168,7 @@ target_link_libraries(quickstep_queryexecution quickstep_queryexecution_QueryExecutionState quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryExecutionUtil + quickstep_queryexecution_QueryManager quickstep_queryexecution_WorkOrdersContainer quickstep_queryexecution_Worker quickstep_queryexecution_WorkerDirectory @@ -154,7 +176,7 @@ target_link_libraries(quickstep_queryexecution quickstep_queryexecution_WorkerSelectionPolicy) # Tests: add_executable(Foreman_unittest - "${CMAKE_CURRENT_SOURCE_DIR}/tests/Foreman_unittest.cpp") + "${CMAKE_CURRENT_SOURCE_DIR}/tests/Foreman_unittest.cpp") target_link_libraries(Foreman_unittest glog gtest @@ -183,6 +205,38 @@ 
target_link_libraries(Foreman_unittest tmb) add_test(Foreman_unittest Foreman_unittest) +add_executable(QueryManager_unittest + "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp") +target_link_libraries(QueryManager_unittest + glog + gtest + gtest_main + quickstep_catalog_CatalogDatabase + quickstep_catalog_CatalogRelation + quickstep_catalog_CatalogTypedefs + quickstep_queryexecution_QueryContext + quickstep_queryexecution_QueryContext_proto + quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionState + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryManager + quickstep_queryexecution_WorkOrdersContainer + quickstep_queryexecution_WorkerDirectory + quickstep_queryexecution_WorkerMessage + quickstep_queryoptimizer_QueryHandle + quickstep_queryoptimizer_QueryPlan + quickstep_relationaloperators_RelationalOperator + quickstep_relationaloperators_WorkOrder + quickstep_storage_InsertDestination + quickstep_storage_InsertDestination_proto + quickstep_storage_StorageBlock + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageManager + quickstep_utility_DAG + quickstep_utility_Macros + tmb) +add_test(QueryManager_unittest QueryManager_unittest) + add_executable(WorkOrdersContainer_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/WorkOrdersContainer_unittest.cpp") target_link_libraries(WorkOrdersContainer_unittest http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3725840/query_execution/Foreman.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp index 2b2581a..304c429 100644 --- a/query_execution/Foreman.cpp +++ b/query_execution/Foreman.cpp @@ -371,9 +371,9 @@ WorkerMessage* Foreman::getNextWorkerMessage( void Foreman::sendWorkerMessage(const std::size_t worker_thread_index, const WorkerMessage &message) { message_type_id type; - if (message.getType() == 
WorkerMessage::kRebuildWorkOrder) { + if (message.getType() == WorkerMessage::WorkerMessageType::kRebuildWorkOrder) { type = kRebuildWorkOrderMessage; - } else if (message.getType() == WorkerMessage::kWorkOrder) { + } else if (message.getType() == WorkerMessage::WorkerMessageType::kWorkOrder) { type = kWorkOrderMessage; } else { FATAL_ERROR("Invalid WorkerMessageType"); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3725840/query_execution/QueryManager.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManager.cpp b/query_execution/QueryManager.cpp new file mode 100644 index 0000000..02c5d4c --- /dev/null +++ b/query_execution/QueryManager.cpp @@ -0,0 +1,469 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of WisconsinâMadison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +#include "query_execution/QueryManager.hpp" + +#include <cstddef> +#include <memory> +#include <utility> +#include <vector> + +#include "catalog/CatalogDatabase.hpp" +#include "catalog/CatalogRelation.hpp" +#include "catalog/CatalogTypedefs.hpp" +#include "catalog/PartitionScheme.hpp" +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/WorkerMessage.hpp" +#include "query_optimizer/QueryHandle.hpp" +#include "relational_operators/RebuildWorkOrder.hpp" +#include "relational_operators/WorkOrder.hpp" +#include "storage/StorageBlock.hpp" +#include "storage/StorageBlockInfo.hpp" + +#include "glog/logging.h" + +using std::pair; + +namespace quickstep { +class CatalogDatabaseLite; +class StorageManager; +} + +namespace quickstep { + +QueryManager::QueryManager(const tmb::client_id foreman_client_id, + const std::size_t num_numa_nodes, + QueryHandle *query_handle, + CatalogDatabaseLite *catalog_database, + StorageManager *storage_manager, + tmb::MessageBus *bus) + : foreman_client_id_(foreman_client_id), + query_id_(DCHECK_NOTNULL(query_handle)->query_id()), + catalog_database_(DCHECK_NOTNULL(catalog_database)), + storage_manager_(DCHECK_NOTNULL(storage_manager)), + bus_(DCHECK_NOTNULL(bus)) { + DCHECK(query_handle->getQueryPlanMutable() != nullptr); + query_dag_ = query_handle->getQueryPlanMutable()->getQueryPlanDAGMutable(); + DCHECK(query_dag_ != nullptr); + + const dag_node_index num_operators_in_dag = query_dag_->size(); + + output_consumers_.resize(num_operators_in_dag); + blocking_dependencies_.resize(num_operators_in_dag); + + query_exec_state_.reset(new QueryExecutionState(num_operators_in_dag)); + workorders_container_.reset( + new WorkOrdersContainer(num_operators_in_dag, num_numa_nodes)); + + query_context_.reset(new QueryContext(query_handle->getQueryContextProto(), + *catalog_database_, + storage_manager_, + foreman_client_id_, + bus_)); + + for (dag_node_index
node_index = 0; + node_index < num_operators_in_dag; + ++node_index) { + const QueryContext::insert_destination_id insert_destination_index = + query_dag_->getNodePayload(node_index).getInsertDestinationID(); + if (insert_destination_index != QueryContext::kInvalidInsertDestinationId) { + // Rebuild is necessary whenever InsertDestination is present. + query_exec_state_->setRebuildRequired(node_index); + query_exec_state_->setRebuildStatus(node_index, 0, false); + } + + for (const pair<dag_node_index, bool> &dependent_link : + query_dag_->getDependents(node_index)) { + const dag_node_index dependent_op_index = dependent_link.first; + if (!query_dag_->getLinkMetadata(node_index, dependent_op_index)) { + // The link is not a pipeline-breaker. Streaming of blocks is possible + // between these two operators. + output_consumers_[node_index].push_back(dependent_op_index); + } else { + // The link is a pipeline-breaker. Streaming of blocks is not possible + // between these two operators. + blocking_dependencies_[dependent_op_index].push_back(node_index); + } + } + } + + // Collect all the workorders from all the relational operators in the DAG. + for (dag_node_index index = 0; index < num_operators_in_dag; ++index) { + if (checkAllBlockingDependenciesMet(index)) { + query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet(); + processOperator(index, false); + } + } +} + +WorkerMessage* QueryManager::getNextWorkerMessage( + const dag_node_index start_operator_index, const int numa_node) { + // Default policy: Operator with lowest index first. + WorkOrder *work_order = nullptr; + size_t num_operators_checked = 0; + for (dag_node_index index = start_operator_index; + num_operators_checked < query_dag_->size(); + index = (index + 1) % query_dag_->size(), ++num_operators_checked) { + if (query_exec_state_->hasExecutionFinished(index)) { + continue; + } + if (numa_node != -1) { + // First try to get a normal WorkOrder from the specified NUMA node.
+ work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node); + if (work_order != nullptr) { + // A WorkOrder found on the given NUMA node. + query_exec_state_->incrementNumQueuedWorkOrders(index); + return WorkerMessage::WorkOrderMessage(work_order, index); + } else { + // Normal workorder not found on this node. Look for a rebuild workorder + // on this NUMA node. + work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node); + if (work_order != nullptr) { + return WorkerMessage::RebuildWorkOrderMessage(work_order, index); + } + } + } + // Either no workorder found on the given NUMA node, or numa_node is -1. + // Try to get a normal WorkOrder from other NUMA nodes. + work_order = workorders_container_->getNormalWorkOrder(index); + if (work_order != nullptr) { + query_exec_state_->incrementNumQueuedWorkOrders(index); + return WorkerMessage::WorkOrderMessage(work_order, index); + } else { + // Normal WorkOrder not found, look for a RebuildWorkOrder. + work_order = workorders_container_->getRebuildWorkOrder(index); + if (work_order != nullptr) { + return WorkerMessage::RebuildWorkOrderMessage(work_order, index); + } + } + } + // No WorkOrders available right now.
+ return nullptr; +} + +QueryManager::QueryStatusCode QueryManager::processMessage( + const TaggedMessage &tagged_message) { + dag_node_index op_index; + switch (tagged_message.message_type()) { + case kWorkOrderCompleteMessage: { + serialization::WorkOrderCompletionMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + + op_index = proto.operator_index(); + processWorkOrderCompleteMessage(proto.operator_index()); + break; + } + case kRebuildWorkOrderCompleteMessage: { + serialization::WorkOrderCompletionMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + + op_index = proto.operator_index(); + processRebuildWorkOrderCompleteMessage(proto.operator_index()); + break; + } + case kCatalogRelationNewBlockMessage: { + serialization::CatalogRelationNewBlockMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + + const block_id block = proto.block_id(); + + CatalogRelation *relation = + static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id()); + relation->addBlock(block); + + if (proto.has_partition_id()) { + relation->getPartitionSchemeMutable()->addBlockToPartition( + proto.partition_id(), block); + } + return QueryStatusCode::kNone; + } + case kDataPipelineMessage: { + // Possible message senders include InsertDestinations and some + // operators which modify existing blocks.
+ serialization::DataPipelineMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + + op_index = proto.operator_index(); + processDataPipelineMessage(proto.operator_index(), + proto.block_id(), + proto.relation_id()); + break; + } + case kWorkOrdersAvailableMessage: { + serialization::WorkOrdersAvailableMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + + op_index = proto.operator_index(); + + // Check if new work orders are available. + fetchNormalWorkOrders(op_index); + + // Dispatch the WorkerMessages to the workers. We prefer to start the search + // for the schedulable WorkOrders beginning from 'op_index'. The first + // candidate worker to receive the next WorkOrder is the one that sent the + // response message to Foreman. + // TODO(zuyu): Improve the data locality for the next WorkOrder. + break; + } + case kWorkOrderFeedbackMessage: { + WorkOrder::FeedbackMessage msg( + const_cast<void *>(tagged_message.message()), + tagged_message.message_bytes()); + + op_index = msg.header().rel_op_index; + processFeedbackMessage(msg); + break; + } + default: + LOG(FATAL) << "Unknown message type found in QueryManager"; + } + + // kQueryExecuted takes precedence over kOperatorExecuted, so check it first.
+ if (query_exec_state_->hasQueryExecutionFinished()) { + return QueryStatusCode::kQueryExecuted; + } + + if (query_exec_state_->hasExecutionFinished(op_index)) { + return QueryStatusCode::kOperatorExecuted; + } + + return QueryStatusCode::kNone; +} + +void QueryManager::processFeedbackMessage( + const WorkOrder::FeedbackMessage &msg) { + RelationalOperator *op = + query_dag_->getNodePayloadMutable(msg.header().rel_op_index); + op->receiveFeedbackMessage(msg); +} + +void QueryManager::processWorkOrderCompleteMessage( + const dag_node_index op_index) { + query_exec_state_->decrementNumQueuedWorkOrders(op_index); + + // Check if new work orders are available and fetch them if so. + fetchNormalWorkOrders(op_index); + + if (checkRebuildRequired(op_index)) { + if (checkNormalExecutionOver(op_index)) { + if (!checkRebuildInitiated(op_index)) { + if (initiateRebuild(op_index)) { + // Rebuild initiated and completed right away. + markOperatorFinished(op_index); + } else { + // Rebuild under progress. + } + } else if (checkRebuildOver(op_index)) { + // Rebuild was under progress and now it is over. + markOperatorFinished(op_index); + } + } else { + // Normal execution under progress for this operator. + } + } else if (checkOperatorExecutionOver(op_index)) { + // Rebuild not required for this operator and its normal execution is + // complete. + markOperatorFinished(op_index); + } + + for (const pair<dag_node_index, bool> &dependent_link : + query_dag_->getDependents(op_index)) { + const dag_node_index dependent_op_index = dependent_link.first; + if (checkAllBlockingDependenciesMet(dependent_op_index)) { + // Process the dependent operator (of the operator whose WorkOrder + // was just executed) for which all the dependencies have been met.
+ processOperator(dependent_op_index, true); + } + } +} + +void QueryManager::processRebuildWorkOrderCompleteMessage(const dag_node_index op_index) { + query_exec_state_->decrementNumRebuildWorkOrders(op_index); + + if (checkRebuildOver(op_index)) { + markOperatorFinished(op_index); + + for (const pair<dag_node_index, bool> &dependent_link : + query_dag_->getDependents(op_index)) { + const dag_node_index dependent_op_index = dependent_link.first; + if (checkAllBlockingDependenciesMet(dependent_op_index)) { + processOperator(dependent_op_index, true); + } + } + } +} + +void QueryManager::processOperator(const dag_node_index index, + const bool recursively_check_dependents) { + if (fetchNormalWorkOrders(index)) { + // Fetched work orders. Return to wait for the generated work orders to + // execute, and skip the execution-finished checks. + return; + } + + if (checkNormalExecutionOver(index)) { + if (checkRebuildRequired(index)) { + if (!checkRebuildInitiated(index)) { + // Rebuild hasn't started, initiate it. + if (initiateRebuild(index)) { + // Rebuild initiated and completed right away. + markOperatorFinished(index); + } else { + // Rebuild WorkOrders have been generated. + return; + } + } else if (checkRebuildOver(index)) { + // Rebuild had been initiated and it is over. + markOperatorFinished(index); + } + } else { + // Rebuild is not required and normal execution over, mark finished. + markOperatorFinished(index); + } + // If we reach here, the operator is finished or its rebuild is in progress.
+ if (recursively_check_dependents) { + for (const pair<dag_node_index, bool> &dependent_link : + query_dag_->getDependents(index)) { + const dag_node_index dependent_op_index = dependent_link.first; + if (checkAllBlockingDependenciesMet(dependent_op_index)) { + processOperator(dependent_op_index, true); + } + } + } + } +} + +void QueryManager::processDataPipelineMessage(const dag_node_index op_index, + const block_id block, + const relation_id rel_id) { + for (const dag_node_index consumer_index : + output_consumers_[op_index]) { + // Feed the streamed block to the consumer. Note that 'output_consumers_' + // only contains those dependents of operator with index = op_index which are + // eligible to receive streamed input. + query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id); + // Because of the streamed input just fed, check if there are any new + // WorkOrders available and if so, fetch them. + fetchNormalWorkOrders(consumer_index); + } +} + +bool QueryManager::fetchNormalWorkOrders(const dag_node_index index) { + bool generated_new_workorders = false; + if (!query_exec_state_->hasDoneGenerationWorkOrders(index)) { + // Do not fetch any work units until all blocking dependencies are met. + // The relational operator is not aware of blocking dependencies for + // uncorrelated scalar queries.
+ if (!checkAllBlockingDependenciesMet(index)) { + return false; + } + const size_t num_pending_workorders_before = + workorders_container_->getNumNormalWorkOrders(index); + const bool done_generation = + query_dag_->getNodePayloadMutable(index)->getAllWorkOrders(workorders_container_.get(), + query_context_.get(), + storage_manager_, + foreman_client_id_, + bus_); + if (done_generation) { + query_exec_state_->setDoneGenerationWorkOrders(index); + } + + // TODO(shoban): It would be a good check to see if operator is making + // useful progress, i.e., the operator either generates work orders to + // execute or still has pending work orders executing. However, this will not + // work if Foreman polls operators without feeding data. This check can be + // enabled, if Foreman is refactored to call getAllWorkOrders() only when + // pending work orders are completed or new input blocks feed. + + generated_new_workorders = + (num_pending_workorders_before < + workorders_container_->getNumNormalWorkOrders(index)); + } + return generated_new_workorders; +} + +void QueryManager::markOperatorFinished(const dag_node_index index) { + query_exec_state_->setExecutionFinished(index); + + RelationalOperator *op = query_dag_->getNodePayloadMutable(index); + op->updateCatalogOnCompletion(); + + const relation_id output_rel = op->getOutputRelationID(); + for (const pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) { + const dag_node_index dependent_op_index = dependent_link.first; + RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index); + // Signal dependent operator that current operator is done feeding input blocks.
+ if (output_rel >= 0) { + dependent_op->doneFeedingInputBlocks(output_rel); + } + if (checkAllBlockingDependenciesMet(dependent_op_index)) { + dependent_op->informAllBlockingDependenciesMet(); + } + } +} + +bool QueryManager::initiateRebuild(const dag_node_index index) { + DCHECK(!workorders_container_->hasRebuildWorkOrder(index)); + DCHECK(checkRebuildRequired(index)); + DCHECK(!checkRebuildInitiated(index)); + + getRebuildWorkOrders(index, workorders_container_.get()); + + query_exec_state_->setRebuildStatus( + index, workorders_container_->getNumRebuildWorkOrders(index), true); + + return (query_exec_state_->getNumRebuildWorkOrders(index) == 0); +} + +void QueryManager::getRebuildWorkOrders(const dag_node_index index, + WorkOrdersContainer *container) { + const RelationalOperator &op = query_dag_->getNodePayload(index); + const QueryContext::insert_destination_id insert_destination_index = op.getInsertDestinationID(); + + if (insert_destination_index == QueryContext::kInvalidInsertDestinationId) { + return; + } + + std::vector<MutableBlockReference> partially_filled_block_refs; + + DCHECK(query_context_ != nullptr); + InsertDestination *insert_destination = query_context_->getInsertDestination(insert_destination_index); + DCHECK(insert_destination != nullptr); + + insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs); + + for (std::vector<MutableBlockReference>::size_type i = 0; + i < partially_filled_block_refs.size(); + ++i) { + container->addRebuildWorkOrder( + new RebuildWorkOrder(std::move(partially_filled_block_refs[i]), + index, + op.getOutputRelationID(), + foreman_client_id_, + bus_), + index); + } +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3725840/query_execution/QueryManager.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManager.hpp b/query_execution/QueryManager.hpp new file mode 100644 index 0000000..47f54c5 ---
/dev/null +++ b/query_execution/QueryManager.hpp @@ -0,0 +1,371 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin–Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_ + +#include <cstddef> +#include <memory> +#include <vector> + +#include "catalog/CatalogTypedefs.hpp" +#include "query_execution/QueryContext.hpp" +#include "query_execution/QueryExecutionState.hpp" +#include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/RelationalOperator.hpp" +#include "utility/DAG.hpp" +#include "utility/Macros.hpp" + +#include "tmb/message_bus.h" +#include "tmb/tagged_message.h" + +namespace quickstep { + +class CatalogDatabaseLite; +class ForemanMessage; +class QueryHandle; +class StorageManager; +class WorkerMessage; + +/** \addtogroup QueryExecution + * @{ + */ + +/** + * @brief A class that manages the execution of a query including generation + * of new work orders, keeping track of the query execution state. + **/ +class QueryManager { + public: + typedef DAG<RelationalOperator, bool>::size_type_nodes dag_node_index; + + /** + * @brief Return codes for processMessage() function. + * + * @note When both operator and query get executed, kQueryExecuted takes + * precedence over kOperatorExecuted.
+ **/ + enum class QueryStatusCode { + kOperatorExecuted = 0, // An operator in the query finished execution. + kQueryExecuted, // The query got executed. + kNone // None of the above. + }; + + /** + * @brief Constructor. + * + * @param foreman_client_id The TMB client ID of the foreman thread. + * @param num_numa_nodes The number of NUMA nodes used by the system. + * @param query_handle The QueryHandle object for this query. + * @param catalog_database The CatalogDatabase used by the query. + * @param storage_manager The StorageManager used by the query. + * @param bus The TMB used for communication. + **/ + QueryManager(const tmb::client_id foreman_client_id, + const std::size_t num_numa_nodes, + QueryHandle *query_handle, + CatalogDatabaseLite *catalog_database, + StorageManager *storage_manager, + tmb::MessageBus *bus); + + /** + * @brief Get the next workorder to be executed, wrapped in a WorkerMessage. + * + * @param start_operator_index Begin the search for the schedulable WorkOrder + * with the operator at this index. + * @param node_id The next WorkOrder should preferably have its input(s) + * from this NUMA node. This is a hint and not a binding requirement. + * + * @return A pointer to the WorkerMessage. If there's no WorkOrder to be + * executed, return NULL. + **/ + WorkerMessage *getNextWorkerMessage( + const dag_node_index start_operator_index, + const numa_node_id node_id = -1); + + /** + * @brief Process a message sent to the QueryManager. + * + * @param tagged_message TaggedMessage sent to the QueryManager. + * + * @return QueryStatusCode as determined after the message is processed. + **/ + QueryStatusCode processMessage(const TaggedMessage &tagged_message); + + /** + * @brief Get the QueryExecutionState for this query. + **/ + inline const QueryExecutionState& getQueryExecutionState() const { + return *query_exec_state_; + } + + /** + * @brief Get a pointer to the QueryContext.
+ **/ + inline QueryContext* getQueryContextMutable() { + return query_context_.get(); + } + + private: + /** + * @brief Process the received WorkOrder complete message. + * + * @param op_index The index of the specified operator node in the query DAG + * for the completed WorkOrder. + **/ + void processWorkOrderCompleteMessage(const dag_node_index op_index); + + /** + * @brief Process the received RebuildWorkOrder complete message. + * + * @param op_index The index of the specified operator node in the query DAG + * for the completed RebuildWorkOrder. + **/ + void processRebuildWorkOrderCompleteMessage(const dag_node_index op_index); + + /** + * @brief Process a current relational operator: Get its workorders and store + * them in the WorkOrdersContainer for this query. If the operator can + * be marked as done, do so. + * + * @param index The index of the relational operator to be processed in the + * query plan DAG. + * @param recursively_check_dependents If an operator is done, should we + * call processOperator on its dependents recursively. + **/ + void processOperator(const dag_node_index index, + const bool recursively_check_dependents); + + /** + * @brief Process the received data pipeline message. + * + * @param op_index The index of the specified operator node in the query DAG + * for the pipelining block. + * @param block The block id. + * @param rel_id The ID of the relation that produced 'block'. + **/ + void processDataPipelineMessage(const dag_node_index op_index, + const block_id block, + const relation_id rel_id); + + /** + * @brief Process the received work order feedback message and notify + * relational operator. + * + * @param message Feedback message from work order. + **/ + void processFeedbackMessage(const WorkOrder::FeedbackMessage &message); + + /** + * @brief Fetch all work orders currently available in relational operator and + * store them internally.
+ * + * @param index The index of the relational operator to be processed in the + * query plan DAG. + * + * @return Whether any new work order was generated by the operator. + **/ + bool fetchNormalWorkOrders(const dag_node_index index); + + /** + * @brief This function does the following things: + * 1. Mark the given relational operator as "done". + * 2. For all the dependents of this operator, check if all of their + * blocking dependencies are met. If so inform them that the blocking + * dependencies are met. + * 3. If the given operator has an output relation, inform the + * dependents that they won't receive input anymore from the given + * operator. + * + * @param index The index of the given relational operator in the DAG. + **/ + void markOperatorFinished(const dag_node_index index); + + /** + * @brief Check if all the dependencies of the node at specified index have + * finished their execution. + * + * @note This function's true return value is a pre-requisite for calling + * getRebuildWorkOrders() + * + * @param node_index The index of the specified node in the query DAG. + * + * @return True if all the dependencies have finished their execution. False + * otherwise. + **/ + inline bool checkAllDependenciesMet(const dag_node_index node_index) const { + for (const dag_node_index dependency_index : + query_dag_->getDependencies(node_index)) { + // If at least one of the dependencies is not met, return false. + if (!query_exec_state_->hasExecutionFinished(dependency_index)) { + return false; + } + } + return true; + } + + /** + * @brief Check if all the blocking dependencies of the node at specified + * index have finished their execution. + * + * @note A blocking dependency is the one which is pipeline breaker. Output of + * a dependency can't be streamed to its dependent if the link between + * them is pipeline breaker. + * + * @param node_index The index of the specified node in the query DAG.
+ * + * @return True if all the blocking dependencies have finished their + * execution. False otherwise. + **/ + inline bool checkAllBlockingDependenciesMet( + const dag_node_index node_index) const { + for (const dag_node_index blocking_dependency_index : + blocking_dependencies_[node_index]) { + if (!query_exec_state_->hasExecutionFinished( + blocking_dependency_index)) { + return false; + } + } + return true; + } + + /** + * @brief Check if the execution of the given operator is over. + * + * @param index The index of the given operator in the DAG. + * + * @return True if the execution of the given operator is over, false + * otherwise. + **/ + inline bool checkOperatorExecutionOver(const dag_node_index index) const { + if (checkRebuildRequired(index)) { + return (checkNormalExecutionOver(index) && checkRebuildOver(index)); + } else { + return checkNormalExecutionOver(index); + } + } + + /** + * @brief Check if the given operator's normal execution is over. + * + * @note The conditions for a given operator's normal execution to get over: + * 1. All of its normal (i.e. non rebuild) WorkOrders have finished + * execution. + * 2. The operator is done generating work orders. + * 3. All of the dependencies of the given operator have been met. + * + * @param index The index of the given operator in the DAG. + * + * @return True if the normal execution of the given operator is over, false + * otherwise. + **/ + inline bool checkNormalExecutionOver(const dag_node_index index) const { + return (checkAllDependenciesMet(index) && + !workorders_container_->hasNormalWorkOrder(index) && + query_exec_state_->getNumQueuedWorkOrders(index) == 0 && + query_exec_state_->hasDoneGenerationWorkOrders(index)); + } + + /** + * @brief Check if the rebuild operation is required for a given operator. + * + * @param index The index of the given operator in the DAG. + * + * @return True if the rebuild operation is required, false otherwise.
+ **/ + inline bool checkRebuildRequired(const dag_node_index index) const { + return query_exec_state_->isRebuildRequired(index); + } + + /** + * @brief Check if the rebuild operation for a given operator is over. + * + * @param index The index of the given operator in the DAG. + * + * @return True if the rebuild operation is over, false otherwise. + **/ + inline bool checkRebuildOver(const dag_node_index index) const { + return query_exec_state_->hasRebuildInitiated(index) && + !workorders_container_->hasRebuildWorkOrder(index) && + (query_exec_state_->getNumRebuildWorkOrders(index) == 0); + } + + /** + * @brief Check if the rebuild operation for a given operator has been + * initiated. + * + * @param index The index of the given operator in the DAG. + * + * @return True if the rebuild operation has been initiated, false otherwise. + **/ + inline bool checkRebuildInitiated(const dag_node_index index) const { + return query_exec_state_->hasRebuildInitiated(index); + } + + /** + * @brief Initiate the rebuild process for partially filled blocks generated + * during the execution of the given operator. + * + * @param index The index of the given operator in the DAG. + * + * @return True if the rebuild is over immediately, i.e. the operator didn't + * generate any rebuild WorkOrders, false otherwise. + **/ + bool initiateRebuild(const dag_node_index index); + + /** + * @brief Get the rebuild WorkOrders for an operator. + * + * @note This function should be called only once, when all the normal + * WorkOrders generated by an operator finish their execution. + * + * @param index The index of the operator in the query plan DAG. + * @param container A pointer to a WorkOrdersContainer to be used to store the + * generated WorkOrders.
+ **/ + void getRebuildWorkOrders(const dag_node_index index, + WorkOrdersContainer *container); + + const tmb::client_id foreman_client_id_; + const std::size_t query_id_; + + CatalogDatabaseLite *catalog_database_; + StorageManager *storage_manager_; + tmb::MessageBus *bus_; + + DAG<RelationalOperator, bool> *query_dag_; + + std::unique_ptr<QueryContext> query_context_; + + // For all nodes, store their streaming (non pipeline-breaking) dependents. + std::vector<std::vector<dag_node_index>> output_consumers_; + + // For all nodes, store their pipeline breaking dependencies (if any). + std::vector<std::vector<dag_node_index>> blocking_dependencies_; + + std::unique_ptr<QueryExecutionState> query_exec_state_; + + std::unique_ptr<WorkOrdersContainer> workorders_container_; + + DISALLOW_COPY_AND_ASSIGN(QueryManager); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3725840/query_execution/WorkOrdersContainer.hpp ---------------------------------------------------------------------- diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp index b4a0a03..eb9aedd 100644 --- a/query_execution/WorkOrdersContainer.hpp +++ b/query_execution/WorkOrdersContainer.hpp @@ -48,8 +48,8 @@ class WorkOrdersContainer { * @param num_numa_nodes Number of NUMA nodes in the system.
**/ WorkOrdersContainer(const std::size_t num_operators, - const std::size_t num_numa_nodes) - : num_operators_(num_operators), num_numa_nodes_(num_numa_nodes) { + const std::size_t num_numa_nodes) + : num_operators_(num_operators), num_numa_nodes_(num_numa_nodes) { DEBUG_ASSERT(num_operators != 0); for (std::size_t op = 0; op < num_operators; ++op) { normal_workorders_.push_back( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3725840/query_execution/WorkerMessage.hpp ---------------------------------------------------------------------- diff --git a/query_execution/WorkerMessage.hpp b/query_execution/WorkerMessage.hpp index 875fa5a..ec63af9 100644 --- a/query_execution/WorkerMessage.hpp +++ b/query_execution/WorkerMessage.hpp @@ -30,7 +30,7 @@ class WorkOrder; **/ class WorkerMessage { public: - enum WorkerMessageType { + enum class WorkerMessageType { kRebuildWorkOrder = 0, kWorkOrder, kPoison @@ -46,7 +46,9 @@ class WorkerMessage { * @return The constructed RebuildWorkOrderMessage. **/ static WorkerMessage* RebuildWorkOrderMessage(WorkOrder *rebuild_workorder, const std::size_t relational_op_index) { - return new WorkerMessage(rebuild_workorder, relational_op_index, kRebuildWorkOrder); + return new WorkerMessage(rebuild_workorder, + relational_op_index, + WorkerMessageType::kRebuildWorkOrder); } /** @@ -60,7 +62,9 @@ class WorkerMessage { * @return The constructed WorkOrderMessage. **/ static WorkerMessage* WorkOrderMessage(WorkOrder *workorder, const std::size_t relational_op_index) { - return new WorkerMessage(workorder, relational_op_index, kWorkOrder); + return new WorkerMessage(workorder, + relational_op_index, + WorkerMessageType::kWorkOrder); } /** @@ -69,7 +73,7 @@ class WorkerMessage { * @return The constructed PoisonMessage. 
**/ static WorkerMessage* PoisonMessage() { - return new WorkerMessage(nullptr, 0, kPoison); + return new WorkerMessage(nullptr, 0, WorkerMessageType::kPoison); } /** http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3725840/query_execution/tests/QueryManager_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp new file mode 100644 index 0000000..1b9be48 --- /dev/null +++ b/query_execution/tests/QueryManager_unittest.cpp @@ -0,0 +1,933 @@ +/** + * Copyright 2011-2015 Quickstep Technologies LLC. + * Copyright 2015-2016 Pivotal Software, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +#include <climits> +#include <memory> +#include <utility> +#include <vector> + +#include "catalog/CatalogDatabase.hpp" +#include "catalog/CatalogRelation.hpp" +#include "catalog/CatalogTypedefs.hpp" +#include "query_execution/QueryContext.hpp" +#include "query_execution/QueryContext.pb.h" +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionState.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryManager.hpp" +#include "query_execution/WorkOrdersContainer.hpp" +#include "query_execution/WorkerDirectory.hpp" +#include "query_execution/WorkerMessage.hpp" +#include "query_optimizer/QueryHandle.hpp" +#include "query_optimizer/QueryPlan.hpp" +#include "relational_operators/RelationalOperator.hpp" +#include "relational_operators/WorkOrder.hpp" +#include "storage/InsertDestination.hpp" +#include "storage/InsertDestination.pb.h" +#include "storage/StorageBlock.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageManager.hpp" +#include "utility/DAG.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" +#include "gtest/gtest.h" + +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" +#include "tmb/tagged_message.h" + +using std::move; +using std::unique_ptr; +using std::vector; + +using tmb::client_id; + +namespace quickstep { + +class MockWorkOrder : public WorkOrder { + public: + explicit MockWorkOrder(const int op_index) + : op_index_(op_index) {} + + void execute() override { + VLOG(3) << "WorkOrder[" << op_index_ << "] executing."; + } + + inline QueryPlan::DAGNodeIndex getOpIndex() const { + return op_index_; + } + + private: + const QueryPlan::DAGNodeIndex op_index_; + + DISALLOW_COPY_AND_ASSIGN(MockWorkOrder); +}; + +class MockOperator: public RelationalOperator { + public: + enum function_name { + kFeedInputBlock = 0, + kFeedInputBlocks, + kDoneFeedingInputBlocks, + kGetAllWorkOrders + }; + + MockOperator(const bool produce_workorders, + 
const bool has_streaming_input, + const int max_getworkorder_iters = 1, + const int max_workorders = INT_MAX) + : produce_workorders_(produce_workorders), + has_streaming_input_(has_streaming_input), + max_workorders_(max_workorders), + max_getworkorder_iters_(max_getworkorder_iters), + num_calls_get_workorders_(0), + num_workorders_generated_(0), + num_calls_feedblock_(0), + num_calls_feedblocks_(0), + num_calls_donefeedingblocks_(0) { + } + +#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": " + + // The methods below are used to check whether QueryManager calls the Relational + // operator, how many times it calls a particular method etc. + inline int getNumWorkOrders() const { + return num_workorders_generated_; + } + + inline int getNumCalls(const function_name fname) const { + switch (fname) { + case kFeedInputBlock: + return num_calls_feedblock_; + case kFeedInputBlocks: + return num_calls_feedblocks_; + case kDoneFeedingInputBlocks: + return num_calls_donefeedingblocks_; + case kGetAllWorkOrders: + return num_calls_get_workorders_; + default: + return -1; + } + } + + inline bool getBlockingDependenciesMet() const { + MOCK_OP_LOG(3) << "met."; + return blocking_dependencies_met_; + } + + void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) { + insert_destination_index_ = insert_destination_index; + } + + // Mock to trigger doneFeedingInputBlocks for the dependent operators + // in QueryManager::markOperatorFinished. + void setOutputRelationID(const relation_id rel_id) { + output_relation_id_ = rel_id; + } + + // Override methods from the base class. 
+ bool getAllWorkOrders( + WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id foreman_client_id, + tmb::MessageBus *bus) override { + ++num_calls_get_workorders_; + if (produce_workorders_) { + if (has_streaming_input_) { + if ((num_calls_feedblock_ > 0 || num_calls_feedblocks_ > 0) && (num_workorders_generated_ < max_workorders_)) { + MOCK_OP_LOG(3) << "[stream] generate WorkOrder"; + container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_); + ++num_workorders_generated_; + } + } else { + if (blocking_dependencies_met_ && (num_workorders_generated_ < max_workorders_)) { + MOCK_OP_LOG(3) << "[static] generate WorkOrder"; + container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_); + ++num_workorders_generated_; + } + } + } + MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") " + << "return(" << (num_calls_get_workorders_ == max_getworkorder_iters_) << ")"; + return num_calls_get_workorders_ == max_getworkorder_iters_; + } + + void feedInputBlock(const block_id input_block_id, + const relation_id input_relation_id) override { + ++num_calls_feedblock_; + MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")"; + } + + void feedInputBlocks(const relation_id rel_id, + std::vector<block_id> *partially_filled_blocks) override { + ++num_calls_feedblocks_; + MOCK_OP_LOG(3) << "count(" << num_calls_feedblocks_ << ")"; + } + + void doneFeedingInputBlocks(const relation_id rel_id) override { + ++num_calls_donefeedingblocks_; + MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")"; + } + + QueryContext::insert_destination_id getInsertDestinationID() const override { + return insert_destination_index_; + } + + const relation_id getOutputRelationID() const override { + return output_relation_id_; + } + + private: + const bool produce_workorders_; + const bool has_streaming_input_; + const int max_workorders_; + const int max_getworkorder_iters_; + + int 
num_calls_get_workorders_; + int num_workorders_generated_; + int num_calls_feedblock_; + int num_calls_feedblocks_; + int num_calls_donefeedingblocks_; + + QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId; + + relation_id output_relation_id_ = -1; + +#undef MOCK_OP_LOG + + DISALLOW_COPY_AND_ASSIGN(MockOperator); +}; + + +class QueryManagerTest : public ::testing::Test { + protected: + virtual void SetUp() { + db_.reset(new CatalogDatabase(nullptr /* catalog */, "database")); + storage_manager_.reset(new StorageManager("./")); + bus_.Initialize(); + query_handle_.reset(new QueryHandle(0)); + query_plan_ = query_handle_->getQueryPlanMutable(); + } + + inline void constructQueryManager() { + query_manager_.reset(new QueryManager( + 0, 1, query_handle_.get(), db_.get(), storage_manager_.get(), &bus_)); + } + + inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const { + return query_manager_->getQueryExecutionState().getNumQueuedWorkOrders(index); + } + + inline const int getNumOperatorsFinished() const { + return query_manager_->getQueryExecutionState().getNumOperatorsFinished(); + } + + inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const { + return query_manager_->getQueryExecutionState().hasExecutionFinished(index); + } + + inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) { + VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]"; + serialization::DataPipelineMessage proto; + proto.set_operator_index(source_operator_index); + + proto.set_block_id(0); // dummy block ID + proto.set_relation_id(0); // dummy relation ID. + + // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. 
+ const std::size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(std::malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes), + proto_length, + kDataPipelineMessage); + std::free(proto_bytes); + query_manager_->processMessage(tagged_message); + return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); + } + + inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) { + VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]"; + TaggedMessage tagged_message; + serialization::WorkOrderCompletionMessage proto; + proto.set_operator_index(index); + proto.set_worker_thread_index(1); // dummy worker ID. + + // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(std::malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kWorkOrderCompleteMessage); + std::free(proto_bytes); + query_manager_->processMessage(message); + + return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); + } + + inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) { + VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]"; + // foreman_->processRebuildWorkOrderCompleteMessage(index, 0 /* worker id */); + serialization::WorkOrderCompletionMessage proto; + proto.set_operator_index(index); + proto.set_worker_thread_index(1); // dummy worker thread ID. + + // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. 
+ const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(std::malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kRebuildWorkOrderCompleteMessage); + + std::free(proto_bytes); + query_manager_->processMessage(message); + + return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); + } + + inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) { + VLOG(3) << "Place OutputBlock message for Op[" << index << "]"; + serialization::DataPipelineMessage proto; + proto.set_operator_index(index); + + proto.set_block_id(0); // dummy block ID + proto.set_relation_id(0); // dummy relation ID. + + // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. + const std::size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(std::malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + tmb::TaggedMessage tagged_message(static_cast<const void *>(proto_bytes), + proto_length, + kDataPipelineMessage); + std::free(proto_bytes); + query_manager_->processMessage(tagged_message); + return query_manager_->getQueryExecutionState().hasQueryExecutionFinished(); + } + + unique_ptr<CatalogDatabase> db_; + unique_ptr<StorageManager> storage_manager_; + + QueryPlan *query_plan_; + unique_ptr<QueryHandle> query_handle_; + unique_ptr<QueryManager> query_manager_; + + // unique_ptr<Foreman> foreman_; + MessageBusImpl bus_; + + client_id worker_client_id_; + + unique_ptr<WorkerDirectory> workers_; +}; + +TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) { + // This test creates a DAG of a single node. No workorders are generated. 
+ query_plan_->addRelationalOperator(new MockOperator(false, false)); + // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable()); + + const MockOperator &op = static_cast<const MockOperator &>( + query_plan_->getQueryPlanDAG().getNodePayload(0)); + + constructQueryManager(); + + // op doesn't have any dependencies. + EXPECT_TRUE(op.getBlockingDependenciesMet()); + + // We expect one call for op's getAllWorkOrders(). + EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks)); +} + +TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) { + // This test creates a DAG of a single node. Static workorders are generated. + const QueryPlan::DAGNodeIndex id = + query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); + // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable()); + + const MockOperator &op = static_cast<const MockOperator &>( + query_plan_->getQueryPlanDAG().getNodePayload(id)); + + constructQueryManager(); + + // op doesn't have any dependencies. + EXPECT_TRUE(op.getBlockingDependenciesMet()); + + // We expect one call for op's getAllWorkOrders(). + EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks)); + + // One workorder is generated. 
+ EXPECT_EQ(1, op.getNumWorkOrders()); + + unique_ptr<WorkerMessage> worker_message; + worker_message.reset(query_manager_->getNextWorkerMessage(0, -1)); + EXPECT_TRUE(worker_message != nullptr); + + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(0u, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + EXPECT_EQ(1, getNumWorkOrdersInExecution(id)); + EXPECT_EQ(0, getNumOperatorsFinished()); + + // Send a message to QueryManager upon workorder completion. + // Last event processed by QueryManager. + EXPECT_TRUE(placeWorkOrderCompleteMessage(id)); + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id)); + EXPECT_EQ(1, getNumOperatorsFinished()); + EXPECT_TRUE(getOperatorFinishedStatus(id)); +} + +TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) { + // This test creates a DAG of a single node. WorkOrders are generated + // dynamically as pending work orders complete execution, i.e., + // getAllWorkOrders() is called multiple times. getAllWorkOrders() will be + // called 5 times and 3 work orders will be returned, i.e., 1st 3 calls to + // getAllWorkOrders() insert 1 WorkOrder and return false, and the next will + // insert no WorkOrder and return true. + + // TODO(shoban): This test can not be more robust than this because of fixed + // scaffolding of mocking. If we use gMock, we can do much better. + const QueryPlan::DAGNodeIndex id = + query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3)); + // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable()); + + const MockOperator &op = static_cast<const MockOperator &>( + query_plan_->getQueryPlanDAG().getNodePayload(id)); + + constructQueryManager(); + + // op doesn't have any dependencies. + EXPECT_TRUE(op.getBlockingDependenciesMet()); + + for (int i = 0; i < 3; ++i) { + // We expect one call for op's getAllWorkOrders(). 
+ EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders)); + + // One workorder is generated. + // EXPECT_EQ(1, getWorkerInputQueueSize()); + EXPECT_EQ(i + 1, op.getNumWorkOrders()); + + unique_ptr<WorkerMessage> worker_message; + worker_message.reset(query_manager_->getNextWorkerMessage(id, -1)); + + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(id, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + EXPECT_EQ(1, getNumWorkOrdersInExecution(id)); + EXPECT_EQ(0, getNumOperatorsFinished()); + + if (i < 2) { + // Send a message to QueryManager upon workorder completion. + EXPECT_FALSE(placeWorkOrderCompleteMessage(id)); + } else { + // Send a message to QueryManager upon workorder completion. + // Last event. + EXPECT_TRUE(placeWorkOrderCompleteMessage(id)); + } + } + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id)); + + EXPECT_EQ(1, getNumOperatorsFinished()); + EXPECT_TRUE(getOperatorFinishedStatus(id)); + + // We place this check in the end, since it's true throughout the test. + EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlocks)); +} + +TEST_F(QueryManagerTest, TwoNodesDAGBlockingLinkTest) { + // We use two nodes in the DAG with a blocking link between them. + // There is no streaming of data involved in this test. + const QueryPlan::DAGNodeIndex id1 = + query_plan_->addRelationalOperator(new MockOperator(true, false)); + const QueryPlan::DAGNodeIndex id2 = + query_plan_->addRelationalOperator(new MockOperator(true, false)); + + // Create a blocking link. 
+ query_plan_->addDirectDependency(id2, id1, true); + + static_cast<MockOperator *>( + query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1)) + ->setOutputRelationID(0xdead); + + const MockOperator &op1 = static_cast<const MockOperator &>( + query_plan_->getQueryPlanDAG().getNodePayload(id1)); + const MockOperator &op2 = static_cast<const MockOperator &>( + query_plan_->getQueryPlanDAG().getNodePayload(id2)); + + constructQueryManager(); + + // op1 doesn't have any dependencies + EXPECT_TRUE(op1.getBlockingDependenciesMet()); + + // Only op1 should receive a call to getAllWorkOrders initially. + EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks)); + + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks)); + + // Only op1 should produce a workorder. + EXPECT_EQ(1, op1.getNumWorkOrders()); + EXPECT_EQ(0, op2.getNumWorkOrders()); + + // Foreman hasn't yet got workorder completion response for the workorder. + unique_ptr<WorkerMessage> worker_message; + worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1)); + + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(id1, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + EXPECT_EQ(1, getNumWorkOrdersInExecution(id1)); + EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); + EXPECT_EQ(0, getNumOperatorsFinished()); + + // Send a message to Foreman upon workorder (generated by op1) completion. + EXPECT_FALSE(placeWorkOrderCompleteMessage(id1)); + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); + // op1 is over now, op2 still to go. 
+ EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); + EXPECT_EQ(1, getNumOperatorsFinished()); + + EXPECT_TRUE(getOperatorFinishedStatus(id1)); + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); + EXPECT_FALSE(getOperatorFinishedStatus(id2)); + + worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1)); + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(id2, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + EXPECT_EQ(1, getNumWorkOrdersInExecution(id2)); + + // op1 is op2's blocking dependency. + EXPECT_TRUE(op2.getBlockingDependenciesMet()); + + EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); + // op2 should get first call of getAllWorkOrders() when op1 is over. + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + + EXPECT_EQ(1, op2.getNumWorkOrders()); + + // Send a message to QueryManager upon workorder (generated by op2) completion. + // Note that the worker hasn't yet popped the workorder. Usually this won't + // happen as workers pop workorders first, execute and then send the response. + EXPECT_TRUE(placeWorkOrderCompleteMessage(id2)); + + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); + EXPECT_EQ(0, getNumWorkOrdersInExecution(id2)); + + EXPECT_EQ(2, getNumOperatorsFinished()); + EXPECT_TRUE(getOperatorFinishedStatus(id1)); + EXPECT_TRUE(getOperatorFinishedStatus(id2)); + + // Expect no additional calls to getAllWorkOrders. + EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); +} + +TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) { + // We use two nodes in the DAG with a non-blocking link between them. + // We stream output of op1 to op2. Sequeuce of events is as follows: + // 1. 
op1 creates a workorder. + // 2. We send a "block full" (from op1) to QueryManager. + // 3. op2 creates a workorder because of step 2. + const QueryPlan::DAGNodeIndex id1 = + query_plan_->addRelationalOperator(new MockOperator(true, false, 1)); + const QueryPlan::DAGNodeIndex id2 = + query_plan_->addRelationalOperator(new MockOperator(true, true, 3)); + + // Create a non-blocking link. + query_plan_->addDirectDependency(id2, id1, false); + + static_cast<MockOperator *>( + query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1)) + ->setOutputRelationID(0xdead); + + const MockOperator &op1 = static_cast<const MockOperator &>( + query_plan_->getQueryPlanDAG().getNodePayload(id1)); + const MockOperator &op2 = static_cast<const MockOperator &>( + query_plan_->getQueryPlanDAG().getNodePayload(id2)); + + constructQueryManager(); + + // As none of the operators have a blocking link, blocking dependencies should + // be met. + EXPECT_TRUE(op1.getBlockingDependenciesMet()); + EXPECT_TRUE(op2.getBlockingDependenciesMet()); + + EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(1, op1.getNumWorkOrders()); + EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks)); + + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + // op2 will generate workorder only after receiving a streaming input. 
+ EXPECT_EQ(0, op2.getNumWorkOrders()); + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks)); + + unique_ptr<WorkerMessage> worker_message; + worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1)); + + EXPECT_TRUE(worker_message != nullptr); + EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder, + worker_message->getType()); + EXPECT_EQ(id1, worker_message->getRelationalOpIndex()); + + delete worker_message->getWorkOrder(); + + // Send a message to QueryManager upon block getting full (output of op1). + EXPECT_FALSE(placeOutputBlockMessage(id1)); + + // op1 is not finished yet because the response of workorder completion hasn't + // been received yet by the QueryManager. + EXPECT_FALSE(getOperatorFinishedStatus(id1)); + EXPECT_FALSE(getOperatorFinishedStatus(id2)); + + // No additional call to op1's getAllWorkOrders. + EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlocks)); + + // Output from op1 should be fed to op2. + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock)); + EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlocks)); + + // A call to op2's getAllWorkOrders because of the streamed input. + EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders)); + EXPECT_EQ(1, op2.getNumWorkOrders()); + + // Place a message of a workorder completion of op1 on Foreman's input queue. + EXPECT_FALSE(placeWorkOrderCompleteMessage(id1)); + + EXPECT_EQ(0, getNumWorkOrdersInExecution(id1)); + EXPECT_TRUE(getOperatorFinishedStatus(id1)); + EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks)); + + // An additional call to op2's getAllWorkOrders because of completion of op1. 
+  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(2, op2.getNumWorkOrders());
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+
+  EXPECT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // Place a message of a workorder completion of op2 on Foreman's input queue.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+
+  EXPECT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  // Send a message to Foreman upon workorder (generated by op2) completion.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+}
+
+TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
+  // In this test, we create a 2-node DAG with a non-blocking link between them.
+  // There is no streaming of data from op1 to op2 during the execution of op1.
+  // op1 produces a partially filled block at the end of its execution which is
+  // rebuilt and then fed to op2.
+  const QueryPlan::DAGNodeIndex id1 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
+  const QueryPlan::DAGNodeIndex id2 =
+      query_plan_->addRelationalOperator(new MockOperator(true, true, 3, 1));
+
+  // Create a non-blocking link.
+  query_plan_->addDirectDependency(id2, id1, false);
+
+  // Create a relation, owned by db_.
+  CatalogRelation *relation =
+      new CatalogRelation(nullptr /* catalog_database */, "test_relation");
+  const relation_id output_relation_id = db_->addRelation(relation);
+
+  // Set up the InsertDestination proto in the query context proto.
+  serialization::QueryContext *query_context_proto =
+      query_handle_->getQueryContextProtoMutable();
+
+  const QueryContext::insert_destination_id insert_destination_index =
+      query_context_proto->insert_destinations_size();
+  serialization::InsertDestination *insert_destination_proto =
+      query_context_proto->add_insert_destinations();
+
+  insert_destination_proto->set_insert_destination_type(
+      serialization::InsertDestinationType::BLOCK_POOL);
+  insert_destination_proto->set_relation_id(output_relation_id);
+  insert_destination_proto->set_relational_op_index(id1);
+
+  MockOperator *op1_mutable = static_cast<MockOperator *>(
+      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1));
+  op1_mutable->setInsertDestinationID(insert_destination_index);
+  op1_mutable->setOutputRelationID(output_relation_id);
+
+  const MockOperator &op1 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id1));
+  const MockOperator &op2 = static_cast<const MockOperator &>(
+      query_plan_->getQueryPlanDAG().getNodePayload(id2));
+
+  constructQueryManager();
+
+  // NOTE(zuyu): An operator generally has no idea about partially filled
+  // blocks, but InsertDestination in QueryContext does.
+  // Mock to add partially filled blocks in the InsertDestination.
+  InsertDestination *insert_destination =
+      query_manager_->getQueryContextMutable()->getInsertDestination(
+          insert_destination_index);
+  // A test assertion rather than DCHECK: DCHECK is compiled out in release
+  // builds, and the static_cast below would then dereference a null pointer.
+  ASSERT_TRUE(insert_destination != nullptr);
+  MutableBlockReference block_ref;
+  static_cast<BlockPoolInsertDestination *>(insert_destination)
+      ->available_block_refs_.push_back(move(block_ref));
+
+  // There's no blocking dependency in the DAG.
+  EXPECT_TRUE(op1.getBlockingDependenciesMet());
+  EXPECT_TRUE(op2.getBlockingDependenciesMet());
+
+  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op1.getNumWorkOrders());
+
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(0, op2.getNumWorkOrders());
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+
+  // ASSERT (fatal) rather than EXPECT: the next statements dereference
+  // worker_message, so a missing message must end the test here.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // Send a message to QueryManager upon workorder (generated by op1) completion.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kRebuildWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // op1 generates a rebuild workorder. The block is rebuilt and streamed
+  // to QueryManager.
+  EXPECT_FALSE(placeDataPipelineMessage(id1));
+
+  EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1));
+  // Based on the streamed input, op2's getAllWorkOrders should produce a
+  // workorder.
+  EXPECT_EQ(3, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
+  EXPECT_EQ(1, op2.getNumWorkOrders());
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
+
+  // Send a message to QueryManager upon workorder (generated by op2) completion.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));
+
+  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+}
+
+TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) {
+  // When an operator produces workorders but no output, the QueryManager should
+  // check the dependents of this operator to make progress.
+  const QueryPlan::DAGNodeIndex kNumNodes = 5;
+  std::vector<QueryPlan::DAGNodeIndex> ids;
+  ids.reserve(kNumNodes);
+
+  // reserve() only allocates capacity and leaves size() at 0, so the node IDs
+  // must be appended; assigning through ids[i] would be undefined behavior.
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    if (i == 0) {
+      ids.push_back(
+          query_plan_->addRelationalOperator(new MockOperator(true, false)));
+    } else {
+      ids.push_back(
+          query_plan_->addRelationalOperator(new MockOperator(true, true)));
+    }
+    VLOG(3) << ids.back();
+  }
+
+  /**
+   * The DAG looks like this:
+   *
+   * op1 -> op2 -> op3 -> op4 -> op5
+   *
+   **/
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) {
+    query_plan_->addDirectDependency(ids[i + 1], ids[i], false);
+    // NOTE(review): 0xdead looks like an arbitrary placeholder output relation
+    // ID for every operator that has a dependent — confirm.
+    static_cast<MockOperator*>(
+        query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(ids[i]))
+        ->setOutputRelationID(0xdead);
+  }
+
+  // operators[i] is the payload of the node whose ID is ids[i].
+  std::vector<const MockOperator*> operators;
+  operators.reserve(kNumNodes);
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    operators.push_back(static_cast<const MockOperator*>(
+        &query_plan_->getQueryPlanDAG().getNodePayload(ids[i])));
+  }
+
+  constructQueryManager();
+
+  // operators[0] should have produced a workorder by now.
+  EXPECT_EQ(1, operators[0]->getNumWorkOrders());
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(ids[0], -1));
+
+  // Fatal check: worker_message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(ids[0], worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  EXPECT_EQ(1, getNumWorkOrdersInExecution(ids[0]));
+  EXPECT_FALSE(getOperatorFinishedStatus(ids[0]));
+
+  // operators is indexed by position, not by DAG node index.
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    EXPECT_EQ(1, operators[i]->getNumCalls(MockOperator::kGetAllWorkOrders));
+  }
+
+  // Send a message to QueryManager upon workorder (generated by operators[0])
+  // completion.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(ids[0]));
+
+  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
+    EXPECT_EQ(0, getNumWorkOrdersInExecution(ids[i]));
+    EXPECT_TRUE(getOperatorFinishedStatus(ids[i]));
+    if (i < kNumNodes - 1) {
+      EXPECT_EQ(1, operators[i + 1]->getNumCalls(
+                       MockOperator::kDoneFeedingInputBlocks));
+    }
+  }
+}
+
+TEST_F(QueryManagerTest, OutOfOrderWorkOrderCompletionTest) {
+  // Consider two operators, both generate one workorder each. The dependent's
+  // workorder finishes before dependency's workorder.
+  const QueryPlan::DAGNodeIndex id1 =
+      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
+  const QueryPlan::DAGNodeIndex id2 =
+      query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));
+
+  // Create a non-blocking link.
+  query_plan_->addDirectDependency(id2, id1, false);
+
+  constructQueryManager();
+
+  unique_ptr<WorkerMessage> worker_message;
+  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
+
+  // Fatal check: worker_message is dereferenced right below.
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // Send a message to QueryManager upon a block (output of op1) getting full.
+  EXPECT_FALSE(placeOutputBlockMessage(id1));
+
+  // op1 is not finished yet because the response of workorder completion hasn't
+  // been received yet.
+  EXPECT_FALSE(getOperatorFinishedStatus(id1));
+  EXPECT_FALSE(getOperatorFinishedStatus(id2));
+
+  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
+  ASSERT_TRUE(worker_message != nullptr);
+  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
+            worker_message->getType());
+
+  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());
+
+  delete worker_message->getWorkOrder();
+
+  // As mentioned earlier, op2 finishes before op1.
+  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));
+
+  // op1's workorder execution is over.
+  EXPECT_TRUE(placeWorkOrderCompleteMessage(id1));
+
+  EXPECT_TRUE(getOperatorFinishedStatus(id1));
+  EXPECT_TRUE(getOperatorFinishedStatus(id2));
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d3725840/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index a2ed029..670cd6c 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -387,6 +387,7 @@ class BlockPoolInsertDestination : public InsertDestination {
 
  private:
   FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest);
+  FRIEND_TEST(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest);
 
   // A vector of references to blocks which are loaded in memory.
   std::vector<MutableBlockReference> available_block_refs_;
