Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 7f25d1c14 -> e37ec541c
Added \analyze support in the distributed version. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e37ec541 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e37ec541 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e37ec541 Branch: refs/heads/master Commit: e37ec541c1490d8434eb441a1c0389e20f80ec6e Parents: 7f25d1c Author: Zuyu Zhang <zu...@apache.org> Authored: Thu Mar 2 23:36:28 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Fri Mar 3 04:09:36 2017 -0800 ---------------------------------------------------------------------- cli/CMakeLists.txt | 2 +- cli/distributed/CMakeLists.txt | 7 + cli/distributed/Cli.cpp | 8 +- cli/distributed/Conductor.cpp | 99 ++++++++++++- cli/distributed/Conductor.hpp | 8 +- query_execution/CMakeLists.txt | 12 ++ query_execution/ForemanDistributed.cpp | 25 +++- query_execution/ForemanDistributed.hpp | 12 +- query_execution/PolicyEnforcerDistributed.cpp | 155 ++++++++++++++++++--- query_execution/PolicyEnforcerDistributed.hpp | 14 ++ query_execution/QueryExecutionTypedefs.hpp | 2 +- query_optimizer/CMakeLists.txt | 1 + query_optimizer/QueryHandle.hpp | 43 +++++- 13 files changed, 351 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt index ffeed2f..4562fe6 100644 --- a/cli/CMakeLists.txt +++ b/cli/CMakeLists.txt @@ -84,6 +84,7 @@ target_link_libraries(quickstep_cli_CommandExecutor quickstep_expressions_aggregation_AggregateFunctionMax quickstep_expressions_aggregation_AggregateFunctionMin quickstep_parser_ParseStatement + quickstep_parser_ParseString quickstep_parser_SqlParserWrapper quickstep_queryoptimizer_QueryHandle 
quickstep_queryoptimizer_QueryPlan @@ -93,7 +94,6 @@ target_link_libraries(quickstep_cli_CommandExecutor quickstep_storage_StorageManager quickstep_storage_TupleIdSequence quickstep_storage_TupleStorageSubBlock - quickstep_parser_ParseString quickstep_types_Type quickstep_types_TypeID quickstep_types_TypedValue http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt index 1f7dee0..2db27e5 100644 --- a/cli/distributed/CMakeLists.txt +++ b/cli/distributed/CMakeLists.txt @@ -25,13 +25,19 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp) # Link dependencies: target_link_libraries(quickstep_cli_distributed_Conductor glog + quickstep_catalog_CatalogAttribute quickstep_catalog_CatalogDatabase + quickstep_catalog_CatalogRelation + quickstep_catalog_CatalogTypedefs quickstep_cli_CommandExecutorUtil quickstep_cli_Constants quickstep_cli_DefaultsConfigurator quickstep_cli_Flags quickstep_cli_distributed_Role + quickstep_expressions_aggregation_AggregateFunctionMax + quickstep_expressions_aggregation_AggregateFunctionMin quickstep_parser_ParseStatement + quickstep_parser_ParseString quickstep_parser_SqlParserWrapper quickstep_queryexecution_BlockLocator quickstep_queryexecution_ForemanDistributed @@ -42,6 +48,7 @@ target_link_libraries(quickstep_cli_distributed_Conductor quickstep_queryoptimizer_QueryProcessor quickstep_storage_StorageConstants quickstep_utility_Macros + quickstep_utility_PtrVector quickstep_utility_SqlError quickstep_utility_StringUtil tmb) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Cli.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp index 63f3259..14880a7 100644 --- a/cli/distributed/Cli.cpp +++ 
b/cli/distributed/Cli.cpp @@ -167,11 +167,9 @@ void Cli::run() { const ParseCommand &parse_command = static_cast<const ParseCommand &>(statement); const std::string &command = parse_command.command()->value(); try { - if (command == C::kAnalyzeCommand) { - // TODO(zuyu): support '\analyze'. - THROW_SQL_ERROR_AT(parse_command.command()) << "Unsupported Command"; - } else if (command != C::kDescribeDatabaseCommand && - command != C::kDescribeTableCommand) { + if (command != C::kAnalyzeCommand && + command != C::kDescribeDatabaseCommand && + command != C::kDescribeTableCommand) { THROW_SQL_ERROR_AT(parse_command.command()) << "Invalid Command"; } } catch (const SqlError &error) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Conductor.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp index 1b8bfb2..5fb4453 100644 --- a/cli/distributed/Conductor.cpp +++ b/cli/distributed/Conductor.cpp @@ -27,13 +27,20 @@ #include <sstream> #include <string> #include <utility> +#include <vector> +#include "catalog/CatalogAttribute.hpp" #include "catalog/CatalogDatabase.hpp" +#include "catalog/CatalogRelation.hpp" +#include "catalog/CatalogTypedefs.hpp" #include "cli/CommandExecutorUtil.hpp" #include "cli/Constants.hpp" #include "cli/DefaultsConfigurator.hpp" #include "cli/Flags.hpp" +#include "expressions/aggregation/AggregateFunctionMax.hpp" +#include "expressions/aggregation/AggregateFunctionMin.hpp" #include "parser/ParseStatement.hpp" +#include "parser/ParseString.hpp" #include "parser/SqlParserWrapper.hpp" #include "query_execution/BlockLocator.hpp" #include "query_execution/ForemanDistributed.hpp" @@ -43,6 +50,7 @@ #include "query_optimizer/QueryHandle.hpp" #include "query_optimizer/QueryProcessor.hpp" #include "storage/StorageConstants.hpp" +#include "utility/PtrVector.hpp" #include "utility/SqlError.hpp" #include "utility/StringUtil.hpp" 
@@ -69,6 +77,8 @@ namespace quickstep { namespace C = cli; namespace S = serialization; +class Type; + void Conductor::init() { try { string catalog_path = FLAGS_storage_path + kCatalogFilename; @@ -154,7 +164,7 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm SqlParserWrapper parser_wrapper; parser_wrapper.feedNextBuffer(command_string); ParseResult parse_result = parser_wrapper.getNextStatement(); - CHECK(parse_result.condition == ParseResult::kSuccess) + CHECK_EQ(ParseResult::kSuccess, parse_result.condition) << "Any syntax error should be addressed in the DistributedCli."; const ParseStatement &statement = *parse_result.parsed_statement; @@ -165,6 +175,11 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm const PtrVector<ParseString> &arguments = *(parse_command.arguments()); const string &command = parse_command.command()->value(); + if (command == C::kAnalyzeCommand) { + executeAnalyze(sender, arguments); + return; + } + string command_response; if (command == C::kDescribeDatabaseCommand) { command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_); @@ -225,4 +240,86 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm } } +void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments) { + std::vector<std::reference_wrapper<const CatalogRelation>> relations; + if (arguments.empty()) { + relations.insert(relations.end(), catalog_database_->begin(), catalog_database_->end()); + } else { + for (const auto &argument : arguments) { + const CatalogRelation *relation = catalog_database_->getRelationByName(argument.value()); + if (relation == nullptr) { + THROW_SQL_ERROR_AT(&argument) << "Table does not exist"; + } + + relations.emplace_back(*relation); + } + } + + // Analyze each relation in the database. 
+ for (const CatalogRelation &relation : relations) { + const relation_id rel_id = relation.getID(); + const string rel_name = EscapeQuotes(relation.getName(), '"'); + + // Get the number of distinct values for each column. + for (const CatalogAttribute &attribute : relation) { + const string attr_name = EscapeQuotes(attribute.getName(), '"'); + const Type &attr_type = attribute.getType(); + const bool is_min_applicable = + AggregateFunctionMin::Instance().canApplyToTypes({&attr_type}); + const bool is_max_applicable = + AggregateFunctionMax::Instance().canApplyToTypes({&attr_type}); + + // NOTE(jianqiao): Note that the relation name and the attribute names may + // contain non-letter characters, e.g. CREATE TABLE "with space"("1" int). + // So here we need to format the names with double quotes ("). + string *query_string = new string("SELECT COUNT(DISTINCT \""); + query_string->append(attr_name); + query_string->append("\")"); + if (is_min_applicable) { + query_string->append(", MIN(\""); + query_string->append(attr_name); + query_string->append("\")"); + } + if (is_max_applicable) { + query_string->append(", MAX(\""); + query_string->append(attr_name); + query_string->append("\")"); + } + query_string->append(" FROM \""); + query_string->append(rel_name); + query_string->append("\";"); + + submitQuery(sender, query_string, + new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id, relations.size(), + attribute.getID(), is_min_applicable, is_max_applicable)); + } + + // Get the number of tuples for the relation. 
+ string *query_string = new string("SELECT COUNT(*) FROM \""); + query_string->append(rel_name); + query_string->append("\";"); + + submitQuery(sender, query_string, + new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id, relations.size())); + } +} + +void Conductor::submitQuery(const tmb::client_id sender, string *query, QueryHandle::AnalyzeQueryInfo *query_info) { + SqlParserWrapper parser_wrapper; + parser_wrapper.feedNextBuffer(query); + ParseResult parse_result = parser_wrapper.getNextStatement(); + DCHECK_EQ(ParseResult::kSuccess, parse_result.condition); + + const ParseStatement &statement = *parse_result.parsed_statement; + + // Generate the query plan. + auto query_handle = + make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(), query_info); + query_processor_->generateQueryHandle(statement, query_handle.get()); + DCHECK(query_handle->getQueryPlanMutable() != nullptr); + + QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( + conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_); +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/cli/distributed/Conductor.hpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp index 0c0f7e5..28c8e02 100644 --- a/cli/distributed/Conductor.hpp +++ b/cli/distributed/Conductor.hpp @@ -26,15 +26,17 @@ #include "cli/distributed/Role.hpp" #include "query_execution/BlockLocator.hpp" #include "query_execution/ForemanDistributed.hpp" +#include "query_optimizer/QueryHandle.hpp" #include "query_optimizer/QueryProcessor.hpp" #include "utility/Macros.hpp" +#include "utility/PtrVector.hpp" #include "tmb/id_typedefs.h" namespace quickstep { class CatalogDatabase; -class ParseCommand; +class ParseString; /** \addtogroup CliDistributed * @{ @@ -62,6 +64,10 @@ class Conductor final : public Role { 
private: void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string); + void executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments); + + void submitQuery(const tmb::client_id sender, std::string *query, QueryHandle::AnalyzeQueryInfo *query_info); + std::unique_ptr<QueryProcessor> query_processor_; // Not owned. CatalogDatabase *catalog_database_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 23b706f..82ff183 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -101,8 +101,10 @@ if (ENABLE_DISTRIBUTED) quickstep_catalog_CatalogRelation quickstep_catalog_CatalogTypedefs quickstep_catalog_Catalog_proto + quickstep_cli_Flags quickstep_queryexecution_AdmitRequestMessage quickstep_queryexecution_BlockLocator + quickstep_queryexecution_BlockLocatorUtil quickstep_queryexecution_ForemanBase quickstep_queryexecution_PolicyEnforcerBase quickstep_queryexecution_PolicyEnforcerDistributed @@ -112,7 +114,9 @@ if (ENABLE_DISTRIBUTED) quickstep_queryexecution_QueryExecutionUtil quickstep_queryexecution_ShiftbossDirectory quickstep_relationaloperators_WorkOrder_proto + quickstep_storage_DataExchangerAsync quickstep_storage_StorageBlockInfo + quickstep_storage_StorageManager quickstep_threading_ThreadUtil quickstep_utility_EqualsAnyConstant quickstep_utility_Macros @@ -151,7 +155,10 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed glog + quickstep_catalog_CatalogDatabase quickstep_catalog_CatalogRelation + quickstep_catalog_CatalogRelationSchema + quickstep_catalog_CatalogRelationStatistics quickstep_catalog_CatalogTypedefs quickstep_catalog_Catalog_proto 
quickstep_queryexecution_PolicyEnforcerBase @@ -166,7 +173,12 @@ if (ENABLE_DISTRIBUTED) quickstep_queryexecution_ShiftbossDirectory quickstep_queryoptimizer_QueryHandle quickstep_queryoptimizer_QueryProcessor + quickstep_storage_StorageBlock quickstep_storage_StorageBlockInfo + quickstep_storage_StorageManager + quickstep_storage_TupleIdSequence + quickstep_storage_TupleStorageSubBlock + quickstep_types_TypedValue quickstep_utility_ExecutionDAGVisualizer quickstep_utility_Macros tmb http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index 57f432f..3d47fb6 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -27,8 +27,10 @@ #include "catalog/CatalogDatabase.hpp" #include "catalog/CatalogRelation.hpp" #include "catalog/CatalogTypedefs.hpp" +#include "cli/Flags.hpp" #include "query_execution/AdmitRequestMessage.hpp" #include "query_execution/BlockLocator.hpp" +#include "query_execution/BlockLocatorUtil.hpp" #include "query_execution/PolicyEnforcerBase.hpp" #include "query_execution/PolicyEnforcerDistributed.hpp" #include "query_execution/QueryContext.hpp" @@ -37,7 +39,9 @@ #include "query_execution/QueryExecutionUtil.hpp" #include "query_execution/ShiftbossDirectory.hpp" #include "relational_operators/WorkOrder.pb.h" +#include "storage/DataExchangerAsync.hpp" #include "storage/StorageBlockInfo.hpp" +#include "storage/StorageManager.hpp" #include "threading/ThreadUtil.hpp" #include "utility/EqualsAnyConstant.hpp" @@ -49,6 +53,7 @@ #include "tmb/message_style.h" #include "tmb/tagged_message.h" +using std::make_unique; using std::move; using std::size_t; using std::unique_ptr; @@ -75,6 +80,7 @@ ForemanDistributed::ForemanDistributed( block_locator_(block_locator), 
catalog_database_(DCHECK_NOTNULL(catalog_database)) { const std::vector<QueryExecutionMessageType> sender_message_types{ + kBlockDomainRegistrationMessage, kShiftbossRegistrationResponseMessage, kQueryInitiateMessage, kWorkOrderMessage, @@ -82,6 +88,7 @@ ForemanDistributed::ForemanDistributed( kQueryTeardownMessage, kSaveQueryResultMessage, kQueryExecutionSuccessMessage, + kCommandResponseMessage, kPoisonMessage}; for (const auto message_type : sender_message_types) { @@ -89,6 +96,7 @@ ForemanDistributed::ForemanDistributed( } const std::vector<QueryExecutionMessageType> receiver_message_types{ + kBlockDomainRegistrationResponseMessage, kShiftbossRegistrationMessage, kAdmitRequestMessage, kQueryInitiateResponseMessage, @@ -105,12 +113,17 @@ ForemanDistributed::ForemanDistributed( bus_->RegisterClientAsReceiver(foreman_client_id_, message_type); } - policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>( - foreman_client_id_, - catalog_database_, - query_processor, - &shiftboss_directory_, - bus_); + client_id locator_client_id; + storage_manager_ = make_unique<StorageManager>( + FLAGS_storage_path, + block_locator::getBlockDomain(data_exchanger_.network_address(), foreman_client_id_, &locator_client_id, bus_), + locator_client_id, bus_); + + data_exchanger_.set_storage_manager(storage_manager_.get()); + data_exchanger_.start(); + + policy_enforcer_ = make_unique<PolicyEnforcerDistributed>( + foreman_client_id_, catalog_database_, query_processor, storage_manager_.get(), &shiftboss_directory_, bus_); } void ForemanDistributed::run() { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/ForemanDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp index 7fc98bd..4053b9d 100644 --- a/query_execution/ForemanDistributed.hpp +++ b/query_execution/ForemanDistributed.hpp @@ -25,6 +25,8 @@ #include 
"catalog/CatalogTypedefs.hpp" #include "query_execution/ForemanBase.hpp" #include "query_execution/ShiftbossDirectory.hpp" +#include "storage/DataExchangerAsync.hpp" +#include "storage/StorageManager.hpp" #include "utility/Macros.hpp" #include "tmb/id_typedefs.h" @@ -70,7 +72,11 @@ class ForemanDistributed final : public ForemanBase { QueryProcessor *query_processor, const int cpu_id = -1); - ~ForemanDistributed() override {} + ~ForemanDistributed() override { + data_exchanger_.shutdown(); + storage_manager_.reset(); + data_exchanger_.join(); + } void printWorkOrderProfilingResults(const std::size_t query_id, std::FILE *out) const override; @@ -126,6 +132,10 @@ class ForemanDistributed final : public ForemanBase { CatalogDatabaseLite *catalog_database_; + // Used for '\analyze'. + DataExchangerAsync data_exchanger_; + std::unique_ptr<StorageManager> storage_manager_; + // From a query id to a set of Shiftbosses that save query result. std::unordered_map<std::size_t, std::unordered_set<std::size_t>> query_result_saved_shiftbosses_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index 25f2d72..424452e 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -26,7 +26,10 @@ #include <vector> #include "catalog/Catalog.pb.h" +#include "catalog/CatalogDatabase.hpp" #include "catalog/CatalogRelation.hpp" +#include "catalog/CatalogRelationSchema.hpp" +#include "catalog/CatalogRelationStatistics.hpp" #include "query_execution/QueryContext.pb.h" #include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryExecutionState.hpp" @@ -36,7 +39,12 @@ #include "query_execution/QueryManagerDistributed.hpp" #include "query_optimizer/QueryHandle.hpp" #include 
"query_optimizer/QueryProcessor.hpp" +#include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" +#include "storage/StorageManager.hpp" +#include "storage/TupleIdSequence.hpp" +#include "storage/TupleStorageSubBlock.hpp" +#include "types/TypedValue.hpp" #include "utility/ExecutionDAGVisualizer.hpp" #include "gflags/gflags.h" @@ -50,7 +58,9 @@ using std::free; using std::malloc; using std::move; +using std::ostringstream; using std::size_t; +using std::string; using std::unique_ptr; using std::vector; @@ -231,7 +241,7 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) { const QueryHandle *query_handle = query_manager->query_handle(); - const CatalogRelation *query_result = query_handle->getQueryResultRelation(); + const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation(); const tmb::client_id cli_id = query_handle->getClientId(); const std::size_t query_id = query_handle->query_id(); @@ -259,7 +269,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i)); } - if (query_result == nullptr) { + if (query_result_relation == nullptr) { if (query_processor_) { query_processor_->saveCatalog(); } @@ -272,17 +282,12 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage char *proto_bytes = static_cast<char*>(malloc(proto_length)); CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kQueryTeardownMessage); + TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage); free(proto_bytes); DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage << "') to all Shiftbosses"; - 
QueryExecutionUtil::BroadcastMessage(foreman_client_id_, - shiftboss_addresses, - move(message), - bus_); + QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_); TaggedMessage cli_message(kQueryExecutionSuccessMessage); @@ -299,12 +304,33 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage return; } + const QueryHandle::AnalyzeQueryInfo *analyze_query_info = query_handle->analyze_query_info(); + if (analyze_query_info) { + processAnalyzeQueryResult(cli_id, query_result_relation, analyze_query_info); + + // Clean up query execution states, i.e., QueryContext, in Shiftbosses. + S::QueryTeardownMessage proto; + proto.set_query_id(query_id); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage); + free(proto_bytes); + + DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage + << "') to all Shiftbosses"; + QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_); + return; + } + // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss. 
S::SaveQueryResultMessage proto; proto.set_query_id(query_id); - proto.set_relation_id(query_result->getID()); + proto.set_relation_id(query_result_relation->getID()); - const vector<block_id> blocks(query_result->getBlocksSnapshot()); + const vector<block_id> blocks(query_result_relation->getBlocksSnapshot()); for (const block_id block : blocks) { proto.add_blocks(block); } @@ -315,18 +341,111 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage char *proto_bytes = static_cast<char*>(malloc(proto_length)); CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kSaveQueryResultMessage); + TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kSaveQueryResultMessage); free(proto_bytes); // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses. DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage << "') to all Shiftbosses"; - QueryExecutionUtil::BroadcastMessage(foreman_client_id_, - shiftboss_addresses, - move(message), - bus_); + QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_); +} + +void PolicyEnforcerDistributed::processAnalyzeQueryResult(const tmb::client_id cli_id, + const CatalogRelation *query_result_relation, + const QueryHandle::AnalyzeQueryInfo *analyze_query_info) { + const relation_id rel_id = analyze_query_info->rel_id; + CatalogRelation *mutable_relation = + static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(rel_id); + CatalogRelationStatistics *mutable_stat = mutable_relation->getStatisticsMutable(); + + const auto analyze_query_result = [this, &query_result_relation]() { + const vector<block_id> blocks = query_result_relation->getBlocksSnapshot(); + DCHECK_EQ(1u, blocks.size()); + + vector<TypedValue> query_result; + { + BlockReference block = storage_manager_->getBlock(blocks.front(), 
*query_result_relation); + const TupleStorageSubBlock &tuple_store = block->getTupleStorageSubBlock(); + DCHECK_EQ(1, tuple_store.numTuples()); + + const std::size_t num_columns = tuple_store.getRelation().size(); + if (tuple_store.isPacked()) { + for (std::size_t i = 0; i < num_columns; ++i) { + query_result.emplace_back(tuple_store.getAttributeValueTyped(0, i)); + } + } else { + std::unique_ptr<TupleIdSequence> existence_map(tuple_store.getExistenceMap()); + for (std::size_t i = 0; i < num_columns; ++i) { + query_result.emplace_back( + tuple_store.getAttributeValueTyped(*existence_map->begin(), i)); + } + } + } + + // Clean up the query result relation. + for (const block_id block : blocks) { + storage_manager_->deleteBlockOrBlobFile(block); + } + catalog_database_->dropRelationById(query_result_relation->getID()); + + return query_result; + }(); + + if (analyze_query_info->is_analyze_attribute_query) { + const attribute_id attr_id = analyze_query_info->attr_id; + + auto cit = analyze_query_result.begin(); + DCHECK_EQ(TypeID::kLong, cit->getTypeID()); + mutable_stat->setNumDistinctValues(attr_id, cit->getLiteral<std::int64_t>()); + + if (analyze_query_info->is_min_applicable) { + ++cit; + mutable_stat->setMinValue(attr_id, *cit); + } + + if (analyze_query_info->is_max_applicable) { + ++cit; + mutable_stat->setMaxValue(attr_id, *cit); + } + } else { + completed_analyze_relations_[cli_id].push_back(rel_id); + + DCHECK_EQ(1u, analyze_query_result.size()); + const TypedValue &num_tuples = analyze_query_result.front(); + DCHECK_EQ(TypeID::kLong, num_tuples.getTypeID()); + mutable_stat->setNumTuples(num_tuples.getLiteral<std::int64_t>()); + + mutable_stat->setExactness(true); + + if (completed_analyze_relations_[cli_id].size() == analyze_query_info->num_relations) { + query_processor_->markCatalogAltered(); + query_processor_->saveCatalog(); + + ostringstream analyze_command_response; + for (const relation_id rel_id : completed_analyze_relations_[cli_id]) { + 
analyze_command_response << "Analyzing " << catalog_database_->getRelationSchemaById(rel_id).getName() + << " ... done\n"; + } + + S::CommandResponseMessage proto; + proto.set_command_response(analyze_command_response.str()); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage); + free(proto_bytes); + + DLOG(INFO) << "PolicyEnforcerDistributed sent CommandResponseMessage (typed '" << kCommandResponseMessage + << "') to CLI with TMB client id " << cli_id; + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); + + completed_analyze_relations_.erase(cli_id); + } + } } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp index 18fd9ae..5334da0 100644 --- a/query_execution/PolicyEnforcerDistributed.hpp +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -28,6 +28,7 @@ #include "query_execution/QueryManagerBase.hpp" #include "query_execution/ShiftbossDirectory.hpp" #include "query_optimizer/QueryHandle.hpp" +#include "types/TypedValue.hpp" #include "utility/Macros.hpp" #include "glog/logging.h" @@ -42,7 +43,9 @@ class TaggedMessage; namespace quickstep { class CatalogDatabaseLite; +class CatalogRelation; class QueryProcessor; +class StorageManager; /** \addtogroup QueryExecution * @{ @@ -61,16 +64,20 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { * @param catalog_database The CatalogDatabase used. 
* @param query_processor The QueryProcessor to save catalog upon the query * completion. + * @param storage_manager The StorageManager to use. + * @param shiftboss_directory The ShiftbossDirectory to manage Shiftbosses. * @param bus The TMB. **/ PolicyEnforcerDistributed(const tmb::client_id foreman_client_id, CatalogDatabaseLite *catalog_database, QueryProcessor *query_processor, + StorageManager *storage_manager, ShiftbossDirectory *shiftboss_directory, tmb::MessageBus *bus) : PolicyEnforcerBase(catalog_database), foreman_client_id_(foreman_client_id), query_processor_(query_processor), + storage_manager_(storage_manager), shiftboss_directory_(shiftboss_directory), bus_(bus) {} @@ -157,13 +164,20 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { void initiateQueryInShiftboss(QueryHandle *query_handle); + void processAnalyzeQueryResult(const tmb::client_id cli_id, + const CatalogRelation *query_result_relation, + const QueryHandle::AnalyzeQueryInfo *analyze_query_info); + const tmb::client_id foreman_client_id_; QueryProcessor *query_processor_; + StorageManager *storage_manager_; ShiftbossDirectory *shiftboss_directory_; tmb::MessageBus *bus_; + std::unordered_map<tmb::client_id, std::vector<relation_id>> completed_analyze_relations_; + DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index a49de5e..081852f 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -90,7 +90,7 @@ enum QueryExecutionMessageType : message_type_id { kDistributedCliRegistrationMessage, // From CLI to Conductor. kDistributedCliRegistrationResponseMessage, // From Conductor to CLI. - kSqlQueryMessage, // From CLI to Conductor. 
+ kSqlQueryMessage, // From CLI to Conductor. kQueryInitiateMessage, // From Foreman to Shiftboss. kQueryInitiateResponseMessage, // From Shiftboss to Foreman. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_optimizer/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt index 3ff783c..9bdb753 100644 --- a/query_optimizer/CMakeLists.txt +++ b/query_optimizer/CMakeLists.txt @@ -229,6 +229,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator quickstep_utility_Macros quickstep_utility_PlanVisualizer) target_link_libraries(quickstep_queryoptimizer_QueryHandle + quickstep_catalog_CatalogTypedefs quickstep_catalog_Catalog_proto quickstep_queryexecution_QueryContext_proto quickstep_queryoptimizer_QueryPlan http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e37ec541/query_optimizer/QueryHandle.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp index 7cb4f68..6feeb4c 100644 --- a/query_optimizer/QueryHandle.hpp +++ b/query_optimizer/QueryHandle.hpp @@ -25,6 +25,7 @@ #include <memory> #include "catalog/Catalog.pb.h" +#include "catalog/CatalogTypedefs.hpp" #include "query_execution/QueryContext.pb.h" #include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED. #include "query_optimizer/QueryPlan.hpp" @@ -45,19 +46,47 @@ class CatalogRelation; **/ class QueryHandle { public: + // The info for generated queries during executing '\analyze'. 
+ struct AnalyzeQueryInfo { + AnalyzeQueryInfo(const bool is_analyze_attribute_query_in, + const relation_id rel_id_in, + const std::size_t num_relations_in, + const attribute_id attr_id_in = kInvalidCatalogId, + const bool is_min_applicable_in = false, + const bool is_max_applicable_in = false) + : is_analyze_attribute_query(is_analyze_attribute_query_in), + rel_id(rel_id_in), + num_relations(num_relations_in), + attr_id(attr_id_in), + is_min_applicable(is_min_applicable_in), + is_max_applicable(is_max_applicable_in) {} + + const bool is_analyze_attribute_query; + const relation_id rel_id; + const std::size_t num_relations; + + // Only valid if 'is_analyze_attribute_query' is true. + const attribute_id attr_id; + const bool is_min_applicable; + const bool is_max_applicable; + }; + /** * @brief Constructor. * * @param query_id The given query id. * @param cli_id The client id of the CLI which submits the query. * @param query_priority The priority of this query. + * @param analyze_query_info The info of this analyze query. */ - explicit QueryHandle(const std::size_t query_id, - const tmb::client_id cli_id, - const std::uint64_t query_priority = 1) + QueryHandle(const std::size_t query_id, + const tmb::client_id cli_id, + const std::uint64_t query_priority = 1, + AnalyzeQueryInfo *analyze_query_info = nullptr) : query_id_(query_id), cli_id_(cli_id), query_priority_(query_priority), + analyze_query_info_(analyze_query_info), query_plan_(new QueryPlan()), query_result_relation_(nullptr) {} @@ -87,6 +116,13 @@ class QueryHandle { } /** + * @brief Get the query info for the command '\analyze'. + **/ + const AnalyzeQueryInfo* analyze_query_info() const { + return analyze_query_info_.get(); + } + + /** * @return The const query plan. 
*/ const QueryPlan& getQueryPlan() const { @@ -165,6 +201,7 @@ class QueryHandle { const tmb::client_id cli_id_; const std::uint64_t query_priority_; + std::unique_ptr<AnalyzeQueryInfo> analyze_query_info_; std::unique_ptr<QueryPlan> query_plan_;