http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/range_del_aggregator.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/range_del_aggregator.cc b/thirdparty/rocksdb/db/range_del_aggregator.cc
new file mode 100644
index 0000000..0aa5d22
--- /dev/null
+++ b/thirdparty/rocksdb/db/range_del_aggregator.cc
@@ -0,0 +1,519 @@
+//  Copyright (c) 2016-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "db/range_del_aggregator.h"
+
+#include <algorithm>
+
+namespace rocksdb {
+
+RangeDelAggregator::RangeDelAggregator(
+    const InternalKeyComparator& icmp,
+    const std::vector<SequenceNumber>& snapshots,
+    bool collapse_deletions /* = true */)
+    : upper_bound_(kMaxSequenceNumber),
+      icmp_(icmp),
+      collapse_deletions_(collapse_deletions) {
+  InitRep(snapshots);
+}
+
+RangeDelAggregator::RangeDelAggregator(const InternalKeyComparator& icmp,
+                                       SequenceNumber snapshot,
+                                       bool collapse_deletions /* = false */)
+    : upper_bound_(snapshot),
+      icmp_(icmp),
+      collapse_deletions_(collapse_deletions) {}
+
+void RangeDelAggregator::InitRep(const std::vector<SequenceNumber>& snapshots) {
+  assert(rep_ == nullptr);
+  rep_.reset(new Rep());
+  for (auto snapshot : snapshots) {
+    rep_->stripe_map_.emplace(
+        snapshot,
+        PositionalTombstoneMap(TombstoneMap(
+            stl_wrappers::LessOfComparator(icmp_.user_comparator()))));
+  }
+  // Data newer than any snapshot falls in this catch-all stripe
+  rep_->stripe_map_.emplace(
+      kMaxSequenceNumber,
+      PositionalTombstoneMap(TombstoneMap(
+          stl_wrappers::LessOfComparator(icmp_.user_comparator()))));
+  rep_->pinned_iters_mgr_.StartPinning();
+}
+
+bool RangeDelAggregator::ShouldDelete(
+    const Slice& internal_key, RangeDelAggregator::RangePositioningMode mode) {
+  if (rep_ == nullptr) {
+    return false;
+  }
+  ParsedInternalKey parsed;
+  if (!ParseInternalKey(internal_key, &parsed)) {
+    assert(false);
+  }
+  return ShouldDelete(parsed, mode);
+}
+
+bool RangeDelAggregator::ShouldDelete(
+    const ParsedInternalKey& parsed,
+    RangeDelAggregator::RangePositioningMode mode) {
+  assert(IsValueType(parsed.type));
+  if (rep_ == nullptr) {
+    return false;
+  }
+  auto& positional_tombstone_map = GetPositionalTombstoneMap(parsed.sequence);
+  const auto& tombstone_map = positional_tombstone_map.raw_map;
+  if (tombstone_map.empty()) {
+    return false;
+  }
+  auto& tombstone_map_iter = positional_tombstone_map.iter;
+  if (tombstone_map_iter == tombstone_map.end() &&
+      (mode == kForwardTraversal || mode == kBackwardTraversal)) {
+    // The cached position is invalid (e.g., AddTombstones() changed the
+    // deletions), so we need to reseek.
+    mode = kBinarySearch;
+  }
+  switch (mode) {
+    case kFullScan:
+      assert(!collapse_deletions_);
+      // The maintained state (PositionalTombstoneMap::iter) isn't useful when
+      // we linear scan from the beginning each time, but we maintain it anyway
+      // for consistency.
+      tombstone_map_iter = tombstone_map.begin();
+      while (tombstone_map_iter != tombstone_map.end()) {
+        const auto& tombstone = tombstone_map_iter->second;
+        if (icmp_.user_comparator()->Compare(parsed.user_key,
+                                             tombstone.start_key_) < 0) {
+          break;
+        }
+        if (parsed.sequence < tombstone.seq_ &&
+            icmp_.user_comparator()->Compare(parsed.user_key,
+                                             tombstone.end_key_) < 0) {
+          return true;
+        }
+        ++tombstone_map_iter;
+      }
+      return false;
+    case kForwardTraversal:
+      assert(collapse_deletions_ && tombstone_map_iter != tombstone_map.end());
+      if (tombstone_map_iter == tombstone_map.begin() &&
+          icmp_.user_comparator()->Compare(parsed.user_key,
+                                           tombstone_map_iter->first) < 0) {
+        // before start of deletion intervals
+        return false;
+      }
+      while (std::next(tombstone_map_iter) != tombstone_map.end() &&
+             icmp_.user_comparator()->Compare(
+                 std::next(tombstone_map_iter)->first, parsed.user_key) <= 0) {
+        ++tombstone_map_iter;
+      }
+      break;
+    case kBackwardTraversal:
+      assert(collapse_deletions_ && tombstone_map_iter != tombstone_map.end());
+      while (tombstone_map_iter != tombstone_map.begin() &&
+             icmp_.user_comparator()->Compare(parsed.user_key,
+                                              tombstone_map_iter->first) < 0) {
+        --tombstone_map_iter;
+      }
+      if (tombstone_map_iter == tombstone_map.begin() &&
+          icmp_.user_comparator()->Compare(parsed.user_key,
+                                           tombstone_map_iter->first) < 0) {
+        // before start of deletion intervals
+        return false;
+      }
+      break;
+    case kBinarySearch:
+      assert(collapse_deletions_);
+      tombstone_map_iter =
+          tombstone_map.upper_bound(parsed.user_key);
+      if (tombstone_map_iter == tombstone_map.begin()) {
+        // before start of deletion intervals
+        return false;
+      }
+      --tombstone_map_iter;
+      break;
+  }
+  assert(mode != kFullScan);
+  assert(tombstone_map_iter != tombstone_map.end() &&
+         icmp_.user_comparator()->Compare(tombstone_map_iter->first,
+                                          parsed.user_key) <= 0);
+  assert(std::next(tombstone_map_iter) == tombstone_map.end() ||
+         icmp_.user_comparator()->Compare(
+             parsed.user_key, std::next(tombstone_map_iter)->first) < 0);
+  return parsed.sequence < tombstone_map_iter->second.seq_;
+}
+
+bool RangeDelAggregator::ShouldAddTombstones(
+    bool bottommost_level /* = false */) {
+  // TODO(andrewkr): can we just open a file and throw it away if it ends up
+  // empty after AddToBuilder()? This function doesn't take into account
+  // subcompaction boundaries, so it isn't completely accurate.
+  if (rep_ == nullptr) {
+    return false;
+  }
+  auto stripe_map_iter = rep_->stripe_map_.begin();
+  assert(stripe_map_iter != rep_->stripe_map_.end());
+  if (bottommost_level) {
+    // For the bottommost level, keys covered by tombstones in the first
+    // (oldest) stripe have been compacted away, so the tombstones are obsolete.
+    ++stripe_map_iter;
+  }
+  while (stripe_map_iter != rep_->stripe_map_.end()) {
+    if (!stripe_map_iter->second.raw_map.empty()) {
+      return true;
+    }
+    ++stripe_map_iter;
+  }
+  return false;
+}
+
+Status RangeDelAggregator::AddTombstones(
+    std::unique_ptr<InternalIterator> input) {
+  if (input == nullptr) {
+    return Status::OK();
+  }
+  input->SeekToFirst();
+  bool first_iter = true;
+  while (input->Valid()) {
+    if (first_iter) {
+      if (rep_ == nullptr) {
+        InitRep({upper_bound_});
+      } else {
+        InvalidateTombstoneMapPositions();
+      }
+      first_iter = false;
+    }
+    ParsedInternalKey parsed_key;
+    if (!ParseInternalKey(input->key(), &parsed_key)) {
+      return Status::Corruption("Unable to parse range tombstone InternalKey");
+    }
+    RangeTombstone tombstone(parsed_key, input->value());
+    AddTombstone(std::move(tombstone));
+    input->Next();
+  }
+  if (!first_iter) {
+    rep_->pinned_iters_mgr_.PinIterator(input.release(), false /* arena */);
+  }
+  return Status::OK();
+}
+
+void RangeDelAggregator::InvalidateTombstoneMapPositions() {
+  if (rep_ == nullptr) {
+    return;
+  }
+  for (auto stripe_map_iter = rep_->stripe_map_.begin();
+       stripe_map_iter != rep_->stripe_map_.end(); ++stripe_map_iter) {
+    stripe_map_iter->second.iter = stripe_map_iter->second.raw_map.end();
+  }
+}
+
+Status RangeDelAggregator::AddTombstone(RangeTombstone tombstone) {
+  auto& positional_tombstone_map = GetPositionalTombstoneMap(tombstone.seq_);
+  auto& tombstone_map = positional_tombstone_map.raw_map;
+  if (collapse_deletions_) {
+    // In collapsed mode, we only fill the seq_ field in the TombstoneMap's
+    // values. The end_key is unneeded because we assume the tombstone extends
+    // until the next tombstone starts. For gaps between real tombstones and
+    // for the last real tombstone, we denote end keys by inserting fake
+    // tombstones with sequence number zero.
+    std::vector<RangeTombstone> new_range_dels{
+        tombstone, RangeTombstone(tombstone.end_key_, Slice(), 0)};
+    auto new_range_dels_iter = new_range_dels.begin();
+    // Position at the first overlapping existing tombstone; if none exists,
+    // insert until we find an existing one overlapping a new point
+    const Slice* tombstone_map_begin = nullptr;
+    if (!tombstone_map.empty()) {
+      tombstone_map_begin = &tombstone_map.begin()->first;
+    }
+    auto last_range_dels_iter = new_range_dels_iter;
+    while (new_range_dels_iter != new_range_dels.end() &&
+           (tombstone_map_begin == nullptr ||
+            icmp_.user_comparator()->Compare(new_range_dels_iter->start_key_,
+                                             *tombstone_map_begin) < 0)) {
+      tombstone_map.emplace(
+          new_range_dels_iter->start_key_,
+          RangeTombstone(Slice(), Slice(), new_range_dels_iter->seq_));
+      last_range_dels_iter = new_range_dels_iter;
+      ++new_range_dels_iter;
+    }
+    if (new_range_dels_iter == new_range_dels.end()) {
+      return Status::OK();
+    }
+    // above loop advances one too far
+    new_range_dels_iter = last_range_dels_iter;
+    auto tombstone_map_iter =
+        tombstone_map.upper_bound(new_range_dels_iter->start_key_);
+    // if nothing overlapped we would've already inserted all the new points
+    // and returned early
+    assert(tombstone_map_iter != tombstone_map.begin());
+    tombstone_map_iter--;
+
+    // untermed_seq is non-kMaxSequenceNumber when we covered an existing point
+    // but haven't seen its corresponding endpoint. It's used for (1) deciding
+    // whether to forcibly insert the new interval's endpoint; and (2) possibly
+    // raising the seqnum for the to-be-inserted element (we insert the max
+    // seqnum between the next new interval and the unterminated interval).
+    SequenceNumber untermed_seq = kMaxSequenceNumber;
+    while (tombstone_map_iter != tombstone_map.end() &&
+           new_range_dels_iter != new_range_dels.end()) {
+      const Slice *tombstone_map_iter_end = nullptr,
+                  *new_range_dels_iter_end = nullptr;
+      if (tombstone_map_iter != tombstone_map.end()) {
+        auto next_tombstone_map_iter = std::next(tombstone_map_iter);
+        if (next_tombstone_map_iter != tombstone_map.end()) {
+          tombstone_map_iter_end = &next_tombstone_map_iter->first;
+        }
+      }
+      if (new_range_dels_iter != new_range_dels.end()) {
+        auto next_new_range_dels_iter = std::next(new_range_dels_iter);
+        if (next_new_range_dels_iter != new_range_dels.end()) {
+          new_range_dels_iter_end = &next_new_range_dels_iter->start_key_;
+        }
+      }
+
+      // our positions in existing/new tombstone collections should always
+      // overlap. The non-overlapping cases are handled above and below this
+      // loop.
+      assert(new_range_dels_iter_end == nullptr ||
+             icmp_.user_comparator()->Compare(tombstone_map_iter->first,
+                                              *new_range_dels_iter_end) < 0);
+      assert(tombstone_map_iter_end == nullptr ||
+             icmp_.user_comparator()->Compare(new_range_dels_iter->start_key_,
+                                              *tombstone_map_iter_end) < 0);
+
+      int new_to_old_start_cmp = icmp_.user_comparator()->Compare(
+          new_range_dels_iter->start_key_, tombstone_map_iter->first);
+      // nullptr end means extends infinitely rightwards, set new_to_old_end_cmp
+      // accordingly so we can use common code paths later.
+      int new_to_old_end_cmp;
+      if (new_range_dels_iter_end == nullptr &&
+          tombstone_map_iter_end == nullptr) {
+        new_to_old_end_cmp = 0;
+      } else if (new_range_dels_iter_end == nullptr) {
+        new_to_old_end_cmp = 1;
+      } else if (tombstone_map_iter_end == nullptr) {
+        new_to_old_end_cmp = -1;
+      } else {
+        new_to_old_end_cmp = icmp_.user_comparator()->Compare(
+            *new_range_dels_iter_end, *tombstone_map_iter_end);
+      }
+
+      if (new_to_old_start_cmp < 0) {
+        // the existing one's left endpoint comes after, so raise/delete it if
+        // it's covered.
+        if (tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) {
+          untermed_seq = tombstone_map_iter->second.seq_;
+          if (tombstone_map_iter != tombstone_map.begin() &&
+              std::prev(tombstone_map_iter)->second.seq_ ==
+                  new_range_dels_iter->seq_) {
+            tombstone_map_iter = tombstone_map.erase(tombstone_map_iter);
+            --tombstone_map_iter;
+          } else {
+            tombstone_map_iter->second.seq_ = new_range_dels_iter->seq_;
+          }
+        }
+      } else if (new_to_old_start_cmp > 0) {
+        if (untermed_seq != kMaxSequenceNumber ||
+            tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) {
+          auto seq = tombstone_map_iter->second.seq_;
+          // need to adjust this element if it was not intended to span beyond
+          // the new element (i.e., untermed_seq != kMaxSequenceNumber), or if
+          // it can be raised
+          tombstone_map_iter = tombstone_map.emplace(
+              new_range_dels_iter->start_key_,
+              RangeTombstone(
+                  Slice(), Slice(),
+                  std::max(
+                      untermed_seq == kMaxSequenceNumber ? 0 : untermed_seq,
+                      new_range_dels_iter->seq_)));
+          untermed_seq = seq;
+        }
+      } else {
+        // their left endpoints coincide, so raise the existing one if needed
+        if (tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) {
+          untermed_seq = tombstone_map_iter->second.seq_;
+          tombstone_map_iter->second.seq_ = new_range_dels_iter->seq_;
+        }
+      }
+
+      // advance whichever one ends earlier, or both if their right endpoints
+      // coincide
+      if (new_to_old_end_cmp < 0) {
+        ++new_range_dels_iter;
+      } else if (new_to_old_end_cmp > 0) {
+        ++tombstone_map_iter;
+        untermed_seq = kMaxSequenceNumber;
+      } else {
+        ++new_range_dels_iter;
+        ++tombstone_map_iter;
+        untermed_seq = kMaxSequenceNumber;
+      }
+    }
+    while (new_range_dels_iter != new_range_dels.end()) {
+      tombstone_map.emplace(
+          new_range_dels_iter->start_key_,
+          RangeTombstone(Slice(), Slice(), new_range_dels_iter->seq_));
+      ++new_range_dels_iter;
+    }
+  } else {
+    tombstone_map.emplace(tombstone.start_key_, std::move(tombstone));
+  }
+  return Status::OK();
+}
+
+RangeDelAggregator::PositionalTombstoneMap&
+RangeDelAggregator::GetPositionalTombstoneMap(SequenceNumber seq) {
+  assert(rep_ != nullptr);
+  // The stripe includes seqnum for the snapshot above and excludes seqnum for
+  // the snapshot below.
+  StripeMap::iterator iter;
+  if (seq > 0) {
+    // upper_bound() checks strict inequality so need to subtract one
+    iter = rep_->stripe_map_.upper_bound(seq - 1);
+  } else {
+    iter = rep_->stripe_map_.begin();
+  }
+  // The catch-all stripe justifies this assertion in either of the above cases.
+  assert(iter != rep_->stripe_map_.end());
+  return iter->second;
+}
+
+// TODO(andrewkr): We should implement an iterator over range tombstones in our
+// map. It'd enable compaction to open tables on-demand, i.e., only once range
+// tombstones are known to be available, without the code duplication we have
+// in ShouldAddTombstones(). It'll also allow us to move the table-modifying
+// code into more coherent places: CompactionJob and BuildTable().
+void RangeDelAggregator::AddToBuilder(
+    TableBuilder* builder, const Slice* lower_bound, const Slice* upper_bound,
+    FileMetaData* meta,
+    CompactionIterationStats* range_del_out_stats /* = nullptr */,
+    bool bottommost_level /* = false */) {
+  if (rep_ == nullptr) {
+    return;
+  }
+  auto stripe_map_iter = rep_->stripe_map_.begin();
+  assert(stripe_map_iter != rep_->stripe_map_.end());
+  if (bottommost_level) {
+    // TODO(andrewkr): these are counted for each compaction output file, so
+    // lots of double-counting.
+    if (!stripe_map_iter->second.raw_map.empty()) {
+      range_del_out_stats->num_range_del_drop_obsolete +=
+          static_cast<int64_t>(stripe_map_iter->second.raw_map.size()) -
+          (collapse_deletions_ ? 1 : 0);
+      range_del_out_stats->num_record_drop_obsolete +=
+          static_cast<int64_t>(stripe_map_iter->second.raw_map.size()) -
+          (collapse_deletions_ ? 1 : 0);
+    }
+    // For the bottommost level, keys covered by tombstones in the first
+    // (oldest) stripe have been compacted away, so the tombstones are obsolete.
+    ++stripe_map_iter;
+  }
+
+  // Note the order in which tombstones are stored is insignificant since we
+  // insert them into a std::map on the read path.
+  bool first_added = false;
+  while (stripe_map_iter != rep_->stripe_map_.end()) {
+    for (auto tombstone_map_iter = stripe_map_iter->second.raw_map.begin();
+         tombstone_map_iter != stripe_map_iter->second.raw_map.end();
+         ++tombstone_map_iter) {
+      RangeTombstone tombstone;
+      if (collapse_deletions_) {
+        auto next_tombstone_map_iter = std::next(tombstone_map_iter);
+        if (next_tombstone_map_iter == stripe_map_iter->second.raw_map.end() ||
+            tombstone_map_iter->second.seq_ == 0) {
+          // it's a sentinel tombstone
+          continue;
+        }
+        tombstone.start_key_ = tombstone_map_iter->first;
+        tombstone.end_key_ = next_tombstone_map_iter->first;
+        tombstone.seq_ = tombstone_map_iter->second.seq_;
+      } else {
+        tombstone = tombstone_map_iter->second;
+      }
+      if (upper_bound != nullptr &&
+          icmp_.user_comparator()->Compare(*upper_bound,
+                                           tombstone.start_key_) <= 0) {
+        // Tombstones starting at upper_bound or later only need to be included
+        // in the next table. Break because subsequent tombstones will start
+        // even later.
+        break;
+      }
+      if (lower_bound != nullptr &&
+          icmp_.user_comparator()->Compare(tombstone.end_key_,
+                                           *lower_bound) <= 0) {
+        // Tombstones ending before or at lower_bound only need to be included
+        // in the prev table. Continue because subsequent tombstones may still
+        // overlap [lower_bound, upper_bound).
+        continue;
+      }
+
+      auto ikey_and_end_key = tombstone.Serialize();
+      builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
+      if (!first_added) {
+        first_added = true;
+        InternalKey smallest_candidate = std::move(ikey_and_end_key.first);
+        if (lower_bound != nullptr &&
+            icmp_.user_comparator()->Compare(smallest_candidate.user_key(),
+                                             *lower_bound) <= 0) {
+          // Pretend the smallest key has the same user key as lower_bound
+          // (the max key in the previous table or subcompaction) in order for
+          // files to appear key-space partitioned.
+          //
+          // Choose lowest seqnum so this file's smallest internal key comes
+          // after the previous file's/subcompaction's largest. The fake seqnum
+          // is OK because the read path's file-picking code only considers user
+          // key.
+          smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion);
+        }
+        if (meta->smallest.size() == 0 ||
+            icmp_.Compare(smallest_candidate, meta->smallest) < 0) {
+          meta->smallest = std::move(smallest_candidate);
+        }
+      }
+      InternalKey largest_candidate = tombstone.SerializeEndKey();
+      if (upper_bound != nullptr &&
+          icmp_.user_comparator()->Compare(*upper_bound,
+                                           largest_candidate.user_key()) <= 0) {
+        // Pretend the largest key has the same user key as upper_bound (the
+        // min key in the following table or subcompaction) in order for files
+        // to appear key-space partitioned.
+        //
+        // Choose highest seqnum so this file's largest internal key comes
+        // before the next file's/subcompaction's smallest. The fake seqnum is
+        // OK because the read path's file-picking code only considers the user
+        // key portion.
+        //
+        // Note Seek() also creates InternalKey with (user_key,
+        // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
+        // kTypeRangeDeletion (0xF), so the range tombstone comes before the
+        // Seek() key in InternalKey's ordering. So Seek() will look in the
+        // next file for the user key.
+        largest_candidate = InternalKey(*upper_bound, kMaxSequenceNumber,
+                                        kTypeRangeDeletion);
+      }
+      if (meta->largest.size() == 0 ||
+          icmp_.Compare(meta->largest, largest_candidate) < 0) {
+        meta->largest = std::move(largest_candidate);
+      }
+      meta->smallest_seqno = std::min(meta->smallest_seqno, tombstone.seq_);
+      meta->largest_seqno = std::max(meta->largest_seqno, tombstone.seq_);
+    }
+    ++stripe_map_iter;
+  }
+}
+
+bool RangeDelAggregator::IsEmpty() {
+  if (rep_ == nullptr) {
+    return true;
+  }
+  for (auto stripe_map_iter = rep_->stripe_map_.begin();
+       stripe_map_iter != rep_->stripe_map_.end(); ++stripe_map_iter) {
+    if (!stripe_map_iter->second.raw_map.empty()) {
+      return false;
+    }
+  }
+  return true;
+}
+
+}  // namespace rocksdb
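
The stripe lookup in GetPositionalTombstoneMap() above is the subtlest arithmetic in this file, so here is a minimal standalone sketch of the same upper_bound() logic, assuming a map keyed by snapshot seqnum with a kMaxSequenceNumber catch-all entry; all names here are illustrative, not part of the RocksDB API:

    #include <cassert>
    #include <cstdint>
    #include <map>

    using SequenceNumber = uint64_t;
    const SequenceNumber kMaxSeq = UINT64_MAX;  // stand-in for kMaxSequenceNumber

    // Each snapshot seqnum keys the stripe whose inclusive upper end it is, so
    // upper_bound(seq - 1) finds the first stripe whose snapshot is >= seq.
    SequenceNumber StripeFor(const std::map<SequenceNumber, int>& stripes,
                             SequenceNumber seq) {
      auto iter = seq > 0 ? stripes.upper_bound(seq - 1) : stripes.begin();
      assert(iter != stripes.end());  // the catch-all stripe always exists
      return iter->first;
    }

    int main() {
      std::map<SequenceNumber, int> stripes{{10, 0}, {20, 0}, {kMaxSeq, 0}};
      assert(StripeFor(stripes, 10) == 10);       // a snapshot's seqnum is inclusive
      assert(StripeFor(stripes, 11) == 20);       // first seqnum of the next stripe
      assert(StripeFor(stripes, 25) == kMaxSeq);  // newer than every snapshot
      return 0;
    }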

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/range_del_aggregator.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/range_del_aggregator.h b/thirdparty/rocksdb/db/range_del_aggregator.h
new file mode 100644
index 0000000..9d4b8ca
--- /dev/null
+++ b/thirdparty/rocksdb/db/range_del_aggregator.h
@@ -0,0 +1,161 @@
+//  Copyright (c) 2016-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "db/compaction_iteration_stats.h"
+#include "db/dbformat.h"
+#include "db/pinned_iterators_manager.h"
+#include "db/version_edit.h"
+#include "include/rocksdb/comparator.h"
+#include "include/rocksdb/types.h"
+#include "table/internal_iterator.h"
+#include "table/scoped_arena_iterator.h"
+#include "table/table_builder.h"
+#include "util/kv_map.h"
+
+namespace rocksdb {
+
+// A RangeDelAggregator aggregates range deletion tombstones as they are
+// encountered in memtables/SST files. It provides methods that check whether a
+// key is covered by range tombstones or write the relevant tombstones to a new
+// SST file.
+class RangeDelAggregator {
+ public:
+  // @param snapshots These are used to organize the tombstones into snapshot
+  //    stripes, which is the seqnum range between consecutive snapshots,
+  //    including the higher snapshot and excluding the lower one. Currently,
+  //    this is used by ShouldDelete() to prevent deletion of keys that are
+  //    covered by range tombstones in other snapshot stripes. This constructor
+  //    is used for writes (flush/compaction). All DB snapshots are provided
+  //    such that no keys are removed that are uncovered according to any DB
+  //    snapshot.
+  // Note this overload does not lazily initialize Rep.
+  RangeDelAggregator(const InternalKeyComparator& icmp,
+                     const std::vector<SequenceNumber>& snapshots,
+                     bool collapse_deletions = true);
+
+  // @param upper_bound Similar to snapshots above, except with a single
+  //    snapshot, which allows us to store the snapshot on the stack and defer
+  //    initialization of heap-allocating members (in Rep) until the first range
+  //    deletion is encountered. This constructor is used in case of reads (get/
+  //    iterator), for which only the user snapshot (upper_bound) is provided
+  //    such that the seqnum space is divided into two stripes. Only the older
+  //    stripe will be used by ShouldDelete().
+  RangeDelAggregator(const InternalKeyComparator& icmp,
+                     SequenceNumber upper_bound,
+                     bool collapse_deletions = false);
+
+  // We maintain position in the tombstone map across calls to ShouldDelete. The
+  // caller may wish to specify a mode to optimize positioning the iterator
+  // during the next call to ShouldDelete. The non-kFullScan modes are only
+  // available when deletion collapsing is enabled.
+  //
+  // For example, if we invoke Next() on an iterator, kForwardTraversal should
+  // be specified to advance one-by-one through deletions until one is found
+  // with its interval containing the key. This will typically be faster than
+  // doing a full binary search (kBinarySearch).
+  enum RangePositioningMode {
+    kFullScan,  // used iff collapse_deletions_ == false
+    kForwardTraversal,
+    kBackwardTraversal,
+    kBinarySearch,
+  };
+
+  // Returns whether the key should be deleted, which is the case when it is
+  // covered by a range tombstone residing in the same snapshot stripe.
+  // @param mode If collapse_deletions_ is true, this dictates how we will find
+  //             the deletion whose interval contains this key. Otherwise, its
+  //             value must be kFullScan indicating linear scan from the
+  //             beginning.
+  bool ShouldDelete(const ParsedInternalKey& parsed,
+                    RangePositioningMode mode = kFullScan);
+  bool ShouldDelete(const Slice& internal_key,
+                    RangePositioningMode mode = kFullScan);
+  bool ShouldAddTombstones(bool bottommost_level = false);
+
+  // Adds tombstones to the tombstone aggregation structure maintained by this
+  // object.
+  // @return non-OK status if any of the tombstone keys are corrupted.
+  Status AddTombstones(std::unique_ptr<InternalIterator> input);
+
+  // Resets iterators maintained across calls to ShouldDelete(). This may be
+  // called when the tombstones change, or the owner may call explicitly, e.g.,
+  // if it's an iterator that just seeked to an arbitrary position. The effect
+  // of invalidation is that the following call to ShouldDelete() will binary
+  // search for its tombstone.
+  void InvalidateTombstoneMapPositions();
+
+  // Writes tombstones covering a range to a table builder.
+  // @param lower_bound/upper_bound Any range deletion with [start_key, end_key)
+  //    that overlaps the target range [*lower_bound, *upper_bound) is added to
+  //    the builder. If lower_bound is nullptr, the target range extends
+  //    infinitely to the left. If upper_bound is nullptr, the target range
+  //    extends infinitely to the right. If both are nullptr, the target range
+  //    extends infinitely in both directions, i.e., all range deletions are
+  //    added to the builder.
+  // @param meta The file's metadata. We modify the begin and end keys according
+  //    to the range tombstones added to this file such that the read path does
+  //    not miss range tombstones that cover gaps before/after/between files in
+  //    a level. lower_bound/upper_bound above constrain how far file boundaries
+  //    can be extended.
+  // @param bottommost_level If true, we will filter out any tombstones
+  //    belonging to the oldest snapshot stripe, because all keys potentially
+  //    covered by this tombstone are guaranteed to have been deleted by
+  //    compaction.
+  void AddToBuilder(TableBuilder* builder, const Slice* lower_bound,
+                    const Slice* upper_bound, FileMetaData* meta,
+                    CompactionIterationStats* range_del_out_stats = nullptr,
+                    bool bottommost_level = false);
+  bool IsEmpty();
+
+ private:
+  // Maps tombstone user start key -> tombstone object
+  typedef std::multimap<Slice, RangeTombstone, stl_wrappers::LessOfComparator>
+      TombstoneMap;
+  // Also maintains position in TombstoneMap last seen by ShouldDelete(). The
+  // end iterator indicates invalidation (e.g., if AddTombstones() changes the
+  // underlying map). End iterator cannot be invalidated.
+  struct PositionalTombstoneMap {
+    explicit PositionalTombstoneMap(TombstoneMap _raw_map)
+        : raw_map(std::move(_raw_map)), iter(raw_map.end()) {}
+    PositionalTombstoneMap(const PositionalTombstoneMap&) = delete;
+    PositionalTombstoneMap(PositionalTombstoneMap&& other)
+        : raw_map(std::move(other.raw_map)), iter(raw_map.end()) {}
+
+    TombstoneMap raw_map;
+    TombstoneMap::const_iterator iter;
+  };
+
+  // Maps snapshot seqnum -> map of tombstones that fall in that stripe, i.e.,
+  // their seqnums are greater than the next smaller snapshot's seqnum.
+  typedef std::map<SequenceNumber, PositionalTombstoneMap> StripeMap;
+
+  struct Rep {
+    StripeMap stripe_map_;
+    PinnedIteratorsManager pinned_iters_mgr_;
+  };
+  // Initializes rep_ lazily. This aggregator object is constructed for every
+  // read, so expensive members should only be created when necessary, i.e.,
+  // once the first range deletion is encountered.
+  void InitRep(const std::vector<SequenceNumber>& snapshots);
+
+  PositionalTombstoneMap& GetPositionalTombstoneMap(SequenceNumber seq);
+  Status AddTombstone(RangeTombstone tombstone);
+
+  SequenceNumber upper_bound_;
+  std::unique_ptr<Rep> rep_;
+  const InternalKeyComparator& icmp_;
+  // collapse range deletions so they're binary searchable
+  const bool collapse_deletions_;
+};
+
+}  // namespace rocksdb
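
The collapsed representation described in the comments above stores only start keys and seqnums, with sentinel seqnum-0 entries marking ends and gaps. Below is a small self-contained sketch of the coverage check — the same predecessor lookup ShouldDelete() performs in kBinarySearch mode — using illustrative names only:

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <string>

    using CollapsedMap = std::map<std::string, uint64_t>;  // start key -> seqnum

    // A key is covered iff the entry with the greatest start key <= key carries
    // a seqnum greater than the key's own seqnum.
    bool Covered(const CollapsedMap& m, const std::string& key, uint64_t seq) {
      auto it = m.upper_bound(key);
      if (it == m.begin()) return false;  // before all deletion intervals
      --it;
      return seq < it->second;
    }

    int main() {
      // One tombstone ["b", "d") at seqnum 5 collapses to two entries:
      CollapsedMap m{{"b", 5}, {"d", 0}};
      assert(Covered(m, "c", 3));   // inside the interval, older than it
      assert(!Covered(m, "c", 7));  // inside the interval, but newer
      assert(!Covered(m, "e", 3));  // past the seqnum-0 sentinel end key
      return 0;
    }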

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/repair.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/repair.cc b/thirdparty/rocksdb/db/repair.cc
new file mode 100644
index 0000000..9ed3260
--- /dev/null
+++ b/thirdparty/rocksdb/db/repair.cc
@@ -0,0 +1,650 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// Repairer does best effort recovery to recover as much data as possible after
+// a disaster without compromising consistency. It does not guarantee bringing
+// the database to a time consistent state.
+//
+// Repair process is broken into 4 phases:
+// (a) Find files
+// (b) Convert logs to tables
+// (c) Extract metadata
+// (d) Write Descriptor
+//
+// (a) Find files
+//
+// The repairer goes through all the files in the directory, and classifies them
+// based on their file name. Any file that cannot be identified by name will be
+// ignored.
+//
+// (b) Convert logs to table
+//
+// Every log file that is active is replayed. All sections of the file where the
+// checksum does not match are skipped over. We intentionally give preference to
+// data consistency.
+//
+// (c) Extract metadata
+//
+// We scan every table to compute
+// (1) smallest/largest for the table
+// (2) largest sequence number in the table
+//
+// If we are unable to scan the file, then we ignore the table.
+//
+// (d) Write Descriptor
+//
+// We generate descriptor contents:
+//  - log number is set to zero
+//  - next-file-number is set to 1 + largest file number we found
+//  - last-sequence-number is set to largest sequence# found across
+//    all tables (see 2c)
+//  - compaction pointers are cleared
+//  - every table file is added at level 0
+//
+// Possible optimization 1:
+//   (a) Compute total size and use to pick appropriate max-level M
+//   (b) Sort tables by largest sequence# in the table
+//   (c) For each table: if it overlaps earlier table, place in level-0,
+//       else place in level-M.
+//   (d) We can provide options for time consistent recovery and unsafe recovery
+//       (ignore checksum failure when applicable)
+// Possible optimization 2:
+//   Store per-table metadata (smallest, largest, largest-seq#, ...)
+//   in the table's meta section to speed up ScanTable.
+
+#ifndef ROCKSDB_LITE
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include "db/builder.h"
+#include "db/db_impl.h"
+#include "db/dbformat.h"
+#include "db/log_reader.h"
+#include "db/log_writer.h"
+#include "db/memtable.h"
+#include "db/table_cache.h"
+#include "db/version_edit.h"
+#include "db/write_batch_internal.h"
+#include "options/cf_options.h"
+#include "rocksdb/comparator.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "rocksdb/write_buffer_manager.h"
+#include "table/scoped_arena_iterator.h"
+#include "util/file_reader_writer.h"
+#include "util/filename.h"
+#include "util/string_util.h"
+
+namespace rocksdb {
+
+namespace {
+
+class Repairer {
+ public:
+  Repairer(const std::string& dbname, const DBOptions& db_options,
+           const std::vector<ColumnFamilyDescriptor>& column_families,
+           const ColumnFamilyOptions& default_cf_opts,
+           const ColumnFamilyOptions& unknown_cf_opts, bool create_unknown_cfs)
+      : dbname_(dbname),
+        env_(db_options.env),
+        env_options_(),
+        db_options_(SanitizeOptions(dbname_, db_options)),
+        immutable_db_options_(db_options_),
+        icmp_(default_cf_opts.comparator),
+        default_cf_opts_(default_cf_opts),
+        default_cf_iopts_(
+            ImmutableCFOptions(immutable_db_options_, default_cf_opts)),
+        unknown_cf_opts_(unknown_cf_opts),
+        create_unknown_cfs_(create_unknown_cfs),
+        raw_table_cache_(
+            // TableCache can be small since we expect each table to be opened
+            // once.
+            NewLRUCache(10, db_options_.table_cache_numshardbits)),
+        table_cache_(new TableCache(default_cf_iopts_, env_options_,
+                                    raw_table_cache_.get())),
+        wb_(db_options_.db_write_buffer_size),
+        wc_(db_options_.delayed_write_rate),
+        vset_(dbname_, &immutable_db_options_, env_options_,
+              raw_table_cache_.get(), &wb_, &wc_),
+        next_file_number_(1) {
+    for (const auto& cfd : column_families) {
+      cf_name_to_opts_[cfd.name] = cfd.options;
+    }
+  }
+
+  const ColumnFamilyOptions* GetColumnFamilyOptions(
+      const std::string& cf_name) {
+    if (cf_name_to_opts_.find(cf_name) == cf_name_to_opts_.end()) {
+      if (create_unknown_cfs_) {
+        return &unknown_cf_opts_;
+      }
+      return nullptr;
+    }
+    return &cf_name_to_opts_[cf_name];
+  }
+
+  // Adds a column family to the VersionSet with cf_options_ and updates
+  // manifest.
+  Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) {
+    const auto* cf_opts = GetColumnFamilyOptions(cf_name);
+    if (cf_opts == nullptr) {
+      return Status::Corruption("Encountered unknown column family with name=" 
+
+                                cf_name + ", id=" + ToString(cf_id));
+    }
+    Options opts(db_options_, *cf_opts);
+    MutableCFOptions mut_cf_opts(opts);
+
+    VersionEdit edit;
+    edit.SetComparatorName(opts.comparator->Name());
+    edit.SetLogNumber(0);
+    edit.SetColumnFamily(cf_id);
+    ColumnFamilyData* cfd;
+    cfd = nullptr;
+    edit.AddColumnFamily(cf_name);
+
+    mutex_.Lock();
+    Status status = vset_.LogAndApply(cfd, mut_cf_opts, &edit, &mutex_,
+                                      nullptr /* db_directory */,
+                                      false /* new_descriptor_log */, cf_opts);
+    mutex_.Unlock();
+    return status;
+  }
+
+  ~Repairer() {
+    delete table_cache_;
+  }
+
+  Status Run() {
+    Status status = FindFiles();
+    if (status.ok()) {
+      // Discard older manifests and start a fresh one
+      for (size_t i = 0; i < manifests_.size(); i++) {
+        ArchiveFile(dbname_ + "/" + manifests_[i]);
+      }
+      // Just create a DBImpl temporarily so we can reuse NewDB()
+      DBImpl* db_impl = new DBImpl(db_options_, dbname_);
+      status = db_impl->NewDB();
+      delete db_impl;
+    }
+
+    if (status.ok()) {
+      // Recover using the fresh manifest created by NewDB()
+      status =
+          vset_.Recover({{kDefaultColumnFamilyName, default_cf_opts_}}, false);
+    }
+    if (status.ok()) {
+      // Need to scan existing SST files first so the column families are
+      // created before we process WAL files
+      ExtractMetaData();
+
+      // ExtractMetaData() uses table_fds_ to know which SST files' metadata to
+      // extract -- we need to clear it here since metadata for existing SST
+      // files has been extracted already
+      table_fds_.clear();
+      ConvertLogFilesToTables();
+      ExtractMetaData();
+      status = AddTables();
+    }
+    if (status.ok()) {
+      uint64_t bytes = 0;
+      for (size_t i = 0; i < tables_.size(); i++) {
+        bytes += tables_[i].meta.fd.GetFileSize();
+      }
+      ROCKS_LOG_WARN(db_options_.info_log,
+                     "**** Repaired rocksdb %s; "
+                     "recovered %" ROCKSDB_PRIszt " files; %" PRIu64
+                     " bytes. "
+                     "Some data may have been lost. "
+                     "****",
+                     dbname_.c_str(), tables_.size(), bytes);
+    }
+    return status;
+  }
+
+ private:
+  struct TableInfo {
+    FileMetaData meta;
+    uint32_t column_family_id;
+    std::string column_family_name;
+    SequenceNumber min_sequence;
+    SequenceNumber max_sequence;
+  };
+
+  std::string const dbname_;
+  Env* const env_;
+  const EnvOptions env_options_;
+  const DBOptions db_options_;
+  const ImmutableDBOptions immutable_db_options_;
+  const InternalKeyComparator icmp_;
+  const ColumnFamilyOptions default_cf_opts_;
+  const ImmutableCFOptions default_cf_iopts_;  // table_cache_ holds reference
+  const ColumnFamilyOptions unknown_cf_opts_;
+  const bool create_unknown_cfs_;
+  std::shared_ptr<Cache> raw_table_cache_;
+  TableCache* table_cache_;
+  WriteBufferManager wb_;
+  WriteController wc_;
+  VersionSet vset_;
+  std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_opts_;
+  InstrumentedMutex mutex_;
+
+  std::vector<std::string> manifests_;
+  std::vector<FileDescriptor> table_fds_;
+  std::vector<uint64_t> logs_;
+  std::vector<TableInfo> tables_;
+  uint64_t next_file_number_;
+
+  Status FindFiles() {
+    std::vector<std::string> filenames;
+    bool found_file = false;
+    std::vector<std::string> to_search_paths;
+
+    for (size_t path_id = 0; path_id < db_options_.db_paths.size(); path_id++) {
+        to_search_paths.push_back(db_options_.db_paths[path_id].path);
+    }
+
+    // search wal_dir if the user has configured a custom wal_dir
+    if (!db_options_.wal_dir.empty() && 
+        db_options_.wal_dir != dbname_) {
+        to_search_paths.push_back(db_options_.wal_dir);
+    }
+
+    for (size_t path_id = 0; path_id < to_search_paths.size(); path_id++) {
+      Status status =
+          env_->GetChildren(to_search_paths[path_id], &filenames);
+      if (!status.ok()) {
+        return status;
+      }
+      if (!filenames.empty()) {
+        found_file = true;
+      }
+
+      uint64_t number;
+      FileType type;
+      for (size_t i = 0; i < filenames.size(); i++) {
+        if (ParseFileName(filenames[i], &number, &type)) {
+          if (type == kDescriptorFile) {
+            manifests_.push_back(filenames[i]);
+          } else {
+            if (number + 1 > next_file_number_) {
+              next_file_number_ = number + 1;
+            }
+            if (type == kLogFile) {
+              logs_.push_back(number);
+            } else if (type == kTableFile) {
+              table_fds_.emplace_back(number, static_cast<uint32_t>(path_id),
+                                      0);
+            } else {
+              // Ignore other files
+            }
+          }
+        }
+      }
+    }
+    if (!found_file) {
+      return Status::Corruption(dbname_, "repair found no files");
+    }
+    return Status::OK();
+  }
+
+  void ConvertLogFilesToTables() {
+    for (size_t i = 0; i < logs_.size(); i++) {
+      // Use LogFileName(wal_dir, logs_[i]) here since the user might have set
+      // the wal_dir option.
+      std::string logname = LogFileName(db_options_.wal_dir, logs_[i]);
+      Status status = ConvertLogToTable(logs_[i]);
+      if (!status.ok()) {
+        ROCKS_LOG_WARN(db_options_.info_log,
+                       "Log #%" PRIu64 ": ignoring conversion error: %s",
+                       logs_[i], status.ToString().c_str());
+      }
+      ArchiveFile(logname);
+    }
+  }
+
+  Status ConvertLogToTable(uint64_t log) {
+    struct LogReporter : public log::Reader::Reporter {
+      Env* env;
+      std::shared_ptr<Logger> info_log;
+      uint64_t lognum;
+      virtual void Corruption(size_t bytes, const Status& s) override {
+        // We print error messages for corruption, but continue repairing.
+        ROCKS_LOG_ERROR(info_log, "Log #%" PRIu64 ": dropping %d bytes; %s",
+                        lognum, static_cast<int>(bytes), s.ToString().c_str());
+      }
+    };
+
+    // Open the log file
+    std::string logname = LogFileName(db_options_.wal_dir, log);
+    unique_ptr<SequentialFile> lfile;
+    Status status = env_->NewSequentialFile(
+        logname, &lfile, env_->OptimizeForLogRead(env_options_));
+    if (!status.ok()) {
+      return status;
+    }
+    unique_ptr<SequentialFileReader> lfile_reader(
+        new SequentialFileReader(std::move(lfile)));
+
+    // Create the log reader.
+    LogReporter reporter;
+    reporter.env = env_;
+    reporter.info_log = db_options_.info_log;
+    reporter.lognum = log;
+    // We intentionally make log::Reader do checksumming so that
+    // corruptions cause entire commits to be skipped instead of
+    // propagating bad information (like overly large sequence
+    // numbers).
+    log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
+                       true /*enable checksum*/, 0 /*initial_offset*/, log);
+
+    // Initialize per-column family memtables
+    for (auto* cfd : *vset_.GetColumnFamilySet()) {
+      cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
+                             kMaxSequenceNumber);
+    }
+    auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet());
+
+    // Read all the records and add to a memtable
+    std::string scratch;
+    Slice record;
+    WriteBatch batch;
+    int counter = 0;
+    while (reader.ReadRecord(&record, &scratch)) {
+      if (record.size() < WriteBatchInternal::kHeader) {
+        reporter.Corruption(
+            record.size(), Status::Corruption("log record too small"));
+        continue;
+      }
+      WriteBatchInternal::SetContents(&batch, record);
+      status = WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr);
+      if (status.ok()) {
+        counter += WriteBatchInternal::Count(&batch);
+      } else {
+        ROCKS_LOG_WARN(db_options_.info_log, "Log #%" PRIu64 ": ignoring %s",
+                       log, status.ToString().c_str());
+        status = Status::OK();  // Keep going with rest of file
+      }
+    }
+
+    // Dump a table for each column family with entries in this log file.
+    for (auto* cfd : *vset_.GetColumnFamilySet()) {
+      // Do not record a version edit for this conversion to a Table
+      // since ExtractMetaData() will also generate edits.
+      MemTable* mem = cfd->mem();
+      if (mem->IsEmpty()) {
+        continue;
+      }
+
+      FileMetaData meta;
+      meta.fd = FileDescriptor(next_file_number_++, 0, 0);
+      ReadOptions ro;
+      ro.total_order_seek = true;
+      Arena arena;
+      ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
+      EnvOptions optimized_env_options =
+          env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_);
+
+      int64_t _current_time = 0;
+      status = env_->GetCurrentTime(&_current_time);  // ignore error
+      const uint64_t current_time = static_cast<uint64_t>(_current_time);
+
+      status = BuildTable(
+          dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
+          optimized_env_options, table_cache_, iter.get(),
+          std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
+          &meta, cfd->internal_comparator(),
+          cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
+          {}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false,
+          nullptr /* internal_stats */, TableFileCreationReason::kRecovery,
+          nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,
+          nullptr /* table_properties */, -1 /* level */, current_time);
+      ROCKS_LOG_INFO(db_options_.info_log,
+                     "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
+                     log, counter, meta.fd.GetNumber(),
+                     status.ToString().c_str());
+      if (status.ok()) {
+        if (meta.fd.GetFileSize() > 0) {
+          table_fds_.push_back(meta.fd);
+        }
+      } else {
+        break;
+      }
+    }
+    delete cf_mems;
+    return status;
+  }
+
+  void ExtractMetaData() {
+    for (size_t i = 0; i < table_fds_.size(); i++) {
+      TableInfo t;
+      t.meta.fd = table_fds_[i];
+      Status status = ScanTable(&t);
+      if (!status.ok()) {
+        std::string fname = TableFileName(
+            db_options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId());
+        char file_num_buf[kFormatFileNumberBufSize];
+        FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId(),
+                         file_num_buf, sizeof(file_num_buf));
+        ROCKS_LOG_WARN(db_options_.info_log, "Table #%s: ignoring %s",
+                       file_num_buf, status.ToString().c_str());
+        ArchiveFile(fname);
+      } else {
+        tables_.push_back(t);
+      }
+    }
+  }
+
+  Status ScanTable(TableInfo* t) {
+    std::string fname = TableFileName(
+        db_options_.db_paths, t->meta.fd.GetNumber(), t->meta.fd.GetPathId());
+    int counter = 0;
+    uint64_t file_size;
+    Status status = env_->GetFileSize(fname, &file_size);
+    t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(),
+                                file_size);
+    std::shared_ptr<const TableProperties> props;
+    if (status.ok()) {
+      status = table_cache_->GetTableProperties(env_options_, icmp_, t->meta.fd,
+                                                &props);
+    }
+    if (status.ok()) {
+      t->column_family_id = static_cast<uint32_t>(props->column_family_id);
+      if (t->column_family_id ==
+          TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) {
+        ROCKS_LOG_WARN(
+            db_options_.info_log,
+            "Table #%" PRIu64
+            ": column family unknown (probably due to legacy format); "
+            "adding to default column family id 0.",
+            t->meta.fd.GetNumber());
+        t->column_family_id = 0;
+      }
+
+      if (vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id) ==
+          nullptr) {
+        status =
+            AddColumnFamily(props->column_family_name, t->column_family_id);
+      }
+    }
+    ColumnFamilyData* cfd = nullptr;
+    if (status.ok()) {
+      cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id);
+      if (cfd->GetName() != props->column_family_name) {
+        ROCKS_LOG_ERROR(
+            db_options_.info_log,
+            "Table #%" PRIu64
+            ": inconsistent column family name '%s'; expected '%s' for column "
+            "family id %" PRIu32 ".",
+            t->meta.fd.GetNumber(), props->column_family_name.c_str(),
+            cfd->GetName().c_str(), t->column_family_id);
+        status = Status::Corruption(dbname_, "inconsistent column family name");
+      }
+    }
+    if (status.ok()) {
+      InternalIterator* iter = table_cache_->NewIterator(
+          ReadOptions(), env_options_, cfd->internal_comparator(), t->meta.fd,
+          nullptr /* range_del_agg */);
+      bool empty = true;
+      ParsedInternalKey parsed;
+      t->min_sequence = 0;
+      t->max_sequence = 0;
+      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+        Slice key = iter->key();
+        if (!ParseInternalKey(key, &parsed)) {
+          ROCKS_LOG_ERROR(db_options_.info_log,
+                          "Table #%" PRIu64 ": unparsable key %s",
+                          t->meta.fd.GetNumber(), EscapeString(key).c_str());
+          continue;
+        }
+
+        counter++;
+        if (empty) {
+          empty = false;
+          t->meta.smallest.DecodeFrom(key);
+          t->min_sequence = parsed.sequence;
+        }
+        t->meta.largest.DecodeFrom(key);
+        if (parsed.sequence < t->min_sequence) {
+          t->min_sequence = parsed.sequence;
+        }
+        if (parsed.sequence > t->max_sequence) {
+          t->max_sequence = parsed.sequence;
+        }
+      }
+      if (!iter->status().ok()) {
+        status = iter->status();
+      }
+      delete iter;
+
+      ROCKS_LOG_INFO(db_options_.info_log, "Table #%" PRIu64 ": %d entries %s",
+                     t->meta.fd.GetNumber(), counter,
+                     status.ToString().c_str());
+    }
+    return status;
+  }
+
+  Status AddTables() {
+    std::unordered_map<uint32_t, std::vector<const TableInfo*>> cf_id_to_tables;
+    SequenceNumber max_sequence = 0;
+    for (size_t i = 0; i < tables_.size(); i++) {
+      cf_id_to_tables[tables_[i].column_family_id].push_back(&tables_[i]);
+      if (max_sequence < tables_[i].max_sequence) {
+        max_sequence = tables_[i].max_sequence;
+      }
+    }
+    vset_.SetLastToBeWrittenSequence(max_sequence);
+    vset_.SetLastSequence(max_sequence);
+
+    for (const auto& cf_id_and_tables : cf_id_to_tables) {
+      auto* cfd =
+          vset_.GetColumnFamilySet()->GetColumnFamily(cf_id_and_tables.first);
+      VersionEdit edit;
+      edit.SetComparatorName(cfd->user_comparator()->Name());
+      edit.SetLogNumber(0);
+      edit.SetNextFile(next_file_number_);
+      edit.SetColumnFamily(cfd->GetID());
+
+      // TODO(opt): separate out into multiple levels
+      for (const auto* table : cf_id_and_tables.second) {
+        edit.AddFile(0, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(),
+                     table->meta.fd.GetFileSize(), table->meta.smallest,
+                     table->meta.largest, table->min_sequence,
+                     table->max_sequence, table->meta.marked_for_compaction);
+      }
+      mutex_.Lock();
+      Status status = vset_.LogAndApply(
+          cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
+          nullptr /* db_directory */, false /* new_descriptor_log */);
+      mutex_.Unlock();
+      if (!status.ok()) {
+        return status;
+      }
+    }
+    return Status::OK();
+  }
+
+  void ArchiveFile(const std::string& fname) {
+    // Move into another directory.  E.g., for
+    //    dir/foo
+    // rename to
+    //    dir/lost/foo
+    const char* slash = strrchr(fname.c_str(), '/');
+    std::string new_dir;
+    if (slash != nullptr) {
+      new_dir.assign(fname.data(), slash - fname.data());
+    }
+    new_dir.append("/lost");
+    env_->CreateDir(new_dir);  // Ignore error
+    std::string new_file = new_dir;
+    new_file.append("/");
+    new_file.append((slash == nullptr) ? fname.c_str() : slash + 1);
+    Status s = env_->RenameFile(fname, new_file);
+    ROCKS_LOG_INFO(db_options_.info_log, "Archiving %s: %s\n", fname.c_str(),
+                   s.ToString().c_str());
+  }
+};
+
+Status GetDefaultCFOptions(
+    const std::vector<ColumnFamilyDescriptor>& column_families,
+    ColumnFamilyOptions* res) {
+  assert(res != nullptr);
+  auto iter = std::find_if(column_families.begin(), column_families.end(),
+                           [](const ColumnFamilyDescriptor& cfd) {
+                             return cfd.name == kDefaultColumnFamilyName;
+                           });
+  if (iter == column_families.end()) {
+    return Status::InvalidArgument(
+        "column_families", "Must contain entry for default column family");
+  }
+  *res = iter->options;
+  return Status::OK();
+}
+}  // anonymous namespace
+
+Status RepairDB(const std::string& dbname, const DBOptions& db_options,
+                const std::vector<ColumnFamilyDescriptor>& column_families) {
+  ColumnFamilyOptions default_cf_opts;
+  Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
+  if (status.ok()) {
+    Repairer repairer(dbname, db_options, column_families, default_cf_opts,
+                      ColumnFamilyOptions() /* unknown_cf_opts */,
+                      false /* create_unknown_cfs */);
+    status = repairer.Run();
+  }
+  return status;
+}
+
+Status RepairDB(const std::string& dbname, const DBOptions& db_options,
+                const std::vector<ColumnFamilyDescriptor>& column_families,
+                const ColumnFamilyOptions& unknown_cf_opts) {
+  ColumnFamilyOptions default_cf_opts;
+  Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
+  if (status.ok()) {
+    Repairer repairer(dbname, db_options, column_families, default_cf_opts,
+                      unknown_cf_opts, true /* create_unknown_cfs */);
+    status = repairer.Run();
+  }
+  return status;
+}
+
+Status RepairDB(const std::string& dbname, const Options& options) {
+  DBOptions db_options(options);
+  ColumnFamilyOptions cf_options(options);
+  Repairer repairer(dbname, db_options, {}, cf_options /* default_cf_opts */,
+                    cf_options /* unknown_cf_opts */,
+                    true /* create_unknown_cfs */);
+  return repairer.Run();
+}
+
+}  // namespace rocksdb
+
+#endif  // ROCKSDB_LITE
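
For context, a minimal usage sketch of the RepairDB() entry points defined above, assuming a hypothetical database path. Since every salvaged table file is added at level 0 (see the descriptor notes at the top of the file), a full compaction afterwards is a sensible follow-up:

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    int main() {
      rocksdb::Options options;
      // Single-Options overload: known and unknown column families alike use
      // these options, and unknown column families are re-created.
      rocksdb::Status s = rocksdb::RepairDB("/tmp/example_db", options);
      return s.ok() ? 0 : 1;
    }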

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/snapshot_impl.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/snapshot_impl.cc b/thirdparty/rocksdb/db/snapshot_impl.cc
new file mode 100644
index 0000000..032ef39
--- /dev/null
+++ b/thirdparty/rocksdb/db/snapshot_impl.cc
@@ -0,0 +1,26 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+
+#include "rocksdb/snapshot.h"
+
+#include "rocksdb/db.h"
+
+namespace rocksdb {
+
+ManagedSnapshot::ManagedSnapshot(DB* db) : db_(db),
+                                           snapshot_(db->GetSnapshot()) {}
+
+ManagedSnapshot::ManagedSnapshot(DB* db, const Snapshot* _snapshot)
+    : db_(db), snapshot_(_snapshot) {}
+
+ManagedSnapshot::~ManagedSnapshot() {
+  if (snapshot_) {
+    db_->ReleaseSnapshot(snapshot_);
+  }
+}
+
+const Snapshot* ManagedSnapshot::snapshot() { return snapshot_;}
+
+}  // namespace rocksdb
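
A short usage sketch for the RAII wrapper above: the snapshot is taken in the constructor and released when the object leaves scope. The DB pointer is assumed to be a valid open database; the key is illustrative:

    #include <string>
    #include "rocksdb/db.h"
    #include "rocksdb/snapshot.h"

    void ReadAtSnapshot(rocksdb::DB* db) {
      rocksdb::ManagedSnapshot snap(db);  // calls db->GetSnapshot()
      rocksdb::ReadOptions ro;
      ro.snapshot = snap.snapshot();  // pin reads to this sequence number
      std::string value;
      rocksdb::Status s = db->Get(ro, "some_key", &value);
      (void)s;  // a real caller would check s.ok() / s.IsNotFound()
    }  // ~ManagedSnapshot() calls db->ReleaseSnapshot() here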

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/snapshot_impl.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/snapshot_impl.h b/thirdparty/rocksdb/db/snapshot_impl.h
new file mode 100644
index 0000000..b94602f
--- /dev/null
+++ b/thirdparty/rocksdb/db/snapshot_impl.h
@@ -0,0 +1,130 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#include <vector>
+
+#include "rocksdb/db.h"
+
+namespace rocksdb {
+
+class SnapshotList;
+
+// Snapshots are kept in a doubly-linked list in the DB.
+// Each SnapshotImpl corresponds to a particular sequence number.
+class SnapshotImpl : public Snapshot {
+ public:
+  SequenceNumber number_;  // const after creation
+
+  virtual SequenceNumber GetSequenceNumber() const override { return number_; }
+
+ private:
+  friend class SnapshotList;
+
+  // SnapshotImpl is kept in a doubly-linked circular list
+  SnapshotImpl* prev_;
+  SnapshotImpl* next_;
+
+  SnapshotList* list_;                 // just for sanity checks
+
+  int64_t unix_time_;
+
+  // Will this snapshot be used by a Transaction to do write-conflict checking?
+  bool is_write_conflict_boundary_;
+};
+
+class SnapshotList {
+ public:
+  SnapshotList() {
+    list_.prev_ = &list_;
+    list_.next_ = &list_;
+    list_.number_ = 0xFFFFFFFFL;      // placeholder marker, for debugging
+    count_ = 0;
+  }
+
+  bool empty() const { return list_.next_ == &list_; }
+  SnapshotImpl* oldest() const { assert(!empty()); return list_.next_; }
+  SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; }
+
+  const SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq,
+                          uint64_t unix_time, bool is_write_conflict_boundary) 
{
+    s->number_ = seq;
+    s->unix_time_ = unix_time;
+    s->is_write_conflict_boundary_ = is_write_conflict_boundary;
+    s->list_ = this;
+    s->next_ = &list_;
+    s->prev_ = list_.prev_;
+    s->prev_->next_ = s;
+    s->next_->prev_ = s;
+    count_++;
+    return s;
+  }
+
+  // Unlinks the snapshot from the list; does not free the object.
+  void Delete(const SnapshotImpl* s) {
+    assert(s->list_ == this);
+    s->prev_->next_ = s->next_;
+    s->next_->prev_ = s->prev_;
+    count_--;
+  }
+
+  // Retrieve all snapshot numbers; they are sorted in ascending order.
+  std::vector<SequenceNumber> GetAll(
+      SequenceNumber* oldest_write_conflict_snapshot = nullptr) {
+    std::vector<SequenceNumber> ret;
+
+    if (oldest_write_conflict_snapshot != nullptr) {
+      *oldest_write_conflict_snapshot = kMaxSequenceNumber;
+    }
+
+    if (empty()) {
+      return ret;
+    }
+    SnapshotImpl* s = &list_;
+    while (s->next_ != &list_) {
+      ret.push_back(s->next_->number_);
+
+      if (oldest_write_conflict_snapshot != nullptr &&
+          *oldest_write_conflict_snapshot == kMaxSequenceNumber &&
+          s->next_->is_write_conflict_boundary_) {
+        // If this is the first write-conflict boundary snapshot in the list,
+        // it is the oldest
+        *oldest_write_conflict_snapshot = s->next_->number_;
+      }
+
+      s = s->next_;
+    }
+    return ret;
+  }
+
+  // Get the sequence number of the most recent snapshot.
+  SequenceNumber GetNewest() {
+    if (empty()) {
+      return 0;
+    }
+    return newest()->number_;
+  }
+
+  int64_t GetOldestSnapshotTime() const {
+    if (empty()) {
+      return 0;
+    } else {
+      return oldest()->unix_time_;
+    }
+  }
+
+  uint64_t count() const { return count_; }
+
+ private:
+  // Dummy head of doubly-linked list of snapshots
+  SnapshotImpl list_;
+  uint64_t count_;
+};
+
+}  // namespace rocksdb
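
To make the list invariants concrete, a hedged sketch of driving SnapshotList
directly (in the real code path DBImpl allocates the SnapshotImpl objects and
calls New() under the DB mutex; the sequence numbers here are made up):

    #include "db/snapshot_impl.h"

    void SnapshotListDemo() {
      rocksdb::SnapshotList snapshots;
      auto* s1 = new rocksdb::SnapshotImpl;
      auto* s2 = new rocksdb::SnapshotImpl;

      // New() links at the tail, so the list stays sorted by sequence number.
      snapshots.New(s1, /*seq=*/10, /*unix_time=*/100,
                    /*is_write_conflict_boundary=*/false);
      snapshots.New(s2, /*seq=*/20, /*unix_time=*/200,
                    /*is_write_conflict_boundary=*/true);

      rocksdb::SequenceNumber oldest_wc;
      std::vector<rocksdb::SequenceNumber> all = snapshots.GetAll(&oldest_wc);
      // all == {10, 20}; oldest_wc == 20, the first conflict boundary found.

      snapshots.Delete(s2);  // unlinks only; freeing is the caller's job
      delete s2;
      snapshots.Delete(s1);
      delete s1;
    }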

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/table_cache.cc
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/table_cache.cc b/thirdparty/rocksdb/db/table_cache.cc
new file mode 100644
index 0000000..b4d5cc1
--- /dev/null
+++ b/thirdparty/rocksdb/db/table_cache.cc
@@ -0,0 +1,472 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/table_cache.h"
+
+#include "db/dbformat.h"
+#include "db/version_edit.h"
+#include "util/filename.h"
+
+#include "monitoring/perf_context_imp.h"
+#include "rocksdb/statistics.h"
+#include "table/get_context.h"
+#include "table/internal_iterator.h"
+#include "table/iterator_wrapper.h"
+#include "table/table_builder.h"
+#include "table/table_reader.h"
+#include "util/coding.h"
+#include "util/file_reader_writer.h"
+#include "util/stop_watch.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+namespace {
+
+template <class T>
+static void DeleteEntry(const Slice& key, void* value) {
+  T* typed_value = reinterpret_cast<T*>(value);
+  delete typed_value;
+}
+
+static void UnrefEntry(void* arg1, void* arg2) {
+  Cache* cache = reinterpret_cast<Cache*>(arg1);
+  Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2);
+  cache->Release(h);
+}
+
+static void DeleteTableReader(void* arg1, void* arg2) {
+  TableReader* table_reader = reinterpret_cast<TableReader*>(arg1);
+  delete table_reader;
+}
+
+static Slice GetSliceForFileNumber(const uint64_t* file_number) {
+  return Slice(reinterpret_cast<const char*>(file_number),
+               sizeof(*file_number));
+}
+
+#ifndef ROCKSDB_LITE
+
+void AppendVarint64(IterKey* key, uint64_t v) {
+  char buf[10];
+  auto ptr = EncodeVarint64(buf, v);
+  key->TrimAppend(key->Size(), buf, ptr - buf);
+}
+
+#endif  // ROCKSDB_LITE
+
+}  // namespace
+
+TableCache::TableCache(const ImmutableCFOptions& ioptions,
+                       const EnvOptions& env_options, Cache* const cache)
+    : ioptions_(ioptions), env_options_(env_options), cache_(cache) {
+  if (ioptions_.row_cache) {
+    // If the same cache is shared by multiple instances, we need to
+    // disambiguate its entries.
+    PutVarint64(&row_cache_id_, ioptions_.row_cache->NewId());
+  }
+}
+
+TableCache::~TableCache() {
+}
+
+TableReader* TableCache::GetTableReaderFromHandle(Cache::Handle* handle) {
+  return reinterpret_cast<TableReader*>(cache_->Value(handle));
+}
+
+void TableCache::ReleaseHandle(Cache::Handle* handle) {
+  cache_->Release(handle);
+}
+
+Status TableCache::GetTableReader(
+    const EnvOptions& env_options,
+    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
+    bool sequential_mode, size_t readahead, bool record_read_stats,
+    HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader,
+    bool skip_filters, int level, bool prefetch_index_and_filter_in_cache,
+    bool for_compaction) {
+  std::string fname =
+      TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId());
+  unique_ptr<RandomAccessFile> file;
+  Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options);
+
+  RecordTick(ioptions_.statistics, NO_FILE_OPENS);
+  if (s.ok()) {
+    if (readahead > 0) {
+      file = NewReadaheadRandomAccessFile(std::move(file), readahead);
+    }
+    if (!sequential_mode && ioptions_.advise_random_on_open) {
+      file->Hint(RandomAccessFile::RANDOM);
+    }
+    StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
+    std::unique_ptr<RandomAccessFileReader> file_reader(
+        new RandomAccessFileReader(
+            std::move(file), fname, ioptions_.env,
+            record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
+            file_read_hist, ioptions_.rate_limiter, for_compaction));
+    s = ioptions_.table_factory->NewTableReader(
+        TableReaderOptions(ioptions_, env_options, internal_comparator,
+                           skip_filters, level),
+        std::move(file_reader), fd.GetFileSize(), table_reader,
+        prefetch_index_and_filter_in_cache);
+    TEST_SYNC_POINT("TableCache::GetTableReader:0");
+  }
+  return s;
+}
+
+void TableCache::EraseHandle(const FileDescriptor& fd, Cache::Handle* handle) {
+  ReleaseHandle(handle);
+  uint64_t number = fd.GetNumber();
+  Slice key = GetSliceForFileNumber(&number);
+  cache_->Erase(key);
+}
+
+Status TableCache::FindTable(const EnvOptions& env_options,
+                             const InternalKeyComparator& internal_comparator,
+                             const FileDescriptor& fd, Cache::Handle** handle,
+                             const bool no_io, bool record_read_stats,
+                             HistogramImpl* file_read_hist, bool skip_filters,
+                             int level,
+                             bool prefetch_index_and_filter_in_cache) {
+  PERF_TIMER_GUARD(find_table_nanos);
+  Status s;
+  uint64_t number = fd.GetNumber();
+  Slice key = GetSliceForFileNumber(&number);
+  *handle = cache_->Lookup(key);
+  TEST_SYNC_POINT_CALLBACK("TableCache::FindTable:0",
+                           const_cast<bool*>(&no_io));
+
+  if (*handle == nullptr) {
+    if (no_io) {  // Don't do IO and return a not-found status
+      return Status::Incomplete("Table not found in table_cache, no_io is set");
+    }
+    unique_ptr<TableReader> table_reader;
+    s = GetTableReader(env_options, internal_comparator, fd,
+                       false /* sequential mode */, 0 /* readahead */,
+                       record_read_stats, file_read_hist, &table_reader,
+                       skip_filters, level, prefetch_index_and_filter_in_cache);
+    if (!s.ok()) {
+      assert(table_reader == nullptr);
+      RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
+      // We do not cache error results so that if the error is transient,
+      // or somebody repairs the file, we recover automatically.
+    } else {
+      s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
+                         handle);
+      if (s.ok()) {
+        // Release ownership of table reader.
+        table_reader.release();
+      }
+    }
+  }
+  return s;
+}
+
+InternalIterator* TableCache::NewIterator(
+    const ReadOptions& options, const EnvOptions& env_options,
+    const InternalKeyComparator& icomparator, const FileDescriptor& fd,
+    RangeDelAggregator* range_del_agg, TableReader** table_reader_ptr,
+    HistogramImpl* file_read_hist, bool for_compaction, Arena* arena,
+    bool skip_filters, int level) {
+  PERF_TIMER_GUARD(new_table_iterator_nanos);
+
+  Status s;
+  bool create_new_table_reader = false;
+  TableReader* table_reader = nullptr;
+  Cache::Handle* handle = nullptr;
+  if (s.ok()) {
+    if (table_reader_ptr != nullptr) {
+      *table_reader_ptr = nullptr;
+    }
+    size_t readahead = 0;
+    if (for_compaction) {
+#ifndef NDEBUG
+      bool use_direct_reads_for_compaction = env_options.use_direct_reads;
+      TEST_SYNC_POINT_CALLBACK("TableCache::NewIterator:for_compaction",
+                               &use_direct_reads_for_compaction);
+#endif  // !NDEBUG
+      if (ioptions_.new_table_reader_for_compaction_inputs) {
+        readahead = ioptions_.compaction_readahead_size;
+        create_new_table_reader = true;
+      }
+    } else {
+      readahead = options.readahead_size;
+      create_new_table_reader = readahead > 0;
+    }
+
+    if (create_new_table_reader) {
+      unique_ptr<TableReader> table_reader_unique_ptr;
+      s = GetTableReader(
+          env_options, icomparator, fd, true /* sequential_mode */, readahead,
+          !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
+          false /* skip_filters */, level,
+          true /* prefetch_index_and_filter_in_cache */, for_compaction);
+      if (s.ok()) {
+        table_reader = table_reader_unique_ptr.release();
+      }
+    } else {
+      table_reader = fd.table_reader;
+      if (table_reader == nullptr) {
+        s = FindTable(env_options, icomparator, fd, &handle,
+                      options.read_tier == kBlockCacheTier /* no_io */,
+                      !for_compaction /* record read_stats */, file_read_hist,
+                      skip_filters, level);
+        if (s.ok()) {
+          table_reader = GetTableReaderFromHandle(handle);
+        }
+      }
+    }
+  }
+  InternalIterator* result = nullptr;
+  if (s.ok()) {
+    result = table_reader->NewIterator(options, arena, skip_filters);
+    if (create_new_table_reader) {
+      assert(handle == nullptr);
+      result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
+    } else if (handle != nullptr) {
+      result->RegisterCleanup(&UnrefEntry, cache_, handle);
+      handle = nullptr;  // prevent it from being released again below
+    }
+
+    if (for_compaction) {
+      table_reader->SetupForCompaction();
+    }
+    if (table_reader_ptr != nullptr) {
+      *table_reader_ptr = table_reader;
+    }
+  }
+  if (s.ok() && range_del_agg != nullptr && !options.ignore_range_deletions) {
+    std::unique_ptr<InternalIterator> range_del_iter(
+        table_reader->NewRangeTombstoneIterator(options));
+    if (range_del_iter != nullptr) {
+      s = range_del_iter->status();
+    }
+    if (s.ok()) {
+      s = range_del_agg->AddTombstones(std::move(range_del_iter));
+    }
+  }
+
+  if (handle != nullptr) {
+    ReleaseHandle(handle);
+  }
+  if (!s.ok()) {
+    assert(result == nullptr);
+    result = NewErrorInternalIterator(s, arena);
+  }
+  return result;
+}
+
+InternalIterator* TableCache::NewRangeTombstoneIterator(
+    const ReadOptions& options, const EnvOptions& env_options,
+    const InternalKeyComparator& icomparator, const FileDescriptor& fd,
+    HistogramImpl* file_read_hist, bool skip_filters, int level) {
+  Status s;
+  TableReader* table_reader = nullptr;
+  Cache::Handle* handle = nullptr;
+  table_reader = fd.table_reader;
+  if (table_reader == nullptr) {
+    s = FindTable(env_options, icomparator, fd, &handle,
+                  options.read_tier == kBlockCacheTier /* no_io */,
+                  true /* record read_stats */, file_read_hist, skip_filters,
+                  level);
+    if (s.ok()) {
+      table_reader = GetTableReaderFromHandle(handle);
+    }
+  }
+  InternalIterator* result = nullptr;
+  if (s.ok()) {
+    result = table_reader->NewRangeTombstoneIterator(options);
+    if (result != nullptr) {
+      if (handle != nullptr) {
+        result->RegisterCleanup(&UnrefEntry, cache_, handle);
+      }
+    }
+  }
+  if (result == nullptr && handle != nullptr) {
+    // the range deletion block didn't exist, or there was a failure between
+    // getting handle and getting iterator.
+    ReleaseHandle(handle);
+  }
+  if (!s.ok()) {
+    assert(result == nullptr);
+    result = NewErrorInternalIterator(s);
+  }
+  return result;
+}
+
+Status TableCache::Get(const ReadOptions& options,
+                       const InternalKeyComparator& internal_comparator,
+                       const FileDescriptor& fd, const Slice& k,
+                       GetContext* get_context, HistogramImpl* file_read_hist,
+                       bool skip_filters, int level) {
+  std::string* row_cache_entry = nullptr;
+  bool done = false;
+#ifndef ROCKSDB_LITE
+  IterKey row_cache_key;
+  std::string row_cache_entry_buffer;
+
+  // Check row cache if enabled. Since row cache does not currently store
+  // sequence numbers, we cannot use it if we need to fetch the sequence.
+  if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
+    uint64_t fd_number = fd.GetNumber();
+    auto user_key = ExtractUserKey(k);
+    // We use the user key as cache key instead of the internal key,
+    // otherwise the whole cache would be invalidated every time the
+    // sequence number increases. However, to support caching snapshot
+    // reads, we append the sequence number (incremented by 1 to
+    // distinguish from 0) only in this case.
+    uint64_t seq_no =
+        options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);
+
+    // Compute row cache key.
+    row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
+                             row_cache_id_.size());
+    AppendVarint64(&row_cache_key, fd_number);
+    AppendVarint64(&row_cache_key, seq_no);
+    row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
+                             user_key.size());
+
+    if (auto row_handle =
+            ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
+      // Cleanable routine to release the cache entry
+      Cleanable value_pinner;
+      auto release_cache_entry_func = [](void* cache_to_clean,
+                                         void* cache_handle) {
+        ((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
+      };
+      auto found_row_cache_entry = static_cast<const std::string*>(
+          ioptions_.row_cache->Value(row_handle));
+      // If we reach here, the value is located in the cache.
+      // found_row_cache_entry points to the cached value, and
+      // value_pinner holds the cleanup procedure for the cached entry.
+      // After replayGetContextLog() returns, get_context.pinnable_slice_
+      // will point to cache entry buffer (or a copy based on that) and
+      // cleanup routine under value_pinner will be delegated to
+      // get_context.pinnable_slice_. Cache entry is released when
+      // get_context.pinnable_slice_ is reset.
+      value_pinner.RegisterCleanup(release_cache_entry_func,
+                                   ioptions_.row_cache.get(), row_handle);
+      replayGetContextLog(*found_row_cache_entry, user_key, get_context,
+                          &value_pinner);
+      RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
+      done = true;
+    } else {
+      // Not found, setting up the replay log.
+      RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
+      row_cache_entry = &row_cache_entry_buffer;
+    }
+  }
+#endif  // ROCKSDB_LITE
+  Status s;
+  TableReader* t = fd.table_reader;
+  Cache::Handle* handle = nullptr;
+  if (!done && s.ok()) {
+    if (t == nullptr) {
+      s = FindTable(env_options_, internal_comparator, fd, &handle,
+                    options.read_tier == kBlockCacheTier /* no_io */,
+                    true /* record_read_stats */, file_read_hist, skip_filters,
+                    level);
+      if (s.ok()) {
+        t = GetTableReaderFromHandle(handle);
+      }
+    }
+    if (s.ok() && get_context->range_del_agg() != nullptr &&
+        !options.ignore_range_deletions) {
+      std::unique_ptr<InternalIterator> range_del_iter(
+          t->NewRangeTombstoneIterator(options));
+      if (range_del_iter != nullptr) {
+        s = range_del_iter->status();
+      }
+      if (s.ok()) {
+        s = get_context->range_del_agg()->AddTombstones(
+            std::move(range_del_iter));
+      }
+    }
+    if (s.ok()) {
+      get_context->SetReplayLog(row_cache_entry);  // nullptr if no cache.
+      s = t->Get(options, k, get_context, skip_filters);
+      get_context->SetReplayLog(nullptr);
+    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
+      // Couldn't find Table in cache but treat as kFound if no_io set
+      get_context->MarkKeyMayExist();
+      s = Status::OK();
+      done = true;
+    }
+  }
+
+#ifndef ROCKSDB_LITE
+  // Put the replay log in row cache only if something was found.
+  if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) {
+    size_t charge =
+        row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
+    void* row_ptr = new std::string(std::move(*row_cache_entry));
+    ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
+                                &DeleteEntry<std::string>);
+  }
+#endif  // ROCKSDB_LITE
+
+  if (handle != nullptr) {
+    ReleaseHandle(handle);
+  }
+  return s;
+}
+
+Status TableCache::GetTableProperties(
+    const EnvOptions& env_options,
+    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
+    std::shared_ptr<const TableProperties>* properties, bool no_io) {
+  Status s;
+  auto table_reader = fd.table_reader;
+  // Has the table reader already been pre-loaded?
+  if (table_reader) {
+    *properties = table_reader->GetTableProperties();
+
+    return s;
+  }
+
+  Cache::Handle* table_handle = nullptr;
+  s = FindTable(env_options, internal_comparator, fd, &table_handle, no_io);
+  if (!s.ok()) {
+    return s;
+  }
+  assert(table_handle);
+  auto table = GetTableReaderFromHandle(table_handle);
+  *properties = table->GetTableProperties();
+  ReleaseHandle(table_handle);
+  return s;
+}
+
+size_t TableCache::GetMemoryUsageByTableReader(
+    const EnvOptions& env_options,
+    const InternalKeyComparator& internal_comparator,
+    const FileDescriptor& fd) {
+  Status s;
+  auto table_reader = fd.table_reader;
+  // Has the table reader already been pre-loaded?
+  if (table_reader) {
+    return table_reader->ApproximateMemoryUsage();
+  }
+
+  Cache::Handle* table_handle = nullptr;
+  s = FindTable(env_options, internal_comparator, fd, &table_handle, true);
+  if (!s.ok()) {
+    return 0;
+  }
+  assert(table_handle);
+  auto table = GetTableReaderFromHandle(table_handle);
+  auto ret = table->ApproximateMemoryUsage();
+  ReleaseHandle(table_handle);
+  return ret;
+}
+
+void TableCache::Evict(Cache* cache, uint64_t file_number) {
+  cache->Erase(GetSliceForFileNumber(&file_number));
+}
+
+}  // namespace rocksdb
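
One idiom in FindTable above deserves a callout: the unique_ptr owns the
TableReader only until cache_->Insert() succeeds, at which point ownership
passes to the cache's deleter. A generic sketch of the same handoff (the
Reader type and InsertReader name are hypothetical):

    #include <memory>
    #include "rocksdb/cache.h"

    struct Reader {};  // hypothetical payload standing in for TableReader

    template <class T>
    static void DeleteEntry(const rocksdb::Slice& /*key*/, void* value) {
      delete reinterpret_cast<T*>(value);  // cache deleter frees the payload
    }

    rocksdb::Status InsertReader(rocksdb::Cache* cache,
                                 const rocksdb::Slice& key,
                                 rocksdb::Cache::Handle** handle) {
      std::unique_ptr<Reader> reader(new Reader());  // owns until insert
      rocksdb::Status s = cache->Insert(key, reader.get(), 1 /* charge */,
                                        &DeleteEntry<Reader>, handle);
      if (s.ok()) {
        reader.release();  // the cache's deleter now owns the object
      }
      return s;
    }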

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/table_cache.h
----------------------------------------------------------------------
diff --git a/thirdparty/rocksdb/db/table_cache.h b/thirdparty/rocksdb/db/table_cache.h
new file mode 100644
index 0000000..8b65baf
--- /dev/null
+++ b/thirdparty/rocksdb/db/table_cache.h
@@ -0,0 +1,146 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// Thread-safe (provides internal synchronization)
+
+#pragma once
+#include <string>
+#include <vector>
+#include <stdint.h>
+
+#include "db/dbformat.h"
+#include "db/range_del_aggregator.h"
+#include "options/cf_options.h"
+#include "port/port.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "rocksdb/table.h"
+#include "table/table_reader.h"
+
+namespace rocksdb {
+
+class Env;
+class Arena;
+struct FileDescriptor;
+class GetContext;
+class HistogramImpl;
+class InternalIterator;
+
+class TableCache {
+ public:
+  TableCache(const ImmutableCFOptions& ioptions,
+             const EnvOptions& storage_options, Cache* cache);
+  ~TableCache();
+
+  // Return an iterator for the specified file number (the corresponding
+  // file length must be exactly "file_size" bytes).  If "table_reader_ptr"
+  // is non-nullptr, also sets "*table_reader_ptr" to point to the Table
+  // object underlying the returned iterator, or nullptr if no Table object
+  // underlies the returned iterator.  The returned "*table_reader_ptr"
+  // object is owned by the cache and should not be deleted, and is valid
+  // for as long as the returned iterator is live.
+  // @param range_del_agg If non-nullptr, adds range deletions to the
+  //    aggregator. If an error occurs, returns it in a NewErrorInternalIterator
+  // @param skip_filters Disables loading/accessing the filter block
+  // @param level The level this table is at, -1 for "not set / don't know"
+  InternalIterator* NewIterator(
+      const ReadOptions& options, const EnvOptions& toptions,
+      const InternalKeyComparator& internal_comparator,
+      const FileDescriptor& file_fd, RangeDelAggregator* range_del_agg,
+      TableReader** table_reader_ptr = nullptr,
+      HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,
+      Arena* arena = nullptr, bool skip_filters = false, int level = -1);
+
+  InternalIterator* NewRangeTombstoneIterator(
+      const ReadOptions& options, const EnvOptions& toptions,
+      const InternalKeyComparator& internal_comparator,
+      const FileDescriptor& file_fd, HistogramImpl* file_read_hist,
+      bool skip_filters, int level);
+
+  // If a seek to internal key "k" in specified file finds an entry,
+  // call (*handle_result)(arg, found_key, found_value) repeatedly until
+  // it returns false.
+  // @param get_context State for get operation. If its range_del_agg() returns
+  //    non-nullptr, adds range deletions to the aggregator. If an error occurs,
+  //    returns non-ok status.
+  // @param skip_filters Disables loading/accessing the filter block
+  // @param level The level this table is at, -1 for "not set / don't know"
+  Status Get(const ReadOptions& options,
+             const InternalKeyComparator& internal_comparator,
+             const FileDescriptor& file_fd, const Slice& k,
+             GetContext* get_context, HistogramImpl* file_read_hist = nullptr,
+             bool skip_filters = false, int level = -1);
+
+  // Evict any entry for the specified file number
+  static void Evict(Cache* cache, uint64_t file_number);
+
+  // Release the table handle and erase its entry from the table cache.
+  // Used on DB close, or when the file is no longer live.
+  void EraseHandle(const FileDescriptor& fd, Cache::Handle* handle);
+
+  // Find table reader
+  // @param skip_filters Disables loading/accessing the filter block
+  // @param level == -1 means not specified
+  Status FindTable(const EnvOptions& toptions,
+                   const InternalKeyComparator& internal_comparator,
+                   const FileDescriptor& file_fd, Cache::Handle**,
+                   const bool no_io = false, bool record_read_stats = true,
+                   HistogramImpl* file_read_hist = nullptr,
+                   bool skip_filters = false, int level = -1,
+                   bool prefetch_index_and_filter_in_cache = true);
+
+  // Get TableReader from a cache handle.
+  TableReader* GetTableReaderFromHandle(Cache::Handle* handle);
+
+  // Get the table properties of a given table.
+  // @no_io: if true, the table is not loaded into the cache when it is not
+  //         already present there.
+  // @returns: `properties` will be reset on success. Note that
+  //           Status::Incomplete() is returned if the table is not present in
+  //           the cache and `no_io` is set to true.
+  Status GetTableProperties(const EnvOptions& toptions,
+                            const InternalKeyComparator& internal_comparator,
+                            const FileDescriptor& file_meta,
+                            std::shared_ptr<const TableProperties>* properties,
+                            bool no_io = false);
+
+  // Return total memory usage of the table reader of the file.
+  // 0 if table reader of the file is not loaded.
+  size_t GetMemoryUsageByTableReader(
+      const EnvOptions& toptions,
+      const InternalKeyComparator& internal_comparator,
+      const FileDescriptor& fd);
+
+  // Release the handle from a cache
+  void ReleaseHandle(Cache::Handle* handle);
+
+  // Capacity of the backing Cache that indicates infinite TableCache capacity.
+  // For example when max_open_files is -1 we set the backing Cache to this.
+  static const int kInfiniteCapacity = 0x400000;
+
+ private:
+  // Build a table reader
+  Status GetTableReader(const EnvOptions& env_options,
+                        const InternalKeyComparator& internal_comparator,
+                        const FileDescriptor& fd, bool sequential_mode,
+                        size_t readahead, bool record_read_stats,
+                        HistogramImpl* file_read_hist,
+                        unique_ptr<TableReader>* table_reader,
+                        bool skip_filters = false, int level = -1,
+                        bool prefetch_index_and_filter_in_cache = true,
+                        bool for_compaction = false);
+
+  const ImmutableCFOptions& ioptions_;
+  const EnvOptions& env_options_;
+  Cache* const cache_;
+  std::string row_cache_id_;
+};
+
+}  // namespace rocksdb
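
Finally, a hedged sketch of the handle discipline this header implies for
callers of FindTable (the UseTable function is hypothetical; error handling
is trimmed):

    #include "db/table_cache.h"

    void UseTable(rocksdb::TableCache* table_cache,
                  const rocksdb::EnvOptions& env_options,
                  const rocksdb::InternalKeyComparator& icomparator,
                  const rocksdb::FileDescriptor& fd) {
      rocksdb::Cache::Handle* handle = nullptr;
      rocksdb::Status s =
          table_cache->FindTable(env_options, icomparator, fd, &handle);
      if (!s.ok()) {
        return;  // e.g. Status::Incomplete() when no_io was requested
      }
      rocksdb::TableReader* reader =
          table_cache->GetTableReaderFromHandle(handle);
      // ... read through *reader; it stays valid while the handle is held ...
      table_cache->ReleaseHandle(handle);  // drop the cache reference
    }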
