http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/cache/lru_cache.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/cache/lru_cache.h b/thirdparty/rocksdb/cache/lru_cache.h new file mode 100644 index 0000000..2fd44bb --- /dev/null +++ b/thirdparty/rocksdb/cache/lru_cache.h @@ -0,0 +1,298 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include <string> + +#include "cache/sharded_cache.h" + +#include "port/port.h" +#include "util/autovector.h" + +namespace rocksdb { + +// LRU cache implementation + +// An entry is a variable length heap-allocated structure. +// Entries are referenced by cache and/or by any external entity. +// The cache keeps all its entries in table. Some elements +// are also stored on LRU list. +// +// LRUHandle can be in these states: +// 1. Referenced externally AND in hash table. +// In that case the entry is *not* in the LRU. (refs > 1 && in_cache == true) +// 2. Not referenced externally and in hash table. In that case the entry is +// in the LRU and can be freed. (refs == 1 && in_cache == true) +// 3. Referenced externally and not in hash table. In that case the entry is +// in not on LRU and not in table. (refs >= 1 && in_cache == false) +// +// All newly created LRUHandles are in state 1. If you call +// LRUCacheShard::Release +// on entry in state 1, it will go into state 2. To move from state 1 to +// state 3, either call LRUCacheShard::Erase or LRUCacheShard::Insert with the +// same key. 
+// To move from state 2 to state 1, use LRUCacheShard::Lookup. +// Before destruction, make sure that no handles are in state 1. This means +// that any successful LRUCacheShard::Lookup/LRUCacheShard::Insert have a +// matching +// RUCache::Release (to move into state 2) or LRUCacheShard::Erase (for state 3) + +struct LRUHandle { + void* value; + void (*deleter)(const Slice&, void* value); + LRUHandle* next_hash; + LRUHandle* next; + LRUHandle* prev; + size_t charge; // TODO(opt): Only allow uint32_t? + size_t key_length; + uint32_t refs; // a number of refs to this entry + // cache itself is counted as 1 + + // Include the following flags: + // in_cache: whether this entry is referenced by the hash table. + // is_high_pri: whether this entry is high priority entry. + // in_high_pro_pool: whether this entry is in high-pri pool. + char flags; + + uint32_t hash; // Hash of key(); used for fast sharding and comparisons + + char key_data[1]; // Beginning of key + + Slice key() const { + // For cheaper lookups, we allow a temporary Handle object + // to store a pointer to a key in "value". 
+ if (next == this) { + return *(reinterpret_cast<Slice*>(value)); + } else { + return Slice(key_data, key_length); + } + } + + bool InCache() { return flags & 1; } + bool IsHighPri() { return flags & 2; } + bool InHighPriPool() { return flags & 4; } + + void SetInCache(bool in_cache) { + if (in_cache) { + flags |= 1; + } else { + flags &= ~1; + } + } + + void SetPriority(Cache::Priority priority) { + if (priority == Cache::Priority::HIGH) { + flags |= 2; + } else { + flags &= ~2; + } + } + + void SetInHighPriPool(bool in_high_pri_pool) { + if (in_high_pri_pool) { + flags |= 4; + } else { + flags &= ~4; + } + } + + void Free() { + assert((refs == 1 && InCache()) || (refs == 0 && !InCache())); + if (deleter) { + (*deleter)(key(), value); + } + delete[] reinterpret_cast<char*>(this); + } +}; + +// We provide our own simple hash table since it removes a whole bunch +// of porting hacks and is also faster than some of the built-in hash +// table implementations in some of the compiler/runtime combinations +// we have tested. E.g., readrandom speeds up by ~5% over the g++ +// 4.4.3's builtin hashtable. +class LRUHandleTable { + public: + LRUHandleTable(); + ~LRUHandleTable(); + + LRUHandle* Lookup(const Slice& key, uint32_t hash); + LRUHandle* Insert(LRUHandle* h); + LRUHandle* Remove(const Slice& key, uint32_t hash); + + template <typename T> + void ApplyToAllCacheEntries(T func) { + for (uint32_t i = 0; i < length_; i++) { + LRUHandle* h = list_[i]; + while (h != nullptr) { + auto n = h->next_hash; + assert(h->InCache()); + func(h); + h = n; + } + } + } + + private: + // Return a pointer to slot that points to a cache entry that + // matches key/hash. If there is no such cache entry, return a + // pointer to the trailing slot in the corresponding linked list. + LRUHandle** FindPointer(const Slice& key, uint32_t hash); + + void Resize(); + + // The table consists of an array of buckets where each bucket is + // a linked list of cache entries that hash into the bucket. 
+ LRUHandle** list_; + uint32_t length_; + uint32_t elems_; +}; + +// A single shard of sharded cache. +class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard : public CacheShard { + public: + LRUCacheShard(); + virtual ~LRUCacheShard(); + + // Separate from constructor so caller can easily make an array of LRUCache + // if current usage is more than new capacity, the function will attempt to + // free the needed space + virtual void SetCapacity(size_t capacity) override; + + // Set the flag to reject insertion if cache if full. + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override; + + // Set percentage of capacity reserved for high-pri cache entries. + void SetHighPriorityPoolRatio(double high_pri_pool_ratio); + + // Like Cache methods, but with an extra "hash" parameter. + virtual Status Insert(const Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const Slice& key, void* value), + Cache::Handle** handle, + Cache::Priority priority) override; + virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override; + virtual bool Ref(Cache::Handle* handle) override; + virtual bool Release(Cache::Handle* handle, + bool force_erase = false) override; + virtual void Erase(const Slice& key, uint32_t hash) override; + + // Although in some platforms the update of size_t is atomic, to make sure + // GetUsage() and GetPinnedUsage() work correctly under any platform, we'll + // protect them with mutex_. 
+ + virtual size_t GetUsage() const override; + virtual size_t GetPinnedUsage() const override; + + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) override; + + virtual void EraseUnRefEntries() override; + + virtual std::string GetPrintableOptions() const override; + + void TEST_GetLRUList(LRUHandle** lru, LRUHandle** lru_low_pri); + + // Retrieves number of elements in LRU, for unit test purpose only + // not threadsafe + size_t TEST_GetLRUSize(); + + // Overloading to aligned it to cache line size + void* operator new(size_t); + + void operator delete(void *); + + private: + void LRU_Remove(LRUHandle* e); + void LRU_Insert(LRUHandle* e); + + // Overflow the last entry in high-pri pool to low-pri pool until size of + // high-pri pool is no larger than the size specify by high_pri_pool_pct. + void MaintainPoolSize(); + + // Just reduce the reference count by 1. + // Return true if last reference + bool Unref(LRUHandle* e); + + // Free some space following strict LRU policy until enough space + // to hold (usage_ + charge) is freed or the lru list is empty + // This function is not thread safe - it needs to be executed while + // holding the mutex_ + void EvictFromLRU(size_t charge, autovector<LRUHandle*>* deleted); + + // Initialized before use. + size_t capacity_; + + // Memory size for entries in high-pri pool. + size_t high_pri_pool_usage_; + + // Whether to reject insertion if cache reaches its full capacity. + bool strict_capacity_limit_; + + // Ratio of capacity reserved for high priority cache entries. + double high_pri_pool_ratio_; + + // High-pri pool size, equals to capacity * high_pri_pool_ratio. + // Remember the value to avoid recomputing each time. + double high_pri_pool_capacity_; + + // Dummy head of LRU list. + // lru.prev is newest entry, lru.next is oldest entry. + // LRU contains items which can be evicted, ie reference only by cache + LRUHandle lru_; + + // Pointer to head of low-pri pool in LRU list. 
+ LRUHandle* lru_low_pri_; + + // ------------^^^^^^^^^^^^^----------- + // Not frequently modified data members + // ------------------------------------ + // + // We separate data members that are updated frequently from the ones that + // are not frequently updated so that they don't share the same cache line + // which will lead into false cache sharing + // + // ------------------------------------ + // Frequently modified data members + // ------------vvvvvvvvvvvvv----------- + LRUHandleTable table_; + + // Memory size for entries residing in the cache + size_t usage_; + + // Memory size for entries residing only in the LRU list + size_t lru_usage_; + + // mutex_ protects the following state. + // We don't count mutex_ as the cache's internal state so semantically we + // don't mind mutex_ invoking the non-const actions. + mutable port::Mutex mutex_; +}; + +class LRUCache : public ShardedCache { + public: + LRUCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit, + double high_pri_pool_ratio); + virtual ~LRUCache(); + virtual const char* Name() const override { return "LRUCache"; } + virtual CacheShard* GetShard(int shard) override; + virtual const CacheShard* GetShard(int shard) const override; + virtual void* Value(Handle* handle) override; + virtual size_t GetCharge(Handle* handle) const override; + virtual uint32_t GetHash(Handle* handle) const override; + virtual void DisownData() override; + + // Retrieves number of elements in LRU, for unit test purpose only + size_t TEST_GetLRUSize(); + + private: + LRUCacheShard* shards_; + int num_shards_ = 0; +}; + +} // namespace rocksdb
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/cache/sharded_cache.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/cache/sharded_cache.cc b/thirdparty/rocksdb/cache/sharded_cache.cc new file mode 100644 index 0000000..9bdea3a --- /dev/null +++ b/thirdparty/rocksdb/cache/sharded_cache.cc @@ -0,0 +1,161 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "cache/sharded_cache.h" + +#include <string> + +#include "util/mutexlock.h" + +namespace rocksdb { + +ShardedCache::ShardedCache(size_t capacity, int num_shard_bits, + bool strict_capacity_limit) + : num_shard_bits_(num_shard_bits), + capacity_(capacity), + strict_capacity_limit_(strict_capacity_limit), + last_id_(1) {} + +void ShardedCache::SetCapacity(size_t capacity) { + int num_shards = 1 << num_shard_bits_; + const size_t per_shard = (capacity + (num_shards - 1)) / num_shards; + MutexLock l(&capacity_mutex_); + for (int s = 0; s < num_shards; s++) { + GetShard(s)->SetCapacity(per_shard); + } + capacity_ = capacity; +} + +void ShardedCache::SetStrictCapacityLimit(bool strict_capacity_limit) { + int num_shards = 1 << num_shard_bits_; + MutexLock l(&capacity_mutex_); + for (int s = 0; s < num_shards; s++) { + GetShard(s)->SetStrictCapacityLimit(strict_capacity_limit); + } + strict_capacity_limit_ = strict_capacity_limit; +} + +Status ShardedCache::Insert(const Slice& key, void* value, size_t charge, + void 
(*deleter)(const Slice& key, void* value), + Handle** handle, Priority priority) { + uint32_t hash = HashSlice(key); + return GetShard(Shard(hash)) + ->Insert(key, hash, value, charge, deleter, handle, priority); +} + +Cache::Handle* ShardedCache::Lookup(const Slice& key, Statistics* stats) { + uint32_t hash = HashSlice(key); + return GetShard(Shard(hash))->Lookup(key, hash); +} + +bool ShardedCache::Ref(Handle* handle) { + uint32_t hash = GetHash(handle); + return GetShard(Shard(hash))->Ref(handle); +} + +bool ShardedCache::Release(Handle* handle, bool force_erase) { + uint32_t hash = GetHash(handle); + return GetShard(Shard(hash))->Release(handle, force_erase); +} + +void ShardedCache::Erase(const Slice& key) { + uint32_t hash = HashSlice(key); + GetShard(Shard(hash))->Erase(key, hash); +} + +uint64_t ShardedCache::NewId() { + return last_id_.fetch_add(1, std::memory_order_relaxed); +} + +size_t ShardedCache::GetCapacity() const { + MutexLock l(&capacity_mutex_); + return capacity_; +} + +bool ShardedCache::HasStrictCapacityLimit() const { + MutexLock l(&capacity_mutex_); + return strict_capacity_limit_; +} + +size_t ShardedCache::GetUsage() const { + // We will not lock the cache when getting the usage from shards. + int num_shards = 1 << num_shard_bits_; + size_t usage = 0; + for (int s = 0; s < num_shards; s++) { + usage += GetShard(s)->GetUsage(); + } + return usage; +} + +size_t ShardedCache::GetUsage(Handle* handle) const { + return GetCharge(handle); +} + +size_t ShardedCache::GetPinnedUsage() const { + // We will not lock the cache when getting the usage from shards. 
+ int num_shards = 1 << num_shard_bits_; + size_t usage = 0; + for (int s = 0; s < num_shards; s++) { + usage += GetShard(s)->GetPinnedUsage(); + } + return usage; +} + +void ShardedCache::ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) { + int num_shards = 1 << num_shard_bits_; + for (int s = 0; s < num_shards; s++) { + GetShard(s)->ApplyToAllCacheEntries(callback, thread_safe); + } +} + +void ShardedCache::EraseUnRefEntries() { + int num_shards = 1 << num_shard_bits_; + for (int s = 0; s < num_shards; s++) { + GetShard(s)->EraseUnRefEntries(); + } +} + +std::string ShardedCache::GetPrintableOptions() const { + std::string ret; + ret.reserve(20000); + const int kBufferSize = 200; + char buffer[kBufferSize]; + { + MutexLock l(&capacity_mutex_); + snprintf(buffer, kBufferSize, " capacity : %" ROCKSDB_PRIszt "\n", + capacity_); + ret.append(buffer); + snprintf(buffer, kBufferSize, " num_shard_bits : %d\n", num_shard_bits_); + ret.append(buffer); + snprintf(buffer, kBufferSize, " strict_capacity_limit : %d\n", + strict_capacity_limit_); + ret.append(buffer); + } + ret.append(GetShard(0)->GetPrintableOptions()); + return ret; +} +int GetDefaultCacheShardBits(size_t capacity) { + int num_shard_bits = 0; + size_t min_shard_size = 512L * 1024L; // Every shard is at least 512KB. + size_t num_shards = capacity / min_shard_size; + while (num_shards >>= 1) { + if (++num_shard_bits >= 6) { + // No more than 6. + return num_shard_bits; + } + } + return num_shard_bits; +} + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/cache/sharded_cache.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/cache/sharded_cache.h b/thirdparty/rocksdb/cache/sharded_cache.h new file mode 100644 index 0000000..4f9dea2 --- /dev/null +++ b/thirdparty/rocksdb/cache/sharded_cache.h @@ -0,0 +1,102 @@ +// Copyright (c) 2011-present, Facebook, Inc. 
All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include <atomic> +#include <string> + +#include "port/port.h" +#include "rocksdb/cache.h" +#include "util/hash.h" + +namespace rocksdb { + +// Single cache shard interface. +class CacheShard { + public: + CacheShard() = default; + virtual ~CacheShard() = default; + + virtual Status Insert(const Slice& key, uint32_t hash, void* value, + size_t charge, + void (*deleter)(const Slice& key, void* value), + Cache::Handle** handle, Cache::Priority priority) = 0; + virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) = 0; + virtual bool Ref(Cache::Handle* handle) = 0; + virtual bool Release(Cache::Handle* handle, bool force_erase = false) = 0; + virtual void Erase(const Slice& key, uint32_t hash) = 0; + virtual void SetCapacity(size_t capacity) = 0; + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0; + virtual size_t GetUsage() const = 0; + virtual size_t GetPinnedUsage() const = 0; + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) = 0; + virtual void EraseUnRefEntries() = 0; + virtual std::string GetPrintableOptions() const { return ""; } +}; + +// Generic cache interface which shards cache by hash of keys. 2^num_shard_bits +// shards will be created, with capacity split evenly to each of the shards. +// Keys are sharded by the highest num_shard_bits bits of hash value. 
+class ShardedCache : public Cache { + public: + ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit); + virtual ~ShardedCache() = default; + virtual const char* Name() const override = 0; + virtual CacheShard* GetShard(int shard) = 0; + virtual const CacheShard* GetShard(int shard) const = 0; + virtual void* Value(Handle* handle) override = 0; + virtual size_t GetCharge(Handle* handle) const = 0; + virtual uint32_t GetHash(Handle* handle) const = 0; + virtual void DisownData() override = 0; + + virtual void SetCapacity(size_t capacity) override; + virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override; + + virtual Status Insert(const Slice& key, void* value, size_t charge, + void (*deleter)(const Slice& key, void* value), + Handle** handle, Priority priority) override; + virtual Handle* Lookup(const Slice& key, Statistics* stats) override; + virtual bool Ref(Handle* handle) override; + virtual bool Release(Handle* handle, bool force_erase = false) override; + virtual void Erase(const Slice& key) override; + virtual uint64_t NewId() override; + virtual size_t GetCapacity() const override; + virtual bool HasStrictCapacityLimit() const override; + virtual size_t GetUsage() const override; + virtual size_t GetUsage(Handle* handle) const override; + virtual size_t GetPinnedUsage() const override; + virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), + bool thread_safe) override; + virtual void EraseUnRefEntries() override; + virtual std::string GetPrintableOptions() const override; + + int GetNumShardBits() const { return num_shard_bits_; } + + private: + static inline uint32_t HashSlice(const Slice& s) { + return Hash(s.data(), s.size(), 0); + } + + uint32_t Shard(uint32_t hash) { + // Note, hash >> 32 yields hash in gcc, not the zero we expect! + return (num_shard_bits_ > 0) ? 
(hash >> (32 - num_shard_bits_)) : 0; + } + + int num_shard_bits_; + mutable port::Mutex capacity_mutex_; + size_t capacity_; + bool strict_capacity_limit_; + std::atomic<uint64_t> last_id_; +}; + +extern int GetDefaultCacheShardBits(size_t capacity); + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/coverage/parse_gcov_output.py ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/coverage/parse_gcov_output.py b/thirdparty/rocksdb/coverage/parse_gcov_output.py new file mode 100644 index 0000000..72e8b07 --- /dev/null +++ b/thirdparty/rocksdb/coverage/parse_gcov_output.py @@ -0,0 +1,118 @@ +import optparse +import re +import sys + +from optparse import OptionParser + +# the gcov report follows certain pattern. Each file will have two lines +# of report, from which we can extract the file name, total lines and coverage +# percentage. +def parse_gcov_report(gcov_input): + per_file_coverage = {} + total_coverage = None + + for line in sys.stdin: + line = line.strip() + + # --First line of the coverage report (with file name in it)? + match_obj = re.match("^File '(.*)'$", line) + if match_obj: + # fetch the file name from the first line of the report. + current_file = match_obj.group(1) + continue + + # -- Second line of the file report (with coverage percentage) + match_obj = re.match("^Lines executed:(.*)% of (.*)", line) + + if match_obj: + coverage = float(match_obj.group(1)) + lines = int(match_obj.group(2)) + + if current_file is not None: + per_file_coverage[current_file] = (coverage, lines) + current_file = None + else: + # If current_file is not set, we reach the last line of report, + # which contains the summarized coverage percentage. + total_coverage = (coverage, lines) + continue + + # If the line's pattern doesn't fall into the above categories. 
We + # can simply ignore them since they're either empty line or doesn't + # find executable lines of the given file. + current_file = None + + return per_file_coverage, total_coverage + +def get_option_parser(): + usage = "Parse the gcov output and generate more human-readable code " +\ + "coverage report." + parser = OptionParser(usage) + + parser.add_option( + "--interested-files", "-i", + dest="filenames", + help="Comma separated files names. if specified, we will display " + + "the coverage report only for interested source files. " + + "Otherwise we will display the coverage report for all " + + "source files." + ) + return parser + +def display_file_coverage(per_file_coverage, total_coverage): + # To print out auto-adjustable column, we need to know the longest + # length of file names. + max_file_name_length = max( + len(fname) for fname in per_file_coverage.keys() + ) + + # -- Print header + # size of separator is determined by 3 column sizes: + # file name, coverage percentage and lines. + header_template = \ + "%" + str(max_file_name_length) + "s\t%s\t%s" + separator = "-" * (max_file_name_length + 10 + 20) + print header_template % ("Filename", "Coverage", "Lines") + print separator + + # -- Print body + # template for printing coverage report for each file. 
+ record_template = "%" + str(max_file_name_length) + "s\t%5.2f%%\t%10d" + + for fname, coverage_info in per_file_coverage.items(): + coverage, lines = coverage_info + print record_template % (fname, coverage, lines) + + # -- Print footer + if total_coverage: + print separator + print record_template % ("Total", total_coverage[0], total_coverage[1]) + +def report_coverage(): + parser = get_option_parser() + (options, args) = parser.parse_args() + + interested_files = set() + if options.filenames is not None: + interested_files = set(f.strip() for f in options.filenames.split(',')) + + # To make things simple, right now we only read gcov report from the input + per_file_coverage, total_coverage = parse_gcov_report(sys.stdin) + + # Check if we need to display coverage info for interested files. + if len(interested_files): + per_file_coverage = dict( + (fname, per_file_coverage[fname]) for fname in interested_files + if fname in per_file_coverage + ) + # If we only interested in several files, it makes no sense to report + # the total_coverage + total_coverage = None + + if not len(per_file_coverage): + print >> sys.stderr, "Cannot find coverage info for the given files." + return + display_file_coverage(per_file_coverage, total_coverage) + +if __name__ == "__main__": + report_coverage() http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/builder.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/builder.cc b/thirdparty/rocksdb/db/builder.cc new file mode 100644 index 0000000..6f973fd --- /dev/null +++ b/thirdparty/rocksdb/db/builder.cc @@ -0,0 +1,228 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. 
All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/builder.h" + +#include <algorithm> +#include <deque> +#include <vector> + +#include "db/compaction_iterator.h" +#include "db/dbformat.h" +#include "db/event_helpers.h" +#include "db/internal_stats.h" +#include "db/merge_helper.h" +#include "db/table_cache.h" +#include "db/version_edit.h" +#include "monitoring/iostats_context_imp.h" +#include "monitoring/thread_status_util.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "rocksdb/table.h" +#include "table/block_based_table_builder.h" +#include "table/internal_iterator.h" +#include "util/file_reader_writer.h" +#include "util/filename.h" +#include "util/stop_watch.h" +#include "util/sync_point.h" + +namespace rocksdb { + +class TableFactory; + +TableBuilder* NewTableBuilder( + const ImmutableCFOptions& ioptions, + const InternalKeyComparator& internal_comparator, + const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* + int_tbl_prop_collector_factories, + uint32_t column_family_id, const std::string& column_family_name, + WritableFileWriter* file, const CompressionType compression_type, + const CompressionOptions& compression_opts, int level, + const std::string* compression_dict, const bool skip_filters, + const uint64_t creation_time) { + assert((column_family_id == + TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == + column_family_name.empty()); + return ioptions.table_factory->NewTableBuilder( + TableBuilderOptions(ioptions, internal_comparator, + int_tbl_prop_collector_factories, compression_type, + compression_opts, compression_dict, skip_filters, + column_family_name, level, creation_time), + column_family_id, file); +} + +Status BuildTable( + const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions, + 
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, + TableCache* table_cache, InternalIterator* iter, + std::unique_ptr<InternalIterator> range_del_iter, FileMetaData* meta, + const InternalKeyComparator& internal_comparator, + const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* + int_tbl_prop_collector_factories, + uint32_t column_family_id, const std::string& column_family_name, + std::vector<SequenceNumber> snapshots, + SequenceNumber earliest_write_conflict_snapshot, + const CompressionType compression, + const CompressionOptions& compression_opts, bool paranoid_file_checks, + InternalStats* internal_stats, TableFileCreationReason reason, + EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, + TableProperties* table_properties, int level, + const uint64_t creation_time) { + assert((column_family_id == + TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == + column_family_name.empty()); + // Reports the IOStats for flush for every following bytes. 
+ const size_t kReportFlushIOStatsEvery = 1048576; + Status s; + meta->fd.file_size = 0; + iter->SeekToFirst(); + std::unique_ptr<RangeDelAggregator> range_del_agg( + new RangeDelAggregator(internal_comparator, snapshots)); + s = range_del_agg->AddTombstones(std::move(range_del_iter)); + if (!s.ok()) { + // may be non-ok if a range tombstone key is unparsable + return s; + } + + std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(), + meta->fd.GetPathId()); +#ifndef ROCKSDB_LITE + EventHelpers::NotifyTableFileCreationStarted( + ioptions.listeners, dbname, column_family_name, fname, job_id, reason); +#endif // !ROCKSDB_LITE + TableProperties tp; + + if (iter->Valid() || range_del_agg->ShouldAddTombstones()) { + TableBuilder* builder; + unique_ptr<WritableFileWriter> file_writer; + { + unique_ptr<WritableFile> file; +#ifndef NDEBUG + bool use_direct_writes = env_options.use_direct_writes; + TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes); +#endif // !NDEBUG + s = NewWritableFile(env, fname, &file, env_options); + if (!s.ok()) { + EventHelpers::LogAndNotifyTableFileCreationFinished( + event_logger, ioptions.listeners, dbname, column_family_name, fname, + job_id, meta->fd, tp, reason, s); + return s; + } + file->SetIOPriority(io_priority); + + file_writer.reset(new WritableFileWriter(std::move(file), env_options, + ioptions.statistics)); + + builder = NewTableBuilder( + ioptions, internal_comparator, int_tbl_prop_collector_factories, + column_family_id, column_family_name, file_writer.get(), compression, + compression_opts, level, nullptr /* compression_dict */, + false /* skip_filters */, creation_time); + } + + MergeHelper merge(env, internal_comparator.user_comparator(), + ioptions.merge_operator, nullptr, ioptions.info_log, + true /* internal key corruption is not ok */, + snapshots.empty() ? 
0 : snapshots.back()); + + CompactionIterator c_iter( + iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber, + &snapshots, earliest_write_conflict_snapshot, env, + true /* internal key corruption is not ok */, range_del_agg.get()); + c_iter.SeekToFirst(); + for (; c_iter.Valid(); c_iter.Next()) { + const Slice& key = c_iter.key(); + const Slice& value = c_iter.value(); + builder->Add(key, value); + meta->UpdateBoundaries(key, c_iter.ikey().sequence); + + // TODO(noetzli): Update stats after flush, too. + if (io_priority == Env::IO_HIGH && + IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) { + ThreadStatusUtil::SetThreadOperationProperty( + ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); + } + } + // nullptr for table_{min,max} so all range tombstones will be flushed + range_del_agg->AddToBuilder(builder, nullptr /* lower_bound */, + nullptr /* upper_bound */, meta); + + // Finish and check for builder errors + bool empty = builder->NumEntries() == 0; + s = c_iter.status(); + if (!s.ok() || empty) { + builder->Abandon(); + } else { + s = builder->Finish(); + } + + if (s.ok() && !empty) { + uint64_t file_size = builder->FileSize(); + meta->fd.file_size = file_size; + meta->marked_for_compaction = builder->NeedCompact(); + assert(meta->fd.GetFileSize() > 0); + tp = builder->GetTableProperties(); + if (table_properties) { + *table_properties = tp; + } + } + delete builder; + + // Finish and check for file errors + if (s.ok() && !empty) { + StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS); + s = file_writer->Sync(ioptions.use_fsync); + } + if (s.ok() && !empty) { + s = file_writer->Close(); + } + + if (s.ok() && !empty) { + // Verify that the table is usable + // We set for_compaction to false and don't OptimizeForCompactionTableRead + // here because this is a special case after we finish the table building + // No matter whether use_direct_io_for_flush_and_compaction is true, + // we will regrad this verification as user 
reads since the goal is + // to cache it here for further user reads + std::unique_ptr<InternalIterator> it(table_cache->NewIterator( + ReadOptions(), env_options, internal_comparator, meta->fd, + nullptr /* range_del_agg */, nullptr, + (internal_stats == nullptr) ? nullptr + : internal_stats->GetFileReadHist(0), + false /* for_compaction */, nullptr /* arena */, + false /* skip_filter */, level)); + s = it->status(); + if (s.ok() && paranoid_file_checks) { + for (it->SeekToFirst(); it->Valid(); it->Next()) { + } + s = it->status(); + } + } + } + + // Check for input iterator errors + if (!iter->status().ok()) { + s = iter->status(); + } + + if (!s.ok() || meta->fd.GetFileSize() == 0) { + env->DeleteFile(fname); + } + + // Output to event logger and fire events. + EventHelpers::LogAndNotifyTableFileCreationFinished( + event_logger, ioptions.listeners, dbname, column_family_name, fname, + job_id, meta->fd, tp, reason, s); + + return s; +} + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/builder.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/builder.h b/thirdparty/rocksdb/db/builder.h new file mode 100644 index 0000000..a432a75 --- /dev/null +++ b/thirdparty/rocksdb/db/builder.h @@ -0,0 +1,82 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#pragma once +#include <string> +#include <utility> +#include <vector> +#include "db/table_properties_collector.h" +#include "options/cf_options.h" +#include "rocksdb/comparator.h" +#include "rocksdb/env.h" +#include "rocksdb/listener.h" +#include "rocksdb/options.h" +#include "rocksdb/status.h" +#include "rocksdb/table_properties.h" +#include "rocksdb/types.h" +#include "table/scoped_arena_iterator.h" +#include "util/event_logger.h" + +namespace rocksdb { + +struct Options; +struct FileMetaData; + +class Env; +struct EnvOptions; +class Iterator; +class TableCache; +class VersionEdit; +class TableBuilder; +class WritableFileWriter; +class InternalStats; +class InternalIterator; + +// @param column_family_name Name of the column family that is also identified +// by column_family_id, or empty string if unknown. It must outlive the +// TableBuilder returned by this function. +// @param compression_dict Data for presetting the compression library's +// dictionary, or nullptr. +TableBuilder* NewTableBuilder( + const ImmutableCFOptions& options, + const InternalKeyComparator& internal_comparator, + const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>* + int_tbl_prop_collector_factories, + uint32_t column_family_id, const std::string& column_family_name, + WritableFileWriter* file, const CompressionType compression_type, + const CompressionOptions& compression_opts, int level, + const std::string* compression_dict = nullptr, + const bool skip_filters = false, const uint64_t creation_time = 0); + +// Build a Table file from the contents of *iter. The generated file +// will be named according to number specified in meta. On success, the rest of +// *meta will be filled with metadata about the generated table. +// If no data is present in *iter, meta->file_size will be set to +// zero, and no Table file will be produced. +// +// @param column_family_name Name of the column family that is also identified +// by column_family_id, or empty string if unknown. 
+extern Status BuildTable(  // returns the build/verify status; an error on the input iterator overrides it
+    const std::string& dbname, Env* env, const ImmutableCFOptions& options,
+    const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
+    TableCache* table_cache, InternalIterator* iter,  // table_cache re-opens the finished file for a verification read; *iter supplies the entries
+    std::unique_ptr<InternalIterator> range_del_iter, FileMetaData* meta,  // *meta: boundaries, fd.file_size and marked_for_compaction are written during the build
+    const InternalKeyComparator& internal_comparator,
+    const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
+    int_tbl_prop_collector_factories,
+    uint32_t column_family_id, const std::string& column_family_name,
+    std::vector<SequenceNumber> snapshots,  // handed to the CompactionIterator that wraps *iter
+    SequenceNumber earliest_write_conflict_snapshot,  // likewise handed to the CompactionIterator
+    const CompressionType compression,
+    const CompressionOptions& compression_opts, bool paranoid_file_checks,  // paranoid_file_checks: additionally scan the whole new table and check the iterator status
+    InternalStats* internal_stats, TableFileCreationReason reason,  // internal_stats may be nullptr; reason is forwarded to the creation-finished event
+    EventLogger* event_logger = nullptr, int job_id = 0,  // both forwarded to the table-file-creation-finished log/notification
+    const Env::IOPriority io_priority = Env::IO_HIGH,  // IO_HIGH enables FLUSH_BYTES_WRITTEN thread-status updates while entries are added
+    TableProperties* table_properties = nullptr, int level = -1,  // *table_properties (if non-null) receives the builder's properties on a successful, non-empty build; level is passed to the verification iterator
+    const uint64_t creation_time = 0);  // NOTE(review): the output file is deleted when the status is not OK or the resulting file size is 0
+
+} // namespace rocksdb
