Repository: kudu
Updated Branches: refs/heads/master 7aab411d3 -> ccdb6b557

[tablet] clean-up on replay of WAL entries

Simplified handling of the ownership of log entries read from disk.

This changelist also contains other changes in tablet_bootstrap.cc
and corresponding WAL utilities:
* reduced number of memory allocations while reading WAL entries
* replaced gscoped_ptr with std::unique_ptr
* other minor updates around WAL-related utilities and tests

Change-Id: Ie39516b09e4c756e8af1d8e1a5604672c96a80cb
Reviewed-on: http://gerrit.cloudera.org:8080/11032
Tested-by: Alexey Serbin <aser...@cloudera.com>
Reviewed-by: Adar Dembo <a...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ccdb6b55
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ccdb6b55
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ccdb6b55

Branch: refs/heads/master
Commit: ccdb6b557d440423843952cace3e7d351a743fa6
Parents: 7aab411
Author: Alexey Serbin <aser...@cloudera.com>
Authored: Thu Jul 19 17:49:17 2018 -0700
Committer: Alexey Serbin <aser...@cloudera.com>
Committed: Tue Jul 24 20:24:57 2018 +0000

----------------------------------------------------------------------
src/kudu/consensus/log-test-base.h | 7 +-
src/kudu/consensus/log-test.cc | 41 ++-
src/kudu/consensus/log_reader.cc | 6 +-
src/kudu/consensus/log_reader.h | 3 +-
src/kudu/consensus/log_util.cc | 49 +--
src/kudu/consensus/log_util.h | 13 +-
.../consensus/raft_consensus_quorum-test.cc | 46 ++-
src/kudu/integration-tests/log_verifier.cc | 6 +-
src/kudu/tablet/tablet_bootstrap.cc | 304 ++++++++++---------
src/kudu/tools/tool_action_common.cc | 14 +-
src/kudu/tools/tool_action_local_replica.cc | 6 +-
11 files changed, 256 insertions(+), 239 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/log-test-base.h
----------------------------------------------------------------------
diff --git 
a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h index 987b769..89b39d0 100644 --- a/src/kudu/consensus/log-test-base.h +++ b/src/kudu/consensus/log-test-base.h @@ -22,6 +22,7 @@ #include <glog/logging.h> #include <gtest/gtest.h> +#include <memory> #include <string> #include <utility> #include <vector> @@ -32,6 +33,7 @@ #include "kudu/common/wire_protocol-test-util.h" #include "kudu/consensus/log_anchor_registry.h" #include "kudu/consensus/log_reader.h" +#include "kudu/consensus/log_util.h" #include "kudu/consensus/metadata.pb.h" #include "kudu/consensus/opid_util.h" #include "kudu/fs/fs_manager.h" @@ -164,7 +166,6 @@ class LogTestBase : public KuduTest { void TearDown() override { KuduTest::TearDown(); - STLDeleteElements(&entries_); } Status BuildLog() { @@ -196,7 +197,7 @@ class LogTestBase : public KuduTest { } void EntriesToIdList(std::vector<uint32_t>* ids) { - for (const LogEntryPB* entry : entries_) { + for (const auto& entry : entries_) { VLOG(2) << "Entry contents: " << pb_util::SecureDebugString(*entry); if (entry->type() == REPLICATE) { ids->push_back(entry->replicate().id().index()); @@ -381,7 +382,7 @@ class LogTestBase : public KuduTest { int64_t current_index_; LogOptions options_; // Reusable entries vector that deletes the entries on destruction. - std::vector<LogEntryPB* > entries_; + LogEntries entries_; scoped_refptr<LogAnchorRegistry> log_anchor_registry_; scoped_refptr<clock::Clock> clock_; }; http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/log-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc index e92f112..ebb4aad 100644 --- a/src/kudu/consensus/log-test.cc +++ b/src/kudu/consensus/log-test.cc @@ -193,11 +193,10 @@ TEST_P(LogTestOptionalCompression, TestMultipleEntriesInABatch) { // RollOver() the batch so that we have a properly formed footer. 
ASSERT_OK(log_->AllocateSegmentAndRollOver()); - vector<LogEntryPB*> entries; - ElementDeleter deleter(&entries); SegmentSequence segments; ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments)); + LogEntries entries; ASSERT_OK(segments[0]->ReadEntries(&entries)); ASSERT_EQ(2, entries.size()); @@ -285,14 +284,13 @@ TEST_P(LogTestOptionalCompression, TestLogNotTrimmed) { AppendNoOp(&opid); - vector<LogEntryPB*> entries; - ElementDeleter deleter(&entries); SegmentSequence segments; ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments)); + LogEntries entries; ASSERT_OK(segments[0]->ReadEntries(&entries)); // Close after testing to ensure correct shutdown - // TODO : put this in TearDown() with a test on log state? + // TODO(unknown): put this in TearDown() with a test on log state? ASSERT_OK(log_->Close()); } @@ -306,15 +304,14 @@ TEST_P(LogTestOptionalCompression, TestBlankLogFile) { ASSERT_EQ(log_->reader()->num_segments(), 1); // ...and we're able to read from it. - vector<LogEntryPB*> entries; - ElementDeleter deleter(&entries); SegmentSequence segments; ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments)); + LogEntries entries; ASSERT_OK(segments[0]->ReadEntries(&entries)); // ...It's just that it's empty. - ASSERT_EQ(entries.size(), 0); + ASSERT_TRUE(entries.empty()); } void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place, @@ -354,7 +351,7 @@ void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place, ASSERT_OK(reader->GetSegmentsSnapshot(&segments)); Status s = segments[0]->ReadEntries(&entries_); ASSERT_EQ(s.CodeAsString(), expected_status.CodeAsString()) - << "Got unexpected status: " << s.ToString(); + << "Got unexpected status: " << s.ToString(); // Last entry is ignored, but we should still see the previous ones. 
ASSERT_EQ(expected_entries, entries_.size()); @@ -440,12 +437,11 @@ TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) { ASSERT_GT(header_size, 0); readable_segment->UpdateReadableToOffset(header_size); - vector<LogEntryPB*> entries; - // Reading the readable segment now should return OK but yield no // entries. + LogEntries entries; ASSERT_OK(readable_segment->ReadEntries(&entries)); - ASSERT_EQ(entries.size(), 0); + ASSERT_TRUE(entries.empty()); // Dummy add_entry to help us estimate the size of what // gets written to disk. @@ -469,18 +465,18 @@ TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) { // Updating the readable segment with the offset of the first entry should // make it read a single entry even though there are several in the log. readable_segment->UpdateReadableToOffset(header_size + single_entry_size); + entries.clear(); ASSERT_OK(readable_segment->ReadEntries(&entries)); - ASSERT_EQ(entries.size(), 1); - STLDeleteElements(&entries); + ASSERT_EQ(1, entries.size()); // Now append another entry so that the Log sets the correct readable offset // on the reader. ASSERT_OK(AppendNoOps(&op_id, 1, &written_entries_size)); // Now the reader should be able to read all 5 entries. + entries.clear(); ASSERT_OK(readable_segment->ReadEntries(&entries)); - ASSERT_EQ(entries.size(), 5); - STLDeleteElements(&entries); + ASSERT_EQ(5, entries.size()); // Offset should get updated for an additional entry. ASSERT_EQ(single_entry_size * (kNumEntries + 1) + header_size, @@ -495,11 +491,11 @@ TEST_F(LogTest, TestWriteAndReadToAndFromInProgressSegment) { // Now that we closed the original segment. If we get a segment from the reader // again, we should get one with a footer and we should be able to read all entries. 
ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(segments.size(), 2); + ASSERT_EQ(2, segments.size()); readable_segment = segments[0]; + entries.clear(); ASSERT_OK(readable_segment->ReadEntries(&entries)); - ASSERT_EQ(entries.size(), 5); - STLDeleteElements(&entries); + ASSERT_EQ(5, entries.size()); // Offset should get updated for an additional entry, again. ASSERT_OK(AppendNoOp(&op_id, &written_entries_size)); @@ -653,7 +649,7 @@ TEST_P(LogTestOptionalCompression, TestWaitUntilAllFlushed) { ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments)); ASSERT_OK(segments[0]->ReadEntries(&entries_)); - ASSERT_EQ(entries_.size(), 4); + ASSERT_EQ(4, entries_.size()); for (int i = 0; i < 4 ; i++) { if (i % 2 == 0) { ASSERT_TRUE(entries_[i]->has_replicate()); @@ -761,7 +757,7 @@ TEST_P(LogTestOptionalCompression, TestWriteManyBatches) { ASSERT_OK(reader->GetSegmentsSnapshot(&segments)); for (const scoped_refptr<ReadableLogSegment>& entry : segments) { - STLDeleteElements(&entries_); + entries_.clear(); ASSERT_OK(entry->ReadEntries(&entries_)); num_entries += entries_.size(); } @@ -817,8 +813,7 @@ TEST_P(LogTestOptionalCompression, TestLogReaderReturnsLatestSegmentIfIndexEmpty ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments)); ASSERT_EQ(segments.size(), 1); - vector<LogEntryPB*> entries; - ElementDeleter deleter(&entries); + LogEntries entries; ASSERT_OK(segments[0]->ReadEntries(&entries)); ASSERT_EQ(2, entries.size()); } http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/log_reader.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/log_reader.cc b/src/kudu/consensus/log_reader.cc index 581f3ca..9fd09b5 100644 --- a/src/kudu/consensus/log_reader.cc +++ b/src/kudu/consensus/log_reader.cc @@ -18,6 +18,7 @@ #include "kudu/consensus/log_reader.h" #include <algorithm> +#include <memory> #include <mutex> #include <ostream> @@ -57,6 +58,7 @@ using 
kudu::pb_util::SecureDebugString; using kudu::pb_util::SecureShortDebugString; using std::shared_ptr; using std::string; +using std::unique_ptr; using std::vector; using strings::Substitute; @@ -235,7 +237,7 @@ scoped_refptr<ReadableLogSegment> LogReader::GetSegmentBySequenceNumber(int64_t Status LogReader::ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry, faststring* tmp_buf, - gscoped_ptr<LogEntryBatchPB>* batch) const { + unique_ptr<LogEntryBatchPB>* batch) const { const int64_t index = index_entry.op_id.index(); scoped_refptr<ReadableLogSegment> segment = GetSegmentBySequenceNumber( @@ -281,7 +283,7 @@ Status LogReader::ReadReplicatesInRange(int64_t starting_at, int64_t total_size = 0; bool limit_exceeded = false; faststring tmp_buf; - gscoped_ptr<LogEntryBatchPB> batch; + unique_ptr<LogEntryBatchPB> batch; for (int64_t index = starting_at; index <= up_to && !limit_exceeded; index++) { LogIndexEntry index_entry; RETURN_NOT_OK_PREPEND(log_index_->GetEntry(index, &index_entry), http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/log_reader.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/log_reader.h b/src/kudu/consensus/log_reader.h index e9c5e84..7d6ae2f 100644 --- a/src/kudu/consensus/log_reader.h +++ b/src/kudu/consensus/log_reader.h @@ -25,7 +25,6 @@ #include <gtest/gtest_prod.h> #include "kudu/consensus/log_util.h" -#include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" #include "kudu/util/locks.h" @@ -170,7 +169,7 @@ class LogReader : public enable_make_shared<LogReader> { // 'tmp_buf' is used as scratch space to avoid extra allocation. Status ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry, faststring* tmp_buf, - gscoped_ptr<LogEntryBatchPB>* batch) const; + std::unique_ptr<LogEntryBatchPB>* batch) const; // Reads the headers of all segments in 'tablet_wal_path'. 
Status Init(const std::string& tablet_wal_path); http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/log_util.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc index 0f0fd23..3680fd3 100644 --- a/src/kudu/consensus/log_util.cc +++ b/src/kudu/consensus/log_util.cc @@ -20,6 +20,7 @@ #include <algorithm> #include <cstring> #include <iostream> +#include <memory> #include <gflags/gflags.h> #include <glog/logging.h> @@ -65,10 +66,7 @@ DEFINE_double(fault_crash_before_write_log_segment_header, 0.0, "Fraction of the time we will crash just before writing the log segment header"); TAG_FLAG(fault_crash_before_write_log_segment_header, unsafe); -namespace kudu { -namespace log { - -using consensus::OpId; +using kudu::consensus::OpId; using std::string; using std::shared_ptr; using std::vector; @@ -76,6 +74,9 @@ using std::unique_ptr; using strings::Substitute; using strings::SubstituteAndAppend; +namespace kudu { +namespace log { + const char kLogSegmentHeaderMagicString[] = "kudulogf"; // A magic that is written as the very last thing when a segment is closed. @@ -128,7 +129,7 @@ LogEntryReader::LogEntryReader(ReadableLogSegment* seg) LogEntryReader::~LogEntryReader() {} -Status LogEntryReader::ReadNextEntry(LogEntryPB* entry) { +Status LogEntryReader::ReadNextEntry(unique_ptr<LogEntryPB>* entry) { // Refill pending_entries_ if none are available. while (pending_entries_.empty()) { @@ -145,7 +146,7 @@ Status LogEntryReader::ReadNextEntry(LogEntryPB* entry) { } // We still expect to have more entries in the log. - gscoped_ptr<LogEntryBatchPB> current_batch; + unique_ptr<LogEntryBatchPB> current_batch; // Read and validate the entry header first. 
Status s; @@ -182,7 +183,7 @@ Status LogEntryReader::ReadNextEntry(LogEntryPB* entry) { 0, current_batch->entry_size(), nullptr); } - pending_entries_[0]->Swap(entry); + *entry = std::move(pending_entries_.front()); pending_entries_.pop_front(); return Status::OK(); } @@ -372,14 +373,15 @@ Status ReadableLogSegment::RebuildFooterByScanning() { LogSegmentFooterPB new_footer; int num_entries = 0; - LogEntryPB entry; while (true) { + unique_ptr<LogEntryPB> entry; Status s = reader.ReadNextEntry(&entry); if (s.IsEndOfFile()) break; RETURN_NOT_OK(s); - if (entry.has_replicate()) { - UpdateFooterForReplicateEntry(entry, &new_footer); + DCHECK(entry); + if (entry->has_replicate()) { + UpdateFooterForReplicateEntry(*entry, &new_footer); } num_entries++; } @@ -545,17 +547,20 @@ Status ReadableLogSegment::ParseFooterMagicAndFooterLength(const Slice &data, return Status::OK(); } -Status ReadableLogSegment::ReadEntries(vector<LogEntryPB*>* entries) { +Status ReadableLogSegment::ReadEntries(LogEntries* entries) { TRACE_EVENT1("log", "ReadableLogSegment::ReadEntries", "path", path_); LogEntryReader reader(this); while (true) { - unique_ptr<LogEntryPB> entry(new LogEntryPB()); - Status s = reader.ReadNextEntry(entry.get()); - if (s.IsEndOfFile()) break; + unique_ptr<LogEntryPB> entry; + Status s = reader.ReadNextEntry(&entry); + if (s.IsEndOfFile()) { + break; + } RETURN_NOT_OK(s); - entries->push_back(entry.release()); + DCHECK(entry); + entries->emplace_back(std::move(entry)); } return Status::OK(); @@ -573,8 +578,8 @@ Status ReadableLogSegment::ScanForValidEntryHeaders(int64_t offset, bool* has_va << "following offset " << offset << "..."; *has_valid_entries = false; - const int kChunkSize = 1024 * 1024; - gscoped_ptr<uint8_t[]> buf(new uint8_t[kChunkSize]); + constexpr auto kChunkSize = 1024 * 1024; + unique_ptr<uint8_t[]> buf(new uint8_t[kChunkSize]); // We overlap the reads by the size of the header, so that if a header // spans chunks, we don't miss it. 
@@ -611,7 +616,7 @@ Status ReadableLogSegment::ScanForValidEntryHeaders(int64_t offset, bool* has_va } Status ReadableLogSegment::ReadEntryHeaderAndBatch(int64_t* offset, faststring* tmp_buf, - gscoped_ptr<LogEntryBatchPB>* batch, + unique_ptr<LogEntryBatchPB>* batch, EntryHeaderStatus* status_detail) { int64_t cur_offset = *offset; EntryHeader header; @@ -681,10 +686,10 @@ EntryHeaderStatus ReadableLogSegment::DecodeEntryHeader( } -Status ReadableLogSegment::ReadEntryBatch(int64_t *offset, +Status ReadableLogSegment::ReadEntryBatch(int64_t* offset, const EntryHeader& header, - faststring *tmp_buf, - gscoped_ptr<LogEntryBatchPB> *entry_batch) { + faststring* tmp_buf, + unique_ptr<LogEntryBatchPB>* entry_batch) { TRACE_EVENT2("log", "ReadableLogSegment::ReadEntryBatch", "path", path_, "range", Substitute("offset=$0 entry_len=$1", @@ -733,7 +738,7 @@ Status ReadableLogSegment::ReadEntryBatch(int64_t *offset, entry_batch_slice = Slice(uncompress_buf, header.msg_length); } - gscoped_ptr<LogEntryBatchPB> read_entry_batch(new LogEntryBatchPB()); + unique_ptr<LogEntryBatchPB> read_entry_batch(new LogEntryBatchPB); s = pb_util::ParseFromArray(read_entry_batch.get(), entry_batch_slice.data(), header.msg_length); http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/log_util.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h index af81c82..9d504f3 100644 --- a/src/kudu/consensus/log_util.h +++ b/src/kudu/consensus/log_util.h @@ -32,7 +32,6 @@ #include "kudu/consensus/log.pb.h" #include "kudu/consensus/opid.pb.h" #include "kudu/consensus/ref_counted_replicate.h" -#include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/ref_counted.h" #include "kudu/util/atomic.h" @@ -56,6 +55,8 @@ extern const size_t kEntryHeaderSizeV2; class ReadableLogSegment; +typedef std::vector<std::unique_ptr<LogEntryPB>> LogEntries; + // Options for the 
State Machine/Write Ahead Log struct LogOptions { @@ -108,7 +109,7 @@ class LogEntryReader { // Read the next entry from the log, replacing the contents of 'entry'. // // When there are no more entries to read, returns Status::EndOfFile(). - Status ReadNextEntry(LogEntryPB* entry); + Status ReadNextEntry(std::unique_ptr<LogEntryPB>* entry); // Return the offset of the next entry to be read from the file. int64_t offset() const { @@ -206,7 +207,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> { // If the log is corrupted (i.e. the returned 'Status' is 'Corruption') all // the log entries read up to the corrupted one are returned in the 'entries' // vector. - Status ReadEntries(std::vector<LogEntryPB*>* entries); + Status ReadEntries(LogEntries* entries); // Rebuilds this segment's footer by scanning its entries. // This is an expensive operation as it reads and parses the whole segment @@ -337,7 +338,7 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> { // If unsuccessful, '*offset' is not updated, and *status_detail will be updated // to indicate the cause of the error. Status ReadEntryHeaderAndBatch(int64_t* offset, faststring* tmp_buf, - gscoped_ptr<LogEntryBatchPB>* batch, + std::unique_ptr<LogEntryBatchPB>* batch, EntryHeaderStatus* status_detail); // Reads a log entry header from the segment. @@ -357,10 +358,10 @@ class ReadableLogSegment : public RefCountedThreadSafe<ReadableLogSegment> { // Reads a log entry batch from the provided readable segment, which gets decoded // into 'entry_batch' and increments 'offset' by the batch's length. 
- Status ReadEntryBatch(int64_t *offset, + Status ReadEntryBatch(int64_t* offset, const EntryHeader& header, faststring* tmp_buf, - gscoped_ptr<LogEntryBatchPB>* entry_batch); + std::unique_ptr<LogEntryBatchPB>* entry_batch); void UpdateReadableToOffset(int64_t readable_to_offset); http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/raft_consensus_quorum-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc index dc6cdb9..a6dbab0 100644 --- a/src/kudu/consensus/raft_consensus_quorum-test.cc +++ b/src/kudu/consensus/raft_consensus_quorum-test.cc @@ -80,13 +80,14 @@ DECLARE_bool(enable_leader_failure_detection); METRIC_DECLARE_entity(tablet); -using kudu::pb_util::SecureShortDebugString; using kudu::log::Log; using kudu::log::LogEntryPB; using kudu::log::LogOptions; using kudu::log::LogReader; +using kudu::pb_util::SecureShortDebugString; using std::shared_ptr; using std::string; +using std::unique_ptr; using std::vector; using strings::Substitute; using strings::SubstituteAndAppend; @@ -115,6 +116,8 @@ Status WaitUntilLeaderForTests(RaftConsensus* raft) { // without integrating with other components, such as transactions. 
class RaftConsensusQuorumTest : public KuduTest { public: + typedef vector<unique_ptr<LogEntryPB>> LogEntries; + RaftConsensusQuorumTest() : clock_(clock::LogicalClock::CreateStartingAt(Timestamp(1))), metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-test")), @@ -360,11 +363,9 @@ class RaftConsensusQuorumTest : public KuduTest { } // Gather the replica and leader operations for printing - vector<LogEntryPB*> replica_ops; - ElementDeleter repl0_deleter(&replica_ops); + LogEntries replica_ops; GatherLogEntries(peer_idx, logs_[peer_idx], &replica_ops); - vector<LogEntryPB*> leader_ops; - ElementDeleter leader_deleter(&leader_ops); + LogEntries leader_ops; GatherLogEntries(leader_idx, logs_[leader_idx], &leader_ops); SCOPED_TRACE(PrintOnError(replica_ops, Substitute("local peer ($0)", peer->peer_uuid()))); SCOPED_TRACE(PrintOnError(leader_ops, Substitute("leader (peer-$0)", leader_idx))); @@ -423,7 +424,8 @@ class RaftConsensusQuorumTest : public KuduTest { } } - void GatherLogEntries(int idx, const scoped_refptr<Log>& log, vector<LogEntryPB* >* entries) { + void GatherLogEntries(int idx, const scoped_refptr<Log>& log, + LogEntries* entries) { ASSERT_OK(log->WaitUntilAllFlushed()); log->Close(); shared_ptr<LogReader> log_reader; @@ -432,16 +434,14 @@ class RaftConsensusQuorumTest : public KuduTest { kTestTablet, metric_entity_.get(), &log_reader)); - vector<LogEntryPB*> ret; - ElementDeleter deleter(&ret); log::SegmentSequence segments; ASSERT_OK(log_reader->GetSegmentsSnapshot(&segments)); + LogEntries ret; for (const log::SegmentSequence::value_type& entry : segments) { ASSERT_OK(entry->ReadEntries(&ret)); } - - entries->swap(ret); + *entries = std::move(ret); } // Verifies that the replica's log match the leader's. 
This deletes the @@ -460,15 +460,13 @@ class RaftConsensusQuorumTest : public KuduTest { entry.second->Shutdown(); } - vector<LogEntryPB*> leader_entries; - ElementDeleter leader_entry_deleter(&leader_entries); + LogEntries leader_entries; GatherLogEntries(leader_idx, logs_[leader_idx], &leader_entries); shared_ptr<RaftConsensus> leader; CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader)); for (int replica_idx = first_replica_idx; replica_idx < last_replica_idx; replica_idx++) { - vector<LogEntryPB*> replica_entries; - ElementDeleter replica_entry_deleter(&replica_entries); + LogEntries replica_entries; GatherLogEntries(replica_idx, logs_[replica_idx], &replica_entries); shared_ptr<RaftConsensus> replica; @@ -480,18 +478,18 @@ class RaftConsensusQuorumTest : public KuduTest { } } - void ExtractReplicateIds(const vector<LogEntryPB*>& entries, + void ExtractReplicateIds(const LogEntries& entries, vector<OpId>* ids) { ids->reserve(entries.size() / 2); - for (const LogEntryPB* entry : entries) { + for (const auto& entry : entries) { if (entry->has_replicate()) { ids->push_back(entry->replicate().id()); } } } - void VerifyReplicateOrderMatches(const vector<LogEntryPB*>& leader_entries, - const vector<LogEntryPB*>& replica_entries) { + void VerifyReplicateOrderMatches(const LogEntries& leader_entries, + const LogEntries& replica_entries) { vector<OpId> leader_ids, replica_ids; ExtractReplicateIds(leader_entries, &leader_ids); ExtractReplicateIds(replica_entries, &replica_ids); @@ -502,10 +500,10 @@ class RaftConsensusQuorumTest : public KuduTest { } } - void VerifyNoCommitsBeforeReplicates(const vector<LogEntryPB*>& entries) { + void VerifyNoCommitsBeforeReplicates(const LogEntries& entries) { std::unordered_set<OpId, OpIdHashFunctor, OpIdEqualsFunctor> replication_ops; - for (const LogEntryPB* entry : entries) { + for (const auto& entry : entries) { if (entry->has_replicate()) { ASSERT_TRUE(InsertIfNotPresent(&replication_ops, entry->replicate().id())) << "REPLICATE op 
id showed up twice: " << SecureShortDebugString(*entry); @@ -516,8 +514,8 @@ class RaftConsensusQuorumTest : public KuduTest { } } - void VerifyReplica(const vector<LogEntryPB*>& leader_entries, - const vector<LogEntryPB*>& replica_entries, + void VerifyReplica(const LogEntries& leader_entries, + const LogEntries& replica_entries, const string& leader_name, const string& replica_name) { SCOPED_TRACE(PrintOnError(leader_entries, Substitute("Leader: $0", leader_name))); @@ -532,12 +530,12 @@ class RaftConsensusQuorumTest : public KuduTest { VerifyNoCommitsBeforeReplicates(leader_entries); } - string PrintOnError(const vector<LogEntryPB*>& replica_entries, + string PrintOnError(const LogEntries& replica_entries, const string& replica_id) { string ret = ""; SubstituteAndAppend(&ret, "$1 log entries for replica $0:\n", replica_id, replica_entries.size()); - for (LogEntryPB* replica_entry : replica_entries) { + for (const auto& replica_entry : replica_entries) { StrAppend(&ret, "Replica log entry: ", SecureShortDebugString(*replica_entry), "\n"); } return ret; http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/integration-tests/log_verifier.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/log_verifier.cc b/src/kudu/integration-tests/log_verifier.cc index beeb546..e6bf62e 100644 --- a/src/kudu/integration-tests/log_verifier.cc +++ b/src/kudu/integration-tests/log_verifier.cc @@ -79,15 +79,15 @@ Status LogVerifier::ScanForCommittedOpIds(int ts_idx, const string& tablet_id, scoped_refptr<MetricEntity>(), &reader)); log::SegmentSequence segs; RETURN_NOT_OK(reader->GetSegmentsSnapshot(&segs)); - log::LogEntryPB entry; + unique_ptr<log::LogEntryPB> entry; for (const auto& seg : segs) { log::LogEntryReader reader(seg.get()); while (true) { Status s = reader.ReadNextEntry(&entry); if (s.IsEndOfFile() || s.IsCorruption()) break; RETURN_NOT_OK(s); - if (entry.type() != log::COMMIT) continue; - const 
auto& op_id = entry.commit().commited_op_id(); + if (entry->type() != log::COMMIT) continue; + const auto& op_id = entry->commit().commited_op_id(); if (!InsertIfNotPresent(index_to_term, op_id.index(), op_id.term())) { return Status::Corruption(Substitute( http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/tablet/tablet_bootstrap.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc index fcc472a..670d6ee 100644 --- a/src/kudu/tablet/tablet_bootstrap.cc +++ b/src/kudu/tablet/tablet_bootstrap.cc @@ -59,7 +59,6 @@ #include "kudu/gutil/map-util.h" #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" -#include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/human_readable.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/rpc/result_tracker.h" @@ -88,6 +87,7 @@ #include "kudu/util/monotime.h" #include "kudu/util/path_util.h" #include "kudu/util/pb_util.h" +#include "kudu/util/scoped_cleanup.h" #include "kudu/util/stopwatch.h" DECLARE_int32(group_commit_queue_size_bytes); @@ -100,36 +100,34 @@ TAG_FLAG(fault_crash_during_log_replay, unsafe); DECLARE_int32(max_clock_sync_error_usec); -namespace kudu { -namespace tablet { - -using clock::Clock; -using consensus::ALTER_SCHEMA_OP; -using consensus::CHANGE_CONFIG_OP; -using consensus::CommitMsg; -using consensus::ConsensusBootstrapInfo; -using consensus::MinimumOpId; -using consensus::NO_OP; -using consensus::OpId; -using consensus::OpIdEquals; -using consensus::OpIdEqualsFunctor; -using consensus::OpIdHashFunctor; -using consensus::OpIdToString; -using consensus::OperationType; -using consensus::OperationType_Name; -using consensus::RaftConfigPB; -using consensus::ReplicateMsg; -using consensus::WRITE_OP; -using log::Log; -using log::LogAnchorRegistry; -using log::LogEntryPB; -using log::LogIndex; -using log::LogOptions; -using log::LogReader; -using log::ReadableLogSegment; 
-using rpc::ResultTracker; -using pb_util::SecureDebugString; -using pb_util::SecureShortDebugString; +using kudu::clock::Clock; +using kudu::consensus::ALTER_SCHEMA_OP; +using kudu::consensus::CHANGE_CONFIG_OP; +using kudu::consensus::CommitMsg; +using kudu::consensus::ConsensusBootstrapInfo; +using kudu::consensus::MinimumOpId; +using kudu::consensus::NO_OP; +using kudu::consensus::OpId; +using kudu::consensus::OpIdEquals; +using kudu::consensus::OpIdToString; +using kudu::consensus::OperationType; +using kudu::consensus::OperationType_Name; +using kudu::consensus::RaftConfigPB; +using kudu::consensus::ReplicateMsg; +using kudu::consensus::WRITE_OP; +using kudu::log::Log; +using kudu::log::LogAnchorRegistry; +using kudu::log::LogEntryPB; +using kudu::log::LogIndex; +using kudu::log::LogOptions; +using kudu::log::LogReader; +using kudu::log::ReadableLogSegment; +using kudu::pb_util::SecureDebugString; +using kudu::pb_util::SecureShortDebugString; +using kudu::rpc::ResultTracker; +using kudu::tserver::AlterSchemaRequestPB; +using kudu::tserver::WriteRequestPB; +using kudu::tserver::WriteResponsePB; using std::map; using std::shared_ptr; using std::string; @@ -137,10 +135,9 @@ using std::unique_ptr; using std::unordered_map; using std::vector; using strings::Substitute; -using tserver::AlterSchemaRequestPB; -using tserver::WriteRequestPB; -using tserver::WriteResponsePB; +namespace kudu { +namespace tablet { struct ReplayState; @@ -339,11 +336,19 @@ class TabletBootstrap { void DumpReplayStateToLog(const ReplayState& state); + Status HandleEntry(ReplayState* state, + unique_ptr<LogEntryPB> entry, + string* entry_debug_info); + // Handlers for each type of message seen in the log during replay. 
- Status HandleEntry(ReplayState* state, LogEntryPB* entry); - Status HandleReplicateMessage(ReplayState* state, LogEntryPB* replicate_entry); - Status HandleCommitMessage(ReplayState* state, LogEntryPB* commit_entry); - Status ApplyCommitMessage(ReplayState* state, LogEntryPB* commit_entry); + Status HandleReplicateMessage(ReplayState* state, + unique_ptr<LogEntryPB> entry, + string* entry_debug_info); + Status HandleCommitMessage(ReplayState* state, + unique_ptr<LogEntryPB> entry, + string* entry_debug_info); + + Status ApplyCommitMessage(ReplayState* state, LogEntryPB* entry); Status HandleEntryPair(LogEntryPB* replicate_entry, LogEntryPB* commit_entry); // Checks that an orphaned commit message is actually irrelevant, i.e that none @@ -367,7 +372,7 @@ class TabletBootstrap { scoped_refptr<rpc::ResultTracker> result_tracker_; MetricRegistry* metric_registry_; scoped_refptr<TabletReplica> tablet_replica_; - gscoped_ptr<tablet::Tablet> tablet_; + unique_ptr<tablet::Tablet> tablet_; const scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_; scoped_refptr<log::Log> log_; std::shared_ptr<log::LogReader> log_reader_; @@ -461,17 +466,20 @@ static string DebugInfo(const string& tablet_id, int segment_seqno, int entry_idx, const string& segment_path, - const LogEntryPB& entry) { + const string& entry_debug_info) { // Truncate the debug string to a reasonable length for logging. // Otherwise, glog will truncate for us and we may miss important // information which came after this long string. - string debug_str = SecureShortDebugString(entry); + string debug_str = entry_debug_info; if (debug_str.size() > 500) { debug_str.resize(500); debug_str.append("..."); } + if (!debug_str.empty()) { + debug_str = Substitute(" Entry: $0", debug_str); + } return Substitute("Debug Info: Error playing entry $0 of segment $1 of tablet $2. " - "Segment path: $3. 
Entry: $4", entry_idx, segment_seqno, tablet_id, + "Segment path: $3.$4", entry_idx, segment_seqno, tablet_id, segment_path, debug_str); } @@ -613,18 +621,18 @@ Status TabletBootstrap::FinishBootstrap(const string& message, } Status TabletBootstrap::OpenTablet(bool* has_blocks) { - gscoped_ptr<Tablet> tablet(new Tablet(tablet_meta_, - clock_, - mem_tracker_, - metric_registry_, - log_anchor_registry_)); + unique_ptr<Tablet> tablet(new Tablet(tablet_meta_, + clock_, + mem_tracker_, + metric_registry_, + log_anchor_registry_)); // doing nothing for now except opening a tablet locally. { SCOPED_LOG_SLOW_EXECUTION_PREFIX(INFO, 100, LogPrefix(), "opening tablet"); RETURN_NOT_OK(tablet->Open()); } *has_blocks = tablet->num_rowsets() != 0; - tablet_.reset(tablet.release()); + tablet_ = std::move(tablet); return Status::OK(); } @@ -729,18 +737,13 @@ Status TabletBootstrap::OpenNewLog() { return Status::OK(); } -typedef map<int64_t, LogEntryPB*> OpIndexToEntryMap; +typedef map<int64_t, unique_ptr<LogEntryPB>> OpIndexToEntryMap; // State kept during replay. struct ReplayState { ReplayState() - : prev_op_id(MinimumOpId()), - committed_op_id(MinimumOpId()) { - } - - ~ReplayState() { - STLDeleteValues(&pending_replicates); - STLDeleteValues(&pending_commits); + : prev_op_id(MinimumOpId()), + committed_op_id(MinimumOpId()) { } // Return true if 'b' is allowed to immediately follow 'a' in the log. @@ -784,8 +787,8 @@ struct ReplayState { } void AddEntriesToStrings(const OpIndexToEntryMap& entries, vector<string>* strings) const { - for (const OpIndexToEntryMap::value_type& map_entry : entries) { - LogEntryPB* entry = DCHECK_NOTNULL(map_entry.second); + for (const auto& map_entry : entries) { + const LogEntryPB* entry = DCHECK_NOTNULL(map_entry.second.get()); strings->push_back(Substitute(" $0", SecureShortDebugString(*entry))); } } @@ -819,23 +822,26 @@ struct ReplayState { OpIndexToEntryMap pending_commits; }; -// Handle the given log entry. 
If OK is returned, then takes ownership of 'entry'. -// Otherwise, caller frees. -Status TabletBootstrap::HandleEntry(ReplayState* state, LogEntryPB* entry) { +// Handle the given log entry. +Status TabletBootstrap::HandleEntry(ReplayState* state, + unique_ptr<LogEntryPB> entry, + string* entry_debug_info) { + DCHECK(entry); if (VLOG_IS_ON(1)) { VLOG_WITH_PREFIX(1) << "Handling entry: " << SecureShortDebugString(*entry); } - switch (entry->type()) { + const auto entry_type = entry->type(); + switch (entry_type) { case log::REPLICATE: - RETURN_NOT_OK(HandleReplicateMessage(state, entry)); + RETURN_NOT_OK(HandleReplicateMessage(state, std::move(entry), entry_debug_info)); break; case log::COMMIT: // check the unpaired ops for the matching replicate msg, abort if not found - RETURN_NOT_OK(HandleCommitMessage(state, entry)); + RETURN_NOT_OK(HandleCommitMessage(state, std::move(entry), entry_debug_info)); break; default: - return Status::Corruption(Substitute("Unexpected log entry type: $0", entry->type())); + return Status::Corruption(Substitute("unexpected log entry type: $0", entry_type)); } MAYBE_FAULT(FLAGS_fault_crash_during_log_replay); return Status::OK(); @@ -860,64 +866,73 @@ void CheckAndRepairOpIdOverflow(OpId* opid) { } } -// Takes ownership of 'replicate_entry' on OK status. -Status TabletBootstrap::HandleReplicateMessage(ReplayState* state, LogEntryPB* replicate_entry) { +Status TabletBootstrap::HandleReplicateMessage(ReplayState* state, + unique_ptr<LogEntryPB> entry, + string* entry_debug_info) { + auto info_collector = MakeScopedCleanup([&]() { + if (entry) { + *entry_debug_info = SecureShortDebugString(*entry); + } + }); + DCHECK(entry->has_replicate()) + << "not a replicate message: " << SecureDebugString(*entry); stats_.ops_read++; - DCHECK(replicate_entry->has_replicate()); - // Fix overflow if necessary (see KUDU-1933). 
- CheckAndRepairOpIdOverflow(replicate_entry->mutable_replicate()->mutable_id()); + CheckAndRepairOpIdOverflow(entry->mutable_replicate()->mutable_id()); - const ReplicateMsg& replicate = replicate_entry->replicate(); + const ReplicateMsg& replicate = entry->replicate(); RETURN_NOT_OK(state->CheckSequentialReplicateId(replicate)); DCHECK(replicate.has_timestamp()); CHECK_OK(UpdateClock(replicate.timestamp())); // Append the replicate message to the log as is - RETURN_NOT_OK(log_->Append(replicate_entry)); - - int64_t index = replicate_entry->replicate().id().index(); - - LogEntryPB** existing_entry_ptr = InsertOrReturnExisting( - &state->pending_replicates, index, replicate_entry); + RETURN_NOT_OK(log_->Append(entry.get())); - // If there was a entry with the same index we're overwriting then we need to delete - // that entry and all entries with higher indexes. - if (existing_entry_ptr) { - LogEntryPB* existing_entry = *existing_entry_ptr; + const int64_t index = replicate.id().index(); + const auto existing_entry_iter = state->pending_replicates.find(index); + if (existing_entry_iter != state->pending_replicates.end()) { + // If there was an entry with the same index we're overwriting then we need + // to delete that entry and all entries with higher indexes.
+ const auto& existing_entry = existing_entry_iter->second; auto iter = state->pending_replicates.lower_bound(index); - DCHECK(OpIdEquals((*iter).second->replicate().id(), existing_entry->replicate().id())); - - LogEntryPB* last_entry = (*state->pending_replicates.rbegin()).second; + DCHECK(OpIdEquals(iter->second->replicate().id(), existing_entry->replicate().id())); + const auto& last_entry = state->pending_replicates.rbegin()->second; LOG_WITH_PREFIX(INFO) << "Overwriting operations starting at: " << existing_entry->replicate().id() << " up to: " << last_entry->replicate().id() - << " with operation: " << replicate_entry->replicate().id(); + << " with operation: " << replicate.id(); while (iter != state->pending_replicates.end()) { - delete (*iter).second; - state->pending_replicates.erase(iter++); + iter = state->pending_replicates.erase(iter); stats_.ops_overwritten++; } - - InsertOrDie(&state->pending_replicates, index, replicate_entry); } + EmplaceOrDie(&state->pending_replicates, index, std::move(entry)); + info_collector.cancel(); + return Status::OK(); } -// Takes ownership of 'commit_entry' on OK status. -Status TabletBootstrap::HandleCommitMessage(ReplayState* state, LogEntryPB* commit_entry) { - DCHECK(commit_entry->has_commit()) << "Not a commit message: " - << SecureDebugString(*commit_entry); +// Takes ownership of 'entry'. On a non-OK return, 'entry_debug_info' is set to the entry's short debug string. +Status TabletBootstrap::HandleCommitMessage(ReplayState* state, + unique_ptr<LogEntryPB> entry, + string* entry_debug_info) { + auto info_collector = MakeScopedCleanup([&]() { + if (entry) { + *entry_debug_info = SecureShortDebugString(*entry); + } + }); + DCHECK(entry->has_commit()) + << "not a commit message: " << SecureDebugString(*entry); // Fix overflow if necessary (see KUDU-1933).
- CheckAndRepairOpIdOverflow(commit_entry->mutable_commit()->mutable_commited_op_id()); + CheckAndRepairOpIdOverflow(entry->mutable_commit()->mutable_commited_op_id()); // Match up the COMMIT record with the original entry that it's applied to. - const OpId& committed_op_id = commit_entry->commit().commited_op_id(); + const OpId& committed_op_id = entry->commit().commited_op_id(); state->UpdateCommittedOpId(committed_op_id); // If there are no pending replicates, or if this commit's index is lower than the @@ -925,9 +940,9 @@ Status TabletBootstrap::HandleCommitMessage(ReplayState* state, LogEntryPB* comm if (state->pending_replicates.empty() || (*state->pending_replicates.begin()).first > committed_op_id.index()) { VLOG_WITH_PREFIX(2) << "Found orphaned commit for " << committed_op_id; - RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(commit_entry->commit())); + RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(entry->commit())); stats_.orphaned_commits++; - delete commit_entry; + info_collector.cancel(); return Status::OK(); } @@ -936,30 +951,31 @@ Status TabletBootstrap::HandleCommitMessage(ReplayState* state, LogEntryPB* comm if ((*state->pending_replicates.begin()).first != committed_op_id.index()) { if (!ContainsKey(state->pending_replicates, committed_op_id.index())) { return Status::Corruption(Substitute("Could not find replicate for commit: $0", - SecureShortDebugString(*commit_entry))); + SecureShortDebugString(*entry))); } VLOG_WITH_PREFIX(2) << "Adding pending commit for " << committed_op_id; - InsertOrDie(&state->pending_commits, committed_op_id.index(), commit_entry); + EmplaceOrDie(&state->pending_commits, committed_op_id.index(), std::move(entry)); + info_collector.cancel(); return Status::OK(); } // ... if it does, we apply it and all the commits that immediately follow in the sequence. 
- OpId last_applied = commit_entry->commit().commited_op_id(); - RETURN_NOT_OK(ApplyCommitMessage(state, commit_entry)); + OpId last_applied = committed_op_id; + RETURN_NOT_OK(ApplyCommitMessage(state, entry.get())); auto iter = state->pending_commits.begin(); while (iter != state->pending_commits.end()) { - if ((*iter).first == last_applied.index() + 1) { - gscoped_ptr<LogEntryPB> buffered_commit_entry((*iter).second); - state->pending_commits.erase(iter++); + if (iter->first == last_applied.index() + 1) { + auto& buffered_commit_entry(iter->second); last_applied = buffered_commit_entry->commit().commited_op_id(); RETURN_NOT_OK(ApplyCommitMessage(state, buffered_commit_entry.get())); + iter = state->pending_commits.erase(iter); continue; } break; } - delete commit_entry; + info_collector.cancel(); return Status::OK(); } @@ -996,35 +1012,33 @@ Status TabletBootstrap::CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& com return Status::OK(); } -Status TabletBootstrap::ApplyCommitMessage(ReplayState* state, LogEntryPB* commit_entry) { - - const OpId& committed_op_id = commit_entry->commit().commited_op_id(); +Status TabletBootstrap::ApplyCommitMessage(ReplayState* state, LogEntryPB* entry) { + const OpId& committed_op_id = entry->commit().commited_op_id(); VLOG_WITH_PREFIX(2) << "Applying commit for " << committed_op_id; - gscoped_ptr<LogEntryPB> pending_replicate_entry; // They should also have an associated replicate index (it may have been in a // deleted log segment though). - pending_replicate_entry.reset(EraseKeyReturnValuePtr(&state->pending_replicates, - committed_op_id.index())); - - if (pending_replicate_entry != nullptr) { + unique_ptr<LogEntryPB> pending_replicate_entry(EraseKeyReturnValuePtr( + &state->pending_replicates, committed_op_id.index())); + if (pending_replicate_entry) { // We found a replicate with the same index, make sure it also has the same // term. 
- if (!OpIdEquals(committed_op_id, pending_replicate_entry->replicate().id())) { + const auto& replicate = pending_replicate_entry->replicate(); + if (!OpIdEquals(committed_op_id, replicate.id())) { string error_msg = Substitute("Committed operation's OpId: $0 didn't match the" "commit message's committed OpId: $1. Pending operation: $2, Commit message: $3", - SecureShortDebugString(pending_replicate_entry->replicate().id()), + SecureShortDebugString(replicate.id()), SecureShortDebugString(committed_op_id), - SecureShortDebugString(pending_replicate_entry->replicate()), - SecureShortDebugString(commit_entry->commit())); + SecureShortDebugString(replicate), + SecureShortDebugString(entry->commit())); LOG_WITH_PREFIX(DFATAL) << error_msg; return Status::Corruption(error_msg); } - RETURN_NOT_OK(HandleEntryPair(pending_replicate_entry.get(), commit_entry)); + RETURN_NOT_OK(HandleEntryPair(pending_replicate_entry.get(), entry)); stats_.ops_committed++; } else { stats_.orphaned_commits++; - RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(commit_entry->commit())); + RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(entry->commit())); } return Status::OK(); @@ -1143,36 +1157,36 @@ Status TabletBootstrap::PlaySegments(ConsensusBootstrapInfo* consensus_info) { int entry_count = 0; while (true) { - unique_ptr<LogEntryPB> entry(new LogEntryPB); - - Status s = reader.ReadNextEntry(entry.get()); - if (PREDICT_FALSE(!s.ok())) { - if (s.IsEndOfFile()) { - break; + { + unique_ptr<LogEntryPB> entry; + Status s = reader.ReadNextEntry(&entry); + if (PREDICT_FALSE(!s.ok())) { + if (s.IsEndOfFile()) { + break; + } + return Status::Corruption( + Substitute("Error reading Log Segment of tablet $0: $1 " + "(Read up to entry $2 of segment $3, in path $4)", + tablet_->tablet_id(), + s.ToString(), + entry_count, + segment->header().sequence_number(), + segment->path())); } - return Status::Corruption(Substitute("Error reading Log Segment of tablet $0: $1 " - "(Read up to entry $2 of segment 
$3, in path $4)", - tablet_->tablet_id(), - s.ToString(), - entry_count, - segment->header().sequence_number(), - segment->path())); - } - entry_count++; + entry_count++; - s = HandleEntry(&state, entry.get()); - if (!s.ok()) { - DumpReplayStateToLog(state); - RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(), - segment->header().sequence_number(), - entry_count, segment->path(), - *entry)); + string entry_debug_info; + s = HandleEntry(&state, std::move(entry), &entry_debug_info); + if (!s.ok()) { + DumpReplayStateToLog(state); + RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(), + segment->header().sequence_number(), + entry_count, segment->path(), + entry_debug_info)); + } } - // If HandleEntry returns OK, then it has taken ownership of the entry. - entry.release(); - - auto now = MonoTime::Now(); + const auto now = MonoTime::Now(); if (now - last_status_update > kStatusUpdateInterval) { SetStatusMessage(Substitute("Bootstrap replaying log segment $0/$1 " "($2/$3 this segment, stats: $4)", http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/tools/tool_action_common.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc index 09d0cf1..e3bcc8f 100644 --- a/src/kudu/tools/tool_action_common.cc +++ b/src/kudu/tools/tool_action_common.cc @@ -339,22 +339,24 @@ Status PrintSegment(const scoped_refptr<ReadableLogSegment>& segment) { RETURN_NOT_OK(SchemaFromPB(segment->header().schema(), &tablet_schema)); LogEntryReader reader(segment.get()); - LogEntryPB entry; while (true) { + unique_ptr<LogEntryPB> entry; Status s = reader.ReadNextEntry(&entry); - if (s.IsEndOfFile()) break; + if (s.IsEndOfFile()) { + break; + } RETURN_NOT_OK(s); if (print_type == PRINT_PB) { if (FLAGS_truncate_data > 0) { - pb_util::TruncateFields(&entry, FLAGS_truncate_data); + pb_util::TruncateFields(entry.get(), FLAGS_truncate_data); } - cout << "Entry:\n" << 
SecureDebugString(entry); + cout << "Entry:\n" << SecureDebugString(*entry); } else if (print_type == PRINT_DECODED) { - RETURN_NOT_OK(PrintDecoded(entry, tablet_schema)); + RETURN_NOT_OK(PrintDecoded(*entry, tablet_schema)); } else if (print_type == PRINT_ID) { - PrintIdOnly(entry); + PrintIdOnly(*entry); } } } http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/tools/tool_action_local_replica.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc index c65e51e..085c259 100644 --- a/src/kudu/tools/tool_action_local_replica.cc +++ b/src/kudu/tools/tool_action_local_replica.cc @@ -185,12 +185,12 @@ Status FindLastLoggedOpId(FsManager* fs, const string& tablet_id, for (seg = segs.rbegin(); seg != segs.rend(); ++seg) { LogEntryReader reader(seg->get()); while (true) { - LogEntryPB entry; + unique_ptr<LogEntryPB> entry; Status s = reader.ReadNextEntry(&entry); if (s.IsEndOfFile()) break; RETURN_NOT_OK_PREPEND(s, "Error in log segment"); - if (entry.type() != log::REPLICATE) continue; - *last_logged_opid = entry.replicate().id(); + if (entry->type() != log::REPLICATE) continue; + *last_logged_opid = entry->replicate().id(); found = true; } if (found) return Status::OK();