Repository: kudu Updated Branches: refs/heads/master efb60241b -> 4d5fb0d3d
[benchmarks/tpch] introduced AUTO_FLUSH_BACKGROUND mode Added an ability to run KuduSession in AUTO_FLUSH_BACKGROUND mode while running TPC-H benchmarks. Also did other minor code clean-up like re-ordering implementation of methods to match their declaration order, etc. Change-Id: I69d6897e9d1126270f2dc8b7d913d37e73428c1f Reviewed-on: http://gerrit.cloudera.org:8080/4024 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4d5fb0d3 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4d5fb0d3 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4d5fb0d3 Branch: refs/heads/master Commit: 4d5fb0d3dfbb611c768d19e1ed67ee5d18668feb Parents: efb6024 Author: Alexey Serbin <[email protected]> Authored: Wed Aug 17 19:41:40 2016 -0700 Committer: Adar Dembo <[email protected]> Committed: Fri Sep 2 20:56:56 2016 +0000 ---------------------------------------------------------------------- src/kudu/benchmarks/tpch/rpc_line_item_dao.cc | 113 +++++++++++---------- src/kudu/benchmarks/tpch/rpc_line_item_dao.h | 26 ++--- src/kudu/benchmarks/tpch/tpch1.cc | 4 +- src/kudu/benchmarks/tpch/tpch_real_world.cc | 38 ++++--- 4 files changed, 99 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/4d5fb0d3/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc ---------------------------------------------------------------------- diff --git a/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc b/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc index f76af5b..ab1b357 100644 --- a/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc +++ b/src/kudu/benchmarks/tpch/rpc_line_item_dao.cc @@ -75,7 +75,7 @@ class FlushCallback : public KuduStatusCallback { private: void BatchFinished() { int nerrs = session_->CountPendingErrors(); - if (nerrs) { + if (nerrs > 0) { LOG(WARNING) << nerrs << " errors occured during last batch."; vector<KuduError*> errors; ElementDeleter d(&errors); @@ -98,6 +98,22 @@ class FlushCallback : public KuduStatusCallback { const Slice RpcLineItemDAO::kScanUpperBound = Slice("1998-09-02"); +RpcLineItemDAO::~RpcLineItemDAO() { + FinishWriting(); +} + +RpcLineItemDAO::RpcLineItemDAO(string master_address, string table_name, + int batch_op_num_max, int timeout_ms, + vector<const KuduPartialRow*> tablet_splits) + : master_address_(std::move(master_address)), + table_name_(std::move(table_name)), + timeout_(MonoDelta::FromMilliseconds(timeout_ms)), + batch_op_num_max_(batch_op_num_max), + tablet_splits_(std::move(tablet_splits)), + batch_op_num_(0), + semaphore_(1) { +} + void RpcLineItemDAO::Init() { const KuduSchema schema = tpch::CreateLineItemSchema(); @@ -121,32 +137,23 @@ void RpcLineItemDAO::Init() { session_ = client_->NewSession(); session_->SetTimeoutMillis(timeout_.ToMilliseconds()); - CHECK_OK(session_->SetFlushMode(KuduSession::MANUAL_FLUSH)); + CHECK_OK(session_->SetFlushMode(batch_op_num_max_ > 0 + ? KuduSession::MANUAL_FLUSH + : KuduSession::AUTO_FLUSH_BACKGROUND)); } void RpcLineItemDAO::WriteLine(boost::function<void(KuduPartialRow*)> f) { gscoped_ptr<KuduInsert> insert(client_table_->NewInsert()); f(insert->mutable_row()); CHECK_OK(session_->Apply(insert.release())); - ++batch_size_; - FlushIfBufferFull(); -} - -void RpcLineItemDAO::FlushIfBufferFull() { - if (batch_size_ < batch_max_) return; - - batch_size_ = 0; - - // The callback object frees itself after it is invoked. - session_->FlushAsync(new FlushCallback(session_, &semaphore_)); + HandleLine(); } void RpcLineItemDAO::MutateLine(boost::function<void(KuduPartialRow*)> f) { gscoped_ptr<KuduUpdate> update(client_table_->NewUpdate()); f(update->mutable_row()); CHECK_OK(session_->Apply(update.release())); - ++batch_size_; - FlushIfBufferFull(); + HandleLine(); } void RpcLineItemDAO::FinishWriting() { @@ -160,21 +167,7 @@ void RpcLineItemDAO::FinishWriting() { void RpcLineItemDAO::OpenScanner(const vector<string>& columns, gscoped_ptr<Scanner>* out_scanner) { vector<KuduPredicate*> preds; - OpenScanner(columns, preds, out_scanner); -} - -void RpcLineItemDAO::OpenScanner(const vector<string>& columns, - const vector<KuduPredicate*>& preds, - gscoped_ptr<Scanner>* out_scanner) { - gscoped_ptr<Scanner> ret(new Scanner); - ret->scanner_.reset(new KuduScanner(client_table_.get())); - ret->scanner_->SetCacheBlocks(FLAGS_tpch_cache_blocks_when_scanning); - CHECK_OK(ret->scanner_->SetProjectedColumns(columns)); - for (KuduPredicate* pred : preds) { - CHECK_OK(ret->scanner_->AddConjunctPredicate(pred)); - } - CHECK_OK(ret->scanner_->Open()); - out_scanner->swap(ret); + OpenScannerImpl(columns, preds, out_scanner); } void RpcLineItemDAO::OpenTpch1Scanner(gscoped_ptr<Scanner>* out_scanner) { @@ -182,7 +175,7 @@ void RpcLineItemDAO::OpenTpch1Scanner(gscoped_ptr<Scanner>* out_scanner) { preds.push_back(client_table_->NewComparisonPredicate( tpch::kShipDateColName, KuduPredicate::LESS_EQUAL, KuduValue::CopyString(kScanUpperBound))); - OpenScanner(tpch::GetTpchQ1QueryColumns(), preds, out_scanner); + OpenScannerImpl(tpch::GetTpchQ1QueryColumns(), preds, out_scanner); } void RpcLineItemDAO::OpenTpch1ScannerForOrderKeyRange(int64_t min_key, int64_t max_key, @@ -197,7 +190,41 @@ void RpcLineItemDAO::OpenTpch1ScannerForOrderKeyRange(int64_t min_key, int64_t m preds.push_back(client_table_->NewComparisonPredicate( tpch::kOrderKeyColName, KuduPredicate::LESS_EQUAL, KuduValue::FromInt(max_key))); - OpenScanner(tpch::GetTpchQ1QueryColumns(), preds, out_scanner); + OpenScannerImpl(tpch::GetTpchQ1QueryColumns(), preds, out_scanner); +} + +bool RpcLineItemDAO::IsTableEmpty() { + KuduScanner scanner(client_table_.get()); + CHECK_OK(scanner.Open()); + return !scanner.HasMoreRows(); +} + +void RpcLineItemDAO::OpenScannerImpl(const vector<string>& columns, + const vector<KuduPredicate*>& preds, + gscoped_ptr<Scanner>* out_scanner) { + gscoped_ptr<Scanner> ret(new Scanner); + ret->scanner_.reset(new KuduScanner(client_table_.get())); + ret->scanner_->SetCacheBlocks(FLAGS_tpch_cache_blocks_when_scanning); + CHECK_OK(ret->scanner_->SetProjectedColumns(columns)); + for (KuduPredicate* pred : preds) { + CHECK_OK(ret->scanner_->AddConjunctPredicate(pred)); + } + CHECK_OK(ret->scanner_->Open()); + out_scanner->swap(ret); +} + +void RpcLineItemDAO::HandleLine() { + if (batch_op_num_max_ == 0) { + // Nothing to take care in this case because it is an AUTO_FLUSH_BACKGROUND + // session. + return; + } + if (++batch_op_num_ < batch_op_num_max_) { + return; + } + batch_op_num_ = 0; + // The callback object frees itself after it is invoked. + session_->FlushAsync(new FlushCallback(session_, &semaphore_)); } bool RpcLineItemDAO::Scanner::HasMore() { @@ -212,26 +239,4 @@ void RpcLineItemDAO::Scanner::GetNext(vector<KuduRowResult> *rows) { CHECK_OK(scanner_->NextBatch(rows)); } -bool RpcLineItemDAO::IsTableEmpty() { - KuduScanner scanner(client_table_.get()); - CHECK_OK(scanner.Open()); - return !scanner.HasMoreRows(); -} - -RpcLineItemDAO::~RpcLineItemDAO() { - FinishWriting(); -} - -RpcLineItemDAO::RpcLineItemDAO(string master_address, string table_name, - int batch_size, int mstimeout, - vector<const KuduPartialRow*> tablet_splits) - : master_address_(std::move(master_address)), - table_name_(std::move(table_name)), - timeout_(MonoDelta::FromMilliseconds(mstimeout)), - batch_max_(batch_size), - tablet_splits_(std::move(tablet_splits)), - batch_size_(0), - semaphore_(1) { -} - } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/4d5fb0d3/src/kudu/benchmarks/tpch/rpc_line_item_dao.h ---------------------------------------------------------------------- diff --git a/src/kudu/benchmarks/tpch/rpc_line_item_dao.h b/src/kudu/benchmarks/tpch/rpc_line_item_dao.h index b29eef2..98924d6 100644 --- a/src/kudu/benchmarks/tpch/rpc_line_item_dao.h +++ b/src/kudu/benchmarks/tpch/rpc_line_item_dao.h @@ -38,13 +38,13 @@ class RpcLineItemDAO { RpcLineItemDAO(std::string master_address, std::string table_name, - int batch_size, - int mstimeout = 5000, + int batch_op_num_max, + int timeout_ms = 5000, std::vector<const KuduPartialRow*> tablet_splits = {}); ~RpcLineItemDAO(); + void Init(); void WriteLine(boost::function<void(KuduPartialRow*)> f); void MutateLine(boost::function<void(KuduPartialRow*)> f); - void Init(); void FinishWriting(); // Deletes previous scanner if one is open. @@ -82,21 +82,21 @@ class RpcLineItemDAO { private: static const Slice kScanUpperBound; - void FlushIfBufferFull(); - void OpenScanner(const std::vector<std::string>& columns, - const std::vector<client::KuduPredicate*>& preds, - gscoped_ptr<Scanner>* scanner); + void OpenScannerImpl(const std::vector<std::string>& columns, + const std::vector<client::KuduPredicate*>& preds, + gscoped_ptr<Scanner>* scanner); + void HandleLine(); - simple_spinlock lock_; - client::sp::shared_ptr<client::KuduClient> client_; - client::sp::shared_ptr<client::KuduSession> session_; - client::sp::shared_ptr<client::KuduTable> client_table_; const std::string master_address_; const std::string table_name_; const MonoDelta timeout_; - const int batch_max_; + const int batch_op_num_max_; const std::vector<const KuduPartialRow*> tablet_splits_; - int batch_size_; + int batch_op_num_; + simple_spinlock lock_; + client::sp::shared_ptr<client::KuduClient> client_; + client::sp::shared_ptr<client::KuduSession> session_; + client::sp::shared_ptr<client::KuduTable> client_table_; // Semaphore which restricts us to one batch at a time. Semaphore semaphore_; http://git-wip-us.apache.org/repos/asf/kudu/blob/4d5fb0d3/src/kudu/benchmarks/tpch/tpch1.cc ---------------------------------------------------------------------- diff --git a/src/kudu/benchmarks/tpch/tpch1.cc b/src/kudu/benchmarks/tpch/tpch1.cc index 91d22b3..05c7b32 100644 --- a/src/kudu/benchmarks/tpch/tpch1.cc +++ b/src/kudu/benchmarks/tpch/tpch1.cc @@ -89,7 +89,9 @@ DEFINE_string(mini_cluster_base_dir, "/tmp/tpch", DEFINE_string(master_address, "localhost", "Address of master for the cluster to operate on"); DEFINE_int32(tpch_max_batch_size, 1000, - "Maximum number of inserts/updates to batch at once"); + "Maximum number of inserts/updates to batch at once. Set to 0 " + "to delegate the batching control to the logic of the " + "KuduSession running in AUTO_BACKGROUND_MODE flush mode."); DEFINE_string(table_name, "lineitem", "The table name to write/read"); http://git-wip-us.apache.org/repos/asf/kudu/blob/4d5fb0d3/src/kudu/benchmarks/tpch/tpch_real_world.cc ---------------------------------------------------------------------- diff --git a/src/kudu/benchmarks/tpch/tpch_real_world.cc b/src/kudu/benchmarks/tpch/tpch_real_world.cc index e161ded..3a622a2 100644 --- a/src/kudu/benchmarks/tpch/tpch_real_world.cc +++ b/src/kudu/benchmarks/tpch/tpch_real_world.cc @@ -38,13 +38,14 @@ // insert, so the last timing shouldn't be used. // // TODO Make the inserts multi-threaded. See Kudu-629 for the technique. -#include <boost/bind.hpp> -#include <glog/logging.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> +#include <boost/bind.hpp> +#include <glog/logging.h> + #include "kudu/benchmarks/tpch/line_item_tsv_importer.h" #include "kudu/benchmarks/tpch/rpc_line_item_dao.h" #include "kudu/benchmarks/tpch/tpch-schemas.h" @@ -69,7 +70,9 @@ DEFINE_bool(tpch_load_data, true, DEFINE_bool(tpch_run_queries, true, "Query dbgen data as it is inserted"); DEFINE_int32(tpch_max_batch_size, 1000, - "Maximum number of inserts to batch at once"); + "Maximum number of inserts/updates to batch at once. Set to 0 " + "to delegate the batching control to the logic of the " + "KuduSession running in AUTO_BACKGROUND_MODE flush mode."); DEFINE_int32(tpch_test_client_timeout_msec, 10000, "Timeout that will be used for all operations and RPCs"); DEFINE_int32(tpch_test_runtime_sec, 0, @@ -250,11 +253,12 @@ gscoped_ptr<RpcLineItemDAO> TpchRealWorld::GetInittedDAO() { split_rows.push_back(row); } - gscoped_ptr<RpcLineItemDAO> dao(new RpcLineItemDAO(master_addresses_, - FLAGS_tpch_table_name, - FLAGS_tpch_max_batch_size, - FLAGS_tpch_test_client_timeout_msec, - split_rows)); + gscoped_ptr<RpcLineItemDAO> dao( + new RpcLineItemDAO(master_addresses_, + FLAGS_tpch_table_name, + FLAGS_tpch_max_batch_size, + FLAGS_tpch_test_client_timeout_msec, + split_rows)); dao->Init(); return std::move(dao); } @@ -266,14 +270,20 @@ void TpchRealWorld::LoadLineItemsThread(int i) { boost::function<void(KuduPartialRow*)> f = boost::bind(&LineItemTsvImporter::GetNextLine, &importer, _1); - while (importer.HasNextLine() && !stop_threads_.Load()) { - dao->WriteLine(f); - int64_t current_count = rows_inserted_.Increment(); - if (current_count % 250000 == 0) { - LOG(INFO) << "Inserted " << current_count << " rows"; + const string time_spent_msg = Substitute( + "by thread $0 to load generated data into the database", i); + LOG_TIMING(INFO, time_spent_msg) { + while (importer.HasNextLine() && !stop_threads_.Load()) { + dao->WriteLine(f); + int64_t current_count = rows_inserted_.Increment(); + if (current_count % 250000 == 0) { + LOG(INFO) << "Inserted " << current_count << " rows"; + } } + dao->FinishWriting(); } - dao->FinishWriting(); + LOG(INFO) << Substitute("Thread $0 inserted ", i) + << rows_inserted_.Load() << " rows in total"; } void TpchRealWorld::MonitorDbgenThread(int i) {
