http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/version_set.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/version_set.cc b/thirdparty/rocksdb/db/version_set.cc new file mode 100644 index 0000000..6b9611a --- /dev/null +++ b/thirdparty/rocksdb/db/version_set.cc @@ -0,0 +1,3801 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/version_set.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include <inttypes.h> +#include <stdio.h> +#include <algorithm> +#include <climits> +#include <map> +#include <set> +#include <string> +#include <unordered_map> +#include <vector> +#include "db/compaction.h" +#include "db/internal_stats.h" +#include "db/log_reader.h" +#include "db/log_writer.h" +#include "db/memtable.h" +#include "db/merge_context.h" +#include "db/merge_helper.h" +#include "db/pinned_iterators_manager.h" +#include "db/table_cache.h" +#include "db/version_builder.h" +#include "monitoring/file_read_sample.h" +#include "monitoring/perf_context_imp.h" +#include "rocksdb/env.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/write_buffer_manager.h" +#include "table/format.h" +#include "table/get_context.h" +#include "table/internal_iterator.h" +#include "table/merging_iterator.h" +#include "table/meta_blocks.h" +#include "table/plain_table_factory.h" +#include "table/table_reader.h" +#include "table/two_level_iterator.h" +#include "util/coding.h" +#include "util/file_reader_writer.h" 
+#include "util/filename.h" +#include "util/stop_watch.h" +#include "util/string_util.h" +#include "util/sync_point.h" + +namespace rocksdb { + +namespace { + +// Find File in LevelFilesBrief data structure +// Within an index range defined by left and right +int FindFileInRange(const InternalKeyComparator& icmp, + const LevelFilesBrief& file_level, + const Slice& key, + uint32_t left, + uint32_t right) { + while (left < right) { + uint32_t mid = (left + right) / 2; + const FdWithKeyRange& f = file_level.files[mid]; + if (icmp.InternalKeyComparator::Compare(f.largest_key, key) < 0) { + // Key at "mid.largest" is < "target". Therefore all + // files at or before "mid" are uninteresting. + left = mid + 1; + } else { + // Key at "mid.largest" is >= "target". Therefore all files + // after "mid" are uninteresting. + right = mid; + } + } + return right; +} + +// Class to help choose the next file to search for the particular key. +// Searches and returns files level by level. +// We can search level-by-level since entries never hop across +// levels. Therefore we are guaranteed that if we find data +// in a smaller level, later levels are irrelevant (unless we +// are MergeInProgress). 
+class FilePicker { + public: + FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key, + const Slice& ikey, autovector<LevelFilesBrief>* file_levels, + unsigned int num_levels, FileIndexer* file_indexer, + const Comparator* user_comparator, + const InternalKeyComparator* internal_comparator) + : num_levels_(num_levels), + curr_level_(static_cast<unsigned int>(-1)), + returned_file_level_(static_cast<unsigned int>(-1)), + hit_file_level_(static_cast<unsigned int>(-1)), + search_left_bound_(0), + search_right_bound_(FileIndexer::kLevelMaxIndex), +#ifndef NDEBUG + files_(files), +#endif + level_files_brief_(file_levels), + is_hit_file_last_in_level_(false), + user_key_(user_key), + ikey_(ikey), + file_indexer_(file_indexer), + user_comparator_(user_comparator), + internal_comparator_(internal_comparator) { + // Setup member variables to search first level. + search_ended_ = !PrepareNextLevel(); + if (!search_ended_) { + // Prefetch Level 0 table data to avoid cache miss if possible. + for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) { + auto* r = (*level_files_brief_)[0].files[i].fd.table_reader; + if (r) { + r->Prepare(ikey); + } + } + } + } + + int GetCurrentLevel() const { return curr_level_; } + + FdWithKeyRange* GetNextFile() { + while (!search_ended_) { // Loops over different levels. + while (curr_index_in_curr_level_ < curr_file_level_->num_files) { + // Loops over all files in current level. + FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_]; + hit_file_level_ = curr_level_; + is_hit_file_last_in_level_ = + curr_index_in_curr_level_ == curr_file_level_->num_files - 1; + int cmp_largest = -1; + + // Do key range filtering of files or/and fractional cascading if: + // (1) not all the files are in level 0, or + // (2) there are more than 3 current level files + // If there are only 3 or less current level files in the system, we skip + // the key range filtering. 
In this case, more likely, the system is + // highly tuned to minimize number of tables queried by each query, + // so it is unlikely that key range filtering is more efficient than + // querying the files. + if (num_levels_ > 1 || curr_file_level_->num_files > 3) { + // Check if key is within a file's range. If search left bound and + // right bound point to the same find, we are sure key falls in + // range. + assert( + curr_level_ == 0 || + curr_index_in_curr_level_ == start_index_in_curr_level_ || + user_comparator_->Compare(user_key_, + ExtractUserKey(f->smallest_key)) <= 0); + + int cmp_smallest = user_comparator_->Compare(user_key_, + ExtractUserKey(f->smallest_key)); + if (cmp_smallest >= 0) { + cmp_largest = user_comparator_->Compare(user_key_, + ExtractUserKey(f->largest_key)); + } + + // Setup file search bound for the next level based on the + // comparison results + if (curr_level_ > 0) { + file_indexer_->GetNextLevelIndex(curr_level_, + curr_index_in_curr_level_, + cmp_smallest, cmp_largest, + &search_left_bound_, + &search_right_bound_); + } + // Key falls out of current file's range + if (cmp_smallest < 0 || cmp_largest > 0) { + if (curr_level_ == 0) { + ++curr_index_in_curr_level_; + continue; + } else { + // Search next level. + break; + } + } + } +#ifndef NDEBUG + // Sanity check to make sure that the files are correctly sorted + if (prev_file_) { + if (curr_level_ != 0) { + int comp_sign = internal_comparator_->Compare( + prev_file_->largest_key, f->smallest_key); + assert(comp_sign < 0); + } else { + // level == 0, the current file cannot be newer than the previous + // one. Use compressed data structure, has no attribute seqNo + assert(curr_index_in_curr_level_ > 0); + assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_], + files_[0][curr_index_in_curr_level_-1])); + } + } + prev_file_ = f; +#endif + returned_file_level_ = curr_level_; + if (curr_level_ > 0 && cmp_largest < 0) { + // No more files to search in this level. 
+ search_ended_ = !PrepareNextLevel(); + } else { + ++curr_index_in_curr_level_; + } + return f; + } + // Start searching next level. + search_ended_ = !PrepareNextLevel(); + } + // Search ended. + return nullptr; + } + + // getter for current file level + // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts + unsigned int GetHitFileLevel() { return hit_file_level_; } + + // Returns true if the most recent "hit file" (i.e., one returned by + // GetNextFile()) is at the last index in its level. + bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; } + + private: + unsigned int num_levels_; + unsigned int curr_level_; + unsigned int returned_file_level_; + unsigned int hit_file_level_; + int32_t search_left_bound_; + int32_t search_right_bound_; +#ifndef NDEBUG + std::vector<FileMetaData*>* files_; +#endif + autovector<LevelFilesBrief>* level_files_brief_; + bool search_ended_; + bool is_hit_file_last_in_level_; + LevelFilesBrief* curr_file_level_; + unsigned int curr_index_in_curr_level_; + unsigned int start_index_in_curr_level_; + Slice user_key_; + Slice ikey_; + FileIndexer* file_indexer_; + const Comparator* user_comparator_; + const InternalKeyComparator* internal_comparator_; +#ifndef NDEBUG + FdWithKeyRange* prev_file_; +#endif + + // Setup local variables to search next level. + // Returns false if there are no more levels to search. + bool PrepareNextLevel() { + curr_level_++; + while (curr_level_ < num_levels_) { + curr_file_level_ = &(*level_files_brief_)[curr_level_]; + if (curr_file_level_->num_files == 0) { + // When current level is empty, the search bound generated from upper + // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is + // also empty. 
+ assert(search_left_bound_ == 0); + assert(search_right_bound_ == -1 || + search_right_bound_ == FileIndexer::kLevelMaxIndex); + // Since current level is empty, it will need to search all files in + // the next level + search_left_bound_ = 0; + search_right_bound_ = FileIndexer::kLevelMaxIndex; + curr_level_++; + continue; + } + + // Some files may overlap each other. We find + // all files that overlap user_key and process them in order from + // newest to oldest. In the context of merge-operator, this can occur at + // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes + // are always compacted into a single entry). + int32_t start_index; + if (curr_level_ == 0) { + // On Level-0, we read through all files to check for overlap. + start_index = 0; + } else { + // On Level-n (n>=1), files are sorted. Binary search to find the + // earliest file whose largest key >= ikey. Search left bound and + // right bound are used to narrow the range. + if (search_left_bound_ == search_right_bound_) { + start_index = search_left_bound_; + } else if (search_left_bound_ < search_right_bound_) { + if (search_right_bound_ == FileIndexer::kLevelMaxIndex) { + search_right_bound_ = + static_cast<int32_t>(curr_file_level_->num_files) - 1; + } + start_index = + FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_, + static_cast<uint32_t>(search_left_bound_), + static_cast<uint32_t>(search_right_bound_)); + } else { + // search_left_bound > search_right_bound, key does not exist in + // this level. Since no comparison is done in this level, it will + // need to search all files in the next level. + search_left_bound_ = 0; + search_right_bound_ = FileIndexer::kLevelMaxIndex; + curr_level_++; + continue; + } + } + start_index_in_curr_level_ = start_index; + curr_index_in_curr_level_ = start_index; +#ifndef NDEBUG + prev_file_ = nullptr; +#endif + return true; + } + // curr_level_ = num_levels_. So, no more levels to search. 
+ return false; + } +}; +} // anonymous namespace + +VersionStorageInfo::~VersionStorageInfo() { delete[] files_; } + +Version::~Version() { + assert(refs_ == 0); + + // Remove from linked list + prev_->next_ = next_; + next_->prev_ = prev_; + + // Drop references to files + for (int level = 0; level < storage_info_.num_levels_; level++) { + for (size_t i = 0; i < storage_info_.files_[level].size(); i++) { + FileMetaData* f = storage_info_.files_[level][i]; + assert(f->refs > 0); + f->refs--; + if (f->refs <= 0) { + vset_->obsolete_files_.push_back(f); + } + } + } +} + +int FindFile(const InternalKeyComparator& icmp, + const LevelFilesBrief& file_level, + const Slice& key) { + return FindFileInRange(icmp, file_level, key, 0, + static_cast<uint32_t>(file_level.num_files)); +} + +void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level, + const std::vector<FileMetaData*>& files, + Arena* arena) { + assert(file_level); + assert(arena); + + size_t num = files.size(); + file_level->num_files = num; + char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange)); + file_level->files = new (mem)FdWithKeyRange[num]; + + for (size_t i = 0; i < num; i++) { + Slice smallest_key = files[i]->smallest.Encode(); + Slice largest_key = files[i]->largest.Encode(); + + // Copy key slice to sequential memory + size_t smallest_size = smallest_key.size(); + size_t largest_size = largest_key.size(); + mem = arena->AllocateAligned(smallest_size + largest_size); + memcpy(mem, smallest_key.data(), smallest_size); + memcpy(mem + smallest_size, largest_key.data(), largest_size); + + FdWithKeyRange& f = file_level->files[i]; + f.fd = files[i]->fd; + f.file_metadata = files[i]; + f.smallest_key = Slice(mem, smallest_size); + f.largest_key = Slice(mem + smallest_size, largest_size); + } +} + +static bool AfterFile(const Comparator* ucmp, + const Slice* user_key, const FdWithKeyRange* f) { + // nullptr user_key occurs before all keys and is therefore never after *f + return (user_key != 
nullptr && + ucmp->Compare(*user_key, ExtractUserKey(f->largest_key)) > 0); +} + +static bool BeforeFile(const Comparator* ucmp, + const Slice* user_key, const FdWithKeyRange* f) { + // nullptr user_key occurs after all keys and is therefore never before *f + return (user_key != nullptr && + ucmp->Compare(*user_key, ExtractUserKey(f->smallest_key)) < 0); +} + +bool SomeFileOverlapsRange( + const InternalKeyComparator& icmp, + bool disjoint_sorted_files, + const LevelFilesBrief& file_level, + const Slice* smallest_user_key, + const Slice* largest_user_key) { + const Comparator* ucmp = icmp.user_comparator(); + if (!disjoint_sorted_files) { + // Need to check against all files + for (size_t i = 0; i < file_level.num_files; i++) { + const FdWithKeyRange* f = &(file_level.files[i]); + if (AfterFile(ucmp, smallest_user_key, f) || + BeforeFile(ucmp, largest_user_key, f)) { + // No overlap + } else { + return true; // Overlap + } + } + return false; + } + + // Binary search over file list + uint32_t index = 0; + if (smallest_user_key != nullptr) { + // Find the earliest possible internal key for smallest_user_key + InternalKey small; + small.SetMaxPossibleForUserKey(*smallest_user_key); + index = FindFile(icmp, file_level, small.Encode()); + } + + if (index >= file_level.num_files) { + // beginning of range is after all files, so no overlap. + return false; + } + + return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]); +} + +namespace { + +// An internal iterator. For a given version/level pair, yields +// information about the files in the level. For a given entry, key() +// is the largest key that occurs in the file, and value() is an +// 16-byte value containing the file number and file size, both +// encoded using EncodeFixed64. 
+class LevelFileNumIterator : public InternalIterator { + public: + LevelFileNumIterator(const InternalKeyComparator& icmp, + const LevelFilesBrief* flevel, bool should_sample) + : icmp_(icmp), + flevel_(flevel), + index_(static_cast<uint32_t>(flevel->num_files)), + current_value_(0, 0, 0), // Marks as invalid + should_sample_(should_sample) {} + virtual bool Valid() const override { return index_ < flevel_->num_files; } + virtual void Seek(const Slice& target) override { + index_ = FindFile(icmp_, *flevel_, target); + } + virtual void SeekForPrev(const Slice& target) override { + SeekForPrevImpl(target, &icmp_); + } + + virtual void SeekToFirst() override { index_ = 0; } + virtual void SeekToLast() override { + index_ = (flevel_->num_files == 0) + ? 0 + : static_cast<uint32_t>(flevel_->num_files) - 1; + } + virtual void Next() override { + assert(Valid()); + index_++; + } + virtual void Prev() override { + assert(Valid()); + if (index_ == 0) { + index_ = static_cast<uint32_t>(flevel_->num_files); // Marks as invalid + } else { + index_--; + } + } + Slice key() const override { + assert(Valid()); + return flevel_->files[index_].largest_key; + } + Slice value() const override { + assert(Valid()); + + auto file_meta = flevel_->files[index_]; + if (should_sample_) { + sample_file_read_inc(file_meta.file_metadata); + } + current_value_ = file_meta.fd; + return Slice(reinterpret_cast<const char*>(¤t_value_), + sizeof(FileDescriptor)); + } + virtual Status status() const override { return Status::OK(); } + + private: + const InternalKeyComparator icmp_; + const LevelFilesBrief* flevel_; + uint32_t index_; + mutable FileDescriptor current_value_; + bool should_sample_; +}; + +class LevelFileIteratorState : public TwoLevelIteratorState { + public: + // @param skip_filters Disables loading/accessing the filter block + LevelFileIteratorState(TableCache* table_cache, + const ReadOptions& read_options, + const EnvOptions& env_options, + const InternalKeyComparator& 
icomparator, + HistogramImpl* file_read_hist, bool for_compaction, + bool prefix_enabled, bool skip_filters, int level, + RangeDelAggregator* range_del_agg) + : TwoLevelIteratorState(prefix_enabled), + table_cache_(table_cache), + read_options_(read_options), + env_options_(env_options), + icomparator_(icomparator), + file_read_hist_(file_read_hist), + for_compaction_(for_compaction), + skip_filters_(skip_filters), + level_(level), + range_del_agg_(range_del_agg) {} + + InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override { + if (meta_handle.size() != sizeof(FileDescriptor)) { + return NewErrorInternalIterator( + Status::Corruption("FileReader invoked with unexpected value")); + } + const FileDescriptor* fd = + reinterpret_cast<const FileDescriptor*>(meta_handle.data()); + return table_cache_->NewIterator( + read_options_, env_options_, icomparator_, *fd, range_del_agg_, + nullptr /* don't need reference to table */, file_read_hist_, + for_compaction_, nullptr /* arena */, skip_filters_, level_); + } + + bool PrefixMayMatch(const Slice& internal_key) override { + return true; + } + + bool KeyReachedUpperBound(const Slice& internal_key) override { + return read_options_.iterate_upper_bound != nullptr && + icomparator_.user_comparator()->Compare( + ExtractUserKey(internal_key), + *read_options_.iterate_upper_bound) >= 0; + } + + private: + TableCache* table_cache_; + const ReadOptions read_options_; + const EnvOptions& env_options_; + const InternalKeyComparator& icomparator_; + HistogramImpl* file_read_hist_; + bool for_compaction_; + bool skip_filters_; + int level_; + RangeDelAggregator* range_del_agg_; +}; + +// A wrapper of version builder which references the current version in +// constructor and unref it in the destructor. +// Both of the constructor and destructor need to be called inside DB Mutex. 
+class BaseReferencedVersionBuilder { + public: + explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd) + : version_builder_(new VersionBuilder( + cfd->current()->version_set()->env_options(), cfd->table_cache(), + cfd->current()->storage_info(), cfd->ioptions()->info_log)), + version_(cfd->current()) { + version_->Ref(); + } + ~BaseReferencedVersionBuilder() { + delete version_builder_; + version_->Unref(); + } + VersionBuilder* version_builder() { return version_builder_; } + + private: + VersionBuilder* version_builder_; + Version* version_; +}; +} // anonymous namespace + +Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp, + const FileMetaData* file_meta, + const std::string* fname) const { + auto table_cache = cfd_->table_cache(); + auto ioptions = cfd_->ioptions(); + Status s = table_cache->GetTableProperties( + vset_->env_options_, cfd_->internal_comparator(), file_meta->fd, + tp, true /* no io */); + if (s.ok()) { + return s; + } + + // We only ignore error type `Incomplete` since it's by design that we + // disallow table when it's not in table cache. + if (!s.IsIncomplete()) { + return s; + } + + // 2. Table is not present in table cache, we'll read the table properties + // directly from the properties block in the file. + std::unique_ptr<RandomAccessFile> file; + std::string file_name; + if (fname != nullptr) { + file_name = *fname; + } else { + file_name = + TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(), + file_meta->fd.GetPathId()); + } + s = ioptions->env->NewRandomAccessFile(file_name, &file, vset_->env_options_); + if (!s.ok()) { + return s; + } + + TableProperties* raw_table_properties; + // By setting the magic number to kInvalidTableMagicNumber, we can by + // pass the magic number check in the footer. 
+ std::unique_ptr<RandomAccessFileReader> file_reader( + new RandomAccessFileReader(std::move(file), file_name)); + s = ReadTableProperties( + file_reader.get(), file_meta->fd.GetFileSize(), + Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions, &raw_table_properties); + if (!s.ok()) { + return s; + } + RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES); + + *tp = std::shared_ptr<const TableProperties>(raw_table_properties); + return s; +} + +Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { + Status s; + for (int level = 0; level < storage_info_.num_levels_; level++) { + s = GetPropertiesOfAllTables(props, level); + if (!s.ok()) { + return s; + } + } + + return Status::OK(); +} + +Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props, + int level) { + for (const auto& file_meta : storage_info_.files_[level]) { + auto fname = + TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(), + file_meta->fd.GetPathId()); + // 1. If the table is already present in table cache, load table + // properties from there. + std::shared_ptr<const TableProperties> table_properties; + Status s = GetTableProperties(&table_properties, file_meta, &fname); + if (s.ok()) { + props->insert({fname, table_properties}); + } else { + return s; + } + } + + return Status::OK(); +} + +Status Version::GetPropertiesOfTablesInRange( + const Range* range, std::size_t n, TablePropertiesCollection* props) const { + for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) { + for (decltype(n) i = 0; i < n; i++) { + // Convert user_key into a corresponding internal key. 
+ InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek); + InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek); + std::vector<FileMetaData*> files; + storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr, + false); + for (const auto& file_meta : files) { + auto fname = + TableFileName(vset_->db_options_->db_paths, + file_meta->fd.GetNumber(), file_meta->fd.GetPathId()); + if (props->count(fname) == 0) { + // 1. If the table is already present in table cache, load table + // properties from there. + std::shared_ptr<const TableProperties> table_properties; + Status s = GetTableProperties(&table_properties, file_meta, &fname); + if (s.ok()) { + props->insert({fname, table_properties}); + } else { + return s; + } + } + } + } + } + + return Status::OK(); +} + +Status Version::GetAggregatedTableProperties( + std::shared_ptr<const TableProperties>* tp, int level) { + TablePropertiesCollection props; + Status s; + if (level < 0) { + s = GetPropertiesOfAllTables(&props); + } else { + s = GetPropertiesOfAllTables(&props, level); + } + if (!s.ok()) { + return s; + } + + auto* new_tp = new TableProperties(); + for (const auto& item : props) { + new_tp->Add(*item.second); + } + tp->reset(new_tp); + return Status::OK(); +} + +size_t Version::GetMemoryUsageByTableReaders() { + size_t total_usage = 0; + for (auto& file_level : storage_info_.level_files_brief_) { + for (size_t i = 0; i < file_level.num_files; i++) { + total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader( + vset_->env_options_, cfd_->internal_comparator(), + file_level.files[i].fd); + } + } + return total_usage; +} + +void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) { + assert(cf_meta); + assert(cfd_); + + cf_meta->name = cfd_->GetName(); + cf_meta->size = 0; + cf_meta->file_count = 0; + cf_meta->levels.clear(); + + auto* ioptions = cfd_->ioptions(); + auto* vstorage = storage_info(); + + for (int level = 0; level < 
cfd_->NumberLevels(); level++) { + uint64_t level_size = 0; + cf_meta->file_count += vstorage->LevelFiles(level).size(); + std::vector<SstFileMetaData> files; + for (const auto& file : vstorage->LevelFiles(level)) { + uint32_t path_id = file->fd.GetPathId(); + std::string file_path; + if (path_id < ioptions->db_paths.size()) { + file_path = ioptions->db_paths[path_id].path; + } else { + assert(!ioptions->db_paths.empty()); + file_path = ioptions->db_paths.back().path; + } + files.emplace_back( + MakeTableFileName("", file->fd.GetNumber()), file_path, + file->fd.GetFileSize(), file->smallest_seqno, file->largest_seqno, + file->smallest.user_key().ToString(), + file->largest.user_key().ToString(), + file->stats.num_reads_sampled.load(std::memory_order_relaxed), + file->being_compacted); + level_size += file->fd.GetFileSize(); + } + cf_meta->levels.emplace_back( + level, level_size, std::move(files)); + cf_meta->size += level_size; + } +} + + +uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const { + // Estimation will be inaccurate when: + // (1) there exist merge keys + // (2) keys are directly overwritten + // (3) deletion on non-existing keys + // (4) low number of samples + if (current_num_samples_ == 0) { + return 0; + } + + if (current_num_non_deletions_ <= current_num_deletions_) { + return 0; + } + + uint64_t est = current_num_non_deletions_ - current_num_deletions_; + + uint64_t file_count = 0; + for (int level = 0; level < num_levels_; ++level) { + file_count += files_[level].size(); + } + + if (current_num_samples_ < file_count) { + // casting to avoid overflowing + return + static_cast<uint64_t>( + (est * static_cast<double>(file_count) / current_num_samples_) + ); + } else { + return est; + } +} + +double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel( + int level) const { + assert(level < num_levels_); + uint64_t sum_file_size_bytes = 0; + uint64_t sum_data_size_bytes = 0; + for (auto* file_meta : files_[level]) { + sum_file_size_bytes += 
file_meta->fd.GetFileSize(); + sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size; + } + if (sum_file_size_bytes == 0) { + return -1.0; + } + return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes; +} + +void Version::AddIterators(const ReadOptions& read_options, + const EnvOptions& soptions, + MergeIteratorBuilder* merge_iter_builder, + RangeDelAggregator* range_del_agg) { + assert(storage_info_.finalized_); + + for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) { + AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level, + range_del_agg); + } +} + +void Version::AddIteratorsForLevel(const ReadOptions& read_options, + const EnvOptions& soptions, + MergeIteratorBuilder* merge_iter_builder, + int level, + RangeDelAggregator* range_del_agg) { + assert(storage_info_.finalized_); + if (level >= storage_info_.num_non_empty_levels()) { + // This is an empty level + return; + } else if (storage_info_.LevelFilesBrief(level).num_files == 0) { + // No files in this level + return; + } + + bool should_sample = should_sample_file_read(); + + auto* arena = merge_iter_builder->GetArena(); + if (level == 0) { + // Merge all level zero files together since they may overlap + for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) { + const auto& file = storage_info_.LevelFilesBrief(0).files[i]; + merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( + read_options, soptions, cfd_->internal_comparator(), file.fd, + range_del_agg, nullptr, cfd_->internal_stats()->GetFileReadHist(0), + false, arena, false /* skip_filters */, 0 /* level */)); + } + if (should_sample) { + // Count ones for every L0 files. This is done per iterator creation + // rather than Seek(), while files in other levels are recored per seek. + // If users execute one range query per iterator, there may be some + // discrepancy here. 
+ for (FileMetaData* meta : storage_info_.LevelFiles(0)) { + sample_file_read_inc(meta); + } + } + } else { + // For levels > 0, we can use a concatenating iterator that sequentially + // walks through the non-overlapping files in the level, opening them + // lazily. + auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState)); + auto* state = new (mem) + LevelFileIteratorState(cfd_->table_cache(), read_options, soptions, + cfd_->internal_comparator(), + cfd_->internal_stats()->GetFileReadHist(level), + false /* for_compaction */, + cfd_->ioptions()->prefix_extractor != nullptr, + IsFilterSkipped(level), level, range_del_agg); + mem = arena->AllocateAligned(sizeof(LevelFileNumIterator)); + auto* first_level_iter = new (mem) LevelFileNumIterator( + cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level), + should_sample_file_read()); + merge_iter_builder->AddIterator( + NewTwoLevelIterator(state, first_level_iter, arena, false)); + } +} + +void Version::AddRangeDelIteratorsForLevel( + const ReadOptions& read_options, const EnvOptions& soptions, int level, + std::vector<InternalIterator*>* range_del_iters) { + range_del_iters->clear(); + for (size_t i = 0; i < storage_info_.LevelFilesBrief(level).num_files; i++) { + const auto& file = storage_info_.LevelFilesBrief(level).files[i]; + auto* range_del_iter = cfd_->table_cache()->NewRangeTombstoneIterator( + read_options, soptions, cfd_->internal_comparator(), file.fd, + cfd_->internal_stats()->GetFileReadHist(level), + false /* skip_filters */, level); + if (range_del_iter != nullptr) { + range_del_iters->push_back(range_del_iter); + } + } +} + +VersionStorageInfo::VersionStorageInfo( + const InternalKeyComparator* internal_comparator, + const Comparator* user_comparator, int levels, + CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage, + bool _force_consistency_checks) + : internal_comparator_(internal_comparator), + user_comparator_(user_comparator), + // cfd is nullptr if Version is 
dummy + num_levels_(levels), + num_non_empty_levels_(0), + file_indexer_(user_comparator), + compaction_style_(compaction_style), + files_(new std::vector<FileMetaData*>[num_levels_]), + base_level_(num_levels_ == 1 ? -1 : 1), + files_by_compaction_pri_(num_levels_), + level0_non_overlapping_(false), + next_file_to_compact_by_size_(num_levels_), + compaction_score_(num_levels_), + compaction_level_(num_levels_), + l0_delay_trigger_count_(0), + accumulated_file_size_(0), + accumulated_raw_key_size_(0), + accumulated_raw_value_size_(0), + accumulated_num_non_deletions_(0), + accumulated_num_deletions_(0), + current_num_non_deletions_(0), + current_num_deletions_(0), + current_num_samples_(0), + estimated_compaction_needed_bytes_(0), + finalized_(false), + force_consistency_checks_(_force_consistency_checks) { + if (ref_vstorage != nullptr) { + accumulated_file_size_ = ref_vstorage->accumulated_file_size_; + accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_; + accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_; + accumulated_num_non_deletions_ = + ref_vstorage->accumulated_num_non_deletions_; + accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_; + current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_; + current_num_deletions_ = ref_vstorage->current_num_deletions_; + current_num_samples_ = ref_vstorage->current_num_samples_; + } +} + +Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, + uint64_t version_number) + : env_(vset->env_), + cfd_(column_family_data), + info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log), + db_statistics_((cfd_ == nullptr) ? nullptr + : cfd_->ioptions()->statistics), + table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()), + merge_operator_((cfd_ == nullptr) ? nullptr + : cfd_->ioptions()->merge_operator), + storage_info_( + (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(), + (cfd_ == nullptr) ? 
nullptr : cfd_->user_comparator(), + cfd_ == nullptr ? 0 : cfd_->NumberLevels(), + cfd_ == nullptr ? kCompactionStyleLevel + : cfd_->ioptions()->compaction_style, + (cfd_ == nullptr || cfd_->current() == nullptr) + ? nullptr + : cfd_->current()->storage_info(), + cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks), + vset_(vset), + next_(this), + prev_(this), + refs_(0), + version_number_(version_number) {} + +void Version::Get(const ReadOptions& read_options, const LookupKey& k, + PinnableSlice* value, Status* status, + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, bool* value_found, + bool* key_exists, SequenceNumber* seq) { + Slice ikey = k.internal_key(); + Slice user_key = k.user_key(); + + assert(status->ok() || status->IsMergeInProgress()); + + if (key_exists != nullptr) { + // will falsify below if not found + *key_exists = true; + } + + PinnedIteratorsManager pinned_iters_mgr; + GetContext get_context( + user_comparator(), merge_operator_, info_log_, db_statistics_, + status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, + value, value_found, merge_context, range_del_agg, this->env_, seq, + merge_operator_ ? 
&pinned_iters_mgr : nullptr); + + // Pin blocks that we read to hold merge operands + if (merge_operator_) { + pinned_iters_mgr.StartPinning(); + } + + FilePicker fp( + storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_, + storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_, + user_comparator(), internal_comparator()); + FdWithKeyRange* f = fp.GetNextFile(); + while (f != nullptr) { + if (get_context.sample()) { + sample_file_read_inc(f->file_metadata); + } + *status = table_cache_->Get( + read_options, *internal_comparator(), f->fd, ikey, &get_context, + cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), + IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()), + fp.IsHitFileLastInLevel()), + fp.GetCurrentLevel()); + // TODO: examine the behavior for corrupted key + if (!status->ok()) { + return; + } + + switch (get_context.State()) { + case GetContext::kNotFound: + // Keep searching in other files + break; + case GetContext::kFound: + if (fp.GetHitFileLevel() == 0) { + RecordTick(db_statistics_, GET_HIT_L0); + } else if (fp.GetHitFileLevel() == 1) { + RecordTick(db_statistics_, GET_HIT_L1); + } else if (fp.GetHitFileLevel() >= 2) { + RecordTick(db_statistics_, GET_HIT_L2_AND_UP); + } + return; + case GetContext::kDeleted: + // Use empty error message for speed + *status = Status::NotFound(); + return; + case GetContext::kCorrupt: + *status = Status::Corruption("corrupted key for ", user_key); + return; + case GetContext::kMerge: + break; + } + f = fp.GetNextFile(); + } + + if (GetContext::kMerge == get_context.State()) { + if (!merge_operator_) { + *status = Status::InvalidArgument( + "merge_operator is not properly initialized."); + return; + } + // merge_operands are in saver and we hit the beginning of the key history + // do a final merge of nullptr and operands; + std::string* str_value = value != nullptr ? 
value->GetSelf() : nullptr; + *status = MergeHelper::TimedFullMerge( + merge_operator_, user_key, nullptr, merge_context->GetOperands(), + str_value, info_log_, db_statistics_, env_, + nullptr /* result_operand */, true); + if (LIKELY(value != nullptr)) { + value->PinSelf(); + } + } else { + if (key_exists != nullptr) { + *key_exists = false; + } + *status = Status::NotFound(); // Use an empty error message for speed + } +} + +bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) { + // Reaching the bottom level implies misses at all upper levels, so we'll + // skip checking the filters when we predict a hit. + return cfd_->ioptions()->optimize_filters_for_hits && + (level > 0 || is_file_last_in_level) && + level == storage_info_.num_non_empty_levels() - 1; +} + +void VersionStorageInfo::GenerateLevelFilesBrief() { + level_files_brief_.resize(num_non_empty_levels_); + for (int level = 0; level < num_non_empty_levels_; level++) { + DoGenerateLevelFilesBrief( + &level_files_brief_[level], files_[level], &arena_); + } +} + +void Version::PrepareApply( + const MutableCFOptions& mutable_cf_options, + bool update_stats) { + UpdateAccumulatedStats(update_stats); + storage_info_.UpdateNumNonEmptyLevels(); + storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options); + storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri); + storage_info_.GenerateFileIndexer(); + storage_info_.GenerateLevelFilesBrief(); + storage_info_.GenerateLevel0NonOverlapping(); +} + +bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) { + if (file_meta->init_stats_from_file || + file_meta->compensated_file_size > 0) { + return false; + } + std::shared_ptr<const TableProperties> tp; + Status s = GetTableProperties(&tp, file_meta); + file_meta->init_stats_from_file = true; + if (!s.ok()) { + ROCKS_LOG_ERROR(vset_->db_options_->info_log, + "Unable to load table properties for file %" PRIu64 + " --- %s\n", + file_meta->fd.GetNumber(), 
s.ToString().c_str()); + return false; + } + if (tp.get() == nullptr) return false; + file_meta->num_entries = tp->num_entries; + file_meta->num_deletions = GetDeletedKeys(tp->user_collected_properties); + file_meta->raw_value_size = tp->raw_value_size; + file_meta->raw_key_size = tp->raw_key_size; + + return true; +} + +void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) { + assert(file_meta->init_stats_from_file); + accumulated_file_size_ += file_meta->fd.GetFileSize(); + accumulated_raw_key_size_ += file_meta->raw_key_size; + accumulated_raw_value_size_ += file_meta->raw_value_size; + accumulated_num_non_deletions_ += + file_meta->num_entries - file_meta->num_deletions; + accumulated_num_deletions_ += file_meta->num_deletions; + + current_num_non_deletions_ += + file_meta->num_entries - file_meta->num_deletions; + current_num_deletions_ += file_meta->num_deletions; + current_num_samples_++; +} + +void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) { + if (file_meta->init_stats_from_file) { + current_num_non_deletions_ -= + file_meta->num_entries - file_meta->num_deletions; + current_num_deletions_ -= file_meta->num_deletions; + current_num_samples_--; + } +} + +void Version::UpdateAccumulatedStats(bool update_stats) { + if (update_stats) { + // maximum number of table properties loaded from files. + const int kMaxInitCount = 20; + int init_count = 0; + // here only the first kMaxInitCount files which haven't been + // initialized from file will be updated with num_deletions. + // The motivation here is to cap the maximum I/O per Version creation. 
+ // The reason for choosing files from lower-level instead of higher-level + // is that such design is able to propagate the initialization from + // lower-level to higher-level: When the num_deletions of lower-level + // files are updated, it will make the lower-level files have accurate + // compensated_file_size, making lower-level to higher-level compaction + // will be triggered, which creates higher-level files whose num_deletions + // will be updated here. + for (int level = 0; + level < storage_info_.num_levels_ && init_count < kMaxInitCount; + ++level) { + for (auto* file_meta : storage_info_.files_[level]) { + if (MaybeInitializeFileMetaData(file_meta)) { + // each FileMeta will be initialized only once. + storage_info_.UpdateAccumulatedStats(file_meta); + // when option "max_open_files" is -1, all the file metadata has + // already been read, so MaybeInitializeFileMetaData() won't incur + // any I/O cost. "max_open_files=-1" means that the table cache passed + // to the VersionSet and then to the ColumnFamilySet has a size of + // TableCache::kInfiniteCapacity + if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() == + TableCache::kInfiniteCapacity) { + continue; + } + if (++init_count >= kMaxInitCount) { + break; + } + } + } + } + // In case all sampled-files contain only deletion entries, then we + // load the table-property of a file in higher-level to initialize + // that value. 
+ for (int level = storage_info_.num_levels_ - 1; + storage_info_.accumulated_raw_value_size_ == 0 && level >= 0; + --level) { + for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1; + storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) { + if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) { + storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]); + } + } + } + } + + storage_info_.ComputeCompensatedSizes(); +} + +void VersionStorageInfo::ComputeCompensatedSizes() { + static const int kDeletionWeightOnCompaction = 2; + uint64_t average_value_size = GetAverageValueSize(); + + // compute the compensated size + for (int level = 0; level < num_levels_; level++) { + for (auto* file_meta : files_[level]) { + // Here we only compute compensated_file_size for those file_meta + // whose compensated_file_size is uninitialized (== 0). This is true only + // for files that have been created right now and no other thread has + // access to them. That's why we can safely mutate compensated_file_size. + if (file_meta->compensated_file_size == 0) { + file_meta->compensated_file_size = file_meta->fd.GetFileSize(); + // Here we boost the size of deletion entries of a file only + // when the number of deletion entries is greater than the number of + // non-deletion entries in the file. The motivation here is that in + // a stable workload, the number of deletion entries should be roughly + // equal to the number of non-deletion entries. If we compensate the + // size of deletion entries in a stable workload, the deletion + // compensation logic might introduce an unwanted effect which changes the + // shape of LSM tree. 
+ if (file_meta->num_deletions * 2 >= file_meta->num_entries) { + file_meta->compensated_file_size += + (file_meta->num_deletions * 2 - file_meta->num_entries) * + average_value_size * kDeletionWeightOnCompaction; + } + } + } + } +} + +int VersionStorageInfo::MaxInputLevel() const { + if (compaction_style_ == kCompactionStyleLevel) { + return num_levels() - 2; + } + return 0; +} + +int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const { + if (allow_ingest_behind) { + assert(num_levels() > 1); + return num_levels() - 2; + } + return num_levels() - 1; +} + +void VersionStorageInfo::EstimateCompactionBytesNeeded( + const MutableCFOptions& mutable_cf_options) { + // Only implemented for level-based compaction + if (compaction_style_ != kCompactionStyleLevel) { + estimated_compaction_needed_bytes_ = 0; + return; + } + + // Start from Level 0, if level 0 qualifies for compaction to level 1, + // we estimate the size of compaction. + // Then we move on to the next level and see whether it qualifies for compaction + // to the next level. The size of the level is estimated as the actual size + // on the level plus the input bytes from the previous level if there is any. + // If it exceeds, take the exceeded bytes as compaction input and add the size + // of the compaction to the total size. + // We keep doing it to Level 2, 3, etc, until the last level and store the + // accumulated bytes in estimated_compaction_needed_bytes_. + + uint64_t bytes_compact_to_next_level = 0; + uint64_t level_size = 0; + for (auto* f : files_[0]) { + level_size += f->fd.GetFileSize(); + } + // Level 0 + bool level0_compact_triggered = false; + if (static_cast<int>(files_[0].size()) >= + mutable_cf_options.level0_file_num_compaction_trigger || + level_size >= mutable_cf_options.max_bytes_for_level_base) { + level0_compact_triggered = true; + estimated_compaction_needed_bytes_ = level_size; + bytes_compact_to_next_level = level_size; + } else { + estimated_compaction_needed_bytes_ = 0; + } + + // Level 1 and up. 
+ uint64_t bytes_next_level = 0; + for (int level = base_level(); level <= MaxInputLevel(); level++) { + level_size = 0; + if (bytes_next_level > 0) { +#ifndef NDEBUG + uint64_t level_size2 = 0; + for (auto* f : files_[level]) { + level_size2 += f->fd.GetFileSize(); + } + assert(level_size2 == bytes_next_level); +#endif + level_size = bytes_next_level; + bytes_next_level = 0; + } else { + for (auto* f : files_[level]) { + level_size += f->fd.GetFileSize(); + } + } + if (level == base_level() && level0_compact_triggered) { + // Add base level size to compaction if level0 compaction triggered. + estimated_compaction_needed_bytes_ += level_size; + } + // Add size added by previous compaction + level_size += bytes_compact_to_next_level; + bytes_compact_to_next_level = 0; + uint64_t level_target = MaxBytesForLevel(level); + if (level_size > level_target) { + bytes_compact_to_next_level = level_size - level_target; + // Estimate the actual compaction fan-out ratio as size ratio between + // the two levels. 
+ + assert(bytes_next_level == 0); + if (level + 1 < num_levels_) { + for (auto* f : files_[level + 1]) { + bytes_next_level += f->fd.GetFileSize(); + } + } + if (bytes_next_level > 0) { + assert(level_size > 0); + estimated_compaction_needed_bytes_ += static_cast<uint64_t>( + static_cast<double>(bytes_compact_to_next_level) * + (static_cast<double>(bytes_next_level) / + static_cast<double>(level_size) + + 1)); + } + } + } +} + +namespace { +uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions, + const std::vector<FileMetaData*>& files) { + uint32_t ttl_expired_files_count = 0; + + int64_t _current_time; + auto status = ioptions.env->GetCurrentTime(&_current_time); + if (status.ok()) { + const uint64_t current_time = static_cast<uint64_t>(_current_time); + for (auto f : files) { + if (!f->being_compacted && f->fd.table_reader != nullptr && + f->fd.table_reader->GetTableProperties() != nullptr) { + auto creation_time = + f->fd.table_reader->GetTableProperties()->creation_time; + if (creation_time > 0 && + creation_time < + (current_time - ioptions.compaction_options_fifo.ttl)) { + ttl_expired_files_count++; + } + } + } + } + return ttl_expired_files_count; +} +} // anonymous namespace + +void VersionStorageInfo::ComputeCompactionScore( + const ImmutableCFOptions& immutable_cf_options, + const MutableCFOptions& mutable_cf_options) { + for (int level = 0; level <= MaxInputLevel(); level++) { + double score; + if (level == 0) { + // We treat level-0 specially by bounding the number of files + // instead of number of bytes for two reasons: + // + // (1) With larger write-buffer sizes, it is nice not to do too + // many level-0 compactions. + // + // (2) The files in level-0 are merged on every read and + // therefore we wish to avoid too many files when the individual + // file size is small (perhaps because of a small write-buffer + // setting, or very high compression ratios, or lots of + // overwrites/deletions). 
+ int num_sorted_runs = 0; + uint64_t total_size = 0; + for (auto* f : files_[level]) { + if (!f->being_compacted) { + total_size += f->compensated_file_size; + num_sorted_runs++; + } + } + if (compaction_style_ == kCompactionStyleUniversal) { + // For universal compaction, we use level0 score to indicate + // compaction score for the whole DB. Adding other levels as if + // they are L0 files. + for (int i = 1; i < num_levels(); i++) { + if (!files_[i].empty() && !files_[i][0]->being_compacted) { + num_sorted_runs++; + } + } + } + + if (compaction_style_ == kCompactionStyleFIFO) { + score = + static_cast<double>(total_size) / + immutable_cf_options.compaction_options_fifo.max_table_files_size; + if (immutable_cf_options.compaction_options_fifo.allow_compaction) { + score = std::max( + static_cast<double>(num_sorted_runs) / + mutable_cf_options.level0_file_num_compaction_trigger, + score); + } + if (immutable_cf_options.compaction_options_fifo.ttl > 0) { + score = std::max(static_cast<double>(GetExpiredTtlFilesCount( + immutable_cf_options, files_[level])), + score); + } + + } else { + score = static_cast<double>(num_sorted_runs) / + mutable_cf_options.level0_file_num_compaction_trigger; + if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) { + // Level-based involves L0->L0 compactions that can lead to oversized + // L0 files. Take into account size as well to avoid later giant + // compactions to the base level. + score = std::max( + score, static_cast<double>(total_size) / + mutable_cf_options.max_bytes_for_level_base); + } + } + } else { + // Compute the ratio of current size to size limit. 
+ uint64_t level_bytes_no_compacting = 0; + for (auto f : files_[level]) { + if (!f->being_compacted) { + level_bytes_no_compacting += f->compensated_file_size; + } + } + score = static_cast<double>(level_bytes_no_compacting) / + MaxBytesForLevel(level); + } + compaction_level_[level] = level; + compaction_score_[level] = score; + } + + // sort all the levels based on their score. Higher scores get listed + // first. Use bubble sort because the number of entries are small. + for (int i = 0; i < num_levels() - 2; i++) { + for (int j = i + 1; j < num_levels() - 1; j++) { + if (compaction_score_[i] < compaction_score_[j]) { + double score = compaction_score_[i]; + int level = compaction_level_[i]; + compaction_score_[i] = compaction_score_[j]; + compaction_level_[i] = compaction_level_[j]; + compaction_score_[j] = score; + compaction_level_[j] = level; + } + } + } + ComputeFilesMarkedForCompaction(); + EstimateCompactionBytesNeeded(mutable_cf_options); +} + +void VersionStorageInfo::ComputeFilesMarkedForCompaction() { + files_marked_for_compaction_.clear(); + int last_qualify_level = 0; + + // Do not include files from the last level with data + // If table properties collector suggests a file on the last level, + // we should not move it to a new level. 
+ for (int level = num_levels() - 1; level >= 1; level--) { + if (!files_[level].empty()) { + last_qualify_level = level - 1; + break; + } + } + + for (int level = 0; level <= last_qualify_level; level++) { + for (auto* f : files_[level]) { + if (!f->being_compacted && f->marked_for_compaction) { + files_marked_for_compaction_.emplace_back(level, f); + } + } + } +} + +namespace { + +// used to sort files by size +struct Fsize { + size_t index; + FileMetaData* file; +}; + +// Comparator that is used to sort files based on their size +// In normal mode: descending size +bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) { + return (first.file->compensated_file_size > + second.file->compensated_file_size); +} +} // anonymous namespace + +void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) { + auto* level_files = &files_[level]; + // Must not overlap +#ifndef NDEBUG + if (level > 0 && !level_files->empty() && + internal_comparator_->Compare( + (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) { + auto* f2 = (*level_files)[level_files->size() - 1]; + if (info_log != nullptr) { + Error(info_log, "Adding new file %" PRIu64 + " range (%s, %s) to level %d but overlapping " + "with existing file %" PRIu64 " %s %s", + f->fd.GetNumber(), f->smallest.DebugString(true).c_str(), + f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(), + f2->smallest.DebugString(true).c_str(), + f2->largest.DebugString(true).c_str()); + LogFlush(info_log); + } + assert(false); + } +#endif + f->refs++; + level_files->push_back(f); +} + +// Version::PrepareApply() needs to be called before calling the function, or +// following functions called: +// 1. UpdateNumNonEmptyLevels(); +// 2. CalculateBaseBytes(); +// 3. UpdateFilesByCompactionPri(); +// 4. GenerateFileIndexer(); +// 5. GenerateLevelFilesBrief(); +// 6. 
GenerateLevel0NonOverlapping(); +void VersionStorageInfo::SetFinalized() { + finalized_ = true; +#ifndef NDEBUG + if (compaction_style_ != kCompactionStyleLevel) { + // Not level based compaction. + return; + } + assert(base_level_ < 0 || num_levels() == 1 || + (base_level_ >= 1 && base_level_ < num_levels())); + // Verify all levels newer than base_level are empty except L0 + for (int level = 1; level < base_level(); level++) { + assert(NumLevelBytes(level) == 0); + } + uint64_t max_bytes_prev_level = 0; + for (int level = base_level(); level < num_levels() - 1; level++) { + if (LevelFiles(level).size() == 0) { + continue; + } + assert(MaxBytesForLevel(level) >= max_bytes_prev_level); + max_bytes_prev_level = MaxBytesForLevel(level); + } + int num_empty_non_l0_level = 0; + for (int level = 0; level < num_levels(); level++) { + assert(LevelFiles(level).size() == 0 || + LevelFiles(level).size() == LevelFilesBrief(level).num_files); + if (level > 0 && NumLevelBytes(level) > 0) { + num_empty_non_l0_level++; + } + if (LevelFiles(level).size() > 0) { + assert(level < num_non_empty_levels()); + } + } + assert(compaction_level_.size() > 0); + assert(compaction_level_.size() == compaction_score_.size()); +#endif +} + +void VersionStorageInfo::UpdateNumNonEmptyLevels() { + num_non_empty_levels_ = num_levels_; + for (int i = num_levels_ - 1; i >= 0; i--) { + if (files_[i].size() != 0) { + return; + } else { + num_non_empty_levels_ = i; + } + } +} + +namespace { +// Sort `temp` based on ratio of overlapping size over file size +void SortFileByOverlappingRatio( + const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files, + const std::vector<FileMetaData*>& next_level_files, + std::vector<Fsize>* temp) { + std::unordered_map<uint64_t, uint64_t> file_to_order; + auto next_level_it = next_level_files.begin(); + + for (auto& file : files) { + uint64_t overlapping_bytes = 0; + // Skip files in next level that is smaller than current file + while (next_level_it != 
next_level_files.end() && + icmp.Compare((*next_level_it)->largest, file->smallest) < 0) { + next_level_it++; + } + + while (next_level_it != next_level_files.end() && + icmp.Compare((*next_level_it)->smallest, file->largest) < 0) { + overlapping_bytes += (*next_level_it)->fd.file_size; + + if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) { + // next level file cross large boundary of current file. + break; + } + next_level_it++; + } + + assert(file->fd.file_size != 0); + file_to_order[file->fd.GetNumber()] = + overlapping_bytes * 1024u / file->fd.file_size; + } + + std::sort(temp->begin(), temp->end(), + [&](const Fsize& f1, const Fsize& f2) -> bool { + return file_to_order[f1.file->fd.GetNumber()] < + file_to_order[f2.file->fd.GetNumber()]; + }); +} +} // namespace + +void VersionStorageInfo::UpdateFilesByCompactionPri( + CompactionPri compaction_pri) { + if (compaction_style_ == kCompactionStyleFIFO || + compaction_style_ == kCompactionStyleUniversal) { + // don't need this + return; + } + // No need to sort the highest level because it is never compacted. 
+ for (int level = 0; level < num_levels() - 1; level++) { + const std::vector<FileMetaData*>& files = files_[level]; + auto& files_by_compaction_pri = files_by_compaction_pri_[level]; + assert(files_by_compaction_pri.size() == 0); + + // populate a temp vector for sorting based on size + std::vector<Fsize> temp(files.size()); + for (size_t i = 0; i < files.size(); i++) { + temp[i].index = i; + temp[i].file = files[i]; + } + + // sort the top number_of_files_to_sort_ based on file size + size_t num = VersionStorageInfo::kNumberFilesToSort; + if (num > temp.size()) { + num = temp.size(); + } + switch (compaction_pri) { + case kByCompensatedSize: + std::partial_sort(temp.begin(), temp.begin() + num, temp.end(), + CompareCompensatedSizeDescending); + break; + case kOldestLargestSeqFirst: + std::sort(temp.begin(), temp.end(), + [](const Fsize& f1, const Fsize& f2) -> bool { + return f1.file->largest_seqno < f2.file->largest_seqno; + }); + break; + case kOldestSmallestSeqFirst: + std::sort(temp.begin(), temp.end(), + [](const Fsize& f1, const Fsize& f2) -> bool { + return f1.file->smallest_seqno < f2.file->smallest_seqno; + }); + break; + case kMinOverlappingRatio: + SortFileByOverlappingRatio(*internal_comparator_, files_[level], + files_[level + 1], &temp); + break; + default: + assert(false); + } + assert(temp.size() == files.size()); + + // initialize files_by_compaction_pri_ + for (size_t i = 0; i < temp.size(); i++) { + files_by_compaction_pri.push_back(static_cast<int>(temp[i].index)); + } + next_file_to_compact_by_size_[level] = 0; + assert(files_[level].size() == files_by_compaction_pri_[level].size()); + } +} + +void VersionStorageInfo::GenerateLevel0NonOverlapping() { + assert(!finalized_); + level0_non_overlapping_ = true; + if (level_files_brief_.size() == 0) { + return; + } + + // A copy of L0 files sorted by smallest key + std::vector<FdWithKeyRange> level0_sorted_file( + level_files_brief_[0].files, + level_files_brief_[0].files + 
level_files_brief_[0].num_files); + std::sort(level0_sorted_file.begin(), level0_sorted_file.end(), + [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool { + return (internal_comparator_->Compare(f1.smallest_key, + f2.smallest_key) < 0); + }); + + for (size_t i = 1; i < level0_sorted_file.size(); ++i) { + FdWithKeyRange& f = level0_sorted_file[i]; + FdWithKeyRange& prev = level0_sorted_file[i - 1]; + if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) { + level0_non_overlapping_ = false; + break; + } + } +} + +void Version::Ref() { + ++refs_; +} + +bool Version::Unref() { + assert(refs_ >= 1); + --refs_; + if (refs_ == 0) { + delete this; + return true; + } + return false; +} + +bool VersionStorageInfo::OverlapInLevel(int level, + const Slice* smallest_user_key, + const Slice* largest_user_key) { + if (level >= num_non_empty_levels_) { + // empty level, no overlap + return false; + } + return SomeFileOverlapsRange(*internal_comparator_, (level > 0), + level_files_brief_[level], smallest_user_key, + largest_user_key); +} + +// Store in "*inputs" all files in "level" that overlap [begin,end] +// If hint_index is specified, then it points to a file in the +// overlapping range. +// The file_index returns a pointer to any file in an overlapping range. 
+void VersionStorageInfo::GetOverlappingInputs( + int level, const InternalKey* begin, const InternalKey* end, + std::vector<FileMetaData*>* inputs, int hint_index, int* file_index, + bool expand_range) const { + if (level >= num_non_empty_levels_) { + // this level is empty, no overlapping inputs + return; + } + + inputs->clear(); + Slice user_begin, user_end; + if (begin != nullptr) { + user_begin = begin->user_key(); + } + if (end != nullptr) { + user_end = end->user_key(); + } + if (file_index) { + *file_index = -1; + } + const Comparator* user_cmp = user_comparator_; + if (begin != nullptr && end != nullptr && level > 0) { + GetOverlappingInputsRangeBinarySearch(level, user_begin, user_end, inputs, + hint_index, file_index); + return; + } + + for (size_t i = 0; i < level_files_brief_[level].num_files; ) { + FdWithKeyRange* f = &(level_files_brief_[level].files[i++]); + const Slice file_start = ExtractUserKey(f->smallest_key); + const Slice file_limit = ExtractUserKey(f->largest_key); + if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) { + // "f" is completely before specified range; skip it + } else if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) { + // "f" is completely after specified range; skip it + } else { + inputs->push_back(files_[level][i-1]); + if (level == 0 && expand_range) { + // Level-0 files may overlap each other. So check if the newly + // added file has expanded the range. If so, restart search. 
+ if (begin != nullptr && user_cmp->Compare(file_start, user_begin) < 0) { + user_begin = file_start; + inputs->clear(); + i = 0; + } else if (end != nullptr + && user_cmp->Compare(file_limit, user_end) > 0) { + user_end = file_limit; + inputs->clear(); + i = 0; + } + } else if (file_index) { + *file_index = static_cast<int>(i) - 1; + } + } + } +} + +// Store in "*inputs" files in "level" that are within range [begin,end] +// Guarantee a "clean cut" boundary between the files in inputs +// and the surrounding files and the maximum number of files. +// This will ensure that no parts of a key are lost during compaction. +// If hint_index is specified, then it points to a file in the range. +// The file_index returns a pointer to any file in an overlapping range. +void VersionStorageInfo::GetCleanInputsWithinInterval( + int level, const InternalKey* begin, const InternalKey* end, + std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const { + if (level >= num_non_empty_levels_) { + // this level is empty, no inputs within range + return; + } + + inputs->clear(); + Slice user_begin, user_end; + if (begin != nullptr) { + user_begin = begin->user_key(); + } + if (end != nullptr) { + user_end = end->user_key(); + } + if (file_index) { + *file_index = -1; + } + if (begin != nullptr && end != nullptr && level > 0) { + GetOverlappingInputsRangeBinarySearch(level, user_begin, user_end, inputs, + hint_index, file_index, + true /* within_interval */); + } +} + +// Store in "*inputs" all files in "level" that overlap [begin,end] +// Employ binary search to find at least one file that overlaps the +// specified range. From that file, iterate backwards and +// forwards to find all overlapping files. +// If within_interval is set, then only store the maximum clean inputs +// within range [begin, end]. 
"clean" means there is a boundary +// between the files in "*inputs" and the surrounding files +void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch( + int level, const Slice& user_begin, const Slice& user_end, + std::vector<FileMetaData*>* inputs, int hint_index, int* file_index, + bool within_interval) const { + assert(level > 0); + int min = 0; + int mid = 0; + int max = static_cast<int>(files_[level].size()) - 1; + bool foundOverlap = false; + const Comparator* user_cmp = user_comparator_; + + // if the caller already knows the index of a file that has overlap, + // then we can skip the binary search. + if (hint_index != -1) { + mid = hint_index; + foundOverlap = true; + } + + while (!foundOverlap && min <= max) { + mid = (min + max)/2; + FdWithKeyRange* f = &(level_files_brief_[level].files[mid]); + const Slice file_start = ExtractUserKey(f->smallest_key); + const Slice file_limit = ExtractUserKey(f->largest_key); + if ((!within_interval && user_cmp->Compare(file_limit, user_begin) < 0) || + (within_interval && user_cmp->Compare(file_start, user_begin) < 0)) { + min = mid + 1; + } else if ((!within_interval && + user_cmp->Compare(user_end, file_start) < 0) || + (within_interval && + user_cmp->Compare(user_end, file_limit) < 0)) { + max = mid - 1; + } else { + foundOverlap = true; + break; + } + } + + // If there were no overlapping files, return immediately. 
+ if (!foundOverlap) { + return; + } + // returns the index where an overlap is found + if (file_index) { + *file_index = mid; + } + + int start_index, end_index; + if (within_interval) { + ExtendFileRangeWithinInterval(level, user_begin, user_end, mid, &start_index, + &end_index); + } else { + ExtendFileRangeOverlappingInterval(level, user_begin, user_end, mid, + &start_index, &end_index); + } + assert(end_index >= start_index); + // insert overlapping files into vector + for (int i = start_index; i <= end_index; i++) { + inputs->push_back(files_[level][i]); + } +} + +// Store in *start_index and *end_index the range of all files in +// "level" that overlap [begin,end] +// The mid_index specifies the index of at least one file that +// overlaps the specified range. From that file, iterate backward +// and forward to find all overlapping files. +// Use FileLevel in searching, make it faster +void VersionStorageInfo::ExtendFileRangeOverlappingInterval( + int level, const Slice& user_begin, const Slice& user_end, + unsigned int mid_index, int* start_index, int* end_index) const { + const Comparator* user_cmp = user_comparator_; + const FdWithKeyRange* files = level_files_brief_[level].files; +#ifndef NDEBUG + { + // assert that the file at mid_index overlaps with the range + assert(mid_index < level_files_brief_[level].num_files); + const FdWithKeyRange* f = &files[mid_index]; + const Slice fstart = ExtractUserKey(f->smallest_key); + const Slice flimit = ExtractUserKey(f->largest_key); + if (user_cmp->Compare(fstart, user_begin) >= 0) { + assert(user_cmp->Compare(fstart, user_end) <= 0); + } else { + assert(user_cmp->Compare(flimit, user_begin) >= 0); + } + } +#endif + *start_index = mid_index + 1; + *end_index = mid_index; + int count __attribute__((unused)) = 0; + + // check backwards from 'mid' to lower indices + for (int i = mid_index; i >= 0 ; i--) { + const FdWithKeyRange* f = &files[i]; + const Slice file_limit = ExtractUserKey(f->largest_key); + if 
(user_cmp->Compare(file_limit, user_begin) >= 0) { + *start_index = i; + assert((count++, true)); + } else { + break; + } + } + // check forward from 'mid+1' to higher indices + for (unsigned int i = mid_index+1; + i < level_files_brief_[level].num_files; i++) { + const FdWithKeyRange* f = &files[i]; + const Slice file_start = ExtractUserKey(f->smallest_key); + if (user_cmp->Compare(file_start, user_end) <= 0) { + assert((count++, true)); + *end_index = i; + } else { + break; + } + } + assert(count == *end_index - *start_index + 1); +} + +// Store in *start_index and *end_index the clean range of all files in +// "level" within [begin,end] +// The mid_index specifies the index of at least one file within +// the specified range. From that file, iterate backward +// and forward to find all overlapping files and then "shrink" to +// the clean range required. +// Use FileLevel in searching, make it faster +void VersionStorageInfo::ExtendFileRangeWithinInterval( + int level, const Slice& user_begin, const Slice& user_end, + unsigned int mid_index, int* start_index, int* end_index) const { + assert(level != 0); + const Comparator* user_cmp = user_comparator_; + const FdWithKeyRange* files = level_files_brief_[level].files; +#ifndef NDEBUG + { + // assert that the file at mid_index is within the range + assert(mid_index < level_files_brief_[level].num_files); + const FdWithKeyRange* f = &files[mid_index]; + const Slice fstart = ExtractUserKey(f->smallest_key); + const Slice flimit = ExtractUserKey(f->largest_key); + assert(user_cmp->Compare(fstart, user_begin) >= 0 && + user_cmp->Compare(flimit, user_end) <= 0); + } +#endif + ExtendFileRangeOverlappingInterval(level, user_begin, user_end, mid_index, + start_index, end_index); + int left = *start_index; + int right = *end_index; + // shrink from left to right + while (left <= right) { + const Slice& first_key_in_range = ExtractUserKey(files[left].smallest_key); + if (user_cmp->Compare(first_key_in_range, user_begin) < 0) 
{ + left++; + continue; + } + if (left > 0) { // If not first file + const Slice& last_key_before = + ExtractUserKey(files[left - 1].largest_key); + if (user_cmp->Equal(first_key_in_range, last_key_before)) { + // The first user key in range overlaps with the previous file's last + // key + left++; + continue; + } + } + break; + } + // shrink from right to left + while (left <= right) { + const Slice last_key_in_range = ExtractUserKey(files[right].largest_key); + if (user_cmp->Compare(last_key_in_range, user_end) > 0) { + right--; + continue; + } + if (right < static_cast<int>(level_files_brief_[level].num_files) - + 1) { // If not the last file + const Slice first_key_after = + ExtractUserKey(files[right + 1].smallest_key); + if (user_cmp->Equal(last_key_in_range, first_key_after)) { + // The last user key in range overlaps with the next file's first key + right--; + continue; + } + } + break; + } + + *start_index = left; + *end_index = right; +} + +uint64_t VersionStorageInfo::NumLevelBytes(int level) const { + assert(level >= 0); + assert(level < num_levels()); + return TotalFileSize(files_[level]); +} + +const char* VersionStorageInfo::LevelSummary( + LevelSummaryStorage* scratch) const { + int len = 0; + if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) { + assert(base_level_ < static_cast<int>(level_max_bytes_.size())); + len = snprintf(scratch->buffer, sizeof(scratch->buffer), + "base level %d max bytes base %" PRIu64 " ", base_level_, + level_max_bytes_[base_level_]); + } + len += + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files["); + for (int i = 0; i < num_levels(); i++) { + int sz = sizeof(scratch->buffer) - len; + int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size())); + if (ret < 0 || ret >= sz) break; + len += ret; + } + if (len > 0) { + // overwrite the last space + --len; + } + len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, + "] max score %.2f", 
compaction_score_[0]); + + if (!files_marked_for_compaction_.empty()) { + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, + " (%" ROCKSDB_PRIszt " files need compaction)", + files_marked_for_compaction_.size()); + } + + return scratch->buffer; +} + +const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch, + int level) const { + int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); + for (const auto& f : files_[level]) { + int sz = sizeof(scratch->buffer) - len; + char sztxt[16]; + AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt)); + int ret = snprintf(scratch->buffer + len, sz, + "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ", + f->fd.GetNumber(), f->smallest_seqno, sztxt, + static_cast<int>(f->being_compacted)); + if (ret < 0 || ret >= sz) + break; + len += ret; + } + // overwrite the last space (only if files_[level].size() is non-zero) + if (files_[level].size() && len > 0) { + --len; + } + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]"); + return scratch->buffer; +} + +int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() { + uint64_t result = 0; + std::vector<FileMetaData*> overlaps; + for (int level = 1; level < num_levels() - 1; level++) { + for (const auto& f : files_[level]) { + GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps); + const uint64_t sum = TotalFileSize(overlaps); + if (sum > result) { + result = sum; + } + } + } + return result; +} + +uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const { + // Note: the result for level zero is not really used since we set + // the level-0 compaction threshold based on number of files. + assert(level >= 0); + assert(level < static_cast<int>(level_max_bytes_.size())); + return level_max_bytes_[level]; +} + +void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions, + const MutableCFOptions& options) { + // Special logic to set number of sorted runs. 
+ // It is to match the previous behavior when all files are in L0. + int num_l0_count = static_cast<int>(files_[0].size()); + if (compaction_style_ == kCompactionStyleUniversal) { + // For universal compaction, we use level0 score to indicate + // compaction score for the whole DB. Adding other levels as if + // they are L0 files. + for (int i = 1; i < num_levels(); i++) { + if (!files_[i].empty()) { + num_l0_count++; + } + } + } + set_l0_delay_trigger_count(num_l0_count); + + level_max_bytes_.resize(ioptions.num_levels); + if (!ioptions.level_compaction_dynamic_level_bytes) { + base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1; + + // Calculate for static bytes base case + for (int i = 0; i < ioptions.num_levels; ++i) { + if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) { + level_max_bytes_[i] = options.max_bytes_for_level_base; + } else if (i > 1) { + level_max_bytes_[i] = MultiplyCheckOverflow( + MultiplyCheckOverflow(level_max_bytes_[i - 1], + options.max_bytes_for_level_multiplier), + options.MaxBytesMultiplerAdditional(i - 1)); + } else { + level_max_bytes_[i] = options.max_bytes_for_level_base; + } + } + } else { + uint64_t max_level_size = 0; + + int first_non_empty_level = -1; + // Find size of non-L0 level of most data. + // Cannot use the size of the last level because it can be empty or less + // than previous levels after compaction. + for (int i = 1; i < num_levels_; i++) { + uint64_t total_size = 0; + for (const auto& f : files_[i]) { + total_size += f->fd.GetFileSize(); + } + if (total_size > 0 && first_non_empty_level == -1) { + first_non_empty_level = i; + } + if (total_size > max_level_size) { + max_level_size = total_size; + } + } + + // Prefill every level's max bytes to disallow compaction from there. + for (int i = 0; i < num_levels_; i++) { + level_max_bytes_[i] = std::numeric_limits<uint64_t>::max(); + } + + if (max_level_size == 0) { + // No data for L1 and up. 
L0 compacts to last level directly. + // No compaction from L1+ needs to be scheduled. + base_level_ = num_levels_ - 1; + } else { + uint64_t base_bytes_max = options.max_bytes_for_level_base; + uint64_t base_bytes_min = static_cast<uint64_t>( + base_bytes_max / options.max_bytes_for_level_multiplier); + + // Try whether we can make last level's target size to be max_level_size + uint64_t cur_level_size = max_level_size; + for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) { + // Round up after dividing + cur_level_size = static_cast<uint64_t>( + cur_level_size / options.max_bytes_for_level_multiplier); + } + + // Calculate base level and its size. + uint64_t base_level_size; + if (cur_level_size <= base_bytes_min) { + // Case 1. If we make target size of last level to be max_level_size, + // target size of the first non-empty level would be smaller than + // base_bytes_min. We set it be base_bytes_min. + base_level_size = base_bytes_min + 1U; + base_level_ = first_non_empty_level; + ROCKS_LOG_WARN(ioptions.info_log, + "More existing levels in DB than needed. " + "max_bytes_for_level_multiplier may not be guaranteed."); + } else { + // Find base level (where L0 data is compacted to). + base_level_ = first_non_empty_level; + while (base_level_ > 1 && cur_level_size > base_bytes_max) { + --base_level_; + cur_level_size = static_cast<uint64_t>( + cur_level_size / options.max_bytes_for_level_multiplier); + } + if (cur_level_size > base_bytes_max) { + // Even L1 will be too large + assert(base_level_ == 1); + base_level_size = base_bytes_max; + } else { + base_level_size = cur_level_size; + } + } + + uint64_t level_size = base_level_size; + for (int i = base_level_; i < num_levels_; i++) { + if (i > base_level_) { + level_size = MultiplyCheckOverflow( + level_size, options.max_bytes_for_level_multiplier); + } + // Don't set any level below base_bytes_max. Otherwise, the LSM can + // assume an hourglass shape where L1+ sizes are smaller than L0. 
This + // causes compaction scoring, which depends on level sizes, to favor L1+ + // at the expense of L0, which may fill up and stall. + level_max_bytes_[i] = std::max(level_size, base_bytes_max); + } + } + } +} + +uint64_t VersionStorageInfo::EstimateLiveDataSize() const { + // Estimate the live data size by adding up the size of the last level for all + // key ranges. Note: Estimate depends on the ordering of files in level 0 + // because files in level 0 can be overlapping. + uint64_t size = 0; + + auto ikey_lt = [this](InternalKey* x, InternalKey* y) { + return internal_comparator_->Compare(*x, *y) < 0; + }; + // (Ordered) map of largest keys in non-overlapping files + std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt); + + for (int l = num_levels_ - 1; l >= 0; l--) { + bool found_end = false; + for (auto file : files_[l]) { + // Find the first file where the largest key is larger than the smallest + // key of the current file. If this file does not overlap with the + // current file, none of the files in the map does. If there is + // no potential overlap, we can safely insert the rest of this level + // (if the level is not 0) into the map without checking again because + // the elements in the level are sorted and non-overlapping. + auto lb = (found_end && l != 0) ? 
+ ranges.end() : ranges.lower_bound(&file->smallest); + found_end = (lb == ranges.end()); + if (found_end || internal_comparator_->Compare( + file->largest, (*lb).second->smallest) < 0) { + ranges.emplace_hint(lb, &file->largest, file); + size += file->fd.file_size; + } + } + } + return size; +} + + +void Version::AddLiveFiles(std::vector<FileDescriptor>* live) { + for (int level = 0; level < storage_info_.num_levels(); level++) { + const std::vector<FileMetaData*>& files = storage_info_.files_[level]; + for (const auto& file : files) { + live->push_back(file->fd); + } + } +} + +std::string Version::DebugString(bool hex, bool print_stats) const { + std::string r; + for (int level = 0; level < storage_info_.num_levels_; level++) { + // E.g., + // --- level 1 --- + // 17:123['a' .. 'd'] + // 20:43['e' .. 'g'] + // + // if print_stats=true: + // 17:123['a' .. 'd'](4096) + r.append("--- level "); + AppendNumberTo(&r, level); + r.append(" --- version# "); + AppendNumberTo(&r, version_number_); + r.append(" ---\n"); + const std::vector<FileMetaData*>& files = storage_info_.files_[level]; + for (size_t i = 0; i < files.size(); i++) { + r.push_back(' '); + AppendNumberTo(&r, files[i]->fd.GetNumber()); + r.push_back(':'); + AppendNumberTo(&r, files[i]->fd.GetFileSize()); + r.append("["); + r.append(files[i]->smallest.DebugString(hex)); + r.append(" .. 
"); + r.append(files[i]->largest.DebugString(hex)); + r.append("]"); + if (print_stats) { + r.append("("); + r.append(ToString( + files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed))); + r.append(")"); + } + r.append("\n"); + } + } + return r; +} + +// this is used to batch writes to the manifest file +struct VersionSet::ManifestWriter { + Status status; + bool done; + InstrumentedCondVar cv; + ColumnFamilyData* cfd; + const autovector<VersionEdit*>& edit_list; + + explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd, + const autovector<VersionEdit*>& e) + : done(false), cv(mu), cfd(_cfd), edit_list(e) {} +}; + +VersionSet::VersionSet(const std::string& dbname, + const ImmutableDBOptions* db_options, + const EnvOptions& storage_options, Cache* table_cache, + WriteBufferManager* write_buffer_manager, + WriteController* write_controller) + : column_family_set_( + new ColumnFamilySet(dbname, db_options, storage_options, table_cache, + write_buffer_manager, write_controller)), + env_(db_options->env), + dbname_(dbname), + db_options_(db_options), + next_file_number_(2), + manifest_file_number_(0), // Filled by Recover() + pending_manifest_file_number_(0), + last_sequence_(0), + last_to_be_written_sequence_(0), + prev_log_number_(0), + current_version_number_(0), + manifest_file_size_(0), + env_options_(storage_options), + env_options_compactions_( + env_->OptimizeForCompactionTableRead(env_options_, *db_options_)) {} + +void CloseTables(void* ptr, size_t) { + TableReader* table_reader = reinterpret_cast<TableReader*>(ptr); + table_reader->Close(); +} + +VersionSet::~VersionSet() { + // we need to delete column_family_set_ because its destructor depends on + // VersionSet + Cache* table_cache = column_family_set_->get_table_cache(); + table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */); + column_family_set_.reset(); + for (auto file : obsolete_files_) { + if (file->table_reader_handle) { + 
table_cache->Release(file->table_reader_handle); + TableCache::Evict(table_cache, file->fd.GetNumber()); + } + delete file; + } + obsolete_files_.clear(); +} + +void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, + Version* v) { + // compute new compaction score + v->storage_info()->ComputeCompactionScore( + *column_family_data->ioptions(), + *column_family_data->GetLatestM
<TRUNCATED>
