Added preload support to the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c9be13b7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c9be13b7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c9be13b7 Branch: refs/heads/LIP-time-decomposition Commit: c9be13b7b1fa47bba65cbd40452013d2a45087a0 Parents: 0528c77 Author: Zuyu Zhang <zu...@apache.org> Authored: Fri Mar 10 01:45:06 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Fri Mar 10 01:45:06 2017 -0800 ---------------------------------------------------------------------- catalog/CatalogDatabase.cpp | 2 +- query_execution/CMakeLists.txt | 3 +++ query_execution/ForemanDistributed.cpp | 1 + query_execution/QueryExecutionMessages.proto | 1 + query_execution/Shiftboss.cpp | 20 ++++++++++++++++++++ 5 files changed, 26 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9be13b7/catalog/CatalogDatabase.cpp ---------------------------------------------------------------------- diff --git a/catalog/CatalogDatabase.cpp b/catalog/CatalogDatabase.cpp index c95196c..2200230 100644 --- a/catalog/CatalogDatabase.cpp +++ b/catalog/CatalogDatabase.cpp @@ -145,7 +145,7 @@ serialization::CatalogDatabase CatalogDatabase::getProto() const { if (it.isNull()) { proto.add_null_relations(i); } else { - proto.add_relations()->CopyFrom(it->getProto()); + proto.add_relations()->MergeFrom(it->getProto()); } } return proto; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9be13b7/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 82ff183..d81ab44 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -298,8 +298,10 @@ 
target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_Shiftboss glog + quickstep_catalog_CatalogDatabase quickstep_catalog_CatalogDatabaseCache quickstep_catalog_CatalogTypedefs + quickstep_cli_Flags quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryExecutionMessages_proto quickstep_queryexecution_QueryExecutionTypedefs @@ -310,6 +312,7 @@ if (ENABLE_DISTRIBUTED) quickstep_relationaloperators_WorkOrderFactory quickstep_storage_Flags quickstep_storage_InsertDestination + quickstep_storage_PreloaderThread quickstep_storage_StorageBlock quickstep_storage_StorageManager quickstep_threading_Thread http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9be13b7/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index 3903e8a..06fb5a1 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -425,6 +425,7 @@ void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shi const std::size_t work_order_capacity) { S::ShiftbossRegistrationResponseMessage proto; proto.set_shiftboss_index(shiftboss_directory_.size()); + proto.mutable_catalog_database()->MergeFrom(static_cast<CatalogDatabase*>(catalog_database_)->getProto()); const size_t proto_length = proto.ByteSize(); char *proto_bytes = static_cast<char*>(malloc(proto_length)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9be13b7/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index dd3c9a7..e8f102a 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -79,6 
+79,7 @@ message ShiftbossRegistrationMessage { message ShiftbossRegistrationResponseMessage { required uint64 shiftboss_index = 1; + required CatalogDatabase catalog_database = 2; } message SqlQueryMessage { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9be13b7/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp index e227385..01c81b2 100644 --- a/query_execution/Shiftboss.cpp +++ b/query_execution/Shiftboss.cpp @@ -19,7 +19,9 @@ #include "query_execution/Shiftboss.hpp" +#include <chrono> #include <cstddef> +#include <cstdio> #include <cstdlib> #include <memory> #include <string> @@ -27,7 +29,9 @@ #include <utility> #include <vector> +#include "catalog/CatalogDatabase.hpp" #include "catalog/CatalogTypedefs.hpp" +#include "cli/Flags.hpp" #include "query_execution/QueryContext.hpp" #include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryExecutionTypedefs.hpp" @@ -36,6 +40,7 @@ #include "relational_operators/RebuildWorkOrder.hpp" #include "relational_operators/WorkOrderFactory.hpp" #include "storage/InsertDestination.hpp" +#include "storage/PreloaderThread.hpp" #include "storage/StorageBlock.hpp" #include "storage/StorageManager.hpp" #include "threading/ThreadUtil.hpp" @@ -51,6 +56,7 @@ using std::free; using std::malloc; using std::move; +using std::printf; using std::size_t; using std::string; using std::unique_ptr; @@ -333,6 +339,20 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() { shiftboss_index_ = proto.shiftboss_index(); storage_manager_->sendBlockDomainToShiftbossIndexMessage(shiftboss_index_); + if (FLAGS_preload_buffer_pool) { + const CatalogDatabase catalog_database(proto.catalog_database()); + + PreloaderThread preloader(catalog_database, storage_manager_, cpu_id_); + + printf("Preloading the buffer pool ... 
\n"); + const std::chrono::time_point<std::chrono::steady_clock> preload_start = std::chrono::steady_clock::now(); + preloader.start(); + preloader.join(); + const std::chrono::time_point<std::chrono::steady_clock> preload_end = std::chrono::steady_clock::now(); + printf("in %g seconds\n", + std::chrono::duration<double>(preload_end - preload_start).count()); + } + // Forward this message to Workers regarding <shiftboss_index_>. QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_local_, worker_addresses_,