http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_test_util.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_test_util.cc b/thirdparty/rocksdb/db/db_test_util.cc
new file mode 100644
index 0000000..c4d465b
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_test_util.cc
@@ -0,0 +1,1395 @@
+// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/db_test_util.h"
+#include "db/forward_iterator.h"
+#include "rocksdb/env_encryption.h"
+
+namespace rocksdb {
+
+// Special Env used to delay background operations
+
+SpecialEnv::SpecialEnv(Env* base)
+    : EnvWrapper(base),
+      rnd_(301),
+      sleep_counter_(this),
+      addon_time_(0),
+      time_elapse_only_sleep_(false),
+      no_slowdown_(false) {
+  delay_sstable_sync_.store(false, std::memory_order_release);
+  drop_writes_.store(false, std::memory_order_release);
+  no_space_.store(false, std::memory_order_release);
+  non_writable_.store(false, std::memory_order_release);
+  count_random_reads_ = false;
+  count_sequential_reads_ = false;
+  manifest_sync_error_.store(false, std::memory_order_release);
+  manifest_write_error_.store(false, std::memory_order_release);
+  log_write_error_.store(false, std::memory_order_release);
+  random_file_open_counter_.store(0, std::memory_order_relaxed);
+  delete_count_.store(0, std::memory_order_relaxed);
+  num_open_wal_file_.store(0);
+  log_write_slowdown_ = 0;
+  bytes_written_ = 0;
+  sync_counter_ = 0;
+  non_writeable_rate_ = 0;
+  new_writable_count_ = 0;
+  non_writable_count_ = 0;
+  table_write_callback_ = nullptr;
+}
+#ifndef ROCKSDB_LITE
+ROT13BlockCipher rot13Cipher_(16);
+#endif  // ROCKSDB_LITE
+
+DBTestBase::DBTestBase(const std::string path)
+    : mem_env_(!getenv("MEM_ENV") ? nullptr : new MockEnv(Env::Default())),
+#ifndef ROCKSDB_LITE
+      encrypted_env_(
+          !getenv("ENCRYPTED_ENV")
+              ? nullptr
+              : NewEncryptedEnv(mem_env_ ? mem_env_ : Env::Default(),
+                                new CTREncryptionProvider(rot13Cipher_))),
+#else
+      encrypted_env_(nullptr),
+#endif  // ROCKSDB_LITE
+      env_(new SpecialEnv(encrypted_env_
+                              ? encrypted_env_
+                              : (mem_env_ ? mem_env_ : Env::Default()))),
+      option_config_(kDefault) {
+  env_->SetBackgroundThreads(1, Env::LOW);
+  env_->SetBackgroundThreads(1, Env::HIGH);
+  dbname_ = test::TmpDir(env_) + path;
+  alternative_wal_dir_ = dbname_ + "/wal";
+  alternative_db_log_dir_ = dbname_ + "/db_log_dir";
+  auto options = CurrentOptions();
+  options.env = env_;
+  auto delete_options = options;
+  delete_options.wal_dir = alternative_wal_dir_;
+  EXPECT_OK(DestroyDB(dbname_, delete_options));
+  // Destroy again in case no alternative WAL dir was used.
+  EXPECT_OK(DestroyDB(dbname_, options));
+  db_ = nullptr;
+  Reopen(options);
+  Random::GetTLSInstance()->Reset(0xdeadbeef);
+}
+
+DBTestBase::~DBTestBase() {
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  rocksdb::SyncPoint::GetInstance()->LoadDependency({});
+  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+  Close();
+  Options options;
+  options.db_paths.emplace_back(dbname_, 0);
+  options.db_paths.emplace_back(dbname_ + "_2", 0);
+  options.db_paths.emplace_back(dbname_ + "_3", 0);
+  options.db_paths.emplace_back(dbname_ + "_4", 0);
+  options.env = env_;
+
+  if (getenv("KEEP_DB")) {
+    printf("DB is still at %s\n", dbname_.c_str());
+  } else {
+    EXPECT_OK(DestroyDB(dbname_, options));
+  }
+  delete env_;
+}
+
+bool DBTestBase::ShouldSkipOptions(int option_config, int skip_mask) {
+#ifdef ROCKSDB_LITE
+    // These options are not supported in ROCKSDB_LITE
+  if (option_config == kHashSkipList ||
+      option_config == kPlainTableFirstBytePrefix ||
+      option_config == kPlainTableCappedPrefix ||
+      option_config == kPlainTableCappedPrefixNonMmap ||
+      option_config == kPlainTableAllBytesPrefix ||
+      option_config == kVectorRep || option_config == kHashLinkList ||
+      option_config == kHashCuckoo || option_config == kUniversalCompaction ||
+      option_config == kUniversalCompactionMultiLevel ||
+      option_config == kUniversalSubcompactions ||
+      option_config == kFIFOCompaction ||
+      option_config == kConcurrentSkipList) {
+    return true;
+    }
+#endif
+
+    if ((skip_mask & kSkipUniversalCompaction) &&
+        (option_config == kUniversalCompaction ||
+         option_config == kUniversalCompactionMultiLevel)) {
+      return true;
+    }
+    if ((skip_mask & kSkipMergePut) && option_config == kMergePut) {
+      return true;
+    }
+    if ((skip_mask & kSkipNoSeekToLast) &&
+        (option_config == kHashLinkList || option_config == kHashSkipList)) {
+      return true;
+    }
+    if ((skip_mask & kSkipPlainTable) &&
+        (option_config == kPlainTableAllBytesPrefix ||
+         option_config == kPlainTableFirstBytePrefix ||
+         option_config == kPlainTableCappedPrefix ||
+         option_config == kPlainTableCappedPrefixNonMmap)) {
+      return true;
+    }
+    if ((skip_mask & kSkipHashIndex) &&
+        (option_config == kBlockBasedTableWithPrefixHashIndex ||
+         option_config == kBlockBasedTableWithWholeKeyHashIndex)) {
+      return true;
+    }
+    if ((skip_mask & kSkipHashCuckoo) && (option_config == kHashCuckoo)) {
+      return true;
+    }
+    if ((skip_mask & kSkipFIFOCompaction) && option_config == kFIFOCompaction) {
+      return true;
+    }
+    if ((skip_mask & kSkipMmapReads) && option_config == kWalDirAndMmapReads) {
+      return true;
+    }
+    return false;
+}
+
+// Switch to a fresh database with the next option configuration to
+// test.  Return false if there are no more configurations to test.
+bool DBTestBase::ChangeOptions(int skip_mask) {
+  for (option_config_++; option_config_ < kEnd; option_config_++) {
+    if (ShouldSkipOptions(option_config_, skip_mask)) {
+      continue;
+    }
+    break;
+  }
+
+  if (option_config_ >= kEnd) {
+    Destroy(last_options_);
+    return false;
+  } else {
+    auto options = CurrentOptions();
+    options.create_if_missing = true;
+    DestroyAndReopen(options);
+    return true;
+  }
+}
+
+// Switch between different compaction styles.
+bool DBTestBase::ChangeCompactOptions() {
+  if (option_config_ == kDefault) {
+    option_config_ = kUniversalCompaction;
+    Destroy(last_options_);
+    auto options = CurrentOptions();
+    options.create_if_missing = true;
+    TryReopen(options);
+    return true;
+  } else if (option_config_ == kUniversalCompaction) {
+    option_config_ = kUniversalCompactionMultiLevel;
+    Destroy(last_options_);
+    auto options = CurrentOptions();
+    options.create_if_missing = true;
+    TryReopen(options);
+    return true;
+  } else if (option_config_ == kUniversalCompactionMultiLevel) {
+    option_config_ = kLevelSubcompactions;
+    Destroy(last_options_);
+    auto options = CurrentOptions();
+    assert(options.max_subcompactions > 1);
+    TryReopen(options);
+    return true;
+  } else if (option_config_ == kLevelSubcompactions) {
+    option_config_ = kUniversalSubcompactions;
+    Destroy(last_options_);
+    auto options = CurrentOptions();
+    assert(options.max_subcompactions > 1);
+    TryReopen(options);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+// Switch between different WAL settings
+bool DBTestBase::ChangeWalOptions() {
+  if (option_config_ == kDefault) {
+    option_config_ = kDBLogDir;
+    Destroy(last_options_);
+    auto options = CurrentOptions();
+    Destroy(options);
+    options.create_if_missing = true;
+    TryReopen(options);
+    return true;
+  } else if (option_config_ == kDBLogDir) {
+    option_config_ = kWalDirAndMmapReads;
+    Destroy(last_options_);
+    auto options = CurrentOptions();
+    Destroy(options);
+    options.create_if_missing = true;
+    TryReopen(options);
+    return true;
+  } else if (option_config_ == kWalDirAndMmapReads) {
+    option_config_ = kRecycleLogFiles;
+    Destroy(last_options_);
+    auto options = CurrentOptions();
+    Destroy(options);
+    TryReopen(options);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+// Switch between different filter policy
+// Jump from kDefault to kFilter to kFullFilter
+bool DBTestBase::ChangeFilterOptions() {
+  if (option_config_ == kDefault) {
+    option_config_ = kFilter;
+  } else if (option_config_ == kFilter) {
+    option_config_ = kFullFilterWithNewTableReaderForCompactions;
+  } else if (option_config_ == kFullFilterWithNewTableReaderForCompactions) {
+    option_config_ = kPartitionedFilterWithNewTableReaderForCompactions;
+  } else {
+    return false;
+  }
+  Destroy(last_options_);
+
+  auto options = CurrentOptions();
+  options.create_if_missing = true;
+  TryReopen(options);
+  return true;
+}
+
+// Return the current option configuration.
+Options DBTestBase::CurrentOptions(
+    const anon::OptionsOverride& options_override) const {
+  return GetOptions(option_config_, GetDefaultOptions(), options_override);
+}
+
+Options DBTestBase::CurrentOptions(
+    const Options& default_options,
+    const anon::OptionsOverride& options_override) const {
+  return GetOptions(option_config_, default_options, options_override);
+}
+
+Options DBTestBase::GetDefaultOptions() {
+  Options options;
+  options.write_buffer_size = 4090 * 4096;
+  options.target_file_size_base = 2 * 1024 * 1024;
+  options.max_bytes_for_level_base = 10 * 1024 * 1024;
+  options.max_open_files = 5000;
+  options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
+  options.compaction_pri = CompactionPri::kByCompensatedSize;
+  return options;
+}
+
+Options DBTestBase::GetOptions(
+    int option_config, const Options& default_options,
+    const anon::OptionsOverride& options_override) const {
+  // this redundant copy is to minimize code change w/o having lint error.
+  Options options = default_options;
+  BlockBasedTableOptions table_options;
+  bool set_block_based_table_factory = true;
+#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) &&  \
+  !defined(OS_AIX)
+  rocksdb::SyncPoint::GetInstance()->ClearCallBack(
+      "NewRandomAccessFile:O_DIRECT");
+  rocksdb::SyncPoint::GetInstance()->ClearCallBack(
+      "NewWritableFile:O_DIRECT");
+#endif
+
+  bool can_allow_mmap = IsMemoryMappedAccessSupported();
+  switch (option_config) {
+#ifndef ROCKSDB_LITE
+    case kHashSkipList:
+      options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+      options.memtable_factory.reset(NewHashSkipListRepFactory(16));
+      options.allow_concurrent_memtable_write = false;
+      break;
+    case kPlainTableFirstBytePrefix:
+      options.table_factory.reset(new PlainTableFactory());
+      options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+      options.allow_mmap_reads = can_allow_mmap;
+      options.max_sequential_skip_in_iterations = 999999;
+      set_block_based_table_factory = false;
+      break;
+    case kPlainTableCappedPrefix:
+      options.table_factory.reset(new PlainTableFactory());
+      options.prefix_extractor.reset(NewCappedPrefixTransform(8));
+      options.allow_mmap_reads = can_allow_mmap;
+      options.max_sequential_skip_in_iterations = 999999;
+      set_block_based_table_factory = false;
+      break;
+    case kPlainTableCappedPrefixNonMmap:
+      options.table_factory.reset(new PlainTableFactory());
+      options.prefix_extractor.reset(NewCappedPrefixTransform(8));
+      options.allow_mmap_reads = false;
+      options.max_sequential_skip_in_iterations = 999999;
+      set_block_based_table_factory = false;
+      break;
+    case kPlainTableAllBytesPrefix:
+      options.table_factory.reset(new PlainTableFactory());
+      options.prefix_extractor.reset(NewNoopTransform());
+      options.allow_mmap_reads = can_allow_mmap;
+      options.max_sequential_skip_in_iterations = 999999;
+      set_block_based_table_factory = false;
+      break;
+    case kVectorRep:
+      options.memtable_factory.reset(new VectorRepFactory(100));
+      options.allow_concurrent_memtable_write = false;
+      break;
+    case kHashLinkList:
+      options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+      options.memtable_factory.reset(
+          NewHashLinkListRepFactory(4, 0, 3, true, 4));
+      options.allow_concurrent_memtable_write = false;
+      break;
+    case kHashCuckoo:
+      options.memtable_factory.reset(
+          NewHashCuckooRepFactory(options.write_buffer_size));
+      options.allow_concurrent_memtable_write = false;
+      break;
+#endif  // ROCKSDB_LITE
+    case kMergePut:
+      options.merge_operator = MergeOperators::CreatePutOperator();
+      break;
+    case kFilter:
+      table_options.filter_policy.reset(NewBloomFilterPolicy(10, true));
+      break;
+    case kFullFilterWithNewTableReaderForCompactions:
+      table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
+      options.new_table_reader_for_compaction_inputs = true;
+      options.compaction_readahead_size = 10 * 1024 * 1024;
+      break;
+    case kPartitionedFilterWithNewTableReaderForCompactions:
+      table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
+      table_options.partition_filters = true;
+      table_options.index_type =
+          BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
+      options.new_table_reader_for_compaction_inputs = true;
+      options.compaction_readahead_size = 10 * 1024 * 1024;
+      break;
+    case kUncompressed:
+      options.compression = kNoCompression;
+      break;
+    case kNumLevel_3:
+      options.num_levels = 3;
+      break;
+    case kDBLogDir:
+      options.db_log_dir = alternative_db_log_dir_;
+      break;
+    case kWalDirAndMmapReads:
+      options.wal_dir = alternative_wal_dir_;
+      // mmap reads should be orthogonal to WalDir setting, so we piggyback to
+      // this option config to test mmap reads as well
+      options.allow_mmap_reads = can_allow_mmap;
+      break;
+    case kManifestFileSize:
+      options.max_manifest_file_size = 50;  // 50 bytes
+      break;
+    case kPerfOptions:
+      options.soft_rate_limit = 2.0;
+      options.delayed_write_rate = 8 * 1024 * 1024;
+      options.report_bg_io_stats = true;
+      // TODO(3.13) -- test more options
+      break;
+    case kUniversalCompaction:
+      options.compaction_style = kCompactionStyleUniversal;
+      options.num_levels = 1;
+      break;
+    case kUniversalCompactionMultiLevel:
+      options.compaction_style = kCompactionStyleUniversal;
+      options.num_levels = 8;
+      break;
+    case kCompressedBlockCache:
+      options.allow_mmap_writes = can_allow_mmap;
+      table_options.block_cache_compressed = NewLRUCache(8 * 1024 * 1024);
+      break;
+    case kInfiniteMaxOpenFiles:
+      options.max_open_files = -1;
+      break;
+    case kxxHashChecksum: {
+      table_options.checksum = kxxHash;
+      break;
+    }
+    case kFIFOCompaction: {
+      options.compaction_style = kCompactionStyleFIFO;
+      break;
+    }
+    case kBlockBasedTableWithPrefixHashIndex: {
+      table_options.index_type = BlockBasedTableOptions::kHashSearch;
+      options.prefix_extractor.reset(NewFixedPrefixTransform(1));
+      break;
+    }
+    case kBlockBasedTableWithWholeKeyHashIndex: {
+      table_options.index_type = BlockBasedTableOptions::kHashSearch;
+      options.prefix_extractor.reset(NewNoopTransform());
+      break;
+    }
+    case kBlockBasedTableWithPartitionedIndex: {
+      table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
+      options.prefix_extractor.reset(NewNoopTransform());
+      break;
+    }
+    case kBlockBasedTableWithIndexRestartInterval: {
+      table_options.index_block_restart_interval = 8;
+      break;
+    }
+    case kOptimizeFiltersForHits: {
+      options.optimize_filters_for_hits = true;
+      set_block_based_table_factory = true;
+      break;
+    }
+    case kRowCache: {
+      options.row_cache = NewLRUCache(1024 * 1024);
+      break;
+    }
+    case kRecycleLogFiles: {
+      options.recycle_log_file_num = 2;
+      break;
+    }
+    case kLevelSubcompactions: {
+      options.max_subcompactions = 4;
+      break;
+    }
+    case kUniversalSubcompactions: {
+      options.compaction_style = kCompactionStyleUniversal;
+      options.num_levels = 8;
+      options.max_subcompactions = 4;
+      break;
+    }
+    case kConcurrentSkipList: {
+      options.allow_concurrent_memtable_write = true;
+      options.enable_write_thread_adaptive_yield = true;
+      break;
+    }
+    case kDirectIO: {
+      options.use_direct_reads = true;
+      options.use_direct_io_for_flush_and_compaction = true;
+      options.compaction_readahead_size = 2 * 1024 * 1024;
+#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \
+    !defined(OS_AIX)
+      rocksdb::SyncPoint::GetInstance()->SetCallBack(
+          "NewWritableFile:O_DIRECT", [&](void* arg) {
+            int* val = static_cast<int*>(arg);
+            *val &= ~O_DIRECT;
+          });
+      rocksdb::SyncPoint::GetInstance()->SetCallBack(
+          "NewRandomAccessFile:O_DIRECT", [&](void* arg) {
+            int* val = static_cast<int*>(arg);
+            *val &= ~O_DIRECT;
+          });
+      rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+#endif
+      break;
+    }
+    case kPipelinedWrite: {
+      options.enable_pipelined_write = true;
+      break;
+    }
+    case kConcurrentWALWrites: {
+      // This options optimize 2PC commit path
+      options.concurrent_prepare = true;
+      options.manual_wal_flush = true;
+      break;
+    }
+
+    default:
+      break;
+  }
+
+  if (options_override.filter_policy) {
+    table_options.filter_policy = options_override.filter_policy;
+    table_options.partition_filters = options_override.partition_filters;
+    table_options.metadata_block_size = options_override.metadata_block_size;
+  }
+  if (set_block_based_table_factory) {
+    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  }
+  options.env = env_;
+  options.create_if_missing = true;
+  options.fail_if_options_file_error = true;
+  return options;
+}
+
+void DBTestBase::CreateColumnFamilies(const std::vector<std::string>& cfs,
+                                      const Options& options) {
+  ColumnFamilyOptions cf_opts(options);
+  size_t cfi = handles_.size();
+  handles_.resize(cfi + cfs.size());
+  for (auto cf : cfs) {
+    ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
+  }
+}
+
+void DBTestBase::CreateAndReopenWithCF(const std::vector<std::string>& cfs,
+                                       const Options& options) {
+  CreateColumnFamilies(cfs, options);
+  std::vector<std::string> cfs_plus_default = cfs;
+  cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
+  ReopenWithColumnFamilies(cfs_plus_default, options);
+}
+
+void DBTestBase::ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+                                          const std::vector<Options>& options) {
+  ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
+}
+
+void DBTestBase::ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+                                          const Options& options) {
+  ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
+}
+
+Status DBTestBase::TryReopenWithColumnFamilies(
+    const std::vector<std::string>& cfs, const std::vector<Options>& options) {
+  Close();
+  EXPECT_EQ(cfs.size(), options.size());
+  std::vector<ColumnFamilyDescriptor> column_families;
+  for (size_t i = 0; i < cfs.size(); ++i) {
+    column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
+  }
+  DBOptions db_opts = DBOptions(options[0]);
+  return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
+}
+
+Status DBTestBase::TryReopenWithColumnFamilies(
+    const std::vector<std::string>& cfs, const Options& options) {
+  Close();
+  std::vector<Options> v_opts(cfs.size(), options);
+  return TryReopenWithColumnFamilies(cfs, v_opts);
+}
+
+void DBTestBase::Reopen(const Options& options) {
+  ASSERT_OK(TryReopen(options));
+}
+
+void DBTestBase::Close() {
+  for (auto h : handles_) {
+    db_->DestroyColumnFamilyHandle(h);
+  }
+  handles_.clear();
+  delete db_;
+  db_ = nullptr;
+}
+
+void DBTestBase::DestroyAndReopen(const Options& options) {
+  // Destroy using last options
+  Destroy(last_options_);
+  ASSERT_OK(TryReopen(options));
+}
+
+void DBTestBase::Destroy(const Options& options) {
+  Close();
+  ASSERT_OK(DestroyDB(dbname_, options));
+}
+
+Status DBTestBase::ReadOnlyReopen(const Options& options) {
+  return DB::OpenForReadOnly(options, dbname_, &db_);
+}
+
+Status DBTestBase::TryReopen(const Options& options) {
+  Close();
+  last_options_.table_factory.reset();
+  // Note: operator= is an unsafe approach here since it destructs shared_ptr in
+  // the same order of their creation, in contrast to destructors which
+  // destructs them in the opposite order of creation. One particular problem is
+  // that the cache destructor might invoke callback functions that use Option
+  // members such as statistics. To work around this problem, we manually call
+  // destructor of table_factory which eventually clears the block cache.
+  last_options_ = options;
+  return DB::Open(options, dbname_, &db_);
+}
+
+bool DBTestBase::IsDirectIOSupported() {
+  EnvOptions env_options;
+  env_options.use_mmap_writes = false;
+  env_options.use_direct_writes = true;
+  std::string tmp = TempFileName(dbname_, 999);
+  Status s;
+  {
+    unique_ptr<WritableFile> file;
+    s = env_->NewWritableFile(tmp, &file, env_options);
+  }
+  if (s.ok()) {
+    s = env_->DeleteFile(tmp);
+  }
+  return s.ok();
+}
+
+bool DBTestBase::IsMemoryMappedAccessSupported() const {
+  return (!encrypted_env_);
+}
+
+Status DBTestBase::Flush(int cf) {
+  if (cf == 0) {
+    return db_->Flush(FlushOptions());
+  } else {
+    return db_->Flush(FlushOptions(), handles_[cf]);
+  }
+}
+
+Status DBTestBase::Put(const Slice& k, const Slice& v, WriteOptions wo) {
+  if (kMergePut == option_config_) {
+    return db_->Merge(wo, k, v);
+  } else {
+    return db_->Put(wo, k, v);
+  }
+}
+
+Status DBTestBase::Put(int cf, const Slice& k, const Slice& v,
+                       WriteOptions wo) {
+  if (kMergePut == option_config_) {
+    return db_->Merge(wo, handles_[cf], k, v);
+  } else {
+    return db_->Put(wo, handles_[cf], k, v);
+  }
+}
+
+Status DBTestBase::Merge(const Slice& k, const Slice& v, WriteOptions wo) {
+  return db_->Merge(wo, k, v);
+}
+
+Status DBTestBase::Merge(int cf, const Slice& k, const Slice& v,
+                         WriteOptions wo) {
+  return db_->Merge(wo, handles_[cf], k, v);
+}
+
+Status DBTestBase::Delete(const std::string& k) {
+  return db_->Delete(WriteOptions(), k);
+}
+
+Status DBTestBase::Delete(int cf, const std::string& k) {
+  return db_->Delete(WriteOptions(), handles_[cf], k);
+}
+
+Status DBTestBase::SingleDelete(const std::string& k) {
+  return db_->SingleDelete(WriteOptions(), k);
+}
+
+Status DBTestBase::SingleDelete(int cf, const std::string& k) {
+  return db_->SingleDelete(WriteOptions(), handles_[cf], k);
+}
+
+std::string DBTestBase::Get(const std::string& k, const Snapshot* snapshot) {
+  ReadOptions options;
+  options.verify_checksums = true;
+  options.snapshot = snapshot;
+  std::string result;
+  Status s = db_->Get(options, k, &result);
+  if (s.IsNotFound()) {
+    result = "NOT_FOUND";
+  } else if (!s.ok()) {
+    result = s.ToString();
+  }
+  return result;
+}
+
+std::string DBTestBase::Get(int cf, const std::string& k,
+                            const Snapshot* snapshot) {
+  ReadOptions options;
+  options.verify_checksums = true;
+  options.snapshot = snapshot;
+  std::string result;
+  Status s = db_->Get(options, handles_[cf], k, &result);
+  if (s.IsNotFound()) {
+    result = "NOT_FOUND";
+  } else if (!s.ok()) {
+    result = s.ToString();
+  }
+  return result;
+}
+
+Status DBTestBase::Get(const std::string& k, PinnableSlice* v) {
+  ReadOptions options;
+  options.verify_checksums = true;
+  Status s = dbfull()->Get(options, dbfull()->DefaultColumnFamily(), k, v);
+  return s;
+}
+
+uint64_t DBTestBase::GetNumSnapshots() {
+  uint64_t int_num;
+  EXPECT_TRUE(dbfull()->GetIntProperty("rocksdb.num-snapshots", &int_num));
+  return int_num;
+}
+
+uint64_t DBTestBase::GetTimeOldestSnapshots() {
+  uint64_t int_num;
+  EXPECT_TRUE(
+      dbfull()->GetIntProperty("rocksdb.oldest-snapshot-time", &int_num));
+  return int_num;
+}
+
+// Return a string that contains all key,value pairs in order,
+// formatted like "(k1->v1)(k2->v2)".
+std::string DBTestBase::Contents(int cf) {
+  std::vector<std::string> forward;
+  std::string result;
+  Iterator* iter = (cf == 0) ? db_->NewIterator(ReadOptions())
+                             : db_->NewIterator(ReadOptions(), handles_[cf]);
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    std::string s = IterStatus(iter);
+    result.push_back('(');
+    result.append(s);
+    result.push_back(')');
+    forward.push_back(s);
+  }
+
+  // Check reverse iteration results are the reverse of forward results
+  unsigned int matched = 0;
+  for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
+    EXPECT_LT(matched, forward.size());
+    EXPECT_EQ(IterStatus(iter), forward[forward.size() - matched - 1]);
+    matched++;
+  }
+  EXPECT_EQ(matched, forward.size());
+
+  delete iter;
+  return result;
+}
+
+std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
+  Arena arena;
+  auto options = CurrentOptions();
+  InternalKeyComparator icmp(options.comparator);
+  RangeDelAggregator range_del_agg(icmp, {} /* snapshots */);
+  ScopedArenaIterator iter;
+  if (cf == 0) {
+    iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg));
+  } else {
+    iter.set(
+        dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[cf]));
+  }
+  InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
+  iter->Seek(target.Encode());
+  std::string result;
+  if (!iter->status().ok()) {
+    result = iter->status().ToString();
+  } else {
+    result = "[ ";
+    bool first = true;
+    while (iter->Valid()) {
+      ParsedInternalKey ikey(Slice(), 0, kTypeValue);
+      if (!ParseInternalKey(iter->key(), &ikey)) {
+        result += "CORRUPTED";
+      } else {
+        if (!last_options_.comparator->Equal(ikey.user_key, user_key)) {
+          break;
+        }
+        if (!first) {
+          result += ", ";
+        }
+        first = false;
+        switch (ikey.type) {
+          case kTypeValue:
+            result += iter->value().ToString();
+            break;
+          case kTypeMerge:
+            // keep it the same as kTypeValue for testing kMergePut
+            result += iter->value().ToString();
+            break;
+          case kTypeDeletion:
+            result += "DEL";
+            break;
+          case kTypeSingleDeletion:
+            result += "SDEL";
+            break;
+          default:
+            assert(false);
+            break;
+        }
+      }
+      iter->Next();
+    }
+    if (!first) {
+      result += " ";
+    }
+    result += "]";
+  }
+  return result;
+}
+
+#ifndef ROCKSDB_LITE
+int DBTestBase::NumSortedRuns(int cf) {
+  ColumnFamilyMetaData cf_meta;
+  if (cf == 0) {
+    db_->GetColumnFamilyMetaData(&cf_meta);
+  } else {
+    db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta);
+  }
+  int num_sr = static_cast<int>(cf_meta.levels[0].files.size());
+  for (size_t i = 1U; i < cf_meta.levels.size(); i++) {
+    if (cf_meta.levels[i].files.size() > 0) {
+      num_sr++;
+    }
+  }
+  return num_sr;
+}
+
+uint64_t DBTestBase::TotalSize(int cf) {
+  ColumnFamilyMetaData cf_meta;
+  if (cf == 0) {
+    db_->GetColumnFamilyMetaData(&cf_meta);
+  } else {
+    db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta);
+  }
+  return cf_meta.size;
+}
+
+uint64_t DBTestBase::SizeAtLevel(int level) {
+  std::vector<LiveFileMetaData> metadata;
+  db_->GetLiveFilesMetaData(&metadata);
+  uint64_t sum = 0;
+  for (const auto& m : metadata) {
+    if (m.level == level) {
+      sum += m.size;
+    }
+  }
+  return sum;
+}
+
+size_t DBTestBase::TotalLiveFiles(int cf) {
+  ColumnFamilyMetaData cf_meta;
+  if (cf == 0) {
+    db_->GetColumnFamilyMetaData(&cf_meta);
+  } else {
+    db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta);
+  }
+  size_t num_files = 0;
+  for (auto& level : cf_meta.levels) {
+    num_files += level.files.size();
+  }
+  return num_files;
+}
+
+size_t DBTestBase::CountLiveFiles() {
+  std::vector<LiveFileMetaData> metadata;
+  db_->GetLiveFilesMetaData(&metadata);
+  return metadata.size();
+}
+#endif  // ROCKSDB_LITE
+
+int DBTestBase::NumTableFilesAtLevel(int level, int cf) {
+  std::string property;
+  if (cf == 0) {
+    // default cfd
+    EXPECT_TRUE(db_->GetProperty(
+        "rocksdb.num-files-at-level" + NumberToString(level), &property));
+  } else {
+    EXPECT_TRUE(db_->GetProperty(
+        handles_[cf], "rocksdb.num-files-at-level" + NumberToString(level),
+        &property));
+  }
+  return atoi(property.c_str());
+}
+
+double DBTestBase::CompressionRatioAtLevel(int level, int cf) {
+  std::string property;
+  if (cf == 0) {
+    // default cfd
+    EXPECT_TRUE(db_->GetProperty(
+        "rocksdb.compression-ratio-at-level" + NumberToString(level),
+        &property));
+  } else {
+    EXPECT_TRUE(db_->GetProperty(
+        handles_[cf],
+        "rocksdb.compression-ratio-at-level" + NumberToString(level),
+        &property));
+  }
+  return std::stod(property);
+}
+
+int DBTestBase::TotalTableFiles(int cf, int levels) {
+  if (levels == -1) {
+    levels = (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
+  }
+  int result = 0;
+  for (int level = 0; level < levels; level++) {
+    result += NumTableFilesAtLevel(level, cf);
+  }
+  return result;
+}
+
+// Return spread of files per level
+std::string DBTestBase::FilesPerLevel(int cf) {
+  int num_levels =
+      (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
+  std::string result;
+  size_t last_non_zero_offset = 0;
+  for (int level = 0; level < num_levels; level++) {
+    int f = NumTableFilesAtLevel(level, cf);
+    char buf[100];
+    snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
+    result += buf;
+    if (f > 0) {
+      last_non_zero_offset = result.size();
+    }
+  }
+  result.resize(last_non_zero_offset);
+  return result;
+}
+
+size_t DBTestBase::CountFiles() {
+  std::vector<std::string> files;
+  env_->GetChildren(dbname_, &files);
+
+  std::vector<std::string> logfiles;
+  if (dbname_ != last_options_.wal_dir) {
+    env_->GetChildren(last_options_.wal_dir, &logfiles);
+  }
+
+  return files.size() + logfiles.size();
+}
+
+uint64_t DBTestBase::Size(const Slice& start, const Slice& limit, int cf) {
+  Range r(start, limit);
+  uint64_t size;
+  if (cf == 0) {
+    db_->GetApproximateSizes(&r, 1, &size);
+  } else {
+    db_->GetApproximateSizes(handles_[1], &r, 1, &size);
+  }
+  return size;
+}
+
+void DBTestBase::Compact(int cf, const Slice& start, const Slice& limit,
+                         uint32_t target_path_id) {
+  CompactRangeOptions compact_options;
+  compact_options.target_path_id = target_path_id;
+  ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
+}
+
+void DBTestBase::Compact(int cf, const Slice& start, const Slice& limit) {
+  ASSERT_OK(
+      db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
+}
+
+void DBTestBase::Compact(const Slice& start, const Slice& limit) {
+  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
+}
+
+// Do n memtable compactions, each of which produces an sstable
+// covering the range [small,large].
+void DBTestBase::MakeTables(int n, const std::string& small,
+                            const std::string& large, int cf) {
+  for (int i = 0; i < n; i++) {
+    ASSERT_OK(Put(cf, small, "begin"));
+    ASSERT_OK(Put(cf, large, "end"));
+    ASSERT_OK(Flush(cf));
+    MoveFilesToLevel(n - i - 1, cf);
+  }
+}
+
+// Prevent pushing of new sstables into deeper levels by adding
+// tables that cover a specified range to all levels.
+void DBTestBase::FillLevels(const std::string& smallest,
+                            const std::string& largest, int cf) {
+  MakeTables(db_->NumberLevels(handles_[cf]), smallest, largest, cf);
+}
+
+void DBTestBase::MoveFilesToLevel(int level, int cf) {
+  for (int l = 0; l < level; ++l) {
+    if (cf > 0) {
+      dbfull()->TEST_CompactRange(l, nullptr, nullptr, handles_[cf]);
+    } else {
+      dbfull()->TEST_CompactRange(l, nullptr, nullptr);
+    }
+  }
+}
+
+void DBTestBase::DumpFileCounts(const char* label) {
+  fprintf(stderr, "---\n%s:\n", label);
+  fprintf(stderr, "maxoverlap: %" PRIu64 "\n",
+          dbfull()->TEST_MaxNextLevelOverlappingBytes());
+  for (int level = 0; level < db_->NumberLevels(); level++) {
+    int num = NumTableFilesAtLevel(level);
+    if (num > 0) {
+      fprintf(stderr, "  level %3d : %d files\n", level, num);
+    }
+  }
+}
+
+std::string DBTestBase::DumpSSTableList() {
+  std::string property;
+  db_->GetProperty("rocksdb.sstables", &property);
+  return property;
+}
+
+void DBTestBase::GetSstFiles(std::string path,
+                             std::vector<std::string>* files) {
+  env_->GetChildren(path, files);
+
+  files->erase(
+      std::remove_if(files->begin(), files->end(), [](std::string name) {
+        uint64_t number;
+        FileType type;
+        return !(ParseFileName(name, &number, &type) && type == kTableFile);
+      }), files->end());
+}
+
+int DBTestBase::GetSstFileCount(std::string path) {
+  std::vector<std::string> files;
+  GetSstFiles(path, &files);
+  return static_cast<int>(files.size());
+}
+
+// this will generate non-overlapping files since it keeps increasing key_idx
+void DBTestBase::GenerateNewFile(int cf, Random* rnd, int* key_idx,
+                                 bool nowait) {
+  for (int i = 0; i < KNumKeysByGenerateNewFile; i++) {
+    ASSERT_OK(Put(cf, Key(*key_idx), RandomString(rnd, (i == 99) ? 1 : 990)));
+    (*key_idx)++;
+  }
+  if (!nowait) {
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+  }
+}
+
+// this will generate non-overlapping files since it keeps increasing key_idx
+void DBTestBase::GenerateNewFile(Random* rnd, int* key_idx, bool nowait) {
+  for (int i = 0; i < KNumKeysByGenerateNewFile; i++) {
+    ASSERT_OK(Put(Key(*key_idx), RandomString(rnd, (i == 99) ? 1 : 990)));
+    (*key_idx)++;
+  }
+  if (!nowait) {
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+  }
+}
+
+const int DBTestBase::kNumKeysByGenerateNewRandomFile = 51;
+
+void DBTestBase::GenerateNewRandomFile(Random* rnd, bool nowait) {
+  for (int i = 0; i < kNumKeysByGenerateNewRandomFile; i++) {
+    ASSERT_OK(Put("key" + RandomString(rnd, 7), RandomString(rnd, 2000)));
+  }
+  ASSERT_OK(Put("key" + RandomString(rnd, 7), RandomString(rnd, 200)));
+  if (!nowait) {
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+  }
+}
+
+std::string DBTestBase::IterStatus(Iterator* iter) {
+  std::string result;
+  if (iter->Valid()) {
+    result = iter->key().ToString() + "->" + iter->value().ToString();
+  } else {
+    result = "(invalid)";
+  }
+  return result;
+}
+
+Options DBTestBase::OptionsForLogIterTest() {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.WAL_ttl_seconds = 1000;
+  return options;
+}
+
+std::string DBTestBase::DummyString(size_t len, char c) {
+  return std::string(len, c);
+}
+
+void DBTestBase::VerifyIterLast(std::string expected_key, int cf) {
+  Iterator* iter;
+  ReadOptions ro;
+  if (cf == 0) {
+    iter = db_->NewIterator(ro);
+  } else {
+    iter = db_->NewIterator(ro, handles_[cf]);
+  }
+  iter->SeekToLast();
+  ASSERT_EQ(IterStatus(iter), expected_key);
+  delete iter;
+}
+
+// Used to test InplaceUpdate
+
+// If previous value is nullptr or delta is > than previous value,
+//   sets newValue with delta
+// If previous value is not empty,
+//   updates previous value with 'b' string of previous value size - 1.
+UpdateStatus DBTestBase::updateInPlaceSmallerSize(char* prevValue,
+                                                  uint32_t* prevSize,
+                                                  Slice delta,
+                                                  std::string* newValue) {
+  if (prevValue == nullptr) {
+    *newValue = std::string(delta.size(), 'c');
+    return UpdateStatus::UPDATED;
+  } else {
+    *prevSize = *prevSize - 1;
+    std::string str_b = std::string(*prevSize, 'b');
+    memcpy(prevValue, str_b.c_str(), str_b.size());
+    return UpdateStatus::UPDATED_INPLACE;
+  }
+}
+
+UpdateStatus DBTestBase::updateInPlaceSmallerVarintSize(char* prevValue,
+                                                        uint32_t* prevSize,
+                                                        Slice delta,
+                                                        std::string* newValue) 
{
+  if (prevValue == nullptr) {
+    *newValue = std::string(delta.size(), 'c');
+    return UpdateStatus::UPDATED;
+  } else {
+    *prevSize = 1;
+    std::string str_b = std::string(*prevSize, 'b');
+    memcpy(prevValue, str_b.c_str(), str_b.size());
+    return UpdateStatus::UPDATED_INPLACE;
+  }
+}
+
+UpdateStatus DBTestBase::updateInPlaceLargerSize(char* prevValue,
+                                                 uint32_t* prevSize,
+                                                 Slice delta,
+                                                 std::string* newValue) {
+  *newValue = std::string(delta.size(), 'c');
+  return UpdateStatus::UPDATED;
+}
+
+UpdateStatus DBTestBase::updateInPlaceNoAction(char* prevValue,
+                                               uint32_t* prevSize, Slice delta,
+                                               std::string* newValue) {
+  return UpdateStatus::UPDATE_FAILED;
+}
+
// Utility method to test InplaceUpdate
// Walks the raw internal keys of column family `cf` and asserts that
// exactly `numValues` entries exist, with sequence numbers descending
// from numValues down to 1 (i.e. one internal entry per update).
void DBTestBase::validateNumberOfEntries(int numValues, int cf) {
  ScopedArenaIterator iter;
  Arena arena;
  auto options = CurrentOptions();
  InternalKeyComparator icmp(options.comparator);
  RangeDelAggregator range_del_agg(icmp, {} /* snapshots */);
  if (cf != 0) {
    iter.set(
        dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[cf]));
  } else {
    iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg));
  }
  iter->SeekToFirst();
  ASSERT_EQ(iter->status().ok(), true);
  int seq = numValues;
  while (iter->Valid()) {
    ParsedInternalKey ikey;
    // Sentinel so a failed parse cannot accidentally match `seq` below.
    ikey.sequence = -1;
    ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);

    // checks sequence number for updates
    ASSERT_EQ(ikey.sequence, (unsigned)seq--);
    iter->Next();
  }
  // seq reaches 0 only if every expected entry was seen.
  ASSERT_EQ(0, seq);
}
+
+void DBTestBase::CopyFile(const std::string& source,
+                          const std::string& destination, uint64_t size) {
+  const EnvOptions soptions;
+  unique_ptr<SequentialFile> srcfile;
+  ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions));
+  unique_ptr<WritableFile> destfile;
+  ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions));
+
+  if (size == 0) {
+    // default argument means copy everything
+    ASSERT_OK(env_->GetFileSize(source, &size));
+  }
+
+  char buffer[4096];
+  Slice slice;
+  while (size > 0) {
+    uint64_t one = std::min(uint64_t(sizeof(buffer)), size);
+    ASSERT_OK(srcfile->Read(one, &slice, buffer));
+    ASSERT_OK(destfile->Append(slice));
+    size -= slice.size();
+  }
+  ASSERT_OK(destfile->Close());
+}
+
+std::unordered_map<std::string, uint64_t> DBTestBase::GetAllSSTFiles(
+    uint64_t* total_size) {
+  std::unordered_map<std::string, uint64_t> res;
+
+  if (total_size) {
+    *total_size = 0;
+  }
+  std::vector<std::string> files;
+  env_->GetChildren(dbname_, &files);
+  for (auto& file_name : files) {
+    uint64_t number;
+    FileType type;
+    std::string file_path = dbname_ + "/" + file_name;
+    if (ParseFileName(file_name, &number, &type) && type == kTableFile) {
+      uint64_t file_size = 0;
+      env_->GetFileSize(file_path, &file_size);
+      res[file_path] = file_size;
+      if (total_size) {
+        *total_size += file_size;
+      }
+    }
+  }
+  return res;
+}
+
+std::vector<std::uint64_t> DBTestBase::ListTableFiles(Env* env,
+                                                      const std::string& path) 
{
+  std::vector<std::string> files;
+  std::vector<uint64_t> file_numbers;
+  env->GetChildren(path, &files);
+  uint64_t number;
+  FileType type;
+  for (size_t i = 0; i < files.size(); ++i) {
+    if (ParseFileName(files[i], &number, &type)) {
+      if (type == kTableFile) {
+        file_numbers.push_back(number);
+      }
+    }
+  }
+  return file_numbers;
+}
+
// Cross-check the DB contents against `true_data` via point Get()s, full
// forward and reverse iteration, and per-key Seek()s, optionally repeating
// the forward/seek checks with a tailing (forward) iterator.  `status` maps
// keys to the Status each read is expected to return; keys absent from
// `status` are expected to succeed (operator[] yields a default-constructed
// Status::OK()).  When `total_reads_res` is non-null it receives the total
// number of read operations performed.
void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
                                 size_t* total_reads_res, bool tailing_iter,
                                 std::map<std::string, Status> status) {
  size_t total_reads = 0;

  // Point-lookup pass: every key must return its value, or the expected
  // error status when one was registered for it.
  for (auto& kv : true_data) {
    Status s = status[kv.first];
    if (s.ok()) {
      ASSERT_EQ(Get(kv.first), kv.second);
    } else {
      std::string value;
      ASSERT_EQ(s, db_->Get(ReadOptions(), kv.first, &value));
    }
    total_reads++;
  }

  // Normal Iterator
  {
    int iter_cnt = 0;
    ReadOptions ro;
    ro.total_order_seek = true;
    Iterator* iter = db_->NewIterator(ro);
    // Verify Iterator::Next()
    iter_cnt = 0;
    auto data_iter = true_data.begin();
    Status s;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
      ASSERT_EQ(iter->key().ToString(), data_iter->first);
      Status current_status = status[data_iter->first];
      // Once a key with an expected error is passed, the iterator's status
      // is expected to latch onto that error for the rest of the scan.
      if (!current_status.ok()) {
        s = current_status;
      }
      ASSERT_EQ(iter->status(), s);
      if (current_status.ok()) {
        ASSERT_EQ(iter->value().ToString(), data_iter->second);
      }
      iter_cnt++;
      total_reads++;
    }
    ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / "
                                          << true_data.size();
    delete iter;

    // Verify Iterator::Prev()
    // Use a new iterator to make sure its status is clean.
    iter = db_->NewIterator(ro);
    iter_cnt = 0;
    s = Status::OK();
    auto data_rev = true_data.rbegin();
    for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) {
      ASSERT_EQ(iter->key().ToString(), data_rev->first);
      Status current_status = status[data_rev->first];
      if (!current_status.ok()) {
        s = current_status;
      }
      ASSERT_EQ(iter->status(), s);
      if (current_status.ok()) {
        ASSERT_EQ(iter->value().ToString(), data_rev->second);
      }
      iter_cnt++;
      total_reads++;
    }
    ASSERT_EQ(data_rev, true_data.rend()) << iter_cnt << " / "
                                          << true_data.size();

    // Verify Iterator::Seek()
    for (auto kv : true_data) {
      iter->Seek(kv.first);
      ASSERT_EQ(kv.first, iter->key().ToString());
      ASSERT_EQ(kv.second, iter->value().ToString());
      total_reads++;
    }
    delete iter;
  }

  if (tailing_iter) {
#ifndef ROCKSDB_LITE
    // Tailing iterator
    int iter_cnt = 0;
    ReadOptions ro;
    ro.tailing = true;
    ro.total_order_seek = true;
    Iterator* iter = db_->NewIterator(ro);

    // Verify ForwardIterator::Next()
    iter_cnt = 0;
    auto data_iter = true_data.begin();
    for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) {
      ASSERT_EQ(iter->key().ToString(), data_iter->first);
      ASSERT_EQ(iter->value().ToString(), data_iter->second);
      iter_cnt++;
      total_reads++;
    }
    ASSERT_EQ(data_iter, true_data.end()) << iter_cnt << " / "
                                          << true_data.size();

    // Verify ForwardIterator::Seek()
    for (auto kv : true_data) {
      iter->Seek(kv.first);
      ASSERT_EQ(kv.first, iter->key().ToString());
      ASSERT_EQ(kv.second, iter->value().ToString());
      total_reads++;
    }

    delete iter;
#endif  // ROCKSDB_LITE
  }

  if (total_reads_res) {
    *total_reads_res = total_reads;
  }
}
+
+void DBTestBase::VerifyDBInternal(
+    std::vector<std::pair<std::string, std::string>> true_data) {
+  Arena arena;
+  InternalKeyComparator icmp(last_options_.comparator);
+  RangeDelAggregator range_del_agg(icmp, {});
+  auto iter = dbfull()->NewInternalIterator(&arena, &range_del_agg);
+  iter->SeekToFirst();
+  for (auto p : true_data) {
+    ASSERT_TRUE(iter->Valid());
+    ParsedInternalKey ikey;
+    ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey));
+    ASSERT_EQ(p.first, ikey.user_key);
+    ASSERT_EQ(p.second, iter->value());
+    iter->Next();
+  };
+  ASSERT_FALSE(iter->Valid());
+  iter->~InternalIterator();
+}
+
+#ifndef ROCKSDB_LITE
+
+uint64_t DBTestBase::GetNumberOfSstFilesForColumnFamily(
+    DB* db, std::string column_family_name) {
+  std::vector<LiveFileMetaData> metadata;
+  db->GetLiveFilesMetaData(&metadata);
+  uint64_t result = 0;
+  for (auto& fileMetadata : metadata) {
+    result += (fileMetadata.column_family_name == column_family_name);
+  }
+  return result;
+}
+#endif  // ROCKSDB_LITE
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_test_util.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/db_test_util.h 
b/thirdparty/rocksdb/db/db_test_util.h
new file mode 100644
index 0000000..cd1265e
--- /dev/null
+++ b/thirdparty/rocksdb/db/db_test_util.h
@@ -0,0 +1,939 @@
+// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <fcntl.h>
+#include <inttypes.h>
+
+#include <algorithm>
+#include <map>
+#include <set>
+#include <string>
+#include <thread>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "db/db_impl.h"
+#include "db/dbformat.h"
+#include "env/mock_env.h"
+#include "memtable/hash_linklist_rep.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/filter_policy.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/sst_file_writer.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/table.h"
+#include "rocksdb/utilities/checkpoint.h"
+#include "table/block_based_table_factory.h"
+#include "table/mock_table.h"
+#include "table/plain_table_factory.h"
+#include "table/scoped_arena_iterator.h"
+#include "util/compression.h"
+#include "util/filename.h"
+#include "util/mutexlock.h"
+
+#include "util/string_util.h"
+#include "util/sync_point.h"
+#include "util/testharness.h"
+#include "util/testutil.h"
+#include "utilities/merge_operators.h"
+
+namespace rocksdb {
+
+namespace anon {
// Mutex-protected counter that tests use to observe background activity
// (e.g. how many reads or sleeps happened) and to wait for it.
class AtomicCounter {
 public:
  // `env` is only used by WaitFor() as a clock; it may stay NULL when
  // WaitFor() is never called on this counter.
  explicit AtomicCounter(Env* env = NULL)
      : env_(env), cond_count_(&mu_), count_(0) {}

  void Increment() {
    MutexLock l(&mu_);
    count_++;
    cond_count_.SignalAll();
  }

  int Read() {
    MutexLock l(&mu_);
    return count_;
  }

  // Block until the counter reaches `count`; give up and return false after
  // roughly 10 seconds.  Wakes up about once per second to re-check.
  bool WaitFor(int count) {
    MutexLock l(&mu_);

    uint64_t start = env_->NowMicros();
    while (count_ < count) {
      uint64_t now = env_->NowMicros();
      // NOTE(review): TimedWait appears to take an absolute deadline in
      // microseconds — confirm against port::CondVar.
      cond_count_.TimedWait(now + /*1s*/ 1 * 1000 * 1000);
      if (env_->NowMicros() - start > /*10s*/ 10 * 1000 * 1000) {
        return false;
      }
      if (count_ < count) {
        GTEST_LOG_(WARNING) << "WaitFor is taking more time than usual";
      }
    }

    return true;
  }

  void Reset() {
    MutexLock l(&mu_);
    count_ = 0;
    cond_count_.SignalAll();
  }

 private:
  Env* env_;
  port::Mutex mu_;
  port::CondVar cond_count_;  // signaled on every Increment()/Reset()
  int count_;                 // guarded by mu_
};
+
// Per-test tweaks layered on top of the regular option configurations.
struct OptionsOverride {
  std::shared_ptr<const FilterPolicy> filter_policy = nullptr;
  // These will be used only if filter_policy is set
  bool partition_filters = false;
  uint64_t metadata_block_size = 1024;
  BlockBasedTableOptions::IndexType index_type =
      BlockBasedTableOptions::IndexType::kBinarySearch;

  // Used as a bit mask of individual enums in which to skip an XF test point
  int skip_policy = 0;
};
+
+}  // namespace anon
+
// Bit-mask values for anon::OptionsOverride::skip_policy.
enum SkipPolicy { kSkipNone = 0, kSkipNoSnapshot = 1, kSkipNoPrefix = 2 };
+
// A hacky skip list mem table that triggers flush after number of entries.
// Decorator around a real MemTableRep: every operation is forwarded to the
// wrapped rep, but once `num_entries_flush` inserts have happened,
// ApproximateMemoryUsage() reports a huge value to make RocksDB flush.
class SpecialMemTableRep : public MemTableRep {
 public:
  // Takes ownership of `memtable` (held below in a unique_ptr).
  explicit SpecialMemTableRep(Allocator* allocator, MemTableRep* memtable,
                              int num_entries_flush)
      : MemTableRep(allocator),
        memtable_(memtable),
        num_entries_flush_(num_entries_flush),
        num_entries_(0) {}

  virtual KeyHandle Allocate(const size_t len, char** buf) override {
    return memtable_->Allocate(len, buf);
  }

  // Insert key into the list.
  // REQUIRES: nothing that compares equal to key is currently in the list.
  virtual void Insert(KeyHandle handle) override {
    memtable_->Insert(handle);
    num_entries_++;
  }

  // Returns true iff an entry that compares equal to key is in the list.
  virtual bool Contains(const char* key) const override {
    return memtable_->Contains(key);
  }

  virtual size_t ApproximateMemoryUsage() override {
    // Return a high memory usage when number of entries exceeds the threshold
    // to trigger a flush.
    return (num_entries_ < num_entries_flush_) ? 0 : 1024 * 1024 * 1024;
  }

  virtual void Get(const LookupKey& k, void* callback_args,
                   bool (*callback_func)(void* arg,
                                         const char* entry)) override {
    memtable_->Get(k, callback_args, callback_func);
  }

  uint64_t ApproximateNumEntries(const Slice& start_ikey,
                                 const Slice& end_ikey) override {
    return memtable_->ApproximateNumEntries(start_ikey, end_ikey);
  }

  virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override {
    return memtable_->GetIterator(arena);
  }

  virtual ~SpecialMemTableRep() override {}

 private:
  unique_ptr<MemTableRep> memtable_;  // the wrapped, real rep
  int num_entries_flush_;             // insert count that triggers flush
  int num_entries_;                   // inserts seen so far
};
+
+// The factory for the hacky skip list mem table that triggers flush after
+// number of entries exceeds a threshold.
+class SpecialSkipListFactory : public MemTableRepFactory {
+ public:
+  // After number of inserts exceeds `num_entries_flush` in a mem table, 
trigger
+  // flush.
+  explicit SpecialSkipListFactory(int num_entries_flush)
+      : num_entries_flush_(num_entries_flush) {}
+
+  using MemTableRepFactory::CreateMemTableRep;
+  virtual MemTableRep* CreateMemTableRep(
+      const MemTableRep::KeyComparator& compare, Allocator* allocator,
+      const SliceTransform* transform, Logger* logger) override {
+    return new SpecialMemTableRep(
+        allocator, factory_.CreateMemTableRep(compare, allocator, transform, 
0),
+        num_entries_flush_);
+  }
+  virtual const char* Name() const override { return "SkipListFactory"; }
+
+  bool IsInsertConcurrentlySupported() const override {
+    return factory_.IsInsertConcurrentlySupported();
+  }
+
+ private:
+  SkipListFactory factory_;
+  int num_entries_flush_;
+};
+
// Special Env used to delay background operations
//
// EnvWrapper that lets tests inject faults and delays into file I/O.  The
// atomic flags/counters declared at the bottom are toggled by tests and
// consulted by the wrapping file objects handed out below.  It can also
// virtualize time (addon_time_, time_elapse_only_sleep_) for
// timing-dependent tests.
class SpecialEnv : public EnvWrapper {
 public:
  explicit SpecialEnv(Env* base);

  // Opens the file through the wrapped Env, then — based on the file name —
  // wraps the result in a fault-injecting proxy: ".sst" -> SSTableFile,
  // "MANIFEST" -> ManifestFile, "log" -> WalFile.
  Status NewWritableFile(const std::string& f, unique_ptr<WritableFile>* r,
                         const EnvOptions& soptions) override {
    // Proxy for sst writes: honors drop_writes_/no_space_, counts bytes and
    // syncs, and fires the optional table_write_callback_ on each append.
    class SSTableFile : public WritableFile {
     private:
      SpecialEnv* env_;
      unique_ptr<WritableFile> base_;

     public:
      SSTableFile(SpecialEnv* env, unique_ptr<WritableFile>&& base)
          : env_(env), base_(std::move(base)) {}
      Status Append(const Slice& data) override {
        if (env_->table_write_callback_) {
          (*env_->table_write_callback_)();
        }
        if (env_->drop_writes_.load(std::memory_order_acquire)) {
          // Drop writes on the floor
          return Status::OK();
        } else if (env_->no_space_.load(std::memory_order_acquire)) {
          return Status::NoSpace("No space left on device");
        } else {
          env_->bytes_written_ += data.size();
          return base_->Append(data);
        }
      }
      Status PositionedAppend(const Slice& data, uint64_t offset) override {
        if (env_->table_write_callback_) {
          (*env_->table_write_callback_)();
        }
        if (env_->drop_writes_.load(std::memory_order_acquire)) {
          // Drop writes on the floor
          return Status::OK();
        } else if (env_->no_space_.load(std::memory_order_acquire)) {
          return Status::NoSpace("No space left on device");
        } else {
          env_->bytes_written_ += data.size();
          return base_->PositionedAppend(data, offset);
        }
      }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status RangeSync(uint64_t offset, uint64_t nbytes) override {
        Status s = base_->RangeSync(offset, nbytes);
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::RangeSync", &s);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        return s;
      }
      Status Close() override {
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
        // Check preallocation size
        // preallocation size is never passed to base file.
        size_t preallocation_size = preallocation_block_size();
        TEST_SYNC_POINT_CALLBACK("DBTestWritableFile.GetPreallocationStatus",
                                 &preallocation_size);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        Status s = base_->Close();
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Close", &s);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        return s;
      }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        ++env_->sync_counter_;
        // Spin (sleeping 100ms at a time) while tests hold syncs back.
        while (env_->delay_sstable_sync_.load(std::memory_order_acquire)) {
          env_->SleepForMicroseconds(100000);
        }
        Status s = base_->Sync();
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT_CALLBACK("SpecialEnv::SStableFile::Sync", &s);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)
        return s;
      }
      void SetIOPriority(Env::IOPriority pri) override {
        base_->SetIOPriority(pri);
      }
      Env::IOPriority GetIOPriority() override {
        return base_->GetIOPriority();
      }
      bool use_direct_io() const override {
        return base_->use_direct_io();
      }
      Status Allocate(uint64_t offset, uint64_t len) override {
        return base_->Allocate(offset, len);
      }
    };
    // Proxy for MANIFEST writes: honors manifest_write_error_ and
    // manifest_sync_error_, and counts syncs.
    class ManifestFile : public WritableFile {
     public:
      ManifestFile(SpecialEnv* env, unique_ptr<WritableFile>&& b)
          : env_(env), base_(std::move(b)) {}
      Status Append(const Slice& data) override {
        if (env_->manifest_write_error_.load(std::memory_order_acquire)) {
          return Status::IOError("simulated writer error");
        } else {
          return base_->Append(data);
        }
      }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status Close() override { return base_->Close(); }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        ++env_->sync_counter_;
        if (env_->manifest_sync_error_.load(std::memory_order_acquire)) {
          return Status::IOError("simulated sync error");
        } else {
          return base_->Sync();
        }
      }
      uint64_t GetFileSize() override { return base_->GetFileSize(); }

     private:
      SpecialEnv* env_;
      unique_ptr<WritableFile> base_;
    };
    // Proxy for WAL writes: honors log_write_error_ and log_write_slowdown_,
    // and tracks how many WAL files are open via num_open_wal_file_.
    class WalFile : public WritableFile {
     public:
      WalFile(SpecialEnv* env, unique_ptr<WritableFile>&& b)
          : env_(env), base_(std::move(b)) {
        env_->num_open_wal_file_.fetch_add(1);
      }
      virtual ~WalFile() { env_->num_open_wal_file_.fetch_add(-1); }
      Status Append(const Slice& data) override {
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT("SpecialEnv::WalFile::Append:1");
#endif
        Status s;
        if (env_->log_write_error_.load(std::memory_order_acquire)) {
          s = Status::IOError("simulated writer error");
        } else {
          int slowdown =
              env_->log_write_slowdown_.load(std::memory_order_acquire);
          if (slowdown > 0) {
            env_->SleepForMicroseconds(slowdown);
          }
          s = base_->Append(data);
        }
#if !(defined NDEBUG) || !defined(OS_WIN)
        TEST_SYNC_POINT("SpecialEnv::WalFile::Append:2");
#endif
        return s;
      }
      Status Truncate(uint64_t size) override { return base_->Truncate(size); }
      Status Close() override {
// SyncPoint is not supported in Released Windows Mode.
#if !(defined NDEBUG) || !defined(OS_WIN)
        // Check preallocation size
        // preallocation size is never passed to base file.
        size_t preallocation_size = preallocation_block_size();
        TEST_SYNC_POINT_CALLBACK("DBTestWalFile.GetPreallocationStatus",
                                 &preallocation_size);
#endif  // !(defined NDEBUG) || !defined(OS_WIN)

        return base_->Close();
      }
      Status Flush() override { return base_->Flush(); }
      Status Sync() override {
        ++env_->sync_counter_;
        return base_->Sync();
      }
      bool IsSyncThreadSafe() const override {
        return env_->is_wal_sync_thread_safe_.load();
      }

     private:
      SpecialEnv* env_;
      unique_ptr<WritableFile> base_;
    };

    // Randomly fail a configurable percentage of file creations.
    if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
      uint32_t random_number;
      {
        MutexLock l(&rnd_mutex_);
        random_number = rnd_.Uniform(100);
      }
      if (random_number < non_writeable_rate_.load()) {
        return Status::IOError("simulated random write error");
      }
    }

    new_writable_count_++;

    // Deterministically fail the next non_writable_count_ creations.
    if (non_writable_count_.load() > 0) {
      non_writable_count_--;
      return Status::IOError("simulated write error");
    }

    EnvOptions optimized = soptions;
    if (strstr(f.c_str(), "MANIFEST") != nullptr ||
        strstr(f.c_str(), "log") != nullptr) {
      optimized.use_mmap_writes = false;
      optimized.use_direct_writes = false;
    }

    Status s = target()->NewWritableFile(f, r, optimized);
    if (s.ok()) {
      if (strstr(f.c_str(), ".sst") != nullptr) {
        r->reset(new SSTableFile(this, std::move(*r)));
      } else if (strstr(f.c_str(), "MANIFEST") != nullptr) {
        r->reset(new ManifestFile(this, std::move(*r)));
      } else if (strstr(f.c_str(), "log") != nullptr) {
        r->reset(new WalFile(this, std::move(*r)));
      }
    }
    return s;
  }

  // Counts opens via random_file_open_counter_ and, when
  // count_random_reads_ is set, wraps the file to count reads/bytes read.
  Status NewRandomAccessFile(const std::string& f,
                             unique_ptr<RandomAccessFile>* r,
                             const EnvOptions& soptions) override {
    class CountingFile : public RandomAccessFile {
     public:
      CountingFile(unique_ptr<RandomAccessFile>&& target,
                   anon::AtomicCounter* counter,
                   std::atomic<size_t>* bytes_read)
          : target_(std::move(target)),
            counter_(counter),
            bytes_read_(bytes_read) {}
      virtual Status Read(uint64_t offset, size_t n, Slice* result,
                          char* scratch) const override {
        counter_->Increment();
        Status s = target_->Read(offset, n, result, scratch);
        *bytes_read_ += result->size();
        return s;
      }

     private:
      unique_ptr<RandomAccessFile> target_;
      anon::AtomicCounter* counter_;
      std::atomic<size_t>* bytes_read_;
    };

    Status s = target()->NewRandomAccessFile(f, r, soptions);
    random_file_open_counter_++;
    if (s.ok() && count_random_reads_) {
      r->reset(new CountingFile(std::move(*r), &random_read_counter_,
                                &random_read_bytes_counter_));
    }
    return s;
  }

  // When count_sequential_reads_ is set, wraps the file to count reads.
  Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
                           const EnvOptions& soptions) override {
    class CountingFile : public SequentialFile {
     public:
      CountingFile(unique_ptr<SequentialFile>&& target,
                   anon::AtomicCounter* counter)
          : target_(std::move(target)), counter_(counter) {}
      virtual Status Read(size_t n, Slice* result, char* scratch) override {
        counter_->Increment();
        return target_->Read(n, result, scratch);
      }
      virtual Status Skip(uint64_t n) override { return target_->Skip(n); }

     private:
      unique_ptr<SequentialFile> target_;
      anon::AtomicCounter* counter_;
    };

    Status s = target()->NewSequentialFile(f, r, soptions);
    if (s.ok() && count_sequential_reads_) {
      r->reset(new CountingFile(std::move(*r), &sequential_read_counter_));
    }
    return s;
  }

  // Counts sleeps; when slowdown is disabled (or time is virtual), the
  // sleep is credited to the fake clock instead of actually sleeping.
  virtual void SleepForMicroseconds(int micros) override {
    sleep_counter_.Increment();
    if (no_slowdown_ || time_elapse_only_sleep_) {
      addon_time_.fetch_add(micros);
    }
    if (!no_slowdown_) {
      target()->SleepForMicroseconds(micros);
    }
  }

  virtual Status GetCurrentTime(int64_t* unix_time) override {
    Status s;
    if (!time_elapse_only_sleep_) {
      s = target()->GetCurrentTime(unix_time);
    }
    if (s.ok()) {
      *unix_time += addon_time_.load();
    }
    return s;
  }

  virtual uint64_t NowNanos() override {
    return (time_elapse_only_sleep_ ? 0 : target()->NowNanos()) +
           addon_time_.load() * 1000;
  }

  virtual uint64_t NowMicros() override {
    return (time_elapse_only_sleep_ ? 0 : target()->NowMicros()) +
           addon_time_.load();
  }

  virtual Status DeleteFile(const std::string& fname) override {
    delete_count_.fetch_add(1);
    return target()->DeleteFile(fname);
  }

  Random rnd_;
  port::Mutex rnd_mutex_;  // Lock to protect rnd_

  // sstable Sync() calls are blocked while this pointer is non-nullptr.
  std::atomic<bool> delay_sstable_sync_;

  // Drop writes on the floor while this pointer is non-nullptr.
  std::atomic<bool> drop_writes_;

  // Simulate no-space errors while this pointer is non-nullptr.
  std::atomic<bool> no_space_;

  // Simulate non-writable file system while this pointer is non-nullptr
  std::atomic<bool> non_writable_;

  // Force sync of manifest files to fail while this pointer is non-nullptr
  std::atomic<bool> manifest_sync_error_;

  // Force write to manifest files to fail while this pointer is non-nullptr
  std::atomic<bool> manifest_write_error_;

  // Force write to log files to fail while this pointer is non-nullptr
  std::atomic<bool> log_write_error_;

  // Slow down every log write, in micro-seconds.
  std::atomic<int> log_write_slowdown_;

  // Number of WAL files that are still open for write.
  std::atomic<int> num_open_wal_file_;

  bool count_random_reads_;
  anon::AtomicCounter random_read_counter_;
  std::atomic<size_t> random_read_bytes_counter_;
  std::atomic<int> random_file_open_counter_;

  bool count_sequential_reads_;
  anon::AtomicCounter sequential_read_counter_;

  anon::AtomicCounter sleep_counter_;

  std::atomic<int64_t> bytes_written_;

  std::atomic<int> sync_counter_;

  // Percentage [0, 100) of NewWritableFile calls to fail at random.
  std::atomic<uint32_t> non_writeable_rate_;

  std::atomic<uint32_t> new_writable_count_;

  // Number of upcoming NewWritableFile calls to fail deterministically.
  std::atomic<uint32_t> non_writable_count_;

  // If non-null, invoked at the start of every sst Append/PositionedAppend.
  std::function<void()>* table_write_callback_;

  // Microseconds added to the wrapped Env's clock (fake time).
  std::atomic<int64_t> addon_time_;

  std::atomic<int> delete_count_;

  bool time_elapse_only_sleep_;

  bool no_slowdown_;

  std::atomic<bool> is_wal_sync_thread_safe_{true};
};
+
+#ifndef ROCKSDB_LITE
+class OnFileDeletionListener : public EventListener {
+ public:
+  OnFileDeletionListener() : matched_count_(0), expected_file_name_("") {}
+
+  void SetExpectedFileName(const std::string file_name) {
+    expected_file_name_ = file_name;
+  }
+
+  void VerifyMatchedCount(size_t expected_value) {
+    ASSERT_EQ(matched_count_, expected_value);
+  }
+
+  void OnTableFileDeleted(const TableFileDeletionInfo& info) override {
+    if (expected_file_name_ != "") {
+      ASSERT_EQ(expected_file_name_, info.file_path);
+      expected_file_name_ = "";
+      matched_count_++;
+    }
+  }
+
+ private:
+  size_t matched_count_;
+  std::string expected_file_name_;
+};
+#endif
+
+// A test merge operator mimics put but also fails if one of merge operands is
+// "corrupted".
+class TestPutOperator : public MergeOperator {
+ public:
+  virtual bool FullMergeV2(const MergeOperationInput& merge_in,
+                           MergeOperationOutput* merge_out) const override {
+    if (merge_in.existing_value != nullptr &&
+        *(merge_in.existing_value) == "corrupted") {
+      return false;
+    }
+    for (auto value : merge_in.operand_list) {
+      if (value == "corrupted") {
+        return false;
+      }
+    }
+    merge_out->existing_operand = merge_in.operand_list.back();
+    return true;
+  }
+
+  virtual const char* Name() const override { return "TestPutOperator"; }
+};
+
// Base fixture shared by the DB tests.  Owns the test database (db_), its
// directory paths, the SpecialEnv wrapper, any column family handles, and
// the machinery for cycling each test through many option configurations.
class DBTestBase : public testing::Test {
 public:
  // Sequence of option configurations to try
  enum OptionConfig : int {
    kDefault = 0,
    kBlockBasedTableWithPrefixHashIndex = 1,
    kBlockBasedTableWithWholeKeyHashIndex = 2,
    kPlainTableFirstBytePrefix = 3,
    kPlainTableCappedPrefix = 4,
    kPlainTableCappedPrefixNonMmap = 5,
    kPlainTableAllBytesPrefix = 6,
    kVectorRep = 7,
    kHashLinkList = 8,
    kHashCuckoo = 9,
    kMergePut = 10,
    kFilter = 11,
    kFullFilterWithNewTableReaderForCompactions = 12,
    kUncompressed = 13,
    kNumLevel_3 = 14,
    kDBLogDir = 15,
    kWalDirAndMmapReads = 16,
    kManifestFileSize = 17,
    kPerfOptions = 18,
    kHashSkipList = 19,
    kUniversalCompaction = 20,
    kUniversalCompactionMultiLevel = 21,
    kCompressedBlockCache = 22,
    kInfiniteMaxOpenFiles = 23,
    kxxHashChecksum = 24,
    kFIFOCompaction = 25,
    kOptimizeFiltersForHits = 26,
    kRowCache = 27,
    kRecycleLogFiles = 28,
    kConcurrentSkipList = 29,
    kPipelinedWrite = 30,
    kConcurrentWALWrites = 31,
    kEnd = 32,
    // NOTE(review): the configurations below come after kEnd, so they appear
    // to be excluded from the default ChangeOptions() rotation and selected
    // only explicitly — confirm against ChangeOptions()/GetOptions().
    kDirectIO = 33,
    kLevelSubcompactions = 34,
    kUniversalSubcompactions = 35,
    kBlockBasedTableWithIndexRestartInterval = 36,
    kBlockBasedTableWithPartitionedIndex = 37,
    kPartitionedFilterWithNewTableReaderForCompactions = 38,
  };

 public:
  std::string dbname_;                  // path of the primary test database
  std::string alternative_wal_dir_;     // alternate WAL dir used by some configs
  std::string alternative_db_log_dir_;  // alternate info-log dir used by some configs
  MockEnv* mem_env_;      // non-null when tests run on an in-memory env
  Env* encrypted_env_;    // non-null when tests run on an encrypted env
  SpecialEnv* env_;       // env actually handed to the DB under test
  DB* db_;                // the open database, or nullptr when closed
  std::vector<ColumnFamilyHandle*> handles_;  // handles for extra column families

  int option_config_;     // current OptionConfig being exercised
  Options last_options_;  // options used by the most recent (re)open

  // Skip some options, as they may not be applicable to a specific test.
  // To add more skip constants, use values 4, 8, 16, etc. (bitmask flags).
  enum OptionSkip {
    kNoSkip = 0,
    kSkipDeletesFilterFirst = 1,
    kSkipUniversalCompaction = 2,
    kSkipMergePut = 4,
    kSkipPlainTable = 8,
    kSkipHashIndex = 16,
    kSkipNoSeekToLast = 32,
    kSkipHashCuckoo = 64,
    kSkipFIFOCompaction = 128,
    kSkipMmapReads = 256,
  };

  explicit DBTestBase(const std::string path);

  ~DBTestBase();

  // Returns a random string of `len` bytes produced by test::RandomString.
  static std::string RandomString(Random* rnd, int len) {
    std::string r;
    test::RandomString(rnd, len, &r);
    return r;
  }

  // Returns the canonical fixed-width test key for index i, e.g. "key000042".
  static std::string Key(int i) {
    char buf[100];
    snprintf(buf, sizeof(buf), "key%06d", i);
    return std::string(buf);
  }

  // True if `option_config` is excluded by the `skip_mask` bit flags.
  static bool ShouldSkipOptions(int option_config, int skip_mask = kNoSkip);

  // Switch to a fresh database with the next option configuration to
  // test.  Return false if there are no more configurations to test.
  bool ChangeOptions(int skip_mask = kNoSkip);

  // Switch between different compaction styles.
  bool ChangeCompactOptions();

  // Switch between different WAL-related options.
  bool ChangeWalOptions();

  // Switch between different filter policy
  // Jump from kDefault to kFilter to kFullFilter
  bool ChangeFilterOptions();

  // Return the current option configuration.
  Options CurrentOptions(const anon::OptionsOverride& options_override =
                             anon::OptionsOverride()) const;

  Options CurrentOptions(const Options& default_options,
                         const anon::OptionsOverride& options_override =
                             anon::OptionsOverride()) const;

  static Options GetDefaultOptions();

  // Builds the Options corresponding to a specific OptionConfig value.
  Options GetOptions(int option_config,
                     const Options& default_options = GetDefaultOptions(),
                     const anon::OptionsOverride& options_override =
                         anon::OptionsOverride()) const;

  // Access to implementation-level APIs of the open database.
  DBImpl* dbfull() { return reinterpret_cast<DBImpl*>(db_); }

  // --- Open/close/reopen helpers -----------------------------------------

  void CreateColumnFamilies(const std::vector<std::string>& cfs,
                            const Options& options);

  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
                             const Options& options);

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const std::vector<Options>& options);

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const Options& options);

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const std::vector<Options>& options);

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const Options& options);

  void Reopen(const Options& options);

  void Close();

  void DestroyAndReopen(const Options& options);

  void Destroy(const Options& options);

  Status ReadOnlyReopen(const Options& options);

  Status TryReopen(const Options& options);

  // --- Environment capability probes -------------------------------------

  bool IsDirectIOSupported();

  bool IsMemoryMappedAccessSupported() const;

  // --- Write/read helpers (cf = column family index, 0 = default) --------

  Status Flush(int cf = 0);

  Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions());

  Status Put(int cf, const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions());

  Status Merge(const Slice& k, const Slice& v,
               WriteOptions wo = WriteOptions());

  Status Merge(int cf, const Slice& k, const Slice& v,
               WriteOptions wo = WriteOptions());

  Status Delete(const std::string& k);

  Status Delete(int cf, const std::string& k);

  Status SingleDelete(const std::string& k);

  Status SingleDelete(int cf, const std::string& k);

  std::string Get(const std::string& k, const Snapshot* snapshot = nullptr);

  std::string Get(int cf, const std::string& k,
                  const Snapshot* snapshot = nullptr);

  Status Get(const std::string& k, PinnableSlice* v);

  uint64_t GetNumSnapshots();

  uint64_t GetTimeOldestSnapshots();

  // Return a string that contains all key,value pairs in order,
  // formatted like "(k1->v1)(k2->v2)".
  std::string Contents(int cf = 0);

  std::string AllEntriesFor(const Slice& user_key, int cf = 0);

  // --- File/level inspection helpers -------------------------------------

#ifndef ROCKSDB_LITE
  int NumSortedRuns(int cf = 0);

  uint64_t TotalSize(int cf = 0);

  uint64_t SizeAtLevel(int level);

  size_t TotalLiveFiles(int cf = 0);

  size_t CountLiveFiles();
#endif  // ROCKSDB_LITE

  int NumTableFilesAtLevel(int level, int cf = 0);

  double CompressionRatioAtLevel(int level, int cf = 0);

  int TotalTableFiles(int cf = 0, int levels = -1);

  // Return spread of files per level
  std::string FilesPerLevel(int cf = 0);

  size_t CountFiles();

  uint64_t Size(const Slice& start, const Slice& limit, int cf = 0);

  void Compact(int cf, const Slice& start, const Slice& limit,
               uint32_t target_path_id);

  void Compact(int cf, const Slice& start, const Slice& limit);

  void Compact(const Slice& start, const Slice& limit);

  // Do n memtable compactions, each of which produces an sstable
  // covering the range [small,large].
  void MakeTables(int n, const std::string& small, const std::string& large,
                  int cf = 0);

  // Prevent pushing of new sstables into deeper levels by adding
  // tables that cover a specified range to all levels.
  void FillLevels(const std::string& smallest, const std::string& largest,
                  int cf);

  void MoveFilesToLevel(int level, int cf = 0);

  void DumpFileCounts(const char* label);

  std::string DumpSSTableList();

  void GetSstFiles(std::string path, std::vector<std::string>* files);

  int GetSstFileCount(std::string path);

  // this will generate non-overlapping files since it keeps increasing key_idx
  void GenerateNewFile(Random* rnd, int* key_idx, bool nowait = false);

  void GenerateNewFile(int fd, Random* rnd, int* key_idx, bool nowait = false);

  static const int kNumKeysByGenerateNewRandomFile;
  // NOTE(review): leading capital "K" is inconsistent with the kCamelCase
  // constant convention, but renaming would break external users.
  static const int KNumKeysByGenerateNewFile = 100;

  void GenerateNewRandomFile(Random* rnd, bool nowait = false);

  // Returns a printable description of the iterator's current position/state.
  std::string IterStatus(Iterator* iter);

  Options OptionsForLogIterTest();

  // Returns a string of `len` copies of character `c`.
  std::string DummyString(size_t len, char c = 'a');

  void VerifyIterLast(std::string expected_key, int cf = 0);

  // Used to test InplaceUpdate

  // If previous value is nullptr or delta is > than previous value,
  //   sets newValue with delta
  // If previous value is not empty,
  //   updates previous value with 'b' string of previous value size - 1.
  static UpdateStatus updateInPlaceSmallerSize(char* prevValue,
                                               uint32_t* prevSize, Slice delta,
                                               std::string* newValue);

  static UpdateStatus updateInPlaceSmallerVarintSize(char* prevValue,
                                                     uint32_t* prevSize,
                                                     Slice delta,
                                                     std::string* newValue);

  static UpdateStatus updateInPlaceLargerSize(char* prevValue,
                                              uint32_t* prevSize, Slice delta,
                                              std::string* newValue);

  static UpdateStatus updateInPlaceNoAction(char* prevValue, uint32_t* prevSize,
                                            Slice delta, std::string* newValue);

  // Utility method to test InplaceUpdate
  void validateNumberOfEntries(int numValues, int cf = 0);

  // Copies `size` bytes (or the whole file when size == 0) from source
  // to destination.
  void CopyFile(const std::string& source, const std::string& destination,
                uint64_t size = 0);

  std::unordered_map<std::string, uint64_t> GetAllSSTFiles(
      uint64_t* total_size = nullptr);

  std::vector<std::uint64_t> ListTableFiles(Env* env, const std::string& path);

  // Verifies DB contents against an expected key->value map, optionally
  // also exercising a tailing iterator and per-key expected statuses.
  void VerifyDBFromMap(
      std::map<std::string, std::string> true_data,
      size_t* total_reads_res = nullptr, bool tailing_iter = false,
      std::map<std::string, Status> status = std::map<std::string, Status>());

  void VerifyDBInternal(
      std::vector<std::pair<std::string, std::string>> true_data);

#ifndef ROCKSDB_LITE
  uint64_t GetNumberOfSstFilesForColumnFamily(DB* db,
                                              std::string column_family_name);
#endif  // ROCKSDB_LITE

  // Reads a statistics ticker from the given options' statistics object.
  uint64_t TestGetTickerCount(const Options& options, Tickers ticker_type) {
    return options.statistics->getTickerCount(ticker_type);
  }
};
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/dbformat.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/dbformat.cc 
b/thirdparty/rocksdb/db/dbformat.cc
new file mode 100644
index 0000000..20c5449
--- /dev/null
+++ b/thirdparty/rocksdb/db/dbformat.cc
@@ -0,0 +1,177 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "db/dbformat.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <stdio.h>
+#include "monitoring/perf_context_imp.h"
+#include "port/port.h"
+#include "util/coding.h"
+#include "util/string_util.h"
+
+namespace rocksdb {
+
// kValueTypeForSeek defines the ValueType that should be passed when
// constructing a ParsedInternalKey object for seeking to a particular
// sequence number (since we sort sequence numbers in decreasing order
// and the value type is embedded as the low 8 bits in the sequence
// number in internal keys, we need to use the highest-numbered
// ValueType, not the lowest).
const ValueType kValueTypeForSeek = kTypeSingleDeletion;
// Mirror of the above for SeekForPrev: presumably the lowest-numbered
// ValueType so the packed trailer sorts last among equal sequence numbers.
const ValueType kValueTypeForSeekForPrev = kTypeDeletion;
+
+uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
+  assert(seq <= kMaxSequenceNumber);
+  assert(IsExtendedValueType(t));
+  return (seq << 8) | t;
+}
+
+void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t) {
+  *seq = packed >> 8;
+  *t = static_cast<ValueType>(packed & 0xff);
+
+  assert(*seq <= kMaxSequenceNumber);
+  assert(IsExtendedValueType(*t));
+}
+
// Serializes an internal key onto *result: the raw user key bytes followed
// by the 8-byte packed (sequence, type) trailer.
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
  result->append(key.user_key.data(), key.user_key.size());
  PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}
+
// Appends only the 8-byte packed (sequence, type) trailer to *result,
// for callers that have already written the user key bytes.
void AppendInternalKeyFooter(std::string* result, SequenceNumber s,
                             ValueType t) {
  PutFixed64(result, PackSequenceAndType(s, t));
}
+
+std::string ParsedInternalKey::DebugString(bool hex) const {
+  char buf[50];
+  snprintf(buf, sizeof(buf), "' seq:%" PRIu64 ", type:%d", sequence,
+           static_cast<int>(type));
+  std::string result = "'";
+  result += user_key.ToString(hex);
+  result += buf;
+  return result;
+}
+
+std::string InternalKey::DebugString(bool hex) const {
+  std::string result;
+  ParsedInternalKey parsed;
+  if (ParseInternalKey(rep_, &parsed)) {
+    result = parsed.DebugString(hex);
+  } else {
+    result = "(bad)";
+    result.append(EscapeString(rep_));
+  }
+  return result;
+}
+
// Comparator name (stored in name_), reported through the Comparator API.
const char* InternalKeyComparator::Name() const {
  return name_.c_str();
}
+
+int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const 
{
+  // Order by:
+  //    increasing user key (according to user-supplied comparator)
+  //    decreasing sequence number
+  //    decreasing type (though sequence# should be enough to disambiguate)
+  int r = user_comparator_->Compare(ExtractUserKey(akey), 
ExtractUserKey(bkey));
+  PERF_COUNTER_ADD(user_key_comparison_count, 1);
+  if (r == 0) {
+    const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
+    const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
+    if (anum > bnum) {
+      r = -1;
+    } else if (anum < bnum) {
+      r = +1;
+    }
+  }
+  return r;
+}
+
+int InternalKeyComparator::Compare(const ParsedInternalKey& a,
+                                   const ParsedInternalKey& b) const {
+  // Order by:
+  //    increasing user key (according to user-supplied comparator)
+  //    decreasing sequence number
+  //    decreasing type (though sequence# should be enough to disambiguate)
+  int r = user_comparator_->Compare(a.user_key, b.user_key);
+  PERF_COUNTER_ADD(user_key_comparison_count, 1);
+  if (r == 0) {
+    if (a.sequence > b.sequence) {
+      r = -1;
+    } else if (a.sequence < b.sequence) {
+      r = +1;
+    } else if (a.type > b.type) {
+      r = -1;
+    } else if (a.type < b.type) {
+      r = +1;
+    }
+  }
+  return r;
+}
+
// Shortens *start's user-key portion (via the user comparator) to a smaller
// key that still orders >= *start and < limit, re-attaching the largest
// possible (sequence, type) trailer.  Leaves *start unchanged when no safe
// shortening is found.
void InternalKeyComparator::FindShortestSeparator(
      std::string* start,
      const Slice& limit) const {
  // Attempt to shorten the user portion of the key
  Slice user_start = ExtractUserKey(*start);
  Slice user_limit = ExtractUserKey(limit);
  std::string tmp(user_start.data(), user_start.size());
  user_comparator_->FindShortestSeparator(&tmp, user_limit);
  if (tmp.size() <= user_start.size() &&
      user_comparator_->Compare(user_start, tmp) < 0) {
    // User key has become shorter physically, but larger logically.
    // Tack on the earliest possible number to the shortened user key.
    PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
    assert(this->Compare(*start, tmp) < 0);
    assert(this->Compare(tmp, limit) < 0);
    start->swap(tmp);
  }
}
+
// Replaces *key with a shorter internal key that orders >= *key (via the
// user comparator's FindShortSuccessor), re-attaching the largest possible
// (sequence, type) trailer.  Leaves *key unchanged when no safe successor
// is found.
void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
  Slice user_key = ExtractUserKey(*key);
  std::string tmp(user_key.data(), user_key.size());
  user_comparator_->FindShortSuccessor(&tmp);
  if (tmp.size() <= user_key.size() &&
      user_comparator_->Compare(user_key, tmp) < 0) {
    // User key has become shorter physically, but larger logically.
    // Tack on the earliest possible number to the shortened user key.
    PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
    assert(this->Compare(*key, tmp) < 0);
    key->swap(tmp);
  }
}
+
// Builds the memtable lookup-key buffer with the layout:
//   varint32(user_key_size + 8) | user_key bytes | fixed64(seq << 8 | type)
// start_ points at the varint length, kstart_ at the internal key, end_ one
// past the trailer.  Small keys use the inline space_ buffer; larger keys
// heap-allocate (presumably released by ~LookupKey — defined elsewhere).
LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s) {
  size_t usize = _user_key.size();
  size_t needed = usize + 13;  // A conservative estimate
  char* dst;
  if (needed <= sizeof(space_)) {
    dst = space_;  // fits in the preallocated inline buffer
  } else {
    dst = new char[needed];
  }
  start_ = dst;
  // NOTE: We don't support users keys of more than 2GB :)
  dst = EncodeVarint32(dst, static_cast<uint32_t>(usize + 8));
  kstart_ = dst;  // internal key begins here (user key + 8-byte trailer)
  memcpy(dst, _user_key.data(), usize);
  dst += usize;
  EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
  dst += 8;
  end_ = dst;
}
+
+}  // namespace rocksdb

Reply via email to