Repository: kudu Updated Branches: refs/heads/master 937064f91 -> fc50b98aa
log: improve mt-log-test benchmark - builds the entries to be appended outside of holding any lock, which is more realistic. The entries get their IDs set once inside the lock (similar to what we do in real code). - use the same higher-level APIs that the real append paths use - make the log segment size overridable - allow the "verification" step to be disabled to serve as a better benchmark - allow the concurrent "reader" thread to be disabled - a couple bug fixes here and there Change-Id: I870dc26e2937e7c92e3f0530e2c2880178507f12 Reviewed-on: http://gerrit.cloudera.org:8080/6283 Tested-by: Kudu Jenkins Reviewed-by: David Ribeiro Alves <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/57a26b19 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/57a26b19 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/57a26b19 Branch: refs/heads/master Commit: 57a26b199571d0bb39ede709e8ee569370340417 Parents: 937064f Author: Todd Lipcon <[email protected]> Authored: Mon Mar 6 21:48:59 2017 -0800 Committer: Todd Lipcon <[email protected]> Committed: Thu Mar 9 19:13:57 2017 +0000 ---------------------------------------------------------------------- src/kudu/consensus/mt-log-test.cc | 144 ++++++++++++++++++--------------- 1 file changed, 78 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/57a26b19/src/kudu/consensus/mt-log-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/mt-log-test.cc b/src/kudu/consensus/mt-log-test.cc index 160dfb9..09cd819 100644 --- a/src/kudu/consensus/mt-log-test.cc +++ b/src/kudu/consensus/mt-log-test.cc @@ -32,8 +32,10 @@ #include "kudu/util/thread.h" DEFINE_int32(num_writer_threads, 4, "Number of threads writing to the log"); +DEFINE_int32(num_reader_threads, 1, "Number of threads accessing the log while writes are ongoing"); DEFINE_int32(num_batches_per_thread, 2000, "Number of batches per thread"); DEFINE_int32(num_ops_per_batch_avg, 5, "Target average number of ops per batch"); +DEFINE_bool(verify_log, true, "Whether to verify the log by reading it after the writes complete"); namespace kudu { namespace log { @@ -80,54 +82,54 @@ class MultiThreadedLogTest : public LogTestBase { LogTestBase::SetUp(); } + vector<consensus::ReplicateRefPtr> CreateRandomBatch() { + int num_ops = static_cast<int>(random_.Normal( + static_cast<double>(FLAGS_num_ops_per_batch_avg), 1.0)); + DVLOG(1) << num_ops << " ops in this batch"; + num_ops = std::max(num_ops, 1); + vector<consensus::ReplicateRefPtr> ret; + for (int j = 0; j < num_ops; j++) { + ReplicateRefPtr replicate = make_scoped_refptr_replicate(new ReplicateMsg); + replicate->get()->set_op_type(WRITE_OP); + replicate->get()->set_timestamp(clock_->Now().ToUint64()); + tserver::WriteRequestPB* request = replicate->get()->mutable_write_request(); + AddTestRowToPB(RowOperationsPB::INSERT, schema_, 12345, 0, + "this is a test insert", + request->mutable_row_operations()); + request->set_tablet_id(kTestTablet); + ret.push_back(replicate); + } + return ret; + } + + void AssignIndexes(vector<consensus::ReplicateRefPtr>* batch) { + for (auto& rep : *batch) { + OpId* op_id = rep->get()->mutable_id(); + op_id->set_term(0); + op_id->set_index(current_index_++); + } + } + void LogWriterThread(int thread_id) { CountDownLatch latch(FLAGS_num_batches_per_thread); vector<Status> errors; for (int i = 0; i < FLAGS_num_batches_per_thread; i++) { - LogEntryBatch* entry_batch; - vector<consensus::ReplicateRefPtr> batch_replicates; - int num_ops = static_cast<int>(random_.Normal( - static_cast<double>(FLAGS_num_ops_per_batch_avg), 1.0)); - DVLOG(1) << num_ops << " ops in this batch"; - num_ops = std::max(num_ops, 1); - { - std::lock_guard<simple_spinlock> lock_guard(lock_); - for (int j = 0; j < num_ops; j++) { - ReplicateRefPtr replicate = make_scoped_refptr_replicate(new ReplicateMsg); - int32_t index = current_index_++; - OpId* op_id = replicate->get()->mutable_id(); - op_id->set_term(0); - op_id->set_index(index); - - replicate->get()->set_op_type(WRITE_OP); - replicate->get()->set_timestamp(clock_->Now().ToUint64()); - - tserver::WriteRequestPB* request = replicate->get()->mutable_write_request(); - AddTestRowToPB(RowOperationsPB::INSERT, schema_, index, 0, - "this is a test insert", - request->mutable_row_operations()); - request->set_tablet_id(kTestTablet); - batch_replicates.push_back(replicate); - } - - gscoped_ptr<log::LogEntryBatchPB> entry_batch_pb; - CreateBatchFromAllocatedOperations(batch_replicates, - &entry_batch_pb); - - ASSERT_OK(log_->Reserve(REPLICATE, std::move(entry_batch_pb), &entry_batch)); - } // lock_guard scope + // Do the expensive allocation outside the lock. + vector<consensus::ReplicateRefPtr> batch_replicates = CreateRandomBatch(); auto cb = new CustomLatchCallback(&latch, &errors); - entry_batch->SetReplicates(batch_replicates); - log_->AsyncAppend(entry_batch, cb->AsStatusCallback()); - } - LOG_TIMING(INFO, strings::Substitute("thread $0 waiting to append and sync $1 batches", - thread_id, FLAGS_num_batches_per_thread)) { - latch.Wait(); + // Assign indexes and append inside the lock, so that the index order and + // log order match up. + { + std::lock_guard<simple_spinlock> l(lock_); + AssignIndexes(&batch_replicates); + ASSERT_OK(log_->AsyncAppendReplicates(batch_replicates, cb->AsStatusCallback())); + } } + latch.Wait(); for (const Status& status : errors) { WARN_NOT_OK(status, "Unexpected failure during AsyncAppend"); } - ASSERT_EQ(0, errors.size()); + CHECK_EQ(0, errors.size()); } void Run() { @@ -141,15 +143,18 @@ class MultiThreadedLogTest : public LogTestBase { // Start a thread which calls some read-only methods on the log // to check for races against writers. std::atomic<bool> stop_reader(false); - std::thread reader_thread([&]() { - std::map<int64_t, int64_t> map; - OpId opid; - while (!stop_reader) { - log_->GetLatestEntryOpId(&opid); - log_->GetReplaySizeMap(&map); - IgnoreResult(log_->GetGCableDataSize(RetentionIndexes(FLAGS_num_batches_per_thread))); - } - }); + vector<std::thread> reader_threads; + for (int i = 0; i < FLAGS_num_reader_threads; i++) { + reader_threads.emplace_back([&]() { + std::map<int64_t, int64_t> map; + OpId opid; + while (!stop_reader) { + log_->GetLatestEntryOpId(&opid); + log_->GetReplaySizeMap(&map); + IgnoreResult(log_->GetGCableDataSize(RetentionIndexes(FLAGS_num_batches_per_thread))); + } + }); + } // Wait for the writers to finish. for (scoped_refptr<kudu::Thread>& thread : threads_) { @@ -158,7 +163,9 @@ class MultiThreadedLogTest : public LogTestBase { // Then stop the reader and join on it as well. stop_reader = true; - reader_thread.join(); + for (auto& t : reader_threads) { + t.join(); + } } private: ThreadSafeRandom random_; @@ -167,31 +174,36 @@ class MultiThreadedLogTest : public LogTestBase { }; TEST_F(MultiThreadedLogTest, TestAppends) { - // Roll frequently to stress related code paths. - options_.segment_size_mb = 1; + // Roll frequently to stress related code paths, unless overridden + // on the command line. + if (google::GetCommandLineFlagInfoOrDie("log_segment_size_mb").is_default) { + options_.segment_size_mb = 1; + } ASSERT_OK(BuildLog()); int start_current_id = current_index_; LOG_TIMING(INFO, strings::Substitute("inserting $0 batches($1 threads, $2 per-thread)", - FLAGS_num_writer_threads * FLAGS_num_batches_per_thread, - FLAGS_num_batches_per_thread, FLAGS_num_writer_threads)) { + FLAGS_num_writer_threads * FLAGS_num_batches_per_thread, + FLAGS_num_writer_threads, + FLAGS_num_batches_per_thread)) { ASSERT_NO_FATAL_FAILURE(Run()); } ASSERT_OK(log_->Close()); - - shared_ptr<LogReader> reader; - ASSERT_OK(LogReader::Open(fs_manager_.get(), nullptr, kTestTablet, nullptr, &reader)); - SegmentSequence segments; - ASSERT_OK(reader->GetSegmentsSnapshot(&segments)); - - for (const SegmentSequence::value_type& entry : segments) { - ASSERT_OK(entry->ReadEntries(&entries_)); + if (FLAGS_verify_log) { + shared_ptr<LogReader> reader; + ASSERT_OK(LogReader::Open(fs_manager_.get(), nullptr, kTestTablet, nullptr, &reader)); + SegmentSequence segments; + ASSERT_OK(reader->GetSegmentsSnapshot(&segments)); + + for (const SegmentSequence::value_type& entry : segments) { + ASSERT_OK(entry->ReadEntries(&entries_)); + } + vector<uint32_t> ids; + EntriesToIdList(&ids); + DVLOG(1) << "Wrote total of " << current_index_ - start_current_id << " ops"; + ASSERT_EQ(current_index_ - start_current_id, ids.size()); + ASSERT_TRUE(std::is_sorted(ids.begin(), ids.end())); } - vector<uint32_t> ids; - EntriesToIdList(&ids); - DVLOG(1) << "Wrote total of " << current_index_ - start_current_id << " ops"; - ASSERT_EQ(current_index_ - start_current_id, ids.size()); - ASSERT_TRUE(std::is_sorted(ids.begin(), ids.end())); } } // namespace log
