log block manager: push down container locks

This simplifies container logic a bit. PosixRWFile is now safe for concurrent
use by multiple threads (except for Close()), and there's a new lock in
WritablePBContainerFile to synchronize accesses to offset_.

Change-Id: Ie2f69bc8b455bce2e1d89ed22713e57a9c3006dc
Reviewed-on: http://gerrit.cloudera.org:8080/5029
Reviewed-by: Dan Burkert <[email protected]>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/583aa224
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/583aa224
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/583aa224

Branch: refs/heads/master
Commit: 583aa2247b664ad06b18a6bb472665f6fab4aab1
Parents: 2cd0792
Author: Adar Dembo <[email protected]>
Authored: Fri Nov 4 19:06:59 2016 -0700
Committer: Adar Dembo <[email protected]>
Committed: Wed Nov 16 01:59:59 2016 +0000

----------------------------------------------------------------------
 src/kudu/fs/log_block_manager.cc | 23 ++---------------------
 src/kudu/util/env.h              |  8 ++++++--
 src/kudu/util/env_posix.cc       | 35 +++++++++++++++++++++--------------
 src/kudu/util/pb_util.cc         | 16 +++++++++++-----
 src/kudu/util/pb_util.h          | 19 +++++++++++++++----
 5 files changed, 55 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/583aa224/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 91609da..65d0665 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -286,14 +286,8 @@ class LogBlockContainer {
   int64_t preallocated_offset_ = 0;
 
   // Opened file handles to the container's files.
-  //
-  // RWFile is not thread safe so access to each writer must be
-  // serialized through a (sleeping) mutex. We use different mutexes to
-  // avoid contention in cases where only one writer is needed.
   gscoped_ptr<WritablePBContainerFile> metadata_file_;
-  Mutex metadata_file_lock_;
   gscoped_ptr<RWFile> data_file_;
-  Mutex data_file_lock_;
 
   // The amount of data written thus far in the container.
   int64_t total_bytes_written_ = 0;
@@ -556,7 +550,6 @@ Status LogBlockContainer::DeleteBlock(int64_t offset, int64_t length) {
 
   // It is invalid to punch a zero-size hole.
   if (length) {
-    std::lock_guard<Mutex> l(data_file_lock_);
     // Round up to the nearest filesystem block so that the kernel will
     // actually reclaim disk space.
     //
@@ -570,10 +563,7 @@ Status LogBlockContainer::DeleteBlock(int64_t offset, int64_t length) {
 
 Status LogBlockContainer::AppendData(const Slice& data) {
   RETURN_NOT_OK(EnsurePreallocated(data.size()));
-  {
-    std::lock_guard<Mutex> l(data_file_lock_);
-    RETURN_NOT_OK(data_file_->Write(total_bytes_written_, data));
-  }
+  RETURN_NOT_OK(data_file_->Write(total_bytes_written_, data));
   total_bytes_written_ += data.size();
 
   // This append may have changed the container size if:
@@ -595,26 +585,21 @@ Status LogBlockContainer::ReadData(int64_t offset, size_t length,
 Status LogBlockContainer::AppendMetadata(const BlockRecordPB& pb) {
   // Note: We don't check for sufficient disk space for metadata writes in
   // order to allow for block deletion on full disks.
-  std::lock_guard<Mutex> l(metadata_file_lock_);
   return metadata_file_->Append(pb);
 }
 
 Status LogBlockContainer::FlushData(int64_t offset, int64_t length) {
   DCHECK_GE(offset, 0);
   DCHECK_GE(length, 0);
-
-  std::lock_guard<Mutex> l(data_file_lock_);
   return data_file_->Flush(RWFile::FLUSH_ASYNC, offset, length);
 }
 
 Status LogBlockContainer::FlushMetadata() {
-  std::lock_guard<Mutex> l(metadata_file_lock_);
   return metadata_file_->Flush();
 }
 
 Status LogBlockContainer::SyncData() {
   if (FLAGS_enable_data_block_fsync) {
-    std::lock_guard<Mutex> l(data_file_lock_);
     return data_file_->Sync();
   }
   return Status::OK();
@@ -622,7 +607,6 @@ Status LogBlockContainer::SyncData() {
 
 Status LogBlockContainer::SyncMetadata() {
   if (FLAGS_enable_data_block_fsync) {
-    std::lock_guard<Mutex> l(metadata_file_lock_);
     return metadata_file_->Sync();
   }
   return Status::OK();
@@ -639,10 +623,7 @@ Status LogBlockContainer::EnsurePreallocated(size_t next_append_bytes) {
       next_append_bytes > preallocated_offset_ - total_bytes_written_) {
     int64_t off = std::max(preallocated_offset_, total_bytes_written_);
     int64_t len = FLAGS_log_container_preallocate_bytes;
-    {
-      std::lock_guard<Mutex> l(data_file_lock_);
-      RETURN_NOT_OK(data_file_->PreAllocate(off, len));
-    }
+    RETURN_NOT_OK(data_file_->PreAllocate(off, len));
     RETURN_NOT_OK(data_dir_->RefreshIsFull(DataDir::RefreshMode::ALWAYS));
     VLOG(2) << Substitute("Preallocated $0 bytes at offset $1 in container $2",
                           len, off, ToString());

http://git-wip-us.apache.org/repos/asf/kudu/blob/583aa224/src/kudu/util/env.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h
index 60cb8f5..a59a766 100644
--- a/src/kudu/util/env.h
+++ b/src/kudu/util/env.h
@@ -423,8 +423,10 @@ struct RWFileOptions {
 // file offset is ever used; instead, all operations must provide an
 // explicit offset.
 //
-// All "read" operations are safe for concurrent use by multiple threads,
-// but "write" operations must be externally synchronized.
+// All operations are safe for concurrent use by multiple threads (unless
+// noted otherwise) bearing in mind the usual filesystem coherency guarantees
+// (e.g. two threads that write concurrently to the same file offset will
+// probably yield garbage).
 class RWFile {
  public:
   enum FlushMode {
@@ -495,6 +497,8 @@ class RWFile {
 
   // Closes the file, optionally calling Sync() on it if the file was
   // created with the sync_on_close option enabled.
+  //
+  // Not thread-safe.
   virtual Status Close() = 0;
 
   // Retrieves the file's size.

http://git-wip-us.apache.org/repos/asf/kudu/blob/583aa224/src/kudu/util/env_posix.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 60d1df4..a1a8b89 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -28,6 +28,7 @@
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/atomic.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/env.h"
 #include "kudu/util/errno.h"
@@ -514,12 +515,11 @@ class PosixRWFile : public RWFile {
       : filename_(std::move(fname)),
         fd_(fd),
         sync_on_close_(sync_on_close),
-        pending_sync_(false) {}
+        pending_sync_(false),
+        closed_(false) {}
 
   ~PosixRWFile() {
-    if (fd_ >= 0) {
-      WARN_NOT_OK(Close(), "Failed to close " + filename_);
-    }
+    WARN_NOT_OK(Close(), "Failed to close " + filename_);
   }
 
   virtual Status Read(uint64_t offset, size_t length,
@@ -567,7 +567,7 @@ class PosixRWFile : public RWFile {
                      data.size(), written));
     }
 
-    pending_sync_ = true;
+    pending_sync_.Store(true);
     return Status::OK();
   }
 
@@ -633,18 +633,23 @@ class PosixRWFile : public RWFile {
   }
 
   virtual Status Sync() OVERRIDE {
+    if (!pending_sync_.CompareAndSwap(true, false)) {
+      return Status::OK();
+    }
+
     TRACE_EVENT1("io", "PosixRWFile::Sync", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
     LOG_SLOW_EXECUTION(WARNING, 1000, Substitute("sync call for $0", filename())) {
-      if (pending_sync_) {
-        pending_sync_ = false;
-        RETURN_NOT_OK(DoSync(fd_, filename_));
-      }
+      RETURN_NOT_OK(DoSync(fd_, filename_));
     }
     return Status::OK();
   }
 
   virtual Status Close() OVERRIDE {
+    if (closed_) {
+      return Status::OK();
+    }
+
     TRACE_EVENT1("io", "PosixRWFile::Close", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
     Status s;
@@ -662,7 +667,7 @@ class PosixRWFile : public RWFile {
       }
     }
 
-    fd_ = -1;
+    closed_ = true;
     return s;
   }
 
@@ -683,12 +688,14 @@ class PosixRWFile : public RWFile {
 
  private:
   const std::string filename_;
-  int fd_;
-  bool sync_on_close_;
-  bool pending_sync_;
+  const int fd_;
+  const bool sync_on_close_;
+
+  AtomicBool pending_sync_;
+  bool closed_;
 };
 
-static int LockOrUnlock(int fd, bool lock) {
+int LockOrUnlock(int fd, bool lock) {
   ThreadRestrictions::AssertIOAllowed();
   errno = 0;
   struct flock f;

http://git-wip-us.apache.org/repos/asf/kudu/blob/583aa224/src/kudu/util/pb_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util.cc b/src/kudu/util/pb_util.cc
index 8f7b203..d19b16f 100644
--- a/src/kudu/util/pb_util.cc
+++ b/src/kudu/util/pb_util.cc
@@ -26,6 +26,7 @@
 #include <deque>
 #include <initializer_list>
 #include <memory>
+#include <mutex>
 #include <ostream>
 #include <sstream>
 #include <string>
@@ -57,6 +58,7 @@
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/jsonwriter.h"
+#include "kudu/util/mutex.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/pb_util-internal.h"
 #include "kudu/util/pb_util.pb.h"
@@ -601,16 +603,20 @@ Status WritablePBContainerFile::Init(const Message& msg) {
 
 Status WritablePBContainerFile::Reopen() {
   DCHECK(state_ == FileState::NOT_INITIALIZED || state_ == FileState::OPEN) << state_;
-  offset_ = 0;
-  RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &offset_, &version_));
-  ContainerSupHeaderPB sup_header;
-  RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &offset_, &sup_header));
-  RETURN_NOT_OK(writer_->Size(&offset_)); // Reset the write offset to the end of the file.
+  {
+    std::lock_guard<Mutex> l(offset_lock_);
+    offset_ = 0;
+    RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &offset_, &version_));
+    ContainerSupHeaderPB sup_header;
+    RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &offset_, &sup_header));
+    RETURN_NOT_OK(writer_->Size(&offset_)); // Reset the write offset to the end of the file.
+  }
   state_ = FileState::OPEN;
   return Status::OK();
 }
 
 Status WritablePBContainerFile::AppendBytes(const Slice& data) {
+  std::lock_guard<Mutex> l(offset_lock_);
   RETURN_NOT_OK(writer_->Write(offset_, data));
   offset_ += data.size();
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/583aa224/src/kudu/util/pb_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/pb_util.h b/src/kudu/util/pb_util.h
index 606463e..f674a61 100644
--- a/src/kudu/util/pb_util.h
+++ b/src/kudu/util/pb_util.h
@@ -28,6 +28,7 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/mutex.h"
 
 namespace google {
 namespace protobuf {
@@ -243,11 +244,10 @@ void TruncateFields(google::protobuf::Message* message, int max_len);
 // https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-pillai.pdf
 // https://www.usenix.org/legacy/event/fast08/tech/full_papers/bairavasundaram/bairavasundaram.pdf
 
-// Protobuf container file opened for writing.
+// Protobuf container file opened for writing. Can be built around an existing
+// file or a completely new file.
 //
-// Can be built around an existing file or a completely new file.
-//
-// Not thread-safe.
+// Every function is thread-safe unless indicated otherwise.
 class WritablePBContainerFile {
  public:
 
@@ -263,6 +263,8 @@ class WritablePBContainerFile {
   //
   // 'msg' need not be populated; its type is used to "lock" the container
   // to a particular protobuf message type in Append().
+  //
+  // Not thread-safe.
   Status Init(const google::protobuf::Message& msg);
 
   // Reopen a protobuf container file for append. The file must already have a
@@ -302,6 +304,8 @@ class WritablePBContainerFile {
   Status Sync();
 
   // Closes the container.
+  //
+  // Not thread-safe.
   Status Close();
 
   // Returns the path to the container's underlying file handle.
@@ -330,12 +334,19 @@ class WritablePBContainerFile {
   // Append bytes to the file.
   Status AppendBytes(const Slice& data);
 
+  // State of the file.
   FileState state_;
 
+  // Protects offset_.
+  Mutex offset_lock_;
+
   // Current write offset into the file.
   uint64_t offset_;
+
+  // Protobuf container file version.
   int version_;
 
+  // File writer.
   gscoped_ptr<RWFile> writer_;
 };
 

Reply via email to