http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/write_batch.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/write_batch.cc 
b/thirdparty/rocksdb/db/write_batch.cc
new file mode 100644
index 0000000..91be9a0
--- /dev/null
+++ b/thirdparty/rocksdb/db/write_batch.cc
@@ -0,0 +1,1396 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// WriteBatch::rep_ :=
+//    sequence: fixed64
+//    count: fixed32
+//    data: record[count]
+// record :=
+//    kTypeValue varstring varstring
+//    kTypeDeletion varstring
+//    kTypeSingleDeletion varstring
+//    kTypeMerge varstring varstring
+//    kTypeColumnFamilyValue varint32 varstring varstring
+//    kTypeColumnFamilyDeletion varint32 varstring varstring
+//    kTypeColumnFamilySingleDeletion varint32 varstring varstring
+//    kTypeColumnFamilyMerge varint32 varstring varstring
+//    kTypeBeginPrepareXID varstring
+//    kTypeEndPrepareXID
+//    kTypeCommitXID varstring
+//    kTypeRollbackXID varstring
+//    kTypeNoop
+// varstring :=
+//    len: varint32
+//    data: uint8[len]
+
+#include "rocksdb/write_batch.h"
+
+#include <map>
+#include <stack>
+#include <stdexcept>
+#include <type_traits>
+#include <vector>
+
+#include "db/column_family.h"
+#include "db/db_impl.h"
+#include "db/dbformat.h"
+#include "db/flush_scheduler.h"
+#include "db/memtable.h"
+#include "db/merge_context.h"
+#include "db/snapshot_impl.h"
+#include "db/write_batch_internal.h"
+#include "monitoring/perf_context_imp.h"
+#include "monitoring/statistics.h"
+#include "rocksdb/merge_operator.h"
+#include "util/coding.h"
+#include "util/string_util.h"
+
+namespace rocksdb {
+
+// anon namespace for file-local types
+namespace {
+
// Bit flags recording which kinds of operations a WriteBatch contains.
// DEFERRED means the flags have not been computed yet; they are resolved
// lazily by WriteBatch::ComputeContentFlags().
enum ContentFlags : uint32_t {
  DEFERRED = 1 << 0,
  HAS_PUT = 1 << 1,
  HAS_DELETE = 1 << 2,
  HAS_SINGLE_DELETE = 1 << 3,
  HAS_MERGE = 1 << 4,
  HAS_BEGIN_PREPARE = 1 << 5,
  HAS_END_PREPARE = 1 << 6,
  HAS_COMMIT = 1 << 7,
  HAS_ROLLBACK = 1 << 8,
  HAS_DELETE_RANGE = 1 << 9,
};
+
// Handler that replays a batch and records which operation kinds appear in
// it.  Used by WriteBatch::ComputeContentFlags() to resolve the DEFERRED
// state; each callback just sets the corresponding ContentFlags bit.
struct BatchContentClassifier : public WriteBatch::Handler {
  uint32_t content_flags = 0;  // accumulated ContentFlags bits

  Status PutCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_PUT;
    return Status::OK();
  }

  Status DeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE;
    return Status::OK();
  }

  Status SingleDeleteCF(uint32_t, const Slice&) override {
    content_flags |= ContentFlags::HAS_SINGLE_DELETE;
    return Status::OK();
  }

  Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_DELETE_RANGE;
    return Status::OK();
  }

  Status MergeCF(uint32_t, const Slice&, const Slice&) override {
    content_flags |= ContentFlags::HAS_MERGE;
    return Status::OK();
  }

  Status MarkBeginPrepare() override {
    content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
    return Status::OK();
  }

  Status MarkEndPrepare(const Slice&) override {
    content_flags |= ContentFlags::HAS_END_PREPARE;
    return Status::OK();
  }

  Status MarkCommit(const Slice&) override {
    content_flags |= ContentFlags::HAS_COMMIT;
    return Status::OK();
  }

  Status MarkRollback(const Slice&) override {
    content_flags |= ContentFlags::HAS_ROLLBACK;
    return Status::OK();
  }
};
+
+}  // anon namespace
+
// Heap-allocated holder for the savepoint stack.  Allocated on first
// SetSavePoint() call so batches that never use savepoints pay nothing.
struct SavePoints {
  std::stack<SavePoint> stack;
};
+
+WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
+    : save_points_(nullptr), content_flags_(0), max_bytes_(max_bytes), rep_() {
+  rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ?
+    reserved_bytes : WriteBatchInternal::kHeader);
+  rep_.resize(WriteBatchInternal::kHeader);
+}
+
// Constructs a batch from an already-serialized representation (header +
// records).  Content flags start as DEFERRED and are computed lazily on
// first query via ComputeContentFlags().
WriteBatch::WriteBatch(const std::string& rep)
    : save_points_(nullptr),
      content_flags_(ContentFlags::DEFERRED),
      max_bytes_(0),
      rep_(rep) {}
+
+WriteBatch::WriteBatch(const WriteBatch& src)
+    : save_points_(src.save_points_),
+      wal_term_point_(src.wal_term_point_),
+      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
+      max_bytes_(src.max_bytes_),
+      rep_(src.rep_) {}
+
+WriteBatch::WriteBatch(WriteBatch&& src)
+    : save_points_(std::move(src.save_points_)),
+      wal_term_point_(std::move(src.wal_term_point_)),
+      content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
+      max_bytes_(src.max_bytes_),
+      rep_(std::move(src.rep_)) {}
+
// Copy assignment implemented as destroy-then-placement-new, reusing the
// copy constructor so both paths stay in sync.
WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(src);
  }
  return *this;
}

// Move assignment, same destroy + placement-new idiom as copy assignment.
WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
  if (&src != this) {
    this->~WriteBatch();
    new (this) WriteBatch(std::move(src));
  }
  return *this;
}
+
// Releases the lazily-allocated savepoint stack (may be nullptr).
WriteBatch::~WriteBatch() { delete save_points_; }

WriteBatch::Handler::~Handler() { }

void WriteBatch::Handler::LogData(const Slice& blob) {
  // If the user has not specified something to do with blobs, then we ignore
  // them.
}

// Default: keep iterating over the whole batch.
bool WriteBatch::Handler::Continue() {
  return true;
}
+
+void WriteBatch::Clear() {
+  rep_.clear();
+  rep_.resize(WriteBatchInternal::kHeader);
+
+  content_flags_.store(0, std::memory_order_relaxed);
+
+  if (save_points_ != nullptr) {
+    while (!save_points_->stack.empty()) {
+      save_points_->stack.pop();
+    }
+  }
+
+  wal_term_point_.clear();
+}
+
// Returns the number of records in the batch, read from the header's count
// field.
int WriteBatch::Count() const {
  return WriteBatchInternal::Count(this);
}

// Returns the ContentFlags bits for this batch, computing them lazily (by
// replaying the batch through BatchContentClassifier) if still DEFERRED.
uint32_t WriteBatch::ComputeContentFlags() const {
  auto rv = content_flags_.load(std::memory_order_relaxed);
  if ((rv & ContentFlags::DEFERRED) != 0) {
    BatchContentClassifier classifier;
    Iterate(&classifier);
    rv = classifier.content_flags;

    // this method is conceptually const, because it is performing a lazy
    // computation that doesn't affect the abstract state of the batch.
    // content_flags_ is marked mutable so that we can perform the
    // following assignment
    content_flags_.store(rv, std::memory_order_relaxed);
  }
  return rv;
}

// Snapshots the current size/count/flags as the point past which the batch
// contents should not be written to the WAL.
void WriteBatch::MarkWalTerminationPoint() {
  wal_term_point_.size = GetDataSize();
  wal_term_point_.count = Count();
  wal_term_point_.content_flags = content_flags_;
}
+
// The Has*() predicates below report whether the batch contains at least one
// record of the given kind; classification is lazy via ComputeContentFlags().
bool WriteBatch::HasPut() const {
  return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
}

bool WriteBatch::HasDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
}

bool WriteBatch::HasSingleDelete() const {
  return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}

bool WriteBatch::HasDeleteRange() const {
  return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
}

bool WriteBatch::HasMerge() const {
  return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}
+
+bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
+  assert(input != nullptr && key != nullptr);
+  // Skip tag byte
+  input->remove_prefix(1);
+
+  if (cf_record) {
+    // Skip column_family bytes
+    uint32_t cf;
+    if (!GetVarint32(input, &cf)) {
+      return false;
+    }
+  }
+
+  // Extract key
+  return GetLengthPrefixedSlice(input, key);
+}
+
// Two-phase-commit marker predicates, also lazily computed.
bool WriteBatch::HasBeginPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}

bool WriteBatch::HasEndPrepare() const {
  return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
}

bool WriteBatch::HasCommit() const {
  return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
}

bool WriteBatch::HasRollback() const {
  return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
}
+
// Decodes a single record from *input, advancing the slice past it.
// Outputs the record's tag, its column family id (0 = default), and
// whichever of key/value/blob/xid the record type carries (see the rep_
// grammar at the top of this file).  Returns Corruption if the record is
// truncated or the tag is unknown.  Note: assumes *input is non-empty;
// callers check this before calling.
Status ReadRecordFromWriteBatch(Slice* input, char* tag,
                                uint32_t* column_family, Slice* key,
                                Slice* value, Slice* blob, Slice* xid) {
  assert(key != nullptr && value != nullptr);
  *tag = (*input)[0];
  input->remove_prefix(1);
  *column_family = 0;  // default
  switch (*tag) {
    case kTypeColumnFamilyValue:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Put");
      }
    // intentional fallthrough
    case kTypeValue:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Put");
      }
      break;
    case kTypeColumnFamilyDeletion:
    case kTypeColumnFamilySingleDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
    // intentional fallthrough
    case kTypeDeletion:
    case kTypeSingleDeletion:
      if (!GetLengthPrefixedSlice(input, key)) {
        return Status::Corruption("bad WriteBatch Delete");
      }
      break;
    case kTypeColumnFamilyRangeDeletion:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
    // intentional fallthrough
    case kTypeRangeDeletion:
      // for range delete, "key" is begin_key, "value" is end_key
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch DeleteRange");
      }
      break;
    case kTypeColumnFamilyMerge:
      if (!GetVarint32(input, column_family)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
    // intentional fallthrough
    case kTypeMerge:
      if (!GetLengthPrefixedSlice(input, key) ||
          !GetLengthPrefixedSlice(input, value)) {
        return Status::Corruption("bad WriteBatch Merge");
      }
      break;
    case kTypeLogData:
      assert(blob != nullptr);
      if (!GetLengthPrefixedSlice(input, blob)) {
        return Status::Corruption("bad WriteBatch Blob");
      }
      break;
    case kTypeNoop:
    case kTypeBeginPrepareXID:
      // Markers with no payload.
      break;
    case kTypeEndPrepareXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad EndPrepare XID");
      }
      break;
    case kTypeCommitXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Commit XID");
      }
      break;
    case kTypeRollbackXID:
      if (!GetLengthPrefixedSlice(input, xid)) {
        return Status::Corruption("bad Rollback XID");
      }
      break;
    default:
      return Status::Corruption("unknown WriteBatch tag");
  }
  return Status::OK();
}
+
// Replays every record in the batch, in insertion order, invoking the
// matching Handler callback for each.  Stops early if a callback returns a
// non-OK status or handler->Continue() becomes false.  Returns Corruption
// if the batch is too small, a record is malformed, or the number of
// counted records disagrees with the header's count field.
Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);
  if (input.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(WriteBatchInternal::kHeader);
  Slice key, value, blob, xid;
  // Number of counted records seen; log-data and 2PC markers are excluded
  // (they do not contribute to the header count).
  int found = 0;
  Status s;
  while (s.ok() && !input.empty() && handler->Continue()) {
    char tag = 0;
    uint32_t column_family = 0;  // default

    s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                 &blob, &xid);
    if (!s.ok()) {
      return s;
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);
        found++;
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        found++;
        break;
      case kTypeColumnFamilySingleDeletion:
      case kTypeSingleDeletion:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
        s = handler->SingleDeleteCF(column_family, key);
        found++;
        break;
      case kTypeColumnFamilyRangeDeletion:
      case kTypeRangeDeletion:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
        s = handler->DeleteRangeCF(column_family, key, value);
        found++;
        break;
      case kTypeColumnFamilyMerge:
      case kTypeMerge:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
        s = handler->MergeCF(column_family, key, value);
        found++;
        break;
      case kTypeLogData:
        handler->LogData(blob);
        break;
      case kTypeBeginPrepareXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
        handler->MarkBeginPrepare();
        break;
      case kTypeEndPrepareXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
        handler->MarkEndPrepare(xid);
        break;
      case kTypeCommitXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
        handler->MarkCommit(xid);
        break;
      case kTypeRollbackXID:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
        handler->MarkRollback(xid);
        break;
      case kTypeNoop:
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (found != WriteBatchInternal::Count(this)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}
+
// Reads the record count from the fixed32 at byte offset 8 of the header.
int WriteBatchInternal::Count(const WriteBatch* b) {
  return DecodeFixed32(b->rep_.data() + 8);
}

// Overwrites the record-count field in the header.
void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
  EncodeFixed32(&b->rep_[8], n);
}

// Reads the starting sequence number from the fixed64 at offset 0.
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
  return SequenceNumber(DecodeFixed64(b->rep_.data()));
}

// Overwrites the starting sequence number in the header.
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
  EncodeFixed64(&b->rep_[0], seq);
}

// Records always begin immediately after the fixed-size header.
size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) {
  return WriteBatchInternal::kHeader;
}
+
+Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
+                               const Slice& key, const Slice& value) {
+  LocalSavePoint save(b);
+  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
+  if (column_family_id == 0) {
+    b->rep_.push_back(static_cast<char>(kTypeValue));
+  } else {
+    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
+    PutVarint32(&b->rep_, column_family_id);
+  }
+  PutLengthPrefixedSlice(&b->rep_, key);
+  PutLengthPrefixedSlice(&b->rep_, value);
+  b->content_flags_.store(
+      b->content_flags_.load(std::memory_order_relaxed) | 
ContentFlags::HAS_PUT,
+      std::memory_order_relaxed);
+  return save.commit();
+}
+
// Public Put: resolves the column family handle to its id and forwards.
Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}
+
+Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
+                               const SliceParts& key, const SliceParts& value) 
{
+  LocalSavePoint save(b);
+  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
+  if (column_family_id == 0) {
+    b->rep_.push_back(static_cast<char>(kTypeValue));
+  } else {
+    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
+    PutVarint32(&b->rep_, column_family_id);
+  }
+  PutLengthPrefixedSliceParts(&b->rep_, key);
+  PutLengthPrefixedSliceParts(&b->rep_, value);
+  b->content_flags_.store(
+      b->content_flags_.load(std::memory_order_relaxed) | 
ContentFlags::HAS_PUT,
+      std::memory_order_relaxed);
+  return save.commit();
+}
+
// Public SliceParts Put: resolves the handle to an id and forwards.
Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
                       const SliceParts& value) {
  return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
                                 value);
}
+
// Appends a no-op record.  Used as a placeholder immediately after the
// header that MarkEndPrepare() later rewrites into kTypeBeginPrepareXID.
Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
  b->rep_.push_back(static_cast<char>(kTypeNoop));
  return Status::OK();
}
+
// Converts the batch into a 2PC prepare section for transaction id `xid`:
// the noop placeholder at rep_[12] (the first byte after the 12-byte
// header) becomes the begin-prepare marker, and an end-prepare record
// carrying the xid is appended.
Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) {
  // a manually constructed batch can only contain one prepare section
  assert(b->rep_[12] == static_cast<char>(kTypeNoop));

  // all savepoints up to this point are cleared
  if (b->save_points_ != nullptr) {
    while (!b->save_points_->stack.empty()) {
      b->save_points_->stack.pop();
    }
  }

  // rewrite noop as begin marker
  b->rep_[12] = static_cast<char>(kTypeBeginPrepareXID);
  b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
  PutLengthPrefixedSlice(&b->rep_, xid);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_END_PREPARE |
                              ContentFlags::HAS_BEGIN_PREPARE,
                          std::memory_order_relaxed);
  return Status::OK();
}
+
+Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
+  b->rep_.push_back(static_cast<char>(kTypeCommitXID));
+  PutLengthPrefixedSlice(&b->rep_, xid);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_COMMIT,
+                          std::memory_order_relaxed);
+  return Status::OK();
+}
+
+Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
+  b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
+  PutLengthPrefixedSlice(&b->rep_, xid);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_ROLLBACK,
+                          std::memory_order_relaxed);
+  return Status::OK();
+}
+
+Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
+                                  const Slice& key) {
+  LocalSavePoint save(b);
+  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
+  if (column_family_id == 0) {
+    b->rep_.push_back(static_cast<char>(kTypeDeletion));
+  } else {
+    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
+    PutVarint32(&b->rep_, column_family_id);
+  }
+  PutLengthPrefixedSlice(&b->rep_, key);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_DELETE,
+                          std::memory_order_relaxed);
+  return save.commit();
+}
+
// Public Delete: resolves the column family handle to its id and forwards.
Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
  return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
                                    key);
}
+
// SliceParts overload of Delete: same encoding, key written from a
// gather-list without concatenation.
Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
                                  const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

// Public SliceParts Delete wrapper.
Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
                          const SliceParts& key) {
  return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
                                    key);
}
+
+Status WriteBatchInternal::SingleDelete(WriteBatch* b,
+                                        uint32_t column_family_id,
+                                        const Slice& key) {
+  LocalSavePoint save(b);
+  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
+  if (column_family_id == 0) {
+    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
+  } else {
+    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
+    PutVarint32(&b->rep_, column_family_id);
+  }
+  PutLengthPrefixedSlice(&b->rep_, key);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_SINGLE_DELETE,
+                          std::memory_order_relaxed);
+  return save.commit();
+}
+
// Public SingleDelete: resolves the handle to an id and forwards.
Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const Slice& key) {
  return WriteBatchInternal::SingleDelete(
      this, GetColumnFamilyID(column_family), key);
}
+
// SliceParts overload of SingleDelete: same encoding, key written from a
// gather-list without concatenation.
Status WriteBatchInternal::SingleDelete(WriteBatch* b,
                                        uint32_t column_family_id,
                                        const SliceParts& key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_SINGLE_DELETE,
                          std::memory_order_relaxed);
  return save.commit();
}

// Public SliceParts SingleDelete wrapper.
Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                                const SliceParts& key) {
  return WriteBatchInternal::SingleDelete(
      this, GetColumnFamilyID(column_family), key);
}
+
+Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t 
column_family_id,
+                                       const Slice& begin_key,
+                                       const Slice& end_key) {
+  LocalSavePoint save(b);
+  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
+  if (column_family_id == 0) {
+    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
+  } else {
+    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
+    PutVarint32(&b->rep_, column_family_id);
+  }
+  PutLengthPrefixedSlice(&b->rep_, begin_key);
+  PutLengthPrefixedSlice(&b->rep_, end_key);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_DELETE_RANGE,
+                          std::memory_order_relaxed);
+  return save.commit();
+}
+
// Public DeleteRange: resolves the handle to an id and forwards.
Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const Slice& begin_key, const Slice& end_key) {
  return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
                                         begin_key, end_key);
}
+
// SliceParts overload of DeleteRange: same encoding, keys written from
// gather-lists without concatenation.
Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
                                       const SliceParts& begin_key,
                                       const SliceParts& end_key) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, begin_key);
  PutLengthPrefixedSliceParts(&b->rep_, end_key);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_DELETE_RANGE,
                          std::memory_order_relaxed);
  return save.commit();
}

// Public SliceParts DeleteRange wrapper.
Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
                               const SliceParts& begin_key,
                               const SliceParts& end_key) {
  return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
                                         begin_key, end_key);
}
+
+Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
+                                 const Slice& key, const Slice& value) {
+  LocalSavePoint save(b);
+  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
+  if (column_family_id == 0) {
+    b->rep_.push_back(static_cast<char>(kTypeMerge));
+  } else {
+    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
+    PutVarint32(&b->rep_, column_family_id);
+  }
+  PutLengthPrefixedSlice(&b->rep_, key);
+  PutLengthPrefixedSlice(&b->rep_, value);
+  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
+                              ContentFlags::HAS_MERGE,
+                          std::memory_order_relaxed);
+  return save.commit();
+}
+
// Public Merge: resolves the handle to an id and forwards.
Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& value) {
  return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
                                   value);
}
+
// SliceParts overload of Merge: same encoding, key/value written from
// gather-lists without concatenation.
Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
                                 const SliceParts& key,
                                 const SliceParts& value) {
  LocalSavePoint save(b);
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  if (column_family_id == 0) {
    b->rep_.push_back(static_cast<char>(kTypeMerge));
  } else {
    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);
  b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
                              ContentFlags::HAS_MERGE,
                          std::memory_order_relaxed);
  return save.commit();
}

// Public SliceParts Merge wrapper.
Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
                         const SliceParts& key, const SliceParts& value) {
  return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
                                   value);
}
+
// Appends an opaque blob record.  It does not affect the header count; during
// Iterate() it is delivered to Handler::LogData() rather than any *CF method.
Status WriteBatch::PutLogData(const Slice& blob) {
  LocalSavePoint save(this);
  rep_.push_back(static_cast<char>(kTypeLogData));
  PutLengthPrefixedSlice(&rep_, blob);
  return save.commit();
}
+
// Pushes a savepoint capturing the batch's current size, record count, and
// content flags, so a later RollbackToSavePoint() can restore this state.
void WriteBatch::SetSavePoint() {
  if (save_points_ == nullptr) {
    save_points_ = new SavePoints();
  }
  // Record length and count of current batch of writes.
  save_points_->stack.push(SavePoint(
      GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
}
+
// Discards every record appended after the most recent savepoint, restoring
// size/count/flags, and pops that savepoint.  Returns NotFound if no
// savepoint has been set.
Status WriteBatch::RollbackToSavePoint() {
  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
    return Status::NotFound();
  }

  // Pop the most recent savepoint off the stack
  SavePoint savepoint = save_points_->stack.top();
  save_points_->stack.pop();

  assert(savepoint.size <= rep_.size());
  assert(savepoint.count <= Count());

  if (savepoint.size == rep_.size()) {
    // No changes to rollback
  } else if (savepoint.size == 0) {
    // Rollback everything
    Clear();
  } else {
    // Truncate the serialized rep back to the saved length and restore the
    // saved count and content flags.
    rep_.resize(savepoint.size);
    WriteBatchInternal::SetCount(this, savepoint.count);
    content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
  }

  return Status::OK();
}
+
+Status WriteBatch::PopSavePoint() {
+  if (save_points_ == nullptr || save_points_->stack.size() == 0) {
+    return Status::NotFound();
+  }
+
+  // Pop the most recent savepoint off the stack
+  save_points_->stack.pop();
+
+  return Status::OK();
+}
+
+class MemTableInserter : public WriteBatch::Handler {
+
+  SequenceNumber sequence_;
+  ColumnFamilyMemTables* const cf_mems_;
+  FlushScheduler* const flush_scheduler_;
+  const bool ignore_missing_column_families_;
+  const uint64_t recovering_log_number_;
+  // log number that all Memtables inserted into should reference
+  uint64_t log_number_ref_;
+  DBImpl* db_;
+  const bool concurrent_memtable_writes_;
+  bool       post_info_created_;
+
+  bool* has_valid_writes_;
+  // On some (!) platforms just default creating
+  // a map is too expensive in the Write() path as they
+  // cause memory allocations though unused.
+  // Make creation optional but do not incur
+  // unique_ptr additional allocation
+  using 
+  MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
+  using
+  PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
+  PostMapType mem_post_info_map_;
+  // current recovered transaction we are rebuilding (recovery)
+  WriteBatch* rebuilding_trx_;
+
  // Lazily constructs (via placement new into the aligned storage) and
  // returns the per-memtable post-process-info map.  Only meaningful for
  // concurrent memtable writes.
  MemPostInfoMap& GetPostMap() {
    assert(concurrent_memtable_writes_);
    if(!post_info_created_) {
      new (&mem_post_info_map_) MemPostInfoMap();
      post_info_created_ = true;
    }
    return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
  }
+
+public:
+  // cf_mems should not be shared with concurrent inserters
  // cf_mems should not be shared with concurrent inserters.
  // recovering_log_number is non-zero only during WAL recovery; db may be
  // null (it is downcast to DBImpl* and, from what is visible here, only
  // stored).  has_valid_writes, when provided, is set to true once a write
  // targets an existing column family.
  MemTableInserter(SequenceNumber _sequence, ColumnFamilyMemTables* cf_mems,
                   FlushScheduler* flush_scheduler,
                   bool ignore_missing_column_families,
                   uint64_t recovering_log_number, DB* db,
                   bool concurrent_memtable_writes,
                   bool* has_valid_writes = nullptr)
      : sequence_(_sequence),
        cf_mems_(cf_mems),
        flush_scheduler_(flush_scheduler),
        ignore_missing_column_families_(ignore_missing_column_families),
        recovering_log_number_(recovering_log_number),
        log_number_ref_(0),
        db_(reinterpret_cast<DBImpl*>(db)),
        concurrent_memtable_writes_(concurrent_memtable_writes),
        post_info_created_(false),
        has_valid_writes_(has_valid_writes),
        rebuilding_trx_(nullptr) {
    assert(cf_mems_);
  }
+
  // Manually destroys the lazily placement-new'd post-info map, mirroring
  // GetPostMap()'s construction into the aligned storage.
  ~MemTableInserter() {
    if (post_info_created_) {
      reinterpret_cast<MemPostInfoMap*>
        (&mem_post_info_map_)->~MemPostInfoMap();
    }
  }
+
  // Non-copyable: holds raw state tied to one insertion pass.
  MemTableInserter(const MemTableInserter&) = delete;
  MemTableInserter& operator=(const MemTableInserter&) = delete;

  // Log number that inserted-into memtables should reference (0 = none).
  void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }

  // Sequence number to be used for the next inserted record.
  SequenceNumber sequence() const { return sequence_; }
+
  // Applies accumulated per-memtable post-process info after a concurrent
  // insertion pass.  No-op if the map was never created.
  void PostProcess() {
    assert(concurrent_memtable_writes_);
    // If post info was not created there is nothing
    // to process and no need to create on demand
    if(post_info_created_) {
      for (auto& pair : GetPostMap()) {
        pair.first->BatchPostProcess(pair.second);
      }
    }
  }
+
  // Positions cf_mems_ at the given column family.  Returns true if the
  // record should be applied to the memtable; false (with *s set) if it
  // should be skipped — either because the column family is missing or
  // because, during recovery, the column family already contains this log's
  // updates.
  bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
    // If we are in a concurrent mode, it is the caller's responsibility
    // to clone the original ColumnFamilyMemTables so that each thread
    // has its own instance.  Otherwise, it must be guaranteed that there
    // is no concurrent access
    bool found = cf_mems_->Seek(column_family_id);
    if (!found) {
      if (ignore_missing_column_families_) {
        *s = Status::OK();
      } else {
        *s = Status::InvalidArgument(
            "Invalid column family specified in write batch");
      }
      return false;
    }
    if (recovering_log_number_ != 0 &&
        recovering_log_number_ < cf_mems_->GetLogNumber()) {
      // Recovery only (recovering_log_number_ is always 0 on the regular
      // write path).  If recovering_log_number_ < cf_mems_->GetLogNumber(),
      // this column family already contains updates from this log.  We
      // can't apply updates twice because of update-in-place or merge
      // workloads -- ignore the update.
      *s = Status::OK();
      return false;
    }

    if (has_valid_writes_ != nullptr) {
      *has_valid_writes_ = true;
    }

    if (log_number_ref_ > 0) {
      cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
    }

    return true;
  }
+
+  virtual Status PutCF(uint32_t column_family_id, const Slice& key,
+                       const Slice& value) override {
+    if (rebuilding_trx_ != nullptr) {
+      WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
+      return Status::OK();
+    }
+
+    Status seek_status;
+    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
+      ++sequence_;
+      return seek_status;
+    }
+
+    MemTable* mem = cf_mems_->GetMemTable();
+    auto* moptions = mem->GetMemTableOptions();
+    if (!moptions->inplace_update_support) {
+      mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_,
+               get_post_process_info(mem));
+    } else if (moptions->inplace_callback == nullptr) {
+      assert(!concurrent_memtable_writes_);
+      mem->Update(sequence_, key, value);
+      RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED);
+    } else {
+      assert(!concurrent_memtable_writes_);
+      if (mem->UpdateCallback(sequence_, key, value)) {
+      } else {
+        // key not found in memtable. Do sst get, update, add
+        SnapshotImpl read_from_snapshot;
+        read_from_snapshot.number_ = sequence_;
+        ReadOptions ropts;
+        ropts.snapshot = &read_from_snapshot;
+
+        std::string prev_value;
+        std::string merged_value;
+
+        auto cf_handle = cf_mems_->GetColumnFamilyHandle();
+        Status s = Status::NotSupported();
+        if (db_ != nullptr && recovering_log_number_ == 0) {
+          if (cf_handle == nullptr) {
+            cf_handle = db_->DefaultColumnFamily();
+          }
+          s = db_->Get(ropts, cf_handle, key, &prev_value);
+        }
+
+        char* prev_buffer = const_cast<char*>(prev_value.c_str());
+        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
+        auto status = moptions->inplace_callback(s.ok() ? prev_buffer : 
nullptr,
+                                                 s.ok() ? &prev_size : nullptr,
+                                                 value, &merged_value);
+        if (status == UpdateStatus::UPDATED_INPLACE) {
+          // prev_value is updated in-place with final value.
+          mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
+          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
+        } else if (status == UpdateStatus::UPDATED) {
+          // merged_value contains the final value.
+          mem->Add(sequence_, kTypeValue, key, Slice(merged_value));
+          RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
+        }
+      }
+    }
+    // Since all Puts are logged in trasaction logs (if enabled), always bump
+    // sequence number. Even if the update eventually fails and does not result
+    // in memtable add/update.
+    sequence_++;
+    CheckMemtableFull();
+    return Status::OK();
+  }
+
  // Shared implementation for Delete / SingleDelete / DeleteRange: adds a
  // tombstone of the given type and consumes one sequence number.
  // Note: column_family_id is unused here -- callers have already
  // positioned cf_mems_ via SeekToColumnFamily().
  Status DeleteImpl(uint32_t column_family_id, const Slice& key,
                    const Slice& value, ValueType delete_type) {
    MemTable* mem = cf_mems_->GetMemTable();
    mem->Add(sequence_, delete_type, key, value, concurrent_memtable_writes_,
             get_post_process_info(mem));
    sequence_++;
    CheckMemtableFull();
    return Status::OK();
  }
+
  // Applies a Delete tombstone (or buffers it into the transaction being
  // rebuilt during recovery).  Consumes one sequence number even when the
  // column family seek says to skip the update.
  virtual Status DeleteCF(uint32_t column_family_id,
                          const Slice& key) override {
    if (rebuilding_trx_ != nullptr) {
      WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
      return Status::OK();
    }

    Status seek_status;
    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
      ++sequence_;
      return seek_status;
    }

    return DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
  }
+
  // Applies a SingleDelete tombstone; mirrors DeleteCF except for the
  // tombstone type.
  virtual Status SingleDeleteCF(uint32_t column_family_id,
                                const Slice& key) override {
    if (rebuilding_trx_ != nullptr) {
      WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
      return Status::OK();
    }

    Status seek_status;
    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
      ++sequence_;
      return seek_status;
    }

    return DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
  }
+
  // Applies a range tombstone [begin_key, end_key).  Rejects the operation
  // with NotSupported when the column family's table factory cannot store
  // range deletions.
  virtual Status DeleteRangeCF(uint32_t column_family_id,
                               const Slice& begin_key,
                               const Slice& end_key) override {
    if (rebuilding_trx_ != nullptr) {
      WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
                                      begin_key, end_key);
      return Status::OK();
    }

    Status seek_status;
    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
      ++sequence_;
      return seek_status;
    }
    if (db_ != nullptr) {
      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
      if (!cfd->is_delete_range_supported()) {
        return Status::NotSupported(
            std::string("DeleteRange not supported for table type ") +
            cfd->ioptions()->table_factory->Name() + " in CF " +
            cfd->GetName());
      }
    }

    // end_key travels in the tombstone's value slot.
    return DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
  }
+
  // Applies a Merge operand.  When max_successive_merges is configured and
  // the threshold is reached, the merge is resolved eagerly: read the
  // current value through the DB, run the merge operator, and store the
  // result as a plain Put.  Otherwise (or if the eager merge fails) the
  // operand is stored as-is.  Not supported in concurrent mode.
  virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
                         const Slice& value) override {
    assert(!concurrent_memtable_writes_);
    if (rebuilding_trx_ != nullptr) {
      WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
      return Status::OK();
    }

    Status seek_status;
    if (!SeekToColumnFamily(column_family_id, &seek_status)) {
      // Still consume a sequence number to stay aligned with the WAL.
      ++sequence_;
      return seek_status;
    }

    MemTable* mem = cf_mems_->GetMemTable();
    auto* moptions = mem->GetMemTableOptions();
    bool perform_merge = false;

    // If we pass DB through and options.max_successive_merges is hit
    // during recovery, Get() will be issued which will try to acquire
    // DB mutex and cause deadlock, as DB mutex is already held.
    // So we disable merge in recovery
    if (moptions->max_successive_merges > 0 && db_ != nullptr &&
        recovering_log_number_ == 0) {
      LookupKey lkey(key, sequence_);

      // Count the number of successive merges at the head
      // of the key in the memtable
      size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);

      if (num_merges >= moptions->max_successive_merges) {
        perform_merge = true;
      }
    }

    if (perform_merge) {
      // 1) Get the existing value
      std::string get_value;

      // Pass in the sequence number so that we also include previous merge
      // operations in the same batch.
      SnapshotImpl read_from_snapshot;
      read_from_snapshot.number_ = sequence_;
      ReadOptions read_options;
      read_options.snapshot = &read_from_snapshot;

      auto cf_handle = cf_mems_->GetColumnFamilyHandle();
      if (cf_handle == nullptr) {
        cf_handle = db_->DefaultColumnFamily();
      }
      db_->Get(read_options, cf_handle, key, &get_value);
      Slice get_value_slice = Slice(get_value);

      // 2) Apply this merge
      auto merge_operator = moptions->merge_operator;
      assert(merge_operator);

      std::string new_value;

      Status merge_status = MergeHelper::TimedFullMerge(
          merge_operator, key, &get_value_slice, {value}, &new_value,
          moptions->info_log, moptions->statistics, Env::Default());

      if (!merge_status.ok()) {
        // Failed to merge!
        // Store the delta in memtable
        perform_merge = false;
      } else {
        // 3) Add value to memtable
        mem->Add(sequence_, kTypeValue, key, new_value);
      }
    }

    if (!perform_merge) {
      // Add merge operator to memtable
      mem->Add(sequence_, kTypeMerge, key, value);
    }

    sequence_++;
    CheckMemtableFull();
    return Status::OK();
  }
+
+  void CheckMemtableFull() {
+    if (flush_scheduler_ != nullptr) {
+      auto* cfd = cf_mems_->current();
+      assert(cfd != nullptr);
+      if (cfd->mem()->ShouldScheduleFlush() &&
+          cfd->mem()->MarkFlushScheduled()) {
+        // MarkFlushScheduled only returns true if we are the one that
+        // should take action, so no need to dedup further
+        flush_scheduler_->ScheduleFlush(cfd);
+      }
+    }
+  }
+
  // Handles a kTypeBeginPrepareXID marker.  During recovery this starts
  // rebuilding the prepared transaction into a fresh WriteBatch (requires
  // 2PC to be enabled).  During a normal write the marker is ignored --
  // values are inserted directly -- but every insert must reference a WAL
  // log via log_number_ref_.
  Status MarkBeginPrepare() override {
    assert(rebuilding_trx_ == nullptr);
    assert(db_);

    if (recovering_log_number_ != 0) {
      // during recovery we rebuild a hollow transaction
      // from all encountered prepare sections of the wal
      if (db_->allow_2pc() == false) {
        return Status::NotSupported(
            "WAL contains prepared transactions. Open with "
            "TransactionDB::Open().");
      }

      // we are now iterating through a prepared section
      rebuilding_trx_ = new WriteBatch();
      if (has_valid_writes_ != nullptr) {
        *has_valid_writes_ = true;
      }
    } else {
      // in non-recovery we ignore prepare markers
      // and insert the values directly. making sure we have a
      // log for each insertion to reference.
      assert(log_number_ref_ > 0);
    }

    return Status::OK();
  }
+
  // Handles a kTypeEndPrepareXID marker.  During recovery, the rebuilt
  // prepared batch is handed to the DB keyed by the transaction name;
  // ownership of rebuilding_trx_ transfers to the DB.  Outside recovery it
  // only sanity-checks the inserter's state.
  Status MarkEndPrepare(const Slice& name) override {
    assert(db_);
    assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));

    if (recovering_log_number_ != 0) {
      assert(db_->allow_2pc());
      db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
                                      rebuilding_trx_);
      rebuilding_trx_ = nullptr;
    } else {
      assert(rebuilding_trx_ == nullptr);
      assert(log_number_ref_ > 0);
    }

    return Status::OK();
  }
+
  // Handles a kTypeCommitXID marker.  During recovery, looks up the
  // previously rebuilt prepared transaction by name and replays its batch
  // into the memtables.  Outside recovery the marker is ignored -- the data
  // was already inserted at prepare time.
  Status MarkCommit(const Slice& name) override {
    assert(db_);

    Status s;

    if (recovering_log_number_ != 0) {
      // in recovery when we encounter a commit marker
      // we lookup this transaction in our set of rebuilt transactions
      // and commit.
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // the log containing the prepared section may have
      // been released in the last incarnation because the
      // data was flushed to L0
      if (trx != nullptr) {
        // at this point individual CF lognumbers will prevent
        // duplicate re-insertion of values.
        assert(log_number_ref_ == 0);
        // all inserts must reference this trx log number
        log_number_ref_ = trx->log_number_;
        s = trx->batch_->Iterate(this);
        log_number_ref_ = 0;

        if (s.ok()) {
          db_->DeleteRecoveredTransaction(name.ToString());
        }
        if (has_valid_writes_ != nullptr) {
          *has_valid_writes_ = true;
        }
      }
    } else {
      // in non recovery we simply ignore this tag
    }

    return s;
  }
+
  // Handles a kTypeRollbackXID marker.  During recovery, drops the rebuilt
  // prepared transaction of that name (if it still exists); outside
  // recovery the marker is ignored.
  Status MarkRollback(const Slice& name) override {
    assert(db_);

    if (recovering_log_number_ != 0) {
      auto trx = db_->GetRecoveredTransaction(name.ToString());

      // the log containing the transactions prep section
      // may have been released in the previous incarnation
      // because we knew it had been rolled back
      if (trx != nullptr) {
        db_->DeleteRecoveredTransaction(name.ToString());
      }
    } else {
      // in non recovery we simply ignore this tag
    }

    return Status::OK();
  }
+
 private:
  // Returns the per-memtable counter buffer used to batch stat updates in
  // concurrent mode, or nullptr in single-threaded mode where counters are
  // applied to the memtable directly.
  MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
    if (!concurrent_memtable_writes_) {
      // No need to batch counters locally if we don't use concurrent mode.
      return nullptr;
    }
    return &GetPostMap()[mem];
  }
+};
+
// This function can only be called in these conditions:
// 1) During Recovery()
// 2) During Write(), in a single-threaded write thread
// 3) During Write(), in a concurrent context where memtables has been cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache
//
// Inserts every memtable-bound batch in the write group, assigning
// consecutive sequence numbers starting at `sequence`.  Stops and returns
// the first failing writer's status, leaving later writers unprocessed.
Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group,
                                      SequenceNumber sequence,
                                      ColumnFamilyMemTables* memtables,
                                      FlushScheduler* flush_scheduler,
                                      bool ignore_missing_column_families,
                                      uint64_t recovery_log_number, DB* db,
                                      bool concurrent_memtable_writes) {
  MemTableInserter inserter(sequence, memtables, flush_scheduler,
                            ignore_missing_column_families, recovery_log_number,
                            db, concurrent_memtable_writes);
  for (auto w : write_group) {
    if (!w->ShouldWriteToMemtable()) {
      continue;
    }
    // Stamp each batch with the inserter's running sequence counter before
    // replaying it.
    SetSequence(w->batch, inserter.sequence());
    inserter.set_log_number_ref(w->log_ref);
    w->status = w->batch->Iterate(&inserter);
    if (!w->status.ok()) {
      return w->status;
    }
  }
  return Status::OK();
}
+
// Inserts a single writer's batch starting at `sequence`.  The caller must
// ensure the writer actually has memtable-bound data.
Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer,
                                      SequenceNumber sequence,
                                      ColumnFamilyMemTables* memtables,
                                      FlushScheduler* flush_scheduler,
                                      bool ignore_missing_column_families,
                                      uint64_t log_number, DB* db,
                                      bool concurrent_memtable_writes) {
  assert(writer->ShouldWriteToMemtable());
  MemTableInserter inserter(sequence, memtables, flush_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes);
  // Stamp the batch header with its starting sequence number before replay.
  SetSequence(writer->batch, sequence);
  inserter.set_log_number_ref(writer->log_ref);
  Status s = writer->batch->Iterate(&inserter);
  if (concurrent_memtable_writes) {
    // Apply the locally batched memtable counters.
    inserter.PostProcess();
  }
  return s;
}
+
// Convenience form for a single batch whose starting sequence number is
// already encoded in its header.  If non-null, last_seq_used receives the
// inserter's sequence counter after the last insert, and has_valid_writes
// is set when at least one update was actually applied.
Status WriteBatchInternal::InsertInto(
    const WriteBatch* batch, ColumnFamilyMemTables* memtables,
    FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
    uint64_t log_number, DB* db, bool concurrent_memtable_writes,
    SequenceNumber* last_seq_used, bool* has_valid_writes) {
  MemTableInserter inserter(Sequence(batch), memtables, flush_scheduler,
                            ignore_missing_column_families, log_number, db,
                            concurrent_memtable_writes, has_valid_writes);
  Status s = batch->Iterate(&inserter);
  if (last_seq_used != nullptr) {
    *last_seq_used = inserter.sequence();
  }
  if (concurrent_memtable_writes) {
    inserter.PostProcess();
  }
  return s;
}
+
// Replaces b's serialized contents wholesale.  `contents` must include the
// 12-byte header (sequence + count).  Content flags are marked DEFERRED
// because they no longer describe the new payload.
Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
  assert(contents.size() >= WriteBatchInternal::kHeader);
  b->rep_.assign(contents.data(), contents.size());
  b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
  return Status::OK();
}
+
// Appends src's records (header excluded) to dst, merging counts and
// content flags.  When wal_only is true and src has a WAL termination point
// set, only the prefix up to that savepoint is copied.
Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
                                  const bool wal_only) {
  size_t src_len;
  int src_count;
  uint32_t src_flags;

  const SavePoint& batch_end = src->GetWalTerminationPoint();

  if (wal_only && !batch_end.is_cleared()) {
    // Copy only up to the recorded WAL termination point.
    src_len = batch_end.size - WriteBatchInternal::kHeader;
    src_count = batch_end.count;
    src_flags = batch_end.content_flags;
  } else {
    // Copy the whole source batch, minus its 12-byte header.
    src_len = src->rep_.size() - WriteBatchInternal::kHeader;
    src_count = Count(src);
    src_flags = src->content_flags_.load(std::memory_order_relaxed);
  }

  SetCount(dst, Count(dst) + src_count);
  assert(src->rep_.size() >= WriteBatchInternal::kHeader);
  dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
  dst->content_flags_.store(
      dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
      std::memory_order_relaxed);
  return Status::OK();
}
+
+size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
+                                            size_t rightByteSize) {
+  if (leftByteSize == 0 || rightByteSize == 0) {
+    return leftByteSize + rightByteSize;
+  } else {
+    return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
+  }
+}
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/write_batch_base.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/write_batch_base.cc 
b/thirdparty/rocksdb/db/write_batch_base.cc
new file mode 100644
index 0000000..5522c1f
--- /dev/null
+++ b/thirdparty/rocksdb/db/write_batch_base.cc
@@ -0,0 +1,94 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "rocksdb/write_batch_base.h"
+
+#include <string>
+
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+
+namespace rocksdb {
+
+// Simple implementation of SlicePart variants of Put().  Child classes
+// can override these method with more performant solutions if they choose.
+Status WriteBatchBase::Put(ColumnFamilyHandle* column_family,
+                           const SliceParts& key, const SliceParts& value) {
+  std::string key_buf, value_buf;
+  Slice key_slice(key, &key_buf);
+  Slice value_slice(value, &value_buf);
+
+  return Put(column_family, key_slice, value_slice);
+}
+
+Status WriteBatchBase::Put(const SliceParts& key, const SliceParts& value) {
+  std::string key_buf, value_buf;
+  Slice key_slice(key, &key_buf);
+  Slice value_slice(value, &value_buf);
+
+  return Put(key_slice, value_slice);
+}
+
+Status WriteBatchBase::Delete(ColumnFamilyHandle* column_family,
+                              const SliceParts& key) {
+  std::string key_buf;
+  Slice key_slice(key, &key_buf);
+  return Delete(column_family, key_slice);
+}
+
+Status WriteBatchBase::Delete(const SliceParts& key) {
+  std::string key_buf;
+  Slice key_slice(key, &key_buf);
+  return Delete(key_slice);
+}
+
+Status WriteBatchBase::SingleDelete(ColumnFamilyHandle* column_family,
+                                    const SliceParts& key) {
+  std::string key_buf;
+  Slice key_slice(key, &key_buf);
+  return SingleDelete(column_family, key_slice);
+}
+
+Status WriteBatchBase::SingleDelete(const SliceParts& key) {
+  std::string key_buf;
+  Slice key_slice(key, &key_buf);
+  return SingleDelete(key_slice);
+}
+
+Status WriteBatchBase::DeleteRange(ColumnFamilyHandle* column_family,
+                                   const SliceParts& begin_key,
+                                   const SliceParts& end_key) {
+  std::string begin_key_buf, end_key_buf;
+  Slice begin_key_slice(begin_key, &begin_key_buf);
+  Slice end_key_slice(end_key, &end_key_buf);
+  return DeleteRange(column_family, begin_key_slice, end_key_slice);
+}
+
+Status WriteBatchBase::DeleteRange(const SliceParts& begin_key,
+                                   const SliceParts& end_key) {
+  std::string begin_key_buf, end_key_buf;
+  Slice begin_key_slice(begin_key, &begin_key_buf);
+  Slice end_key_slice(end_key, &end_key_buf);
+  return DeleteRange(begin_key_slice, end_key_slice);
+}
+
+Status WriteBatchBase::Merge(ColumnFamilyHandle* column_family,
+                             const SliceParts& key, const SliceParts& value) {
+  std::string key_buf, value_buf;
+  Slice key_slice(key, &key_buf);
+  Slice value_slice(value, &value_buf);
+
+  return Merge(column_family, key_slice, value_slice);
+}
+
+Status WriteBatchBase::Merge(const SliceParts& key, const SliceParts& value) {
+  std::string key_buf, value_buf;
+  Slice key_slice(key, &key_buf);
+  Slice value_slice(value, &value_buf);
+
+  return Merge(key_slice, value_slice);
+}
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/write_batch_internal.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/write_batch_internal.h 
b/thirdparty/rocksdb/db/write_batch_internal.h
new file mode 100644
index 0000000..48a417c
--- /dev/null
+++ b/thirdparty/rocksdb/db/write_batch_internal.h
@@ -0,0 +1,227 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <vector>
+#include "db/write_thread.h"
+#include "rocksdb/types.h"
+#include "rocksdb/write_batch.h"
+#include "rocksdb/db.h"
+#include "rocksdb/options.h"
+#include "util/autovector.h"
+
+namespace rocksdb {
+
+class MemTable;
+class FlushScheduler;
+class ColumnFamilyData;
+
// Abstraction over the set of memtables a WriteBatch is applied to.
// Seek() positions the object at a column family; the other accessors then
// refer to the selected family.
class ColumnFamilyMemTables {
 public:
  virtual ~ColumnFamilyMemTables() {}
  // Positions this object at the given column family.  Returns false if
  // the column family does not exist.
  virtual bool Seek(uint32_t column_family_id) = 0;
  // Log number of the currently selected column family.  During recovery,
  // updates from logs older than this have already been processed and
  // should be ignored (useful when recovering from log whose updates have
  // already been processed).
  virtual uint64_t GetLogNumber() const = 0;
  // Memtable of the currently selected column family.
  virtual MemTable* GetMemTable() const = 0;
  // Handle of the currently selected column family; may be nullptr.
  virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0;
  // ColumnFamilyData of the selected family, or nullptr if unavailable.
  virtual ColumnFamilyData* current() { return nullptr; }
};
+
// ColumnFamilyMemTables implementation for a DB with only the default
// column family (id 0), backed by a single memtable.
class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
 public:
  explicit ColumnFamilyMemTablesDefault(MemTable* mem)
      : ok_(false), mem_(mem) {}

  // Only column family 0 exists in this implementation.
  bool Seek(uint32_t column_family_id) override {
    ok_ = (column_family_id == 0);
    return ok_;
  }

  uint64_t GetLogNumber() const override { return 0; }

  MemTable* GetMemTable() const override {
    assert(ok_);  // a successful Seek(0) must precede this call
    return mem_;
  }

  // No handle is available in this minimal implementation.
  ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; }

 private:
  bool ok_;        // true iff the last Seek() selected a valid family
  MemTable* mem_;  // the single (default-CF) memtable; not owned
};
+
// WriteBatchInternal provides static methods for manipulating a
// WriteBatch that we don't want in the public WriteBatch interface.
class WriteBatchInternal {
 public:

  // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
  static const size_t kHeader = 12;

  // WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
  static Status Put(WriteBatch* batch, uint32_t column_family_id,
                    const Slice& key, const Slice& value);

  static Status Put(WriteBatch* batch, uint32_t column_family_id,
                    const SliceParts& key, const SliceParts& value);

  static Status Delete(WriteBatch* batch, uint32_t column_family_id,
                       const SliceParts& key);

  static Status Delete(WriteBatch* batch, uint32_t column_family_id,
                       const Slice& key);

  static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
                             const SliceParts& key);

  static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
                             const Slice& key);

  static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
                            const Slice& begin_key, const Slice& end_key);

  static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
                            const SliceParts& begin_key,
                            const SliceParts& end_key);

  static Status Merge(WriteBatch* batch, uint32_t column_family_id,
                      const Slice& key, const Slice& value);

  static Status Merge(WriteBatch* batch, uint32_t column_family_id,
                      const SliceParts& key, const SliceParts& value);

  // 2PC (two-phase commit) WAL markers, keyed by transaction id `xid`.
  static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid);

  static Status MarkRollback(WriteBatch* batch, const Slice& xid);

  static Status MarkCommit(WriteBatch* batch, const Slice& xid);

  static Status InsertNoop(WriteBatch* batch);

  // Return the number of entries in the batch.
  static int Count(const WriteBatch* batch);

  // Set the count for the number of entries in the batch.
  static void SetCount(WriteBatch* batch, int n);

  // Return the sequence number for the start of this batch.
  static SequenceNumber Sequence(const WriteBatch* batch);

  // Store the specified number as the sequence number for the start of
  // this batch.
  static void SetSequence(WriteBatch* batch, SequenceNumber seq);

  // Returns the offset of the first entry in the batch.
  // This offset is only valid if the batch is not empty.
  static size_t GetFirstOffset(WriteBatch* batch);

  // Raw serialized representation (header included).
  static Slice Contents(const WriteBatch* batch) {
    return Slice(batch->rep_);
  }

  static size_t ByteSize(const WriteBatch* batch) {
    return batch->rep_.size();
  }

  // Replaces the batch's serialized contents; `contents` must include the
  // kHeader-byte header.
  static Status SetContents(WriteBatch* batch, const Slice& contents);

  // Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive.
  //
  // If ignore_missing_column_families == true. WriteBatch
  // referencing non-existing column family will be ignored.
  // If ignore_missing_column_families == false, processing of the
  // batches will be stopped if a reference is found to a non-existing
  // column family and InvalidArgument() will be returned.  The writes
  // in batches may be only partially applied at that point.
  //
  // If log_number is non-zero, the memtable will be updated only if
  // memtables->GetLogNumber() >= log_number.
  //
  // If flush_scheduler is non-null, it will be invoked if the memtable
  // should be flushed.
  //
  // Under concurrent use, the caller is responsible for making sure that
  // the memtables object itself is thread-local.
  static Status InsertInto(WriteThread::WriteGroup& write_group,
                           SequenceNumber sequence,
                           ColumnFamilyMemTables* memtables,
                           FlushScheduler* flush_scheduler,
                           bool ignore_missing_column_families = false,
                           uint64_t log_number = 0, DB* db = nullptr,
                           bool concurrent_memtable_writes = false);

  // Convenience form of InsertInto when you have only one batch
  // last_seq_used returns the last sequence number used in a MemTable insert
  static Status InsertInto(const WriteBatch* batch,
                           ColumnFamilyMemTables* memtables,
                           FlushScheduler* flush_scheduler,
                           bool ignore_missing_column_families = false,
                           uint64_t log_number = 0, DB* db = nullptr,
                           bool concurrent_memtable_writes = false,
                           SequenceNumber* last_seq_used = nullptr,
                           bool* has_valid_writes = nullptr);

  // Single-writer form; the writer must have memtable-bound data.
  static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence,
                           ColumnFamilyMemTables* memtables,
                           FlushScheduler* flush_scheduler,
                           bool ignore_missing_column_families = false,
                           uint64_t log_number = 0, DB* db = nullptr,
                           bool concurrent_memtable_writes = false);

  // Appends src's records to dst; if WAL_only is set, only up to src's WAL
  // termination point.
  static Status Append(WriteBatch* dst, const WriteBatch* src,
                       const bool WAL_only = false);

  // Returns the byte size of appending a WriteBatch with ByteSize
  // leftByteSize and a WriteBatch with ByteSize rightByteSize
  static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
};
+
// LocalSavePoint is similar to a scope guard: it captures the batch's
// size/count/content-flags at construction, and commit() rolls the batch
// back to that state if the writes since then exceeded max_bytes_.
class LocalSavePoint {
 public:
  explicit LocalSavePoint(WriteBatch* batch)
      : batch_(batch),
        savepoint_(batch->GetDataSize(), batch->Count(),
                   batch->content_flags_.load(std::memory_order_relaxed))
#ifndef NDEBUG
        ,
        committed_(false)
#endif
  {
  }

#ifndef NDEBUG
  // Debug builds catch scopes that forget to call commit().
  ~LocalSavePoint() { assert(committed_); }
#endif
  // Accepts the writes made since construction, unless they pushed the
  // batch past its max_bytes_ limit; in that case the batch is restored to
  // the captured state and MemoryLimit is returned.
  Status commit() {
#ifndef NDEBUG
    committed_ = true;
#endif
    if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) {
      batch_->rep_.resize(savepoint_.size);
      WriteBatchInternal::SetCount(batch_, savepoint_.count);
      batch_->content_flags_.store(savepoint_.content_flags,
                                   std::memory_order_relaxed);
      return Status::MemoryLimit();
    }
    return Status::OK();
  }

 private:
  WriteBatch* batch_;     // batch being guarded; not owned
  SavePoint savepoint_;   // state captured at construction
#ifndef NDEBUG
  bool committed_;        // set by commit(); checked by the destructor
#endif
};
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/write_callback.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/write_callback.h 
b/thirdparty/rocksdb/db/write_callback.h
new file mode 100644
index 0000000..6517a7c
--- /dev/null
+++ b/thirdparty/rocksdb/db/write_callback.h
@@ -0,0 +1,27 @@
+// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include "rocksdb/status.h"
+
+namespace rocksdb {
+
+class DB;
+
// Hook invoked on the write thread just before a write is applied,
// allowing the caller to veto the write (e.g. for optimistic concurrency
// checks).
class WriteCallback {
 public:
  virtual ~WriteCallback() {}

  // Will be called while on the write thread before the write executes.  If
  // this function returns a non-OK status, the write will be aborted and this
  // status will be returned to the caller of DB::Write().
  virtual Status Callback(DB* db) = 0;

  // return true if writes with this callback can be batched with other writes
  virtual bool AllowWriteBatching() = 0;
};
+
+}  //  namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/write_controller.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/write_controller.cc 
b/thirdparty/rocksdb/db/write_controller.cc
new file mode 100644
index 0000000..558aa72
--- /dev/null
+++ b/thirdparty/rocksdb/db/write_controller.cc
@@ -0,0 +1,128 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "db/write_controller.h"
+
+#include <atomic>
+#include <cassert>
+#include <ratio>
+#include "rocksdb/env.h"
+
+namespace rocksdb {
+
+// Hands out a stop token; writes stay stopped until the token is destroyed.
+std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
+  // Record one more outstanding stop request; the token's destructor
+  // decrements the counter again.
+  total_stopped_++;
+  std::unique_ptr<WriteControllerToken> token(new StopWriteToken(this));
+  return token;
+}
+
+// Hands out a delay token and (re)starts rate-limited writing at
+// `write_rate` bytes/sec.  Delay accounting is reset from scratch.
+std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
+    uint64_t write_rate) {
+  ++total_delayed_;
+  // Restart the token-bucket accounting used by GetDelay().
+  bytes_left_ = 0;
+  last_refill_time_ = 0;
+  set_delayed_write_rate(write_rate);
+  std::unique_ptr<WriteControllerToken> token(new DelayWriteToken(this));
+  return token;
+}
+
+// Hands out a compaction-pressure token; while any such token is alive,
+// NeedSpeedupCompaction() reports true.
+std::unique_ptr<WriteControllerToken>
+WriteController::GetCompactionPressureToken() {
+  // One more actor asking for faster compactions.
+  total_compaction_pressure_++;
+  std::unique_ptr<WriteControllerToken> token(
+      new CompactionPressureToken(this));
+  return token;
+}
+
+// Writes are stopped while at least one stop token is outstanding.
+bool WriteController::IsStopped() const {
+  return 0 < total_stopped_.load(std::memory_order_relaxed);
+}
+// Returns how many microseconds the writer of `num_bytes` must sleep to
+// keep the aggregate write rate at `delayed_write_rate_` bytes/sec,
+// implemented as a token bucket refilled every kRefillInterval micros.
+//
+// This runs inside the DB mutex, so we can't sleep here and we need to
+// minimize how often we fetch the current time.
+// If it turns out to be a performance issue, we can redesign the thread
+// synchronization model here.
+// The caller is trusted to actually sleep for the number of micros
+// returned.
+uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
+  // While writes are fully stopped, stalling is handled elsewhere; no
+  // additional sleeping is requested here.
+  if (total_stopped_.load(std::memory_order_relaxed) > 0) {
+    return 0;
+  }
+  // No outstanding delay tokens: writes proceed at full speed.
+  if (total_delayed_.load(std::memory_order_relaxed) == 0) {
+    return 0;
+  }
+
+  const uint64_t kMicrosPerSecond = 1000000;
+  const uint64_t kRefillInterval = 1024U;  // micros between bucket refills
+
+  // Fast path: the bucket already holds enough credit for this write.
+  if (bytes_left_ >= num_bytes) {
+    bytes_left_ -= num_bytes;
+    return 0;
+  }
+  // The frequency to get time inside DB mutex is less than one per refill
+  // interval.
+  auto time_now = NowMicrosMonotonic(env);
+
+  uint64_t sleep_debt = 0;
+  uint64_t time_since_last_refill = 0;
+  if (last_refill_time_ != 0) {
+    if (last_refill_time_ > time_now) {
+      // A previous caller was told to sleep until `last_refill_time_`;
+      // this writer inherits the remaining debt.
+      sleep_debt = last_refill_time_ - time_now;
+    } else {
+      // Credit the bucket for the time elapsed since the last refill.
+      time_since_last_refill = time_now - last_refill_time_;
+      bytes_left_ +=
+          static_cast<uint64_t>(static_cast<double>(time_since_last_refill) /
+                                kMicrosPerSecond * delayed_write_rate_);
+      if (time_since_last_refill >= kRefillInterval &&
+          bytes_left_ > num_bytes) {
+        // If a refill interval has already passed and we have enough
+        // bytes, return without extra sleeping.
+        last_refill_time_ = time_now;
+        bytes_left_ -= num_bytes;
+        return 0;
+      }
+    }
+  }
+
+  uint64_t single_refill_amount =
+      delayed_write_rate_ * kRefillInterval / kMicrosPerSecond;
+  if (bytes_left_ + single_refill_amount >= num_bytes) {
+    // One refill interval is enough for this write: sleep exactly one
+    // interval (plus inherited debt).  Never expire in less than one
+    // refill interval, to avoid fetching the time too often.
+    bytes_left_ = bytes_left_ + single_refill_amount - num_bytes;
+    last_refill_time_ = time_now + kRefillInterval;
+    return kRefillInterval + sleep_debt;
+  }
+
+  // Need to refill more than one interval, so sleep longer.
+
+  // Sleep just until `num_bytes` is allowed.
+  uint64_t sleep_amount =
+      static_cast<uint64_t>(num_bytes /
+                            static_cast<long double>(delayed_write_rate_) *
+                            kMicrosPerSecond) +
+      sleep_debt;
+  last_refill_time_ = time_now + sleep_amount;
+  return sleep_amount;
+}
+
+// Monotonic clock in microseconds, derived from Env's nanosecond clock.
+uint64_t WriteController::NowMicrosMonotonic(Env* env) {
+  const uint64_t kNanosPerMicro = 1000;  // == std::milli::den
+  return env->NowNanos() / kNanosPerMicro;
+}
+
+// Releasing the token lifts this actor's stop request.
+StopWriteToken::~StopWriteToken() {
+  assert(controller_->total_stopped_ >= 1);
+  controller_->total_stopped_.fetch_sub(1);
+}
+
+// Releasing the token removes this actor's delay request.
+DelayWriteToken::~DelayWriteToken() {
+  controller_->total_delayed_.fetch_sub(1);
+  assert(controller_->total_delayed_.load() >= 0);
+}
+
+// Releasing the token removes this actor's compaction-pressure request.
+CompactionPressureToken::~CompactionPressureToken() {
+  controller_->total_compaction_pressure_.fetch_sub(1);
+  assert(controller_->total_compaction_pressure_ >= 0);
+}
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/write_controller.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/write_controller.h 
b/thirdparty/rocksdb/db/write_controller.h
new file mode 100644
index 0000000..7c301ce
--- /dev/null
+++ b/thirdparty/rocksdb/db/write_controller.h
@@ -0,0 +1,144 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include "rocksdb/rate_limiter.h"
+
+namespace rocksdb {
+
+class Env;
+class WriteControllerToken;
+
+// WriteController is controlling write stalls in our write code-path. Write
+// stalls happen when compaction can't keep up with write rate.
+// All of the methods here (including WriteControllerToken's destructors) need
+// to be called while holding DB mutex.
+class WriteController {
+ public:
+  // `_delayed_write_rate` is the initial delayed write rate in bytes/sec;
+  // it also becomes the maximum (see set_max_delayed_write_rate()).
+  // `low_pri_rate_bytes_per_sec` configures the generic rate limiter
+  // exposed via low_pri_rate_limiter().
+  explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u,
+                           int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
+      : total_stopped_(0),
+        total_delayed_(0),
+        total_compaction_pressure_(0),
+        bytes_left_(0),
+        last_refill_time_(0),
+        low_pri_rate_limiter_(
+            NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
+    set_max_delayed_write_rate(_delayed_write_rate);
+  }
+  ~WriteController() = default;
+
+  // When an actor (column family) requests a stop token, all writes will be
+  // stopped until the stop token is released (deleted)
+  std::unique_ptr<WriteControllerToken> GetStopToken();
+  // When an actor (column family) requests a delay token, total delay for all
+  // writes to the DB will be controlled under the delayed write rate. Every
+  // write needs to call GetDelay() with number of bytes writing to the DB,
+  // which returns number of microseconds to sleep.
+  std::unique_ptr<WriteControllerToken> GetDelayToken(
+      uint64_t delayed_write_rate);
+  // When an actor (column family) requests a moderate token, compaction
+  // threads will be increased
+  std::unique_ptr<WriteControllerToken> GetCompactionPressureToken();
+
+  // These three methods query the current state of the WriteController.
+  bool IsStopped() const;
+  bool NeedsDelay() const { return total_delayed_.load() > 0; }
+  bool NeedSpeedupCompaction() const {
+    return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0;
+  }
+  // return how many microseconds the caller needs to sleep after the call
+  // num_bytes: how many number of bytes to put into the DB.
+  // Prerequisite: DB mutex held.
+  uint64_t GetDelay(Env* env, uint64_t num_bytes);
+  // Sets the current delayed write rate, clamped to the range
+  // [1, max_delayed_write_rate()].
+  void set_delayed_write_rate(uint64_t write_rate) {
+    // avoid divide 0
+    if (write_rate == 0) {
+      write_rate = 1u;
+    } else if (write_rate > max_delayed_write_rate()) {
+      write_rate = max_delayed_write_rate();
+    }
+    delayed_write_rate_ = write_rate;
+  }
+
+  // Sets the maximum delayed write rate (min 1) and resets the current
+  // rate to that maximum.
+  void set_max_delayed_write_rate(uint64_t write_rate) {
+    // avoid divide 0
+    if (write_rate == 0) {
+      write_rate = 1u;
+    }
+    max_delayed_write_rate_ = write_rate;
+    // update delayed_write_rate_ as well
+    delayed_write_rate_ = write_rate;
+  }
+
+  uint64_t delayed_write_rate() const { return delayed_write_rate_; }
+
+  uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
+
+  RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
+
+ private:
+  // Monotonic clock in microseconds (Env::NowNanos() converted).
+  uint64_t NowMicrosMonotonic(Env* env);
+
+  // Tokens decrement the counters below in their destructors.
+  friend class WriteControllerToken;
+  friend class StopWriteToken;
+  friend class DelayWriteToken;
+  friend class CompactionPressureToken;
+
+  std::atomic<int> total_stopped_;              // outstanding stop tokens
+  std::atomic<int> total_delayed_;              // outstanding delay tokens
+  std::atomic<int> total_compaction_pressure_;  // outstanding pressure tokens
+  uint64_t bytes_left_;        // token-bucket credit used by GetDelay()
+  uint64_t last_refill_time_;  // micros; last bucket refill (0 = never)
+  // write rate set when initialization or by `DBImpl::SetDBOptions`
+  uint64_t max_delayed_write_rate_;
+  // current write rate
+  uint64_t delayed_write_rate_;
+
+  std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
+};
+
+// Base class for the RAII tokens handed out by WriteController.  A token
+// keeps one of the controller's counters elevated for its lifetime; the
+// derived destructors decrement the counter.  Destructors must run while
+// holding the DB mutex (see WriteController's class comment).
+class WriteControllerToken {
+ public:
+  explicit WriteControllerToken(WriteController* controller)
+      : controller_(controller) {}
+  virtual ~WriteControllerToken() {}
+
+ protected:
+  WriteController* controller_;  // not owned; must outlive this token
+
+ private:
+  // no copying allowed
+  WriteControllerToken(const WriteControllerToken&) = delete;
+  void operator=(const WriteControllerToken&) = delete;
+};
+
+// Returned by WriteController::GetStopToken(); its destructor decrements
+// the controller's stop counter, lifting this actor's stop request.
+class StopWriteToken : public WriteControllerToken {
+ public:
+  explicit StopWriteToken(WriteController* controller)
+      : WriteControllerToken(controller) {}
+  virtual ~StopWriteToken();
+};
+
+// Returned by WriteController::GetDelayToken(); its destructor decrements
+// the controller's delay counter, removing this actor's delay request.
+class DelayWriteToken : public WriteControllerToken {
+ public:
+  explicit DelayWriteToken(WriteController* controller)
+      : WriteControllerToken(controller) {}
+  virtual ~DelayWriteToken();
+};
+
+// Returned by WriteController::GetCompactionPressureToken(); its destructor
+// decrements the controller's compaction-pressure counter.
+class CompactionPressureToken : public WriteControllerToken {
+ public:
+  explicit CompactionPressureToken(WriteController* controller)
+      : WriteControllerToken(controller) {}
+  virtual ~CompactionPressureToken();
+};
+
+}  // namespace rocksdb

Reply via email to