http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/dbformat.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/dbformat.h b/thirdparty/rocksdb/db/dbformat.h new file mode 100644 index 0000000..d9fd5f3 --- /dev/null +++ b/thirdparty/rocksdb/db/dbformat.h @@ -0,0 +1,596 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include <stdio.h> +#include <string> +#include <utility> +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/filter_policy.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" +#include "rocksdb/table.h" +#include "rocksdb/types.h" +#include "util/coding.h" +#include "util/logging.h" + +namespace rocksdb { + +class InternalKey; + +// Value types encoded as the last component of internal keys. +// DO NOT CHANGE THESE ENUM VALUES: they are embedded in the on-disk +// data structures. +// The highest bit of the value type needs to be reserved to SST tables +// for them to do more flexible encoding. +enum ValueType : unsigned char { + kTypeDeletion = 0x0, + kTypeValue = 0x1, + kTypeMerge = 0x2, + kTypeLogData = 0x3, // WAL only. + kTypeColumnFamilyDeletion = 0x4, // WAL only. + kTypeColumnFamilyValue = 0x5, // WAL only. + kTypeColumnFamilyMerge = 0x6, // WAL only. + kTypeSingleDeletion = 0x7, + kTypeColumnFamilySingleDeletion = 0x8, // WAL only. + kTypeBeginPrepareXID = 0x9, // WAL only. + kTypeEndPrepareXID = 0xA, // WAL only. + kTypeCommitXID = 0xB, // WAL only. + kTypeRollbackXID = 0xC, // WAL only. + kTypeNoop = 0xD, // WAL only. + kTypeColumnFamilyRangeDeletion = 0xE, // WAL only. + kTypeRangeDeletion = 0xF, // meta block + kMaxValue = 0x7F // Not used for storing records. +}; + +// Defined in dbformat.cc +extern const ValueType kValueTypeForSeek; +extern const ValueType kValueTypeForSeekForPrev; + +// Checks whether a type is an inline value type +// (i.e. a type used in memtable skiplist and sst file datablock). +inline bool IsValueType(ValueType t) { + return t <= kTypeMerge || t == kTypeSingleDeletion; +} + +// Checks whether a type is from user operation +// kTypeRangeDeletion is in meta block so this API is separated from above +inline bool IsExtendedValueType(ValueType t) { + return IsValueType(t) || t == kTypeRangeDeletion; +} + +// We leave eight bits empty at the bottom so a type and sequence# +// can be packed together into 64-bits. +static const SequenceNumber kMaxSequenceNumber = + ((0x1ull << 56) - 1); + +static const SequenceNumber kDisableGlobalSequenceNumber = port::kMaxUint64; + +struct ParsedInternalKey { + Slice user_key; + SequenceNumber sequence; + ValueType type; + + ParsedInternalKey() + : sequence(kMaxSequenceNumber) // Make code analyzer happy + {} // Intentionally left uninitialized (for speed) + ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t) + : user_key(u), sequence(seq), type(t) { } + std::string DebugString(bool hex = false) const; + + void clear() { + user_key.clear(); + sequence = 0; + type = kTypeDeletion; + } +}; + +// Return the length of the encoding of "key". +inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) { + return key.user_key.size() + 8; +} + +// Pack a sequence number and a ValueType into a uint64_t +extern uint64_t PackSequenceAndType(uint64_t seq, ValueType t); + +// Given the result of PackSequenceAndType, store the sequence number in *seq +// and the ValueType in *t. +extern void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t); + +// Append the serialization of "key" to *result. +extern void AppendInternalKey(std::string* result, + const ParsedInternalKey& key); +// Serialized internal key consists of user key followed by footer. +// This function appends the footer to *result, assuming that *result already +// contains the user key at the end. +extern void AppendInternalKeyFooter(std::string* result, SequenceNumber s, + ValueType t); + +// Attempt to parse an internal key from "internal_key". On success, +// stores the parsed data in "*result", and returns true. +// +// On error, returns false, leaves "*result" in an undefined state. +extern bool ParseInternalKey(const Slice& internal_key, + ParsedInternalKey* result); + +// Returns the user key portion of an internal key. +inline Slice ExtractUserKey(const Slice& internal_key) { + assert(internal_key.size() >= 8); + return Slice(internal_key.data(), internal_key.size() - 8); +} + +inline ValueType ExtractValueType(const Slice& internal_key) { + assert(internal_key.size() >= 8); + const size_t n = internal_key.size(); + uint64_t num = DecodeFixed64(internal_key.data() + n - 8); + unsigned char c = num & 0xff; + return static_cast<ValueType>(c); +} + +// A comparator for internal keys that uses a specified comparator for +// the user key portion and breaks ties by decreasing sequence number. +class InternalKeyComparator : public Comparator { + private: + const Comparator* user_comparator_; + std::string name_; + public: + explicit InternalKeyComparator(const Comparator* c) : user_comparator_(c), + name_("rocksdb.InternalKeyComparator:" + + std::string(user_comparator_->Name())) { + } + virtual ~InternalKeyComparator() {} + + virtual const char* Name() const override; + virtual int Compare(const Slice& a, const Slice& b) const override; + virtual void FindShortestSeparator(std::string* start, + const Slice& limit) const override; + virtual void FindShortSuccessor(std::string* key) const override; + + const Comparator* user_comparator() const { return user_comparator_; } + + int Compare(const InternalKey& a, const InternalKey& b) const; + int Compare(const ParsedInternalKey& a, const ParsedInternalKey& b) const; + virtual const Comparator* GetRootComparator() const override { + return user_comparator_->GetRootComparator(); + } +}; + +// Modules in this directory should keep internal keys wrapped inside +// the following class instead of plain strings so that we do not +// incorrectly use string comparisons instead of an InternalKeyComparator. +class InternalKey { + private: + std::string rep_; + public: + InternalKey() { } // Leave rep_ as empty to indicate it is invalid + InternalKey(const Slice& _user_key, SequenceNumber s, ValueType t) { + AppendInternalKey(&rep_, ParsedInternalKey(_user_key, s, t)); + } + + // sets the internal key to be bigger or equal to all internal keys with this + // user key + void SetMaxPossibleForUserKey(const Slice& _user_key) { + AppendInternalKey(&rep_, ParsedInternalKey(_user_key, kMaxSequenceNumber, + kValueTypeForSeek)); + } + + // sets the internal key to be smaller or equal to all internal keys with this + // user key + void SetMinPossibleForUserKey(const Slice& _user_key) { + AppendInternalKey( + &rep_, ParsedInternalKey(_user_key, 0, static_cast<ValueType>(0))); + } + + bool Valid() const { + ParsedInternalKey parsed; + return ParseInternalKey(Slice(rep_), &parsed); + } + + void DecodeFrom(const Slice& s) { rep_.assign(s.data(), s.size()); } + Slice Encode() const { + assert(!rep_.empty()); + return rep_; + } + + Slice user_key() const { return ExtractUserKey(rep_); } + size_t size() { return rep_.size(); } + + void Set(const Slice& _user_key, SequenceNumber s, ValueType t) { + SetFrom(ParsedInternalKey(_user_key, s, t)); + } + + void SetFrom(const ParsedInternalKey& p) { + rep_.clear(); + AppendInternalKey(&rep_, p); + } + + void Clear() { rep_.clear(); } + + // The underlying representation. + // Intended only to be used together with ConvertFromUserKey(). + std::string* rep() { return &rep_; } + + // Assuming that *rep() contains a user key, this method makes internal key + // out of it in-place. This saves a memcpy compared to Set()/SetFrom(). + void ConvertFromUserKey(SequenceNumber s, ValueType t) { + AppendInternalKeyFooter(&rep_, s, t); + } + + std::string DebugString(bool hex = false) const; +}; + +inline int InternalKeyComparator::Compare( + const InternalKey& a, const InternalKey& b) const { + return Compare(a.Encode(), b.Encode()); +} + +inline bool ParseInternalKey(const Slice& internal_key, + ParsedInternalKey* result) { + const size_t n = internal_key.size(); + if (n < 8) return false; + uint64_t num = DecodeFixed64(internal_key.data() + n - 8); + unsigned char c = num & 0xff; + result->sequence = num >> 8; + result->type = static_cast<ValueType>(c); + assert(result->type <= ValueType::kMaxValue); + result->user_key = Slice(internal_key.data(), n - 8); + return IsExtendedValueType(result->type); +} + +// Update the sequence number in the internal key. +// Guarantees not to invalidate ikey.data(). +inline void UpdateInternalKey(std::string* ikey, uint64_t seq, ValueType t) { + size_t ikey_sz = ikey->size(); + assert(ikey_sz >= 8); + uint64_t newval = (seq << 8) | t; + + // Note: Since C++11, strings are guaranteed to be stored contiguously and + // string::operator[]() is guaranteed not to change ikey.data(). + EncodeFixed64(&(*ikey)[ikey_sz - 8], newval); +} + +// Get the sequence number from the internal key +inline uint64_t GetInternalKeySeqno(const Slice& internal_key) { + const size_t n = internal_key.size(); + assert(n >= 8); + uint64_t num = DecodeFixed64(internal_key.data() + n - 8); + return num >> 8; +} + + +// A helper class useful for DBImpl::Get() +class LookupKey { + public: + // Initialize *this for looking up user_key at a snapshot with + // the specified sequence number. + LookupKey(const Slice& _user_key, SequenceNumber sequence); + + ~LookupKey(); + + // Return a key suitable for lookup in a MemTable. + Slice memtable_key() const { + return Slice(start_, static_cast<size_t>(end_ - start_)); + } + + // Return an internal key (suitable for passing to an internal iterator) + Slice internal_key() const { + return Slice(kstart_, static_cast<size_t>(end_ - kstart_)); + } + + // Return the user key + Slice user_key() const { + return Slice(kstart_, static_cast<size_t>(end_ - kstart_ - 8)); + } + + private: + // We construct a char array of the form: + // klength varint32 <-- start_ + // userkey char[klength] <-- kstart_ + // tag uint64 + // <-- end_ + // The array is a suitable MemTable key. + // The suffix starting with "userkey" can be used as an InternalKey. + const char* start_; + const char* kstart_; + const char* end_; + char space_[200]; // Avoid allocation for short keys + + // No copying allowed + LookupKey(const LookupKey&); + void operator=(const LookupKey&); +}; + +inline LookupKey::~LookupKey() { + if (start_ != space_) delete[] start_; +} + +class IterKey { + public: + IterKey() + : buf_(space_), + buf_size_(sizeof(space_)), + key_(buf_), + key_size_(0), + is_user_key_(true) {} + + ~IterKey() { ResetBuffer(); } + + Slice GetInternalKey() const { + assert(!IsUserKey()); + return Slice(key_, key_size_); + } + + Slice GetUserKey() const { + if (IsUserKey()) { + return Slice(key_, key_size_); + } else { + assert(key_size_ >= 8); + return Slice(key_, key_size_ - 8); + } + } + + size_t Size() const { return key_size_; } + + void Clear() { key_size_ = 0; } + + // Append "non_shared_data" to its back, from "shared_len" + // This function is used in Block::Iter::ParseNextKey + // shared_len: bytes in [0, shard_len-1] would be remained + // non_shared_data: data to be append, its length must be >= non_shared_len + void TrimAppend(const size_t shared_len, const char* non_shared_data, + const size_t non_shared_len) { + assert(shared_len <= key_size_); + size_t total_size = shared_len + non_shared_len; + + if (IsKeyPinned() /* key is not in buf_ */) { + // Copy the key from external memory to buf_ (copy shared_len bytes) + EnlargeBufferIfNeeded(total_size); + memcpy(buf_, key_, shared_len); + } else if (total_size > buf_size_) { + // Need to allocate space, delete previous space + char* p = new char[total_size]; + memcpy(p, key_, shared_len); + + if (buf_ != space_) { + delete[] buf_; + } + + buf_ = p; + buf_size_ = total_size; + } + + memcpy(buf_ + shared_len, non_shared_data, non_shared_len); + key_ = buf_; + key_size_ = total_size; + } + + Slice SetUserKey(const Slice& key, bool copy = true) { + is_user_key_ = true; + return SetKeyImpl(key, copy); + } + + Slice SetInternalKey(const Slice& key, bool copy = true) { + is_user_key_ = false; + return SetKeyImpl(key, copy); + } + + // Copies the content of key, updates the reference to the user key in ikey + // and returns a Slice referencing the new copy. + Slice SetInternalKey(const Slice& key, ParsedInternalKey* ikey) { + size_t key_n = key.size(); + assert(key_n >= 8); + SetInternalKey(key); + ikey->user_key = Slice(key_, key_n - 8); + return Slice(key_, key_n); + } + + // Copy the key into IterKey own buf_ + void OwnKey() { + assert(IsKeyPinned() == true); + + Reserve(key_size_); + memcpy(buf_, key_, key_size_); + key_ = buf_; + } + + // Update the sequence number in the internal key. Guarantees not to + // invalidate slices to the key (and the user key). + void UpdateInternalKey(uint64_t seq, ValueType t) { + assert(!IsKeyPinned()); + assert(key_size_ >= 8); + uint64_t newval = (seq << 8) | t; + EncodeFixed64(&buf_[key_size_ - 8], newval); + } + + bool IsKeyPinned() const { return (key_ != buf_); } + + void SetInternalKey(const Slice& key_prefix, const Slice& user_key, + SequenceNumber s, + ValueType value_type = kValueTypeForSeek) { + size_t psize = key_prefix.size(); + size_t usize = user_key.size(); + EnlargeBufferIfNeeded(psize + usize + sizeof(uint64_t)); + if (psize > 0) { + memcpy(buf_, key_prefix.data(), psize); + } + memcpy(buf_ + psize, user_key.data(), usize); + EncodeFixed64(buf_ + usize + psize, PackSequenceAndType(s, value_type)); + + key_ = buf_; + key_size_ = psize + usize + sizeof(uint64_t); + is_user_key_ = false; + } + + void SetInternalKey(const Slice& user_key, SequenceNumber s, + ValueType value_type = kValueTypeForSeek) { + SetInternalKey(Slice(), user_key, s, value_type); + } + + void Reserve(size_t size) { + EnlargeBufferIfNeeded(size); + key_size_ = size; + } + + void SetInternalKey(const ParsedInternalKey& parsed_key) { + SetInternalKey(Slice(), parsed_key); + } + + void SetInternalKey(const Slice& key_prefix, + const ParsedInternalKey& parsed_key_suffix) { + SetInternalKey(key_prefix, parsed_key_suffix.user_key, + parsed_key_suffix.sequence, parsed_key_suffix.type); + } + + void EncodeLengthPrefixedKey(const Slice& key) { + auto size = key.size(); + EnlargeBufferIfNeeded(size + static_cast<size_t>(VarintLength(size))); + char* ptr = EncodeVarint32(buf_, static_cast<uint32_t>(size)); + memcpy(ptr, key.data(), size); + key_ = buf_; + is_user_key_ = true; + } + + bool IsUserKey() const { return is_user_key_; } + + private: + char* buf_; + size_t buf_size_; + const char* key_; + size_t key_size_; + char space_[32]; // Avoid allocation for short keys + bool is_user_key_; + + Slice SetKeyImpl(const Slice& key, bool copy) { + size_t size = key.size(); + if (copy) { + // Copy key to buf_ + EnlargeBufferIfNeeded(size); + memcpy(buf_, key.data(), size); + key_ = buf_; + } else { + // Update key_ to point to external memory + key_ = key.data(); + } + key_size_ = size; + return Slice(key_, key_size_); + } + + void ResetBuffer() { + if (buf_ != space_) { + delete[] buf_; + buf_ = space_; + } + buf_size_ = sizeof(space_); + key_size_ = 0; + } + + // Enlarge the buffer size if needed based on key_size. + // By default, static allocated buffer is used. Once there is a key + // larger than the static allocated buffer, another buffer is dynamically + // allocated, until a larger key buffer is requested. In that case, we + // reallocate buffer and delete the old one. + void EnlargeBufferIfNeeded(size_t key_size) { + // If size is smaller than buffer size, continue using current buffer, + // or the static allocated one, as default + if (key_size > buf_size_) { + // Need to enlarge the buffer. + ResetBuffer(); + buf_ = new char[key_size]; + buf_size_ = key_size; + } + } + + // No copying allowed + IterKey(const IterKey&) = delete; + void operator=(const IterKey&) = delete; +}; + +class InternalKeySliceTransform : public SliceTransform { + public: + explicit InternalKeySliceTransform(const SliceTransform* transform) + : transform_(transform) {} + + virtual const char* Name() const override { return transform_->Name(); } + + virtual Slice Transform(const Slice& src) const override { + auto user_key = ExtractUserKey(src); + return transform_->Transform(user_key); + } + + virtual bool InDomain(const Slice& src) const override { + auto user_key = ExtractUserKey(src); + return transform_->InDomain(user_key); + } + + virtual bool InRange(const Slice& dst) const override { + auto user_key = ExtractUserKey(dst); + return transform_->InRange(user_key); + } + + const SliceTransform* user_prefix_extractor() const { return transform_; } + + private: + // Like comparator, InternalKeySliceTransform will not take care of the + // deletion of transform_ + const SliceTransform* const transform_; +}; + +// Read the key of a record from a write batch. +// if this record represent the default column family then cf_record +// must be passed as false, otherwise it must be passed as true. +extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, + bool cf_record); + +// Read record from a write batch piece from input. +// tag, column_family, key, value and blob are return values. Callers own the +// Slice they point to. +// Tag is defined as ValueType. +// input will be advanced to after the record. +extern Status ReadRecordFromWriteBatch(Slice* input, char* tag, + uint32_t* column_family, Slice* key, + Slice* value, Slice* blob, Slice* xid); + +// When user call DeleteRange() to delete a range of keys, +// we will store a serialized RangeTombstone in MemTable and SST. +// the struct here is a easy-understood form +// start/end_key_ is the start/end user key of the range to be deleted +struct RangeTombstone { + Slice start_key_; + Slice end_key_; + SequenceNumber seq_; + RangeTombstone() = default; + RangeTombstone(Slice sk, Slice ek, SequenceNumber sn) + : start_key_(sk), end_key_(ek), seq_(sn) {} + + RangeTombstone(ParsedInternalKey parsed_key, Slice value) { + start_key_ = parsed_key.user_key; + seq_ = parsed_key.sequence; + end_key_ = value; + } + + // be careful to use Serialize(), allocates new memory + std::pair<InternalKey, Slice> Serialize() const { + auto key = InternalKey(start_key_, seq_, kTypeRangeDeletion); + Slice value = end_key_; + return std::make_pair(std::move(key), std::move(value)); + } + + // be careful to use SerializeKey(), allocates new memory + InternalKey SerializeKey() const { + return InternalKey(start_key_, seq_, kTypeRangeDeletion); + } + + // be careful to use SerializeEndKey(), allocates new memory + InternalKey SerializeEndKey() const { + return InternalKey(end_key_, seq_, kTypeRangeDeletion); + } +}; + +} // namespace rocksdb
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/event_helpers.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/event_helpers.cc b/thirdparty/rocksdb/db/event_helpers.cc new file mode 100644 index 0000000..1b79acb --- /dev/null +++ b/thirdparty/rocksdb/db/event_helpers.cc @@ -0,0 +1,155 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/event_helpers.h" + +namespace rocksdb { + +namespace { +template<class T> +inline T SafeDivide(T a, T b) { + return b == 0 ? 0 : a / b; +} +} // namespace + +void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) { + *jwriter << "time_micros" + << std::chrono::duration_cast<std::chrono::microseconds>( + std::chrono::system_clock::now().time_since_epoch()).count(); +} + +#ifndef ROCKSDB_LITE +void EventHelpers::NotifyTableFileCreationStarted( + const std::vector<std::shared_ptr<EventListener>>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, TableFileCreationReason reason) { + TableFileCreationBriefInfo info; + info.db_name = db_name; + info.cf_name = cf_name; + info.file_path = file_path; + info.job_id = job_id; + info.reason = reason; + for (auto& listener : listeners) { + listener->OnTableFileCreationStarted(info); + } +} +#endif // !ROCKSDB_LITE + +void EventHelpers::NotifyOnBackgroundError( + const std::vector<std::shared_ptr<EventListener>>& listeners, + BackgroundErrorReason reason, Status* bg_error, + InstrumentedMutex* db_mutex) { +#ifndef ROCKSDB_LITE + if (listeners.size() == 0U) { + return; + } + db_mutex->AssertHeld(); + // release lock while notifying events + db_mutex->Unlock(); + for (auto& listener : listeners) { + listener->OnBackgroundError(reason, bg_error); + } + db_mutex->Lock(); +#endif // ROCKSDB_LITE +} + +void EventHelpers::LogAndNotifyTableFileCreationFinished( + EventLogger* event_logger, + const std::vector<std::shared_ptr<EventListener>>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, const FileDescriptor& fd, + const TableProperties& table_properties, TableFileCreationReason reason, + const Status& s) { + if (s.ok() && event_logger) { + JSONWriter jwriter; + AppendCurrentTime(&jwriter); + jwriter << "cf_name" << cf_name << "job" << job_id << "event" + << "table_file_creation" + << "file_number" << fd.GetNumber() << "file_size" + << fd.GetFileSize(); + + // table_properties + { + jwriter << "table_properties"; + jwriter.StartObject(); + + // basic properties: + jwriter << "data_size" << table_properties.data_size << "index_size" + << table_properties.index_size << "filter_size" + << table_properties.filter_size << "raw_key_size" + << table_properties.raw_key_size << "raw_average_key_size" + << SafeDivide(table_properties.raw_key_size, + table_properties.num_entries) + << "raw_value_size" << table_properties.raw_value_size + << "raw_average_value_size" + << SafeDivide(table_properties.raw_value_size, + table_properties.num_entries) + << "num_data_blocks" << table_properties.num_data_blocks + << "num_entries" << table_properties.num_entries + << "filter_policy_name" << table_properties.filter_policy_name; + + // user collected properties + for (const auto& prop : table_properties.readable_properties) { + jwriter << prop.first << prop.second; + } + jwriter.EndObject(); + } + jwriter.EndObject(); + + event_logger->Log(jwriter); + } + +#ifndef ROCKSDB_LITE + if (listeners.size() == 0) { + return; + } + TableFileCreationInfo info; + info.db_name = db_name; + info.cf_name = cf_name; + info.file_path = file_path; + info.file_size = fd.file_size; + info.job_id = job_id; + info.table_properties = table_properties; + info.reason = reason; + info.status = s; + for (auto& listener : listeners) { + listener->OnTableFileCreated(info); + } +#endif // !ROCKSDB_LITE +} + +void EventHelpers::LogAndNotifyTableFileDeletion( + EventLogger* event_logger, int job_id, + uint64_t file_number, const std::string& file_path, + const Status& status, const std::string& dbname, + const std::vector<std::shared_ptr<EventListener>>& listeners) { + + JSONWriter jwriter; + AppendCurrentTime(&jwriter); + + jwriter << "job" << job_id + << "event" << "table_file_deletion" + << "file_number" << file_number; + if (!status.ok()) { + jwriter << "status" << status.ToString(); + } + + jwriter.EndObject(); + + event_logger->Log(jwriter); + +#ifndef ROCKSDB_LITE + TableFileDeletionInfo info; + info.db_name = dbname; + info.job_id = job_id; + info.file_path = file_path; + info.status = status; + for (auto& listener : listeners) { + listener->OnTableFileDeleted(info); + } +#endif // !ROCKSDB_LITE +} + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/event_helpers.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/event_helpers.h b/thirdparty/rocksdb/db/event_helpers.h new file mode 100644 index 0000000..674e6c5 --- /dev/null +++ b/thirdparty/rocksdb/db/event_helpers.h @@ -0,0 +1,52 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include <memory> +#include <string> +#include <vector> + +#include "db/column_family.h" +#include "db/version_edit.h" +#include "rocksdb/listener.h" +#include "rocksdb/table_properties.h" +#include "util/event_logger.h" + +namespace rocksdb { + +class EventHelpers { + public: + static void AppendCurrentTime(JSONWriter* json_writer); +#ifndef ROCKSDB_LITE + static void NotifyTableFileCreationStarted( + const std::vector<std::shared_ptr<EventListener>>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, TableFileCreationReason reason); +#endif // !ROCKSDB_LITE + static void NotifyOnBackgroundError( + const std::vector<std::shared_ptr<EventListener>>& listeners, + BackgroundErrorReason reason, Status* bg_error, + InstrumentedMutex* db_mutex); + static void LogAndNotifyTableFileCreationFinished( + EventLogger* event_logger, + const std::vector<std::shared_ptr<EventListener>>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, const FileDescriptor& fd, + const TableProperties& table_properties, TableFileCreationReason reason, + const Status& s); + static void LogAndNotifyTableFileDeletion( + EventLogger* event_logger, int job_id, + uint64_t file_number, const std::string& file_path, + const Status& status, const std::string& db_name, + const std::vector<std::shared_ptr<EventListener>>& listeners); + + private: + static void LogAndNotifyTableFileCreation( + EventLogger* event_logger, + const std::vector<std::shared_ptr<EventListener>>& listeners, + const FileDescriptor& fd, const TableFileCreationInfo& info); +}; + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/experimental.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/experimental.cc b/thirdparty/rocksdb/db/experimental.cc new file mode 100644 index 0000000..effe9d7 --- /dev/null +++ b/thirdparty/rocksdb/db/experimental.cc @@ -0,0 +1,49 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "rocksdb/experimental.h" + +#include "db/db_impl.h" + +namespace rocksdb { +namespace experimental { + +#ifndef ROCKSDB_LITE + +Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) { + if (db == nullptr) { + return Status::InvalidArgument("DB is empty"); + } + + return db->SuggestCompactRange(column_family, begin, end); +} + +Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, int target_level) { + if (db == nullptr) { + return Status::InvalidArgument("Didn't recognize DB object"); + } + return db->PromoteL0(column_family, target_level); +} + +#else // ROCKSDB_LITE + +Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) { + return Status::NotSupported("Not supported in RocksDB LITE"); +} + +Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, int target_level) { + return Status::NotSupported("Not supported in RocksDB LITE"); +} + +#endif // ROCKSDB_LITE + +Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end) { + return SuggestCompactRange(db, db->DefaultColumnFamily(), begin, end); +} + +} // namespace experimental +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/external_sst_file_ingestion_job.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/external_sst_file_ingestion_job.cc b/thirdparty/rocksdb/db/external_sst_file_ingestion_job.cc new file mode 100644 index 0000000..58fa354 --- /dev/null +++ b/thirdparty/rocksdb/db/external_sst_file_ingestion_job.cc @@ -0,0 +1,665 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "db/external_sst_file_ingestion_job.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include <inttypes.h> +#include <algorithm> +#include <string> +#include <vector> + +#include "db/version_edit.h" +#include "table/merging_iterator.h" +#include "table/scoped_arena_iterator.h" +#include "table/sst_file_writer_collectors.h" +#include "table/table_builder.h" +#include "util/file_reader_writer.h" +#include "util/file_util.h" +#include "util/stop_watch.h" +#include "util/sync_point.h" + +namespace rocksdb { + +Status ExternalSstFileIngestionJob::Prepare( + const std::vector<std::string>& external_files_paths) { + Status status; + + // Read the information of files we are ingesting + for (const std::string& file_path : external_files_paths) { + IngestedFileInfo file_to_ingest; + status = GetIngestedFileInfo(file_path, &file_to_ingest); + if (!status.ok()) { + return status; + } + files_to_ingest_.push_back(file_to_ingest); + } + + for (const IngestedFileInfo& f : files_to_ingest_) { + if (f.cf_id != + TablePropertiesCollectorFactory::Context::kUnknownColumnFamily && + f.cf_id != cfd_->GetID()) { + return Status::InvalidArgument( + "External file column family id dont match"); + } + } + + const Comparator* ucmp = cfd_->internal_comparator().user_comparator(); + auto num_files = files_to_ingest_.size(); + if (num_files == 0) { + return Status::InvalidArgument("The list of files is empty"); + } else if (num_files > 1) { + // Verify that passed files dont have overlapping ranges + autovector<const IngestedFileInfo*> sorted_files; + for (size_t i = 0; i < num_files; i++) { + sorted_files.push_back(&files_to_ingest_[i]); + } + + std::sort( + sorted_files.begin(), sorted_files.end(), + [&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) { + return ucmp->Compare(info1->smallest_user_key, + info2->smallest_user_key) < 0; + }); + + for (size_t i = 0; i < num_files - 1; i++) { + if (ucmp->Compare(sorted_files[i]->largest_user_key, + sorted_files[i + 1]->smallest_user_key) >= 0) { + return Status::NotSupported("Files have overlapping ranges"); + } + } + } + + for (IngestedFileInfo& f : files_to_ingest_) { + if (f.num_entries == 0) { + return Status::InvalidArgument("File contain no entries"); + } + + if (!f.smallest_internal_key().Valid() || + !f.largest_internal_key().Valid()) { + return Status::Corruption("Generated table have corrupted keys"); + } + } + + // Copy/Move external files into DB + for (IngestedFileInfo& f : files_to_ingest_) { + f.fd = FileDescriptor(versions_->NewFileNumber(), 0, f.file_size); + + const std::string path_outside_db = f.external_file_path; + const std::string path_inside_db = + TableFileName(db_options_.db_paths, f.fd.GetNumber(), f.fd.GetPathId()); + + if (ingestion_options_.move_files) { + status = env_->LinkFile(path_outside_db, path_inside_db); + if (status.IsNotSupported()) { + // Original file is on a different FS, use copy instead of hard linking + status = CopyFile(env_, path_outside_db, path_inside_db, 0, + db_options_.use_fsync); + } + } else { + status = CopyFile(env_, path_outside_db, path_inside_db, 0, + db_options_.use_fsync); + } + TEST_SYNC_POINT("DBImpl::AddFile:FileCopied"); + if (!status.ok()) { + break; + } + f.internal_file_path = path_inside_db; + } + + if (!status.ok()) { + // We failed, remove all files that we copied into the db + for (IngestedFileInfo& f : files_to_ingest_) { + if (f.internal_file_path == "") { + break; + } + Status s = env_->DeleteFile(f.internal_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + f.internal_file_path.c_str(), s.ToString().c_str()); + } + } + } + + return status; +} + +Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed) { + SuperVersion* super_version = cfd_->GetSuperVersion(); + Status status = + IngestedFilesOverlapWithMemtables(super_version, flush_needed); + + if (status.ok() && *flush_needed && + !ingestion_options_.allow_blocking_flush) { + status = Status::InvalidArgument("External file requires flush"); + } + return status; +} + +// REQUIRES: we have become the only writer by entering both write_thread_ and +// nonmem_write_thread_ +Status ExternalSstFileIngestionJob::Run() { + Status status; +#ifndef NDEBUG + // We should never run the job with a memtable that is overlapping + // with the files we are ingesting + bool need_flush = false; + status = NeedsFlush(&need_flush); + assert(status.ok() && need_flush == false); +#endif + + bool consumed_seqno = false; + bool force_global_seqno = false; + + if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) { + // We need to assign a global sequence number to all the files even + // if the dont overlap with any ranges since we have snapshots + force_global_seqno = true; + } + // It is safe to use this instead of LastToBeWrittenSequence since we are + // the only active writer, and hence they are equal + const SequenceNumber last_seqno = versions_->LastSequence(); + SuperVersion* super_version = cfd_->GetSuperVersion(); + edit_.SetColumnFamily(cfd_->GetID()); + // The levels that the files will be ingested into + + for (IngestedFileInfo& f : files_to_ingest_) { + SequenceNumber assigned_seqno = 0; + if (ingestion_options_.ingest_behind) { + status = CheckLevelForIngestedBehindFile(&f); + } else { + status = AssignLevelAndSeqnoForIngestedFile( + super_version, force_global_seqno, cfd_->ioptions()->compaction_style, + &f, &assigned_seqno); + } + if (!status.ok()) { + return status; + } + status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno); + TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run", + &assigned_seqno); + if (assigned_seqno == last_seqno + 1) { + consumed_seqno = true; + } + if (!status.ok()) { + return status; + } + edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(), + f.fd.GetFileSize(), f.smallest_internal_key(), + f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno, + false); + } + + if (consumed_seqno) { + versions_->SetLastToBeWrittenSequence(last_seqno + 1); + versions_->SetLastSequence(last_seqno + 1); + } + + return status; +} + +void ExternalSstFileIngestionJob::UpdateStats() { + // Update internal stats for new ingested files + uint64_t total_keys = 0; + uint64_t total_l0_files = 0; + uint64_t total_time = env_->NowMicros() - job_start_time_; + for (IngestedFileInfo& f : files_to_ingest_) { + InternalStats::CompactionStats stats(1); + stats.micros = total_time; + stats.bytes_written = f.fd.GetFileSize(); + stats.num_output_files = 1; + cfd_->internal_stats()->AddCompactionStats(f.picked_level, stats); + cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE, + f.fd.GetFileSize()); + total_keys += f.num_entries; + if (f.picked_level == 0) { + total_l0_files += 1; + } + ROCKS_LOG_INFO( + db_options_.info_log, + "[AddFile] External SST file %s was ingested in L%d with path %s " + "(global_seqno=%" PRIu64 ")\n", + f.external_file_path.c_str(), f.picked_level, + f.internal_file_path.c_str(), f.assigned_seqno); + } + cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL, + total_keys); + cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL, + files_to_ingest_.size()); + cfd_->internal_stats()->AddCFStats( + InternalStats::INGESTED_LEVEL0_NUM_FILES_TOTAL, total_l0_files); +} + +void ExternalSstFileIngestionJob::Cleanup(const Status& status) { + if (!status.ok()) { + // We failed to add the files to the database + // remove all the files we copied + for (IngestedFileInfo& f : files_to_ingest_) { + Status s = env_->DeleteFile(f.internal_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "AddFile() clean up for file %s failed : %s", + f.internal_file_path.c_str(), s.ToString().c_str()); + } + } + } else if (status.ok() && ingestion_options_.move_files) { + // The files were moved and added successfully, remove original file links + for (IngestedFileInfo& f : files_to_ingest_) { + Status s = env_->DeleteFile(f.external_file_path); + if (!s.ok()) { + ROCKS_LOG_WARN( + db_options_.info_log, + "%s was added to DB successfully but failed to remove original " + "file link : %s", + f.external_file_path.c_str(), s.ToString().c_str()); + } + } + } +} + +Status ExternalSstFileIngestionJob::GetIngestedFileInfo( + const std::string& external_file, IngestedFileInfo* file_to_ingest) { + file_to_ingest->external_file_path = external_file; + + // Get external file size + Status status = env_->GetFileSize(external_file, &file_to_ingest->file_size); + if (!status.ok()) { + return status; + } + + // Create TableReader for external file + std::unique_ptr<TableReader> table_reader; + std::unique_ptr<RandomAccessFile> sst_file; + std::unique_ptr<RandomAccessFileReader> sst_file_reader; + + status = env_->NewRandomAccessFile(external_file, &sst_file, env_options_); + if (!status.ok()) { + return status; + } + sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file), + external_file)); + + status = cfd_->ioptions()->table_factory->NewTableReader( + TableReaderOptions(*cfd_->ioptions(), env_options_, + cfd_->internal_comparator()), + std::move(sst_file_reader), file_to_ingest->file_size, &table_reader); + if (!status.ok()) { + return status; + } + + // Get the external file properties + auto props = table_reader->GetTableProperties(); + const auto& uprops = props->user_collected_properties; + + // Get table version + auto version_iter = uprops.find(ExternalSstFilePropertyNames::kVersion); + if (version_iter == uprops.end()) { + return Status::Corruption("External file version not found"); + } + file_to_ingest->version = DecodeFixed32(version_iter->second.c_str()); + + auto seqno_iter = uprops.find(ExternalSstFilePropertyNames::kGlobalSeqno); + if (file_to_ingest->version == 2) { + // version 2 imply that we have global sequence number + if (seqno_iter == uprops.end()) { + return Status::Corruption( + "External file global sequence number not found"); + } + + // Set the global sequence number + file_to_ingest->original_seqno = DecodeFixed64(seqno_iter->second.c_str()); + file_to_ingest->global_seqno_offset = props->properties_offsets.at( + ExternalSstFilePropertyNames::kGlobalSeqno); + + if (file_to_ingest->global_seqno_offset == 0) { + return Status::Corruption("Was not able to find file global seqno field"); + } + } else if (file_to_ingest->version == 1) { + // SST file V1 should not have global seqno field + assert(seqno_iter == uprops.end()); + file_to_ingest->original_seqno = 0; + if (ingestion_options_.allow_blocking_flush || + ingestion_options_.allow_global_seqno) { + return Status::InvalidArgument( + "External SST file V1 does not support global seqno"); + } + } else { + return Status::InvalidArgument("External file version is not supported"); + } + // Get number of entries in table + file_to_ingest->num_entries = props->num_entries; + + ParsedInternalKey key; + ReadOptions ro; + // During reading the external file we can cache blocks that we read into + // the block cache, if we later change the global seqno of this file, we will + // have block in cache that will include keys with wrong seqno. + // We need to disable fill_cache so that we read from the file without + // updating the block cache. + ro.fill_cache = false; + std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(ro)); + + // Get first (smallest) key from file + iter->SeekToFirst(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("external file have corrupted keys"); + } + if (key.sequence != 0) { + return Status::Corruption("external file have non zero sequence number"); + } + file_to_ingest->smallest_user_key = key.user_key.ToString(); + + // Get last (largest) key from file + iter->SeekToLast(); + if (!ParseInternalKey(iter->key(), &key)) { + return Status::Corruption("external file have corrupted keys"); + } + if (key.sequence != 0) { + return Status::Corruption("external file have non zero sequence number"); + } + file_to_ingest->largest_user_key = key.user_key.ToString(); + + file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id); + + file_to_ingest->table_properties = *props; + + return status; +} + +Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables( + SuperVersion* sv, bool* overlap) { + // Create an InternalIterator over all memtables + Arena arena; + ReadOptions ro; + ro.total_order_seek = true; + MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(), &arena); + merge_iter_builder.AddIterator(sv->mem->NewIterator(ro, &arena)); + sv->imm->AddIterators(ro, &merge_iter_builder); + ScopedArenaIterator memtable_iter(merge_iter_builder.Finish()); + + std::vector<InternalIterator*> memtable_range_del_iters; + auto* active_range_del_iter = sv->mem->NewRangeTombstoneIterator(ro); + if (active_range_del_iter != nullptr) { + memtable_range_del_iters.push_back(active_range_del_iter); + } + sv->imm->AddRangeTombstoneIterators(ro, &memtable_range_del_iters); + std::unique_ptr<InternalIterator> memtable_range_del_iter(NewMergingIterator( + &cfd_->internal_comparator(), + memtable_range_del_iters.empty() ? nullptr : &memtable_range_del_iters[0], + static_cast<int>(memtable_range_del_iters.size()))); + + Status status; + *overlap = false; + for (IngestedFileInfo& f : files_to_ingest_) { + status = + IngestedFileOverlapWithIteratorRange(&f, memtable_iter.get(), overlap); + if (!status.ok() || *overlap == true) { + break; + } + status = IngestedFileOverlapWithRangeDeletions( + &f, memtable_range_del_iter.get(), overlap); + if (!status.ok() || *overlap == true) { + break; + } + } + + return status; +} + +Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( + SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style, + IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) { + Status status; + *assigned_seqno = 0; + const SequenceNumber last_seqno = versions_->LastSequence(); + if (force_global_seqno) { + *assigned_seqno = last_seqno + 1; + if (compaction_style == kCompactionStyleUniversal) { + file_to_ingest->picked_level = 0; + return status; + } + } + + bool overlap_with_db = false; + Arena arena; + ReadOptions ro; + ro.total_order_seek = true; + int target_level = 0; + auto* vstorage = cfd_->current()->storage_info(); + + for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) { + if (lvl > 0 && lvl < vstorage->base_level()) { + continue; + } + + if (vstorage->NumLevelFiles(lvl) > 0) { + bool overlap_with_level = false; + status = IngestedFileOverlapWithLevel(sv, file_to_ingest, lvl, + &overlap_with_level); + if (!status.ok()) { + return status; + } + if (overlap_with_level) { + // We must use L0 or any level higher than `lvl` to be able to overwrite + // the keys that we overlap with in this level, We also need to assign + // this file a seqno to overwrite the existing keys in level `lvl` + overlap_with_db = true; + break; + } + + if (compaction_style == kCompactionStyleUniversal && lvl != 0) { + const std::vector<FileMetaData*>& level_files = + vstorage->LevelFiles(lvl); + const SequenceNumber level_largest_seqno = + (*max_element(level_files.begin(), level_files.end(), + [](FileMetaData* f1, FileMetaData* f2) { + return f1->largest_seqno < f2->largest_seqno; + })) + ->largest_seqno; + if (level_largest_seqno != 0) { + *assigned_seqno = level_largest_seqno; + } else { + continue; + } + } + } else if (compaction_style == kCompactionStyleUniversal) { + continue; + } + + // We dont overlap with any keys in this level, but we still need to check + // if our file can fit in it + if (IngestedFileFitInLevel(file_to_ingest, lvl)) { + target_level = lvl; + } + } + TEST_SYNC_POINT_CALLBACK( + "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile", + &overlap_with_db); + file_to_ingest->picked_level = target_level; + if (overlap_with_db && *assigned_seqno == 0) { + *assigned_seqno = last_seqno + 1; + } + return status; +} + +Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile( + IngestedFileInfo* file_to_ingest) { + auto* vstorage = cfd_->current()->storage_info(); + // first check if new files fit in the bottommost level + int bottom_lvl = cfd_->NumberLevels() - 1; + if(!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) { + return Status::InvalidArgument( + "Can't ingest_behind file as it doesn't fit " + "at the bottommost level!"); + } + + // second check if despite allow_ingest_behind=true we still have 0 seqnums + // at some upper level + for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) { + for (auto file : vstorage->LevelFiles(lvl)) { + if (file->smallest_seqno == 0) { + return Status::InvalidArgument( + "Can't ingest_behind file as despite allow_ingest_behind=true " + "there are files with 0 seqno in database at upper levels!"); + } + } + } + + file_to_ingest->picked_level = bottom_lvl; + return Status::OK(); +} + +Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile( + IngestedFileInfo* file_to_ingest, SequenceNumber seqno) { + if (file_to_ingest->original_seqno == seqno) { + // This file already have the correct global seqno + return Status::OK(); + } else if (!ingestion_options_.allow_global_seqno) { + return Status::InvalidArgument("Global seqno is required, but disabled"); + } else if (file_to_ingest->global_seqno_offset == 0) { + return Status::InvalidArgument( + "Trying to set global seqno for a file that dont have a global seqno " + "field"); + } + + std::unique_ptr<RandomRWFile> rwfile; + Status status = env_->NewRandomRWFile(file_to_ingest->internal_file_path, + &rwfile, env_options_); + if (!status.ok()) { + return status; + } + + // Write the new seqno in the global sequence number field in the file + std::string seqno_val; + PutFixed64(&seqno_val, seqno); + status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val); + if (status.ok()) { + file_to_ingest->assigned_seqno = seqno; + } + return status; +} + +Status ExternalSstFileIngestionJob::IngestedFileOverlapWithIteratorRange( + const IngestedFileInfo* file_to_ingest, InternalIterator* iter, + bool* overlap) { + auto* vstorage = cfd_->current()->storage_info(); + auto* ucmp = vstorage->InternalComparator()->user_comparator(); + InternalKey range_start(file_to_ingest->smallest_user_key, kMaxSequenceNumber, + kValueTypeForSeek); + iter->Seek(range_start.Encode()); + if (!iter->status().ok()) { + return iter->status(); + } + + *overlap = false; + if (iter->Valid()) { + ParsedInternalKey seek_result; + if (!ParseInternalKey(iter->key(), &seek_result)) { + return Status::Corruption("DB have corrupted keys"); + } + + if (ucmp->Compare(seek_result.user_key, file_to_ingest->largest_user_key) <= + 0) { + *overlap = true; + } + } + + return iter->status(); +} + +Status ExternalSstFileIngestionJob::IngestedFileOverlapWithRangeDeletions( + const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter, + bool* overlap) { + auto* vstorage = cfd_->current()->storage_info(); + auto* ucmp = vstorage->InternalComparator()->user_comparator(); + + *overlap = false; + if (range_del_iter != nullptr) { + for (range_del_iter->SeekToFirst(); range_del_iter->Valid(); + range_del_iter->Next()) { + ParsedInternalKey parsed_key; + if (!ParseInternalKey(range_del_iter->key(), &parsed_key)) { + return Status::Corruption("corrupted range deletion key: " + + range_del_iter->key().ToString()); + } + RangeTombstone range_del(parsed_key, range_del_iter->value()); + if (ucmp->Compare(range_del.start_key_, + file_to_ingest->largest_user_key) <= 0 && + ucmp->Compare(file_to_ingest->smallest_user_key, + range_del.end_key_) <= 0) { + *overlap = true; + break; + } + } + } + return Status::OK(); +} + +bool ExternalSstFileIngestionJob::IngestedFileFitInLevel( + const IngestedFileInfo* file_to_ingest, int level) { + if (level == 0) { + // Files can always fit in L0 + return true; + } + + auto* vstorage = cfd_->current()->storage_info(); + Slice file_smallest_user_key(file_to_ingest->smallest_user_key); + Slice file_largest_user_key(file_to_ingest->largest_user_key); + + if (vstorage->OverlapInLevel(level, &file_smallest_user_key, + &file_largest_user_key)) { + // File overlap with another files in this level, we cannot + // add it to this level + return false; + } + if (cfd_->RangeOverlapWithCompaction(file_smallest_user_key, + file_largest_user_key, level)) { + // File overlap with a running compaction output that will be stored + // in this level, we cannot add this file to this level + return false; + } + + // File did not overlap with level files, our compaction output + return true; +} + +Status ExternalSstFileIngestionJob::IngestedFileOverlapWithLevel( + SuperVersion* sv, IngestedFileInfo* file_to_ingest, int lvl, + bool* overlap_with_level) { + Arena arena; + ReadOptions ro; + ro.total_order_seek = true; + MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(), + &arena); + sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder, lvl, + nullptr /* range_del_agg */); + ScopedArenaIterator level_iter(merge_iter_builder.Finish()); + + std::vector<InternalIterator*> level_range_del_iters; + sv->current->AddRangeDelIteratorsForLevel(ro, env_options_, lvl, + &level_range_del_iters); + std::unique_ptr<InternalIterator> level_range_del_iter(NewMergingIterator( + &cfd_->internal_comparator(), + level_range_del_iters.empty() ? nullptr : &level_range_del_iters[0], + static_cast<int>(level_range_del_iters.size()))); + + Status status = IngestedFileOverlapWithIteratorRange( + file_to_ingest, level_iter.get(), overlap_with_level); + if (status.ok() && *overlap_with_level == false) { + status = IngestedFileOverlapWithRangeDeletions( + file_to_ingest, level_range_del_iter.get(), overlap_with_level); + } + return status; +} + +} // namespace rocksdb + +#endif // !ROCKSDB_LITE http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/external_sst_file_ingestion_job.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/external_sst_file_ingestion_job.h b/thirdparty/rocksdb/db/external_sst_file_ingestion_job.h new file mode 100644 index 0000000..2d0fade --- /dev/null +++ b/thirdparty/rocksdb/db/external_sst_file_ingestion_job.h @@ -0,0 +1,171 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include <string> +#include <unordered_set> +#include <vector> + +#include "db/column_family.h" +#include "db/dbformat.h" +#include "db/internal_stats.h" +#include "db/snapshot_impl.h" +#include "options/db_options.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/sst_file_writer.h" +#include "util/autovector.h" + +namespace rocksdb { + +struct IngestedFileInfo { + // External file path + std::string external_file_path; + // Smallest user key in external file + std::string smallest_user_key; + // Largest user key in external file + std::string largest_user_key; + // Sequence number for keys in external file + SequenceNumber original_seqno; + // Offset of the global sequence number field in the file, will + // be zero if version is 1 (global seqno is not supported) + size_t global_seqno_offset; + // External file size + uint64_t file_size; + // total number of keys in external file + uint64_t num_entries; + // Id of column family this file shoule be ingested into + uint32_t cf_id; + // TableProperties read from external file + TableProperties table_properties; + // Version of external file + int version; + + // FileDescriptor for the file inside the DB + FileDescriptor fd; + // file path that we picked for file inside the DB + std::string internal_file_path = ""; + // Global sequence number that we picked for the file inside the DB + SequenceNumber assigned_seqno = 0; + // Level inside the DB we picked for the external file. + int picked_level = 0; + + InternalKey smallest_internal_key() const { + return InternalKey(smallest_user_key, assigned_seqno, + ValueType::kTypeValue); + } + + InternalKey largest_internal_key() const { + return InternalKey(largest_user_key, assigned_seqno, ValueType::kTypeValue); + } +}; + +class ExternalSstFileIngestionJob { + public: + ExternalSstFileIngestionJob( + Env* env, VersionSet* versions, ColumnFamilyData* cfd, + const ImmutableDBOptions& db_options, const EnvOptions& env_options, + SnapshotList* db_snapshots, + const IngestExternalFileOptions& ingestion_options) + : env_(env), + versions_(versions), + cfd_(cfd), + db_options_(db_options), + env_options_(env_options), + db_snapshots_(db_snapshots), + ingestion_options_(ingestion_options), + job_start_time_(env_->NowMicros()) {} + + // Prepare the job by copying external files into the DB. + Status Prepare(const std::vector<std::string>& external_files_paths); + + // Check if we need to flush the memtable before running the ingestion job + // This will be true if the files we are ingesting are overlapping with any + // key range in the memtable. + // REQUIRES: Mutex held + Status NeedsFlush(bool* flush_needed); + + // Will execute the ingestion job and prepare edit() to be applied. + // REQUIRES: Mutex held + Status Run(); + + // Update column family stats. + // REQUIRES: Mutex held + void UpdateStats(); + + // Cleanup after successful/failed job + void Cleanup(const Status& status); + + VersionEdit* edit() { return &edit_; } + + const autovector<IngestedFileInfo>& files_to_ingest() const { + return files_to_ingest_; + } + + private: + // Open the external file and populate `file_to_ingest` with all the + // external information we need to ingest this file. + Status GetIngestedFileInfo(const std::string& external_file, + IngestedFileInfo* file_to_ingest); + + // Check if the files we are ingesting overlap with any memtable. + // REQUIRES: Mutex held + Status IngestedFilesOverlapWithMemtables(SuperVersion* sv, bool* overlap); + + // Assign `file_to_ingest` the appropriate sequence number and the lowest + // possible level that it can be ingested to according to compaction_style. + // REQUIRES: Mutex held + Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv, + bool force_global_seqno, + CompactionStyle compaction_style, + IngestedFileInfo* file_to_ingest, + SequenceNumber* assigned_seqno); + + // File that we want to ingest behind always goes to the lowest level; + // we just check that it fits in the level, that DB allows ingest_behind, + // and that we don't have 0 seqnums at the upper levels. + // REQUIRES: Mutex held + Status CheckLevelForIngestedBehindFile(IngestedFileInfo* file_to_ingest); + + // Set the file global sequence number to `seqno` + Status AssignGlobalSeqnoForIngestedFile(IngestedFileInfo* file_to_ingest, + SequenceNumber seqno); + + // Check if `file_to_ingest` key range overlap with the range `iter` represent + // REQUIRES: Mutex held + Status IngestedFileOverlapWithIteratorRange( + const IngestedFileInfo* file_to_ingest, InternalIterator* iter, + bool* overlap); + + // Check if `file_to_ingest` key range overlaps with any range deletions + // specified by `iter`. + // REQUIRES: Mutex held + Status IngestedFileOverlapWithRangeDeletions( + const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter, + bool* overlap); + + // Check if `file_to_ingest` key range overlap with level + // REQUIRES: Mutex held + Status IngestedFileOverlapWithLevel(SuperVersion* sv, + IngestedFileInfo* file_to_ingest, int lvl, bool* overlap_with_level); + + // Check if `file_to_ingest` can fit in level `level` + // REQUIRES: Mutex held + bool IngestedFileFitInLevel(const IngestedFileInfo* file_to_ingest, + int level); + + Env* env_; + VersionSet* versions_; + ColumnFamilyData* cfd_; + const ImmutableDBOptions& db_options_; + const EnvOptions& env_options_; + SnapshotList* db_snapshots_; + autovector<IngestedFileInfo> files_to_ingest_; + const IngestExternalFileOptions& ingestion_options_; + VersionEdit edit_; + uint64_t job_start_time_; +}; + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/file_indexer.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/file_indexer.cc b/thirdparty/rocksdb/db/file_indexer.cc new file mode 100644 index 0000000..abfa7cf --- /dev/null +++ b/thirdparty/rocksdb/db/file_indexer.cc @@ -0,0 +1,214 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/file_indexer.h" +#include <algorithm> +#include <functional> +#include "db/version_edit.h" +#include "rocksdb/comparator.h" + +namespace rocksdb { + +FileIndexer::FileIndexer(const Comparator* ucmp) + : num_levels_(0), ucmp_(ucmp), level_rb_(nullptr) {} + +size_t FileIndexer::NumLevelIndex() const { return next_level_index_.size(); } + +size_t FileIndexer::LevelIndexSize(size_t level) const { + if (level >= next_level_index_.size()) { + return 0; + } + return next_level_index_[level].num_index; +} + +void FileIndexer::GetNextLevelIndex(const size_t level, const size_t file_index, + const int cmp_smallest, + const int cmp_largest, int32_t* left_bound, + int32_t* right_bound) const { + assert(level > 0); + + // Last level, no hint + if (level == num_levels_ - 1) { + *left_bound = 0; + *right_bound = -1; + return; + } + + assert(level < num_levels_ - 1); + assert(static_cast<int32_t>(file_index) <= level_rb_[level]); + + const IndexUnit* index_units = next_level_index_[level].index_units; + const auto& index = index_units[file_index]; + + if (cmp_smallest < 0) { + *left_bound = (level > 0 && file_index > 0) + ? index_units[file_index - 1].largest_lb + : 0; + *right_bound = index.smallest_rb; + } else if (cmp_smallest == 0) { + *left_bound = index.smallest_lb; + *right_bound = index.smallest_rb; + } else if (cmp_smallest > 0 && cmp_largest < 0) { + *left_bound = index.smallest_lb; + *right_bound = index.largest_rb; + } else if (cmp_largest == 0) { + *left_bound = index.largest_lb; + *right_bound = index.largest_rb; + } else if (cmp_largest > 0) { + *left_bound = index.largest_lb; + *right_bound = level_rb_[level + 1]; + } else { + assert(false); + } + + assert(*left_bound >= 0); + assert(*left_bound <= *right_bound + 1); + assert(*right_bound <= level_rb_[level + 1]); +} + +void FileIndexer::UpdateIndex(Arena* arena, const size_t num_levels, + std::vector<FileMetaData*>* const files) { + if (files == nullptr) { + return; + } + if (num_levels == 0) { // uint_32 0-1 would cause bad behavior + num_levels_ = num_levels; + return; + } + assert(level_rb_ == nullptr); // level_rb_ should be init here + + num_levels_ = num_levels; + next_level_index_.resize(num_levels); + + char* mem = arena->AllocateAligned(num_levels_ * sizeof(int32_t)); + level_rb_ = new (mem) int32_t[num_levels_]; + for (size_t i = 0; i < num_levels_; i++) { + level_rb_[i] = -1; + } + + // L1 - Ln-1 + for (size_t level = 1; level < num_levels_ - 1; ++level) { + const auto& upper_files = files[level]; + const int32_t upper_size = static_cast<int32_t>(upper_files.size()); + const auto& lower_files = files[level + 1]; + level_rb_[level] = static_cast<int32_t>(upper_files.size()) - 1; + if (upper_size == 0) { + continue; + } + IndexLevel& index_level = next_level_index_[level]; + index_level.num_index = upper_size; + mem = arena->AllocateAligned(upper_size * sizeof(IndexUnit)); + index_level.index_units = new (mem) IndexUnit[upper_size]; + + CalculateLB( + upper_files, lower_files, &index_level, + [this](const FileMetaData * a, const FileMetaData * b)->int { + return ucmp_->Compare(a->smallest.user_key(), b->largest.user_key()); + }, + [](IndexUnit* index, int32_t f_idx) { index->smallest_lb = f_idx; }); + CalculateLB( + upper_files, lower_files, &index_level, + [this](const FileMetaData * a, const FileMetaData * b)->int { + return ucmp_->Compare(a->largest.user_key(), b->largest.user_key()); + }, + [](IndexUnit* index, int32_t f_idx) { index->largest_lb = f_idx; }); + CalculateRB( + upper_files, lower_files, &index_level, + [this](const FileMetaData * a, const FileMetaData * b)->int { + return ucmp_->Compare(a->smallest.user_key(), b->smallest.user_key()); + }, + [](IndexUnit* index, int32_t f_idx) { index->smallest_rb = f_idx; }); + CalculateRB( + upper_files, lower_files, &index_level, + [this](const FileMetaData * a, const FileMetaData * b)->int { + return ucmp_->Compare(a->largest.user_key(), b->smallest.user_key()); + }, + [](IndexUnit* index, int32_t f_idx) { index->largest_rb = f_idx; }); + } + + level_rb_[num_levels_ - 1] = + static_cast<int32_t>(files[num_levels_ - 1].size()) - 1; +} + +void FileIndexer::CalculateLB( + const std::vector<FileMetaData*>& upper_files, + const std::vector<FileMetaData*>& lower_files, IndexLevel* index_level, + std::function<int(const FileMetaData*, const FileMetaData*)> cmp_op, + std::function<void(IndexUnit*, int32_t)> set_index) { + const int32_t upper_size = static_cast<int32_t>(upper_files.size()); + const int32_t lower_size = static_cast<int32_t>(lower_files.size()); + int32_t upper_idx = 0; + int32_t lower_idx = 0; + + IndexUnit* index = index_level->index_units; + while (upper_idx < upper_size && lower_idx < lower_size) { + int cmp = cmp_op(upper_files[upper_idx], lower_files[lower_idx]); + + if (cmp == 0) { + set_index(&index[upper_idx], lower_idx); + ++upper_idx; + ++lower_idx; + } else if (cmp > 0) { + // Lower level's file (largest) is smaller, a key won't hit in that + // file. Move to next lower file + ++lower_idx; + } else { + // Lower level's file becomes larger, update the index, and + // move to the next upper file + set_index(&index[upper_idx], lower_idx); + ++upper_idx; + } + } + + while (upper_idx < upper_size) { + // Lower files are exhausted, that means the remaining upper files are + // greater than any lower files. Set the index to be the lower level size. + set_index(&index[upper_idx], lower_size); + ++upper_idx; + } +} + +void FileIndexer::CalculateRB( + const std::vector<FileMetaData*>& upper_files, + const std::vector<FileMetaData*>& lower_files, IndexLevel* index_level, + std::function<int(const FileMetaData*, const FileMetaData*)> cmp_op, + std::function<void(IndexUnit*, int32_t)> set_index) { + const int32_t upper_size = static_cast<int32_t>(upper_files.size()); + const int32_t lower_size = static_cast<int32_t>(lower_files.size()); + int32_t upper_idx = upper_size - 1; + int32_t lower_idx = lower_size - 1; + + IndexUnit* index = index_level->index_units; + while (upper_idx >= 0 && lower_idx >= 0) { + int cmp = cmp_op(upper_files[upper_idx], lower_files[lower_idx]); + + if (cmp == 0) { + set_index(&index[upper_idx], lower_idx); + --upper_idx; + --lower_idx; + } else if (cmp < 0) { + // Lower level's file (smallest) is larger, a key won't hit in that + // file. Move to next lower file. + --lower_idx; + } else { + // Lower level's file becomes smaller, update the index, and move to + // the next the upper file + set_index(&index[upper_idx], lower_idx); + --upper_idx; + } + } + while (upper_idx >= 0) { + // Lower files are exhausted, that means the remaining upper files are + // smaller than any lower files. Set it to -1. + set_index(&index[upper_idx], -1); + --upper_idx; + } +} + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/file_indexer.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/file_indexer.h b/thirdparty/rocksdb/db/file_indexer.h new file mode 100644 index 0000000..1bef3aa --- /dev/null +++ b/thirdparty/rocksdb/db/file_indexer.h @@ -0,0 +1,142 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include <cstdint> +#include <functional> +#include <limits> +#include <vector> +#include "port/port.h" +#include "util/arena.h" +#include "util/autovector.h" + +namespace rocksdb { + +class Comparator; +struct FileMetaData; +struct FdWithKeyRange; +struct FileLevel; + +// The file tree structure in Version is prebuilt and the range of each file +// is known. On Version::Get(), it uses binary search to find a potential file +// and then check if a target key can be found in the file by comparing the key +// to each file's smallest and largest key. The results of these comparisons +// can be reused beyond checking if a key falls into a file's range. +// With some pre-calculated knowledge, each key comparison that has been done +// can serve as a hint to narrow down further searches: if a key compared to +// be smaller than a file's smallest or largest, that comparison can be used +// to find out the right bound of next binary search. Similarly, if a key +// compared to be larger than a file's smallest or largest, it can be utilized +// to find out the left bound of next binary search. +// With these hints: it can greatly reduce the range of binary search, +// especially for bottom levels, given that one file most likely overlaps with +// only N files from level below (where N is max_bytes_for_level_multiplier). +// So on level L, we will only look at ~N files instead of N^L files on the +// naive approach. +class FileIndexer { + public: + explicit FileIndexer(const Comparator* ucmp); + + size_t NumLevelIndex() const; + + size_t LevelIndexSize(size_t level) const; + + // Return a file index range in the next level to search for a key based on + // smallest and largest key comparison for the current file specified by + // level and file_index. When *left_index < *right_index, both index should + // be valid and fit in the vector size. + void GetNextLevelIndex(const size_t level, const size_t file_index, + const int cmp_smallest, const int cmp_largest, + int32_t* left_bound, int32_t* right_bound) const; + + void UpdateIndex(Arena* arena, const size_t num_levels, + std::vector<FileMetaData*>* const files); + + enum { + // MSVC version 1800 still does not have constexpr for ::max() + kLevelMaxIndex = rocksdb::port::kMaxInt32 + }; + + private: + size_t num_levels_; + const Comparator* ucmp_; + + struct IndexUnit { + IndexUnit() + : smallest_lb(0), largest_lb(0), smallest_rb(-1), largest_rb(-1) {} + // During file search, a key is compared against smallest and largest + // from a FileMetaData. It can have 3 possible outcomes: + // (1) key is smaller than smallest, implying it is also smaller than + // larger. Precalculated index based on "smallest < smallest" can + // be used to provide right bound. + // (2) key is in between smallest and largest. + // Precalculated index based on "smallest > greatest" can be used to + // provide left bound. + // Precalculated index based on "largest < smallest" can be used to + // provide right bound. + // (3) key is larger than largest, implying it is also larger than smallest. + // Precalculated index based on "largest > largest" can be used to + // provide left bound. + // + // As a result, we will need to do: + // Compare smallest (<=) and largest keys from upper level file with + // smallest key from lower level to get a right bound. + // Compare smallest (>=) and largest keys from upper level file with + // largest key from lower level to get a left bound. + // + // Example: + // level 1: [50 - 60] + // level 2: [1 - 40], [45 - 55], [58 - 80] + // A key 35, compared to be less than 50, 3rd file on level 2 can be + // skipped according to rule (1). LB = 0, RB = 1. + // A key 53, sits in the middle 50 and 60. 1st file on level 2 can be + // skipped according to rule (2)-a, but the 3rd file cannot be skipped + // because 60 is greater than 58. LB = 1, RB = 2. + // A key 70, compared to be larger than 60. 1st and 2nd file can be skipped + // according to rule (3). LB = 2, RB = 2. + // + // Point to a left most file in a lower level that may contain a key, + // which compares greater than smallest of a FileMetaData (upper level) + int32_t smallest_lb; + // Point to a left most file in a lower level that may contain a key, + // which compares greater than largest of a FileMetaData (upper level) + int32_t largest_lb; + // Point to a right most file in a lower level that may contain a key, + // which compares smaller than smallest of a FileMetaData (upper level) + int32_t smallest_rb; + // Point to a right most file in a lower level that may contain a key, + // which compares smaller than largest of a FileMetaData (upper level) + int32_t largest_rb; + }; + + // Data structure to store IndexUnits in a whole level + struct IndexLevel { + size_t num_index; + IndexUnit* index_units; + + IndexLevel() : num_index(0), index_units(nullptr) {} + }; + + void CalculateLB( + const std::vector<FileMetaData*>& upper_files, + const std::vector<FileMetaData*>& lower_files, IndexLevel* index_level, + std::function<int(const FileMetaData*, const FileMetaData*)> cmp_op, + std::function<void(IndexUnit*, int32_t)> set_index); + + void CalculateRB( + const std::vector<FileMetaData*>& upper_files, + const std::vector<FileMetaData*>& lower_files, IndexLevel* index_level, + std::function<int(const FileMetaData*, const FileMetaData*)> cmp_op, + std::function<void(IndexUnit*, int32_t)> set_index); + + autovector<IndexLevel> next_level_index_; + int32_t* level_rb_; +}; + +} // namespace rocksdb
