This is an automated email from the ASF dual-hosted git repository. adar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
View the commit online: https://github.com/apache/kudu/commit/0eb0a60f23251afbbe987e9bf88036b0b95f97ab The following commit(s) were added to refs/heads/master by this push: new 0eb0a60 log: refactor close and replace last segment 0eb0a60 is described below commit 0eb0a60f23251afbbe987e9bf88036b0b95f97ab Author: Adar Dembo <[email protected]> AuthorDate: Wed Nov 20 21:17:57 2019 -0800 log: refactor close and replace last segment Log::Close and SegmentAllocator::RollOver both tried to do the same thing, except Log::Close didn't actually need to replace the last segment, nor did it need to close the current segment with state_lock_ held. I also snuck in a small fix to LogIndex::IndexChunk::Open's use of ftruncate, and a visibility fix to a SegmentAllocator member. Change-Id: I871b43514cbafe9a9b594a551fe653d766298123 Reviewed-on: http://gerrit.cloudera.org:8080/14767 Tested-by: Adar Dembo <[email protected]> Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/consensus/log.cc | 72 ++++++++++++++++++++++------------------- src/kudu/consensus/log.h | 19 +++++++++-- src/kudu/consensus/log_index.cc | 4 +-- 3 files changed, 56 insertions(+), 39 deletions(-) diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc index ae15638..2db6b9f 100644 --- a/src/kudu/consensus/log.cc +++ b/src/kudu/consensus/log.cc @@ -463,7 +463,7 @@ Status SegmentAllocator::Init(uint64_t sequence_number) { "could not instantiate compression codec"); } } - active_segment_sequence_number = sequence_number; + active_segment_sequence_number_ = sequence_number; RETURN_NOT_OK(ThreadPoolBuilder("log-alloc") .set_max_threads(1) .Build(&allocation_pool_)); @@ -531,7 +531,7 @@ Status SegmentAllocator::Sync() { return Status::OK(); } -Status SegmentAllocator::CloseCurrentSegment() { +Status SegmentAllocator::CloseCurrentSegment(CloseMode mode) { if (hooks_) { RETURN_NOT_OK_PREPEND(hooks_->PreClose(), "PreClose hook failed"); } @@ -548,6 +548,13 @@ Status SegmentAllocator::CloseCurrentSegment() { if (hooks_) { RETURN_NOT_OK_PREPEND(hooks_->PostClose(), "PostClose hook failed"); } + + if (mode == CLOSE_AND_REPLACE_LAST_SEGMENT) { + scoped_refptr<ReadableLogSegment> last_segment; + RETURN_NOT_OK(GetClosedSegment(&last_segment)); + return reader_replace_last_segment_(std::move(last_segment)); + } + return Status::OK(); } @@ -658,10 +665,10 @@ Status SegmentAllocator::AllocateNewSegment() { Status SegmentAllocator::SwitchToAllocatedSegment() { // Increment "next" log segment seqno. - active_segment_sequence_number++; + active_segment_sequence_number_++; const auto& tablet_id = ctx_->tablet_id; - string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName(tablet_id, - active_segment_sequence_number); + string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName( + tablet_id, active_segment_sequence_number_); Env* env = ctx_->fs_manager->env(); RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path), "rename"); if (opts_->force_fsync_all) { @@ -674,7 +681,7 @@ Status SegmentAllocator::SwitchToAllocatedSegment() { // Set up the new header and footer. LogSegmentHeaderPB header; - header.set_sequence_number(active_segment_sequence_number); + header.set_sequence_number(active_segment_sequence_number_); header.set_tablet_id(tablet_id); if (codec_) { header.set_compression_codec(codec_->type()); @@ -723,10 +730,7 @@ Status SegmentAllocator::RollOver() { // If this isn't the first active segment, close the segment and make it // available to the log reader. if (active_segment_) { - RETURN_NOT_OK(CloseCurrentSegment()); - scoped_refptr<ReadableLogSegment> readable_segment; - RETURN_NOT_OK(GetClosedSegment(&readable_segment)); - RETURN_NOT_OK(reader_replace_last_segment_(readable_segment)); + RETURN_NOT_OK(CloseCurrentSegment(CLOSE_AND_REPLACE_LAST_SEGMENT)); } RETURN_NOT_OK(SwitchToAllocatedSegment()); @@ -790,14 +794,14 @@ Status Log::Init() { // The case where we are continuing an existing log. // We must pick up where the previous WAL left off in terms of // sequence numbers. - uint64_t active_segment_sequence_number = 0; + uint64_t active_seg_seq_num = 0; if (reader_->num_segments() != 0) { VLOG_WITH_PREFIX(1) << "Using existing " << reader_->num_segments() << " segments from path: " << ctx_.fs_manager->GetWalsRootDir(); vector<scoped_refptr<ReadableLogSegment> > segments; RETURN_NOT_OK(reader_->GetSegmentsSnapshot(&segments)); - active_segment_sequence_number = segments.back()->header().sequence_number(); + active_seg_seq_num = segments.back()->header().sequence_number(); } if (options_.force_fsync_all) { @@ -808,7 +812,7 @@ Status Log::Init() { } // We always create a new segment when the log starts. - RETURN_NOT_OK(segment_allocator_.Init(active_segment_sequence_number)); + RETURN_NOT_OK(segment_allocator_.Init(active_seg_seq_num)); RETURN_NOT_OK(append_thread_->Init()); log_state_ = kLogWriting; return Status::OK(); @@ -918,7 +922,7 @@ Status Log::UpdateIndexForBatch(const LogEntryBatch& batch, LogIndexEntry index_entry; index_entry.op_id = entry_pb.replicate().id(); - index_entry.segment_sequence_number = segment_allocator_.active_segment_sequence_number; + index_entry.segment_sequence_number = segment_allocator_.active_segment_sequence_number(); index_entry.offset_in_segment = start_offset; RETURN_NOT_OK(log_index_->AddEntry(index_entry)); } @@ -1111,28 +1115,28 @@ Status Log::Close() { segment_allocator_.StopAllocationThread(); append_thread_->Shutdown(); - std::lock_guard<percpu_rwlock> l(state_lock_); - switch (log_state_) { - case kLogWriting: { - RETURN_NOT_OK(segment_allocator_.CloseCurrentSegment()); - scoped_refptr<ReadableLogSegment> last_segment; - RETURN_NOT_OK(segment_allocator_.GetClosedSegment(&last_segment)); - RETURN_NOT_OK(reader_->ReplaceLastSegment(last_segment)); - log_state_ = kLogClosed; - VLOG_WITH_PREFIX(1) << "Log closed"; - - // Release FDs held by these objects. - log_index_.reset(); - reader_.reset(); - return Status::OK(); + { + std::lock_guard<percpu_rwlock> l(state_lock_); + switch (log_state_) { + case kLogWriting: + log_state_ = kLogClosed; + break; + case kLogClosed: + VLOG_WITH_PREFIX(1) << "Log already closed"; + return Status::OK(); + default: + return Status::IllegalState(Substitute( + "Log not open. State: $0", log_state_)); } - case kLogClosed: - VLOG_WITH_PREFIX(1) << "Log already closed"; - return Status::OK(); - - default: - return Status::IllegalState(Substitute("Log not open. State: $0", log_state_)); } + + RETURN_NOT_OK(segment_allocator_.CloseCurrentSegment(SegmentAllocator::CLOSE)); + VLOG_WITH_PREFIX(1) << "Log closed"; + + // Release FDs held by these objects. + log_index_.reset(); + reader_.reset(); + return Status::OK(); } bool Log::HasOnDiskData(FsManager* fs_manager, const string& tablet_id) { diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h index 06ebc4a..e877bf1 100644 --- a/src/kudu/consensus/log.h +++ b/src/kudu/consensus/log.h @@ -127,7 +127,15 @@ class SegmentAllocator { Status Sync(); // Syncs the current segment and writes out the footer. - Status CloseCurrentSegment(); + enum CloseMode { + // Just close the current semgent. + CLOSE, + + // Close the current segment and call reader_replace_last_segment_ to + // replace the last log segment in the log reader. + CLOSE_AND_REPLACE_LAST_SEGMENT, + }; + Status CloseCurrentSegment(CloseMode mode); // Update the footer based on the written 'batch', e.g. to track the // last-written OpId. @@ -137,10 +145,12 @@ class SegmentAllocator { // current active segment. void StopAllocationThread(); - uint64_t active_segment_sequence_number = 0; - std::string LogPrefix() const { return ctx_->LogPrefix(); } + uint64_t active_segment_sequence_number() const { + return active_segment_sequence_number_; + } + private: friend class Log; friend class LogTest; @@ -229,6 +239,9 @@ class SegmentAllocator { // Single-threaded threadpool on which to allocate segments. std::unique_ptr<ThreadPool> allocation_pool_; Promise<Status> allocation_status_; + + // The sequence number of the 'active' log segment. + uint64_t active_segment_sequence_number_ = 0; }; // Log interface, inspired by Raft's (logcabin) Log. Provides durability to diff --git a/src/kudu/consensus/log_index.cc b/src/kudu/consensus/log_index.cc index 587fb7a..bfeea72 100644 --- a/src/kudu/consensus/log_index.cc +++ b/src/kudu/consensus/log_index.cc @@ -126,12 +126,12 @@ Status LogIndex::IndexChunk::Open() { int err; RETRY_ON_EINTR(err, ftruncate(fd_, kChunkFileSize)); - RETURN_NOT_OK(CheckError(fd_, "truncate")); + RETURN_NOT_OK(CheckError(err, "truncate")); mapping_ = static_cast<uint8_t*>(mmap(nullptr, kChunkFileSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, 0)); if (mapping_ == nullptr) { - int err = errno; + err = errno; return Status::IOError("Unable to mmap()", ErrnoToString(err), err); }
