http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/range_del_aggregator.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/range_del_aggregator.cc b/thirdparty/rocksdb/db/range_del_aggregator.cc new file mode 100644 index 0000000..0aa5d22 --- /dev/null +++ b/thirdparty/rocksdb/db/range_del_aggregator.cc @@ -0,0 +1,519 @@ +// Copyright (c) 2016-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/range_del_aggregator.h" + +#include <algorithm> + +namespace rocksdb { + +RangeDelAggregator::RangeDelAggregator( + const InternalKeyComparator& icmp, + const std::vector<SequenceNumber>& snapshots, + bool collapse_deletions /* = true */) + : upper_bound_(kMaxSequenceNumber), + icmp_(icmp), + collapse_deletions_(collapse_deletions) { + InitRep(snapshots); +} + +RangeDelAggregator::RangeDelAggregator(const InternalKeyComparator& icmp, + SequenceNumber snapshot, + bool collapse_deletions /* = false */) + : upper_bound_(snapshot), + icmp_(icmp), + collapse_deletions_(collapse_deletions) {} + +void RangeDelAggregator::InitRep(const std::vector<SequenceNumber>& snapshots) { + assert(rep_ == nullptr); + rep_.reset(new Rep()); + for (auto snapshot : snapshots) { + rep_->stripe_map_.emplace( + snapshot, + PositionalTombstoneMap(TombstoneMap( + stl_wrappers::LessOfComparator(icmp_.user_comparator())))); + } + // Data newer than any snapshot falls in this catch-all stripe + rep_->stripe_map_.emplace( + kMaxSequenceNumber, + PositionalTombstoneMap(TombstoneMap( + stl_wrappers::LessOfComparator(icmp_.user_comparator())))); + rep_->pinned_iters_mgr_.StartPinning(); +} + +bool RangeDelAggregator::ShouldDelete( + const Slice& internal_key, RangeDelAggregator::RangePositioningMode mode) { + if (rep_ == nullptr) { + return false; + } + ParsedInternalKey parsed; + if (!ParseInternalKey(internal_key, &parsed)) { + assert(false); + } + return ShouldDelete(parsed, mode); +} + +bool RangeDelAggregator::ShouldDelete( + const ParsedInternalKey& parsed, + RangeDelAggregator::RangePositioningMode mode) { + assert(IsValueType(parsed.type)); + if (rep_ == nullptr) { + return false; + } + auto& positional_tombstone_map = GetPositionalTombstoneMap(parsed.sequence); + const auto& tombstone_map = positional_tombstone_map.raw_map; + if (tombstone_map.empty()) { + return false; + } + auto& tombstone_map_iter = positional_tombstone_map.iter; + if (tombstone_map_iter == tombstone_map.end() && + (mode == kForwardTraversal || mode == kBackwardTraversal)) { + // invalid (e.g., if AddTombstones() changed the deletions), so need to + // reseek + mode = kBinarySearch; + } + switch (mode) { + case kFullScan: + assert(!collapse_deletions_); + // The maintained state (PositionalTombstoneMap::iter) isn't useful when + // we linear scan from the beginning each time, but we maintain it anyways + // for consistency. 
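+      // (Note: kFullScan is O(n) per lookup. It is the only legal mode here
+      // because uncollapsed tombstones may overlap, so map ordering alone
+      // cannot answer whether a key is covered.)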
+ tombstone_map_iter = tombstone_map.begin(); + while (tombstone_map_iter != tombstone_map.end()) { + const auto& tombstone = tombstone_map_iter->second; + if (icmp_.user_comparator()->Compare(parsed.user_key, + tombstone.start_key_) < 0) { + break; + } + if (parsed.sequence < tombstone.seq_ && + icmp_.user_comparator()->Compare(parsed.user_key, + tombstone.end_key_) < 0) { + return true; + } + ++tombstone_map_iter; + } + return false; + case kForwardTraversal: + assert(collapse_deletions_ && tombstone_map_iter != tombstone_map.end()); + if (tombstone_map_iter == tombstone_map.begin() && + icmp_.user_comparator()->Compare(parsed.user_key, + tombstone_map_iter->first) < 0) { + // before start of deletion intervals + return false; + } + while (std::next(tombstone_map_iter) != tombstone_map.end() && + icmp_.user_comparator()->Compare( + std::next(tombstone_map_iter)->first, parsed.user_key) <= 0) { + ++tombstone_map_iter; + } + break; + case kBackwardTraversal: + assert(collapse_deletions_ && tombstone_map_iter != tombstone_map.end()); + while (tombstone_map_iter != tombstone_map.begin() && + icmp_.user_comparator()->Compare(parsed.user_key, + tombstone_map_iter->first) < 0) { + --tombstone_map_iter; + } + if (tombstone_map_iter == tombstone_map.begin() && + icmp_.user_comparator()->Compare(parsed.user_key, + tombstone_map_iter->first) < 0) { + // before start of deletion intervals + return false; + } + break; + case kBinarySearch: + assert(collapse_deletions_); + tombstone_map_iter = + tombstone_map.upper_bound(parsed.user_key); + if (tombstone_map_iter == tombstone_map.begin()) { + // before start of deletion intervals + return false; + } + --tombstone_map_iter; + break; + } + assert(mode != kFullScan); + assert(tombstone_map_iter != tombstone_map.end() && + icmp_.user_comparator()->Compare(tombstone_map_iter->first, + parsed.user_key) <= 0); + assert(std::next(tombstone_map_iter) == tombstone_map.end() || + icmp_.user_comparator()->Compare( + parsed.user_key, std::next(tombstone_map_iter)->first) < 0); + return parsed.sequence < tombstone_map_iter->second.seq_; +} + +bool RangeDelAggregator::ShouldAddTombstones( + bool bottommost_level /* = false */) { + // TODO(andrewkr): can we just open a file and throw it away if it ends up + // empty after AddToBuilder()? This function doesn't take subcompaction + // boundaries into account, so it isn't completely accurate. + if (rep_ == nullptr) { + return false; + } + auto stripe_map_iter = rep_->stripe_map_.begin(); + assert(stripe_map_iter != rep_->stripe_map_.end()); + if (bottommost_level) { + // For the bottommost level, keys covered by tombstones in the first + // (oldest) stripe have been compacted away, so the tombstones are obsolete.
+ ++stripe_map_iter; + } + while (stripe_map_iter != rep_->stripe_map_.end()) { + if (!stripe_map_iter->second.raw_map.empty()) { + return true; + } + ++stripe_map_iter; + } + return false; +} + +Status RangeDelAggregator::AddTombstones( + std::unique_ptr<InternalIterator> input) { + if (input == nullptr) { + return Status::OK(); + } + input->SeekToFirst(); + bool first_iter = true; + while (input->Valid()) { + if (first_iter) { + if (rep_ == nullptr) { + InitRep({upper_bound_}); + } else { + InvalidateTombstoneMapPositions(); + } + first_iter = false; + } + ParsedInternalKey parsed_key; + if (!ParseInternalKey(input->key(), &parsed_key)) { + return Status::Corruption("Unable to parse range tombstone InternalKey"); + } + RangeTombstone tombstone(parsed_key, input->value()); + AddTombstone(std::move(tombstone)); + input->Next(); + } + if (!first_iter) { + rep_->pinned_iters_mgr_.PinIterator(input.release(), false /* arena */); + } + return Status::OK(); +} + +void RangeDelAggregator::InvalidateTombstoneMapPositions() { + if (rep_ == nullptr) { + return; + } + for (auto stripe_map_iter = rep_->stripe_map_.begin(); + stripe_map_iter != rep_->stripe_map_.end(); ++stripe_map_iter) { + stripe_map_iter->second.iter = stripe_map_iter->second.raw_map.end(); + } +} + +Status RangeDelAggregator::AddTombstone(RangeTombstone tombstone) { + auto& positional_tombstone_map = GetPositionalTombstoneMap(tombstone.seq_); + auto& tombstone_map = positional_tombstone_map.raw_map; + if (collapse_deletions_) { + // In collapsed mode, we only fill the seq_ field in the TombstoneMap's + // values. The end_key is unneeded because we assume the tombstone extends + // until the next tombstone starts. For gaps between real tombstones and + // for the last real tombstone, we denote end keys by inserting fake + // tombstones with sequence number zero. + std::vector<RangeTombstone> new_range_dels{ + tombstone, RangeTombstone(tombstone.end_key_, Slice(), 0)}; + auto new_range_dels_iter = new_range_dels.begin(); + // Position at the first overlapping existing tombstone; if none exists, + // insert until we find an existing one overlapping a new point + const Slice* tombstone_map_begin = nullptr; + if (!tombstone_map.empty()) { + tombstone_map_begin = &tombstone_map.begin()->first; + } + auto last_range_dels_iter = new_range_dels_iter; + while (new_range_dels_iter != new_range_dels.end() && + (tombstone_map_begin == nullptr || + icmp_.user_comparator()->Compare(new_range_dels_iter->start_key_, + *tombstone_map_begin) < 0)) { + tombstone_map.emplace( + new_range_dels_iter->start_key_, + RangeTombstone(Slice(), Slice(), new_range_dels_iter->seq_)); + last_range_dels_iter = new_range_dels_iter; + ++new_range_dels_iter; + } + if (new_range_dels_iter == new_range_dels.end()) { + return Status::OK(); + } + // above loop advances one too far + new_range_dels_iter = last_range_dels_iter; + auto tombstone_map_iter = + tombstone_map.upper_bound(new_range_dels_iter->start_key_); + // if nothing overlapped we would've already inserted all the new points + // and returned early + assert(tombstone_map_iter != tombstone_map.begin()); + tombstone_map_iter--; + + // untermed_seq is non-kMaxSequenceNumber when we covered an existing point + // but haven't seen its corresponding endpoint. It's used for (1) deciding + // whether to forcibly insert the new interval's endpoint; and (2) possibly + // raising the seqnum for the to-be-inserted element (we insert the max + // seqnum between the next new interval and the unterminated interval). 
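+    // Illustrative example (not from the original comments): inserting
+    // [c, f) @ seq 10 into a collapsed map {b -> 5, g -> 0} (i.e., tombstone
+    // [b, g) @ 5) yields {b -> 5, c -> 10, f -> 5, g -> 0}: the new interval
+    // raises the seqnums it covers, and the old seqnum 5 resumes at f.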
+ SequenceNumber untermed_seq = kMaxSequenceNumber; + while (tombstone_map_iter != tombstone_map.end() && + new_range_dels_iter != new_range_dels.end()) { + const Slice *tombstone_map_iter_end = nullptr, + *new_range_dels_iter_end = nullptr; + if (tombstone_map_iter != tombstone_map.end()) { + auto next_tombstone_map_iter = std::next(tombstone_map_iter); + if (next_tombstone_map_iter != tombstone_map.end()) { + tombstone_map_iter_end = &next_tombstone_map_iter->first; + } + } + if (new_range_dels_iter != new_range_dels.end()) { + auto next_new_range_dels_iter = std::next(new_range_dels_iter); + if (next_new_range_dels_iter != new_range_dels.end()) { + new_range_dels_iter_end = &next_new_range_dels_iter->start_key_; + } + } + + // our positions in existing/new tombstone collections should always + // overlap. The non-overlapping cases are handled above and below this + // loop. + assert(new_range_dels_iter_end == nullptr || + icmp_.user_comparator()->Compare(tombstone_map_iter->first, + *new_range_dels_iter_end) < 0); + assert(tombstone_map_iter_end == nullptr || + icmp_.user_comparator()->Compare(new_range_dels_iter->start_key_, + *tombstone_map_iter_end) < 0); + + int new_to_old_start_cmp = icmp_.user_comparator()->Compare( + new_range_dels_iter->start_key_, tombstone_map_iter->first); + // nullptr end means extends infinitely rightwards, set new_to_old_end_cmp + // accordingly so we can use common code paths later. + int new_to_old_end_cmp; + if (new_range_dels_iter_end == nullptr && + tombstone_map_iter_end == nullptr) { + new_to_old_end_cmp = 0; + } else if (new_range_dels_iter_end == nullptr) { + new_to_old_end_cmp = 1; + } else if (tombstone_map_iter_end == nullptr) { + new_to_old_end_cmp = -1; + } else { + new_to_old_end_cmp = icmp_.user_comparator()->Compare( + *new_range_dels_iter_end, *tombstone_map_iter_end); + } + + if (new_to_old_start_cmp < 0) { + // the existing one's left endpoint comes after, so raise/delete it if + // it's covered. + if (tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) { + untermed_seq = tombstone_map_iter->second.seq_; + if (tombstone_map_iter != tombstone_map.begin() && + std::prev(tombstone_map_iter)->second.seq_ == + new_range_dels_iter->seq_) { + tombstone_map_iter = tombstone_map.erase(tombstone_map_iter); + --tombstone_map_iter; + } else { + tombstone_map_iter->second.seq_ = new_range_dels_iter->seq_; + } + } + } else if (new_to_old_start_cmp > 0) { + if (untermed_seq != kMaxSequenceNumber || + tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) { + auto seq = tombstone_map_iter->second.seq_; + // need to adjust this element if not intended to span beyond the new + // element (i.e., was_tombstone_map_iter_raised == true), or if it + // can be raised + tombstone_map_iter = tombstone_map.emplace( + new_range_dels_iter->start_key_, + RangeTombstone( + Slice(), Slice(), + std::max( + untermed_seq == kMaxSequenceNumber ? 
0 : untermed_seq, + new_range_dels_iter->seq_))); + untermed_seq = seq; + } + } else { + // their left endpoints coincide, so raise the existing one if needed + if (tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) { + untermed_seq = tombstone_map_iter->second.seq_; + tombstone_map_iter->second.seq_ = new_range_dels_iter->seq_; + } + } + + // advance whichever one ends earlier, or both if their right endpoints + // coincide + if (new_to_old_end_cmp < 0) { + ++new_range_dels_iter; + } else if (new_to_old_end_cmp > 0) { + ++tombstone_map_iter; + untermed_seq = kMaxSequenceNumber; + } else { + ++new_range_dels_iter; + ++tombstone_map_iter; + untermed_seq = kMaxSequenceNumber; + } + } + while (new_range_dels_iter != new_range_dels.end()) { + tombstone_map.emplace( + new_range_dels_iter->start_key_, + RangeTombstone(Slice(), Slice(), new_range_dels_iter->seq_)); + ++new_range_dels_iter; + } + } else { + tombstone_map.emplace(tombstone.start_key_, std::move(tombstone)); + } + return Status::OK(); +} + +RangeDelAggregator::PositionalTombstoneMap& +RangeDelAggregator::GetPositionalTombstoneMap(SequenceNumber seq) { + assert(rep_ != nullptr); + // The stripe includes seqnum for the snapshot above and excludes seqnum for + // the snapshot below. + StripeMap::iterator iter; + if (seq > 0) { + // upper_bound() checks strict inequality so need to subtract one + iter = rep_->stripe_map_.upper_bound(seq - 1); + } else { + iter = rep_->stripe_map_.begin(); + } + // catch-all stripe justifies this assertion in either of above cases + assert(iter != rep_->stripe_map_.end()); + return iter->second; +} + +// TODO(andrewkr): We should implement an iterator over range tombstones in our +// map. It'd enable compaction to open tables on-demand, i.e., only once range +// tombstones are known to be available, without the code duplication we have +// in ShouldAddTombstones(). It'll also allow us to move the table-modifying +// code into more coherent places: CompactionJob and BuildTable(). +void RangeDelAggregator::AddToBuilder( + TableBuilder* builder, const Slice* lower_bound, const Slice* upper_bound, + FileMetaData* meta, + CompactionIterationStats* range_del_out_stats /* = nullptr */, + bool bottommost_level /* = false */) { + if (rep_ == nullptr) { + return; + } + auto stripe_map_iter = rep_->stripe_map_.begin(); + assert(stripe_map_iter != rep_->stripe_map_.end()); + if (bottommost_level) { + // TODO(andrewkr): these are counted for each compaction output file, so + // lots of double-counting. + if (!stripe_map_iter->second.raw_map.empty()) { + range_del_out_stats->num_range_del_drop_obsolete += + static_cast<int64_t>(stripe_map_iter->second.raw_map.size()) - + (collapse_deletions_ ? 1 : 0); + range_del_out_stats->num_record_drop_obsolete += + static_cast<int64_t>(stripe_map_iter->second.raw_map.size()) - + (collapse_deletions_ ? 1 : 0); + } + // For the bottommost level, keys covered by tombstones in the first + // (oldest) stripe have been compacted away, so the tombstones are obsolete. + ++stripe_map_iter; + } + + // Note the order in which tombstones are stored is insignificant since we + // insert them into a std::map on the read path. 
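+  // Illustrative example (not from the original comments): in collapsed mode
+  // a stripe whose map is {b -> 5, f -> 0} is written out by the loop below
+  // as the single tombstone [b, f) @ 5; the seq-0 sentinel at f is skipped
+  // rather than emitted.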
+ bool first_added = false; + while (stripe_map_iter != rep_->stripe_map_.end()) { + for (auto tombstone_map_iter = stripe_map_iter->second.raw_map.begin(); + tombstone_map_iter != stripe_map_iter->second.raw_map.end(); + ++tombstone_map_iter) { + RangeTombstone tombstone; + if (collapse_deletions_) { + auto next_tombstone_map_iter = std::next(tombstone_map_iter); + if (next_tombstone_map_iter == stripe_map_iter->second.raw_map.end() || + tombstone_map_iter->second.seq_ == 0) { + // it's a sentinel tombstone + continue; + } + tombstone.start_key_ = tombstone_map_iter->first; + tombstone.end_key_ = next_tombstone_map_iter->first; + tombstone.seq_ = tombstone_map_iter->second.seq_; + } else { + tombstone = tombstone_map_iter->second; + } + if (upper_bound != nullptr && + icmp_.user_comparator()->Compare(*upper_bound, + tombstone.start_key_) <= 0) { + // Tombstones starting at upper_bound or later only need to be included + // in the next table. Break because subsequent tombstones will start + // even later. + break; + } + if (lower_bound != nullptr && + icmp_.user_comparator()->Compare(tombstone.end_key_, + *lower_bound) <= 0) { + // Tombstones ending before or at lower_bound only need to be included + // in the prev table. Continue because subsequent tombstones may still + // overlap [lower_bound, upper_bound). + continue; + } + + auto ikey_and_end_key = tombstone.Serialize(); + builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second); + if (!first_added) { + first_added = true; + InternalKey smallest_candidate = std::move(ikey_and_end_key.first); + if (lower_bound != nullptr && + icmp_.user_comparator()->Compare(smallest_candidate.user_key(), + *lower_bound) <= 0) { + // Pretend the smallest key has the same user key as lower_bound + // (the max key in the previous table or subcompaction) in order for + // files to appear key-space partitioned. + // + // Choose lowest seqnum so this file's smallest internal key comes + // after the previous file's/subcompaction's largest. The fake seqnum + // is OK because the read path's file-picking code only considers user + // key. + smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion); + } + if (meta->smallest.size() == 0 || + icmp_.Compare(smallest_candidate, meta->smallest) < 0) { + meta->smallest = std::move(smallest_candidate); + } + } + InternalKey largest_candidate = tombstone.SerializeEndKey(); + if (upper_bound != nullptr && + icmp_.user_comparator()->Compare(*upper_bound, + largest_candidate.user_key()) <= 0) { + // Pretend the largest key has the same user key as upper_bound (the + // min key in the following table or subcompaction) in order for files + // to appear key-space partitioned. + // + // Choose highest seqnum so this file's largest internal key comes + // before the next file's/subcompaction's smallest. The fake seqnum is + // OK because the read path's file-picking code only considers the user + // key portion. + // + // Note Seek() also creates InternalKey with (user_key, + // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of + // kTypeRangeDeletion (0xF), so the range tombstone comes before the + // Seek() key in InternalKey's ordering. So Seek() will look in the + // next file for the user key.
+ largest_candidate = InternalKey(*upper_bound, kMaxSequenceNumber, + kTypeRangeDeletion); + } + if (meta->largest.size() == 0 || + icmp_.Compare(meta->largest, largest_candidate) < 0) { + meta->largest = std::move(largest_candidate); + } + meta->smallest_seqno = std::min(meta->smallest_seqno, tombstone.seq_); + meta->largest_seqno = std::max(meta->largest_seqno, tombstone.seq_); + } + ++stripe_map_iter; + } +} + +bool RangeDelAggregator::IsEmpty() { + if (rep_ == nullptr) { + return true; + } + for (auto stripe_map_iter = rep_->stripe_map_.begin(); + stripe_map_iter != rep_->stripe_map_.end(); ++stripe_map_iter) { + if (!stripe_map_iter->second.raw_map.empty()) { + return false; + } + } + return true; +} + +} // namespace rocksdb
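For context, a minimal sketch of how a reader might drive the aggregator above (illustrative only; icmp, snapshot_seq, range_del_iter, and iter are assumed to exist, and this snippet is not part of the commit):

    RangeDelAggregator range_del_agg(icmp, snapshot_seq,
                                     true /* collapse_deletions */);
    range_del_agg.AddTombstones(std::move(range_del_iter));  // from one SST
    ParsedInternalKey parsed;
    if (ParseInternalKey(iter->key(), &parsed) &&
        range_del_agg.ShouldDelete(parsed,
                                   RangeDelAggregator::kForwardTraversal)) {
      // covered by a newer range tombstone in its snapshot stripe: skip it
    }

Collapsed mode is what makes kForwardTraversal legal here; with the read constructor's default of collapse_deletions = false, only kFullScan may be passed.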
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/range_del_aggregator.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/range_del_aggregator.h b/thirdparty/rocksdb/db/range_del_aggregator.h new file mode 100644 index 0000000..9d4b8ca --- /dev/null +++ b/thirdparty/rocksdb/db/range_del_aggregator.h @@ -0,0 +1,161 @@ +// Copyright (c) 2016-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <map> +#include <string> +#include <vector> + +#include "db/compaction_iteration_stats.h" +#include "db/dbformat.h" +#include "db/pinned_iterators_manager.h" +#include "db/version_edit.h" +#include "include/rocksdb/comparator.h" +#include "include/rocksdb/types.h" +#include "table/internal_iterator.h" +#include "table/scoped_arena_iterator.h" +#include "table/table_builder.h" +#include "util/kv_map.h" + +namespace rocksdb { + +// A RangeDelAggregator aggregates range deletion tombstones as they are +// encountered in memtables/SST files. It provides methods that check whether a +// key is covered by range tombstones or write the relevant tombstones to a new +// SST file. +class RangeDelAggregator { + public: + // @param snapshots These are used to organize the tombstones into snapshot + // stripes, which is the seqnum range between consecutive snapshots, + // including the higher snapshot and excluding the lower one. Currently, + // this is used by ShouldDelete() to prevent deletion of keys that are + // covered by range tombstones in other snapshot stripes. This constructor + // is used for writes (flush/compaction). All DB snapshots are provided + // such that no keys are removed that are uncovered according to any DB + // snapshot. + // Note this overload does not lazily initialize Rep. + RangeDelAggregator(const InternalKeyComparator& icmp, + const std::vector<SequenceNumber>& snapshots, + bool collapse_deletions = true); + + // @param upper_bound Similar to snapshots above, except with a single + // snapshot, which allows us to store the snapshot on the stack and defer + // initialization of heap-allocating members (in Rep) until the first range + // deletion is encountered. This constructor is used in case of reads (get/ + // iterator), for which only the user snapshot (upper_bound) is provided + // such that the seqnum space is divided into two stripes. Only the older + // stripe will be used by ShouldDelete(). + RangeDelAggregator(const InternalKeyComparator& icmp, + SequenceNumber upper_bound, + bool collapse_deletions = false); + + // We maintain position in the tombstone map across calls to ShouldDelete. The + // caller may wish to specify a mode to optimize positioning the iterator + // during the next call to ShouldDelete. The non-kFullScan modes are only + // available when deletion collapsing is enabled. + // + // For example, if we invoke Next() on an iterator, kForwardTraversal should + // be specified to advance one-by-one through deletions until one is found + // with its interval containing the key. This will typically be faster than + // doing a full binary search (kBinarySearch). 
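+  // Illustrative cost model (not from the original comment): after a lookup
+  // at key "d", a Next() to key "e" usually advances at most one interval, so
+  // kForwardTraversal amortizes to roughly O(1) per key, whereas
+  // kBinarySearch pays O(log n) on every call.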
+ enum RangePositioningMode { + kFullScan, // used iff collapse_deletions_ == false + kForwardTraversal, + kBackwardTraversal, + kBinarySearch, + }; + + // Returns whether the key should be deleted, which is the case when it is + // covered by a range tombstone residing in the same snapshot stripe. + // @param mode If collapse_deletions_ is true, this dictates how we will find + // the deletion whose interval contains this key. Otherwise, its + // value must be kFullScan, indicating a linear scan from the beginning. + bool ShouldDelete(const ParsedInternalKey& parsed, + RangePositioningMode mode = kFullScan); + bool ShouldDelete(const Slice& internal_key, + RangePositioningMode mode = kFullScan); + bool ShouldAddTombstones(bool bottommost_level = false); + + // Adds tombstones to the tombstone aggregation structure maintained by this + // object. + // @return non-OK status if any of the tombstone keys are corrupted. + Status AddTombstones(std::unique_ptr<InternalIterator> input); + + // Resets iterators maintained across calls to ShouldDelete(). This may be + // called when the tombstones change, or the owner may call explicitly, e.g., + // if it's an iterator that just seeked to an arbitrary position. The effect + // of invalidation is that the following call to ShouldDelete() will binary + // search for its tombstone. + void InvalidateTombstoneMapPositions(); + + // Writes tombstones covering a range to a table builder. + // @param lower_bound/upper_bound Any range deletion with [start_key, end_key) + // that overlaps the target range [*lower_bound, *upper_bound) is added to + // the builder. If lower_bound is nullptr, the target range extends + // infinitely to the left. If upper_bound is nullptr, the target range + // extends infinitely to the right. If both are nullptr, the target range + // extends infinitely in both directions, i.e., all range deletions are + // added to the builder. + // @param meta The file's metadata. We modify the begin and end keys according + // to the range tombstones added to this file such that the read path does + // not miss range tombstones that cover gaps before/after/between files in + // a level. lower_bound/upper_bound above constrain how far file boundaries + // can be extended. + // @param bottommost_level If true, we will filter out any tombstones + // belonging to the oldest snapshot stripe, because all keys potentially + // covered by this tombstone are guaranteed to have been deleted by + // compaction. + void AddToBuilder(TableBuilder* builder, const Slice* lower_bound, + const Slice* upper_bound, FileMetaData* meta, + CompactionIterationStats* range_del_out_stats = nullptr, + bool bottommost_level = false); + bool IsEmpty(); + + private: + // Maps tombstone user start key -> tombstone object + typedef std::multimap<Slice, RangeTombstone, stl_wrappers::LessOfComparator> + TombstoneMap; + // Also maintains position in TombstoneMap last seen by ShouldDelete(). The + // end iterator indicates invalidation (e.g., if AddTombstones() changes the + // underlying map). End iterator cannot be invalidated.
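+  // Informally: iter either points at the entry whose start key is <= the key
+  // most recently passed to ShouldDelete(), or equals raw_map.end() when the
+  // position is unknown and the next lookup must reseek.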
+ struct PositionalTombstoneMap { + explicit PositionalTombstoneMap(TombstoneMap _raw_map) + : raw_map(std::move(_raw_map)), iter(raw_map.end()) {} + PositionalTombstoneMap(const PositionalTombstoneMap&) = delete; + PositionalTombstoneMap(PositionalTombstoneMap&& other) + : raw_map(std::move(other.raw_map)), iter(raw_map.end()) {} + + TombstoneMap raw_map; + TombstoneMap::const_iterator iter; + }; + + // Maps snapshot seqnum -> map of tombstones that fall in that stripe, i.e., + // their seqnums are greater than the next smaller snapshot's seqnum. + typedef std::map<SequenceNumber, PositionalTombstoneMap> StripeMap; + + struct Rep { + StripeMap stripe_map_; + PinnedIteratorsManager pinned_iters_mgr_; + }; + // Initializes rep_ lazily. This aggregator object is constructed for every + // read, so expensive members should only be created when necessary, i.e., + // once the first range deletion is encountered. + void InitRep(const std::vector<SequenceNumber>& snapshots); + + PositionalTombstoneMap& GetPositionalTombstoneMap(SequenceNumber seq); + Status AddTombstone(RangeTombstone tombstone); + + SequenceNumber upper_bound_; + std::unique_ptr<Rep> rep_; + const InternalKeyComparator& icmp_; + // collapse range deletions so they're binary searchable + const bool collapse_deletions_; +}; + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/repair.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/repair.cc b/thirdparty/rocksdb/db/repair.cc new file mode 100644 index 0000000..9ed3260 --- /dev/null +++ b/thirdparty/rocksdb/db/repair.cc @@ -0,0 +1,650 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Repairer does best-effort recovery to recover as much data as possible after +// a disaster without compromising consistency. It does not guarantee bringing +// the database to a time-consistent state. +// +// Repair process is broken into 4 phases: +// (a) Find files +// (b) Convert logs to tables +// (c) Extract metadata +// (d) Write Descriptor +// +// (a) Find files +// +// The repairer goes through all the files in the directory, and classifies them +// based on their file name. Any file that cannot be identified by name will be +// ignored. +// +// (b) Convert logs to tables +// +// Every log file that is active is replayed. All sections of the file where the +// checksum does not match are skipped over. We intentionally give preference to +// data consistency. +// +// (c) Extract metadata +// +// We scan every table to compute +// (1) smallest/largest for the table +// (2) largest sequence number in the table +// +// If we are unable to scan the file, then we ignore the table.
+// +// (d) Write Descriptor +// +// We generate descriptor contents: +// - log number is set to zero +// - next-file-number is set to 1 + largest file number we found +// - last-sequence-number is set to largest sequence# found across +// all tables (see 2c) +// - compaction pointers are cleared +// - every table file is added at level 0 +// +// Possible optimization 1: +// (a) Compute total size and use to pick appropriate max-level M +// (b) Sort tables by largest sequence# in the table +// (c) For each table: if it overlaps earlier table, place in level-0, +// else place in level-M. +// (d) We can provide options for time consistent recovery and unsafe recovery +// (ignore checksum failure when applicable) +// Possible optimization 2: +// Store per-table metadata (smallest, largest, largest-seq#, ...) +// in the table's meta section to speed up ScanTable. + +#ifndef ROCKSDB_LITE + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include <inttypes.h> +#include "db/builder.h" +#include "db/db_impl.h" +#include "db/dbformat.h" +#include "db/log_reader.h" +#include "db/log_writer.h" +#include "db/memtable.h" +#include "db/table_cache.h" +#include "db/version_edit.h" +#include "db/write_batch_internal.h" +#include "options/cf_options.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/options.h" +#include "rocksdb/write_buffer_manager.h" +#include "table/scoped_arena_iterator.h" +#include "util/file_reader_writer.h" +#include "util/filename.h" +#include "util/string_util.h" + +namespace rocksdb { + +namespace { + +class Repairer { + public: + Repairer(const std::string& dbname, const DBOptions& db_options, + const std::vector<ColumnFamilyDescriptor>& column_families, + const ColumnFamilyOptions& default_cf_opts, + const ColumnFamilyOptions& unknown_cf_opts, bool create_unknown_cfs) + : dbname_(dbname), + env_(db_options.env), + env_options_(), + db_options_(SanitizeOptions(dbname_, db_options)), + immutable_db_options_(db_options_), + icmp_(default_cf_opts.comparator), + default_cf_opts_(default_cf_opts), + default_cf_iopts_( + ImmutableCFOptions(immutable_db_options_, default_cf_opts)), + unknown_cf_opts_(unknown_cf_opts), + create_unknown_cfs_(create_unknown_cfs), + raw_table_cache_( + // TableCache can be small since we expect each table to be opened + // once. + NewLRUCache(10, db_options_.table_cache_numshardbits)), + table_cache_(new TableCache(default_cf_iopts_, env_options_, + raw_table_cache_.get())), + wb_(db_options_.db_write_buffer_size), + wc_(db_options_.delayed_write_rate), + vset_(dbname_, &immutable_db_options_, env_options_, + raw_table_cache_.get(), &wb_, &wc_), + next_file_number_(1) { + for (const auto& cfd : column_families) { + cf_name_to_opts_[cfd.name] = cfd.options; + } + } + + const ColumnFamilyOptions* GetColumnFamilyOptions( + const std::string& cf_name) { + if (cf_name_to_opts_.find(cf_name) == cf_name_to_opts_.end()) { + if (create_unknown_cfs_) { + return &unknown_cf_opts_; + } + return nullptr; + } + return &cf_name_to_opts_[cf_name]; + } + + // Adds a column family to the VersionSet with cf_options_ and updates + // manifest. 
+ Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) { + const auto* cf_opts = GetColumnFamilyOptions(cf_name); + if (cf_opts == nullptr) { + return Status::Corruption("Encountered unknown column family with name=" + + cf_name + ", id=" + ToString(cf_id)); + } + Options opts(db_options_, *cf_opts); + MutableCFOptions mut_cf_opts(opts); + + VersionEdit edit; + edit.SetComparatorName(opts.comparator->Name()); + edit.SetLogNumber(0); + edit.SetColumnFamily(cf_id); + ColumnFamilyData* cfd = nullptr; + edit.AddColumnFamily(cf_name); + + mutex_.Lock(); + Status status = vset_.LogAndApply(cfd, mut_cf_opts, &edit, &mutex_, + nullptr /* db_directory */, + false /* new_descriptor_log */, cf_opts); + mutex_.Unlock(); + return status; + } + + ~Repairer() { + delete table_cache_; + } + + Status Run() { + Status status = FindFiles(); + if (status.ok()) { + // Discard older manifests and start a fresh one + for (size_t i = 0; i < manifests_.size(); i++) { + ArchiveFile(dbname_ + "/" + manifests_[i]); + } + // Just create a DBImpl temporarily so we can reuse NewDB() + DBImpl* db_impl = new DBImpl(db_options_, dbname_); + status = db_impl->NewDB(); + delete db_impl; + } + + if (status.ok()) { + // Recover using the fresh manifest created by NewDB() + status = + vset_.Recover({{kDefaultColumnFamilyName, default_cf_opts_}}, false); + } + if (status.ok()) { + // Need to scan existing SST files first so the column families are + // created before we process WAL files + ExtractMetaData(); + + // ExtractMetaData() uses table_fds_ to know which SST files' metadata to + // extract -- we need to clear it here since metadata for existing SST + // files has been extracted already + table_fds_.clear(); + ConvertLogFilesToTables(); + ExtractMetaData(); + status = AddTables(); + } + if (status.ok()) { + uint64_t bytes = 0; + for (size_t i = 0; i < tables_.size(); i++) { + bytes += tables_[i].meta.fd.GetFileSize(); + } + ROCKS_LOG_WARN(db_options_.info_log, + "**** Repaired rocksdb %s; " + "recovered %" ROCKSDB_PRIszt " files; %" PRIu64 + " bytes. " + "Some data may have been lost. 
" + "****", + dbname_.c_str(), tables_.size(), bytes); + } + return status; + } + + private: + struct TableInfo { + FileMetaData meta; + uint32_t column_family_id; + std::string column_family_name; + SequenceNumber min_sequence; + SequenceNumber max_sequence; + }; + + std::string const dbname_; + Env* const env_; + const EnvOptions env_options_; + const DBOptions db_options_; + const ImmutableDBOptions immutable_db_options_; + const InternalKeyComparator icmp_; + const ColumnFamilyOptions default_cf_opts_; + const ImmutableCFOptions default_cf_iopts_; // table_cache_ holds reference + const ColumnFamilyOptions unknown_cf_opts_; + const bool create_unknown_cfs_; + std::shared_ptr<Cache> raw_table_cache_; + TableCache* table_cache_; + WriteBufferManager wb_; + WriteController wc_; + VersionSet vset_; + std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_opts_; + InstrumentedMutex mutex_; + + std::vector<std::string> manifests_; + std::vector<FileDescriptor> table_fds_; + std::vector<uint64_t> logs_; + std::vector<TableInfo> tables_; + uint64_t next_file_number_; + + Status FindFiles() { + std::vector<std::string> filenames; + bool found_file = false; + std::vector<std::string> to_search_paths; + + for (size_t path_id = 0; path_id < db_options_.db_paths.size(); path_id++) { + to_search_paths.push_back(db_options_.db_paths[path_id].path); + } + + // search wal_dir if user uses a customize wal_dir + if (!db_options_.wal_dir.empty() && + db_options_.wal_dir != dbname_) { + to_search_paths.push_back(db_options_.wal_dir); + } + + for (size_t path_id = 0; path_id < to_search_paths.size(); path_id++) { + Status status = + env_->GetChildren(to_search_paths[path_id], &filenames); + if (!status.ok()) { + return status; + } + if (!filenames.empty()) { + found_file = true; + } + + uint64_t number; + FileType type; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type)) { + if (type == kDescriptorFile) { + manifests_.push_back(filenames[i]); + } else { + if (number + 1 > next_file_number_) { + next_file_number_ = number + 1; + } + if (type == kLogFile) { + logs_.push_back(number); + } else if (type == kTableFile) { + table_fds_.emplace_back(number, static_cast<uint32_t>(path_id), + 0); + } else { + // Ignore other files + } + } + } + } + } + if (!found_file) { + return Status::Corruption(dbname_, "repair found no files"); + } + return Status::OK(); + } + + void ConvertLogFilesToTables() { + for (size_t i = 0; i < logs_.size(); i++) { + // we should use LogFileName(wal_dir, logs_[i]) here. user might uses wal_dir option. + std::string logname = LogFileName(db_options_.wal_dir, logs_[i]); + Status status = ConvertLogToTable(logs_[i]); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Log #%" PRIu64 ": ignoring conversion error: %s", + logs_[i], status.ToString().c_str()); + } + ArchiveFile(logname); + } + } + + Status ConvertLogToTable(uint64_t log) { + struct LogReporter : public log::Reader::Reporter { + Env* env; + std::shared_ptr<Logger> info_log; + uint64_t lognum; + virtual void Corruption(size_t bytes, const Status& s) override { + // We print error messages for corruption, but continue repairing. 
+ ROCKS_LOG_ERROR(info_log, "Log #%" PRIu64 ": dropping %d bytes; %s", + lognum, static_cast<int>(bytes), s.ToString().c_str()); + } + }; + + // Open the log file + std::string logname = LogFileName(db_options_.wal_dir, log); + unique_ptr<SequentialFile> lfile; + Status status = env_->NewSequentialFile( + logname, &lfile, env_->OptimizeForLogRead(env_options_)); + if (!status.ok()) { + return status; + } + unique_ptr<SequentialFileReader> lfile_reader( + new SequentialFileReader(std::move(lfile))); + + // Create the log reader. + LogReporter reporter; + reporter.env = env_; + reporter.info_log = db_options_.info_log; + reporter.lognum = log; + // We intentionally make log::Reader do checksumming so that + // corruptions cause entire commits to be skipped instead of + // propagating bad information (like overly large sequence + // numbers). + log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter, + true /*enable checksum*/, 0 /*initial_offset*/, log); + + // Initialize per-column family memtables + for (auto* cfd : *vset_.GetColumnFamilySet()) { + cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(), + kMaxSequenceNumber); + } + auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet()); + + // Read all the records and add to a memtable + std::string scratch; + Slice record; + WriteBatch batch; + int counter = 0; + while (reader.ReadRecord(&record, &scratch)) { + if (record.size() < WriteBatchInternal::kHeader) { + reporter.Corruption( + record.size(), Status::Corruption("log record too small")); + continue; + } + WriteBatchInternal::SetContents(&batch, record); + status = WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr); + if (status.ok()) { + counter += WriteBatchInternal::Count(&batch); + } else { + ROCKS_LOG_WARN(db_options_.info_log, "Log #%" PRIu64 ": ignoring %s", + log, status.ToString().c_str()); + status = Status::OK(); // Keep going with rest of file + } + } + + // Dump a table for each column family with entries in this log file. + for (auto* cfd : *vset_.GetColumnFamilySet()) { + // Do not record a version edit for this conversion to a Table + // since ExtractMetaData() will also generate edits. 
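+      // Each non-empty memtable below becomes one level-0 table file via
+      // BuildTable(); its FileDescriptor is queued in table_fds_ so the later
+      // ExtractMetaData() pass scans it like any pre-existing SST.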
+ MemTable* mem = cfd->mem(); + if (mem->IsEmpty()) { + continue; + } + + FileMetaData meta; + meta.fd = FileDescriptor(next_file_number_++, 0, 0); + ReadOptions ro; + ro.total_order_seek = true; + Arena arena; + ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); + EnvOptions optimized_env_options = + env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_); + + int64_t _current_time = 0; + status = env_->GetCurrentTime(&_current_time); // ignore error + const uint64_t current_time = static_cast<uint64_t>(_current_time); + + status = BuildTable( + dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), + optimized_env_options, table_cache_, iter.get(), + std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)), + &meta, cfd->internal_comparator(), + cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), + {}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false, + nullptr /* internal_stats */, TableFileCreationReason::kRecovery, + nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, + nullptr /* table_properties */, -1 /* level */, current_time); + ROCKS_LOG_INFO(db_options_.info_log, + "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", + log, counter, meta.fd.GetNumber(), + status.ToString().c_str()); + if (status.ok()) { + if (meta.fd.GetFileSize() > 0) { + table_fds_.push_back(meta.fd); + } + } else { + break; + } + } + delete cf_mems; + return status; + } + + void ExtractMetaData() { + for (size_t i = 0; i < table_fds_.size(); i++) { + TableInfo t; + t.meta.fd = table_fds_[i]; + Status status = ScanTable(&t); + if (!status.ok()) { + std::string fname = TableFileName( + db_options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId()); + char file_num_buf[kFormatFileNumberBufSize]; + FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId(), + file_num_buf, sizeof(file_num_buf)); + ROCKS_LOG_WARN(db_options_.info_log, "Table #%s: ignoring %s", + file_num_buf, status.ToString().c_str()); + ArchiveFile(fname); + } else { + tables_.push_back(t); + } + } + } + + Status ScanTable(TableInfo* t) { + std::string fname = TableFileName( + db_options_.db_paths, t->meta.fd.GetNumber(), t->meta.fd.GetPathId()); + int counter = 0; + uint64_t file_size; + Status status = env_->GetFileSize(fname, &file_size); + t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(), + file_size); + std::shared_ptr<const TableProperties> props; + if (status.ok()) { + status = table_cache_->GetTableProperties(env_options_, icmp_, t->meta.fd, + &props); + } + if (status.ok()) { + t->column_family_id = static_cast<uint32_t>(props->column_family_id); + if (t->column_family_id == + TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) { + ROCKS_LOG_WARN( + db_options_.info_log, + "Table #%" PRIu64 + ": column family unknown (probably due to legacy format); " + "adding to default column family id 0.", + t->meta.fd.GetNumber()); + t->column_family_id = 0; + } + + if (vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id) == + nullptr) { + status = + AddColumnFamily(props->column_family_name, t->column_family_id); + } + } + ColumnFamilyData* cfd = nullptr; + if (status.ok()) { + cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id); + if (cfd->GetName() != props->column_family_name) { + ROCKS_LOG_ERROR( + db_options_.info_log, + "Table #%" PRIu64 + ": inconsistent column family name '%s'; expected '%s' for column " + "family id %" PRIu32 ".", + t->meta.fd.GetNumber(), 
props->column_family_name.c_str(), + cfd->GetName().c_str(), t->column_family_id); + status = Status::Corruption(dbname_, "inconsistent column family name"); + } + } + if (status.ok()) { + InternalIterator* iter = table_cache_->NewIterator( + ReadOptions(), env_options_, cfd->internal_comparator(), t->meta.fd, + nullptr /* range_del_agg */); + bool empty = true; + ParsedInternalKey parsed; + t->min_sequence = 0; + t->max_sequence = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + Slice key = iter->key(); + if (!ParseInternalKey(key, &parsed)) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Table #%" PRIu64 ": unparsable key %s", + t->meta.fd.GetNumber(), EscapeString(key).c_str()); + continue; + } + + counter++; + if (empty) { + empty = false; + t->meta.smallest.DecodeFrom(key); + t->min_sequence = parsed.sequence; + } + t->meta.largest.DecodeFrom(key); + if (parsed.sequence < t->min_sequence) { + t->min_sequence = parsed.sequence; + } + if (parsed.sequence > t->max_sequence) { + t->max_sequence = parsed.sequence; + } + } + if (!iter->status().ok()) { + status = iter->status(); + } + delete iter; + + ROCKS_LOG_INFO(db_options_.info_log, "Table #%" PRIu64 ": %d entries %s", + t->meta.fd.GetNumber(), counter, + status.ToString().c_str()); + } + return status; + } + + Status AddTables() { + std::unordered_map<uint32_t, std::vector<const TableInfo*>> cf_id_to_tables; + SequenceNumber max_sequence = 0; + for (size_t i = 0; i < tables_.size(); i++) { + cf_id_to_tables[tables_[i].column_family_id].push_back(&tables_[i]); + if (max_sequence < tables_[i].max_sequence) { + max_sequence = tables_[i].max_sequence; + } + } + vset_.SetLastToBeWrittenSequence(max_sequence); + vset_.SetLastSequence(max_sequence); + + for (const auto& cf_id_and_tables : cf_id_to_tables) { + auto* cfd = + vset_.GetColumnFamilySet()->GetColumnFamily(cf_id_and_tables.first); + VersionEdit edit; + edit.SetComparatorName(cfd->user_comparator()->Name()); + edit.SetLogNumber(0); + edit.SetNextFile(next_file_number_); + edit.SetColumnFamily(cfd->GetID()); + + // TODO(opt): separate out into multiple levels + for (const auto* table : cf_id_and_tables.second) { + edit.AddFile(0, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(), + table->meta.fd.GetFileSize(), table->meta.smallest, + table->meta.largest, table->min_sequence, + table->max_sequence, table->meta.marked_for_compaction); + } + mutex_.Lock(); + Status status = vset_.LogAndApply( + cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, + nullptr /* db_directory */, false /* new_descriptor_log */); + mutex_.Unlock(); + if (!status.ok()) { + return status; + } + } + return Status::OK(); + } + + void ArchiveFile(const std::string& fname) { + // Move into another directory. E.g., for + // dir/foo + // rename to + // dir/lost/foo + const char* slash = strrchr(fname.c_str(), '/'); + std::string new_dir; + if (slash != nullptr) { + new_dir.assign(fname.data(), slash - fname.data()); + } + new_dir.append("/lost"); + env_->CreateDir(new_dir); // Ignore error + std::string new_file = new_dir; + new_file.append("/"); + new_file.append((slash == nullptr) ? 
fname.c_str() : slash + 1); + Status s = env_->RenameFile(fname, new_file); + ROCKS_LOG_INFO(db_options_.info_log, "Archiving %s: %s\n", fname.c_str(), + s.ToString().c_str()); + } +}; + +Status GetDefaultCFOptions( + const std::vector<ColumnFamilyDescriptor>& column_families, + ColumnFamilyOptions* res) { + assert(res != nullptr); + auto iter = std::find_if(column_families.begin(), column_families.end(), + [](const ColumnFamilyDescriptor& cfd) { + return cfd.name == kDefaultColumnFamilyName; + }); + if (iter == column_families.end()) { + return Status::InvalidArgument( + "column_families", "Must contain entry for default column family"); + } + *res = iter->options; + return Status::OK(); +} +} // anonymous namespace + +Status RepairDB(const std::string& dbname, const DBOptions& db_options, + const std::vector<ColumnFamilyDescriptor>& column_families) { + ColumnFamilyOptions default_cf_opts; + Status status = GetDefaultCFOptions(column_families, &default_cf_opts); + if (status.ok()) { + Repairer repairer(dbname, db_options, column_families, default_cf_opts, + ColumnFamilyOptions() /* unknown_cf_opts */, + false /* create_unknown_cfs */); + status = repairer.Run(); + } + return status; +} + +Status RepairDB(const std::string& dbname, const DBOptions& db_options, + const std::vector<ColumnFamilyDescriptor>& column_families, + const ColumnFamilyOptions& unknown_cf_opts) { + ColumnFamilyOptions default_cf_opts; + Status status = GetDefaultCFOptions(column_families, &default_cf_opts); + if (status.ok()) { + Repairer repairer(dbname, db_options, column_families, default_cf_opts, + unknown_cf_opts, true /* create_unknown_cfs */); + status = repairer.Run(); + } + return status; +} + +Status RepairDB(const std::string& dbname, const Options& options) { + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + Repairer repairer(dbname, db_options, {}, cf_options /* default_cf_opts */, + cf_options /* unknown_cf_opts */, + true /* create_unknown_cfs */); + return repairer.Run(); +} + +} // namespace rocksdb + +#endif // ROCKSDB_LITE http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/snapshot_impl.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/snapshot_impl.cc b/thirdparty/rocksdb/db/snapshot_impl.cc new file mode 100644 index 0000000..032ef39 --- /dev/null +++ b/thirdparty/rocksdb/db/snapshot_impl.cc @@ -0,0 +1,26 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
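+// Usage sketch (illustrative; db, "key", and value are assumed, not part of
+// this commit): ManagedSnapshot is an RAII wrapper, so a typical caller
+// scopes a snapshot like
+//   {
+//     ManagedSnapshot managed(db);
+//     ReadOptions ro;
+//     ro.snapshot = managed.snapshot();
+//     std::string value;
+//     db->Get(ro, "key", &value);
+//   }  // ~ManagedSnapshot() calls db->ReleaseSnapshot()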
+ +#include "rocksdb/snapshot.h" + +#include "rocksdb/db.h" + +namespace rocksdb { + +ManagedSnapshot::ManagedSnapshot(DB* db) : db_(db), + snapshot_(db->GetSnapshot()) {} + +ManagedSnapshot::ManagedSnapshot(DB* db, const Snapshot* _snapshot) + : db_(db), snapshot_(_snapshot) {} + +ManagedSnapshot::~ManagedSnapshot() { + if (snapshot_) { + db_->ReleaseSnapshot(snapshot_); + } +} + +const Snapshot* ManagedSnapshot::snapshot() { return snapshot_;} + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/snapshot_impl.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/snapshot_impl.h b/thirdparty/rocksdb/db/snapshot_impl.h new file mode 100644 index 0000000..b94602f --- /dev/null +++ b/thirdparty/rocksdb/db/snapshot_impl.h @@ -0,0 +1,130 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include <vector> + +#include "rocksdb/db.h" + +namespace rocksdb { + +class SnapshotList; + +// Snapshots are kept in a doubly-linked list in the DB. +// Each SnapshotImpl corresponds to a particular sequence number. +class SnapshotImpl : public Snapshot { + public: + SequenceNumber number_; // const after creation + + virtual SequenceNumber GetSequenceNumber() const override { return number_; } + + private: + friend class SnapshotList; + + // SnapshotImpl is kept in a doubly-linked circular list + SnapshotImpl* prev_; + SnapshotImpl* next_; + + SnapshotList* list_; // just for sanity checks + + int64_t unix_time_; + + // Will this snapshot be used by a Transaction to do write-conflict checking? + bool is_write_conflict_boundary_; +}; + +class SnapshotList { + public: + SnapshotList() { + list_.prev_ = &list_; + list_.next_ = &list_; + list_.number_ = 0xFFFFFFFFL; // placeholder marker, for debugging + count_ = 0; + } + + bool empty() const { return list_.next_ == &list_; } + SnapshotImpl* oldest() const { assert(!empty()); return list_.next_; } + SnapshotImpl* newest() const { assert(!empty()); return list_.prev_; } + + const SnapshotImpl* New(SnapshotImpl* s, SequenceNumber seq, + uint64_t unix_time, bool is_write_conflict_boundary) { + s->number_ = seq; + s->unix_time_ = unix_time; + s->is_write_conflict_boundary_ = is_write_conflict_boundary; + s->list_ = this; + s->next_ = &list_; + s->prev_ = list_.prev_; + s->prev_->next_ = s; + s->next_->prev_ = s; + count_++; + return s; + } + + // Do not responsible to free the object. + void Delete(const SnapshotImpl* s) { + assert(s->list_ == this); + s->prev_->next_ = s->next_; + s->next_->prev_ = s->prev_; + count_--; + } + + // retrieve all snapshot numbers. They are sorted in ascending order. 
+ std::vector<SequenceNumber> GetAll( + SequenceNumber* oldest_write_conflict_snapshot = nullptr) { + std::vector<SequenceNumber> ret; + + if (oldest_write_conflict_snapshot != nullptr) { + *oldest_write_conflict_snapshot = kMaxSequenceNumber; + } + + if (empty()) { + return ret; + } + SnapshotImpl* s = &list_; + while (s->next_ != &list_) { + ret.push_back(s->next_->number_); + + if (oldest_write_conflict_snapshot != nullptr && + *oldest_write_conflict_snapshot == kMaxSequenceNumber && + s->next_->is_write_conflict_boundary_) { + // If this is the first write-conflict boundary snapshot in the list, + // it is the oldest + *oldest_write_conflict_snapshot = s->next_->number_; + } + + s = s->next_; + } + return ret; + } + + // get the sequence number of the most recent snapshot + SequenceNumber GetNewest() { + if (empty()) { + return 0; + } + return newest()->number_; + } + + int64_t GetOldestSnapshotTime() const { + if (empty()) { + return 0; + } else { + return oldest()->unix_time_; + } + } + + uint64_t count() const { return count_; } + + private: + // Dummy head of doubly-linked list of snapshots + SnapshotImpl list_; + uint64_t count_; +}; + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/table_cache.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/table_cache.cc b/thirdparty/rocksdb/db/table_cache.cc new file mode 100644 index 0000000..b4d5cc1 --- /dev/null +++ b/thirdparty/rocksdb/db/table_cache.cc @@ -0,0 +1,472 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#include "db/table_cache.h" + +#include "db/dbformat.h" +#include "db/version_edit.h" +#include "util/filename.h" + +#include "monitoring/perf_context_imp.h" +#include "rocksdb/statistics.h" +#include "table/get_context.h" +#include "table/internal_iterator.h" +#include "table/iterator_wrapper.h" +#include "table/table_builder.h" +#include "table/table_reader.h" +#include "util/coding.h" +#include "util/file_reader_writer.h" +#include "util/stop_watch.h" +#include "util/sync_point.h" + +namespace rocksdb { + +namespace { + +template <class T> +static void DeleteEntry(const Slice& key, void* value) { + T* typed_value = reinterpret_cast<T*>(value); + delete typed_value; +} + +static void UnrefEntry(void* arg1, void* arg2) { + Cache* cache = reinterpret_cast<Cache*>(arg1); + Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2); + cache->Release(h); +} + +static void DeleteTableReader(void* arg1, void* arg2) { + TableReader* table_reader = reinterpret_cast<TableReader*>(arg1); + delete table_reader; +} + +static Slice GetSliceForFileNumber(const uint64_t* file_number) { + return Slice(reinterpret_cast<const char*>(file_number), + sizeof(*file_number)); +} + +#ifndef ROCKSDB_LITE + +void AppendVarint64(IterKey* key, uint64_t v) { + char buf[10]; + auto ptr = EncodeVarint64(buf, v); + key->TrimAppend(key->Size(), buf, ptr - buf); +} + +#endif // ROCKSDB_LITE + +} // namespace + +TableCache::TableCache(const ImmutableCFOptions& ioptions, + const EnvOptions& env_options, Cache* const cache) + : ioptions_(ioptions), env_options_(env_options), cache_(cache) { + if (ioptions_.row_cache) { + // If the same cache is shared by multiple instances, we need to + // disambiguate its entries. + PutVarint64(&row_cache_id_, ioptions_.row_cache->NewId()); + } +} + +TableCache::~TableCache() { +} + +TableReader* TableCache::GetTableReaderFromHandle(Cache::Handle* handle) { + return reinterpret_cast<TableReader*>(cache_->Value(handle)); +} + +void TableCache::ReleaseHandle(Cache::Handle* handle) { + cache_->Release(handle); +} + +Status TableCache::GetTableReader( + const EnvOptions& env_options, + const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, + bool sequential_mode, size_t readahead, bool record_read_stats, + HistogramImpl* file_read_hist, unique_ptr<TableReader>* table_reader, + bool skip_filters, int level, bool prefetch_index_and_filter_in_cache, + bool for_compaction) { + std::string fname = + TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId()); + unique_ptr<RandomAccessFile> file; + Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options); + + RecordTick(ioptions_.statistics, NO_FILE_OPENS); + if (s.ok()) { + if (readahead > 0) { + file = NewReadaheadRandomAccessFile(std::move(file), readahead); + } + if (!sequential_mode && ioptions_.advise_random_on_open) { + file->Hint(RandomAccessFile::RANDOM); + } + StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS); + std::unique_ptr<RandomAccessFileReader> file_reader( + new RandomAccessFileReader( + std::move(file), fname, ioptions_.env, + record_read_stats ? 
ioptions_.statistics : nullptr, SST_READ_MICROS, + file_read_hist, ioptions_.rate_limiter, for_compaction)); + s = ioptions_.table_factory->NewTableReader( + TableReaderOptions(ioptions_, env_options, internal_comparator, + skip_filters, level), + std::move(file_reader), fd.GetFileSize(), table_reader, + prefetch_index_and_filter_in_cache); + TEST_SYNC_POINT("TableCache::GetTableReader:0"); + } + return s; +} + +void TableCache::EraseHandle(const FileDescriptor& fd, Cache::Handle* handle) { + ReleaseHandle(handle); + uint64_t number = fd.GetNumber(); + Slice key = GetSliceForFileNumber(&number); + cache_->Erase(key); +} + +Status TableCache::FindTable(const EnvOptions& env_options, + const InternalKeyComparator& internal_comparator, + const FileDescriptor& fd, Cache::Handle** handle, + const bool no_io, bool record_read_stats, + HistogramImpl* file_read_hist, bool skip_filters, + int level, + bool prefetch_index_and_filter_in_cache) { + PERF_TIMER_GUARD(find_table_nanos); + Status s; + uint64_t number = fd.GetNumber(); + Slice key = GetSliceForFileNumber(&number); + *handle = cache_->Lookup(key); + TEST_SYNC_POINT_CALLBACK("TableCache::FindTable:0", + const_cast<bool*>(&no_io)); + + if (*handle == nullptr) { + if (no_io) { // Don't do IO and return a not-found status + return Status::Incomplete("Table not found in table_cache, no_io is set"); + } + unique_ptr<TableReader> table_reader; + s = GetTableReader(env_options, internal_comparator, fd, + false /* sequential mode */, 0 /* readahead */, + record_read_stats, file_read_hist, &table_reader, + skip_filters, level, prefetch_index_and_filter_in_cache); + if (!s.ok()) { + assert(table_reader == nullptr); + RecordTick(ioptions_.statistics, NO_FILE_ERRORS); + // We do not cache error results so that if the error is transient, + // or somebody repairs the file, we recover automatically. + } else { + s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>, + handle); + if (s.ok()) { + // Release ownership of table reader. 
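+        // The cache now owns it and will free it via DeleteEntry<TableReader>
+        // when the entry is evicted.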
+        table_reader.release();
+      }
+    }
+  }
+  return s;
+}
+
+InternalIterator* TableCache::NewIterator(
+    const ReadOptions& options, const EnvOptions& env_options,
+    const InternalKeyComparator& icomparator, const FileDescriptor& fd,
+    RangeDelAggregator* range_del_agg, TableReader** table_reader_ptr,
+    HistogramImpl* file_read_hist, bool for_compaction, Arena* arena,
+    bool skip_filters, int level) {
+  PERF_TIMER_GUARD(new_table_iterator_nanos);
+
+  Status s;
+  bool create_new_table_reader = false;
+  TableReader* table_reader = nullptr;
+  Cache::Handle* handle = nullptr;
+  if (s.ok()) {
+    if (table_reader_ptr != nullptr) {
+      *table_reader_ptr = nullptr;
+    }
+    size_t readahead = 0;
+    if (for_compaction) {
+#ifndef NDEBUG
+      bool use_direct_reads_for_compaction = env_options.use_direct_reads;
+      TEST_SYNC_POINT_CALLBACK("TableCache::NewIterator:for_compaction",
+                               &use_direct_reads_for_compaction);
+#endif  // !NDEBUG
+      if (ioptions_.new_table_reader_for_compaction_inputs) {
+        readahead = ioptions_.compaction_readahead_size;
+        create_new_table_reader = true;
+      }
+    } else {
+      readahead = options.readahead_size;
+      create_new_table_reader = readahead > 0;
+    }
+
+    if (create_new_table_reader) {
+      unique_ptr<TableReader> table_reader_unique_ptr;
+      s = GetTableReader(
+          env_options, icomparator, fd, true /* sequential_mode */, readahead,
+          !for_compaction /* record stats */, nullptr, &table_reader_unique_ptr,
+          false /* skip_filters */, level,
+          true /* prefetch_index_and_filter_in_cache */, for_compaction);
+      if (s.ok()) {
+        table_reader = table_reader_unique_ptr.release();
+      }
+    } else {
+      table_reader = fd.table_reader;
+      if (table_reader == nullptr) {
+        s = FindTable(env_options, icomparator, fd, &handle,
+                      options.read_tier == kBlockCacheTier /* no_io */,
+                      !for_compaction /* record read_stats */, file_read_hist,
+                      skip_filters, level);
+        if (s.ok()) {
+          table_reader = GetTableReaderFromHandle(handle);
+        }
+      }
+    }
+  }
+  InternalIterator* result = nullptr;
+  if (s.ok()) {
+    result = table_reader->NewIterator(options, arena, skip_filters);
+    if (create_new_table_reader) {
+      assert(handle == nullptr);
+      result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);
+    } else if (handle != nullptr) {
+      result->RegisterCleanup(&UnrefEntry, cache_, handle);
+      handle = nullptr;  // prevent release below
+    }
+
+    if (for_compaction) {
+      table_reader->SetupForCompaction();
+    }
+    if (table_reader_ptr != nullptr) {
+      *table_reader_ptr = table_reader;
+    }
+  }
+  if (s.ok() && range_del_agg != nullptr && !options.ignore_range_deletions) {
+    std::unique_ptr<InternalIterator> range_del_iter(
+        table_reader->NewRangeTombstoneIterator(options));
+    if (range_del_iter != nullptr) {
+      s = range_del_iter->status();
+    }
+    if (s.ok()) {
+      s = range_del_agg->AddTombstones(std::move(range_del_iter));
+    }
+  }
+
+  if (handle != nullptr) {
+    ReleaseHandle(handle);
+  }
+  if (!s.ok()) {
+    assert(result == nullptr);
+    result = NewErrorInternalIterator(s, arena);
+  }
+  return result;
+}
+
+InternalIterator* TableCache::NewRangeTombstoneIterator(
+    const ReadOptions& options, const EnvOptions& env_options,
+    const InternalKeyComparator& icomparator, const FileDescriptor& fd,
+    HistogramImpl* file_read_hist, bool skip_filters, int level) {
+  Status s;
+  TableReader* table_reader = nullptr;
+  Cache::Handle* handle = nullptr;
+  table_reader = fd.table_reader;
+  if (table_reader == nullptr) {
+    s = FindTable(env_options, icomparator, fd, &handle,
+                  options.read_tier == kBlockCacheTier /* no_io */,
+                  true /* record read_stats 
*/, file_read_hist, skip_filters,
+                  level);
+    if (s.ok()) {
+      table_reader = GetTableReaderFromHandle(handle);
+    }
+  }
+  InternalIterator* result = nullptr;
+  if (s.ok()) {
+    result = table_reader->NewRangeTombstoneIterator(options);
+    if (result != nullptr) {
+      if (handle != nullptr) {
+        result->RegisterCleanup(&UnrefEntry, cache_, handle);
+      }
+    }
+  }
+  if (result == nullptr && handle != nullptr) {
+    // The range deletion block didn't exist, or there was a failure between
+    // getting the handle and creating the iterator.
+    ReleaseHandle(handle);
+  }
+  if (!s.ok()) {
+    assert(result == nullptr);
+    result = NewErrorInternalIterator(s);
+  }
+  return result;
+}
+
+Status TableCache::Get(const ReadOptions& options,
+                       const InternalKeyComparator& internal_comparator,
+                       const FileDescriptor& fd, const Slice& k,
+                       GetContext* get_context, HistogramImpl* file_read_hist,
+                       bool skip_filters, int level) {
+  std::string* row_cache_entry = nullptr;
+  bool done = false;
+#ifndef ROCKSDB_LITE
+  IterKey row_cache_key;
+  std::string row_cache_entry_buffer;
+
+  // Check the row cache if it is enabled. Since the row cache does not
+  // currently store sequence numbers, we cannot use it if we need to fetch
+  // the sequence.
+  if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
+    uint64_t fd_number = fd.GetNumber();
+    auto user_key = ExtractUserKey(k);
+    // We use the user key as the cache key instead of the internal key,
+    // otherwise the whole cache would be invalidated every time the
+    // sequence number increases. However, to support caching snapshot
+    // reads, we append the sequence number (incremented by 1 to
+    // distinguish it from 0) only in that case.
+    uint64_t seq_no =
+        options.snapshot == nullptr ? 0 : 1 + GetInternalKeySeqno(k);
+
+    // Compute row cache key.
+    row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
+                             row_cache_id_.size());
+    AppendVarint64(&row_cache_key, fd_number);
+    AppendVarint64(&row_cache_key, seq_no);
+    row_cache_key.TrimAppend(row_cache_key.Size(), user_key.data(),
+                             user_key.size());
+
+    if (auto row_handle =
+            ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
+      // Cleanable routine to release the cache entry
+      Cleanable value_pinner;
+      auto release_cache_entry_func = [](void* cache_to_clean,
+                                         void* cache_handle) {
+        ((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
+      };
+      auto found_row_cache_entry = static_cast<const std::string*>(
+          ioptions_.row_cache->Value(row_handle));
+      // If we reach this point, the value is located in the cache.
+      // found_row_cache_entry points to the value in the cache,
+      // and value_pinner holds the cleanup procedure for the cached entry.
+      // After replayGetContextLog() returns, get_context.pinnable_slice_
+      // will point to the cache entry buffer (or a copy based on it) and
+      // the cleanup routine under value_pinner will be delegated to
+      // get_context.pinnable_slice_. The cache entry is released when
+      // get_context.pinnable_slice_ is reset.
+      value_pinner.RegisterCleanup(release_cache_entry_func,
+                                   ioptions_.row_cache.get(), row_handle);
+      replayGetContextLog(*found_row_cache_entry, user_key, get_context,
+                          &value_pinner);
+      RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
+      done = true;
+    } else {
+      // Not found; set up the replay log. 
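+      // (The replay log captures the data this lookup reads so that, on
+      // success, the result can be inserted into the row cache below.)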
+      RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
+      row_cache_entry = &row_cache_entry_buffer;
+    }
+  }
+#endif  // ROCKSDB_LITE
+  Status s;
+  TableReader* t = fd.table_reader;
+  Cache::Handle* handle = nullptr;
+  if (!done && s.ok()) {
+    if (t == nullptr) {
+      s = FindTable(env_options_, internal_comparator, fd, &handle,
+                    options.read_tier == kBlockCacheTier /* no_io */,
+                    true /* record_read_stats */, file_read_hist, skip_filters,
+                    level);
+      if (s.ok()) {
+        t = GetTableReaderFromHandle(handle);
+      }
+    }
+    if (s.ok() && get_context->range_del_agg() != nullptr &&
+        !options.ignore_range_deletions) {
+      std::unique_ptr<InternalIterator> range_del_iter(
+          t->NewRangeTombstoneIterator(options));
+      if (range_del_iter != nullptr) {
+        s = range_del_iter->status();
+      }
+      if (s.ok()) {
+        s = get_context->range_del_agg()->AddTombstones(
+            std::move(range_del_iter));
+      }
+    }
+    if (s.ok()) {
+      get_context->SetReplayLog(row_cache_entry);  // nullptr if no cache.
+      s = t->Get(options, k, get_context, skip_filters);
+      get_context->SetReplayLog(nullptr);
+    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
+      // Couldn't find the table in the cache, but mark the key as possibly
+      // existing since no_io is set
+      get_context->MarkKeyMayExist();
+      s = Status::OK();
+      done = true;
+    }
+  }
+
+#ifndef ROCKSDB_LITE
+  // Put the replay log in the row cache only if something was found.
+  if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) {
+    size_t charge =
+        row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
+    void* row_ptr = new std::string(std::move(*row_cache_entry));
+    ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
+                                &DeleteEntry<std::string>);
+  }
+#endif  // ROCKSDB_LITE
+
+  if (handle != nullptr) {
+    ReleaseHandle(handle);
+  }
+  return s;
+}
+
+Status TableCache::GetTableProperties(
+    const EnvOptions& env_options,
+    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
+    std::shared_ptr<const TableProperties>* properties, bool no_io) {
+  Status s;
+  auto table_reader = fd.table_reader;
+  // Has the table reader already been pre-loaded?
+  if (table_reader) {
+    *properties = table_reader->GetTableProperties();
+
+    return s;
+  }
+
+  Cache::Handle* table_handle = nullptr;
+  s = FindTable(env_options, internal_comparator, fd, &table_handle, no_io);
+  if (!s.ok()) {
+    return s;
+  }
+  assert(table_handle);
+  auto table = GetTableReaderFromHandle(table_handle);
+  *properties = table->GetTableProperties();
+  ReleaseHandle(table_handle);
+  return s;
+}
+
+size_t TableCache::GetMemoryUsageByTableReader(
+    const EnvOptions& env_options,
+    const InternalKeyComparator& internal_comparator,
+    const FileDescriptor& fd) {
+  Status s;
+  auto table_reader = fd.table_reader;
+  // Has the table reader already been pre-loaded? 
+ if (table_reader) { + return table_reader->ApproximateMemoryUsage(); + } + + Cache::Handle* table_handle = nullptr; + s = FindTable(env_options, internal_comparator, fd, &table_handle, true); + if (!s.ok()) { + return 0; + } + assert(table_handle); + auto table = GetTableReaderFromHandle(table_handle); + auto ret = table->ApproximateMemoryUsage(); + ReleaseHandle(table_handle); + return ret; +} + +void TableCache::Evict(Cache* cache, uint64_t file_number) { + cache->Erase(GetSliceForFileNumber(&file_number)); +} + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/table_cache.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/table_cache.h b/thirdparty/rocksdb/db/table_cache.h new file mode 100644 index 0000000..8b65baf --- /dev/null +++ b/thirdparty/rocksdb/db/table_cache.h @@ -0,0 +1,146 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Thread-safe (provides internal synchronization) + +#pragma once +#include <string> +#include <vector> +#include <stdint.h> + +#include "db/dbformat.h" +#include "db/range_del_aggregator.h" +#include "options/cf_options.h" +#include "port/port.h" +#include "rocksdb/cache.h" +#include "rocksdb/env.h" +#include "rocksdb/options.h" +#include "rocksdb/table.h" +#include "table/table_reader.h" + +namespace rocksdb { + +class Env; +class Arena; +struct FileDescriptor; +class GetContext; +class HistogramImpl; +class InternalIterator; + +class TableCache { + public: + TableCache(const ImmutableCFOptions& ioptions, + const EnvOptions& storage_options, Cache* cache); + ~TableCache(); + + // Return an iterator for the specified file number (the corresponding + // file length must be exactly "file_size" bytes). If "tableptr" is + // non-nullptr, also sets "*tableptr" to point to the Table object + // underlying the returned iterator, or nullptr if no Table object underlies + // the returned iterator. The returned "*tableptr" object is owned by + // the cache and should not be deleted, and is valid for as long as the + // returned iterator is live. + // @param range_del_agg If non-nullptr, adds range deletions to the + // aggregator. 
If an error occurs, it is returned wrapped in a NewErrorInternalIterator.
+  // @param skip_filters Disables loading/accessing the filter block
+  // @param level The level this table is at, -1 for "not set / don't know"
+  InternalIterator* NewIterator(
+      const ReadOptions& options, const EnvOptions& toptions,
+      const InternalKeyComparator& internal_comparator,
+      const FileDescriptor& file_fd, RangeDelAggregator* range_del_agg,
+      TableReader** table_reader_ptr = nullptr,
+      HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,
+      Arena* arena = nullptr, bool skip_filters = false, int level = -1);
+
+  InternalIterator* NewRangeTombstoneIterator(
+      const ReadOptions& options, const EnvOptions& toptions,
+      const InternalKeyComparator& internal_comparator,
+      const FileDescriptor& file_fd, HistogramImpl* file_read_hist,
+      bool skip_filters, int level);
+
+  // If a seek to internal key "k" in the specified file finds an entry,
+  // call (*handle_result)(arg, found_key, found_value) repeatedly until
+  // it returns false.
+  // @param get_context State for the get operation. If its range_del_agg()
+  // returns non-nullptr, adds range deletions to the aggregator. If an error
+  // occurs, returns a non-ok status.
+  // @param skip_filters Disables loading/accessing the filter block
+  // @param level The level this table is at, -1 for "not set / don't know"
+  Status Get(const ReadOptions& options,
+             const InternalKeyComparator& internal_comparator,
+             const FileDescriptor& file_fd, const Slice& k,
+             GetContext* get_context, HistogramImpl* file_read_hist = nullptr,
+             bool skip_filters = false, int level = -1);
+
+  // Evict any entry for the specified file number
+  static void Evict(Cache* cache, uint64_t file_number);
+
+  // Release the table handle and erase its entry from the table cache.
+  // Used on DB close, or when the file is no longer live.
+  void EraseHandle(const FileDescriptor& fd, Cache::Handle* handle);
+
+  // Find table reader
+  // @param skip_filters Disables loading/accessing the filter block
+  // @param level == -1 means not specified
+  Status FindTable(const EnvOptions& toptions,
+                   const InternalKeyComparator& internal_comparator,
+                   const FileDescriptor& file_fd, Cache::Handle**,
+                   const bool no_io = false, bool record_read_stats = true,
+                   HistogramImpl* file_read_hist = nullptr,
+                   bool skip_filters = false, int level = -1,
+                   bool prefetch_index_and_filter_in_cache = true);
+
+  // Get TableReader from a cache handle.
+  TableReader* GetTableReaderFromHandle(Cache::Handle* handle);
+
+  // Get the table properties of a given table.
+  // @no_io: if true, do not read the table from disk when it is not already
+  // present in the table cache.
+  // @returns: `properties` will be reset on success. Note that we return
+  // Status::Incomplete() if the table is not present in the cache and
+  // `no_io` is set to true.
+  Status GetTableProperties(const EnvOptions& toptions,
+                            const InternalKeyComparator& internal_comparator,
+                            const FileDescriptor& file_meta,
+                            std::shared_ptr<const TableProperties>* properties,
+                            bool no_io = false);
+
+  // Return the total memory usage of the file's table reader.
+  // Returns 0 if the table reader is not loaded.
+  size_t GetMemoryUsageByTableReader(
+      const EnvOptions& toptions,
+      const InternalKeyComparator& internal_comparator,
+      const FileDescriptor& fd);
+
+  // Release the handle from the cache
+  void ReleaseHandle(Cache::Handle* handle);
+
+  // Capacity of the backing Cache that indicates infinite TableCache capacity. 
+ // For example when max_open_files is -1 we set the backing Cache to this. + static const int kInfiniteCapacity = 0x400000; + + private: + // Build a table reader + Status GetTableReader(const EnvOptions& env_options, + const InternalKeyComparator& internal_comparator, + const FileDescriptor& fd, bool sequential_mode, + size_t readahead, bool record_read_stats, + HistogramImpl* file_read_hist, + unique_ptr<TableReader>* table_reader, + bool skip_filters = false, int level = -1, + bool prefetch_index_and_filter_in_cache = true, + bool for_compaction = false); + + const ImmutableCFOptions& ioptions_; + const EnvOptions& env_options_; + Cache* const cache_; + std::string row_cache_id_; +}; + +} // namespace rocksdb
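A note on the row-cache key built in TableCache::Get() above: it is the
concatenation row_cache_id ++ varint64(file_number) ++ varint64(seq_no) ++
user_key. The standalone sketch below illustrates just that layout; its
PutVarint64 re-implements the LEB128 varint encoder from util/coding.h so the
snippet compiles on its own, and MakeRowCacheKey is a hypothetical helper for
illustration, not a RocksDB API.

    // Sketch of the row-cache key layout:
    //   id ++ varint64(file#) ++ varint64(seq) ++ user_key
    #include <cstdint>
    #include <iostream>
    #include <string>

    // LEB128 varint, as in RocksDB's util/coding.h: 7 payload bits per byte,
    // high bit set on all but the last byte.
    static void PutVarint64(std::string* dst, uint64_t v) {
      while (v >= 0x80) {
        dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
        v >>= 7;
      }
      dst->push_back(static_cast<char>(v));
    }

    // Hypothetical helper mirroring the key composed in TableCache::Get().
    static std::string MakeRowCacheKey(const std::string& row_cache_id,
                                       uint64_t file_number, uint64_t seq_no,
                                       const std::string& user_key) {
      std::string key = row_cache_id;  // disambiguates instances sharing a cache
      PutVarint64(&key, file_number);  // scopes entries to one SST file
      PutVarint64(&key, seq_no);       // 0 for latest-data reads; seqno + 1
                                       // for snapshot reads
      key.append(user_key);            // user key, not the internal key
      return key;
    }

    int main() {
      std::string key = MakeRowCacheKey("cache-id", 42 /* file number */,
                                        0 /* no snapshot */, "foo");
      std::cout << "row cache key: " << key.size() << " bytes\n";
      return 0;
    }

Using the user key with a seq_no of 0 in the common no-snapshot case means one
cache entry serves every read of a key, rather than being invalidated each
time the database's sequence number advances.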
