http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/flush_job.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/flush_job.cc b/thirdparty/rocksdb/db/flush_job.cc
new file mode 100644
index 0000000..846edb4
--- /dev/null
+++ b/thirdparty/rocksdb/db/flush_job.cc
@@ -0,0 +1,359 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/flush_job.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+
+#include <algorithm>
+#include <vector>
+
+#include "db/builder.h"
+#include "db/db_iter.h"
+#include "db/dbformat.h"
+#include "db/event_helpers.h"
+#include "db/log_reader.h"
+#include "db/log_writer.h"
+#include "db/memtable_list.h"
+#include "db/merge_context.h"
+#include "db/version_set.h"
+#include "monitoring/iostats_context_imp.h"
+#include "monitoring/perf_context_imp.h"
+#include "monitoring/thread_status_util.h"
+#include "port/likely.h"
+#include "port/port.h"
+#include "db/memtable.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/status.h"
+#include "rocksdb/table.h"
+#include "table/block.h"
+#include "table/block_based_table_factory.h"
+#include "table/merging_iterator.h"
+#include "table/table_builder.h"
+#include "table/two_level_iterator.h"
+#include "util/coding.h"
+#include "util/event_logger.h"
+#include "util/file_util.h"
+#include "util/filename.h"
+#include "util/log_buffer.h"
+#include "util/logging.h"
+#include "util/mutexlock.h"
+#include "util/stop_watch.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
+                   const ImmutableDBOptions& db_options,
+                   const MutableCFOptions& mutable_cf_options,
+                   const EnvOptions& env_options, VersionSet* versions,
+                   InstrumentedMutex* db_mutex,
+                   std::atomic<bool>* shutting_down,
+                   std::vector<SequenceNumber> existing_snapshots,
+                   SequenceNumber earliest_write_conflict_snapshot,
+                   JobContext* job_context, LogBuffer* log_buffer,
+                   Directory* db_directory, Directory* output_file_directory,
+                   CompressionType output_compression, Statistics* stats,
+                   EventLogger* event_logger, bool measure_io_stats)
+    : dbname_(dbname),
+      cfd_(cfd),
+      db_options_(db_options),
+      mutable_cf_options_(mutable_cf_options),
+      env_options_(env_options),
+      versions_(versions),
+      db_mutex_(db_mutex),
+      shutting_down_(shutting_down),
+      existing_snapshots_(std::move(existing_snapshots)),
+      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
+      job_context_(job_context),
+      log_buffer_(log_buffer),
+      db_directory_(db_directory),
+      output_file_directory_(output_file_directory),
+      output_compression_(output_compression),
+      stats_(stats),
+      event_logger_(event_logger),
+      measure_io_stats_(measure_io_stats),
+      pick_memtable_called(false) {
+  // Update the thread status to indicate flush.
+  ReportStartedFlush();
+  TEST_SYNC_POINT("FlushJob::FlushJob()");
+}
+
+FlushJob::~FlushJob() {
+  ThreadStatusUtil::ResetThreadStatus();
+}
+
+void FlushJob::ReportStartedFlush() {
+  ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
+                                    db_options_.enable_thread_tracking);
+  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
+  ThreadStatusUtil::SetThreadOperationProperty(
+      ThreadStatus::COMPACTION_JOB_ID,
+      job_context_->job_id);
+  IOSTATS_RESET(bytes_written);
+}
+
+void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
+  uint64_t input_size = 0;
+  for (auto* mem : mems) {
+    input_size += mem->ApproximateMemoryUsage();
+  }
+  ThreadStatusUtil::IncreaseThreadOperationProperty(
+      ThreadStatus::FLUSH_BYTES_MEMTABLES,
+      input_size);
+}
+
+void FlushJob::RecordFlushIOStats() {
+  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
+  ThreadStatusUtil::IncreaseThreadOperationProperty(
+      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
+  IOSTATS_RESET(bytes_written);
+}
+
+void FlushJob::PickMemTable() {
+  db_mutex_->AssertHeld();
+  assert(!pick_memtable_called);
+  pick_memtable_called = true;
+  // Save the contents of the earliest memtable as a new Table
+  cfd_->imm()->PickMemtablesToFlush(&mems_);
+  if (mems_.empty()) {
+    return;
+  }
+
+  ReportFlushInputSize(mems_);
+
+  // entries mems are (implicitly) sorted in ascending order by their created
+  // time. We will use the first memtable's `edit` to keep the meta info for
+  // this flush.
+  MemTable* m = mems_[0];
+  edit_ = m->GetEdits();
+  edit_->SetPrevLogNumber(0);
+  // SetLogNumber(log_num) indicates logs with number smaller than log_num
+  // will no longer be picked up for recovery.
+  edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
+  edit_->SetColumnFamily(cfd_->GetID());
+
+  // path 0 for level 0 file.
+  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
+
+  base_ = cfd_->current();
+  base_->Ref();  // it is likely that we do not need this reference
+}
+
+Status FlushJob::Run(FileMetaData* file_meta) {
+  db_mutex_->AssertHeld();
+  assert(pick_memtable_called);
+  AutoThreadOperationStageUpdater stage_run(
+      ThreadStatus::STAGE_FLUSH_RUN);
+  if (mems_.empty()) {
+    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
+                     cfd_->GetName().c_str());
+    return Status::OK();
+  }
+
+  // I/O measurement variables
+  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
+  uint64_t prev_write_nanos = 0;
+  uint64_t prev_fsync_nanos = 0;
+  uint64_t prev_range_sync_nanos = 0;
+  uint64_t prev_prepare_write_nanos = 0;
+  if (measure_io_stats_) {
+    prev_perf_level = GetPerfLevel();
+    SetPerfLevel(PerfLevel::kEnableTime);
+    prev_write_nanos = IOSTATS(write_nanos);
+    prev_fsync_nanos = IOSTATS(fsync_nanos);
+    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
+    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
+  }
+
+  // This will release and re-acquire the mutex.
+  Status s = WriteLevel0Table();
+
+  if (s.ok() &&
+      (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
+    s = Status::ShutdownInProgress(
+        "Database shutdown or Column family drop during flush");
+  }
+
+  if (!s.ok()) {
+    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
+  } else {
+    TEST_SYNC_POINT("FlushJob::InstallResults");
+    // Replace immutable memtable with the generated Table
+    s = cfd_->imm()->InstallMemtableFlushResults(
+        cfd_, mutable_cf_options_, mems_, versions_, db_mutex_,
+        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
+        log_buffer_);
+  }
+
+  if (s.ok() && file_meta != nullptr) {
+    *file_meta = meta_;
+  }
+  RecordFlushIOStats();
+
+  auto stream = event_logger_->LogToBuffer(log_buffer_);
+  stream << "job" << job_context_->job_id << "event"
+         << "flush_finished";
+  stream << "lsm_state";
+  stream.StartArray();
+  auto vstorage = cfd_->current()->storage_info();
+  for (int level = 0; level < vstorage->num_levels(); ++level) {
+    stream << vstorage->NumLevelFiles(level);
+  }
+  stream.EndArray();
+  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();
+
+  if (measure_io_stats_) {
+    if (prev_perf_level != PerfLevel::kEnableTime) {
+      SetPerfLevel(prev_perf_level);
+    }
+    stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
+    stream << "file_range_sync_nanos"
+           << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
+    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
+    stream << "file_prepare_write_nanos"
+           << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
+  }
+
+  return s;
+}
+
+void FlushJob::Cancel() {
+  db_mutex_->AssertHeld();
+  assert(base_ != nullptr);
+  base_->Unref();
+}
+
+Status FlushJob::WriteLevel0Table() {
+  AutoThreadOperationStageUpdater stage_updater(
+      ThreadStatus::STAGE_FLUSH_WRITE_L0);
+  db_mutex_->AssertHeld();
+  const uint64_t start_micros = db_options_.env->NowMicros();
+  Status s;
+  {
+    db_mutex_->Unlock();
+    if (log_buffer_) {
+      log_buffer_->FlushBufferToLog();
+    }
+    // memtables and range_del_iters store internal iterators over each data
+    // memtable and its associated range deletion memtable, respectively, at
+    // corresponding indexes.
+    std::vector<InternalIterator*> memtables;
+    std::vector<InternalIterator*> range_del_iters;
+    ReadOptions ro;
+    ro.total_order_seek = true;
+    Arena arena;
+    uint64_t total_num_entries = 0, total_num_deletes = 0;
+    size_t total_memory_usage = 0;
+    for (MemTable* m : mems_) {
+      ROCKS_LOG_INFO(
+          db_options_.info_log,
+          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
+          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
+      memtables.push_back(m->NewIterator(ro, &arena));
+      auto* range_del_iter = m->NewRangeTombstoneIterator(ro);
+      if (range_del_iter != nullptr) {
+        range_del_iters.push_back(range_del_iter);
+      }
+      total_num_entries += m->num_entries();
+      total_num_deletes += m->num_deletes();
+      total_memory_usage += m->ApproximateMemoryUsage();
+    }
+
+    event_logger_->Log() << "job" << job_context_->job_id << "event"
+                         << "flush_started"
+                         << "num_memtables" << mems_.size() << "num_entries"
+                         << total_num_entries << "num_deletes"
+                         << total_num_deletes << "memory_usage"
+                         << total_memory_usage;
+
+    {
+      ScopedArenaIterator iter(
+          NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
+                             static_cast<int>(memtables.size()), &arena));
+      std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
+          &cfd_->internal_comparator(),
+          range_del_iters.empty() ? nullptr : &range_del_iters[0],
+          static_cast<int>(range_del_iters.size())));
+      ROCKS_LOG_INFO(db_options_.info_log,
+                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
+                     cfd_->GetName().c_str(), job_context_->job_id,
+                     meta_.fd.GetNumber());
+
+      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
+                               &output_compression_);
+      EnvOptions optimized_env_options =
+          db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_);
+
+      int64_t _current_time = 0;
+      db_options_.env->GetCurrentTime(&_current_time);  // ignore error
+      const uint64_t current_time = static_cast<uint64_t>(_current_time);
+
+      s = BuildTable(
+          dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
+          optimized_env_options, cfd_->table_cache(), iter.get(),
+          std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
+          cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
+          cfd_->GetName(), existing_snapshots_,
+          earliest_write_conflict_snapshot_, output_compression_,
+          cfd_->ioptions()->compression_opts,
+          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
+          TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
+          Env::IO_HIGH, &table_properties_, 0 /* level */, current_time);
+      LogFlush(db_options_.info_log);
+    }
+    ROCKS_LOG_INFO(db_options_.info_log,
+                   "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
+                   " bytes %s"
+                   "%s",
+                   cfd_->GetName().c_str(), job_context_->job_id,
+                   meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
+                   s.ToString().c_str(),
+                   meta_.marked_for_compaction ? " (needs compaction)" : "");
+
+    if (output_file_directory_ != nullptr) {
+      output_file_directory_->Fsync();
+    }
+    TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
+    db_mutex_->Lock();
+  }
+  base_->Unref();
+
+  // Note that if file_size is zero, the file has been deleted and
+  // should not be added to the manifest.
+  if (s.ok() && meta_.fd.GetFileSize() > 0) {
+    // if we have more than 1 background thread, then we cannot
+    // insert files directly into higher levels because some other
+    // threads could be concurrently producing compacted files for
+    // that key range.
+    // Add file to L0
+    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
+                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
+                   meta_.smallest_seqno, meta_.largest_seqno,
+                   meta_.marked_for_compaction);
+  }
+
+  // Note that here we treat flush as level 0 compaction in internal stats
+  InternalStats::CompactionStats stats(1);
+  stats.micros = db_options_.env->NowMicros() - start_micros;
+  stats.bytes_written = meta_.fd.GetFileSize();
+  cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
+  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
+                                     meta_.fd.GetFileSize());
+  RecordFlushIOStats();
+  return s;
+}
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/flush_job.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/flush_job.h b/thirdparty/rocksdb/db/flush_job.h
new file mode 100644
index 0000000..4698ae7
--- /dev/null
+++ b/thirdparty/rocksdb/db/flush_job.h
@@ -0,0 +1,110 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#pragma once
+
+#include <atomic>
+#include <deque>
+#include <limits>
+#include <set>
+#include <utility>
+#include <vector>
+#include <string>
+
+#include "db/column_family.h"
+#include "db/dbformat.h"
+#include "db/flush_scheduler.h"
+#include "db/internal_stats.h"
+#include "db/job_context.h"
+#include "db/log_writer.h"
+#include "db/memtable_list.h"
+#include "db/snapshot_impl.h"
+#include "db/version_edit.h"
+#include "db/write_controller.h"
+#include "db/write_thread.h"
+#include "monitoring/instrumented_mutex.h"
+#include "options/db_options.h"
+#include "port/port.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/memtablerep.h"
+#include "rocksdb/transaction_log.h"
+#include "table/scoped_arena_iterator.h"
+#include "util/autovector.h"
+#include "util/event_logger.h"
+#include "util/stop_watch.h"
+#include "util/thread_local.h"
+
+namespace rocksdb {
+
+class MemTable;
+class TableCache;
+class Version;
+class VersionEdit;
+class VersionSet;
+class Arena;
+
+class FlushJob {
+ public:
+  // TODO(icanadi) make effort to reduce number of parameters here
+  // IMPORTANT: mutable_cf_options needs to be alive while FlushJob is alive
+  FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
+           const ImmutableDBOptions& db_options,
+           const MutableCFOptions& mutable_cf_options,
+           const EnvOptions& env_options, VersionSet* versions,
+           InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
+           std::vector<SequenceNumber> existing_snapshots,
+           SequenceNumber earliest_write_conflict_snapshot,
+           JobContext* job_context, LogBuffer* log_buffer,
+           Directory* db_directory, Directory* output_file_directory,
+           CompressionType output_compression, Statistics* stats,
+           EventLogger* event_logger, bool measure_io_stats);
+
+  ~FlushJob();
+
+  // Require db_mutex held.
+  // Once PickMemTable() is called, either Run() or Cancel() has to be called.
+  void PickMemTable();
+  Status Run(FileMetaData* file_meta = nullptr);
+  void Cancel();
+  TableProperties GetTableProperties() const { return table_properties_; }
+
+ private:
+  void ReportStartedFlush();
+  void ReportFlushInputSize(const autovector<MemTable*>& mems);
+  void RecordFlushIOStats();
+  Status WriteLevel0Table();
+  const std::string& dbname_;
+  ColumnFamilyData* cfd_;
+  const ImmutableDBOptions& db_options_;
+  const MutableCFOptions& mutable_cf_options_;
+  const EnvOptions& env_options_;
+  VersionSet* versions_;
+  InstrumentedMutex* db_mutex_;
+  std::atomic<bool>* shutting_down_;
+  std::vector<SequenceNumber> existing_snapshots_;
+  SequenceNumber earliest_write_conflict_snapshot_;
+  JobContext* job_context_;
+  LogBuffer* log_buffer_;
+  Directory* db_directory_;
+  Directory* output_file_directory_;
+  CompressionType output_compression_;
+  Statistics* stats_;
+  EventLogger* event_logger_;
+  TableProperties table_properties_;
+  bool measure_io_stats_;
+
+  // Variables below are set by PickMemTable():
+  FileMetaData meta_;
+  autovector<MemTable*> mems_;
+  VersionEdit* edit_;
+  Version* base_;
+  bool pick_memtable_called;
+};
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/flush_scheduler.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/flush_scheduler.cc b/thirdparty/rocksdb/db/flush_scheduler.cc
new file mode 100644
index 0000000..8735a6b
--- /dev/null
+++ b/thirdparty/rocksdb/db/flush_scheduler.cc
@@ -0,0 +1,88 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "db/flush_scheduler.h"
+
+#include <cassert>
+
+#include "db/column_family.h"
+
+namespace rocksdb {
+
+void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) {
+#ifndef NDEBUG
+  std::lock_guard<std::mutex> lock(checking_mutex_);
+  assert(checking_set_.count(cfd) == 0);
+  checking_set_.insert(cfd);
+#endif  // NDEBUG
+  cfd->Ref();
+// Suppress false positive clang analyzer warnings.
+#ifndef __clang_analyzer__
+  Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
+  while (!head_.compare_exchange_strong(
+      node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
+    // failing CAS updates the first param, so we are already set for
+    // retry.  TakeNextColumnFamily won't happen until after another
+    // inter-thread synchronization, so we don't even need release
+    // semantics for this CAS
+  }
+#endif  // __clang_analyzer__
+}
+
+ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() {
+#ifndef NDEBUG
+  std::lock_guard<std::mutex> lock(checking_mutex_);
+#endif  // NDEBUG
+  while (true) {
+    if (head_.load(std::memory_order_relaxed) == nullptr) {
+      return nullptr;
+    }
+
+    // dequeue the head
+    Node* node = head_.load(std::memory_order_relaxed);
+    head_.store(node->next, std::memory_order_relaxed);
+    ColumnFamilyData* cfd = node->column_family;
+    delete node;
+
+#ifndef NDEBUG
+    auto iter = checking_set_.find(cfd);
+    assert(iter != checking_set_.end());
+    checking_set_.erase(iter);
+#endif  // NDEBUG
+
+    if (!cfd->IsDropped()) {
+      // success
+      return cfd;
+    }
+
+    // no longer relevant, retry
+    if (cfd->Unref()) {
+      delete cfd;
+    }
+  }
+}
+
+bool FlushScheduler::Empty() {
+#ifndef NDEBUG
+  std::lock_guard<std::mutex> lock(checking_mutex_);
+#endif  // NDEBUG
+  auto rv = head_.load(std::memory_order_relaxed) == nullptr;
+#ifndef NDEBUG
+  assert(rv == checking_set_.empty());
+#endif  // NDEBUG
+  return rv;
+}
+
+void FlushScheduler::Clear() {
+  ColumnFamilyData* cfd;
+  while ((cfd = TakeNextColumnFamily()) != nullptr) {
+    if (cfd->Unref()) {
+      delete cfd;
+    }
+  }
+  assert(head_.load(std::memory_order_relaxed) == nullptr);
+}
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/flush_scheduler.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/flush_scheduler.h b/thirdparty/rocksdb/db/flush_scheduler.h
new file mode 100644
index 0000000..cd35758
--- /dev/null
+++ b/thirdparty/rocksdb/db/flush_scheduler.h
@@ -0,0 +1,48 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <stdint.h>
+#include <atomic>
+#include <mutex>
+#include <set>
+
+namespace rocksdb {
+
+class ColumnFamilyData;
+
+// Unless otherwise noted, all methods on FlushScheduler should be called
+// only with the DB mutex held or from a single-threaded recovery context.
+class FlushScheduler {
+ public:
+  FlushScheduler() : head_(nullptr) {}
+
+  // May be called from multiple threads at once, but not concurrent with
+  // any other method calls on this instance
+  void ScheduleFlush(ColumnFamilyData* cfd);
+
+  // Removes and returns Ref()-ed column family. Client needs to Unref().
+  // Filters column families that have been dropped.
+  ColumnFamilyData* TakeNextColumnFamily();
+
+  bool Empty();
+
+  void Clear();
+
+ private:
+  struct Node {
+    ColumnFamilyData* column_family;
+    Node* next;
+  };
+
+  std::atomic<Node*> head_;
+#ifndef NDEBUG
+  std::mutex checking_mutex_;
+  std::set<ColumnFamilyData*> checking_set_;
+#endif  // NDEBUG
+};
+
+}  // namespace rocksdb

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/forward_iterator.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/forward_iterator.cc b/thirdparty/rocksdb/db/forward_iterator.cc
new file mode 100644
index 0000000..65fff95
--- /dev/null
+++ b/thirdparty/rocksdb/db/forward_iterator.cc
@@ -0,0 +1,905 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+#include "db/forward_iterator.h"
+
+#include <limits>
+#include <string>
+#include <utility>
+
+#include "db/column_family.h"
+#include "db/db_impl.h"
+#include "db/db_iter.h"
+#include "db/dbformat.h"
+#include "db/job_context.h"
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "table/merging_iterator.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+// Usage:
+//     LevelIterator iter;
+//     iter.SetFileIndex(file_index);
+//     iter.Seek(target);
+//     iter.Next()
+class LevelIterator : public InternalIterator {
+ public:
+  LevelIterator(const ColumnFamilyData* const cfd,
+                const ReadOptions& read_options,
+                const std::vector<FileMetaData*>& files)
+      : cfd_(cfd),
+        read_options_(read_options),
+        files_(files),
+        valid_(false),
+        file_index_(std::numeric_limits<uint32_t>::max()),
+        file_iter_(nullptr),
+        pinned_iters_mgr_(nullptr) {}
+
+  ~LevelIterator() {
+    // Reset current pointer
+    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
+      pinned_iters_mgr_->PinIterator(file_iter_);
+    } else {
+      delete file_iter_;
+    }
+  }
+
+  void SetFileIndex(uint32_t file_index) {
+    assert(file_index < files_.size());
+    if (file_index != file_index_) {
+      file_index_ = file_index;
+      Reset();
+    }
+    valid_ = false;
+  }
+  void Reset() {
+    assert(file_index_ < files_.size());
+
+    // Reset current pointer
+    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
+      pinned_iters_mgr_->PinIterator(file_iter_);
+    } else {
+      delete file_iter_;
+    }
+
+    RangeDelAggregator range_del_agg(
+        cfd_->internal_comparator(), {} /* snapshots */);
+    file_iter_ = cfd_->table_cache()->NewIterator(
+        read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
+        files_[file_index_]->fd,
+        read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
+        nullptr /* table_reader_ptr */, nullptr, false);
+    file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
+    if (!range_del_agg.IsEmpty()) {
+      status_ = Status::NotSupported(
+          "Range tombstones unsupported with ForwardIterator");
+      valid_ = false;
+    }
+  }
+  void SeekToLast() override {
+    status_ = Status::NotSupported("LevelIterator::SeekToLast()");
+    valid_ = false;
+  }
+  void Prev() override {
+    status_ = Status::NotSupported("LevelIterator::Prev()");
+    valid_ = false;
+  }
+  bool Valid() const override {
+    return valid_;
+  }
+  void SeekToFirst() override {
+    SetFileIndex(0);
+    file_iter_->SeekToFirst();
+    valid_ = file_iter_->Valid();
+  }
+  void Seek(const Slice& internal_key) override {
+    assert(file_iter_ != nullptr);
+    file_iter_->Seek(internal_key);
+    valid_ = file_iter_->Valid();
+  }
+  void SeekForPrev(const Slice& internal_key) override {
+    status_ = Status::NotSupported("LevelIterator::SeekForPrev()");
+    valid_ = false;
+  }
+  void Next() override {
+    assert(valid_);
+    file_iter_->Next();
+    for (;;) {
+      if (file_iter_->status().IsIncomplete() || file_iter_->Valid()) {
+        valid_ = !file_iter_->status().IsIncomplete();
+        return;
+      }
+      if (file_index_ + 1 >= files_.size()) {
+        valid_ = false;
+        return;
+      }
+      SetFileIndex(file_index_ + 1);
+      file_iter_->SeekToFirst();
+    }
+  }
+  Slice key() const override {
+    assert(valid_);
+    return file_iter_->key();
+  }
+  Slice value() const override {
+    assert(valid_);
+    return file_iter_->value();
+  }
+  Status status() const override {
+    if (!status_.ok()) {
+      return status_;
+    } else if (file_iter_ && !file_iter_->status().ok()) {
+      return file_iter_->status();
+    }
+    return Status::OK();
+  }
+  bool IsKeyPinned() const override {
+    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
+           file_iter_->IsKeyPinned();
+  }
+  bool IsValuePinned() const override {
+    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
+           file_iter_->IsValuePinned();
+  }
+  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
+    pinned_iters_mgr_ = pinned_iters_mgr;
+    if (file_iter_) {
+      file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
+    }
+  }
+
+ private:
+  const ColumnFamilyData* const cfd_;
+  const ReadOptions& read_options_;
+  const std::vector<FileMetaData*>& files_;
+
+  bool valid_;
+  uint32_t file_index_;
+  Status status_;
+  InternalIterator* file_iter_;
+  PinnedIteratorsManager* pinned_iters_mgr_;
+};
+
+ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
+                                 ColumnFamilyData* cfd,
+                                 SuperVersion* current_sv)
+    : db_(db),
+      read_options_(read_options),
+      cfd_(cfd),
+      prefix_extractor_(cfd->ioptions()->prefix_extractor),
+      user_comparator_(cfd->user_comparator()),
+      immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
+      sv_(current_sv),
+      mutable_iter_(nullptr),
+      current_(nullptr),
+      valid_(false),
+      status_(Status::OK()),
+      immutable_status_(Status::OK()),
+      has_iter_trimmed_for_upper_bound_(false),
+      current_over_upper_bound_(false),
+      is_prev_set_(false),
+      is_prev_inclusive_(false),
+      pinned_iters_mgr_(nullptr) {
+  if (sv_) {
+    RebuildIterators(false);
+  }
+}
+
+ForwardIterator::~ForwardIterator() {
+  Cleanup(true);
+}
+
+namespace {
+// Used in PinnedIteratorsManager to release pinned SuperVersion
+static void ReleaseSuperVersionFunc(void* sv) {
+  delete reinterpret_cast<SuperVersion*>(sv);
+}
+}  // namespace
+
+void ForwardIterator::SVCleanup() {
+  if (sv_ != nullptr && sv_->Unref()) {
+    // Job id == 0 means that this is not our background process, but rather
+    // user thread
+    JobContext job_context(0);
+    db_->mutex_.Lock();
+    sv_->Cleanup();
+    db_->FindObsoleteFiles(&job_context, false, true);
+    if (read_options_.background_purge_on_iterator_cleanup) {
+      db_->ScheduleBgLogWriterClose(&job_context);
+    }
+    db_->mutex_.Unlock();
+    if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
+      pinned_iters_mgr_->PinPtr(sv_, &ReleaseSuperVersionFunc);
+    } else {
+      delete sv_;
+    }
+    if (job_context.HaveSomethingToDelete()) {
+      db_->PurgeObsoleteFiles(
+          job_context, read_options_.background_purge_on_iterator_cleanup);
+    }
+    job_context.Clean();
+  }
+}
+
+void ForwardIterator::Cleanup(bool release_sv) {
+  if (mutable_iter_ != nullptr) {
+    DeleteIterator(mutable_iter_, true /* is_arena */);
+  }
+
+  for (auto* m : imm_iters_) {
+    DeleteIterator(m, true /* is_arena */);
+  }
+  imm_iters_.clear();
+
+  for (auto* f : l0_iters_) {
+    DeleteIterator(f);
+  }
+  l0_iters_.clear();
+
+  for (auto* l : level_iters_) {
+    DeleteIterator(l);
+  }
+  level_iters_.clear();
+
+  if (release_sv) {
+    SVCleanup();
+  }
+}
+
+bool ForwardIterator::Valid() const {
+  // See UpdateCurrent().
+  return valid_ ? !current_over_upper_bound_ : false;
+}
+
+void ForwardIterator::SeekToFirst() {
+  if (sv_ == nullptr) {
+    RebuildIterators(true);
+  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
+    RenewIterators();
+  } else if (immutable_status_.IsIncomplete()) {
+    ResetIncompleteIterators();
+  }
+  SeekInternal(Slice(), true);
+}
+
+bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
+  return !(read_options_.iterate_upper_bound == nullptr ||
+           cfd_->internal_comparator().user_comparator()->Compare(
+               ExtractUserKey(internal_key),
+               *read_options_.iterate_upper_bound) < 0);
+}
+
+void ForwardIterator::Seek(const Slice& internal_key) {
+  if (IsOverUpperBound(internal_key)) {
+    valid_ = false;
+  }
+  if (sv_ == nullptr) {
+    RebuildIterators(true);
+  } else if (sv_->version_number != cfd_->GetSuperVersionNumber()) {
+    RenewIterators();
+  } else if (immutable_status_.IsIncomplete()) {
+    ResetIncompleteIterators();
+  }
+  SeekInternal(internal_key, false);
+}
+
+// Core positioning routine shared by Seek() and SeekToFirst(). Seeks the
+// mutable memtable iterator unconditionally; seeks the immutable children
+// (memtables, L0 files, level iterators) only when the cached interval
+// tracked by prev_key_ does not already cover the target, rebuilding the
+// immutable min-heap as it goes. Iterators whose entire remaining range is
+// past iterate_upper_bound are destroyed and their slots nulled ("trimmed").
+void ForwardIterator::SeekInternal(const Slice& internal_key,
+                                   bool seek_to_first) {
+  assert(mutable_iter_);
+  // mutable
+  seek_to_first ? mutable_iter_->SeekToFirst() :
+                  mutable_iter_->Seek(internal_key);
+
+  // immutable
+  // TODO(ljin): NeedToSeekImmutable has negative impact on performance
+  // if it turns to need to seek immutable often. We probably want to have
+  // an option to turn it off.
+  if (seek_to_first || NeedToSeekImmutable(internal_key)) {
+    immutable_status_ = Status::OK();
+    if (has_iter_trimmed_for_upper_bound_ &&
+        (
+            // prev_ is not set yet
+            is_prev_set_ == false ||
+            // We are doing SeekToFirst() and internal_key.size() = 0
+            seek_to_first ||
+            // prev_key_ > internal_key
+            cfd_->internal_comparator().InternalKeyComparator::Compare(
+                prev_key_.GetInternalKey(), internal_key) > 0)) {
+      // Some iterators are trimmed. Need to rebuild.
+      RebuildIterators(true);
+      // Already seeked mutable iter, so seek again
+      seek_to_first ? mutable_iter_->SeekToFirst()
+                    : mutable_iter_->Seek(internal_key);
+    }
+    {
+      // Drop the old heap contents; it is repopulated below.
+      auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
+      immutable_min_heap_.swap(tmp);
+    }
+    for (size_t i = 0; i < imm_iters_.size(); i++) {
+      auto* m = imm_iters_[i];
+      seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
+      if (!m->status().ok()) {
+        immutable_status_ = m->status();
+      } else if (m->Valid()) {
+        immutable_min_heap_.push(m);
+      }
+    }
+
+    Slice user_key;
+    if (!seek_to_first) {
+      user_key = ExtractUserKey(internal_key);
+    }
+    const VersionStorageInfo* vstorage = sv_->current->storage_info();
+    const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
+    for (size_t i = 0; i < l0.size(); ++i) {
+      if (!l0_iters_[i]) {
+        // Slot was trimmed earlier; skip.
+        continue;
+      }
+      if (seek_to_first) {
+        l0_iters_[i]->SeekToFirst();
+      } else {
+        // If the target key passes over the larget key, we are sure Next()
+        // won't go over this file.
+        if (user_comparator_->Compare(user_key,
+              l0[i]->largest.user_key()) > 0) {
+          if (read_options_.iterate_upper_bound != nullptr) {
+            has_iter_trimmed_for_upper_bound_ = true;
+            DeleteIterator(l0_iters_[i]);
+            l0_iters_[i] = nullptr;
+          }
+          continue;
+        }
+        l0_iters_[i]->Seek(internal_key);
+      }
+
+      if (!l0_iters_[i]->status().ok()) {
+        immutable_status_ = l0_iters_[i]->status();
+      } else if (l0_iters_[i]->Valid()) {
+        if (!IsOverUpperBound(l0_iters_[i]->key())) {
+          immutable_min_heap_.push(l0_iters_[i]);
+        } else {
+          // Positioned past the upper bound: this file is exhausted.
+          has_iter_trimmed_for_upper_bound_ = true;
+          DeleteIterator(l0_iters_[i]);
+          l0_iters_[i] = nullptr;
+        }
+      }
+    }
+
+    for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
+      const std::vector<FileMetaData*>& level_files =
+          vstorage->LevelFiles(level);
+      if (level_files.empty()) {
+        continue;
+      }
+      if (level_iters_[level - 1] == nullptr) {
+        continue;
+      }
+      uint32_t f_idx = 0;
+      if (!seek_to_first) {
+        f_idx = FindFileInRange(level_files, internal_key, 0,
+                                static_cast<uint32_t>(level_files.size()));
+      }
+
+      // Seek
+      if (f_idx < level_files.size()) {
+        level_iters_[level - 1]->SetFileIndex(f_idx);
+        seek_to_first ? level_iters_[level - 1]->SeekToFirst() :
+                        level_iters_[level - 1]->Seek(internal_key);
+
+        if (!level_iters_[level - 1]->status().ok()) {
+          immutable_status_ = level_iters_[level - 1]->status();
+        } else if (level_iters_[level - 1]->Valid()) {
+          if (!IsOverUpperBound(level_iters_[level - 1]->key())) {
+            immutable_min_heap_.push(level_iters_[level - 1]);
+          } else {
+            // Nothing in this level is interesting. Remove.
+            has_iter_trimmed_for_upper_bound_ = true;
+            DeleteIterator(level_iters_[level - 1]);
+            level_iters_[level - 1] = nullptr;
+          }
+        }
+      }
+    }
+
+    if (seek_to_first) {
+      is_prev_set_ = false;
+    } else {
+      // Remember the seek target so later Seek()s that land inside
+      // (prev_key_, heap top] can skip re-seeking the immutable children.
+      prev_key_.SetInternalKey(internal_key);
+      is_prev_set_ = true;
+      is_prev_inclusive_ = true;
+    }
+
+    TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Immutable", this);
+  } else if (current_ && current_ != mutable_iter_) {
+    // current_ is one of immutable iterators, push it back to the heap
+    immutable_min_heap_.push(current_);
+  }
+
+  UpdateCurrent();
+  TEST_SYNC_POINT_CALLBACK("ForwardIterator::SeekInternal:Return", this);
+}
+
+// Advance to the next key. If the super version changed, rebuild the child
+// iterators and re-seek to the old current key first; if that key is gone,
+// the re-seek already landed on its successor and we return immediately.
+void ForwardIterator::Next() {
+  assert(valid_);
+  bool update_prev_key = false;
+
+  if (sv_ == nullptr ||
+      sv_->version_number != cfd_->GetSuperVersionNumber()) {
+    // Copy the key before rebuilding: rebuilding destroys the child iterator
+    // whose buffer backs key().
+    std::string current_key = key().ToString();
+    Slice old_key(current_key.data(), current_key.size());
+
+    if (sv_ == nullptr) {
+      RebuildIterators(true);
+    } else {
+      RenewIterators();
+    }
+    SeekInternal(old_key, false);
+    if (!valid_ || key().compare(old_key) != 0) {
+      // Old key no longer exists; SeekInternal left us at the next one.
+      return;
+    }
+  } else if (current_ != mutable_iter_) {
+    // It is going to advance immutable iterator
+
+    if (is_prev_set_ && prefix_extractor_) {
+      // advance prev_key_ to current_ only if they share the same prefix
+      update_prev_key =
+          prefix_extractor_->Transform(prev_key_.GetUserKey())
+              .compare(prefix_extractor_->Transform(current_->key())) == 0;
+    } else {
+      update_prev_key = true;
+    }
+
+
+    if (update_prev_key) {
+      // Exclusive: Next() moves past this key, so it is no longer covered.
+      prev_key_.SetInternalKey(current_->key());
+      is_prev_set_ = true;
+      is_prev_inclusive_ = false;
+    }
+  }
+
+  current_->Next();
+  if (current_ != mutable_iter_) {
+    if (!current_->status().ok()) {
+      immutable_status_ = current_->status();
+    } else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
+      // Still in range: return the iterator to the heap for re-selection.
+      immutable_min_heap_.push(current_);
+    } else {
+      if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
+        // remove the current iterator
+        DeleteCurrentIter();
+        current_ = nullptr;
+      }
+      if (update_prev_key) {
+        mutable_iter_->Seek(prev_key_.GetInternalKey());
+      }
+    }
+  }
+  UpdateCurrent();
+  TEST_SYNC_POINT_CALLBACK("ForwardIterator::Next:Return", this);
+}
+
+// Current internal key of the smallest child iterator. Requires Valid().
+Slice ForwardIterator::key() const {
+  assert(valid_);
+  return current_->key();
+}
+
+// Current value of the smallest child iterator. Requires Valid().
+Slice ForwardIterator::value() const {
+  assert(valid_);
+  return current_->value();
+}
+
+Status ForwardIterator::status() const {
+  // Report the first error in priority order: the iterator's own status,
+  // then the mutable memtable iterator, then the immutable children.
+  if (!status_.ok()) {
+    return status_;
+  }
+  const Status mutable_status = mutable_iter_->status();
+  return mutable_status.ok() ? immutable_status_ : mutable_status;
+}
+
+Status ForwardIterator::GetProperty(std::string prop_name, std::string* prop) {
+  assert(prop != nullptr);
+  // Only the super-version-number property is supported by this iterator.
+  if (prop_name != "rocksdb.iterator.super-version-number") {
+    return Status::InvalidArgument();
+  }
+  *prop = ToString(sv_->version_number);
+  return Status::OK();
+}
+
+// Store the pinned-iterators manager and immediately propagate it to every
+// existing child iterator.
+void ForwardIterator::SetPinnedItersMgr(
+    PinnedIteratorsManager* pinned_iters_mgr) {
+  pinned_iters_mgr_ = pinned_iters_mgr;
+  UpdateChildrenPinnedItersMgr();
+}
+
+void ForwardIterator::UpdateChildrenPinnedItersMgr() {
+  // Propagate pinned_iters_mgr_ to every live child iterator. Null slots
+  // (iterators trimmed for the upper bound) are skipped.
+  if (mutable_iter_ != nullptr) {
+    mutable_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
+  }
+
+  for (InternalIterator* imm : imm_iters_) {
+    if (imm != nullptr) {
+      imm->SetPinnedItersMgr(pinned_iters_mgr_);
+    }
+  }
+
+  for (InternalIterator* l0 : l0_iters_) {
+    if (l0 != nullptr) {
+      l0->SetPinnedItersMgr(pinned_iters_mgr_);
+    }
+  }
+
+  for (LevelIterator* level : level_iters_) {
+    if (level != nullptr) {
+      level->SetPinnedItersMgr(pinned_iters_mgr_);
+    }
+  }
+}
+
+bool ForwardIterator::IsKeyPinned() const {
+  // Key is pinned only when pinning is active and the current child
+  // guarantees its key slice stays valid.
+  if (pinned_iters_mgr_ == nullptr || !pinned_iters_mgr_->PinningEnabled()) {
+    return false;
+  }
+  return current_->IsKeyPinned();
+}
+
+bool ForwardIterator::IsValuePinned() const {
+  // Value is pinned only when pinning is active and the current child
+  // guarantees its value slice stays valid.
+  if (pinned_iters_mgr_ == nullptr || !pinned_iters_mgr_->PinningEnabled()) {
+    return false;
+  }
+  return current_->IsValuePinned();
+}
+
+// Discard all child iterators and rebuild them from scratch. When refresh_sv
+// is true, the current super version (if any) is released and the newest one
+// is referenced.
+void ForwardIterator::RebuildIterators(bool refresh_sv) {
+  // Clean up
+  Cleanup(refresh_sv);
+  if (refresh_sv) {
+    // New
+    sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
+  }
+  RangeDelAggregator range_del_agg(
+      InternalKeyComparator(cfd_->internal_comparator()), {} /* snapshots */);
+  mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
+  sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
+  if (!read_options_.ignore_range_deletions) {
+    // Tombstones are collected only to detect their presence; see the
+    // NotSupported status at the bottom of this function.
+    std::unique_ptr<InternalIterator> range_del_iter(
+        sv_->mem->NewRangeTombstoneIterator(read_options_));
+    range_del_agg.AddTombstones(std::move(range_del_iter));
+    sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
+                                         &range_del_agg);
+  }
+  has_iter_trimmed_for_upper_bound_ = false;
+
+  const auto* vstorage = sv_->current->storage_info();
+  const auto& l0_files = vstorage->LevelFiles(0);
+  l0_iters_.reserve(l0_files.size());
+  for (const auto* l0 : l0_files) {
+    // Files starting past the upper bound get a null placeholder so
+    // l0_iters_ stays index-aligned with the L0 file list.
+    if ((read_options_.iterate_upper_bound != nullptr) &&
+        cfd_->internal_comparator().user_comparator()->Compare(
+            l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
+      has_iter_trimmed_for_upper_bound_ = true;
+      l0_iters_.push_back(nullptr);
+      continue;
+    }
+    l0_iters_.push_back(cfd_->table_cache()->NewIterator(
+        read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd,
+        read_options_.ignore_range_deletions ? nullptr : &range_del_agg));
+  }
+  BuildLevelIterators(vstorage);
+  current_ = nullptr;
+  is_prev_set_ = false;
+
+  UpdateChildrenPinnedItersMgr();
+  if (!range_del_agg.IsEmpty()) {
+    status_ = Status::NotSupported(
+        "Range tombstones unsupported with ForwardIterator");
+    valid_ = false;
+  }
+}
+
+// Switch to the newest super version while reusing still-valid L0 iterators
+// from the old one. Unlike RebuildIterators(), table files present in both
+// versions are not re-opened; their iterators (or trimmed-null placeholders)
+// are moved over.
+void ForwardIterator::RenewIterators() {
+  SuperVersion* svnew;
+  assert(sv_);
+  svnew = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
+
+  if (mutable_iter_ != nullptr) {
+    DeleteIterator(mutable_iter_, true /* is_arena */);
+  }
+  for (auto* m : imm_iters_) {
+    DeleteIterator(m, true /* is_arena */);
+  }
+  imm_iters_.clear();
+
+  mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
+  svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
+  RangeDelAggregator range_del_agg(
+      InternalKeyComparator(cfd_->internal_comparator()), {} /* snapshots */);
+  if (!read_options_.ignore_range_deletions) {
+    std::unique_ptr<InternalIterator> range_del_iter(
+        svnew->mem->NewRangeTombstoneIterator(read_options_));
+    range_del_agg.AddTombstones(std::move(range_del_iter));
+    // BUG FIX: collect range tombstones from the *new* super version's
+    // immutable memtables (previously read from sv_, the old super version
+    // that is about to be released below); fixed likewise upstream.
+    svnew->imm->AddRangeTombstoneIterators(read_options_, &arena_,
+                                           &range_del_agg);
+  }
+
+  // Old-version file list is needed to match files shared with the new one.
+  const auto* vstorage = sv_->current->storage_info();
+  const auto& l0_files = vstorage->LevelFiles(0);
+  const auto* vstorage_new = svnew->current->storage_info();
+  const auto& l0_files_new = vstorage_new->LevelFiles(0);
+  size_t iold, inew;
+  bool found;
+  std::vector<InternalIterator*> l0_iters_new;
+  l0_iters_new.reserve(l0_files_new.size());
+
+  for (inew = 0; inew < l0_files_new.size(); inew++) {
+    found = false;
+    for (iold = 0; iold < l0_files.size(); iold++) {
+      if (l0_files[iold] == l0_files_new[inew]) {
+        found = true;
+        break;
+      }
+    }
+    if (found) {
+      // Shared file: move the existing iterator (or its null placeholder)
+      // instead of re-opening the table.
+      if (l0_iters_[iold] == nullptr) {
+        l0_iters_new.push_back(nullptr);
+        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Null", this);
+      } else {
+        l0_iters_new.push_back(l0_iters_[iold]);
+        l0_iters_[iold] = nullptr;
+        TEST_SYNC_POINT_CALLBACK("ForwardIterator::RenewIterators:Copy", this);
+      }
+      continue;
+    }
+    l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
+        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
+        l0_files_new[inew]->fd,
+        read_options_.ignore_range_deletions ? nullptr : &range_del_agg));
+  }
+
+  // Delete iterators of files dropped by the new version; moved slots were
+  // already nulled above.
+  for (auto* f : l0_iters_) {
+    DeleteIterator(f);
+  }
+  l0_iters_.clear();
+  l0_iters_ = l0_iters_new;
+
+  for (auto* l : level_iters_) {
+    DeleteIterator(l);
+  }
+  level_iters_.clear();
+  BuildLevelIterators(vstorage_new);
+  current_ = nullptr;
+  is_prev_set_ = false;
+  // Release the old super version only after all its iterators are gone.
+  SVCleanup();
+  sv_ = svnew;
+
+  UpdateChildrenPinnedItersMgr();
+  if (!range_del_agg.IsEmpty()) {
+    status_ = Status::NotSupported(
+        "Range tombstones unsupported with ForwardIterator");
+    valid_ = false;
+  }
+}
+
+// Create one LevelIterator per level >= 1; level_iters_ is indexed by
+// (level - 1). A null slot means the level is empty or lies entirely past
+// iterate_upper_bound (the latter also sets the trimmed flag).
+void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) {
+  level_iters_.reserve(vstorage->num_levels() - 1);
+  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
+    const auto& level_files = vstorage->LevelFiles(level);
+    if ((level_files.empty()) ||
+        ((read_options_.iterate_upper_bound != nullptr) &&
+         (user_comparator_->Compare(*read_options_.iterate_upper_bound,
+                                    level_files[0]->smallest.user_key()) <
+          0))) {
+      level_iters_.push_back(nullptr);
+      if (!level_files.empty()) {
+        // Non-empty level skipped because of the upper bound.
+        has_iter_trimmed_for_upper_bound_ = true;
+      }
+    } else {
+      level_iters_.push_back(
+          new LevelIterator(cfd_, read_options_, level_files));
+    }
+  }
+}
+
+// Re-create or reset any child iterator whose status is Incomplete (this can
+// happen with read_tier == kBlockCacheTier when a needed block was not
+// cached).
+void ForwardIterator::ResetIncompleteIterators() {
+  const auto& l0_files = sv_->current->storage_info()->LevelFiles(0);
+  for (size_t i = 0; i < l0_iters_.size(); ++i) {
+    assert(i < l0_files.size());
+    if (!l0_iters_[i] || !l0_iters_[i]->status().IsIncomplete()) {
+      continue;
+    }
+    DeleteIterator(l0_iters_[i]);
+    l0_iters_[i] = cfd_->table_cache()->NewIterator(
+        read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
+        l0_files[i]->fd, nullptr /* range_del_agg */);
+    l0_iters_[i]->SetPinnedItersMgr(pinned_iters_mgr_);
+  }
+
+  for (auto* level_iter : level_iters_) {
+    if (level_iter && level_iter->status().IsIncomplete()) {
+      level_iter->Reset();
+    }
+  }
+
+  // Force the next SeekInternal() to reposition every child.
+  current_ = nullptr;
+  is_prev_set_ = false;
+}
+
+// Choose the child with the smallest key between the mutable iterator and
+// the top of the immutable min-heap, then refresh valid_ and
+// current_over_upper_bound_. The winning immutable iterator is popped from
+// the heap (it is pushed back when it is advanced or re-seeked).
+void ForwardIterator::UpdateCurrent() {
+  if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
+    current_ = nullptr;
+  } else if (immutable_min_heap_.empty()) {
+    current_ = mutable_iter_;
+  } else if (!mutable_iter_->Valid()) {
+    current_ = immutable_min_heap_.top();
+    immutable_min_heap_.pop();
+  } else {
+    current_ = immutable_min_heap_.top();
+    assert(current_ != nullptr);
+    assert(current_->Valid());
+    int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
+        mutable_iter_->key(), current_->key());
+    assert(cmp != 0);
+    if (cmp > 0) {
+      // Immutable side wins; take it off the heap.
+      immutable_min_heap_.pop();
+    } else {
+      current_ = mutable_iter_;
+    }
+  }
+  valid_ = (current_ != nullptr);
+  // Clear a stale status left by an unsupported-operation call
+  // (SeekForPrev/SeekToLast/Prev set status_; see status()).
+  if (!status_.ok()) {
+    status_ = Status::OK();
+  }
+
+  // Upper bound doesn't apply to the memtable iterator. We want Valid() to
+  // return false when all iterators are over iterate_upper_bound, but can't
+  // just set valid_ to false, as that would effectively disable the tailing
+  // optimization (Seek() would be called on all immutable iterators regardless
+  // of whether the target key is greater than prev_key_).
+  current_over_upper_bound_ = valid_ && IsOverUpperBound(current_->key());
+}
+
+// Returns true when the immutable child iterators must be re-seeked for
+// `target`; false when the cached interval already covers it.
+bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
+  // We maintain the interval (prev_key_, immutable_min_heap_.top()->key())
+  // such that there are no records with keys within that range in
+  // immutable_min_heap_. Since immutable structures (SST files and immutable
+  // memtables) can't change in this version, we don't need to do a seek if
+  // 'target' belongs to that interval (immutable_min_heap_.top() is already
+  // at the correct position).
+
+  if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) {
+    return true;
+  }
+  Slice prev_key = prev_key_.GetInternalKey();
+  if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
+    prefix_extractor_->Transform(prev_key)) != 0) {
+    // Different prefix: the cached interval is only valid within one prefix.
+    return true;
+  }
+  // Re-seek when target <= prev_key_ (strict < when prev_key_ itself is
+  // still covered, i.e. is_prev_inclusive_).
+  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
+        prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
+    return true;
+  }
+
+  if (immutable_min_heap_.empty() && current_ == mutable_iter_) {
+    // Nothing to seek on.
+    return false;
+  }
+  // Target beyond the next immutable key: the heap top is no longer at the
+  // right position.
+  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
+        target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
+                                          : current_->key()) > 0) {
+    return true;
+  }
+  return false;
+}
+
+// Locate current_ among the L0 and level iterators, destroy it, and null its
+// slot. Called from Next() when the current immutable iterator has moved
+// past iterate_upper_bound and will never be needed again.
+void ForwardIterator::DeleteCurrentIter() {
+  const VersionStorageInfo* vstorage = sv_->current->storage_info();
+  const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
+  for (size_t i = 0; i < l0.size(); ++i) {
+    if (!l0_iters_[i]) {
+      continue;
+    }
+    if (l0_iters_[i] == current_) {
+      has_iter_trimmed_for_upper_bound_ = true;
+      DeleteIterator(l0_iters_[i]);
+      l0_iters_[i] = nullptr;
+      return;
+    }
+  }
+
+  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
+    if (level_iters_[level - 1] == nullptr) {
+      continue;
+    }
+    if (level_iters_[level - 1] == current_) {
+      has_iter_trimmed_for_upper_bound_ = true;
+      DeleteIterator(level_iters_[level - 1]);
+      level_iters_[level - 1] = nullptr;
+    }
+  }
+}
+
+// Test-only helper: counts trimmed (null) child iterator slots and remaining
+// live ones. Returns true when any slot was trimmed, or when at most one
+// live iterator remains. Output pointers are optional.
+bool ForwardIterator::TEST_CheckDeletedIters(int* pdeleted_iters,
+                                             int* pnum_iters) {
+  bool retval = false;
+  int deleted_iters = 0;
+  int num_iters = 0;
+
+  const VersionStorageInfo* vstorage = sv_->current->storage_info();
+  const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
+  for (size_t i = 0; i < l0.size(); ++i) {
+    if (!l0_iters_[i]) {
+      retval = true;
+      deleted_iters++;
+    } else {
+      num_iters++;
+    }
+  }
+
+  for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
+    // Only levels that actually have files are counted.
+    if ((level_iters_[level - 1] == nullptr) &&
+        (!vstorage->LevelFiles(level).empty())) {
+      retval = true;
+      deleted_iters++;
+    } else if (!vstorage->LevelFiles(level).empty()) {
+      num_iters++;
+    }
+  }
+  if ((!retval) && num_iters <= 1) {
+    retval = true;
+  }
+  if (pdeleted_iters) {
+    *pdeleted_iters = deleted_iters;
+  }
+  if (pnum_iters) {
+    *pnum_iters = num_iters;
+  }
+  return retval;
+}
+
+uint32_t ForwardIterator::FindFileInRange(
+    const std::vector<FileMetaData*>& files, const Slice& internal_key,
+    uint32_t left, uint32_t right) {
+  // Binary search over [left, right): locate the first file whose largest
+  // key is >= internal_key. Returns `right` when every file ends before it.
+  while (left < right) {
+    const uint32_t mid = (left + right) / 2;
+    const bool ends_before_target =
+        cfd_->internal_comparator().InternalKeyComparator::Compare(
+            files[mid]->largest.Encode(), internal_key) < 0;
+    if (ends_before_target) {
+      // files[mid] (and everything to its left) cannot contain the target.
+      left = mid + 1;
+    } else {
+      // files[mid] may contain the target; keep it inside the range.
+      right = mid;
+    }
+  }
+  return right;
+}
+
+void ForwardIterator::DeleteIterator(InternalIterator* iter, bool is_arena) {
+  if (iter == nullptr) {
+    return;
+  }
+  // With pinning enabled, ownership transfers to the pinned-iterators
+  // manager, which destroys the iterator later.
+  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
+    pinned_iters_mgr_->PinIterator(iter, is_arena);
+    return;
+  }
+  if (is_arena) {
+    // Arena-allocated: destruct in place; the arena owns the storage.
+    iter->~InternalIterator();
+  } else {
+    delete iter;
+  }
+}
+
+}  // namespace rocksdb
+
+#endif  // ROCKSDB_LITE

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/forward_iterator.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/forward_iterator.h 
b/thirdparty/rocksdb/db/forward_iterator.h
new file mode 100644
index 0000000..d4f32cb
--- /dev/null
+++ b/thirdparty/rocksdb/db/forward_iterator.h
@@ -0,0 +1,153 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <string>
+#include <vector>
+#include <queue>
+
+#include "rocksdb/db.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
+#include "db/dbformat.h"
+#include "table/internal_iterator.h"
+#include "util/arena.h"
+
+namespace rocksdb {
+
+class DBImpl;
+class Env;
+struct SuperVersion;
+class ColumnFamilyData;
+class LevelIterator;
+class VersionStorageInfo;
+struct FileMetaData;
+
+// Orders iterators by their current key, smallest first. The comparison is
+// inverted because std::priority_queue is a max-heap.
+class MinIterComparator {
+ public:
+  explicit MinIterComparator(const Comparator* comparator) :
+    comparator_(comparator) {}
+
+  bool operator()(InternalIterator* a, InternalIterator* b) {
+    return comparator_->Compare(a->key(), b->key()) > 0;
+  }
+ private:
+  const Comparator* comparator_;
+};
+
+// Min-heap of child iterators keyed by their current position.
+typedef std::priority_queue<InternalIterator*, std::vector<InternalIterator*>,
+                            MinIterComparator> MinIterHeap;
+
+/**
+ * ForwardIterator is a special type of iterator that only supports Seek()
+ * and Next(). It is expected to perform better than TailingIterator by
+ * removing the encapsulation and making all information accessible within
+ * the iterator. At the current implementation, snapshot is taken at the
+ * time Seek() is called. The Next() followed do not see new values after.
+ */
+class ForwardIterator : public InternalIterator {
+ public:
+  ForwardIterator(DBImpl* db, const ReadOptions& read_options,
+                  ColumnFamilyData* cfd, SuperVersion* current_sv = nullptr);
+  virtual ~ForwardIterator();
+
+  // Backward positioning is not supported; these set a NotSupported status
+  // and invalidate the iterator.
+  void SeekForPrev(const Slice& target) override {
+    status_ = Status::NotSupported("ForwardIterator::SeekForPrev()");
+    valid_ = false;
+  }
+  void SeekToLast() override {
+    status_ = Status::NotSupported("ForwardIterator::SeekToLast()");
+    valid_ = false;
+  }
+  void Prev() override {
+    status_ = Status::NotSupported("ForwardIterator::Prev");
+    valid_ = false;
+  }
+
+  virtual bool Valid() const override;
+  void SeekToFirst() override;
+  virtual void Seek(const Slice& target) override;
+  virtual void Next() override;
+  virtual Slice key() const override;
+  virtual Slice value() const override;
+  virtual Status status() const override;
+  virtual Status GetProperty(std::string prop_name, std::string* prop) 
override;
+  virtual void SetPinnedItersMgr(
+      PinnedIteratorsManager* pinned_iters_mgr) override;
+  virtual bool IsKeyPinned() const override;
+  virtual bool IsValuePinned() const override;
+
+  bool TEST_CheckDeletedIters(int* deleted_iters, int* num_iters);
+
+ private:
+  void Cleanup(bool release_sv);
+  void SVCleanup();
+  void RebuildIterators(bool refresh_sv);
+  void RenewIterators();
+  void BuildLevelIterators(const VersionStorageInfo* vstorage);
+  void ResetIncompleteIterators();
+  void SeekInternal(const Slice& internal_key, bool seek_to_first);
+  void UpdateCurrent();
+  bool NeedToSeekImmutable(const Slice& internal_key);
+  void DeleteCurrentIter();
+  uint32_t FindFileInRange(
+    const std::vector<FileMetaData*>& files, const Slice& internal_key,
+    uint32_t left, uint32_t right);
+
+  bool IsOverUpperBound(const Slice& internal_key) const;
+
+  // Set PinnedIteratorsManager for all children Iterators, this function
+  // should be called whenever we update children Iterators or
+  // pinned_iters_mgr_.
+  void UpdateChildrenPinnedItersMgr();
+
+  // A helper function that will release iter in the proper manner, or pass it
+  // to pinned_iters_mgr_ to release it later if pinning is enabled.
+  void DeleteIterator(InternalIterator* iter, bool is_arena = false);
+
+  DBImpl* const db_;
+  const ReadOptions read_options_;
+  ColumnFamilyData* const cfd_;
+  const SliceTransform* const prefix_extractor_;
+  const Comparator* user_comparator_;
+  // Min-heap of valid immutable child iterators, smallest key on top.
+  MinIterHeap immutable_min_heap_;
+
+  // Referenced super version; null until the first seek.
+  SuperVersion* sv_;
+  // Active memtable iterator (arena-allocated).
+  InternalIterator* mutable_iter_;
+  // Immutable memtable iterators (arena-allocated).
+  std::vector<InternalIterator*> imm_iters_;
+  // One iterator per L0 file; slots may be null when trimmed.
+  std::vector<InternalIterator*> l0_iters_;
+  // One iterator per level >= 1, indexed by (level - 1); may be null.
+  std::vector<LevelIterator*> level_iters_;
+  // Child currently holding the smallest key, or null when exhausted.
+  InternalIterator* current_;
+  bool valid_;
+
+  // Internal iterator status; set only by one of the unsupported methods.
+  Status status_;
+  // Status of immutable iterators, maintained here to avoid iterating over
+  // all of them in status().
+  Status immutable_status_;
+  // Indicates that at least one of the immutable iterators pointed to a key
+  // larger than iterate_upper_bound and was therefore destroyed. Seek() may
+  // need to rebuild such iterators.
+  bool has_iter_trimmed_for_upper_bound_;
+  // Is current key larger than iterate_upper_bound? If so, makes Valid()
+  // return false.
+  bool current_over_upper_bound_;
+
+  // Left endpoint of the range of keys that immutable iterators currently
+  // cover. When Seek() is called with a key that's within that range,
+  // immutable iterators don't need to be moved; see NeedToSeekImmutable().
+  // This key is included in the range after a Seek(), but excluded when
+  // advancing the iterator using Next().
+  IterKey prev_key_;
+  bool is_prev_set_;
+  bool is_prev_inclusive_;
+
+  PinnedIteratorsManager* pinned_iters_mgr_;
+  Arena arena_;
+};
+
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/forward_iterator_bench.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/forward_iterator_bench.cc 
b/thirdparty/rocksdb/db/forward_iterator_bench.cc
new file mode 100644
index 0000000..e9ae770
--- /dev/null
+++ b/thirdparty/rocksdb/db/forward_iterator_bench.cc
@@ -0,0 +1,375 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#if !defined(GFLAGS) || defined(ROCKSDB_LITE)
+#include <cstdio>
+// Fallback stub built when gflags is unavailable or in ROCKSDB_LITE mode.
+int main() {
+  fprintf(stderr, "Please install gflags to run rocksdb tools\n");
+  return 1;
+}
+#elif defined(OS_MACOSX) || defined(OS_WIN)
+// Block forward_iterator_bench under MAC and Windows
+int main() { return 0; }
+#else
+#include <gflags/gflags.h>
+#include <semaphore.h>
+#include <atomic>
+#include <bitset>
+#include <chrono>
+#include <climits>
+#include <condition_variable>
+#include <limits>
+#include <mutex>
+#include <queue>
+#include <random>
+#include <thread>
+
+#include "rocksdb/cache.h"
+#include "rocksdb/db.h"
+#include "rocksdb/status.h"
+#include "rocksdb/table.h"
+#include "port/port.h"
+#include "util/testharness.h"
+
+// Upper limit on --shards; sizes the per-reader pending-shard bitset.
+const int MAX_SHARDS = 100000;
+
+// Benchmark knobs (flag descriptions were left empty by the authors).
+DEFINE_int32(writers, 8, "");
+DEFINE_int32(readers, 8, "");
+DEFINE_int64(rate, 100000, "");
+DEFINE_int64(value_size, 300, "");
+DEFINE_int64(shards, 1000, "");
+DEFINE_int64(memtable_size, 500000000, "");
+DEFINE_int64(block_cache_size, 300000000, "");
+DEFINE_int64(block_size, 65536, "");
+DEFINE_double(runtime, 300.0, "");
+DEFINE_bool(cache_only_first, true, "");
+DEFINE_bool(iterate_upper_bound, true, "");
+
+// Global benchmark counters. The pad arrays separate the atomics, presumably
+// to keep hot counters on distinct cache lines (false-sharing avoidance).
+struct Stats {
+  char pad1[128] __attribute__((__unused__));
+  std::atomic<uint64_t> written{0};       // total records written
+  char pad2[128] __attribute__((__unused__));
+  std::atomic<uint64_t> read{0};          // total records read back
+  std::atomic<uint64_t> cache_misses{0};  // cache-only iterator Incompletes
+  char pad3[128] __attribute__((__unused__));
+} stats;
+
+// Fixed-size composite key (shard, seqno). Fields are stored big-endian so
+// that lexicographic byte order equals numeric order; the struct is packed
+// so it can be used directly as a 16-byte key slice.
+struct Key {
+  Key() {}
+  Key(uint64_t shard_in, uint64_t seqno_in)
+      : shard_be(htobe64(shard_in)), seqno_be(htobe64(seqno_in)) {}
+
+  uint64_t shard() const { return be64toh(shard_be); }
+  uint64_t seqno() const { return be64toh(seqno_be); }
+
+ private:
+  uint64_t shard_be;
+  uint64_t seqno_be;
+} __attribute__((__packed__));
+
+struct Reader;
+struct Writer;
+
+// Per-shard state shared between the writer and reader that own the shard;
+// padded, presumably to keep independently-updated atomics on separate
+// cache lines.
+struct ShardState {
+  char pad1[128] __attribute__((__unused__));
+  std::atomic<uint64_t> last_written{0};  // highest seqno put by the writer
+  Writer* writer;
+  Reader* reader;
+  char pad2[128] __attribute__((__unused__));
+  std::atomic<uint64_t> last_read{0};     // highest seqno verified by reader
+  std::unique_ptr<rocksdb::Iterator> it;            // tailing iterator
+  std::unique_ptr<rocksdb::Iterator> it_cacheonly;  // kBlockCacheTier variant
+  Key upper_bound;                        // (shard, UINT64_MAX) bound key
+  rocksdb::Slice upper_bound_slice;       // slice over upper_bound
+  char pad3[128] __attribute__((__unused__));
+};
+
+// Tails notified shards on a dedicated thread, verifying that every written
+// (shard, seqno) pair is read back in order.
+struct Reader {
+ public:
+  explicit Reader(std::vector<ShardState>* shard_states, rocksdb::DB* db)
+      : shard_states_(shard_states), db_(db) {
+    sem_init(&sem_, 0, 0);
+    thread_ = port::Thread(&Reader::run, this);
+  }
+
+  // Thread body: block on the semaphore until a shard is queued (or the
+  // destructor signals shutdown), then drain that shard.
+  void run() {
+    while (1) {
+      sem_wait(&sem_);
+      if (done_.load()) {
+        break;
+      }
+
+      uint64_t shard;
+      {
+        std::lock_guard<std::mutex> guard(queue_mutex_);
+        assert(!shards_pending_queue_.empty());
+        shard = shards_pending_queue_.front();
+        shards_pending_queue_.pop();
+        shards_pending_set_.reset(shard);
+      }
+      readOnceFromShard(shard);
+    }
+  }
+
+  // Read all records of `shard` up to the writer's last_written snapshot,
+  // trying the block-cache-only iterator first (when enabled) and falling
+  // back to the regular tailing iterator after a cache miss.
+  void readOnceFromShard(uint64_t shard) {
+    ShardState& state = (*shard_states_)[shard];
+    if (!state.it) {
+      // Initialize iterators
+      rocksdb::ReadOptions options;
+      options.tailing = true;
+      if (FLAGS_iterate_upper_bound) {
+        state.upper_bound = Key(shard, std::numeric_limits<uint64_t>::max());
+        state.upper_bound_slice = rocksdb::Slice(
+            (const char*)&state.upper_bound, sizeof(state.upper_bound));
+        options.iterate_upper_bound = &state.upper_bound_slice;
+      }
+
+      state.it.reset(db_->NewIterator(options));
+
+      if (FLAGS_cache_only_first) {
+        options.read_tier = rocksdb::ReadTier::kBlockCacheTier;
+        state.it_cacheonly.reset(db_->NewIterator(options));
+      }
+    }
+
+    const uint64_t upto = state.last_written.load();
+    for (rocksdb::Iterator* it : {state.it_cacheonly.get(), state.it.get()}) {
+      if (it == nullptr) {
+        continue;
+      }
+      if (state.last_read.load() >= upto) {
+        break;
+      }
+      bool need_seek = true;
+      for (uint64_t seq = state.last_read.load() + 1; seq <= upto; ++seq) {
+        if (need_seek) {
+          Key from(shard, state.last_read.load() + 1);
+          it->Seek(rocksdb::Slice((const char*)&from, sizeof(from)));
+          need_seek = false;
+        } else {
+          it->Next();
+        }
+        if (it->status().IsIncomplete()) {
+          // Cache-only iterator missed; the regular iterator retries from
+          // last_read on the next loop iteration.
+          ++::stats.cache_misses;
+          break;
+        }
+        assert(it->Valid());
+        assert(it->key().size() == sizeof(Key));
+        Key key;
+        memcpy(&key, it->key().data(), it->key().size());
+        // fprintf(stderr, "Expecting (%ld, %ld) read (%ld, %ld)\n",
+        //         shard, seq, key.shard(), key.seqno());
+        assert(key.shard() == shard);
+        assert(key.seqno() == seq);
+        state.last_read.store(seq);
+        ++::stats.read;
+      }
+    }
+  }
+
+  // Called by writers after a Put; enqueues the shard unless it is already
+  // pending, then wakes the reader thread.
+  void onWrite(uint64_t shard) {
+    {
+      std::lock_guard<std::mutex> guard(queue_mutex_);
+      if (!shards_pending_set_.test(shard)) {
+        shards_pending_queue_.push(shard);
+        shards_pending_set_.set(shard);
+        sem_post(&sem_);
+      }
+    }
+  }
+
+  ~Reader() {
+    // Signal shutdown, wake the thread, and wait for it to exit.
+    done_.store(true);
+    sem_post(&sem_);
+    thread_.join();
+  }
+
+ private:
+  char pad1[128] __attribute__((__unused__));
+  std::vector<ShardState>* shard_states_;
+  rocksdb::DB* db_;
+  rocksdb::port::Thread thread_;
+  sem_t sem_;
+  std::mutex queue_mutex_;
+  std::bitset<MAX_SHARDS + 1> shards_pending_set_;
+  std::queue<uint64_t> shards_pending_queue_;
+  std::atomic<bool> done_{false};
+  char pad2[128] __attribute__((__unused__));
+};
+
+// Drives rate-limited random writes into the shards assigned to this writer
+// and notifies each shard's Reader after every successful Put.
+struct Writer {
+  explicit Writer(std::vector<ShardState>* shard_states, rocksdb::DB* db)
+      : shard_states_(shard_states), db_(db) {}
+
+  // Spawn the writer thread; it exits once FLAGS_runtime has elapsed.
+  void start() { thread_ = port::Thread(&Writer::run, this); }
+
+  void run() {
+    // Queue of scheduled write times; refilled a batch at a time so that the
+    // aggregate rate across all FLAGS_writers threads is FLAGS_rate.
+    std::queue<std::chrono::steady_clock::time_point> workq;
+    std::chrono::steady_clock::time_point deadline(
+        std::chrono::steady_clock::now() +
+        std::chrono::nanoseconds((uint64_t)(1000000000 * FLAGS_runtime)));
+    std::vector<uint64_t> my_shards;
+    for (int i = 1; i <= FLAGS_shards; ++i) {
+      if ((*shard_states_)[i].writer == this) {
+        my_shards.push_back(i);
+      }
+    }
+    if (my_shards.empty()) {
+      // The random shard assignment in main() may leave a writer with no
+      // shards; constructing uniform_int_distribution(0, -1) below would be
+      // undefined behavior, so there is nothing to do but exit.
+      return;
+    }
+
+    std::mt19937 rng{std::random_device()()};
+    std::uniform_int_distribution<int> shard_dist(
+        0, static_cast<int>(my_shards.size()) - 1);
+    std::string value(FLAGS_value_size, '*');
+
+    while (1) {
+      auto now = std::chrono::steady_clock::now();
+      if (FLAGS_runtime >= 0 && now >= deadline) {
+        break;
+      }
+      if (workq.empty()) {
+        // Spread this writer's share of FLAGS_rate evenly over one second.
+        for (int i = 0; i < FLAGS_rate; i += FLAGS_writers) {
+          std::chrono::nanoseconds offset(1000000000LL * i / FLAGS_rate);
+          workq.push(now + offset);
+        }
+      }
+      while (!workq.empty() && workq.front() < now) {
+        workq.pop();
+        uint64_t shard = my_shards[shard_dist(rng)];
+        ShardState& state = (*shard_states_)[shard];
+        uint64_t seqno = state.last_written.load() + 1;
+        Key key(shard, seqno);
+        // fprintf(stderr, "Writing (%ld, %ld)\n", shard, seqno);
+        rocksdb::Status status =
+            db_->Put(rocksdb::WriteOptions(),
+                     rocksdb::Slice((const char*)&key, sizeof(key)),
+                     rocksdb::Slice(value));
+        assert(status.ok());
+        // Publish the new sequence number before waking the reader so the
+        // reader always observes last_written >= the seqno it was woken for.
+        state.last_written.store(seqno);
+        state.reader->onWrite(shard);
+        ++::stats.written;
+      }
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+    // fprintf(stderr, "Writer done\n");
+  }
+
+  ~Writer() { thread_.join(); }
+
+ private:
+  // pad1/pad2: presumably cache-line padding against false sharing.
+  char pad1[128] __attribute__((__unused__));
+  std::vector<ShardState>* shard_states_;  // owned by main(), not by us
+  rocksdb::DB* db_;                        // not owned
+  rocksdb::port::Thread thread_;
+  char pad2[128] __attribute__((__unused__));
+};
+
+// Once per second prints cumulative write/read counts, per-interval
+// throughput, and the block-cache miss counter from the global ::stats.
+struct StatsThread {
+  explicit StatsThread(rocksdb::DB* db)
+      : db_(db), thread_(&StatsThread::run, this) {}
+
+  void run() {
+    auto tstart = std::chrono::steady_clock::now(), tlast = tstart;
+    uint64_t wlast = 0, rlast = 0;
+    while (!done_.load()) {
+      {
+        // Sleep up to 1s, but wake immediately when ~StatsThread notifies.
+        std::unique_lock<std::mutex> lock(cvm_);
+        cv_.wait_for(lock, std::chrono::seconds(1));
+      }
+      auto now = std::chrono::steady_clock::now();
+      double elapsed =
+          std::chrono::duration_cast<std::chrono::duration<double> >(
+              now - tlast).count();
+      uint64_t w = ::stats.written.load();
+      uint64_t r = ::stats.read.load();
+      // Cast explicitly: uint64_t and std::chrono::seconds::rep need not
+      // match "%ld" on every platform (e.g. 32-bit long), and a mismatched
+      // fprintf specifier is undefined behavior.
+      fprintf(stderr,
+              "%s elapsed %4llds | written %10llu | w/s %10.0f | "
+              "read %10llu | r/s %10.0f | cache misses %10llu\n",
+              db_->GetEnv()->TimeToString(time(nullptr)).c_str(),
+              static_cast<long long>(
+                  std::chrono::duration_cast<std::chrono::seconds>(
+                      now - tstart).count()),
+              static_cast<unsigned long long>(w), (w - wlast) / elapsed,
+              static_cast<unsigned long long>(r), (r - rlast) / elapsed,
+              static_cast<unsigned long long>(::stats.cache_misses.load()));
+      wlast = w;
+      rlast = r;
+      tlast = now;
+    }
+  }
+
+  ~StatsThread() {
+    {
+      // Store done_ under the lock so run() cannot miss the notification
+      // between its done_ check and cv_.wait_for.
+      std::lock_guard<std::mutex> guard(cvm_);
+      done_.store(true);
+    }
+    cv_.notify_all();
+    thread_.join();
+  }
+
+ private:
+  rocksdb::DB* db_;  // not owned
+  std::mutex cvm_;
+  std::condition_variable cv_;
+  rocksdb::port::Thread thread_;
+  std::atomic<bool> done_{false};
+};
+
+int main(int argc, char** argv) {
+  GFLAGS::ParseCommandLineFlags(&argc, &argv, true);
+
+  std::mt19937 rng{std::random_device()()};
+  rocksdb::Status status;
+  std::string path = rocksdb::test::TmpDir() + "/forward_iterator_test";
+  fprintf(stderr, "db path is %s\n", path.c_str());
+  rocksdb::Options options;
+  options.create_if_missing = true;
+  options.compression = rocksdb::CompressionType::kNoCompression;
+  options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone;
+  // Effectively disable write throttling so level-0 file buildup does not
+  // slow the benchmark down.
+  options.level0_slowdown_writes_trigger = 99999;
+  options.level0_stop_writes_trigger = 99999;
+  options.use_direct_io_for_flush_and_compaction = true;
+  options.write_buffer_size = FLAGS_memtable_size;
+  rocksdb::BlockBasedTableOptions table_options;
+  table_options.block_cache = rocksdb::NewLRUCache(FLAGS_block_cache_size);
+  table_options.block_size = FLAGS_block_size;
+  options.table_factory.reset(
+      rocksdb::NewBlockBasedTableFactory(table_options));
+
+  // Always start from an empty database.
+  status = rocksdb::DestroyDB(path, options);
+  assert(status.ok());
+  rocksdb::DB* db_raw;
+  status = rocksdb::DB::Open(options, path, &db_raw);
+  assert(status.ok());
+  std::unique_ptr<rocksdb::DB> db(db_raw);
+
+  // Index 0 is unused: shards are numbered 1..FLAGS_shards throughout.
+  std::vector<ShardState> shard_states(FLAGS_shards + 1);
+  std::deque<Reader> readers;
+  while (static_cast<int>(readers.size()) < FLAGS_readers) {
+    readers.emplace_back(&shard_states, db_raw);
+  }
+  std::deque<Writer> writers;
+  while (static_cast<int>(writers.size()) < FLAGS_writers) {
+    writers.emplace_back(&shard_states, db_raw);
+  }
+
+  // Each shard gets a random reader and random writer assigned to it
+  for (int i = 1; i <= FLAGS_shards; ++i) {
+    std::uniform_int_distribution<int> reader_dist(0, FLAGS_readers - 1);
+    std::uniform_int_distribution<int> writer_dist(0, FLAGS_writers - 1);
+    shard_states[i].reader = &readers[reader_dist(rng)];
+    shard_states[i].writer = &writers[writer_dist(rng)];
+  }
+
+  StatsThread stats_thread(db_raw);
+  for (Writer& w : writers) {
+    w.start();
+  }
+
+  // Teardown order matters: clearing writers first runs ~Writer, which
+  // joins the writer threads, so no further onWrite() calls can reach the
+  // readers; only then are the readers destroyed (joining their threads).
+  // stats_thread and db are destroyed after both, at end of scope.
+  writers.clear();
+  readers.clear();
+}
+#endif  // !defined(GFLAGS) || defined(ROCKSDB_LITE)

Reply via email to