http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/memtable.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/memtable.h b/thirdparty/rocksdb/db/memtable.h new file mode 100644 index 0000000..fe9feaf --- /dev/null +++ b/thirdparty/rocksdb/db/memtable.h @@ -0,0 +1,427 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include <atomic> +#include <deque> +#include <functional> +#include <memory> +#include <string> +#include <unordered_map> +#include <vector> +#include "db/dbformat.h" +#include "db/range_del_aggregator.h" +#include "db/version_edit.h" +#include "monitoring/instrumented_mutex.h" +#include "options/cf_options.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/memtablerep.h" +#include "util/allocator.h" +#include "util/concurrent_arena.h" +#include "util/dynamic_bloom.h" +#include "util/hash.h" + +namespace rocksdb { + +class Mutex; +class MemTableIterator; +class MergeContext; +class InternalIterator; + +struct MemTableOptions { + explicit MemTableOptions( + const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options); + size_t write_buffer_size; + size_t arena_block_size; + uint32_t memtable_prefix_bloom_bits; + size_t memtable_huge_page_size; + bool inplace_update_support; + size_t inplace_update_num_locks; + UpdateStatus (*inplace_callback)(char* existing_value, + uint32_t* existing_value_size, + Slice delta_value, + std::string* merged_value); + size_t max_successive_merges; + Statistics* statistics; + MergeOperator* merge_operator; + Logger* info_log; +}; + +// Batched counters to updated when inserting keys in one write batch. +// In post process of the write batch, these can be updated together. +// Only used in concurrent memtable insert case. +struct MemTablePostProcessInfo { + uint64_t data_size = 0; + uint64_t num_entries = 0; + uint64_t num_deletes = 0; +}; + +// Note: Many of the methods in this class have comments indicating that +// external synchromization is required as these methods are not thread-safe. +// It is up to higher layers of code to decide how to prevent concurrent +// invokation of these methods. This is usually done by acquiring either +// the db mutex or the single writer thread. +// +// Some of these methods are documented to only require external +// synchronization if this memtable is immutable. Calling MarkImmutable() is +// not sufficient to guarantee immutability. It is up to higher layers of +// code to determine if this MemTable can still be modified by other threads. +// Eg: The Superversion stores a pointer to the current MemTable (that can +// be modified) and a separate list of the MemTables that can no longer be +// written to (aka the 'immutable memtables'). +class MemTable { + public: + struct KeyComparator : public MemTableRep::KeyComparator { + const InternalKeyComparator comparator; + explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } + virtual int operator()(const char* prefix_len_key1, + const char* prefix_len_key2) const override; + virtual int operator()(const char* prefix_len_key, + const Slice& key) const override; + }; + + // MemTables are reference counted. The initial reference count + // is zero and the caller must call Ref() at least once. + // + // earliest_seq should be the current SequenceNumber in the db such that any + // key inserted into this memtable will have an equal or larger seq number. + // (When a db is first created, the earliest sequence number will be 0). + // If the earliest sequence number is not known, kMaxSequenceNumber may be + // used, but this may prevent some transactions from succeeding until the + // first key is inserted into the memtable. + explicit MemTable(const InternalKeyComparator& comparator, + const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options, + WriteBufferManager* write_buffer_manager, + SequenceNumber earliest_seq, uint32_t column_family_id); + + // Do not delete this MemTable unless Unref() indicates it not in use. + ~MemTable(); + + // Increase reference count. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + void Ref() { ++refs_; } + + // Drop reference count. + // If the refcount goes to zero return this memtable, otherwise return null. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + MemTable* Unref() { + --refs_; + assert(refs_ >= 0); + if (refs_ <= 0) { + return this; + } + return nullptr; + } + + // Returns an estimate of the number of bytes of data in use by this + // data structure. + // + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + size_t ApproximateMemoryUsage(); + + // This method heuristically determines if the memtable should continue to + // host more data. + bool ShouldScheduleFlush() const { + return flush_state_.load(std::memory_order_relaxed) == FLUSH_REQUESTED; + } + + // Returns true if a flush should be scheduled and the caller should + // be the one to schedule it + bool MarkFlushScheduled() { + auto before = FLUSH_REQUESTED; + return flush_state_.compare_exchange_strong(before, FLUSH_SCHEDULED, + std::memory_order_relaxed, + std::memory_order_relaxed); + } + + // Return an iterator that yields the contents of the memtable. + // + // The caller must ensure that the underlying MemTable remains live + // while the returned iterator is live. The keys returned by this + // iterator are internal keys encoded by AppendInternalKey in the + // db/dbformat.{h,cc} module. + // + // By default, it returns an iterator for prefix seek if prefix_extractor + // is configured in Options. + // arena: If not null, the arena needs to be used to allocate the Iterator. + // Calling ~Iterator of the iterator will destroy all the states but + // those allocated in arena. + InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena); + + InternalIterator* NewRangeTombstoneIterator(const ReadOptions& read_options); + + // Add an entry into memtable that maps key to value at the + // specified sequence number and with the specified type. + // Typically value will be empty if type==kTypeDeletion. + // + // REQUIRES: if allow_concurrent = false, external synchronization to prevent + // simultaneous operations on the same MemTable. + void Add(SequenceNumber seq, ValueType type, const Slice& key, + const Slice& value, bool allow_concurrent = false, + MemTablePostProcessInfo* post_process_info = nullptr); + + // If memtable contains a value for key, store it in *value and return true. + // If memtable contains a deletion for key, store a NotFound() error + // in *status and return true. + // If memtable contains Merge operation as the most recent entry for a key, + // and the merge process does not stop (not reaching a value or delete), + // prepend the current merge operand to *operands. + // store MergeInProgress in s, and return false. + // Else, return false. + // If any operation was found, its most recent sequence number + // will be stored in *seq on success (regardless of whether true/false is + // returned). Otherwise, *seq will be set to kMaxSequenceNumber. + // On success, *s may be set to OK, NotFound, or MergeInProgress. Any other + // status returned indicates a corruption or other unexpected error. + bool Get(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, RangeDelAggregator* range_del_agg, + SequenceNumber* seq, const ReadOptions& read_opts); + + bool Get(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts) { + SequenceNumber seq; + return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts); + } + + // Attempts to update the new_value inplace, else does normal Add + // Pseudocode + // if key exists in current memtable && prev_value is of type kTypeValue + // if new sizeof(new_value) <= sizeof(prev_value) + // update inplace + // else add(key, new_value) + // else add(key, new_value) + // + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + void Update(SequenceNumber seq, + const Slice& key, + const Slice& value); + + // If prev_value for key exists, attempts to update it inplace. + // else returns false + // Pseudocode + // if key exists in current memtable && prev_value is of type kTypeValue + // new_value = delta(prev_value) + // if sizeof(new_value) <= sizeof(prev_value) + // update inplace + // else add(key, new_value) + // else return false + // + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + bool UpdateCallback(SequenceNumber seq, + const Slice& key, + const Slice& delta); + + // Returns the number of successive merge entries starting from the newest + // entry for the key up to the last non-merge entry or last entry for the + // key in the memtable. + size_t CountSuccessiveMergeEntries(const LookupKey& key); + + // Update counters and flush status after inserting a whole write batch + // Used in concurrent memtable inserts. + void BatchPostProcess(const MemTablePostProcessInfo& update_counters) { + num_entries_.fetch_add(update_counters.num_entries, + std::memory_order_relaxed); + data_size_.fetch_add(update_counters.data_size, std::memory_order_relaxed); + if (update_counters.num_deletes != 0) { + num_deletes_.fetch_add(update_counters.num_deletes, + std::memory_order_relaxed); + } + UpdateFlushState(); + } + + // Get total number of entries in the mem table. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + uint64_t num_entries() const { + return num_entries_.load(std::memory_order_relaxed); + } + + // Get total number of deletes in the mem table. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + uint64_t num_deletes() const { + return num_deletes_.load(std::memory_order_relaxed); + } + + // Returns the edits area that is needed for flushing the memtable + VersionEdit* GetEdits() { return &edit_; } + + // Returns if there is no entry inserted to the mem table. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + bool IsEmpty() const { return first_seqno_ == 0; } + + // Returns the sequence number of the first element that was inserted + // into the memtable. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + SequenceNumber GetFirstSequenceNumber() { + return first_seqno_.load(std::memory_order_relaxed); + } + + // Returns the sequence number that is guaranteed to be smaller than or equal + // to the sequence number of any key that could be inserted into this + // memtable. It can then be assumed that any write with a larger(or equal) + // sequence number will be present in this memtable or a later memtable. + // + // If the earliest sequence number could not be determined, + // kMaxSequenceNumber will be returned. + SequenceNumber GetEarliestSequenceNumber() { + return earliest_seqno_.load(std::memory_order_relaxed); + } + + // DB's latest sequence ID when the memtable is created. This number + // may be updated to a more recent one before any key is inserted. + SequenceNumber GetCreationSeq() const { return creation_seq_; } + + void SetCreationSeq(SequenceNumber sn) { creation_seq_ = sn; } + + // Returns the next active logfile number when this memtable is about to + // be flushed to storage + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + uint64_t GetNextLogNumber() { return mem_next_logfile_number_; } + + // Sets the next active logfile number when this memtable is about to + // be flushed to storage + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; } + + // if this memtable contains data from a committed + // two phase transaction we must take note of the + // log which contains that data so we can know + // when to relese that log + void RefLogContainingPrepSection(uint64_t log); + uint64_t GetMinLogContainingPrepSection(); + + // Notify the underlying storage that no more items will be added. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable. + // After MarkImmutable() is called, you should not attempt to + // write anything to this MemTable(). (Ie. do not call Add() or Update()). + void MarkImmutable() { + table_->MarkReadOnly(); + mem_tracker_.DoneAllocating(); + } + + // return true if the current MemTableRep supports merge operator. + bool IsMergeOperatorSupported() const { + return table_->IsMergeOperatorSupported(); + } + + // return true if the current MemTableRep supports snapshots. + // inplace update prevents snapshots, + bool IsSnapshotSupported() const { + return table_->IsSnapshotSupported() && !moptions_.inplace_update_support; + } + + struct MemTableStats { + uint64_t size; + uint64_t count; + }; + + MemTableStats ApproximateStats(const Slice& start_ikey, + const Slice& end_ikey); + + // Get the lock associated for the key + port::RWMutex* GetLock(const Slice& key); + + const InternalKeyComparator& GetInternalKeyComparator() const { + return comparator_.comparator; + } + + const MemTableOptions* GetMemTableOptions() const { return &moptions_; } + + private: + enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; + + friend class MemTableIterator; + friend class MemTableBackwardIterator; + friend class MemTableList; + + KeyComparator comparator_; + const MemTableOptions moptions_; + int refs_; + const size_t kArenaBlockSize; + AllocTracker mem_tracker_; + ConcurrentArena arena_; + unique_ptr<MemTableRep> table_; + unique_ptr<MemTableRep> range_del_table_; + bool is_range_del_table_empty_; + + // Total data size of all data inserted + std::atomic<uint64_t> data_size_; + std::atomic<uint64_t> num_entries_; + std::atomic<uint64_t> num_deletes_; + + // These are used to manage memtable flushes to storage + bool flush_in_progress_; // started the flush + bool flush_completed_; // finished the flush + uint64_t file_number_; // filled up after flush is complete + + // The updates to be applied to the transaction log when this + // memtable is flushed to storage. + VersionEdit edit_; + + // The sequence number of the kv that was inserted first + std::atomic<SequenceNumber> first_seqno_; + + // The db sequence number at the time of creation or kMaxSequenceNumber + // if not set. + std::atomic<SequenceNumber> earliest_seqno_; + + SequenceNumber creation_seq_; + + // The log files earlier than this number can be deleted. + uint64_t mem_next_logfile_number_; + + // the earliest log containing a prepared section + // which has been inserted into this memtable. + std::atomic<uint64_t> min_prep_log_referenced_; + + // rw locks for inplace updates + std::vector<port::RWMutex> locks_; + + const SliceTransform* const prefix_extractor_; + std::unique_ptr<DynamicBloom> prefix_bloom_; + + std::atomic<FlushStateEnum> flush_state_; + + Env* env_; + + // Extract sequential insert prefixes. + const SliceTransform* insert_with_hint_prefix_extractor_; + + // Insert hints for each prefix. + std::unordered_map<Slice, void*, SliceHasher> insert_hints_; + + // Returns a heuristic flush decision + bool ShouldFlushNow() const; + + // Updates flush_state_ using ShouldFlushNow() + void UpdateFlushState(); + + // No copying allowed + MemTable(const MemTable&); + MemTable& operator=(const MemTable&); +}; + +extern const char* EncodeKey(std::string* scratch, const Slice& target); + +} // namespace rocksdb
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/memtable_list.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/memtable_list.cc b/thirdparty/rocksdb/db/memtable_list.cc new file mode 100644 index 0000000..8f710c2 --- /dev/null +++ b/thirdparty/rocksdb/db/memtable_list.cc @@ -0,0 +1,482 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include "db/memtable_list.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include <inttypes.h> +#include <string> +#include "db/memtable.h" +#include "db/version_set.h" +#include "monitoring/thread_status_util.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "table/merging_iterator.h" +#include "util/coding.h" +#include "util/log_buffer.h" +#include "util/sync_point.h" + +namespace rocksdb { + +class InternalKeyComparator; +class Mutex; +class VersionSet; + +void MemTableListVersion::AddMemTable(MemTable* m) { + memlist_.push_front(m); + *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage(); +} + +void MemTableListVersion::UnrefMemTable(autovector<MemTable*>* to_delete, + MemTable* m) { + if (m->Unref()) { + to_delete->push_back(m); + assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage()); + *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage(); + } else { + } +} + +MemTableListVersion::MemTableListVersion( + size_t* parent_memtable_list_memory_usage, MemTableListVersion* old) + : max_write_buffer_number_to_maintain_( + old->max_write_buffer_number_to_maintain_), + parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) { + if (old != nullptr) { + memlist_ = old->memlist_; + for (auto& m : memlist_) { + m->Ref(); + } + + memlist_history_ = old->memlist_history_; + for (auto& m : memlist_history_) { + m->Ref(); + } + } +} + +MemTableListVersion::MemTableListVersion( + size_t* parent_memtable_list_memory_usage, + int max_write_buffer_number_to_maintain) + : max_write_buffer_number_to_maintain_(max_write_buffer_number_to_maintain), + parent_memtable_list_memory_usage_(parent_memtable_list_memory_usage) {} + +void MemTableListVersion::Ref() { ++refs_; } + +// called by superversion::clean() +void MemTableListVersion::Unref(autovector<MemTable*>* to_delete) { + assert(refs_ >= 1); + --refs_; + if (refs_ == 0) { + // if to_delete is equal to nullptr it means we're confident + // that refs_ will not be zero + assert(to_delete != nullptr); + for (const auto& m : memlist_) { + UnrefMemTable(to_delete, m); + } + for (const auto& m : memlist_history_) { + UnrefMemTable(to_delete, m); + } + delete this; + } +} + +int MemTableList::NumNotFlushed() const { + int size = static_cast<int>(current_->memlist_.size()); + assert(num_flush_not_started_ <= size); + return size; +} + +int MemTableList::NumFlushed() const { + return static_cast<int>(current_->memlist_history_.size()); +} + +// Search all the memtables starting from the most recent one. +// Return the most recent value found, if any. +// Operands stores the list of merge operations to apply, so far. +bool MemTableListVersion::Get(const LookupKey& key, std::string* value, + Status* s, MergeContext* merge_context, + RangeDelAggregator* range_del_agg, + SequenceNumber* seq, + const ReadOptions& read_opts) { + return GetFromList(&memlist_, key, value, s, merge_context, range_del_agg, + seq, read_opts); +} + +bool MemTableListVersion::GetFromHistory(const LookupKey& key, + std::string* value, Status* s, + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, + SequenceNumber* seq, + const ReadOptions& read_opts) { + return GetFromList(&memlist_history_, key, value, s, merge_context, + range_del_agg, seq, read_opts); +} + +bool MemTableListVersion::GetFromList(std::list<MemTable*>* list, + const LookupKey& key, std::string* value, + Status* s, MergeContext* merge_context, + RangeDelAggregator* range_del_agg, + SequenceNumber* seq, + const ReadOptions& read_opts) { + *seq = kMaxSequenceNumber; + + for (auto& memtable : *list) { + SequenceNumber current_seq = kMaxSequenceNumber; + + bool done = memtable->Get(key, value, s, merge_context, range_del_agg, + ¤t_seq, read_opts); + if (*seq == kMaxSequenceNumber) { + // Store the most recent sequence number of any operation on this key. + // Since we only care about the most recent change, we only need to + // return the first operation found when searching memtables in + // reverse-chronological order. + *seq = current_seq; + } + + if (done) { + assert(*seq != kMaxSequenceNumber); + return true; + } + if (!done && !s->ok() && !s->IsMergeInProgress() && !s->IsNotFound()) { + return false; + } + } + return false; +} + +Status MemTableListVersion::AddRangeTombstoneIterators( + const ReadOptions& read_opts, Arena* arena, + RangeDelAggregator* range_del_agg) { + assert(range_del_agg != nullptr); + for (auto& m : memlist_) { + std::unique_ptr<InternalIterator> range_del_iter( + m->NewRangeTombstoneIterator(read_opts)); + Status s = range_del_agg->AddTombstones(std::move(range_del_iter)); + if (!s.ok()) { + return s; + } + } + return Status::OK(); +} + +Status MemTableListVersion::AddRangeTombstoneIterators( + const ReadOptions& read_opts, + std::vector<InternalIterator*>* range_del_iters) { + for (auto& m : memlist_) { + auto* range_del_iter = m->NewRangeTombstoneIterator(read_opts); + if (range_del_iter != nullptr) { + range_del_iters->push_back(range_del_iter); + } + } + return Status::OK(); +} + +void MemTableListVersion::AddIterators( + const ReadOptions& options, std::vector<InternalIterator*>* iterator_list, + Arena* arena) { + for (auto& m : memlist_) { + iterator_list->push_back(m->NewIterator(options, arena)); + } +} + +void MemTableListVersion::AddIterators( + const ReadOptions& options, MergeIteratorBuilder* merge_iter_builder) { + for (auto& m : memlist_) { + merge_iter_builder->AddIterator( + m->NewIterator(options, merge_iter_builder->GetArena())); + } +} + +uint64_t MemTableListVersion::GetTotalNumEntries() const { + uint64_t total_num = 0; + for (auto& m : memlist_) { + total_num += m->num_entries(); + } + return total_num; +} + +MemTable::MemTableStats MemTableListVersion::ApproximateStats( + const Slice& start_ikey, const Slice& end_ikey) { + MemTable::MemTableStats total_stats = {0, 0}; + for (auto& m : memlist_) { + auto mStats = m->ApproximateStats(start_ikey, end_ikey); + total_stats.size += mStats.size; + total_stats.count += mStats.count; + } + return total_stats; +} + +uint64_t MemTableListVersion::GetTotalNumDeletes() const { + uint64_t total_num = 0; + for (auto& m : memlist_) { + total_num += m->num_deletes(); + } + return total_num; +} + +SequenceNumber MemTableListVersion::GetEarliestSequenceNumber( + bool include_history) const { + if (include_history && !memlist_history_.empty()) { + return memlist_history_.back()->GetEarliestSequenceNumber(); + } else if (!memlist_.empty()) { + return memlist_.back()->GetEarliestSequenceNumber(); + } else { + return kMaxSequenceNumber; + } +} + +// caller is responsible for referencing m +void MemTableListVersion::Add(MemTable* m, autovector<MemTable*>* to_delete) { + assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable + AddMemTable(m); + + TrimHistory(to_delete); +} + +// Removes m from list of memtables not flushed. Caller should NOT Unref m. +void MemTableListVersion::Remove(MemTable* m, + autovector<MemTable*>* to_delete) { + assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable + memlist_.remove(m); + + if (max_write_buffer_number_to_maintain_ > 0) { + memlist_history_.push_front(m); + TrimHistory(to_delete); + } else { + UnrefMemTable(to_delete, m); + } +} + +// Make sure we don't use up too much space in history +void MemTableListVersion::TrimHistory(autovector<MemTable*>* to_delete) { + while (memlist_.size() + memlist_history_.size() > + static_cast<size_t>(max_write_buffer_number_to_maintain_) && + !memlist_history_.empty()) { + MemTable* x = memlist_history_.back(); + memlist_history_.pop_back(); + + UnrefMemTable(to_delete, x); + } +} + +// Returns true if there is at least one memtable on which flush has +// not yet started. +bool MemTableList::IsFlushPending() const { + if ((flush_requested_ && num_flush_not_started_ >= 1) || + (num_flush_not_started_ >= min_write_buffer_number_to_merge_)) { + assert(imm_flush_needed.load(std::memory_order_relaxed)); + return true; + } + return false; +} + +// Returns the memtables that need to be flushed. +void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH); + const auto& memlist = current_->memlist_; + for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { + MemTable* m = *it; + if (!m->flush_in_progress_) { + assert(!m->flush_completed_); + num_flush_not_started_--; + if (num_flush_not_started_ == 0) { + imm_flush_needed.store(false, std::memory_order_release); + } + m->flush_in_progress_ = true; // flushing will start very soon + ret->push_back(m); + } + } + flush_requested_ = false; // start-flush request is complete +} + +void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems, + uint64_t file_number) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_MEMTABLE_ROLLBACK); + assert(!mems.empty()); + + // If the flush was not successful, then just reset state. + // Maybe a succeeding attempt to flush will be successful. + for (MemTable* m : mems) { + assert(m->flush_in_progress_); + assert(m->file_number_ == 0); + + m->flush_in_progress_ = false; + m->flush_completed_ = false; + m->edit_.Clear(); + num_flush_not_started_++; + } + imm_flush_needed.store(true, std::memory_order_release); +} + +// Record a successful flush in the manifest file +Status MemTableList::InstallMemtableFlushResults( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + const autovector<MemTable*>& mems, VersionSet* vset, InstrumentedMutex* mu, + uint64_t file_number, autovector<MemTable*>* to_delete, + Directory* db_directory, LogBuffer* log_buffer) { + AutoThreadOperationStageUpdater stage_updater( + ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); + mu->AssertHeld(); + + // flush was successful + for (size_t i = 0; i < mems.size(); ++i) { + // All the edits are associated with the first memtable of this batch. + assert(i == 0 || mems[i]->GetEdits()->NumEntries() == 0); + + mems[i]->flush_completed_ = true; + mems[i]->file_number_ = file_number; + } + + // if some other thread is already committing, then return + Status s; + if (commit_in_progress_) { + TEST_SYNC_POINT("MemTableList::InstallMemtableFlushResults:InProgress"); + return s; + } + + // Only a single thread can be executing this piece of code + commit_in_progress_ = true; + + // Retry until all completed flushes are committed. New flushes can finish + // while the current thread is writing manifest where mutex is released. + while (s.ok()) { + auto& memlist = current_->memlist_; + if (memlist.empty() || !memlist.back()->flush_completed_) { + break; + } + // scan all memtables from the earliest, and commit those + // (in that order) that have finished flushing. Memetables + // are always committed in the order that they were created. + uint64_t batch_file_number = 0; + size_t batch_count = 0; + autovector<VersionEdit*> edit_list; + // enumerate from the last (earliest) element to see how many batch finished + for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { + MemTable* m = *it; + if (!m->flush_completed_) { + break; + } + if (it == memlist.rbegin() || batch_file_number != m->file_number_) { + batch_file_number = m->file_number_; + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 " started", + cfd->GetName().c_str(), m->file_number_); + edit_list.push_back(&m->edit_); + } + batch_count++; + } + + if (batch_count > 0) { + // this can release and reacquire the mutex. + s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, + db_directory); + + // we will be changing the version in the next code path, + // so we better create a new one, since versions are immutable + InstallNewVersion(); + + // All the later memtables that have the same filenum + // are part of the same batch. They can be committed now. + uint64_t mem_id = 1; // how many memtables have been flushed. + if (s.ok()) { // commit new state + while (batch_count-- > 0) { + MemTable* m = current_->memlist_.back(); + ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " done", + cfd->GetName().c_str(), m->file_number_, mem_id); + assert(m->file_number_ > 0); + current_->Remove(m, to_delete); + ++mem_id; + } + } else { + for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) { + MemTable* m = *it; + // commit failed. setup state so that we can flush again. + ROCKS_LOG_BUFFER(log_buffer, "Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " failed", + m->file_number_, mem_id); + m->flush_completed_ = false; + m->flush_in_progress_ = false; + m->edit_.Clear(); + num_flush_not_started_++; + m->file_number_ = 0; + imm_flush_needed.store(true, std::memory_order_release); + ++mem_id; + } + } + } + } + commit_in_progress_ = false; + return s; +} + +// New memtables are inserted at the front of the list. +void MemTableList::Add(MemTable* m, autovector<MemTable*>* to_delete) { + assert(static_cast<int>(current_->memlist_.size()) >= num_flush_not_started_); + InstallNewVersion(); + // this method is used to move mutable memtable into an immutable list. + // since mutable memtable is already refcounted by the DBImpl, + // and when moving to the imutable list we don't unref it, + // we don't have to ref the memtable here. we just take over the + // reference from the DBImpl. + current_->Add(m, to_delete); + m->MarkImmutable(); + num_flush_not_started_++; + if (num_flush_not_started_ == 1) { + imm_flush_needed.store(true, std::memory_order_release); + } +} + +// Returns an estimate of the number of bytes of data in use. +size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() { + size_t total_size = 0; + for (auto& memtable : current_->memlist_) { + total_size += memtable->ApproximateMemoryUsage(); + } + return total_size; +} + +size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; } + +void MemTableList::InstallNewVersion() { + if (current_->refs_ == 1) { + // we're the only one using the version, just keep using it + } else { + // somebody else holds the current version, we need to create new one + MemTableListVersion* version = current_; + current_ = new MemTableListVersion(¤t_memory_usage_, current_); + current_->Ref(); + version->Unref(); + } +} + +uint64_t MemTableList::GetMinLogContainingPrepSection() { + uint64_t min_log = 0; + + for (auto& m : current_->memlist_) { + // this mem has been flushed it no longer + // needs to hold on the its prep section + if (m->flush_completed_) { + continue; + } + + auto log = m->GetMinLogContainingPrepSection(); + + if (log > 0 && (min_log == 0 || log < min_log)) { + min_log = log; + } + } + + return min_log; +} + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/memtable_list.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/memtable_list.h b/thirdparty/rocksdb/db/memtable_list.h new file mode 100644 index 0000000..ed475b8 --- /dev/null +++ b/thirdparty/rocksdb/db/memtable_list.h @@ -0,0 +1,258 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once + +#include <string> +#include <list> +#include <vector> +#include <set> +#include <deque> + +#include "db/dbformat.h" +#include "db/memtable.h" +#include "db/range_del_aggregator.h" +#include "monitoring/instrumented_mutex.h" +#include "rocksdb/db.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "rocksdb/types.h" +#include "util/autovector.h" +#include "util/filename.h" +#include "util/log_buffer.h" + +namespace rocksdb { + +class ColumnFamilyData; +class InternalKeyComparator; +class InstrumentedMutex; +class MergeIteratorBuilder; + +// keeps a list of immutable memtables in a vector. the list is immutable +// if refcount is bigger than one. It is used as a state for Get() and +// Iterator code paths +// +// This class is not thread-safe. External synchronization is required +// (such as holding the db mutex or being on the write thread). +class MemTableListVersion { + public: + explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage, + MemTableListVersion* old = nullptr); + explicit MemTableListVersion(size_t* parent_memtable_list_memory_usage, + int max_write_buffer_number_to_maintain); + + void Ref(); + void Unref(autovector<MemTable*>* to_delete = nullptr); + + // Search all the memtables starting from the most recent one. + // Return the most recent value found, if any. + // + // If any operation was found for this key, its most recent sequence number + // will be stored in *seq on success (regardless of whether true/false is + // returned). Otherwise, *seq will be set to kMaxSequenceNumber. + bool Get(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, RangeDelAggregator* range_del_agg, + SequenceNumber* seq, const ReadOptions& read_opts); + + bool Get(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts) { + SequenceNumber seq; + return Get(key, value, s, merge_context, range_del_agg, &seq, read_opts); + } + + // Similar to Get(), but searches the Memtable history of memtables that + // have already been flushed. Should only be used from in-memory only + // queries (such as Transaction validation) as the history may contain + // writes that are also present in the SST files. + bool GetFromHistory(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, SequenceNumber* seq, + const ReadOptions& read_opts); + bool GetFromHistory(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, + const ReadOptions& read_opts) { + SequenceNumber seq; + return GetFromHistory(key, value, s, merge_context, range_del_agg, &seq, + read_opts); + } + + Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena, + RangeDelAggregator* range_del_agg); + Status AddRangeTombstoneIterators( + const ReadOptions& read_opts, + std::vector<InternalIterator*>* range_del_iters); + + void AddIterators(const ReadOptions& options, + std::vector<InternalIterator*>* iterator_list, + Arena* arena); + + void AddIterators(const ReadOptions& options, + MergeIteratorBuilder* merge_iter_builder); + + uint64_t GetTotalNumEntries() const; + + uint64_t GetTotalNumDeletes() const; + + MemTable::MemTableStats ApproximateStats(const Slice& start_ikey, + const Slice& end_ikey); + + // Returns the value of MemTable::GetEarliestSequenceNumber() on the most + // recent MemTable in this list or kMaxSequenceNumber if the list is empty. + // If include_history=true, will also search Memtables in MemTableList + // History. + SequenceNumber GetEarliestSequenceNumber(bool include_history = false) const; + + private: + // REQUIRE: m is an immutable memtable + void Add(MemTable* m, autovector<MemTable*>* to_delete); + // REQUIRE: m is an immutable memtable + void Remove(MemTable* m, autovector<MemTable*>* to_delete); + + void TrimHistory(autovector<MemTable*>* to_delete); + + bool GetFromList(std::list<MemTable*>* list, const LookupKey& key, + std::string* value, Status* s, MergeContext* merge_context, + RangeDelAggregator* range_del_agg, SequenceNumber* seq, + const ReadOptions& read_opts); + + void AddMemTable(MemTable* m); + + void UnrefMemTable(autovector<MemTable*>* to_delete, MemTable* m); + + friend class MemTableList; + + // Immutable MemTables that have not yet been flushed. + std::list<MemTable*> memlist_; + + // MemTables that have already been flushed + // (used during Transaction validation) + std::list<MemTable*> memlist_history_; + + // Maximum number of MemTables to keep in memory (including both flushed + // and not-yet-flushed tables). + const int max_write_buffer_number_to_maintain_; + + int refs_ = 0; + + size_t* parent_memtable_list_memory_usage_; +}; + +// This class stores references to all the immutable memtables. +// The memtables are flushed to L0 as soon as possible and in +// any order. If there are more than one immutable memtable, their +// flushes can occur concurrently. However, they are 'committed' +// to the manifest in FIFO order to maintain correctness and +// recoverability from a crash. +// +// +// Other than imm_flush_needed, this class is not thread-safe and requires +// external synchronization (such as holding the db mutex or being on the +// write thread.) +class MemTableList { + public: + // A list of memtables. + explicit MemTableList(int min_write_buffer_number_to_merge, + int max_write_buffer_number_to_maintain) + : imm_flush_needed(false), + min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge), + current_(new MemTableListVersion(¤t_memory_usage_, + max_write_buffer_number_to_maintain)), + num_flush_not_started_(0), + commit_in_progress_(false), + flush_requested_(false) { + current_->Ref(); + current_memory_usage_ = 0; + } + + // Should not delete MemTableList without making sure MemTableList::current() + // is Unref()'d. + ~MemTableList() {} + + MemTableListVersion* current() { return current_; } + + // so that background threads can detect non-nullptr pointer to + // determine whether there is anything more to start flushing. + std::atomic<bool> imm_flush_needed; + + // Returns the total number of memtables in the list that haven't yet + // been flushed and logged. + int NumNotFlushed() const; + + // Returns total number of memtables in the list that have been + // completely flushed and logged. + int NumFlushed() const; + + // Returns true if there is at least one memtable on which flush has + // not yet started. + bool IsFlushPending() const; + + // Returns the earliest memtables that needs to be flushed. The returned + // memtables are guaranteed to be in the ascending order of created time. + void PickMemtablesToFlush(autovector<MemTable*>* mems); + + // Reset status of the given memtable list back to pending state so that + // they can get picked up again on the next round of flush. + void RollbackMemtableFlush(const autovector<MemTable*>& mems, + uint64_t file_number); + + // Commit a successful flush in the manifest file + Status InstallMemtableFlushResults( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + const autovector<MemTable*>& m, VersionSet* vset, InstrumentedMutex* mu, + uint64_t file_number, autovector<MemTable*>* to_delete, + Directory* db_directory, LogBuffer* log_buffer); + + // New memtables are inserted at the front of the list. + // Takes ownership of the referenced held on *m by the caller of Add(). + void Add(MemTable* m, autovector<MemTable*>* to_delete); + + // Returns an estimate of the number of bytes of data in use. + size_t ApproximateMemoryUsage(); + + // Returns an estimate of the number of bytes of data used by + // the unflushed mem-tables. + size_t ApproximateUnflushedMemTablesMemoryUsage(); + + // Request a flush of all existing memtables to storage. This will + // cause future calls to IsFlushPending() to return true if this list is + // non-empty (regardless of the min_write_buffer_number_to_merge + // parameter). This flush request will persist until the next time + // PickMemtablesToFlush() is called. + void FlushRequested() { flush_requested_ = true; } + + bool HasFlushRequested() { return flush_requested_; } + + // Copying allowed + // MemTableList(const MemTableList&); + // void operator=(const MemTableList&); + + size_t* current_memory_usage() { return ¤t_memory_usage_; } + + uint64_t GetMinLogContainingPrepSection(); + + private: + // DB mutex held + void InstallNewVersion(); + + const int min_write_buffer_number_to_merge_; + + MemTableListVersion* current_; + + // the number of elements that still need flushing + int num_flush_not_started_; + + // committing in progress + bool commit_in_progress_; + + // Requested a flush of all memtables to storage + bool flush_requested_; + + // The current memory usage. + size_t current_memory_usage_; +}; + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/merge_context.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/merge_context.h b/thirdparty/rocksdb/db/merge_context.h new file mode 100644 index 0000000..5e75e09 --- /dev/null +++ b/thirdparty/rocksdb/db/merge_context.h @@ -0,0 +1,116 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once +#include <string> +#include <vector> +#include "db/dbformat.h" +#include "rocksdb/slice.h" + +namespace rocksdb { + +const std::vector<Slice> empty_operand_list; + +// The merge context for merging a user key. +// When doing a Get(), DB will create such a class and pass it when +// issuing Get() operation to memtables and version_set. The operands +// will be fetched from the context when issuing partial of full merge. +class MergeContext { + public: + // Clear all the operands + void Clear() { + if (operand_list_) { + operand_list_->clear(); + copied_operands_->clear(); + } + } + + // Push a merge operand + void PushOperand(const Slice& operand_slice, bool operand_pinned = false) { + Initialize(); + SetDirectionBackward(); + + if (operand_pinned) { + operand_list_->push_back(operand_slice); + } else { + // We need to have our own copy of the operand since it's not pinned + copied_operands_->emplace_back( + new std::string(operand_slice.data(), operand_slice.size())); + operand_list_->push_back(*copied_operands_->back()); + } + } + + // Push back a merge operand + void PushOperandBack(const Slice& operand_slice, + bool operand_pinned = false) { + Initialize(); + SetDirectionForward(); + + if (operand_pinned) { + operand_list_->push_back(operand_slice); + } else { + // We need to have our own copy of the operand since it's not pinned + copied_operands_->emplace_back( + new std::string(operand_slice.data(), operand_slice.size())); + operand_list_->push_back(*copied_operands_->back()); + } + } + + // return total number of operands in the list + size_t GetNumOperands() const { + if (!operand_list_) { + return 0; + } + return operand_list_->size(); + } + + // Get the operand at the index. + Slice GetOperand(int index) { + assert(operand_list_); + + SetDirectionForward(); + return (*operand_list_)[index]; + } + + // Return all the operands. + const std::vector<Slice>& GetOperands() { + if (!operand_list_) { + return empty_operand_list; + } + + SetDirectionForward(); + return *operand_list_; + } + + private: + void Initialize() { + if (!operand_list_) { + operand_list_.reset(new std::vector<Slice>()); + copied_operands_.reset(new std::vector<std::unique_ptr<std::string>>()); + } + } + + void SetDirectionForward() { + if (operands_reversed_ == true) { + std::reverse(operand_list_->begin(), operand_list_->end()); + operands_reversed_ = false; + } + } + + void SetDirectionBackward() { + if (operands_reversed_ == false) { + std::reverse(operand_list_->begin(), operand_list_->end()); + operands_reversed_ = true; + } + } + + // List of operands + std::unique_ptr<std::vector<Slice>> operand_list_; + // Copy of operands that are not pinned. + std::unique_ptr<std::vector<std::unique_ptr<std::string>>> copied_operands_; + bool operands_reversed_ = true; +}; + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/merge_helper.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/merge_helper.cc b/thirdparty/rocksdb/db/merge_helper.cc new file mode 100644 index 0000000..625de27 --- /dev/null +++ b/thirdparty/rocksdb/db/merge_helper.cc @@ -0,0 +1,365 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/merge_helper.h" + +#include <stdio.h> +#include <string> + +#include "db/dbformat.h" +#include "monitoring/perf_context_imp.h" +#include "monitoring/statistics.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/merge_operator.h" +#include "table/internal_iterator.h" + +namespace rocksdb { + +Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator, + const Slice& key, const Slice* value, + const std::vector<Slice>& operands, + std::string* result, Logger* logger, + Statistics* statistics, Env* env, + Slice* result_operand, + bool update_num_ops_stats) { + assert(merge_operator != nullptr); + + if (operands.size() == 0) { + assert(value != nullptr && result != nullptr); + result->assign(value->data(), value->size()); + return Status::OK(); + } + + if (update_num_ops_stats) { + MeasureTime(statistics, READ_NUM_MERGE_OPERANDS, + static_cast<uint64_t>(operands.size())); + } + + bool success; + Slice tmp_result_operand(nullptr, 0); + const MergeOperator::MergeOperationInput merge_in(key, value, operands, + logger); + MergeOperator::MergeOperationOutput merge_out(*result, tmp_result_operand); + { + // Setup to time the merge + StopWatchNano timer(env, statistics != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + + // Do the merge + success = merge_operator->FullMergeV2(merge_in, &merge_out); + + if (tmp_result_operand.data()) { + // FullMergeV2 result is an existing operand + if (result_operand != nullptr) { + *result_operand = tmp_result_operand; + } else { + result->assign(tmp_result_operand.data(), tmp_result_operand.size()); + } + } else if (result_operand) { + *result_operand = Slice(nullptr, 0); + } + + RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME, + statistics ? timer.ElapsedNanos() : 0); + } + + if (!success) { + RecordTick(statistics, NUMBER_MERGE_FAILURES); + return Status::Corruption("Error: Could not perform merge."); + } + + return Status::OK(); +} + +// PRE: iter points to the first merge type entry +// POST: iter points to the first entry beyond the merge process (or the end) +// keys_, operands_ are updated to reflect the merge result. +// keys_ stores the list of keys encountered while merging. +// operands_ stores the list of merge operands encountered while merging. +// keys_[i] corresponds to operands_[i] for each i. +Status MergeHelper::MergeUntil(InternalIterator* iter, + RangeDelAggregator* range_del_agg, + const SequenceNumber stop_before, + const bool at_bottom) { + // Get a copy of the internal key, before it's invalidated by iter->Next() + // Also maintain the list of merge operands seen. + assert(HasOperator()); + keys_.clear(); + merge_context_.Clear(); + has_compaction_filter_skip_until_ = false; + assert(user_merge_operator_); + bool first_key = true; + + // We need to parse the internal key again as the parsed key is + // backed by the internal key! + // Assume no internal key corruption as it has been successfully parsed + // by the caller. + // original_key_is_iter variable is just caching the information: + // original_key_is_iter == (iter->key().ToString() == original_key) + bool original_key_is_iter = true; + std::string original_key = iter->key().ToString(); + // Important: + // orig_ikey is backed by original_key if keys_.empty() + // orig_ikey is backed by keys_.back() if !keys_.empty() + ParsedInternalKey orig_ikey; + ParseInternalKey(original_key, &orig_ikey); + + Status s; + bool hit_the_next_user_key = false; + for (; iter->Valid(); iter->Next(), original_key_is_iter = false) { + if (IsShuttingDown()) { + return Status::ShutdownInProgress(); + } + + ParsedInternalKey ikey; + assert(keys_.size() == merge_context_.GetNumOperands()); + + if (!ParseInternalKey(iter->key(), &ikey)) { + // stop at corrupted key + if (assert_valid_internal_key_) { + assert(!"Corrupted internal key not expected."); + return Status::Corruption("Corrupted internal key not expected."); + } + break; + } else if (first_key) { + assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)); + first_key = false; + } else if (!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)) { + // hit a different user key, stop right here + hit_the_next_user_key = true; + break; + } else if (stop_before && ikey.sequence <= stop_before) { + // hit an entry that's visible by the previous snapshot, can't touch that + break; + } + + // At this point we are guaranteed that we need to process this key. + + assert(IsValueType(ikey.type)); + if (ikey.type != kTypeMerge) { + + // hit a put/delete/single delete + // => merge the put value or a nullptr with operands_ + // => store result in operands_.back() (and update keys_.back()) + // => change the entry type to kTypeValue for keys_.back() + // We are done! Success! + + // If there are no operands, just return the Status::OK(). That will cause + // the compaction iterator to write out the key we're currently at, which + // is the put/delete we just encountered. + if (keys_.empty()) { + return Status::OK(); + } + + // TODO(noetzli) If the merge operator returns false, we are currently + // (almost) silently dropping the put/delete. That's probably not what we + // want. Also if we're in compaction and it's a put, it would be nice to + // run compaction filter on it. + const Slice val = iter->value(); + const Slice* val_ptr = (kTypeValue == ikey.type) ? &val : nullptr; + std::string merge_result; + s = TimedFullMerge(user_merge_operator_, ikey.user_key, val_ptr, + merge_context_.GetOperands(), &merge_result, logger_, + stats_, env_); + + // We store the result in keys_.back() and operands_.back() + // if nothing went wrong (i.e.: no operand corruption on disk) + if (s.ok()) { + // The original key encountered + original_key = std::move(keys_.back()); + orig_ikey.type = kTypeValue; + UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); + keys_.clear(); + merge_context_.Clear(); + keys_.emplace_front(std::move(original_key)); + merge_context_.PushOperand(merge_result); + } + + // move iter to the next entry + iter->Next(); + return s; + } else { + // hit a merge + // => if there is a compaction filter, apply it. + // => check for range tombstones covering the operand + // => merge the operand into the front of the operands_ list + // if not filtered + // => then continue because we haven't yet seen a Put/Delete. + // + // Keep queuing keys and operands until we either meet a put / delete + // request or later did a partial merge. + + Slice value_slice = iter->value(); + // add an operand to the list if: + // 1) it's included in one of the snapshots. in that case we *must* write + // it out, no matter what compaction filter says + // 2) it's not filtered by a compaction filter + CompactionFilter::Decision filter = + ikey.sequence <= latest_snapshot_ + ? CompactionFilter::Decision::kKeep + : FilterMerge(orig_ikey.user_key, value_slice); + if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil && + range_del_agg != nullptr && + range_del_agg->ShouldDelete( + iter->key(), + RangeDelAggregator::RangePositioningMode::kForwardTraversal)) { + filter = CompactionFilter::Decision::kRemove; + } + if (filter == CompactionFilter::Decision::kKeep || + filter == CompactionFilter::Decision::kChangeValue) { + if (original_key_is_iter) { + // this is just an optimization that saves us one memcpy + keys_.push_front(std::move(original_key)); + } else { + keys_.push_front(iter->key().ToString()); + } + if (keys_.size() == 1) { + // we need to re-anchor the orig_ikey because it was anchored by + // original_key before + ParseInternalKey(keys_.back(), &orig_ikey); + } + if (filter == CompactionFilter::Decision::kKeep) { + merge_context_.PushOperand( + value_slice, iter->IsValuePinned() /* operand_pinned */); + } else { // kChangeValue + // Compaction filter asked us to change the operand from value_slice + // to compaction_filter_value_. + merge_context_.PushOperand(compaction_filter_value_, false); + } + } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) { + // Compaction filter asked us to remove this key altogether + // (not just this operand), along with some keys following it. + keys_.clear(); + merge_context_.Clear(); + has_compaction_filter_skip_until_ = true; + return Status::OK(); + } + } + } + + if (merge_context_.GetNumOperands() == 0) { + // we filtered out all the merge operands + return Status::OK(); + } + + // We are sure we have seen this key's entire history if we are at the + // last level and exhausted all internal keys of this user key. + // NOTE: !iter->Valid() does not necessarily mean we hit the + // beginning of a user key, as versions of a user key might be + // split into multiple files (even files on the same level) + // and some files might not be included in the compaction/merge. + // + // There are also cases where we have seen the root of history of this + // key without being sure of it. Then, we simply miss the opportunity + // to combine the keys. Since VersionSet::SetupOtherInputs() always makes + // sure that all merge-operands on the same level get compacted together, + // this will simply lead to these merge operands moving to the next level. + // + // So, we only perform the following logic (to merge all operands together + // without a Put/Delete) if we are certain that we have seen the end of key. + bool surely_seen_the_beginning = hit_the_next_user_key && at_bottom; + if (surely_seen_the_beginning) { + // do a final merge with nullptr as the existing value and say + // bye to the merge type (it's now converted to a Put) + assert(kTypeMerge == orig_ikey.type); + assert(merge_context_.GetNumOperands() >= 1); + assert(merge_context_.GetNumOperands() == keys_.size()); + std::string merge_result; + s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, nullptr, + merge_context_.GetOperands(), &merge_result, logger_, + stats_, env_); + if (s.ok()) { + // The original key encountered + // We are certain that keys_ is not empty here (see assertions couple of + // lines before). + original_key = std::move(keys_.back()); + orig_ikey.type = kTypeValue; + UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); + keys_.clear(); + merge_context_.Clear(); + keys_.emplace_front(std::move(original_key)); + merge_context_.PushOperand(merge_result); + } + } else { + // We haven't seen the beginning of the key nor a Put/Delete. + // Attempt to use the user's associative merge function to + // merge the stacked merge operands into a single operand. + s = Status::MergeInProgress(); + if (merge_context_.GetNumOperands() >= 2) { + bool merge_success = false; + std::string merge_result; + { + StopWatchNano timer(env_, stats_ != nullptr); + PERF_TIMER_GUARD(merge_operator_time_nanos); + merge_success = user_merge_operator_->PartialMergeMulti( + orig_ikey.user_key, + std::deque<Slice>(merge_context_.GetOperands().begin(), + merge_context_.GetOperands().end()), + &merge_result, logger_); + RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME, + stats_ ? timer.ElapsedNanosSafe() : 0); + } + if (merge_success) { + // Merging of operands (associative merge) was successful. + // Replace operands with the merge result + merge_context_.Clear(); + merge_context_.PushOperand(merge_result); + keys_.erase(keys_.begin(), keys_.end() - 1); + } + } + } + + return s; +} + +MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper) + : merge_helper_(merge_helper) { + it_keys_ = merge_helper_->keys().rend(); + it_values_ = merge_helper_->values().rend(); +} + +void MergeOutputIterator::SeekToFirst() { + const auto& keys = merge_helper_->keys(); + const auto& values = merge_helper_->values(); + assert(keys.size() == values.size()); + it_keys_ = keys.rbegin(); + it_values_ = values.rbegin(); +} + +void MergeOutputIterator::Next() { + ++it_keys_; + ++it_values_; +} + +CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key, + const Slice& value_slice) { + if (compaction_filter_ == nullptr) { + return CompactionFilter::Decision::kKeep; + } + if (stats_ != nullptr) { + filter_timer_.Start(); + } + compaction_filter_value_.clear(); + compaction_filter_skip_until_.Clear(); + auto ret = compaction_filter_->FilterV2( + level_, user_key, CompactionFilter::ValueType::kMergeOperand, value_slice, + &compaction_filter_value_, compaction_filter_skip_until_.rep()); + if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) { + if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(), + user_key) <= 0) { + // Invalid skip_until returned from compaction filter. + // Keep the key as per FilterV2 documentation. + ret = CompactionFilter::Decision::kKeep; + } else { + compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, + kValueTypeForSeek); + } + } + total_filter_time_ += filter_timer_.ElapsedNanosSafe(); + return ret; +} + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/merge_helper.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/merge_helper.h b/thirdparty/rocksdb/db/merge_helper.h new file mode 100644 index 0000000..59da47a --- /dev/null +++ b/thirdparty/rocksdb/db/merge_helper.h @@ -0,0 +1,209 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#ifndef MERGE_HELPER_H +#define MERGE_HELPER_H + +#include <deque> +#include <string> +#include <vector> + +#include "db/dbformat.h" +#include "db/merge_context.h" +#include "db/range_del_aggregator.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/env.h" +#include "rocksdb/slice.h" +#include "util/stop_watch.h" + +namespace rocksdb { + +class Comparator; +class Iterator; +class Logger; +class MergeOperator; +class Statistics; +class InternalIterator; + +class MergeHelper { + public: + MergeHelper(Env* env, const Comparator* user_comparator, + const MergeOperator* user_merge_operator, + const CompactionFilter* compaction_filter, Logger* logger, + bool assert_valid_internal_key, SequenceNumber latest_snapshot, + int level = 0, Statistics* stats = nullptr, + const std::atomic<bool>* shutting_down = nullptr) + : env_(env), + user_comparator_(user_comparator), + user_merge_operator_(user_merge_operator), + compaction_filter_(compaction_filter), + shutting_down_(shutting_down), + logger_(logger), + assert_valid_internal_key_(assert_valid_internal_key), + latest_snapshot_(latest_snapshot), + level_(level), + keys_(), + filter_timer_(env_), + total_filter_time_(0U), + stats_(stats) { + assert(user_comparator_ != nullptr); + } + + // Wrapper around MergeOperator::FullMergeV2() that records perf statistics. + // Result of merge will be written to result if status returned is OK. + // If operands is empty, the value will simply be copied to result. + // Set `update_num_ops_stats` to true if it is from a user read, so that + // the latency is sensitive. + // Returns one of the following statuses: + // - OK: Entries were successfully merged. + // - Corruption: Merge operator reported unsuccessful merge. + static Status TimedFullMerge(const MergeOperator* merge_operator, + const Slice& key, const Slice* value, + const std::vector<Slice>& operands, + std::string* result, Logger* logger, + Statistics* statistics, Env* env, + Slice* result_operand = nullptr, + bool update_num_ops_stats = false); + + // Merge entries until we hit + // - a corrupted key + // - a Put/Delete, + // - a different user key, + // - a specific sequence number (snapshot boundary), + // - REMOVE_AND_SKIP_UNTIL returned from compaction filter, + // or - the end of iteration + // iter: (IN) points to the first merge type entry + // (OUT) points to the first entry not included in the merge process + // range_del_agg: (IN) filters merge operands covered by range tombstones. + // stop_before: (IN) a sequence number that merge should not cross. + // 0 means no restriction + // at_bottom: (IN) true if the iterator covers the bottem level, which means + // we could reach the start of the history of this user key. + // + // Returns one of the following statuses: + // - OK: Entries were successfully merged. + // - MergeInProgress: Put/Delete not encountered, and didn't reach the start + // of key's history. Output consists of merge operands only. + // - Corruption: Merge operator reported unsuccessful merge or a corrupted + // key has been encountered and not expected (applies only when compiling + // with asserts removed). + // - ShutdownInProgress: interrupted by shutdown (*shutting_down == true). + // + // REQUIRED: The first key in the input is not corrupted. + Status MergeUntil(InternalIterator* iter, + RangeDelAggregator* range_del_agg = nullptr, + const SequenceNumber stop_before = 0, + const bool at_bottom = false); + + // Filters a merge operand using the compaction filter specified + // in the constructor. Returns the decision that the filter made. + // Uses compaction_filter_value_ and compaction_filter_skip_until_ for the + // optional outputs of compaction filter. + CompactionFilter::Decision FilterMerge(const Slice& user_key, + const Slice& value_slice); + + // Query the merge result + // These are valid until the next MergeUntil call + // If the merging was successful: + // - keys() contains a single element with the latest sequence number of + // the merges. The type will be Put or Merge. See IMPORTANT 1 note, below. + // - values() contains a single element with the result of merging all the + // operands together + // + // IMPORTANT 1: the key type could change after the MergeUntil call. + // Put/Delete + Merge + ... + Merge => Put + // Merge + ... + Merge => Merge + // + // If the merge operator is not associative, and if a Put/Delete is not found + // then the merging will be unsuccessful. In this case: + // - keys() contains the list of internal keys seen in order of iteration. + // - values() contains the list of values (merges) seen in the same order. + // values() is parallel to keys() so that the first entry in + // keys() is the key associated with the first entry in values() + // and so on. These lists will be the same length. + // All of these pairs will be merges over the same user key. + // See IMPORTANT 2 note below. + // + // IMPORTANT 2: The entries were traversed in order from BACK to FRONT. + // So keys().back() was the first key seen by iterator. + // TODO: Re-style this comment to be like the first one + const std::deque<std::string>& keys() const { return keys_; } + const std::vector<Slice>& values() const { + return merge_context_.GetOperands(); + } + uint64_t TotalFilterTime() const { return total_filter_time_; } + bool HasOperator() const { return user_merge_operator_ != nullptr; } + + // If compaction filter returned REMOVE_AND_SKIP_UNTIL, this method will + // return true and fill *until with the key to which we should skip. + // If true, keys() and values() are empty. + bool FilteredUntil(Slice* skip_until) const { + if (!has_compaction_filter_skip_until_) { + return false; + } + assert(compaction_filter_ != nullptr); + assert(skip_until != nullptr); + assert(compaction_filter_skip_until_.Valid()); + *skip_until = compaction_filter_skip_until_.Encode(); + return true; + } + + private: + Env* env_; + const Comparator* user_comparator_; + const MergeOperator* user_merge_operator_; + const CompactionFilter* compaction_filter_; + const std::atomic<bool>* shutting_down_; + Logger* logger_; + bool assert_valid_internal_key_; // enforce no internal key corruption? + SequenceNumber latest_snapshot_; + int level_; + + // the scratch area that holds the result of MergeUntil + // valid up to the next MergeUntil call + + // Keeps track of the sequence of keys seen + std::deque<std::string> keys_; + // Parallel with keys_; stores the operands + mutable MergeContext merge_context_; + + StopWatchNano filter_timer_; + uint64_t total_filter_time_; + Statistics* stats_; + + bool has_compaction_filter_skip_until_ = false; + std::string compaction_filter_value_; + InternalKey compaction_filter_skip_until_; + + bool IsShuttingDown() { + // This is a best-effort facility, so memory_order_relaxed is sufficient. + return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); + } +}; + +// MergeOutputIterator can be used to iterate over the result of a merge. +class MergeOutputIterator { + public: + // The MergeOutputIterator is bound to a MergeHelper instance. + explicit MergeOutputIterator(const MergeHelper* merge_helper); + + // Seeks to the first record in the output. + void SeekToFirst(); + // Advances to the next record in the output. + void Next(); + + Slice key() { return Slice(*it_keys_); } + Slice value() { return Slice(*it_values_); } + bool Valid() { return it_keys_ != merge_helper_->keys().rend(); } + + private: + const MergeHelper* merge_helper_; + std::deque<std::string>::const_reverse_iterator it_keys_; + std::vector<Slice>::const_reverse_iterator it_values_; +}; + +} // namespace rocksdb + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/merge_operator.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/merge_operator.cc b/thirdparty/rocksdb/db/merge_operator.cc new file mode 100644 index 0000000..1981e65 --- /dev/null +++ b/thirdparty/rocksdb/db/merge_operator.cc @@ -0,0 +1,86 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +/** + * Back-end implementation details specific to the Merge Operator. + */ + +#include "rocksdb/merge_operator.h" + +namespace rocksdb { + +bool MergeOperator::FullMergeV2(const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { + // If FullMergeV2 is not implemented, we convert the operand_list to + // std::deque<std::string> and pass it to FullMerge + std::deque<std::string> operand_list_str; + for (auto& op : merge_in.operand_list) { + operand_list_str.emplace_back(op.data(), op.size()); + } + return FullMerge(merge_in.key, merge_in.existing_value, operand_list_str, + &merge_out->new_value, merge_in.logger); +} + +// The default implementation of PartialMergeMulti, which invokes +// PartialMerge multiple times internally and merges two operands at +// a time. +bool MergeOperator::PartialMergeMulti(const Slice& key, + const std::deque<Slice>& operand_list, + std::string* new_value, + Logger* logger) const { + assert(operand_list.size() >= 2); + // Simply loop through the operands + Slice temp_slice(operand_list[0]); + + for (size_t i = 1; i < operand_list.size(); ++i) { + auto& operand = operand_list[i]; + std::string temp_value; + if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) { + return false; + } + swap(temp_value, *new_value); + temp_slice = Slice(*new_value); + } + + // The result will be in *new_value. All merges succeeded. + return true; +} + +// Given a "real" merge from the library, call the user's +// associative merge function one-by-one on each of the operands. +// NOTE: It is assumed that the client's merge-operator will handle any errors. +bool AssociativeMergeOperator::FullMergeV2( + const MergeOperationInput& merge_in, + MergeOperationOutput* merge_out) const { + // Simply loop through the operands + Slice temp_existing; + const Slice* existing_value = merge_in.existing_value; + for (const auto& operand : merge_in.operand_list) { + std::string temp_value; + if (!Merge(merge_in.key, existing_value, operand, &temp_value, + merge_in.logger)) { + return false; + } + swap(temp_value, merge_out->new_value); + temp_existing = Slice(merge_out->new_value); + existing_value = &temp_existing; + } + + // The result will be in *new_value. All merges succeeded. + return true; +} + +// Call the user defined simple merge on the operands; +// NOTE: It is assumed that the client's merge-operator will handle any errors. +bool AssociativeMergeOperator::PartialMerge( + const Slice& key, + const Slice& left_operand, + const Slice& right_operand, + std::string* new_value, + Logger* logger) const { + return Merge(key, &left_operand, right_operand, new_value, logger); +} + +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/pinned_iterators_manager.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/pinned_iterators_manager.h b/thirdparty/rocksdb/db/pinned_iterators_manager.h new file mode 100644 index 0000000..7874eef --- /dev/null +++ b/thirdparty/rocksdb/db/pinned_iterators_manager.h @@ -0,0 +1,87 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once +#include <algorithm> +#include <memory> +#include <utility> +#include <vector> + +#include "table/internal_iterator.h" + +namespace rocksdb { + +// PinnedIteratorsManager will be notified whenever we need to pin an Iterator +// and it will be responsible for deleting pinned Iterators when they are +// not needed anymore. +class PinnedIteratorsManager : public Cleanable { + public: + PinnedIteratorsManager() : pinning_enabled(false) {} + ~PinnedIteratorsManager() { + if (pinning_enabled) { + ReleasePinnedData(); + } + } + + // Enable Iterators pinning + void StartPinning() { + assert(pinning_enabled == false); + pinning_enabled = true; + } + + // Is pinning enabled ? + bool PinningEnabled() { return pinning_enabled; } + + // Take ownership of iter and delete it when ReleasePinnedData() is called + void PinIterator(InternalIterator* iter, bool arena = false) { + if (arena) { + PinPtr(iter, &PinnedIteratorsManager::ReleaseArenaInternalIterator); + } else { + PinPtr(iter, &PinnedIteratorsManager::ReleaseInternalIterator); + } + } + + typedef void (*ReleaseFunction)(void* arg1); + void PinPtr(void* ptr, ReleaseFunction release_func) { + assert(pinning_enabled); + if (ptr == nullptr) { + return; + } + pinned_ptrs_.emplace_back(ptr, release_func); + } + + // Release pinned Iterators + inline void ReleasePinnedData() { + assert(pinning_enabled == true); + pinning_enabled = false; + + // Remove duplicate pointers + std::sort(pinned_ptrs_.begin(), pinned_ptrs_.end()); + auto unique_end = std::unique(pinned_ptrs_.begin(), pinned_ptrs_.end()); + + for (auto i = pinned_ptrs_.begin(); i != unique_end; ++i) { + void* ptr = i->first; + ReleaseFunction release_func = i->second; + release_func(ptr); + } + pinned_ptrs_.clear(); + // Also do cleanups from the base Cleanable + Cleanable::Reset(); + } + + private: + static void ReleaseInternalIterator(void* ptr) { + delete reinterpret_cast<InternalIterator*>(ptr); + } + + static void ReleaseArenaInternalIterator(void* ptr) { + reinterpret_cast<InternalIterator*>(ptr)->~InternalIterator(); + } + + bool pinning_enabled; + std::vector<std::pair<void*, ReleaseFunction>> pinned_ptrs_; +}; + +} // namespace rocksdb
