log block manager: push down container locks

This simplifies container logic a bit: PosixRWFile is now thread-safe (except
for Close()), and there's a new lock in WritablePBContainerFile to synchronize
access to offset_.
Change-Id: Ie2f69bc8b455bce2e1d89ed22713e57a9c3006dc Reviewed-on: http://gerrit.cloudera.org:8080/5029 Reviewed-by: Dan Burkert <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/583aa224 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/583aa224 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/583aa224 Branch: refs/heads/master Commit: 583aa2247b664ad06b18a6bb472665f6fab4aab1 Parents: 2cd0792 Author: Adar Dembo <[email protected]> Authored: Fri Nov 4 19:06:59 2016 -0700 Committer: Adar Dembo <[email protected]> Committed: Wed Nov 16 01:59:59 2016 +0000 ---------------------------------------------------------------------- src/kudu/fs/log_block_manager.cc | 23 ++--------------------- src/kudu/util/env.h | 8 ++++++-- src/kudu/util/env_posix.cc | 35 +++++++++++++++++++++-------------- src/kudu/util/pb_util.cc | 16 +++++++++++----- src/kudu/util/pb_util.h | 19 +++++++++++++++---- 5 files changed, 55 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/583aa224/src/kudu/fs/log_block_manager.cc ---------------------------------------------------------------------- diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc index 91609da..65d0665 100644 --- a/src/kudu/fs/log_block_manager.cc +++ b/src/kudu/fs/log_block_manager.cc @@ -286,14 +286,8 @@ class LogBlockContainer { int64_t preallocated_offset_ = 0; // Opened file handles to the container's files. - // - // RWFile is not thread safe so access to each writer must be - // serialized through a (sleeping) mutex. We use different mutexes to - // avoid contention in cases where only one writer is needed. 
gscoped_ptr<WritablePBContainerFile> metadata_file_; - Mutex metadata_file_lock_; gscoped_ptr<RWFile> data_file_; - Mutex data_file_lock_; // The amount of data written thus far in the container. int64_t total_bytes_written_ = 0; @@ -556,7 +550,6 @@ Status LogBlockContainer::DeleteBlock(int64_t offset, int64_t length) { // It is invalid to punch a zero-size hole. if (length) { - std::lock_guard<Mutex> l(data_file_lock_); // Round up to the nearest filesystem block so that the kernel will // actually reclaim disk space. // @@ -570,10 +563,7 @@ Status LogBlockContainer::DeleteBlock(int64_t offset, int64_t length) { Status LogBlockContainer::AppendData(const Slice& data) { RETURN_NOT_OK(EnsurePreallocated(data.size())); - { - std::lock_guard<Mutex> l(data_file_lock_); - RETURN_NOT_OK(data_file_->Write(total_bytes_written_, data)); - } + RETURN_NOT_OK(data_file_->Write(total_bytes_written_, data)); total_bytes_written_ += data.size(); // This append may have changed the container size if: @@ -595,26 +585,21 @@ Status LogBlockContainer::ReadData(int64_t offset, size_t length, Status LogBlockContainer::AppendMetadata(const BlockRecordPB& pb) { // Note: We don't check for sufficient disk space for metadata writes in // order to allow for block deletion on full disks. 
- std::lock_guard<Mutex> l(metadata_file_lock_); return metadata_file_->Append(pb); } Status LogBlockContainer::FlushData(int64_t offset, int64_t length) { DCHECK_GE(offset, 0); DCHECK_GE(length, 0); - - std::lock_guard<Mutex> l(data_file_lock_); return data_file_->Flush(RWFile::FLUSH_ASYNC, offset, length); } Status LogBlockContainer::FlushMetadata() { - std::lock_guard<Mutex> l(metadata_file_lock_); return metadata_file_->Flush(); } Status LogBlockContainer::SyncData() { if (FLAGS_enable_data_block_fsync) { - std::lock_guard<Mutex> l(data_file_lock_); return data_file_->Sync(); } return Status::OK(); @@ -622,7 +607,6 @@ Status LogBlockContainer::SyncData() { Status LogBlockContainer::SyncMetadata() { if (FLAGS_enable_data_block_fsync) { - std::lock_guard<Mutex> l(metadata_file_lock_); return metadata_file_->Sync(); } return Status::OK(); @@ -639,10 +623,7 @@ Status LogBlockContainer::EnsurePreallocated(size_t next_append_bytes) { next_append_bytes > preallocated_offset_ - total_bytes_written_) { int64_t off = std::max(preallocated_offset_, total_bytes_written_); int64_t len = FLAGS_log_container_preallocate_bytes; - { - std::lock_guard<Mutex> l(data_file_lock_); - RETURN_NOT_OK(data_file_->PreAllocate(off, len)); - } + RETURN_NOT_OK(data_file_->PreAllocate(off, len)); RETURN_NOT_OK(data_dir_->RefreshIsFull(DataDir::RefreshMode::ALWAYS)); VLOG(2) << Substitute("Preallocated $0 bytes at offset $1 in container $2", len, off, ToString()); http://git-wip-us.apache.org/repos/asf/kudu/blob/583aa224/src/kudu/util/env.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h index 60cb8f5..a59a766 100644 --- a/src/kudu/util/env.h +++ b/src/kudu/util/env.h @@ -423,8 +423,10 @@ struct RWFileOptions { // file offset is ever used; instead, all operations must provide an // explicit offset. 
// -// All "read" operations are safe for concurrent use by multiple threads, -// but "write" operations must be externally synchronized. +// All operations are safe for concurrent use by multiple threads (unless +// noted otherwise) bearing in mind the usual filesystem coherency guarantees +// (e.g. two threads that write concurrently to the same file offset will +// probably yield garbage). class RWFile { public: enum FlushMode { @@ -495,6 +497,8 @@ class RWFile { // Closes the file, optionally calling Sync() on it if the file was // created with the sync_on_close option enabled. + // + // Not thread-safe. virtual Status Close() = 0; // Retrieves the file's size. http://git-wip-us.apache.org/repos/asf/kudu/blob/583aa224/src/kudu/util/env_posix.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc index 60d1df4..a1a8b89 100644 --- a/src/kudu/util/env_posix.cc +++ b/src/kudu/util/env_posix.cc @@ -28,6 +28,7 @@ #include "kudu/gutil/callback.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/util/atomic.h" #include "kudu/util/debug/trace_event.h" #include "kudu/util/env.h" #include "kudu/util/errno.h" @@ -514,12 +515,11 @@ class PosixRWFile : public RWFile { : filename_(std::move(fname)), fd_(fd), sync_on_close_(sync_on_close), - pending_sync_(false) {} + pending_sync_(false), + closed_(false) {} ~PosixRWFile() { - if (fd_ >= 0) { - WARN_NOT_OK(Close(), "Failed to close " + filename_); - } + WARN_NOT_OK(Close(), "Failed to close " + filename_); } virtual Status Read(uint64_t offset, size_t length, @@ -567,7 +567,7 @@ class PosixRWFile : public RWFile { data.size(), written)); } - pending_sync_ = true; + pending_sync_.Store(true); return Status::OK(); } @@ -633,18 +633,23 @@ class PosixRWFile : public RWFile { } virtual Status Sync() OVERRIDE { + if (!pending_sync_.CompareAndSwap(true, false)) { + return Status::OK(); + } + 
TRACE_EVENT1("io", "PosixRWFile::Sync", "path", filename_); ThreadRestrictions::AssertIOAllowed(); LOG_SLOW_EXECUTION(WARNING, 1000, Substitute("sync call for $0", filename())) { - if (pending_sync_) { - pending_sync_ = false; - RETURN_NOT_OK(DoSync(fd_, filename_)); - } + RETURN_NOT_OK(DoSync(fd_, filename_)); } return Status::OK(); } virtual Status Close() OVERRIDE { + if (closed_) { + return Status::OK(); + } + TRACE_EVENT1("io", "PosixRWFile::Close", "path", filename_); ThreadRestrictions::AssertIOAllowed(); Status s; @@ -662,7 +667,7 @@ class PosixRWFile : public RWFile { } } - fd_ = -1; + closed_ = true; return s; } @@ -683,12 +688,14 @@ class PosixRWFile : public RWFile { private: const std::string filename_; - int fd_; - bool sync_on_close_; - bool pending_sync_; + const int fd_; + const bool sync_on_close_; + + AtomicBool pending_sync_; + bool closed_; }; -static int LockOrUnlock(int fd, bool lock) { +int LockOrUnlock(int fd, bool lock) { ThreadRestrictions::AssertIOAllowed(); errno = 0; struct flock f; http://git-wip-us.apache.org/repos/asf/kudu/blob/583aa224/src/kudu/util/pb_util.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/pb_util.cc b/src/kudu/util/pb_util.cc index 8f7b203..d19b16f 100644 --- a/src/kudu/util/pb_util.cc +++ b/src/kudu/util/pb_util.cc @@ -26,6 +26,7 @@ #include <deque> #include <initializer_list> #include <memory> +#include <mutex> #include <ostream> #include <sstream> #include <string> @@ -57,6 +58,7 @@ #include "kudu/util/env.h" #include "kudu/util/env_util.h" #include "kudu/util/jsonwriter.h" +#include "kudu/util/mutex.h" #include "kudu/util/path_util.h" #include "kudu/util/pb_util-internal.h" #include "kudu/util/pb_util.pb.h" @@ -601,16 +603,20 @@ Status WritablePBContainerFile::Init(const Message& msg) { Status WritablePBContainerFile::Reopen() { DCHECK(state_ == FileState::NOT_INITIALIZED || state_ == FileState::OPEN) << state_; - offset_ = 0; - 
RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &offset_, &version_)); - ContainerSupHeaderPB sup_header; - RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &offset_, &sup_header)); - RETURN_NOT_OK(writer_->Size(&offset_)); // Reset the write offset to the end of the file. + { + std::lock_guard<Mutex> l(offset_lock_); + offset_ = 0; + RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &offset_, &version_)); + ContainerSupHeaderPB sup_header; + RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &offset_, &sup_header)); + RETURN_NOT_OK(writer_->Size(&offset_)); // Reset the write offset to the end of the file. + } state_ = FileState::OPEN; return Status::OK(); } Status WritablePBContainerFile::AppendBytes(const Slice& data) { + std::lock_guard<Mutex> l(offset_lock_); RETURN_NOT_OK(writer_->Write(offset_, data)); offset_ += data.size(); return Status::OK(); http://git-wip-us.apache.org/repos/asf/kudu/blob/583aa224/src/kudu/util/pb_util.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/pb_util.h b/src/kudu/util/pb_util.h index 606463e..f674a61 100644 --- a/src/kudu/util/pb_util.h +++ b/src/kudu/util/pb_util.h @@ -28,6 +28,7 @@ #include "kudu/gutil/gscoped_ptr.h" #include "kudu/util/debug/trace_event.h" #include "kudu/util/faststring.h" +#include "kudu/util/mutex.h" namespace google { namespace protobuf { @@ -243,11 +244,10 @@ void TruncateFields(google::protobuf::Message* message, int max_len); // https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-pillai.pdf // https://www.usenix.org/legacy/event/fast08/tech/full_papers/bairavasundaram/bairavasundaram.pdf -// Protobuf container file opened for writing. +// Protobuf container file opened for writing. Can be built around an existing +// file or a completely new file. // -// Can be built around an existing file or a completely new file. -// -// Not thread-safe. +// Every function is thread-safe unless indicated otherwise. 
class WritablePBContainerFile { public: @@ -263,6 +263,8 @@ class WritablePBContainerFile { // // 'msg' need not be populated; its type is used to "lock" the container // to a particular protobuf message type in Append(). + // + // Not thread-safe. Status Init(const google::protobuf::Message& msg); // Reopen a protobuf container file for append. The file must already have a @@ -302,6 +304,8 @@ class WritablePBContainerFile { Status Sync(); // Closes the container. + // + // Not thread-safe. Status Close(); // Returns the path to the container's underlying file handle. @@ -330,12 +334,19 @@ class WritablePBContainerFile { // Append bytes to the file. Status AppendBytes(const Slice& data); + // State of the file. FileState state_; + // Protects offset_. + Mutex offset_lock_; + // Current write offset into the file. uint64_t offset_; + + // Protobuf container file version. int version_; + // File writer. gscoped_ptr<RWFile> writer_; };
