http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/db_impl.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/db_impl.cc b/thirdparty/rocksdb/db/db_impl.cc new file mode 100644 index 0000000..cdba039 --- /dev/null +++ b/thirdparty/rocksdb/db/db_impl.cc @@ -0,0 +1,2824 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "db/db_impl.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif +#include <inttypes.h> +#include <stdint.h> +#ifdef OS_SOLARIS +#include <alloca.h> +#endif + +#include <algorithm> +#include <climits> +#include <cstdio> +#include <map> +#include <set> +#include <stdexcept> +#include <string> +#include <unordered_map> +#include <unordered_set> +#include <utility> +#include <vector> + +#include "db/builder.h" +#include "db/compaction_job.h" +#include "db/db_info_dumper.h" +#include "db/db_iter.h" +#include "db/dbformat.h" +#include "db/event_helpers.h" +#include "db/external_sst_file_ingestion_job.h" +#include "db/flush_job.h" +#include "db/forward_iterator.h" +#include "db/job_context.h" +#include "db/log_reader.h" +#include "db/log_writer.h" +#include "db/malloc_stats.h" +#include "db/managed_iterator.h" +#include "db/memtable.h" +#include "db/memtable_list.h" +#include "db/merge_context.h" +#include "db/merge_helper.h" +#include "db/range_del_aggregator.h" +#include "db/table_cache.h" +#include "db/table_properties_collector.h" +#include "db/transaction_log_impl.h" +#include "db/version_set.h" +#include "db/write_batch_internal.h" +#include "db/write_callback.h" +#include "memtable/hash_linklist_rep.h" +#include "memtable/hash_skiplist_rep.h" +#include "monitoring/iostats_context_imp.h" +#include "monitoring/perf_context_imp.h" +#include "monitoring/thread_status_updater.h" +#include "monitoring/thread_status_util.h" +#include "options/cf_options.h" +#include "options/options_helper.h" +#include "options/options_parser.h" +#include "port/likely.h" +#include "port/port.h" +#include "rocksdb/cache.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/convenience.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/statistics.h" +#include "rocksdb/status.h" +#include "rocksdb/table.h" +#include "rocksdb/version.h" +#include "rocksdb/write_buffer_manager.h" +#include "table/block.h" +#include "table/block_based_table_factory.h" +#include "table/merging_iterator.h" +#include "table/table_builder.h" +#include "table/two_level_iterator.h" +#include "tools/sst_dump_tool_imp.h" +#include "util/auto_roll_logger.h" +#include "util/autovector.h" +#include "util/build_version.h" +#include "util/coding.h" +#include "util/compression.h" +#include "util/crc32c.h" +#include "util/file_reader_writer.h" +#include "util/file_util.h" +#include "util/filename.h" +#include "util/log_buffer.h" +#include "util/logging.h" +#include "util/mutexlock.h" +#include "util/sst_file_manager_impl.h" +#include "util/stop_watch.h" +#include "util/string_util.h" +#include "util/sync_point.h" + +namespace rocksdb { +const std::string kDefaultColumnFamilyName("default"); +void DumpRocksDBBuildVersion(Logger * log); + +CompressionType GetCompressionFlush( + const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options) { + // Compressing memtable flushes might not help unless the sequential load + // optimization is used for leveled compaction. Otherwise the CPU and + // latency overhead is not offset by saving much space. + if (ioptions.compaction_style == kCompactionStyleUniversal) { + if (ioptions.compaction_options_universal.compression_size_percent < 0) { + return mutable_cf_options.compression; + } else { + return kNoCompression; + } + } else if (!ioptions.compression_per_level.empty()) { + // For leveled compress when min_level_to_compress != 0. + return ioptions.compression_per_level[0]; + } else { + return mutable_cf_options.compression; + } +} + +namespace { +void DumpSupportInfo(Logger* logger) { + ROCKS_LOG_HEADER(logger, "Compression algorithms supported:"); + ROCKS_LOG_HEADER(logger, "\tSnappy supported: %d", Snappy_Supported()); + ROCKS_LOG_HEADER(logger, "\tZlib supported: %d", Zlib_Supported()); + ROCKS_LOG_HEADER(logger, "\tBzip supported: %d", BZip2_Supported()); + ROCKS_LOG_HEADER(logger, "\tLZ4 supported: %d", LZ4_Supported()); + ROCKS_LOG_HEADER(logger, "\tZSTD supported: %d", ZSTD_Supported()); + ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %d", + crc32c::IsFastCrc32Supported()); +} + +int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024; +} // namespace + +DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) + : env_(options.env), + dbname_(dbname), + initial_db_options_(SanitizeOptions(dbname, options)), + immutable_db_options_(initial_db_options_), + mutable_db_options_(initial_db_options_), + stats_(immutable_db_options_.statistics.get()), + db_lock_(nullptr), + mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS, + immutable_db_options_.use_adaptive_mutex), + shutting_down_(false), + bg_cv_(&mutex_), + logfile_number_(0), + log_dir_synced_(false), + log_empty_(true), + default_cf_handle_(nullptr), + log_sync_cv_(&mutex_), + total_log_size_(0), + max_total_in_memory_state_(0), + is_snapshot_supported_(true), + write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), + write_thread_(immutable_db_options_), + nonmem_write_thread_(immutable_db_options_), + write_controller_(mutable_db_options_.delayed_write_rate), + // Use delayed_write_rate as a base line to determine the initial + // low pri write rate limit. It may be adjusted later. + low_pri_write_rate_limiter_(NewGenericRateLimiter(std::min( + static_cast<int64_t>(mutable_db_options_.delayed_write_rate / 8), + kDefaultLowPriThrottledRate))), + last_batch_group_size_(0), + unscheduled_flushes_(0), + unscheduled_compactions_(0), + bg_bottom_compaction_scheduled_(0), + bg_compaction_scheduled_(0), + num_running_compactions_(0), + bg_flush_scheduled_(0), + num_running_flushes_(0), + bg_purge_scheduled_(0), + disable_delete_obsolete_files_(0), + delete_obsolete_files_last_run_(env_->NowMicros()), + last_stats_dump_time_microsec_(0), + next_job_id_(1), + has_unpersisted_data_(false), + unable_to_flush_oldest_log_(false), + env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), + num_running_ingest_file_(0), +#ifndef ROCKSDB_LITE + wal_manager_(immutable_db_options_, env_options_), +#endif // ROCKSDB_LITE + event_logger_(immutable_db_options_.info_log.get()), + bg_work_paused_(0), + bg_compaction_paused_(0), + refitting_level_(false), + opened_successfully_(false), + concurrent_prepare_(options.concurrent_prepare), + manual_wal_flush_(options.manual_wal_flush) { + env_->GetAbsolutePath(dbname, &db_absolute_path_); + + // Reserve ten files or so for other uses and give the rest to TableCache. + // Give a large number for setting of "infinite" open files. + const int table_cache_size = (mutable_db_options_.max_open_files == -1) + ? TableCache::kInfiniteCapacity + : mutable_db_options_.max_open_files - 10; + table_cache_ = NewLRUCache(table_cache_size, + immutable_db_options_.table_cache_numshardbits); + + versions_.reset(new VersionSet(dbname_, &immutable_db_options_, env_options_, + table_cache_.get(), write_buffer_manager_, + &write_controller_)); + column_family_memtables_.reset( + new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); + + DumpRocksDBBuildVersion(immutable_db_options_.info_log.get()); + DumpDBFileSummary(immutable_db_options_, dbname_); + immutable_db_options_.Dump(immutable_db_options_.info_log.get()); + mutable_db_options_.Dump(immutable_db_options_.info_log.get()); + DumpSupportInfo(immutable_db_options_.info_log.get()); +} + +// Will lock the mutex_, will wait for completion if wait is true +void DBImpl::CancelAllBackgroundWork(bool wait) { + InstrumentedMutexLock l(&mutex_); + + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Shutdown: canceling all background work"); + + if (!shutting_down_.load(std::memory_order_acquire) && + has_unpersisted_data_.load(std::memory_order_relaxed) && + !mutable_db_options_.avoid_flush_during_shutdown) { + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) { + cfd->Ref(); + mutex_.Unlock(); + FlushMemTable(cfd, FlushOptions()); + mutex_.Lock(); + cfd->Unref(); + } + } + versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); + } + + shutting_down_.store(true, std::memory_order_release); + bg_cv_.SignalAll(); + if (!wait) { + return; + } + // Wait for background work to finish + while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || + bg_flush_scheduled_) { + bg_cv_.Wait(); + } +} + +DBImpl::~DBImpl() { + // CancelAllBackgroundWork called with false means we just set the shutdown + // marker. After this we do a variant of the waiting and unschedule work + // (to consider: moving all the waiting into CancelAllBackgroundWork(true)) + CancelAllBackgroundWork(false); + int bottom_compactions_unscheduled = + env_->UnSchedule(this, Env::Priority::BOTTOM); + int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW); + int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH); + mutex_.Lock(); + bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled; + bg_compaction_scheduled_ -= compactions_unscheduled; + bg_flush_scheduled_ -= flushes_unscheduled; + + // Wait for background work to finish + while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || + bg_flush_scheduled_ || bg_purge_scheduled_) { + TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob"); + bg_cv_.Wait(); + } + EraseThreadStatusDbInfo(); + flush_scheduler_.Clear(); + + while (!flush_queue_.empty()) { + auto cfd = PopFirstFromFlushQueue(); + if (cfd->Unref()) { + delete cfd; + } + } + while (!compaction_queue_.empty()) { + auto cfd = PopFirstFromCompactionQueue(); + if (cfd->Unref()) { + delete cfd; + } + } + + if (default_cf_handle_ != nullptr) { + // we need to delete handle outside of lock because it does its own locking + mutex_.Unlock(); + delete default_cf_handle_; + mutex_.Lock(); + } + + // Clean up obsolete files due to SuperVersion release. + // (1) Need to delete to obsolete files before closing because RepairDB() + // scans all existing files in the file system and builds manifest file. + // Keeping obsolete files confuses the repair process. + // (2) Need to check if we Open()/Recover() the DB successfully before + // deleting because if VersionSet recover fails (may be due to corrupted + // manifest file), it is not able to identify live files correctly. As a + // result, all "live" files can get deleted by accident. However, corrupted + // manifest is recoverable by RepairDB(). + if (opened_successfully_) { + JobContext job_context(next_job_id_.fetch_add(1)); + FindObsoleteFiles(&job_context, true); + + mutex_.Unlock(); + // manifest number starting from 2 + job_context.manifest_file_number = 1; + if (job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(job_context); + } + job_context.Clean(); + mutex_.Lock(); + } + + for (auto l : logs_to_free_) { + delete l; + } + for (auto& log : logs_) { + log.ClearWriter(); + } + logs_.clear(); + + // Table cache may have table handles holding blocks from the block cache. + // We need to release them before the block cache is destroyed. The block + // cache may be destroyed inside versions_.reset(), when column family data + // list is destroyed, so leaving handles in table cache after + // versions_.reset() may cause issues. + // Here we clean all unreferenced handles in table cache. + // Now we assume all user queries have finished, so only version set itself + // can possibly hold the blocks from block cache. After releasing unreferenced + // handles here, only handles held by version set left and inside + // versions_.reset(), we will release them. There, we need to make sure every + // time a handle is released, we erase it from the cache too. By doing that, + // we can guarantee that after versions_.reset(), table cache is empty + // so the cache can be safely destroyed. + table_cache_->EraseUnRefEntries(); + + for (auto& txn_entry : recovered_transactions_) { + delete txn_entry.second; + } + + // versions need to be destroyed before table_cache since it can hold + // references to table_cache. + versions_.reset(); + mutex_.Unlock(); + if (db_lock_ != nullptr) { + env_->UnlockFile(db_lock_); + } + + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete"); + LogFlush(immutable_db_options_.info_log); +} + +void DBImpl::MaybeIgnoreError(Status* s) const { + if (s->ok() || immutable_db_options_.paranoid_checks) { + // No change needed + } else { + ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s", + s->ToString().c_str()); + *s = Status::OK(); + } +} + +const Status DBImpl::CreateArchivalDirectory() { + if (immutable_db_options_.wal_ttl_seconds > 0 || + immutable_db_options_.wal_size_limit_mb > 0) { + std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir); + return env_->CreateDirIfMissing(archivalPath); + } + return Status::OK(); +} + +void DBImpl::PrintStatistics() { + auto dbstats = immutable_db_options_.statistics.get(); + if (dbstats) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, "STATISTICS:\n %s", + dbstats->ToString().c_str()); + } +} + +void DBImpl::MaybeDumpStats() { + mutex_.Lock(); + unsigned int stats_dump_period_sec = + mutable_db_options_.stats_dump_period_sec; + mutex_.Unlock(); + if (stats_dump_period_sec == 0) return; + + const uint64_t now_micros = env_->NowMicros(); + + if (last_stats_dump_time_microsec_ + stats_dump_period_sec * 1000000 <= + now_micros) { + // Multiple threads could race in here simultaneously. + // However, the last one will update last_stats_dump_time_microsec_ + // atomically. We could see more than one dump during one dump + // period in rare cases. + last_stats_dump_time_microsec_ = now_micros; + +#ifndef ROCKSDB_LITE + const DBPropertyInfo* cf_property_info = + GetPropertyInfo(DB::Properties::kCFStats); + assert(cf_property_info != nullptr); + const DBPropertyInfo* db_property_info = + GetPropertyInfo(DB::Properties::kDBStats); + assert(db_property_info != nullptr); + + std::string stats; + { + InstrumentedMutexLock l(&mutex_); + default_cf_internal_stats_->GetStringProperty( + *db_property_info, DB::Properties::kDBStats, &stats); + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->initialized()) { + cfd->internal_stats()->GetStringProperty( + *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, + &stats); + } + } + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->initialized()) { + cfd->internal_stats()->GetStringProperty( + *cf_property_info, DB::Properties::kCFFileHistogram, &stats); + } + } + } + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "------- DUMPING STATS -------"); + ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str()); + if (immutable_db_options_.dump_malloc_stats) { + stats.clear(); + DumpMallocStats(&stats); + if (!stats.empty()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "------- Malloc STATS -------"); + ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str()); + } + } +#endif // !ROCKSDB_LITE + + PrintStatistics(); + } +} + +void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) { + if (!job_context->logs_to_free.empty()) { + for (auto l : job_context->logs_to_free) { + AddToLogsToFreeQueue(l); + } + job_context->logs_to_free.clear(); + SchedulePurge(); + } +} + +Directory* DBImpl::Directories::GetDataDir(size_t path_id) { + assert(path_id < data_dirs_.size()); + Directory* ret_dir = data_dirs_[path_id].get(); + if (ret_dir == nullptr) { + // Should use db_dir_ + return db_dir_.get(); + } + return ret_dir; +} + +Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, + const std::unordered_map<std::string, std::string>& options_map) { +#ifdef ROCKSDB_LITE + return Status::NotSupported("Not supported in ROCKSDB LITE"); +#else + auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); + if (options_map.empty()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "SetOptions() on column family [%s], empty input", + cfd->GetName().c_str()); + return Status::InvalidArgument("empty input"); + } + + MutableCFOptions new_options; + Status s; + Status persist_options_status; + WriteThread::Writer w; + { + InstrumentedMutexLock l(&mutex_); + s = cfd->SetOptions(options_map); + if (s.ok()) { + new_options = *cfd->GetLatestMutableCFOptions(); + // Append new version to recompute compaction score. + VersionEdit dummy_edit; + versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_, + directories_.GetDbDir()); + // Trigger possible flush/compactions. This has to be before we persist + // options to file, otherwise there will be a deadlock with writer + // thread. + auto* old_sv = + InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options); + delete old_sv; + + persist_options_status = WriteOptionsFile( + false /*need_mutex_lock*/, true /*need_enter_write_thread*/); + } + } + + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "SetOptions() on column family [%s], inputs:", + cfd->GetName().c_str()); + for (const auto& o : options_map) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(), + o.second.c_str()); + } + if (s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "[%s] SetOptions() succeeded", cfd->GetName().c_str()); + new_options.Dump(immutable_db_options_.info_log.get()); + if (!persist_options_status.ok()) { + s = persist_options_status; + } + } else { + ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed", + cfd->GetName().c_str()); + } + LogFlush(immutable_db_options_.info_log); + return s; +#endif // ROCKSDB_LITE +} + +Status DBImpl::SetDBOptions( + const std::unordered_map<std::string, std::string>& options_map) { +#ifdef ROCKSDB_LITE + return Status::NotSupported("Not supported in ROCKSDB LITE"); +#else + if (options_map.empty()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "SetDBOptions(), empty input."); + return Status::InvalidArgument("empty input"); + } + + MutableDBOptions new_options; + Status s; + Status persist_options_status; + WriteThread::Writer w; + WriteContext write_context; + { + InstrumentedMutexLock l(&mutex_); + s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map, + &new_options); + if (s.ok()) { + if (new_options.max_background_compactions > + mutable_db_options_.max_background_compactions) { + env_->IncBackgroundThreadsIfNeeded( + new_options.max_background_compactions, Env::Priority::LOW); + MaybeScheduleFlushOrCompaction(); + } + + write_controller_.set_max_delayed_write_rate(new_options.delayed_write_rate); + table_cache_.get()->SetCapacity(new_options.max_open_files == -1 + ? TableCache::kInfiniteCapacity + : new_options.max_open_files - 10); + + mutable_db_options_ = new_options; + + write_thread_.EnterUnbatched(&w, &mutex_); + if (total_log_size_ > GetMaxTotalWalSize()) { + Status purge_wal_status = HandleWALFull(&write_context); + if (!purge_wal_status.ok()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "Unable to purge WAL files in SetDBOptions() -- %s", + purge_wal_status.ToString().c_str()); + } + } + persist_options_status = WriteOptionsFile( + false /*need_mutex_lock*/, false /*need_enter_write_thread*/); + write_thread_.ExitUnbatched(&w); + } + } + ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:"); + for (const auto& o : options_map) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(), + o.second.c_str()); + } + if (s.ok()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded"); + new_options.Dump(immutable_db_options_.info_log.get()); + if (!persist_options_status.ok()) { + if (immutable_db_options_.fail_if_options_file_error) { + s = Status::IOError( + "SetDBOptions() succeeded, but unable to persist options", + persist_options_status.ToString()); + } + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "Unable to persist options in SetDBOptions() -- %s", + persist_options_status.ToString().c_str()); + } + } else { + ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed"); + } + LogFlush(immutable_db_options_.info_log); + return s; +#endif // ROCKSDB_LITE +} + +// return the same level if it cannot be moved +int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, + const MutableCFOptions& mutable_cf_options, int level) { + mutex_.AssertHeld(); + const auto* vstorage = cfd->current()->storage_info(); + int minimum_level = level; + for (int i = level - 1; i > 0; --i) { + // stop if level i is not empty + if (vstorage->NumLevelFiles(i) > 0) break; + // stop if level i is too small (cannot fit the level files) + if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) { + break; + } + + minimum_level = i; + } + return minimum_level; +} + +Status DBImpl::FlushWAL(bool sync) { + { + // We need to lock log_write_mutex_ since logs_ might change concurrently + InstrumentedMutexLock wl(&log_write_mutex_); + log::Writer* cur_log_writer = logs_.back().writer; + auto s = cur_log_writer->WriteBuffer(); + if (!s.ok()) { + ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s", + s.ToString().c_str()); + } + if (!sync) { + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false"); + return s; + } + } + // sync = true + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true"); + return SyncWAL(); +} + +Status DBImpl::SyncWAL() { + autovector<log::Writer*, 1> logs_to_sync; + bool need_log_dir_sync; + uint64_t current_log_number; + + { + InstrumentedMutexLock l(&mutex_); + assert(!logs_.empty()); + + // This SyncWAL() call only cares about logs up to this number. + current_log_number = logfile_number_; + + while (logs_.front().number <= current_log_number && + logs_.front().getting_synced) { + log_sync_cv_.Wait(); + } + // First check that logs are safe to sync in background. + for (auto it = logs_.begin(); + it != logs_.end() && it->number <= current_log_number; ++it) { + if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) { + return Status::NotSupported( + "SyncWAL() is not supported for this implementation of WAL file", + immutable_db_options_.allow_mmap_writes + ? "try setting Options::allow_mmap_writes to false" + : Slice()); + } + } + for (auto it = logs_.begin(); + it != logs_.end() && it->number <= current_log_number; ++it) { + auto& log = *it; + assert(!log.getting_synced); + log.getting_synced = true; + logs_to_sync.push_back(log.writer); + } + + need_log_dir_sync = !log_dir_synced_; + } + + TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); + RecordTick(stats_, WAL_FILE_SYNCED); + Status status; + for (log::Writer* log : logs_to_sync) { + status = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync); + if (!status.ok()) { + break; + } + } + if (status.ok() && need_log_dir_sync) { + status = directories_.GetWalDir()->Fsync(); + } + TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); + + TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); + { + InstrumentedMutexLock l(&mutex_); + MarkLogsSynced(current_log_number, need_log_dir_sync, status); + } + TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2"); + + return status; +} + +void DBImpl::MarkLogsSynced( + uint64_t up_to, bool synced_dir, const Status& status) { + mutex_.AssertHeld(); + if (synced_dir && + logfile_number_ == up_to && + status.ok()) { + log_dir_synced_ = true; + } + for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) { + auto& log = *it; + assert(log.getting_synced); + if (status.ok() && logs_.size() > 1) { + logs_to_free_.push_back(log.ReleaseWriter()); + it = logs_.erase(it); + } else { + log.getting_synced = false; + ++it; + } + } + assert(!status.ok() || logs_.empty() || logs_[0].number > up_to || + (logs_.size() == 1 && !logs_[0].getting_synced)); + log_sync_cv_.SignalAll(); +} + +SequenceNumber DBImpl::GetLatestSequenceNumber() const { + return versions_->LastSequence(); +} + +InternalIterator* DBImpl::NewInternalIterator( + Arena* arena, RangeDelAggregator* range_del_agg, + ColumnFamilyHandle* column_family) { + ColumnFamilyData* cfd; + if (column_family == nullptr) { + cfd = default_cf_handle_->cfd(); + } else { + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + cfd = cfh->cfd(); + } + + mutex_.Lock(); + SuperVersion* super_version = cfd->GetSuperVersion()->Ref(); + mutex_.Unlock(); + ReadOptions roptions; + return NewInternalIterator(roptions, cfd, super_version, arena, + range_del_agg); +} + +void DBImpl::SchedulePurge() { + mutex_.AssertHeld(); + assert(opened_successfully_); + + // Purge operations are put into High priority queue + bg_purge_scheduled_++; + env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr); +} + +void DBImpl::BackgroundCallPurge() { + mutex_.Lock(); + + // We use one single loop to clear both queues so that after existing the loop + // both queues are empty. This is stricter than what is needed, but can make + // it easier for us to reason the correctness. + while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) { + if (!purge_queue_.empty()) { + auto purge_file = purge_queue_.begin(); + auto fname = purge_file->fname; + auto type = purge_file->type; + auto number = purge_file->number; + auto path_id = purge_file->path_id; + auto job_id = purge_file->job_id; + purge_queue_.pop_front(); + + mutex_.Unlock(); + Status file_deletion_status; + DeleteObsoleteFileImpl(file_deletion_status, job_id, fname, type, number, + path_id); + mutex_.Lock(); + } else { + assert(!logs_to_free_queue_.empty()); + log::Writer* log_writer = *(logs_to_free_queue_.begin()); + logs_to_free_queue_.pop_front(); + mutex_.Unlock(); + delete log_writer; + mutex_.Lock(); + } + } + bg_purge_scheduled_--; + + bg_cv_.SignalAll(); + // IMPORTANT:there should be no code after calling SignalAll. This call may + // signal the DB destructor that it's OK to proceed with destruction. In + // that case, all DB variables will be dealloacated and referencing them + // will cause trouble. + mutex_.Unlock(); +} + +namespace { +struct IterState { + IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version, + bool _background_purge) + : db(_db), + mu(_mu), + super_version(_super_version), + background_purge(_background_purge) {} + + DBImpl* db; + InstrumentedMutex* mu; + SuperVersion* super_version; + bool background_purge; +}; + +static void CleanupIteratorState(void* arg1, void* arg2) { + IterState* state = reinterpret_cast<IterState*>(arg1); + + if (state->super_version->Unref()) { + // Job id == 0 means that this is not our background process, but rather + // user thread + JobContext job_context(0); + + state->mu->Lock(); + state->super_version->Cleanup(); + state->db->FindObsoleteFiles(&job_context, false, true); + if (state->background_purge) { + state->db->ScheduleBgLogWriterClose(&job_context); + } + state->mu->Unlock(); + + delete state->super_version; + if (job_context.HaveSomethingToDelete()) { + if (state->background_purge) { + // PurgeObsoleteFiles here does not delete files. Instead, it adds the + // files to be deleted to a job queue, and deletes it in a separate + // background thread. + state->db->PurgeObsoleteFiles(job_context, true /* schedule only */); + state->mu->Lock(); + state->db->SchedulePurge(); + state->mu->Unlock(); + } else { + state->db->PurgeObsoleteFiles(job_context); + } + } + job_context.Clean(); + } + + delete state; +} +} // namespace + +InternalIterator* DBImpl::NewInternalIterator( + const ReadOptions& read_options, ColumnFamilyData* cfd, + SuperVersion* super_version, Arena* arena, + RangeDelAggregator* range_del_agg) { + InternalIterator* internal_iter; + assert(arena != nullptr); + assert(range_del_agg != nullptr); + // Need to create internal iterator from the arena. + MergeIteratorBuilder merge_iter_builder( + &cfd->internal_comparator(), arena, + !read_options.total_order_seek && + cfd->ioptions()->prefix_extractor != nullptr); + // Collect iterator for mutable mem + merge_iter_builder.AddIterator( + super_version->mem->NewIterator(read_options, arena)); + std::unique_ptr<InternalIterator> range_del_iter; + Status s; + if (!read_options.ignore_range_deletions) { + range_del_iter.reset( + super_version->mem->NewRangeTombstoneIterator(read_options)); + s = range_del_agg->AddTombstones(std::move(range_del_iter)); + } + // Collect all needed child iterators for immutable memtables + if (s.ok()) { + super_version->imm->AddIterators(read_options, &merge_iter_builder); + if (!read_options.ignore_range_deletions) { + s = super_version->imm->AddRangeTombstoneIterators(read_options, arena, + range_del_agg); + } + } + if (s.ok()) { + // Collect iterators for files in L0 - Ln + if (read_options.read_tier != kMemtableTier) { + super_version->current->AddIterators(read_options, env_options_, + &merge_iter_builder, range_del_agg); + } + internal_iter = merge_iter_builder.Finish(); + IterState* cleanup = + new IterState(this, &mutex_, super_version, + read_options.background_purge_on_iterator_cleanup); + internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); + + return internal_iter; + } + return NewErrorInternalIterator(s); +} + +ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const { + return default_cf_handle_; +} + +Status DBImpl::Get(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value) { + return GetImpl(read_options, column_family, key, value); +} + +Status DBImpl::GetImpl(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* pinnable_val, bool* value_found) { + assert(pinnable_val != nullptr); + StopWatch sw(env_, stats_, DB_GET); + PERF_TIMER_GUARD(get_snapshot_time); + + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + auto cfd = cfh->cfd(); + + // Acquire SuperVersion + SuperVersion* sv = GetAndRefSuperVersion(cfd); + + TEST_SYNC_POINT("DBImpl::GetImpl:1"); + TEST_SYNC_POINT("DBImpl::GetImpl:2"); + + SequenceNumber snapshot; + if (read_options.snapshot != nullptr) { + snapshot = reinterpret_cast<const SnapshotImpl*>( + read_options.snapshot)->number_; + } else { + // Since we get and reference the super version before getting + // the snapshot number, without a mutex protection, it is possible + // that a memtable switch happened in the middle and not all the + // data for this snapshot is available. But it will contain all + // the data available in the super version we have, which is also + // a valid snapshot to read from. + // We shouldn't get snapshot before finding and referencing the + // super versipon because a flush happening in between may compact + // away data for the snapshot, but the snapshot is earlier than the + // data overwriting it, so users may see wrong results. + snapshot = versions_->LastSequence(); + } + TEST_SYNC_POINT("DBImpl::GetImpl:3"); + TEST_SYNC_POINT("DBImpl::GetImpl:4"); + + // Prepare to store a list of merge operations if merge occurs. + MergeContext merge_context; + RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); + + Status s; + // First look in the memtable, then in the immutable memtable (if any). + // s is both in/out. When in, s could either be OK or MergeInProgress. + // merge_operands will contain the sequence of merges in the latter case. + LookupKey lkey(key, snapshot); + PERF_TIMER_STOP(get_snapshot_time); + + bool skip_memtable = (read_options.read_tier == kPersistedTier && + has_unpersisted_data_.load(std::memory_order_relaxed)); + bool done = false; + if (!skip_memtable) { + if (sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, + &range_del_agg, read_options)) { + done = true; + pinnable_val->PinSelf(); + RecordTick(stats_, MEMTABLE_HIT); + } else if ((s.ok() || s.IsMergeInProgress()) && + sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, + &range_del_agg, read_options)) { + done = true; + pinnable_val->PinSelf(); + RecordTick(stats_, MEMTABLE_HIT); + } + if (!done && !s.ok() && !s.IsMergeInProgress()) { + return s; + } + } + if (!done) { + PERF_TIMER_GUARD(get_from_output_files_time); + sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context, + &range_del_agg, value_found); + RecordTick(stats_, MEMTABLE_MISS); + } + + { + PERF_TIMER_GUARD(get_post_process_time); + + ReturnAndCleanupSuperVersion(cfd, sv); + + RecordTick(stats_, NUMBER_KEYS_READ); + size_t size = pinnable_val->size(); + RecordTick(stats_, BYTES_READ, size); + MeasureTime(stats_, BYTES_PER_READ, size); + } + return s; +} + +std::vector<Status> DBImpl::MultiGet( + const ReadOptions& read_options, + const std::vector<ColumnFamilyHandle*>& column_family, + const std::vector<Slice>& keys, std::vector<std::string>* values) { + + StopWatch sw(env_, stats_, DB_MULTIGET); + PERF_TIMER_GUARD(get_snapshot_time); + + SequenceNumber snapshot; + + struct MultiGetColumnFamilyData { + ColumnFamilyData* cfd; + SuperVersion* super_version; + }; + std::unordered_map<uint32_t, MultiGetColumnFamilyData*> multiget_cf_data; + // fill up and allocate outside of mutex + for (auto cf : column_family) { + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf); + auto cfd = cfh->cfd(); + if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) { + auto mgcfd = new MultiGetColumnFamilyData(); + mgcfd->cfd = cfd; + multiget_cf_data.insert({cfd->GetID(), mgcfd}); + } + } + + mutex_.Lock(); + if (read_options.snapshot != nullptr) { + snapshot = reinterpret_cast<const SnapshotImpl*>( + read_options.snapshot)->number_; + } else { + snapshot = versions_->LastSequence(); + } + for (auto mgd_iter : multiget_cf_data) { + mgd_iter.second->super_version = + mgd_iter.second->cfd->GetSuperVersion()->Ref(); + } + mutex_.Unlock(); + + // Contain a list of merge operations if merge occurs. + MergeContext merge_context; + + // Note: this always resizes the values array + size_t num_keys = keys.size(); + std::vector<Status> stat_list(num_keys); + values->resize(num_keys); + + // Keep track of bytes that we read for statistics-recording later + uint64_t bytes_read = 0; + PERF_TIMER_STOP(get_snapshot_time); + + // For each of the given keys, apply the entire "get" process as follows: + // First look in the memtable, then in the immutable memtable (if any). + // s is both in/out. When in, s could either be OK or MergeInProgress. + // merge_operands will contain the sequence of merges in the latter case. + for (size_t i = 0; i < num_keys; ++i) { + merge_context.Clear(); + Status& s = stat_list[i]; + std::string* value = &(*values)[i]; + + LookupKey lkey(keys[i], snapshot); + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]); + RangeDelAggregator range_del_agg(cfh->cfd()->internal_comparator(), + snapshot); + auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID()); + assert(mgd_iter != multiget_cf_data.end()); + auto mgd = mgd_iter->second; + auto super_version = mgd->super_version; + bool skip_memtable = + (read_options.read_tier == kPersistedTier && + has_unpersisted_data_.load(std::memory_order_relaxed)); + bool done = false; + if (!skip_memtable) { + if (super_version->mem->Get(lkey, value, &s, &merge_context, + &range_del_agg, read_options)) { + done = true; + // TODO(?): RecordTick(stats_, MEMTABLE_HIT)? + } else if (super_version->imm->Get(lkey, value, &s, &merge_context, + &range_del_agg, read_options)) { + done = true; + // TODO(?): RecordTick(stats_, MEMTABLE_HIT)? + } + } + if (!done) { + PinnableSlice pinnable_val; + PERF_TIMER_GUARD(get_from_output_files_time); + super_version->current->Get(read_options, lkey, &pinnable_val, &s, + &merge_context, &range_del_agg); + value->assign(pinnable_val.data(), pinnable_val.size()); + // TODO(?): RecordTick(stats_, MEMTABLE_MISS)? + } + + if (s.ok()) { + bytes_read += value->size(); + } + } + + // Post processing (decrement reference counts and record statistics) + PERF_TIMER_GUARD(get_post_process_time); + autovector<SuperVersion*> superversions_to_delete; + + // TODO(icanadi) do we need lock here or just around Cleanup()? + mutex_.Lock(); + for (auto mgd_iter : multiget_cf_data) { + auto mgd = mgd_iter.second; + if (mgd->super_version->Unref()) { + mgd->super_version->Cleanup(); + superversions_to_delete.push_back(mgd->super_version); + } + } + mutex_.Unlock(); + + for (auto td : superversions_to_delete) { + delete td; + } + for (auto mgd : multiget_cf_data) { + delete mgd.second; + } + + RecordTick(stats_, NUMBER_MULTIGET_CALLS); + RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys); + RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read); + MeasureTime(stats_, BYTES_PER_MULTIGET, bytes_read); + PERF_TIMER_STOP(get_post_process_time); + + return stat_list; +} + +Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, + const std::string& column_family, + ColumnFamilyHandle** handle) { + assert(handle != nullptr); + Status s = CreateColumnFamilyImpl(cf_options, column_family, handle); + if (s.ok()) { + s = WriteOptionsFile(true /*need_mutex_lock*/, + true /*need_enter_write_thread*/); + } + return s; +} + +Status DBImpl::CreateColumnFamilies( + const ColumnFamilyOptions& cf_options, + const std::vector<std::string>& column_family_names, + std::vector<ColumnFamilyHandle*>* handles) { + assert(handles != nullptr); + handles->clear(); + size_t num_cf = column_family_names.size(); + Status s; + bool success_once = false; + for (size_t i = 0; i < num_cf; i++) { + ColumnFamilyHandle* handle; + s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle); + if (!s.ok()) { + break; + } + handles->push_back(handle); + success_once = true; + } + if (success_once) { + Status persist_options_status = WriteOptionsFile( + true /*need_mutex_lock*/, true /*need_enter_write_thread*/); + if (s.ok() && !persist_options_status.ok()) { + s = persist_options_status; + } + } + return s; +} + +Status DBImpl::CreateColumnFamilies( + const std::vector<ColumnFamilyDescriptor>& column_families, + std::vector<ColumnFamilyHandle*>* handles) { + assert(handles != nullptr); + handles->clear(); + size_t num_cf = column_families.size(); + Status s; + bool success_once = false; + for (size_t i = 0; i < num_cf; i++) { + ColumnFamilyHandle* handle; + s = CreateColumnFamilyImpl(column_families[i].options, + column_families[i].name, &handle); + if (!s.ok()) { + break; + } + handles->push_back(handle); + success_once = true; + } + if (success_once) { + Status persist_options_status = WriteOptionsFile( + true /*need_mutex_lock*/, true /*need_enter_write_thread*/); + if (s.ok() && !persist_options_status.ok()) { + s = persist_options_status; + } + } + return s; +} + +Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, + const std::string& column_family_name, + ColumnFamilyHandle** handle) { + Status s; + Status persist_options_status; + *handle = nullptr; + + s = CheckCompressionSupported(cf_options); + if (s.ok() && immutable_db_options_.allow_concurrent_memtable_write) { + s = CheckConcurrentWritesSupported(cf_options); + } + if (!s.ok()) { + return s; + } + + { + InstrumentedMutexLock l(&mutex_); + + if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) != + nullptr) { + return Status::InvalidArgument("Column family already exists"); + } + VersionEdit edit; + edit.AddColumnFamily(column_family_name); + uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); + edit.SetColumnFamily(new_id); + edit.SetLogNumber(logfile_number_); + edit.SetComparatorName(cf_options.comparator->Name()); + + // LogAndApply will both write the creation in MANIFEST and create + // ColumnFamilyData object + { // write thread + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + // LogAndApply will both write the creation in MANIFEST and create + // ColumnFamilyData object + s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit, + &mutex_, directories_.GetDbDir(), false, + &cf_options); + write_thread_.ExitUnbatched(&w); + } + if (s.ok()) { + single_column_family_mode_ = false; + auto* cfd = + versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); + assert(cfd != nullptr); + delete InstallSuperVersionAndScheduleWork( + cfd, nullptr, *cfd->GetLatestMutableCFOptions()); + + if (!cfd->mem()->IsSnapshotSupported()) { + is_snapshot_supported_ = false; + } + + cfd->set_initialized(); + + *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Created column family [%s] (ID %u)", + column_family_name.c_str(), (unsigned)cfd->GetID()); + } else { + ROCKS_LOG_ERROR(immutable_db_options_.info_log, + "Creating column family [%s] FAILED -- %s", + column_family_name.c_str(), s.ToString().c_str()); + } + } // InstrumentedMutexLock l(&mutex_) + + // this is outside the mutex + if (s.ok()) { + NewThreadStatusCfInfo( + reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd()); + } + return s; +} + +Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { + assert(column_family != nullptr); + Status s = DropColumnFamilyImpl(column_family); + if (s.ok()) { + s = WriteOptionsFile(true /*need_mutex_lock*/, + true /*need_enter_write_thread*/); + } + return s; +} + +Status DBImpl::DropColumnFamilies( + const std::vector<ColumnFamilyHandle*>& column_families) { + Status s; + bool success_once = false; + for (auto* handle : column_families) { + s = DropColumnFamilyImpl(handle); + if (!s.ok()) { + break; + } + success_once = true; + } + if (success_once) { + Status persist_options_status = WriteOptionsFile( + true /*need_mutex_lock*/, true /*need_enter_write_thread*/); + if (s.ok() && !persist_options_status.ok()) { + s = persist_options_status; + } + } + return s; +} + +Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) { + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + auto cfd = cfh->cfd(); + if (cfd->GetID() == 0) { + return Status::InvalidArgument("Can't drop default column family"); + } + + bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported(); + + VersionEdit edit; + edit.DropColumnFamily(); + edit.SetColumnFamily(cfd->GetID()); + + Status s; + { + InstrumentedMutexLock l(&mutex_); + if (cfd->IsDropped()) { + s = Status::InvalidArgument("Column family already dropped!\n"); + } + if (s.ok()) { + // we drop column family from a single write thread + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, &mutex_); + write_thread_.ExitUnbatched(&w); + } + + if (!cf_support_snapshot) { + // Dropped Column Family doesn't support snapshot. Need to recalculate + // is_snapshot_supported_. + bool new_is_snapshot_supported = true; + for (auto c : *versions_->GetColumnFamilySet()) { + if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) { + new_is_snapshot_supported = false; + break; + } + } + is_snapshot_supported_ = new_is_snapshot_supported; + } + } + + if (s.ok()) { + // Note that here we erase the associated cf_info of the to-be-dropped + // cfd before its ref-count goes to zero to avoid having to erase cf_info + // later inside db_mutex. + EraseThreadStatusCfInfo(cfd); + assert(cfd->IsDropped()); + auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size * + mutable_cf_options->max_write_buffer_number; + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Dropped column family with id %u\n", cfd->GetID()); + } else { + ROCKS_LOG_ERROR(immutable_db_options_.info_log, + "Dropping column family with id %u FAILED -- %s\n", + cfd->GetID(), s.ToString().c_str()); + } + + return s; +} + +bool DBImpl::KeyMayExist(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value, bool* value_found) { + assert(value != nullptr); + if (value_found != nullptr) { + // falsify later if key-may-exist but can't fetch value + *value_found = true; + } + ReadOptions roptions = read_options; + roptions.read_tier = kBlockCacheTier; // read from block cache only + PinnableSlice pinnable_val; + auto s = GetImpl(roptions, column_family, key, &pinnable_val, value_found); + value->assign(pinnable_val.data(), pinnable_val.size()); + + // If block_cache is enabled and the index block of the table didn't + // not present in block_cache, the return value will be Status::Incomplete. + // In this case, key may still exist in the table. + return s.ok() || s.IsIncomplete(); +} + +Iterator* DBImpl::NewIterator(const ReadOptions& read_options, + ColumnFamilyHandle* column_family) { + if (read_options.read_tier == kPersistedTier) { + return NewErrorIterator(Status::NotSupported( + "ReadTier::kPersistedData is not yet supported in iterators.")); + } + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + auto cfd = cfh->cfd(); + if (read_options.managed) { +#ifdef ROCKSDB_LITE + // not supported in lite version + return NewErrorIterator(Status::InvalidArgument( + "Managed Iterators not supported in RocksDBLite.")); +#else + if ((read_options.tailing) || (read_options.snapshot != nullptr) || + (is_snapshot_supported_)) { + return new ManagedIterator(this, read_options, cfd); + } + // Managed iter not supported + return NewErrorIterator(Status::InvalidArgument( + "Managed Iterators not supported without snapshots.")); +#endif + } else if (read_options.tailing) { +#ifdef ROCKSDB_LITE + // not supported in lite version + return nullptr; +#else + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + auto iter = new ForwardIterator(this, read_options, cfd, sv); + return NewDBIterator( + env_, read_options, *cfd->ioptions(), cfd->user_comparator(), iter, + kMaxSequenceNumber, + sv->mutable_cf_options.max_sequential_skip_in_iterations); +#endif + } else { + SequenceNumber latest_snapshot = versions_->LastSequence(); + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + + auto snapshot = + read_options.snapshot != nullptr + ? reinterpret_cast<const SnapshotImpl*>( + read_options.snapshot)->number_ + : latest_snapshot; + + // Try to generate a DB iterator tree in continuous memory area to be + // cache friendly. Here is an example of result: + // +-------------------------------+ + // | | + // | ArenaWrappedDBIter | + // | + | + // | +---> Inner Iterator ------------+ + // | | | | + // | | +-- -- -- -- -- -- -- --+ | + // | +--- | Arena | | + // | | | | + // | Allocated Memory: | | + // | | +-------------------+ | + // | | | DBIter | <---+ + // | | + | + // | | | +-> iter_ ------------+ + // | | | | | + // | | +-------------------+ | + // | | | MergingIterator | <---+ + // | | + | + // | | | +->child iter1 ------------+ + // | | | | | | + // | | +->child iter2 ----------+ | + // | | | | | | | + // | | | +->child iter3 --------+ | | + // | | | | | | + // | | +-------------------+ | | | + // | | | Iterator1 | <--------+ + // | | +-------------------+ | | + // | | | Iterator2 | <------+ + // | | +-------------------+ | + // | | | Iterator3 | <----+ + // | | +-------------------+ + // | | | + // +-------+-----------------------+ + // + // ArenaWrappedDBIter inlines an arena area where all the iterators in + // the iterator tree are allocated in the order of being accessed when + // querying. + // Laying out the iterators in the order of being accessed makes it more + // likely that any iterator pointer is close to the iterator it points to so + // that they are likely to be in the same cache line and/or page. + ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( + env_, read_options, *cfd->ioptions(), snapshot, + sv->mutable_cf_options.max_sequential_skip_in_iterations, + sv->version_number, + ((read_options.snapshot != nullptr) ? nullptr : this), cfd); + + InternalIterator* internal_iter = + NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), + db_iter->GetRangeDelAggregator()); + db_iter->SetIterUnderDBIter(internal_iter); + + return db_iter; + } + // To stop compiler from complaining + return nullptr; +} + +Status DBImpl::NewIterators( + const ReadOptions& read_options, + const std::vector<ColumnFamilyHandle*>& column_families, + std::vector<Iterator*>* iterators) { + if (read_options.read_tier == kPersistedTier) { + return Status::NotSupported( + "ReadTier::kPersistedData is not yet supported in iterators."); + } + iterators->clear(); + iterators->reserve(column_families.size()); + if (read_options.managed) { +#ifdef ROCKSDB_LITE + return Status::InvalidArgument( + "Managed interator not supported in RocksDB lite"); +#else + if ((!read_options.tailing) && (read_options.snapshot == nullptr) && + (!is_snapshot_supported_)) { + return Status::InvalidArgument( + "Managed interator not supported without snapshots"); + } + for (auto cfh : column_families) { + auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd(); + auto iter = new ManagedIterator(this, read_options, cfd); + iterators->push_back(iter); + } +#endif + } else if (read_options.tailing) { +#ifdef ROCKSDB_LITE + return Status::InvalidArgument( + "Tailing interator not supported in RocksDB lite"); +#else + for (auto cfh : column_families) { + auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd(); + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + auto iter = new ForwardIterator(this, read_options, cfd, sv); + iterators->push_back(NewDBIterator( + env_, read_options, *cfd->ioptions(), cfd->user_comparator(), iter, + kMaxSequenceNumber, + sv->mutable_cf_options.max_sequential_skip_in_iterations)); + } +#endif + } else { + SequenceNumber latest_snapshot = versions_->LastSequence(); + + for (size_t i = 0; i < column_families.size(); ++i) { + auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>( + column_families[i])->cfd(); + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + + auto snapshot = + read_options.snapshot != nullptr + ? reinterpret_cast<const SnapshotImpl*>( + read_options.snapshot)->number_ + : latest_snapshot; + + ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( + env_, read_options, *cfd->ioptions(), snapshot, + sv->mutable_cf_options.max_sequential_skip_in_iterations, + sv->version_number, + ((read_options.snapshot != nullptr) ? nullptr : this), cfd); + InternalIterator* internal_iter = + NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(), + db_iter->GetRangeDelAggregator()); + db_iter->SetIterUnderDBIter(internal_iter); + iterators->push_back(db_iter); + } + } + + return Status::OK(); +} + +const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); } + +#ifndef ROCKSDB_LITE +const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() { + return GetSnapshotImpl(true); +} +#endif // ROCKSDB_LITE + +const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) { + int64_t unix_time = 0; + env_->GetCurrentTime(&unix_time); // Ignore error + SnapshotImpl* s = new SnapshotImpl; + + InstrumentedMutexLock l(&mutex_); + // returns null if the underlying memtable does not support snapshot. + if (!is_snapshot_supported_) { + delete s; + return nullptr; + } + return snapshots_.New(s, versions_->LastSequence(), unix_time, + is_write_conflict_boundary); +} + +void DBImpl::ReleaseSnapshot(const Snapshot* s) { + const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s); + { + InstrumentedMutexLock l(&mutex_); + snapshots_.Delete(casted_s); + } + delete casted_s; +} + +bool DBImpl::HasActiveSnapshotLaterThanSN(SequenceNumber sn) { + InstrumentedMutexLock l(&mutex_); + if (snapshots_.empty()) { + return false; + } + return (snapshots_.newest()->GetSequenceNumber() > sn); +} + +#ifndef ROCKSDB_LITE +Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, + TablePropertiesCollection* props) { + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + auto cfd = cfh->cfd(); + + // Increment the ref count + mutex_.Lock(); + auto version = cfd->current(); + version->Ref(); + mutex_.Unlock(); + + auto s = version->GetPropertiesOfAllTables(props); + + // Decrement the ref count + mutex_.Lock(); + version->Unref(); + mutex_.Unlock(); + + return s; +} + +Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family, + const Range* range, std::size_t n, + TablePropertiesCollection* props) { + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + auto cfd = cfh->cfd(); + + // Increment the ref count + mutex_.Lock(); + auto version = cfd->current(); + version->Ref(); + mutex_.Unlock(); + + auto s = version->GetPropertiesOfTablesInRange(range, n, props); + + // Decrement the ref count + mutex_.Lock(); + version->Unref(); + mutex_.Unlock(); + + return s; +} + +#endif // ROCKSDB_LITE + +const std::string& DBImpl::GetName() const { + return dbname_; +} + +Env* DBImpl::GetEnv() const { + return env_; +} + +Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const { + InstrumentedMutexLock l(&mutex_); + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_), + cfh->cfd()->GetLatestCFOptions()); +} + +DBOptions DBImpl::GetDBOptions() const { + InstrumentedMutexLock l(&mutex_); + return BuildDBOptions(immutable_db_options_, mutable_db_options_); +} + +bool DBImpl::GetProperty(ColumnFamilyHandle* column_family, + const Slice& property, std::string* value) { + const DBPropertyInfo* property_info = GetPropertyInfo(property); + value->clear(); + auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); + if (property_info == nullptr) { + return false; + } else if (property_info->handle_int) { + uint64_t int_value; + bool ret_value = + GetIntPropertyInternal(cfd, *property_info, false, &int_value); + if (ret_value) { + *value = ToString(int_value); + } + return ret_value; + } else if (property_info->handle_string) { + InstrumentedMutexLock l(&mutex_); + return cfd->internal_stats()->GetStringProperty(*property_info, property, + value); + } + // Shouldn't reach here since exactly one of handle_string and handle_int + // should be non-nullptr. + assert(false); + return false; +} + +bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family, + const Slice& property, + std::map<std::string, double>* value) { + const DBPropertyInfo* property_info = GetPropertyInfo(property); + value->clear(); + auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); + if (property_info == nullptr) { + return false; + } else if (property_info->handle_map) { + InstrumentedMutexLock l(&mutex_); + return cfd->internal_stats()->GetMapProperty(*property_info, property, + value); + } + // If we reach this point it means that handle_map is not provided for the + // requested property + return false; +} + +bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family, + const Slice& property, uint64_t* value) { + const DBPropertyInfo* property_info = GetPropertyInfo(property); + if (property_info == nullptr || property_info->handle_int == nullptr) { + return false; + } + auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); + return GetIntPropertyInternal(cfd, *property_info, false, value); +} + +bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd, + const DBPropertyInfo& property_info, + bool is_locked, uint64_t* value) { + assert(property_info.handle_int != nullptr); + if (!property_info.need_out_of_mutex) { + if (is_locked) { + mutex_.AssertHeld(); + return cfd->internal_stats()->GetIntProperty(property_info, value, this); + } else { + InstrumentedMutexLock l(&mutex_); + return cfd->internal_stats()->GetIntProperty(property_info, value, this); + } + } else { + SuperVersion* sv = nullptr; + if (!is_locked) { + sv = GetAndRefSuperVersion(cfd); + } else { + sv = cfd->GetSuperVersion(); + } + + bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex( + property_info, sv->current, value); + + if (!is_locked) { + ReturnAndCleanupSuperVersion(cfd, sv); + } + + return ret; + } +} + +#ifndef ROCKSDB_LITE +Status DBImpl::ResetStats() { + InstrumentedMutexLock l(&mutex_); + for (auto* cfd : *versions_->GetColumnFamilySet()) { + if (cfd->initialized()) { + cfd->internal_stats()->Clear(); + } + } + return Status::OK(); +} +#endif // ROCKSDB_LITE + +bool DBImpl::GetAggregatedIntProperty(const Slice& property, + uint64_t* aggregated_value) { + const DBPropertyInfo* property_info = GetPropertyInfo(property); + if (property_info == nullptr || property_info->handle_int == nullptr) { + return false; + } + + uint64_t sum = 0; + { + // Needs mutex to protect the list of column families. + InstrumentedMutexLock l(&mutex_); + uint64_t value; + for (auto* cfd : *versions_->GetColumnFamilySet()) { + if (!cfd->initialized()) { + continue; + } + if (GetIntPropertyInternal(cfd, *property_info, true, &value)) { + sum += value; + } else { + return false; + } + } + } + *aggregated_value = sum; + return true; +} + +SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) { + // TODO(ljin): consider using GetReferencedSuperVersion() directly + return cfd->GetThreadLocalSuperVersion(&mutex_); +} + +// REQUIRED: this function should only be called on the write thread or if the +// mutex is held. +SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) { + auto column_family_set = versions_->GetColumnFamilySet(); + auto cfd = column_family_set->GetColumnFamily(column_family_id); + if (!cfd) { + return nullptr; + } + + return GetAndRefSuperVersion(cfd); +} + +void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd, + SuperVersion* sv) { + bool unref_sv = !cfd->ReturnThreadLocalSuperVersion(sv); + + if (unref_sv) { + // Release SuperVersion + if (sv->Unref()) { + { + InstrumentedMutexLock l(&mutex_); + sv->Cleanup(); + } + delete sv; + RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS); + } + RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES); + } +} + +// REQUIRED: this function should only be called on the write thread. +void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id, + SuperVersion* sv) { + auto column_family_set = versions_->GetColumnFamilySet(); + auto cfd = column_family_set->GetColumnFamily(column_family_id); + + // If SuperVersion is held, and we successfully fetched a cfd using + // GetAndRefSuperVersion(), it must still exist. + assert(cfd != nullptr); + ReturnAndCleanupSuperVersion(cfd, sv); +} + +// REQUIRED: this function should only be called on the write thread or if the +// mutex is held. +ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) { + ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get(); + + if (!cf_memtables->Seek(column_family_id)) { + return nullptr; + } + + return cf_memtables->GetColumnFamilyHandle(); +} + +// REQUIRED: mutex is NOT held. +ColumnFamilyHandle* DBImpl::GetColumnFamilyHandleUnlocked( + uint32_t column_family_id) { + ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get(); + + InstrumentedMutexLock l(&mutex_); + + if (!cf_memtables->Seek(column_family_id)) { + return nullptr; + } + + return cf_memtables->GetColumnFamilyHandle(); +} + +void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family, + const Range& range, + uint64_t* const count, + uint64_t* const size) { + ColumnFamilyHandleImpl* cfh = + reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + ColumnFamilyData* cfd = cfh->cfd(); + SuperVersion* sv = GetAndRefSuperVersion(cfd); + + // Convert user_key into a corresponding internal key. + InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek); + InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek); + MemTable::MemTableStats memStats = + sv->mem->ApproximateStats(k1.Encode(), k2.Encode()); + MemTable::MemTableStats immStats = + sv->imm->ApproximateStats(k1.Encode(), k2.Encode()); + *count = memStats.count + immStats.count; + *size = memStats.size + immStats.size; + + ReturnAndCleanupSuperVersion(cfd, sv); +} + +void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family, + const Range* range, int n, uint64_t* sizes, + uint8_t include_flags) { + assert(include_flags & DB::SizeApproximationFlags::INCLUDE_FILES || + include_flags & DB::SizeApproximationFlags::INCLUDE_MEMTABLES); + Version* v; + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + auto cfd = cfh->cfd(); + SuperVersion* sv = GetAndRefSuperVersion(cfd); + v = sv->current; + + for (int i = 0; i < n; i++) { + // Convert user_key into a corresponding internal key. + InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek); + InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek); + sizes[i] = 0; + if (include_flags & DB::SizeApproximationFlags::INCLUDE_FILES) { + sizes[i] += versions_->ApproximateSize(v, k1.Encode(), k2.Encode()); + } + if (include_flags & DB::SizeApproximationFlags::INCLUDE_MEMTABLES) { + sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size; + sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size; + } + } + + ReturnAndCleanupSuperVersion(cfd, sv); +} + +std::list<uint64_t>::iterator +DBImpl::CaptureCurrentFileNumberInPendingOutputs() { + // We need to remember the iterator of our insert, because after the + // background job is done, we need to remove that element from + // pending_outputs_. + pending_outputs_.push_back(versions_->current_next_file_number()); + auto pending_outputs_inserted_elem = pending_outputs_.end(); + --pending_outputs_inserted_elem; + return pending_outputs_inserted_elem; +} + +void DBImpl::ReleaseFileNumberFromPendingOutputs( + std::list<uint64_t>::iterator v) { + pending_outputs_.erase(v); +} + +#ifndef ROCKSDB_LITE +Status DBImpl::GetUpdatesSince( + SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter, + const TransactionLogIterator::ReadOptions& read_options) { + + RecordTick(stats_, GET_UPDATES_SINCE_CALLS); + if (seq > versions_->LastSequence()) { + return Status::NotFound("Requested sequence not yet written in the db"); + } + return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get()); +} + +Status DBImpl::DeleteFile(std::string name) { + uint64_t number; + FileType type; + WalFileType log_type; + if (!ParseFileName(name, &number, &type, &log_type) || + (type != kTableFile && type != kLogFile)) { + ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n", + name.c_str()); + return Status::InvalidArgument("Invalid file name"); + } + + Status status; + if (type == kLogFile) { + // Only allow deleting archived log files + if (log_type != kArchivedLogFile) { + ROCKS_LOG_ERROR(immutable_db_options_.info_log, + "DeleteFile %s failed - not archived log.\n", + name.c_str()); + return Status::NotSupported("Delete only supported for archived logs"); + } + status = + env_->DeleteFile(immutable_db_options_.wal_dir + "/" + name.c_str()); + if (!status.ok()) { + ROCKS_LOG_ERROR(immutable_db_options_.info_log, + "DeleteFile %s failed -- %s.\n", name.c_str(), + status.ToString().c_str()); + } + return status; + } + + int level; + FileMetaData* metadata; + ColumnFamilyData* cfd; + VersionEdit edit; + JobContext job_context(next_job_id_.fetch_add(1), true); + { + InstrumentedMutexLock l(&mutex_); + status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd); + if (!status.ok()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "DeleteFile %s failed. File not found\n", name.c_str()); + job_context.Clean(); + return Status::InvalidArgument("File not found"); + } + assert(level < cfd->NumberLevels()); + + // If the file is being compacted no need to delete. + if (metadata->being_compacted) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "DeleteFile %s Skipped. File about to be compacted\n", + name.c_str()); + job_context.Clean(); + return Status::OK(); + } + + // Only the files in the last level can be deleted externally. + // This is to make sure that any deletion tombstones are not + // lost. Check that the level passed is the last level. + auto* vstoreage = cfd->current()->storage_info(); + for (int i = level + 1; i < cfd->NumberLevels(); i++) { + if (vstoreage->NumLevelFiles(i) != 0) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "DeleteFile %s FAILED. File not in last level\n", + name.c_str()); + job_context.Clean(); + return Status::InvalidArgument("File not in last level"); + } + } + // if level == 0, it has to be the oldest file + if (level == 0 && + vstoreage->LevelFiles(0).back()->fd.GetNumber() != number) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "DeleteFile %s failed ---" + " target file in level 0 must be the oldest.", + name.c_str()); + job_context.Clean(); + return Status::InvalidArgument("File in level 0, but not oldest"); + } + edit.SetColumnFamily(cfd->GetID()); + edit.DeleteFile(level, number); + status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, &mutex_, directories_.GetDbDir()); + if (status.ok()) { + InstallSuperVersionAndScheduleWorkWrapper( + cfd, &job_context, *cfd->GetLatestMutableCFOptions()); + } + FindObsoleteFiles(&job_context, false); + } // lock released here + + LogFlush(immutable_db_options_.info_log); + // remove files outside the db-lock + if (job_context.HaveSomethingToDelete()) { + // Call PurgeObsoleteFiles() without holding mutex. + PurgeObsoleteFiles(job_context); + } + job_context.Clean(); + return status; +} + +Status DBImpl::DeleteFilesInRange(ColumnFamilyHandle* column_family, + const Slice* begin, const Slice* end) { + Status status; + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); + ColumnFamilyData* cfd = cfh->cfd(); + VersionEdit edit; + std::vector<FileMetaData*> deleted_files; + JobContext job_context(next_job_id_.fetch_add(1), true); + { + InstrumentedMutexLock l(&mutex_); + Version* input_version = cfd->current(); + + auto* vstorage = input_version->storage_info(); + for (int i = 1; i < cfd->NumberLevels(); i++) { + if (vstorage->LevelFiles(i).empty() || + !vstorage->OverlapInLevel(i, begin, end)) { + continue; + } + std::vector<FileMetaData*> level_files; + InternalKey begin_storage, end_storage, *begin_key, *end_key; + if (begin == nullptr) { + begin_key = nullptr; + } else { + begin_storage.SetMaxPossibleForUserKey(*begin); + begin_key = &begin_storage; + } + if (end == nullptr) { + end_key = nullptr; + } else { + end_storage.SetMinPossibleForUserKey(*end); + end_key = &end_storage; + } + + vstorage->GetOverlappingInputs(i, begin_key, end_key, &level_files, -1, + nullptr, false); + FileMetaData* level_file; + for (uint32_t j = 0; j < level_files.size(); j++) { + level_file = level_files[j]; + if (((begin == nullptr) || + (cfd->internal_comparator().user_comparator()->Compare( + level_file->smallest.user_key(), *begin) >= 0)) && + ((end == nullptr) || + (cfd->internal_comparator().user_comparator()->Compare( + level_file->largest.user_key(), *end) <= 0))) { + if (level_file->being_compacted) { + continue; + } + edit.SetColumnFamily(cfd->GetID()); + edit.DeleteFile(i, level_file->fd.GetNumber()); + deleted_files.push_back(level_file); + level_file->being_compacted = true; + } + } + } + if (edit.GetDeletedFiles().empty()) { + job_context.Clean(); + return Status::OK(); + } + input_version->Ref(); + status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, &mutex_, directories_.GetDbDir()); + if (status.ok()) { + InstallSuperVersionAndScheduleWorkWrapper( + cfd, &job_context, *cfd->GetLatestMutableCFOptions()); + } + for (auto* deleted_file : deleted_files) { + deleted_file->being_compacted = false; + } + input_version->Unref(); + FindObsoleteFiles(&job_context, false); + } // lock released here + + LogFlush(immutable_db_options_.info_log); + // remove files outside the db-lock + if (job_context.HaveSomethingToDelete()) { + // Call PurgeObsoleteFiles() without holding mutex. + PurgeObsoleteFiles(job_context); + } + job_context.Clean(); + return status; +} + +void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) { + InstrumentedMutexLock l(&mutex_); + versions_->GetLiveFilesMetaData(metadata); +} + +void DBImpl::GetColumnFamilyMetaData( + ColumnFamilyHandle* column_family, + ColumnFamilyMetaData* cf_meta) { + assert(column_family); + auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd(); + auto* sv = GetAndRefSuperVersion(cfd); + sv->current->GetColumnFamilyMetaData(cf_meta); + ReturnAndCleanupSuperVersion(cfd, sv); +} + +#endif // ROCKSDB_LITE + +Status DBImpl::CheckConsistency() { + mutex_.AssertHeld(); + std::vector<LiveFileMetaData> metadata; + versions_->GetLiveFilesMetaData(&metadata); + + std::string corruption_messages; + for (const auto& md : metadata) { + // md.name has a leading "/". + std::string file_path = md.db_path + md.name; + + uint64_t fsize = 0; + Status s = env_->GetFileSize(file_path, &fsize); + if (!s.ok() && + env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) { + s = Status::OK(); + } + if (!s.ok()) { + corruption_messages += + "Can't access " + md.name + ": " + s.ToString() + "\n"; + } else if (fsize != md.size) { + corruption_messages += "Sst file size mismatch: " + file_path + + ". Size recorded in manifest " + + ToString(md.size) + ", actual size " + + ToString(fsize) + "\n"; + } + } + if (corruption_messages.size() == 0) { + return Status::OK(); + } else { + return Status::Corruption(corruption_messages); + } +} + +Status DBImpl::GetDbIdentity(std::string& identity) const { + std::string idfilename = IdentityFileName(dbname_); + const EnvOptions soptions; + unique_ptr<SequentialFileReader> id_file_reader; + Status s; + { + unique_ptr<SequentialFile> idfile; + s = env_->NewSequentialFile(idfilename, &idfile, soptions); + if (!s.ok()) { + return s; + } + id_file_reader.reset(new SequentialFileReader(std::move(idfile))); + } + + uint64_t file_size; + s = env_->GetFileSize(idfilename, &file_size); + if (!s.ok()) { + return s; + } + char* buffer = reinterpret_cast<char*>(alloca(file_size)); + Slice id; + s = id_file_reader->Read(static_cast<size_t>(file_size), &id, buffer); + if (!s.ok()) { + return s; + } + identity.assign(id.ToString()); + // If last character is '\n' remove it from identity + if (identity.size() > 0 && identity.back() == '\n') { + identity.pop_back(); + } + return s; +} + +// Default implementation -- returns not supported status +Status DB::CreateColumnFamily(const ColumnFamilyOptions& cf_options, + const std::string& column_family_name, + ColumnFamilyHandle** handle) { + return Status::NotSupported(""); +} + +Status DB::CreateColumnFamilies( + const ColumnFamilyOptions& cf_options, + const std::vector<std::string>& column_family_names, + std::vector<ColumnFamilyHandle*>* handles) { + return Status::NotSupported(""); +} + +Status DB::CreateColumnFamilies( + const std::vector<ColumnFamilyDescriptor>& column_families, + std::vector<ColumnFamilyHandle*>* handles) { + return Status::NotSupported(""); +} + +Status DB::DropColumnFamily(ColumnFamilyHandle* column_family) { + return Status::NotSupported(""); +} + +Status DB::DropColumnFamilies( + const std::vector<ColumnFamilyHandle*>& column_families) { + return Status::NotSupported(""); +} + +Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) { + delete column_family; + return Status::OK(); +} + +DB::~DB() { } + +Status DB::ListColumnFamilies(const DBOptions& db_options, + const std::string& name, + std::vector<std::string>* column_families) { + return VersionSet::ListColumnFamilies(column_families, name, db_options.env); +} + +Snapshot::~Snapshot() { +} + +Status DestroyDB(const std::string& dbname, const Options& options) { + const ImmutableDBOptions soptions(SanitizeOptions(dbname, options)); + Env* env = soptions.env; + std::vector<std::string> filenames; + + // Ignore error in case directory does not exist + env->GetChildren(dbname, &filenames); + + FileLock* lock; + const std::string lockname = LockFileName(dbname); + Status result = env->LockFile(lockname, &lock); + if (result.ok()) { + uint64_t number; + FileType type; + InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname); + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, info_log_prefix.prefix, &type) && + type != kDBLockFile) { // Lock file will be deleted at end + Status del; + std::string path_to_delete = dbname + "/" + filenames[i]; + if (type == kMetaDatabase) { + del = DestroyDB(path_to_delete, options); + } else if (type == kTableFile) { + del = DeleteSSTFile(&soptions, path_to_delete, 0); + } else { + del = env->DeleteFile(path_to_delete); + } + if (result.ok() && !del.ok()) { + result = del; + } + } + } + + for (size_t path_id = 0; path_id < options.db_paths.size(); path_id++) { + const auto& db_path = options.db_paths[path_id]; + env->GetChildren(db_path.path, &filenames); + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type) && + type == kTableFile) { // Lock file will be deleted at end + std::string table_path = db_path.path + "/" + filenames[i]; + Status del = DeleteSSTFile(&soptions, table_path, + static_cast<uint32_t>(path_id)); + if (result.ok() && !del.ok()) { + result = del; + } + } + } + } + + std::vector<std::string> walDirFiles; + std::string archivedir = ArchivalDirectory(dbname); + if (dbname != soptions.wal_dir) { + env->GetChildren(soptions.wal_dir, &walDirFiles); + archivedir = ArchivalDirectory(soptions.wal_dir); + } + + // Delete log files in the WAL dir + for (const auto& file : walDirFiles) { + if (ParseFileName(file, &number, &type) && type == kLogFile) { + Status del = env->DeleteFile(LogFileName(soptions.wal_dir, number)); + if (result.ok() && !del.ok()) { + result = del; + } + } + } + + std::vector<std::string> archiveFiles; + env->GetChildren(archivedir, &archiveFiles); + // Delete archival files. + for (size_t i = 0; i < archiveFiles.size(); ++i) { + if (ParseFileName(archiveFiles[i], &number, &type) && + type == kLogFile) { + Status del = env->DeleteFile(archivedir + "/" + archiveFiles[i]); + if (result.ok() && !del.ok()) { + result = del; + } + } + } + + // ignore case where no archival directory is present + env->DeleteDir(archivedir); + + env->UnlockFile(lock); // Ignore error since state is already gone + env->DeleteFile(lockname); + env->DeleteDir(dbname); // Ignore error in case dir contains other files + env->DeleteDir(soptions.wal_dir); + } + return result; +} + +Status DBImpl::WriteOptionsFile(bool need_mutex_lock, + bool need_enter_write_thread) { +#ifndef ROCKSDB_LITE + WriteThread::Writer w; + if (need_mutex_lock) { + mutex_.Lock(); + } else { + mutex_.AssertHeld(); + } + if (need_enter_write_thread) { + write_thread_.EnterUnbatched(&w, &mutex_); + } + + std::vector<std::string> cf_names; + std::vector<ColumnFamilyOptions> cf_opts; + + // This part requires mutex to protect the column family options + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + cf_names.push_back(cfd->GetName()); + cf_opts.push_back(cfd->GetLatestCFOptions()); + } + + // Unlock during expensive operations. New writes cannot get here + // because the single write thread ensures all new writes get queued. + DBOptions db_options = + BuildDBOptions(immutable_db_options_, mutable_db_options_); + mutex_.Unlock(); + + TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1"); + TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2"); + + std::string file_name = + TempOptionsFileName(GetName(), versions_->NewFileNumber()); + Status s = + PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name, GetEnv()); + + if (s.ok()) { + s = RenameTempFileToOptionsFile(file_name); + } + // restore lock + if (!need_mutex_lock) { + mutex_.Lock(); + } + if (need_enter_write_thread) { + write_thread_.ExitUnbatched(&w); + } + if (!s.ok()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "Unnable to persist options -- %s", s.ToString().c_str()); + if (immutable_db_options_.fail_if_options_file_error) { + return Status::IOError("Unable to persist options.", + s.ToString().c_str()); + } + } +#endif // !ROCKSDB_LITE + return Status::OK(); +} + +#ifndef ROCKSDB_LITE +namespace { +void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames, + const size_t num_files_to_keep, + const std::shared_ptr<Logger>& info_log, + Env* env) { + if (filenames.size() <= num_files_to_keep) { + return; + } + for (auto iter = std::next(filenames.begin(), num_files_to_keep); + iter != filenames.end(); ++iter) { + if (!env->DeleteFile(iter->second).ok()) { + ROCKS_LOG_WARN(info_log, "Unable to delete options file %s", + iter->second.c_str()); + } + } +} +} // namespace +#endif // !ROCKSDB_LITE + +Status DBImpl::DeleteObsoleteOptionsFiles() { +#ifndef ROCKSDB_LITE + std::vector<std::string> filenames; + // use ordered map to store keep the filenames sorted from the newest + // to the oldest. + std::map<uint64_t, std::string> options_filenames; + Status s; + s = GetEnv()->GetChildren(GetName(), &filenames); + if (!s.ok()) { + return s; + } + for (auto& filename : filenames) { + uint64_t file_number; + FileType type; + if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) { + options_filenames.insert( + {std::numeric_limits<uint64_t>::max() - file_number, + GetName() + "/" + filename}); + } + } + + // Keeps the latest 2 Options file + const size_t kNumOptionsFilesKept = 2; + DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept, + immutable_db_options_.info_log, GetEnv()); + return Status::OK(); +#else + return Status::OK(); +#endif // !ROCKSDB_LITE +} + +Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) { +#ifndef ROCKSDB_LITE + Status s; + + versions_->options_file_number_ = versions_->NewFileNumber(); + std::string options_file_name = + OptionsFileName(GetName(), versions_->options_file_number_); + // Retry if the file name happen to conflict with an existing one. + s = GetEnv()->RenameFile(file_name, options_file_name); + + DeleteObsoleteOptionsFiles(); + return s; +#else + return Status::OK(); +#endif // !ROCKSDB_LITE +} + +#ifdef ROCKSDB_USING_THREAD_STATUS + +void DBImpl::NewThreadStatusCfInfo( + ColumnFamilyData* cfd) const { + if (immutable_db_options_.enable_thread_tracking) { + ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(), + cfd->ioptions()->env); + } +} + +void DBImpl::EraseThreadStatusCfInfo( + ColumnFamilyData* cfd) const { + if (immutable_db_options_.enable_thread_tracking) { + ThreadStatusUtil::EraseColumnFamilyInfo(cfd); + } +} + +void DBImpl::EraseThreadStatusDbInfo() const { + if (immutable_db_options_.enable_thread_tracking) { + ThreadStatusUtil::EraseDatabaseInfo(this); + } +} + +#else +void DBImpl::NewThreadStatusCfInfo( + ColumnFamilyData* cfd) const { +} + +void DBImpl::EraseThreadStatusCfInfo( + ColumnFamilyData* cfd) const { +} + +void DBImpl::EraseThreadStatusDbInfo() const { +} +#endif // ROCKSDB_USING_THREAD_STATUS + +// +// A global method that can dump out the build version +void DumpRocksDBBuildVersion(Logger * log) { +#if !defined(IOS_CROSS_COMPILE) + // if we compile with Xcode, we don't r
<TRUNCATED>
