http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/leveldb-1.18/db/corruption_test.cc ---------------------------------------------------------------------- diff --git a/thirdparty/leveldb-1.18/db/corruption_test.cc b/thirdparty/leveldb-1.18/db/corruption_test.cc deleted file mode 100755 index 52ab7e2..0000000 --- a/thirdparty/leveldb-1.18/db/corruption_test.cc +++ /dev/null @@ -1,388 +0,0 @@ -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. - -#include "leveldb/db.h" - -#include <errno.h> -#include <fcntl.h> -#include <sys/stat.h> -#include <sys/types.h> -#include "leveldb/cache.h" -#include "leveldb/env.h" -#include "leveldb/table.h" -#include "leveldb/write_batch.h" -#include "db/db_impl.h" -#include "db/filename.h" -#include "db/log_format.h" -#include "db/version_set.h" -#include "util/logging.h" -#include "util/testharness.h" -#include "util/testutil.h" - -namespace leveldb { - -static const int kValueSize = 1000; - -class CorruptionTest { - public: - test::ErrorEnv env_; - std::string dbname_; - Cache* tiny_cache_; - Options options_; - DB* db_; - - CorruptionTest() { - tiny_cache_ = NewLRUCache(100); - options_.env = &env_; - options_.block_cache = tiny_cache_; - dbname_ = test::TmpDir() + "/db_test"; - DestroyDB(dbname_, options_); - - db_ = NULL; - options_.create_if_missing = true; - Reopen(); - options_.create_if_missing = false; - } - - ~CorruptionTest() { - delete db_; - DestroyDB(dbname_, Options()); - delete tiny_cache_; - } - - Status TryReopen() { - delete db_; - db_ = NULL; - return DB::Open(options_, dbname_, &db_); - } - - void Close() { - delete db_; - db_ = nullptr; - } - - void Reopen() { - ASSERT_OK(TryReopen()); - } - - void RepairDB() { - delete db_; - db_ = NULL; - ASSERT_OK(::leveldb::RepairDB(dbname_, options_)); - } - - void Build(int n) { - std::string key_space, value_space; - WriteBatch batch; - for (int i = 0; i < n; i++) { - //if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n); - Slice key = Key(i, &key_space); - batch.Clear(); - batch.Put(key, Value(i, &value_space)); - WriteOptions options; - // Corrupt() doesn't work without this sync on windows; stat reports 0 for - // the file size. - if (i == n - 1) { - options.sync = true; - } - ASSERT_OK(db_->Write(options, &batch)); - } - } - - void Check(int min_expected, int max_expected) { - int next_expected = 0; - int missed = 0; - int bad_keys = 0; - int bad_values = 0; - int correct = 0; - std::string value_space; - Iterator* iter = db_->NewIterator(ReadOptions()); - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - uint64_t key; - Slice in(iter->key()); - if (in == "" || in == "~") { - // Ignore boundary keys. - continue; - } - if (!ConsumeDecimalNumber(&in, &key) || - !in.empty() || - key < next_expected) { - bad_keys++; - continue; - } - missed += static_cast<int>(key - next_expected); - next_expected = static_cast<int>(key + 1); - if (iter->value() != Value(static_cast<int>(key), &value_space)) { - bad_values++; - } else { - correct++; - } - } - delete iter; - - fprintf(stderr, - "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%d\n", - min_expected, max_expected, correct, bad_keys, bad_values, missed); - ASSERT_LE(min_expected, correct); - ASSERT_GE(max_expected, correct); - } - - void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) { - // Pick file to corrupt - std::vector<std::string> filenames; - ASSERT_OK(env_.GetChildren(dbname_, &filenames)); - uint64_t number; - FileType type; - std::string fname; - int picked_number = -1; - for (size_t i = 0; i < filenames.size(); i++) { - if (ParseFileName(filenames[i], &number, &type) && - type == filetype && - int(number) > picked_number) { // Pick latest file - fname = dbname_ + "/" + filenames[i]; - picked_number = static_cast<int>(number); - } - } - ASSERT_TRUE(!fname.empty()) << filetype; - - struct stat sbuf; - if (stat(fname.c_str(), &sbuf) != 0) { - const char* msg = strerror(errno); - ASSERT_TRUE(false) << fname << ": " << msg; - } - - if (offset < 0) { - // Relative to end of file; make it absolute - if (-offset > sbuf.st_size) { - offset = 0; - } else { - offset = sbuf.st_size + offset; - } - } - if (offset > sbuf.st_size) { - offset = sbuf.st_size; - } - if (offset + bytes_to_corrupt > sbuf.st_size) { - bytes_to_corrupt = sbuf.st_size - offset; - } - - // Do it - std::string contents; - Status s = ReadFileToString(Env::Default(), fname, &contents); - ASSERT_TRUE(s.ok()) << s.ToString(); - for (int i = 0; i < bytes_to_corrupt; i++) { - contents[i + offset] ^= 0x80; - } - s = WriteStringToFile(Env::Default(), contents, fname); - ASSERT_TRUE(s.ok()) << s.ToString(); - } - - int Property(const std::string& name) { - std::string property; - int result; - if (db_->GetProperty(name, &property) && - sscanf(property.c_str(), "%d", &result) == 1) { - return result; - } else { - return -1; - } - } - - // Return the ith key - Slice Key(int i, std::string* storage) { - char buf[100]; - snprintf(buf, sizeof(buf), "%016d", i); - storage->assign(buf, strlen(buf)); - return Slice(*storage); - } - - // Return the value to associate with the specified key - Slice Value(int k, std::string* storage) { - Random r(k); - return test::RandomString(&r, kValueSize, storage); - } -}; - -TEST(CorruptionTest, Recovery) { - Build(100); - Check(100, 100); - Close(); - Corrupt(kLogFile, 19, 1); // WriteBatch tag for first record - Corrupt(kLogFile, log::kBlockSize + 1000, 1); // Somewhere in second block - Reopen(); - - // The 64 records in the first two log blocks are completely lost. - Check(36, 36); -} - -TEST(CorruptionTest, RecoverWriteError) { - env_.writable_file_error_ = true; - Status s = TryReopen(); - ASSERT_TRUE(!s.ok()); -} - -TEST(CorruptionTest, NewFileErrorDuringWrite) { - // Do enough writing to force minor compaction - env_.writable_file_error_ = true; - const int num = 3 + static_cast<int>(Options().write_buffer_size / kValueSize); - std::string value_storage; - Status s; - for (int i = 0; s.ok() && i < num; i++) { - WriteBatch batch; - batch.Put("a", Value(100, &value_storage)); - s = db_->Write(WriteOptions(), &batch); - } - ASSERT_TRUE(!s.ok()); - ASSERT_GE(env_.num_writable_file_errors_, 1); - env_.writable_file_error_ = false; - Reopen(); -} - -TEST(CorruptionTest, TableFile) { - Build(100); - DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); - dbi->TEST_CompactMemTable(); - dbi->TEST_CompactRange(0, NULL, NULL); - dbi->TEST_CompactRange(1, NULL, NULL); - Close(); - Corrupt(kTableFile, 100, 1); - Reopen(); - Check(90, 99); -} - -TEST(CorruptionTest, TableFileRepair) { - options_.block_size = 2 * kValueSize; // Limit scope of corruption - options_.paranoid_checks = true; - Reopen(); - Build(100); - DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); - dbi->TEST_CompactMemTable(); - dbi->TEST_CompactRange(0, NULL, NULL); - dbi->TEST_CompactRange(1, NULL, NULL); - Close(); - Corrupt(kTableFile, 100, 1); - Reopen(); - Check(95, 99); -} - -TEST(CorruptionTest, TableFileIndexData) { - Build(10000); // Enough to build multiple Tables - DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); - dbi->TEST_CompactMemTable(); - Close(); - Corrupt(kTableFile, -2000, 500); - Reopen(); - Check(5000, 9999); -} - -TEST(CorruptionTest, MissingDescriptor) { - Build(1000); - RepairDB(); - Reopen(); - Check(1000, 1000); -} - -TEST(CorruptionTest, SequenceNumberRecovery) { - ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1")); - ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2")); - ASSERT_OK(db_->Put(WriteOptions(), "foo", "v3")); - ASSERT_OK(db_->Put(WriteOptions(), "foo", "v4")); - ASSERT_OK(db_->Put(WriteOptions(), "foo", "v5")); - RepairDB(); - Reopen(); - std::string v; - ASSERT_OK(db_->Get(ReadOptions(), "foo", &v)); - ASSERT_EQ("v5", v); - // Write something. If sequence number was not recovered properly, - // it will be hidden by an earlier write. - ASSERT_OK(db_->Put(WriteOptions(), "foo", "v6")); - ASSERT_OK(db_->Get(ReadOptions(), "foo", &v)); - ASSERT_EQ("v6", v); - Reopen(); - ASSERT_OK(db_->Get(ReadOptions(), "foo", &v)); - ASSERT_EQ("v6", v); -} - -TEST(CorruptionTest, CorruptedDescriptor) { - ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello")); - DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); - dbi->TEST_CompactMemTable(); - dbi->TEST_CompactRange(0, NULL, NULL); - Close(); - Corrupt(kDescriptorFile, 0, 1000); - Status s = TryReopen(); - ASSERT_TRUE(!s.ok()); - - RepairDB(); - Reopen(); - std::string v; - ASSERT_OK(db_->Get(ReadOptions(), "foo", &v)); - ASSERT_EQ("hello", v); -} - -TEST(CorruptionTest, CompactionInputError) { - Build(10); - DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); - dbi->TEST_CompactMemTable(); - const int last = config::kMaxMemCompactLevel; - ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last))); - Close(); - Corrupt(kTableFile, 100, 1); - Reopen(); - Check(5, 9); - - // Force compactions by writing lots of values - Build(10000); - Check(10000, 10000); - - Close(); -} - -TEST(CorruptionTest, CompactionInputErrorParanoid) { - options_.paranoid_checks = true; - options_.write_buffer_size = 512 << 10; - - // Make multiple inputs so we need to compact. - for (int i = 0; i < 2; i++) { - Reopen(); - Build(10); - reinterpret_cast<DBImpl*>(db_)->TEST_CompactMemTable(); - Close(); - Corrupt(kTableFile, 100, 1); - env_.SleepForMicroseconds(100000); - } - - Reopen(); - reinterpret_cast<DBImpl*>(db_)->CompactRange(NULL, NULL); - - // Write must fail because of corrupted table - std::string tmp1, tmp2; - Status s = db_->Put(WriteOptions(), Key(5, &tmp1), Value(5, &tmp2)); - ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db"; -} - -TEST(CorruptionTest, UnrelatedKeys) { - Build(10); - DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); - dbi->TEST_CompactMemTable(); - Close(); - Corrupt(kTableFile, 100, 1); - Reopen(); - dbi = reinterpret_cast<DBImpl*>(db_); - - std::string tmp1, tmp2; - ASSERT_OK(db_->Put(WriteOptions(), Key(1000, &tmp1), Value(1000, &tmp2))); - std::string v; - ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v)); - ASSERT_EQ(Value(1000, &tmp2).ToString(), v); - dbi->TEST_CompactMemTable(); - ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v)); - ASSERT_EQ(Value(1000, &tmp2).ToString(), v); -} - -} // namespace leveldb - -int main(int argc, char** argv) { - return leveldb::test::RunAllTests(); -}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/leveldb-1.18/db/db_bench.cc ---------------------------------------------------------------------- diff --git a/thirdparty/leveldb-1.18/db/db_bench.cc b/thirdparty/leveldb-1.18/db/db_bench.cc deleted file mode 100755 index 817064a..0000000 --- a/thirdparty/leveldb-1.18/db/db_bench.cc +++ /dev/null @@ -1,979 +0,0 @@ -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. - -#include <sys/types.h> -#include <stdio.h> -#include <stdlib.h> -#include "db/db_impl.h" -#include "db/version_set.h" -#include "leveldb/cache.h" -#include "leveldb/db.h" -#include "leveldb/env.h" -#include "leveldb/write_batch.h" -#include "port/port.h" -#include "util/crc32c.h" -#include "util/histogram.h" -#include "util/mutexlock.h" -#include "util/random.h" -#include "util/testutil.h" - -// Comma-separated list of operations to run in the specified order -// Actual benchmarks: -// fillseq -- write N values in sequential key order in async mode -// fillrandom -- write N values in random key order in async mode -// overwrite -- overwrite N values in random key order in async mode -// fillsync -- write N/100 values in random key order in sync mode -// fill100K -- write N/1000 100K values in random order in async mode -// deleteseq -- delete N keys in sequential order -// deleterandom -- delete N keys in random order -// readseq -- read N times sequentially -// readreverse -- read N times in reverse order -// readrandom -- read N times in random order -// readmissing -- read N missing keys in random order -// readhot -- read N times in random order from 1% section of DB -// seekrandom -- N random seeks -// crc32c -- repeated crc32c of 4K of data -// acquireload -- load N*1000 times -// Meta operations: -// compact -- Compact the entire DB -// stats -- Print DB stats -// sstables -- Print sstable info -// heapprofile -- Dump a heap profile (if supported by this port) -static const char* FLAGS_benchmarks = - "fillseq," - "fillsync," - "fillrandom," - "overwrite," - "readrandom," - "readrandom," // Extra run to allow previous compactions to quiesce - "readseq," - "readreverse," - "compact," - "readrandom," - "readseq," - "readreverse," - "fill100K," - "crc32c," - "snappycomp," - "snappyuncomp," - "acquireload," - ; - -// Number of key/values to place in database -static int FLAGS_num = 1000000; - -// Number of read operations to do. If negative, do FLAGS_num reads. -static int FLAGS_reads = -1; - -// Number of concurrent threads to run. -static int FLAGS_threads = 1; - -// Size of each value -static int FLAGS_value_size = 100; - -// Arrange to generate values that shrink to this fraction of -// their original size after compression -static double FLAGS_compression_ratio = 0.5; - -// Print histogram of operation timings -static bool FLAGS_histogram = false; - -// Number of bytes to buffer in memtable before compacting -// (initialized to default value by "main") -static int FLAGS_write_buffer_size = 0; - -// Number of bytes to use as a cache of uncompressed data. -// Negative means use default settings. -static int FLAGS_cache_size = -1; - -// Maximum number of files to keep open at the same time (use default if == 0) -static int FLAGS_open_files = 0; - -// Bloom filter bits per key. -// Negative means use default settings. -static int FLAGS_bloom_bits = -1; - -// If true, do not destroy the existing database. If you set this -// flag and also specify a benchmark that wants a fresh database, that -// benchmark will fail. -static bool FLAGS_use_existing_db = false; - -// Use the db with the following name. -static const char* FLAGS_db = NULL; - -namespace leveldb { - -namespace { - -// Helper for quickly generating random data. -class RandomGenerator { - private: - std::string data_; - int pos_; - - public: - RandomGenerator() { - // We use a limited amount of data over and over again and ensure - // that it is larger than the compression window (32KB), and also - // large enough to serve all typical value sizes we want to write. - Random rnd(301); - std::string piece; - while (data_.size() < 1048576) { - // Add a short fragment that is as compressible as specified - // by FLAGS_compression_ratio. - test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece); - data_.append(piece); - } - pos_ = 0; - } - - Slice Generate(size_t len) { - if (pos_ + len > data_.size()) { - pos_ = 0; - assert(len < data_.size()); - } - pos_ += static_cast<int>(len); - return Slice(data_.data() + pos_ - len, len); - } -}; - -static Slice TrimSpace(Slice s) { - size_t start = 0; - while (start < s.size() && isspace(s[start])) { - start++; - } - size_t limit = s.size(); - while (limit > start && isspace(s[limit-1])) { - limit--; - } - return Slice(s.data() + start, limit - start); -} - -static void AppendWithSpace(std::string* str, Slice msg) { - if (msg.empty()) return; - if (!str->empty()) { - str->push_back(' '); - } - str->append(msg.data(), msg.size()); -} - -class Stats { - private: - double start_; - double finish_; - double seconds_; - int done_; - int next_report_; - int64_t bytes_; - double last_op_finish_; - Histogram hist_; - std::string message_; - - public: - Stats() { Start(); } - - void Start() { - next_report_ = 100; - last_op_finish_ = start_; - hist_.Clear(); - done_ = 0; - bytes_ = 0; - seconds_ = 0; - start_ = static_cast<double>(Env::Default()->NowMicros()); - finish_ = start_; - message_.clear(); - } - - void Merge(const Stats& other) { - hist_.Merge(other.hist_); - done_ += other.done_; - bytes_ += other.bytes_; - seconds_ += other.seconds_; - if (other.start_ < start_) start_ = other.start_; - if (other.finish_ > finish_) finish_ = other.finish_; - - // Just keep the messages from one thread - if (message_.empty()) message_ = other.message_; - } - - void Stop() { - finish_ = static_cast<double>(Env::Default()->NowMicros()); - seconds_ = (finish_ - start_) * 1e-6; - } - - void AddMessage(Slice msg) { - AppendWithSpace(&message_, msg); - } - - void FinishedSingleOp() { - if (FLAGS_histogram) { - double now = static_cast<double>(Env::Default()->NowMicros()); - double micros = now - last_op_finish_; - hist_.Add(micros); - if (micros > 20000) { - fprintf(stderr, "long op: %.1f micros%30s\r", micros, ""); - fflush(stderr); - } - last_op_finish_ = now; - } - - done_++; - if (done_ >= next_report_) { - if (next_report_ < 1000) next_report_ += 100; - else if (next_report_ < 5000) next_report_ += 500; - else if (next_report_ < 10000) next_report_ += 1000; - else if (next_report_ < 50000) next_report_ += 5000; - else if (next_report_ < 100000) next_report_ += 10000; - else if (next_report_ < 500000) next_report_ += 50000; - else next_report_ += 100000; - fprintf(stderr, "... finished %d ops%30s\r", done_, ""); - fflush(stderr); - } - } - - void AddBytes(int64_t n) { - bytes_ += n; - } - - void Report(const Slice& name) { - // Pretend at least one op was done in case we are running a benchmark - // that does not call FinishedSingleOp(). - if (done_ < 1) done_ = 1; - - std::string extra; - if (bytes_ > 0) { - // Rate is computed on actual elapsed time, not the sum of per-thread - // elapsed times. - double elapsed = (finish_ - start_) * 1e-6; - char rate[100]; - snprintf(rate, sizeof(rate), "%6.1f MB/s", - (bytes_ / 1048576.0) / elapsed); - extra = rate; - } - AppendWithSpace(&extra, message_); - - fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n", - name.ToString().c_str(), - seconds_ * 1e6 / done_, - (extra.empty() ? "" : " "), - extra.c_str()); - if (FLAGS_histogram) { - fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); - } - fflush(stdout); - } -}; - -// State shared by all concurrent executions of the same benchmark. -struct SharedState { - port::Mutex mu; - port::CondVar cv; - int total; - - // Each thread goes through the following states: - // (1) initializing - // (2) waiting for others to be initialized - // (3) running - // (4) done - - int num_initialized; - int num_done; - bool start; - - SharedState() : cv(&mu) { } -}; - -// Per-thread state for concurrent executions of the same benchmark. -struct ThreadState { - int tid; // 0..n-1 when running in n threads - Random rand; // Has different seeds for different threads - Stats stats; - SharedState* shared; - - ThreadState(int index) - : tid(index), - rand(1000 + index) { - } -}; - -} // namespace - -class Benchmark { - private: - Cache* cache_; - const FilterPolicy* filter_policy_; - DB* db_; - int num_; - int value_size_; - int entries_per_batch_; - WriteOptions write_options_; - int reads_; - int heap_counter_; - - void PrintHeader() { - const int kKeySize = 16; - PrintEnvironment(); - fprintf(stdout, "Keys: %d bytes each\n", kKeySize); - fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n", - FLAGS_value_size, - static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5)); - fprintf(stdout, "Entries: %d\n", num_); - fprintf(stdout, "RawSize: %.1f MB (estimated)\n", - ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_) - / 1048576.0)); - fprintf(stdout, "FileSize: %.1f MB (estimated)\n", - (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_) - / 1048576.0)); - PrintWarnings(); - fprintf(stdout, "------------------------------------------------\n"); - } - - void PrintWarnings() { -#if defined(__GNUC__) && !defined(__OPTIMIZE__) - fprintf(stdout, - "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n" - ); -#endif -#ifndef NDEBUG - fprintf(stdout, - "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n"); -#endif - - // See if snappy is working by attempting to compress a compressible string - const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy"; - std::string compressed; - if (!port::Snappy_Compress(text, sizeof(text), &compressed)) { - fprintf(stdout, "WARNING: Snappy compression is not enabled\n"); - } else if (compressed.size() >= sizeof(text)) { - fprintf(stdout, "WARNING: Snappy compression is not effective\n"); - } - } - - void PrintEnvironment() { - fprintf(stderr, "LevelDB: version %d.%d\n", - kMajorVersion, kMinorVersion); - -#if defined(__linux) - time_t now = time(NULL); - fprintf(stderr, "Date: %s", ctime(&now)); // ctime() adds newline - - FILE* cpuinfo = fopen("/proc/cpuinfo", "r"); - if (cpuinfo != NULL) { - char line[1000]; - int num_cpus = 0; - std::string cpu_type; - std::string cache_size; - while (fgets(line, sizeof(line), cpuinfo) != NULL) { - const char* sep = strchr(line, ':'); - if (sep == NULL) { - continue; - } - Slice key = TrimSpace(Slice(line, sep - 1 - line)); - Slice val = TrimSpace(Slice(sep + 1)); - if (key == "model name") { - ++num_cpus; - cpu_type = val.ToString(); - } else if (key == "cache size") { - cache_size = val.ToString(); - } - } - fclose(cpuinfo); - fprintf(stderr, "CPU: %d * %s\n", num_cpus, cpu_type.c_str()); - fprintf(stderr, "CPUCache: %s\n", cache_size.c_str()); - } -#endif - } - - public: - Benchmark() - : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL), - filter_policy_(FLAGS_bloom_bits >= 0 - ? NewBloomFilterPolicy(FLAGS_bloom_bits) - : NULL), - db_(NULL), - num_(FLAGS_num), - value_size_(FLAGS_value_size), - entries_per_batch_(1), - reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads), - heap_counter_(0) { - std::vector<std::string> files; - Env::Default()->GetChildren(FLAGS_db, &files); - for (size_t i = 0; i < files.size(); i++) { - if (Slice(files[i]).starts_with("heap-")) { - Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]); - } - } - if (!FLAGS_use_existing_db) { - DestroyDB(FLAGS_db, Options()); - } - } - - ~Benchmark() { - delete db_; - delete cache_; - delete filter_policy_; - } - - void Run() { - PrintHeader(); - Open(); - - const char* benchmarks = FLAGS_benchmarks; - while (benchmarks != NULL) { - const char* sep = strchr(benchmarks, ','); - Slice name; - if (sep == NULL) { - name = benchmarks; - benchmarks = NULL; - } else { - name = Slice(benchmarks, sep - benchmarks); - benchmarks = sep + 1; - } - - // Reset parameters that may be overridden below - num_ = FLAGS_num; - reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads); - value_size_ = FLAGS_value_size; - entries_per_batch_ = 1; - write_options_ = WriteOptions(); - - void (Benchmark::*method)(ThreadState*) = NULL; - bool fresh_db = false; - int num_threads = FLAGS_threads; - - if (name == Slice("fillseq")) { - fresh_db = true; - method = &Benchmark::WriteSeq; - } else if (name == Slice("fillbatch")) { - fresh_db = true; - entries_per_batch_ = 1000; - method = &Benchmark::WriteSeq; - } else if (name == Slice("fillrandom")) { - fresh_db = true; - method = &Benchmark::WriteRandom; - } else if (name == Slice("overwrite")) { - fresh_db = false; - method = &Benchmark::WriteRandom; - } else if (name == Slice("fillsync")) { - fresh_db = true; - num_ /= 1000; - write_options_.sync = true; - method = &Benchmark::WriteRandom; - } else if (name == Slice("fill100K")) { - fresh_db = true; - num_ /= 1000; - value_size_ = 100 * 1000; - method = &Benchmark::WriteRandom; - } else if (name == Slice("readseq")) { - method = &Benchmark::ReadSequential; - } else if (name == Slice("readreverse")) { - method = &Benchmark::ReadReverse; - } else if (name == Slice("readrandom")) { - method = &Benchmark::ReadRandom; - } else if (name == Slice("readmissing")) { - method = &Benchmark::ReadMissing; - } else if (name == Slice("seekrandom")) { - method = &Benchmark::SeekRandom; - } else if (name == Slice("readhot")) { - method = &Benchmark::ReadHot; - } else if (name == Slice("readrandomsmall")) { - reads_ /= 1000; - method = &Benchmark::ReadRandom; - } else if (name == Slice("deleteseq")) { - method = &Benchmark::DeleteSeq; - } else if (name == Slice("deleterandom")) { - method = &Benchmark::DeleteRandom; - } else if (name == Slice("readwhilewriting")) { - num_threads++; // Add extra thread for writing - method = &Benchmark::ReadWhileWriting; - } else if (name == Slice("compact")) { - method = &Benchmark::Compact; - } else if (name == Slice("crc32c")) { - method = &Benchmark::Crc32c; - } else if (name == Slice("acquireload")) { - method = &Benchmark::AcquireLoad; - } else if (name == Slice("snappycomp")) { - method = &Benchmark::SnappyCompress; - } else if (name == Slice("snappyuncomp")) { - method = &Benchmark::SnappyUncompress; - } else if (name == Slice("heapprofile")) { - HeapProfile(); - } else if (name == Slice("stats")) { - PrintStats("leveldb.stats"); - } else if (name == Slice("sstables")) { - PrintStats("leveldb.sstables"); - } else { - if (name != Slice()) { // No error message for empty name - fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str()); - } - } - - if (fresh_db) { - if (FLAGS_use_existing_db) { - fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n", - name.ToString().c_str()); - method = NULL; - } else { - delete db_; - db_ = NULL; - DestroyDB(FLAGS_db, Options()); - Open(); - } - } - - if (method != NULL) { - RunBenchmark(num_threads, name, method); - } - } - } - - private: - struct ThreadArg { - Benchmark* bm; - SharedState* shared; - ThreadState* thread; - void (Benchmark::*method)(ThreadState*); - }; - - static void ThreadBody(void* v) { - ThreadArg* arg = reinterpret_cast<ThreadArg*>(v); - SharedState* shared = arg->shared; - ThreadState* thread = arg->thread; - { - MutexLock l(&shared->mu); - shared->num_initialized++; - if (shared->num_initialized >= shared->total) { - shared->cv.SignalAll(); - } - while (!shared->start) { - shared->cv.Wait(); - } - } - - thread->stats.Start(); - (arg->bm->*(arg->method))(thread); - thread->stats.Stop(); - - { - MutexLock l(&shared->mu); - shared->num_done++; - if (shared->num_done >= shared->total) { - shared->cv.SignalAll(); - } - } - } - - void RunBenchmark(int n, Slice name, - void (Benchmark::*method)(ThreadState*)) { - SharedState shared; - shared.total = n; - shared.num_initialized = 0; - shared.num_done = 0; - shared.start = false; - - ThreadArg* arg = new ThreadArg[n]; - for (int i = 0; i < n; i++) { - arg[i].bm = this; - arg[i].method = method; - arg[i].shared = &shared; - arg[i].thread = new ThreadState(i); - arg[i].thread->shared = &shared; - std::thread t(std::bind(&ThreadBody, &arg[i])); - t.detach(); - } - - shared.mu.Lock(); - while (shared.num_initialized < n) { - shared.cv.Wait(); - } - - shared.start = true; - shared.cv.SignalAll(); - while (shared.num_done < n) { - shared.cv.Wait(); - } - shared.mu.Unlock(); - - for (int i = 1; i < n; i++) { - arg[0].thread->stats.Merge(arg[i].thread->stats); - } - arg[0].thread->stats.Report(name); - - for (int i = 0; i < n; i++) { - delete arg[i].thread; - } - delete[] arg; - } - - void Crc32c(ThreadState* thread) { - // Checksum about 500MB of data total - const int size = 4096; - const char* label = "(4K per op)"; - std::string data(size, 'x'); - int64_t bytes = 0; - uint32_t crc = 0; - while (bytes < 500 * 1048576) { - crc = crc32c::Value(data.data(), size); - thread->stats.FinishedSingleOp(); - bytes += size; - } - // Print so result is not dead - fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc)); - - thread->stats.AddBytes(bytes); - thread->stats.AddMessage(label); - } - - void AcquireLoad(ThreadState* thread) { - int dummy; - port::AtomicPointer ap(&dummy); - int count = 0; - void *ptr = NULL; - thread->stats.AddMessage("(each op is 1000 loads)"); - while (count < 100000) { - for (int i = 0; i < 1000; i++) { - ptr = ap.Acquire_Load(); - } - count++; - thread->stats.FinishedSingleOp(); - } - if (ptr == NULL) exit(1); // Disable unused variable warning. - } - - void SnappyCompress(ThreadState* thread) { - RandomGenerator gen; - Slice input = gen.Generate(Options().block_size); - int64_t bytes = 0; - int64_t produced = 0; - bool ok = true; - std::string compressed; - while (ok && bytes < 1024 * 1048576) { // Compress 1G - ok = port::Snappy_Compress(input.data(), input.size(), &compressed); - produced += compressed.size(); - bytes += input.size(); - thread->stats.FinishedSingleOp(); - } - - if (!ok) { - thread->stats.AddMessage("(snappy failure)"); - } else { - char buf[100]; - snprintf(buf, sizeof(buf), "(output: %.1f%%)", - (produced * 100.0) / bytes); - thread->stats.AddMessage(buf); - thread->stats.AddBytes(bytes); - } - } - - void SnappyUncompress(ThreadState* thread) { - RandomGenerator gen; - Slice input = gen.Generate(Options().block_size); - std::string compressed; - bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed); - int64_t bytes = 0; - char* uncompressed = new char[input.size()]; - while (ok && bytes < 1024 * 1048576) { // Compress 1G - ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), - uncompressed); - bytes += input.size(); - thread->stats.FinishedSingleOp(); - } - delete[] uncompressed; - - if (!ok) { - thread->stats.AddMessage("(snappy failure)"); - } else { - thread->stats.AddBytes(bytes); - } - } - - void Open() { - assert(db_ == NULL); - Options options; - options.create_if_missing = !FLAGS_use_existing_db; - options.block_cache = cache_; - options.write_buffer_size = FLAGS_write_buffer_size; - options.max_open_files = FLAGS_open_files; - options.filter_policy = filter_policy_; - Status s = DB::Open(options, FLAGS_db, &db_); - if (!s.ok()) { - fprintf(stderr, "open error: %s\n", s.ToString().c_str()); - exit(1); - } - } - - void WriteSeq(ThreadState* thread) { - DoWrite(thread, true); - } - - void WriteRandom(ThreadState* thread) { - DoWrite(thread, false); - } - - void DoWrite(ThreadState* thread, bool seq) { - if (num_ != FLAGS_num) { - char msg[100]; - snprintf(msg, sizeof(msg), "(%d ops)", num_); - thread->stats.AddMessage(msg); - } - - RandomGenerator gen; - WriteBatch batch; - Status s; - int64_t bytes = 0; - for (int i = 0; i < num_; i += entries_per_batch_) { - batch.Clear(); - for (int j = 0; j < entries_per_batch_; j++) { - const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num); - char key[100]; - snprintf(key, sizeof(key), "%016d", k); - batch.Put(key, gen.Generate(value_size_)); - bytes += value_size_ + strlen(key); - thread->stats.FinishedSingleOp(); - } - s = db_->Write(write_options_, &batch); - if (!s.ok()) { - fprintf(stderr, "put error: %s\n", s.ToString().c_str()); - exit(1); - } - } - thread->stats.AddBytes(bytes); - } - - void ReadSequential(ThreadState* thread) { - Iterator* iter = db_->NewIterator(ReadOptions()); - int i = 0; - int64_t bytes = 0; - for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) { - bytes += iter->key().size() + iter->value().size(); - thread->stats.FinishedSingleOp(); - ++i; - } - delete iter; - thread->stats.AddBytes(bytes); - } - - void ReadReverse(ThreadState* thread) { - Iterator* iter = db_->NewIterator(ReadOptions()); - int i = 0; - int64_t bytes = 0; - for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { - bytes += iter->key().size() + iter->value().size(); - thread->stats.FinishedSingleOp(); - ++i; - } - delete iter; - thread->stats.AddBytes(bytes); - } - - void ReadRandom(ThreadState* thread) { - ReadOptions options; - std::string value; - int found = 0; - for (int i = 0; i < reads_; i++) { - char key[100]; - const int k = thread->rand.Next() % FLAGS_num; - snprintf(key, sizeof(key), "%016d", k); - if (db_->Get(options, key, &value).ok()) { - found++; - } - thread->stats.FinishedSingleOp(); - } - char msg[100]; - snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); - thread->stats.AddMessage(msg); - } - - void ReadMissing(ThreadState* thread) { - ReadOptions options; - std::string value; - for (int i = 0; i < reads_; i++) { - char key[100]; - const int k = thread->rand.Next() % FLAGS_num; - snprintf(key, sizeof(key), "%016d.", k); - db_->Get(options, key, &value); - thread->stats.FinishedSingleOp(); - } - } - - void ReadHot(ThreadState* thread) { - ReadOptions options; - std::string value; - const int range = (FLAGS_num + 99) / 100; - for (int i = 0; i < reads_; i++) { - char key[100]; - const int k = thread->rand.Next() % range; - snprintf(key, sizeof(key), "%016d", k); - db_->Get(options, key, &value); - thread->stats.FinishedSingleOp(); - } - } - - void SeekRandom(ThreadState* thread) { - ReadOptions options; - int found = 0; - for (int i = 0; i < reads_; i++) { - Iterator* iter = db_->NewIterator(options); - char key[100]; - const int k = thread->rand.Next() % FLAGS_num; - snprintf(key, sizeof(key), "%016d", k); - iter->Seek(key); - if (iter->Valid() && iter->key() == key) found++; - delete iter; - thread->stats.FinishedSingleOp(); - } - char msg[100]; - snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); - thread->stats.AddMessage(msg); - } - - void DoDelete(ThreadState* thread, bool seq) { - RandomGenerator gen; - WriteBatch batch; - Status s; - for (int i = 0; i < num_; i += entries_per_batch_) { - batch.Clear(); - for (int j = 0; j < entries_per_batch_; j++) { - const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num); - char key[100]; - snprintf(key, sizeof(key), "%016d", k); - batch.Delete(key); - thread->stats.FinishedSingleOp(); - } - s = db_->Write(write_options_, &batch); - if (!s.ok()) { - fprintf(stderr, "del error: %s\n", s.ToString().c_str()); - exit(1); - } - } - } - - void DeleteSeq(ThreadState* thread) { - DoDelete(thread, true); - } - - void DeleteRandom(ThreadState* thread) { - DoDelete(thread, false); - } - - void ReadWhileWriting(ThreadState* thread) { - if (thread->tid > 0) { - ReadRandom(thread); - } else { - // Special thread that keeps writing until other threads are done. - RandomGenerator gen; - while (true) { - { - MutexLock l(&thread->shared->mu); - if (thread->shared->num_done + 1 >= thread->shared->num_initialized) { - // Other threads have finished - break; - } - } - - const int k = thread->rand.Next() % FLAGS_num; - char key[100]; - snprintf(key, sizeof(key), "%016d", k); - Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); - if (!s.ok()) { - fprintf(stderr, "put error: %s\n", s.ToString().c_str()); - exit(1); - } - } - - // Do not count any of the preceding work/delay in stats. - thread->stats.Start(); - } - } - - void Compact(ThreadState* thread) { - db_->CompactRange(NULL, NULL); - } - - void PrintStats(const char* key) { - std::string stats; - if (!db_->GetProperty(key, &stats)) { - stats = "(failed)"; - } - fprintf(stdout, "\n%s\n", stats.c_str()); - } - - static void WriteToFile(void* arg, const char* buf, int n) { - reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n)); - } - - void HeapProfile() { - char fname[100]; - snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_); - WritableFile* file; - Status s = Env::Default()->NewWritableFile(fname, &file); - if (!s.ok()) { - fprintf(stderr, "%s\n", s.ToString().c_str()); - return; - } - bool ok = port::GetHeapProfile(WriteToFile, file); - delete file; - if (!ok) { - fprintf(stderr, "heap profiling not supported\n"); - Env::Default()->DeleteFile(fname); - } - } -}; - -} // namespace leveldb - -int main(int argc, char** argv) { - FLAGS_write_buffer_size = static_cast<int>(leveldb::Options().write_buffer_size); - FLAGS_open_files = leveldb::Options().max_open_files; - std::string default_db_path; - - for (int i = 1; i < argc; i++) { - double d; - int n; - char junk; - if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) { - FLAGS_benchmarks = argv[i] + strlen("--benchmarks="); - } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) { - FLAGS_compression_ratio = d; - } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 && - (n == 0 || n == 1)) { - FLAGS_histogram = n; - } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 && - (n == 0 || n == 1)) { - FLAGS_use_existing_db = n; - } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) { - FLAGS_num = n; - } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) { - FLAGS_reads = n; - } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) { - FLAGS_threads = n; - } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) { - FLAGS_value_size = n; - } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { - FLAGS_write_buffer_size = n; - } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) { - FLAGS_cache_size = n; - } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) { - FLAGS_bloom_bits = n; - } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) { - FLAGS_open_files = n; - } else if (strncmp(argv[i], "--db=", 5) == 0) { - FLAGS_db = argv[i] + 5; - } else { - fprintf(stderr, "Invalid flag '%s'\n", argv[i]); - exit(1); - } - } - - // Choose a location for the test database if none given with --db=<path> - if (FLAGS_db == NULL) { - leveldb::Env::Default()->GetTestDirectory(&default_db_path); - default_db_path += "/dbbench"; - FLAGS_db = default_db_path.c_str(); - } - - leveldb::Benchmark benchmark; - benchmark.Run(); - return 0; -} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/leveldb-1.18/db/db_impl.cc ---------------------------------------------------------------------- diff --git a/thirdparty/leveldb-1.18/db/db_impl.cc b/thirdparty/leveldb-1.18/db/db_impl.cc deleted file mode 100755 index a9a4db7..0000000 --- a/thirdparty/leveldb-1.18/db/db_impl.cc +++ /dev/null @@ -1,1529 +0,0 @@ -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. - -#include "db/db_impl.h" - -#include <algorithm> -#include <set> -#include <string> -#include <stdint.h> -#include <stdio.h> -#include <vector> -#include "db/builder.h" -#include "db/db_iter.h" -#include "db/dbformat.h" -#include "db/filename.h" -#include "db/log_reader.h" -#include "db/log_writer.h" -#include "db/memtable.h" -#include "db/table_cache.h" -#include "db/version_set.h" -#include "db/write_batch_internal.h" -#include "leveldb/db.h" -#include "leveldb/env.h" -#include "leveldb/status.h" -#include "leveldb/table.h" -#include "leveldb/table_builder.h" -#include "port/port.h" -#include "table/block.h" -#include "table/merger.h" -#include "table/two_level_iterator.h" -#include "util/coding.h" -#include "util/logging.h" -#include "util/mutexlock.h" - -namespace leveldb { - -const int kNumNonTableCacheFiles = 10; - -// Information kept for every waiting writer -struct DBImpl::Writer { - Status status; - WriteBatch* batch; - bool sync; - bool done; - port::CondVar cv; - - explicit Writer(port::Mutex* mu) : cv(mu) { } -}; - -struct DBImpl::CompactionState { - Compaction* const compaction; - - // Sequence numbers < smallest_snapshot are not significant since we - // will never have to service a snapshot below smallest_snapshot. - // Therefore if we have seen a sequence number S <= smallest_snapshot, - // we can drop all entries for the same key with sequence numbers < S. - SequenceNumber smallest_snapshot; - - // Files produced by compaction - struct Output { - uint64_t number; - uint64_t file_size; - InternalKey smallest, largest; - }; - std::vector<Output> outputs; - - // State kept for output being generated - WritableFile* outfile; - TableBuilder* builder; - - uint64_t total_bytes; - - Output* current_output() { return &outputs[outputs.size()-1]; } - - explicit CompactionState(Compaction* c) - : compaction(c), - outfile(NULL), - builder(NULL), - total_bytes(0) { - } -}; - -// Fix user-supplied options to be reasonable -template <class T,class V> -static void ClipToRange(T* ptr, V minvalue, V maxvalue) { - if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue; - if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue; -} -Options SanitizeOptions(const std::string& dbname, - const InternalKeyComparator* icmp, - const InternalFilterPolicy* ipolicy, - const Options& src) { - Options result = src; - result.comparator = icmp; - result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL; - ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000); - ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); - ClipToRange(&result.block_size, 1<<10, 4<<20); - if (result.info_log == NULL) { - // Open a log file in the same directory as the db - src.env->CreateDir(dbname); // In case it does not exist - src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname)); - Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log); - if (!s.ok()) { - // No place suitable for logging - result.info_log = NULL; - } - } - if (result.block_cache == NULL) { - result.block_cache = NewLRUCache(8 << 20); - } - return result; -} - -DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) - : env_(raw_options.env), - internal_comparator_(raw_options.comparator), - internal_filter_policy_(raw_options.filter_policy), - options_(SanitizeOptions(dbname, &internal_comparator_, - &internal_filter_policy_, raw_options)), - owns_info_log_(options_.info_log != raw_options.info_log), - owns_cache_(options_.block_cache != raw_options.block_cache), - dbname_(dbname), - db_lock_(NULL), - shutting_down_(NULL), - bg_cv_(&mutex_), - mem_(new MemTable(internal_comparator_)), - imm_(NULL), - logfile_(NULL), - logfile_number_(0), - log_(NULL), - seed_(0), - tmp_batch_(new WriteBatch), - bg_compaction_scheduled_(false), - manual_compaction_(NULL) { - mem_->Ref(); - has_imm_.Release_Store(NULL); - - // Reserve ten files or so for other uses and give the rest to TableCache. - const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles; - table_cache_ = new TableCache(dbname_, &options_, table_cache_size); - - versions_ = new VersionSet(dbname_, &options_, table_cache_, - &internal_comparator_); -} - -DBImpl::~DBImpl() { - // Wait for background work to finish - mutex_.Lock(); - shutting_down_.Release_Store(this); // Any non-NULL value is ok - while (bg_compaction_scheduled_) { - bg_cv_.Wait(); - } - mutex_.Unlock(); - - if (db_lock_ != NULL) { - env_->UnlockFile(db_lock_); - } - - delete versions_; - if (mem_ != NULL) mem_->Unref(); - if (imm_ != NULL) imm_->Unref(); - delete tmp_batch_; - delete log_; - delete logfile_; - delete table_cache_; - - if (owns_info_log_) { - delete options_.info_log; - } - if (owns_cache_) { - delete options_.block_cache; - } -} - -Status DBImpl::NewDB() { - VersionEdit new_db; - new_db.SetComparatorName(user_comparator()->Name()); - new_db.SetLogNumber(0); - new_db.SetNextFile(2); - new_db.SetLastSequence(0); - - const std::string manifest = DescriptorFileName(dbname_, 1); - WritableFile* file; - Status s = env_->NewWritableFile(manifest, &file); - if (!s.ok()) { - return s; - } - { - log::Writer log(file); - std::string record; - new_db.EncodeTo(&record); - s = log.AddRecord(record); - if (s.ok()) { - s = file->Close(); - } - } - delete file; - if (s.ok()) { - // Make "CURRENT" file that points to the new manifest file. - s = SetCurrentFile(env_, dbname_, 1); - } else { - env_->DeleteFile(manifest); - } - return s; -} - -void DBImpl::MaybeIgnoreError(Status* s) const { - if (s->ok() || options_.paranoid_checks) { - // No change needed - } else { - Log(options_.info_log, "Ignoring error %s", s->ToString().c_str()); - *s = Status::OK(); - } -} - -void DBImpl::DeleteObsoleteFiles() { - if (!bg_error_.ok()) { - // After a background error, we don't know whether a new version may - // or may not have been committed, so we cannot safely garbage collect. - return; - } - - // Make a set of all of the live files - std::set<uint64_t> live = pending_outputs_; - versions_->AddLiveFiles(&live); - - std::vector<std::string> filenames; - env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose - uint64_t number; - FileType type; - for (size_t i = 0; i < filenames.size(); i++) { - if (ParseFileName(filenames[i], &number, &type)) { - bool keep = true; - switch (type) { - case kLogFile: - keep = ((number >= versions_->LogNumber()) || - (number == versions_->PrevLogNumber())); - break; - case kDescriptorFile: - // Keep my manifest file, and any newer incarnations' - // (in case there is a race that allows other incarnations) - keep = (number >= versions_->ManifestFileNumber()); - break; - case kTableFile: - keep = (live.find(number) != live.end()); - break; - case kTempFile: - // Any temp files that are currently being written to must - // be recorded in pending_outputs_, which is inserted into "live" - keep = (live.find(number) != live.end()); - break; - case kCurrentFile: - case kDBLockFile: - case kInfoLogFile: - keep = true; - break; - } - - if (!keep) { - if (type == kTableFile) { - table_cache_->Evict(number); - } - Log(options_.info_log, "Delete type=%d #%lld\n", - int(type), - static_cast<unsigned long long>(number)); - env_->DeleteFile(dbname_ + "/" + filenames[i]); - } - } - } -} - -Status DBImpl::Recover(VersionEdit* edit) { - mutex_.AssertHeld(); - - // Ignore error from CreateDir since the creation of the DB is - // committed only when the descriptor is created, and this directory - // may already exist from a previous failed creation attempt. - env_->CreateDir(dbname_); - assert(db_lock_ == NULL); - Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); - if (!s.ok()) { - return s; - } - - if (!env_->FileExists(CurrentFileName(dbname_))) { - if (options_.create_if_missing) { - s = NewDB(); - if (!s.ok()) { - return s; - } - } else { - return Status::InvalidArgument( - dbname_, "does not exist (create_if_missing is false)"); - } - } else { - if (options_.error_if_exists) { - return Status::InvalidArgument( - dbname_, "exists (error_if_exists is true)"); - } - } - - s = versions_->Recover(); - if (s.ok()) { - SequenceNumber max_sequence(0); - - // Recover from all newer log files than the ones named in the - // descriptor (new log files may have been added by the previous - // incarnation without registering them in the descriptor). - // - // Note that PrevLogNumber() is no longer used, but we pay - // attention to it in case we are recovering a database - // produced by an older version of leveldb. - const uint64_t min_log = versions_->LogNumber(); - const uint64_t prev_log = versions_->PrevLogNumber(); - std::vector<std::string> filenames; - s = env_->GetChildren(dbname_, &filenames); - if (!s.ok()) { - return s; - } - std::set<uint64_t> expected; - versions_->AddLiveFiles(&expected); - uint64_t number; - FileType type; - std::vector<uint64_t> logs; - for (size_t i = 0; i < filenames.size(); i++) { - if (ParseFileName(filenames[i], &number, &type)) { - expected.erase(number); - if (type == kLogFile && ((number >= min_log) || (number == prev_log))) - logs.push_back(number); - } - } - if (!expected.empty()) { - char buf[50]; - snprintf(buf, sizeof(buf), "%d missing files; e.g.", - static_cast<int>(expected.size())); - return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin()))); - } - - // Recover in the order in which the logs were generated - std::sort(logs.begin(), logs.end()); - for (size_t i = 0; i < logs.size(); i++) { - s = RecoverLogFile(logs[i], edit, &max_sequence); - - // The previous incarnation may not have written any MANIFEST - // records after allocating this log number. So we manually - // update the file number allocation counter in VersionSet. - versions_->MarkFileNumberUsed(logs[i]); - } - - if (s.ok()) { - if (versions_->LastSequence() < max_sequence) { - versions_->SetLastSequence(max_sequence); - } - } - } - - return s; -} - -Status DBImpl::RecoverLogFile(uint64_t log_number, - VersionEdit* edit, - SequenceNumber* max_sequence) { - struct LogReporter : public log::Reader::Reporter { - Env* env; - Logger* info_log; - const char* fname; - Status* status; // NULL if options_.paranoid_checks==false - virtual void Corruption(size_t bytes, const Status& s) { - Log(info_log, "%s%s: dropping %d bytes; %s", - (this->status == NULL ? "(ignoring error) " : ""), - fname, static_cast<int>(bytes), s.ToString().c_str()); - if (this->status != NULL && this->status->ok()) *this->status = s; - } - }; - - mutex_.AssertHeld(); - - // Open the log file - std::string fname = LogFileName(dbname_, log_number); - SequentialFile* file; - Status status = env_->NewSequentialFile(fname, &file); - if (!status.ok()) { - MaybeIgnoreError(&status); - return status; - } - - // Create the log reader. - LogReporter reporter; - reporter.env = env_; - reporter.info_log = options_.info_log; - reporter.fname = fname.c_str(); - reporter.status = (options_.paranoid_checks ? &status : NULL); - // We intentionally make log::Reader do checksumming even if - // paranoid_checks==false so that corruptions cause entire commits - // to be skipped instead of propagating bad information (like overly - // large sequence numbers). - log::Reader reader(file, &reporter, true/*checksum*/, - 0/*initial_offset*/); - Log(options_.info_log, "Recovering log #%llu", - (unsigned long long) log_number); - - // Read all the records and add to a memtable - std::string scratch; - Slice record; - WriteBatch batch; - MemTable* mem = NULL; - while (reader.ReadRecord(&record, &scratch) && - status.ok()) { - if (record.size() < 12) { - reporter.Corruption( - record.size(), Status::Corruption("log record too small")); - continue; - } - WriteBatchInternal::SetContents(&batch, record); - - if (mem == NULL) { - mem = new MemTable(internal_comparator_); - mem->Ref(); - } - status = WriteBatchInternal::InsertInto(&batch, mem); - MaybeIgnoreError(&status); - if (!status.ok()) { - break; - } - const SequenceNumber last_seq = - WriteBatchInternal::Sequence(&batch) + - WriteBatchInternal::Count(&batch) - 1; - if (last_seq > *max_sequence) { - *max_sequence = last_seq; - } - - if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { - status = WriteLevel0Table(mem, edit, NULL); - if (!status.ok()) { - // Reflect errors immediately so that conditions like full - // file-systems cause the DB::Open() to fail. - break; - } - mem->Unref(); - mem = NULL; - } - } - - if (status.ok() && mem != NULL) { - status = WriteLevel0Table(mem, edit, NULL); - // Reflect errors immediately so that conditions like full - // file-systems cause the DB::Open() to fail. - } - - if (mem != NULL) mem->Unref(); - delete file; - return status; -} - -Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, - Version* base) { - mutex_.AssertHeld(); - const uint64_t start_micros = env_->NowMicros(); - FileMetaData meta; - meta.number = versions_->NewFileNumber(); - pending_outputs_.insert(meta.number); - Iterator* iter = mem->NewIterator(); - Log(options_.info_log, "Level-0 table #%llu: started", - (unsigned long long) meta.number); - - Status s; - { - mutex_.Unlock(); - s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); - mutex_.Lock(); - } - - Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s", - (unsigned long long) meta.number, - (unsigned long long) meta.file_size, - s.ToString().c_str()); - delete iter; - pending_outputs_.erase(meta.number); - - - // Note that if file_size is zero, the file has been deleted and - // should not be added to the manifest. - int level = 0; - if (s.ok() && meta.file_size > 0) { - const Slice min_user_key = meta.smallest.user_key(); - const Slice max_user_key = meta.largest.user_key(); - if (base != NULL) { - level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); - } - edit->AddFile(level, meta.number, meta.file_size, - meta.smallest, meta.largest); - } - - CompactionStats stats; - stats.micros = env_->NowMicros() - start_micros; - stats.bytes_written = meta.file_size; - stats_[level].Add(stats); - return s; -} - -void DBImpl::CompactMemTable() { - mutex_.AssertHeld(); - assert(imm_ != NULL); - - // Save the contents of the memtable as a new Table - VersionEdit edit; - Version* base = versions_->current(); - base->Ref(); - Status s = WriteLevel0Table(imm_, &edit, base); - base->Unref(); - - if (s.ok() && shutting_down_.Acquire_Load()) { - s = Status::IOError("Deleting DB during memtable compaction"); - } - - // Replace immutable memtable with the generated Table - if (s.ok()) { - edit.SetPrevLogNumber(0); - edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed - s = versions_->LogAndApply(&edit, &mutex_); - } - - if (s.ok()) { - // Commit to the new state - imm_->Unref(); - imm_ = NULL; - has_imm_.Release_Store(NULL); - DeleteObsoleteFiles(); - } else { - RecordBackgroundError(s); - } -} - -void DBImpl::CompactRange(const Slice* begin, const Slice* end) { - int max_level_with_files = 1; - { - MutexLock l(&mutex_); - Version* base = versions_->current(); - for (int level = 1; level < config::kNumLevels; level++) { - if (base->OverlapInLevel(level, begin, end)) { - max_level_with_files = level; - } - } - } - TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap - for (int level = 0; level < max_level_with_files; level++) { - TEST_CompactRange(level, begin, end); - } -} - -void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { - assert(level >= 0); - assert(level + 1 < config::kNumLevels); - - InternalKey begin_storage, end_storage; - - ManualCompaction manual; - manual.level = level; - manual.done = false; - if (begin == NULL) { - manual.begin = NULL; - } else { - begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); - manual.begin = &begin_storage; - } - if (end == NULL) { - manual.end = NULL; - } else { - end_storage = InternalKey(*end, 0, static_cast<ValueType>(0)); - manual.end = &end_storage; - } - - MutexLock l(&mutex_); - while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) { - if (manual_compaction_ == NULL) { // Idle - manual_compaction_ = &manual; - MaybeScheduleCompaction(); - } else { // Running either my compaction or another compaction. - bg_cv_.Wait(); - } - } - if (manual_compaction_ == &manual) { - // Cancel my manual compaction since we aborted early for some reason. - manual_compaction_ = NULL; - } -} - -Status DBImpl::TEST_CompactMemTable() { - // NULL batch means just wait for earlier writes to be done - Status s = Write(WriteOptions(), NULL); - if (s.ok()) { - // Wait until the compaction completes - MutexLock l(&mutex_); - while (imm_ != NULL && bg_error_.ok()) { - bg_cv_.Wait(); - } - if (imm_ != NULL) { - s = bg_error_; - } - } - return s; -} - -void DBImpl::RecordBackgroundError(const Status& s) { - mutex_.AssertHeld(); - if (bg_error_.ok()) { - bg_error_ = s; - bg_cv_.SignalAll(); - } -} - -void DBImpl::MaybeScheduleCompaction() { - mutex_.AssertHeld(); - if (bg_compaction_scheduled_) { - // Already scheduled - } else if (shutting_down_.Acquire_Load()) { - // DB is being deleted; no more background compactions - } else if (!bg_error_.ok()) { - // Already got an error; no more changes - } else if (imm_ == NULL && - manual_compaction_ == NULL && - !versions_->NeedsCompaction()) { - // No work to be done - } else { - bg_compaction_scheduled_ = true; - env_->Schedule(&DBImpl::BGWork, this); - } -} - -void DBImpl::BGWork(void* db) { - reinterpret_cast<DBImpl*>(db)->BackgroundCall(); -} - -void DBImpl::BackgroundCall() { - MutexLock l(&mutex_); - assert(bg_compaction_scheduled_); - if (shutting_down_.Acquire_Load()) { - // No more background work when shutting down. - } else if (!bg_error_.ok()) { - // No more background work after a background error. - } else { - BackgroundCompaction(); - } - - bg_compaction_scheduled_ = false; - - // Previous compaction may have produced too many files in a level, - // so reschedule another compaction if needed. - MaybeScheduleCompaction(); - bg_cv_.SignalAll(); -} - -void DBImpl::BackgroundCompaction() { - mutex_.AssertHeld(); - - if (imm_ != NULL) { - CompactMemTable(); - return; - } - - Compaction* c; - bool is_manual = (manual_compaction_ != NULL); - InternalKey manual_end; - if (is_manual) { - ManualCompaction* m = manual_compaction_; - c = versions_->CompactRange(m->level, m->begin, m->end); - m->done = (c == NULL); - if (c != NULL) { - manual_end = c->input(0, c->num_input_files(0) - 1)->largest; - } - Log(options_.info_log, - "Manual compaction at level-%d from %s .. %s; will stop at %s\n", - m->level, - (m->begin ? m->begin->DebugString().c_str() : "(begin)"), - (m->end ? m->end->DebugString().c_str() : "(end)"), - (m->done ? "(end)" : manual_end.DebugString().c_str())); - } else { - c = versions_->PickCompaction(); - } - - Status status; - if (c == NULL) { - // Nothing to do - } else if (!is_manual && c->IsTrivialMove()) { - // Move file to next level - assert(c->num_input_files(0) == 1); - FileMetaData* f = c->input(0, 0); - c->edit()->DeleteFile(c->level(), f->number); - c->edit()->AddFile(c->level() + 1, f->number, f->file_size, - f->smallest, f->largest); - status = versions_->LogAndApply(c->edit(), &mutex_); - if (!status.ok()) { - RecordBackgroundError(status); - } - VersionSet::LevelSummaryStorage tmp; - Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", - static_cast<unsigned long long>(f->number), - c->level() + 1, - static_cast<unsigned long long>(f->file_size), - status.ToString().c_str(), - versions_->LevelSummary(&tmp)); - } else { - CompactionState* compact = new CompactionState(c); - status = DoCompactionWork(compact); - if (!status.ok()) { - RecordBackgroundError(status); - } - CleanupCompaction(compact); - c->ReleaseInputs(); - DeleteObsoleteFiles(); - } - delete c; - - if (status.ok()) { - // Done - } else if (shutting_down_.Acquire_Load()) { - // Ignore compaction errors found during shutting down - } else { - Log(options_.info_log, - "Compaction error: %s", status.ToString().c_str()); - } - - if (is_manual) { - ManualCompaction* m = manual_compaction_; - if (!status.ok()) { - m->done = true; - } - if (!m->done) { - // We only compacted part of the requested range. Update *m - // to the range that is left to be compacted. - m->tmp_storage = manual_end; - m->begin = &m->tmp_storage; - } - manual_compaction_ = NULL; - } -} - -void DBImpl::CleanupCompaction(CompactionState* compact) { - mutex_.AssertHeld(); - if (compact->builder != NULL) { - // May happen if we get a shutdown call in the middle of compaction - compact->builder->Abandon(); - delete compact->builder; - } else { - assert(compact->outfile == NULL); - } - delete compact->outfile; - for (size_t i = 0; i < compact->outputs.size(); i++) { - const CompactionState::Output& out = compact->outputs[i]; - pending_outputs_.erase(out.number); - } - delete compact; -} - -Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { - assert(compact != NULL); - assert(compact->builder == NULL); - uint64_t file_number; - { - mutex_.Lock(); - file_number = versions_->NewFileNumber(); - pending_outputs_.insert(file_number); - CompactionState::Output out; - out.number = file_number; - out.smallest.Clear(); - out.largest.Clear(); - compact->outputs.push_back(out); - mutex_.Unlock(); - } - - // Make the output file - std::string fname = TableFileName(dbname_, file_number); - Status s = env_->NewWritableFile(fname, &compact->outfile); - if (s.ok()) { - compact->builder = new TableBuilder(options_, compact->outfile); - } - return s; -} - -Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, - Iterator* input) { - assert(compact != NULL); - assert(compact->outfile != NULL); - assert(compact->builder != NULL); - - const uint64_t output_number = compact->current_output()->number; - assert(output_number != 0); - - // Check for iterator errors - Status s = input->status(); - const uint64_t current_entries = compact->builder->NumEntries(); - if (s.ok()) { - s = compact->builder->Finish(); - } else { - compact->builder->Abandon(); - } - const uint64_t current_bytes = compact->builder->FileSize(); - compact->current_output()->file_size = current_bytes; - compact->total_bytes += current_bytes; - delete compact->builder; - compact->builder = NULL; - - // Finish and check for file errors - if (s.ok()) { - s = compact->outfile->Sync(); - } - if (s.ok()) { - s = compact->outfile->Close(); - } - delete compact->outfile; - compact->outfile = NULL; - - if (s.ok() && current_entries > 0) { - // Verify that the table is usable - Iterator* iter = table_cache_->NewIterator(ReadOptions(), - output_number, - current_bytes); - s = iter->status(); - delete iter; - if (s.ok()) { - Log(options_.info_log, - "Generated table #%llu: %lld keys, %lld bytes", - (unsigned long long) output_number, - (unsigned long long) current_entries, - (unsigned long long) current_bytes); - } - } - return s; -} - - -Status DBImpl::InstallCompactionResults(CompactionState* compact) { - mutex_.AssertHeld(); -#ifdef _WIN32 - Log(options_.info_log, "Compacted %Iu@%d + %Iu@%d files => %lld bytes", -#else - Log(options_.info_log, "Compacted %zu@%d + %zu@%d files => %lld bytes", -#endif - compact->compaction->num_input_files(0), - compact->compaction->level(), - compact->compaction->num_input_files(1), - compact->compaction->level() + 1, - static_cast<long long>(compact->total_bytes)); - - // Add compaction outputs - compact->compaction->AddInputDeletions(compact->compaction->edit()); - const int level = compact->compaction->level(); - for (size_t i = 0; i < compact->outputs.size(); i++) { - const CompactionState::Output& out = compact->outputs[i]; - compact->compaction->edit()->AddFile( - level + 1, - out.number, out.file_size, out.smallest, out.largest); - } - return versions_->LogAndApply(compact->compaction->edit(), &mutex_); -} - -Status DBImpl::DoCompactionWork(CompactionState* compact) { - const uint64_t start_micros = env_->NowMicros(); - int64_t imm_micros = 0; // Micros spent doing imm_ compactions - -#ifdef _WIN32 - Log(options_.info_log, "Compacting %Iu@%d + %Iu@%d files", -#else - Log(options_.info_log, "Compacting %zu@%d + %zu@%d files", -#endif - compact->compaction->num_input_files(0), - compact->compaction->level(), - compact->compaction->num_input_files(1), - compact->compaction->level() + 1); - - assert(versions_->NumLevelFiles(compact->compaction->level()) > 0); - assert(compact->builder == NULL); - assert(compact->outfile == NULL); - if (snapshots_.empty()) { - compact->smallest_snapshot = versions_->LastSequence(); - } else { - compact->smallest_snapshot = snapshots_.oldest()->number_; - } - - // Release mutex while we're actually doing the compaction work - mutex_.Unlock(); - - Iterator* input = versions_->MakeInputIterator(compact->compaction); - input->SeekToFirst(); - Status status; - ParsedInternalKey ikey; - std::string current_user_key; - bool has_current_user_key = false; - SequenceNumber last_sequence_for_key = kMaxSequenceNumber; - for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { - // Prioritize immutable compaction work - if (has_imm_.NoBarrier_Load() != NULL) { - const uint64_t imm_start = env_->NowMicros(); - mutex_.Lock(); - if (imm_ != NULL) { - CompactMemTable(); - bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary - } - mutex_.Unlock(); - imm_micros += (env_->NowMicros() - imm_start); - } - - Slice key = input->key(); - if (compact->compaction->ShouldStopBefore(key) && - compact->builder != NULL) { - status = FinishCompactionOutputFile(compact, input); - if (!status.ok()) { - break; - } - } - - // Handle key/value, add to state, etc. - bool drop = false; - if (!ParseInternalKey(key, &ikey)) { - // Do not hide error keys - current_user_key.clear(); - has_current_user_key = false; - last_sequence_for_key = kMaxSequenceNumber; - } else { - if (!has_current_user_key || - user_comparator()->Compare(ikey.user_key, - Slice(current_user_key)) != 0) { - // First occurrence of this user key - current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); - has_current_user_key = true; - last_sequence_for_key = kMaxSequenceNumber; - } - - if (last_sequence_for_key <= compact->smallest_snapshot) { - // Hidden by an newer entry for same user key - drop = true; // (A) - } else if (ikey.type == kTypeDeletion && - ikey.sequence <= compact->smallest_snapshot && - compact->compaction->IsBaseLevelForKey(ikey.user_key)) { - // For this user key: - // (1) there is no data in higher levels - // (2) data in lower levels will have larger sequence numbers - // (3) data in layers that are being compacted here and have - // smaller sequence numbers will be dropped in the next - // few iterations of this loop (by rule (A) above). - // Therefore this deletion marker is obsolete and can be dropped. - drop = true; - } - - last_sequence_for_key = ikey.sequence; - } -#if 0 - Log(options_.info_log, - " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, " - "%d smallest_snapshot: %d", - ikey.user_key.ToString().c_str(), - (int)ikey.sequence, ikey.type, kTypeValue, drop, - compact->compaction->IsBaseLevelForKey(ikey.user_key), - (int)last_sequence_for_key, (int)compact->smallest_snapshot); -#endif - - if (!drop) { - // Open output file if necessary - if (compact->builder == NULL) { - status = OpenCompactionOutputFile(compact); - if (!status.ok()) { - break; - } - } - if (compact->builder->NumEntries() == 0) { - compact->current_output()->smallest.DecodeFrom(key); - } - compact->current_output()->largest.DecodeFrom(key); - compact->builder->Add(key, input->value()); - - // Close output file if it is big enough - if (compact->builder->FileSize() >= - compact->compaction->MaxOutputFileSize()) { - status = FinishCompactionOutputFile(compact, input); - if (!status.ok()) { - break; - } - } - } - - input->Next(); - } - - if (status.ok() && shutting_down_.Acquire_Load()) { - status = Status::IOError("Deleting DB during compaction"); - } - if (status.ok() && compact->builder != NULL) { - status = FinishCompactionOutputFile(compact, input); - } - if (status.ok()) { - status = input->status(); - } - delete input; - input = NULL; - - CompactionStats stats; - stats.micros = env_->NowMicros() - start_micros - imm_micros; - for (int which = 0; which < 2; which++) { - for (size_t i = 0; i < compact->compaction->num_input_files(which); i++) { - stats.bytes_read += compact->compaction->input(which, i)->file_size; - } - } - for (size_t i = 0; i < compact->outputs.size(); i++) { - stats.bytes_written += compact->outputs[i].file_size; - } - - mutex_.Lock(); - stats_[compact->compaction->level() + 1].Add(stats); - - if (status.ok()) { - status = InstallCompactionResults(compact); - } - if (!status.ok()) { - RecordBackgroundError(status); - } - VersionSet::LevelSummaryStorage tmp; - Log(options_.info_log, - "compacted to: %s", versions_->LevelSummary(&tmp)); - return status; -} - -namespace { -struct IterState { - port::Mutex* mu; - Version* version; - MemTable* mem; - MemTable* imm; -}; - -static void CleanupIteratorState(void* arg1, void* arg2) { - IterState* state = reinterpret_cast<IterState*>(arg1); - state->mu->Lock(); - state->mem->Unref(); - if (state->imm != NULL) state->imm->Unref(); - state->version->Unref(); - state->mu->Unlock(); - delete state; -} -} // namespace - -Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, - SequenceNumber* latest_snapshot, - uint32_t* seed) { - IterState* cleanup = new IterState; - mutex_.Lock(); - *latest_snapshot = versions_->LastSequence(); - - // Collect together all needed child iterators - std::vector<Iterator*> list; - list.push_back(mem_->NewIterator()); - mem_->Ref(); - if (imm_ != NULL) { - list.push_back(imm_->NewIterator()); - imm_->Ref(); - } - versions_->current()->AddIterators(options, &list); - Iterator* internal_iter = - NewMergingIterator(&internal_comparator_, &list[0], list.size()); - versions_->current()->Ref(); - - cleanup->mu = &mutex_; - cleanup->mem = mem_; - cleanup->imm = imm_; - cleanup->version = versions_->current(); - internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL); - - *seed = ++seed_; - mutex_.Unlock(); - return internal_iter; -} - -Iterator* DBImpl::TEST_NewInternalIterator() { - SequenceNumber ignored; - uint32_t ignored_seed; - return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed); -} - -int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { - MutexLock l(&mutex_); - return versions_->MaxNextLevelOverlappingBytes(); -} - -Status DBImpl::Get(const ReadOptions& options, - const Slice& key, - std::string* value) { - Status s; - MutexLock l(&mutex_); - SequenceNumber snapshot; - if (options.snapshot != NULL) { - snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_; - } else { - snapshot = versions_->LastSequence(); - } - - MemTable* mem = mem_; - MemTable* imm = imm_; - Version* current = versions_->current(); - mem->Ref(); - if (imm != NULL) imm->Ref(); - current->Ref(); - - bool have_stat_update = false; - Version::GetStats stats; - - // Unlock while reading from files and memtables - { - mutex_.Unlock(); - // First look in the memtable, then in the immutable memtable (if any). - LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s)) { - // Done - } else if (imm != NULL && imm->Get(lkey, value, &s)) { - // Done - } else { - s = current->Get(options, lkey, value, &stats); - have_stat_update = true; - } - mutex_.Lock(); - } - - if (have_stat_update && current->UpdateStats(stats)) { - MaybeScheduleCompaction(); - } - mem->Unref(); - if (imm != NULL) imm->Unref(); - current->Unref(); - return s; -} - -Iterator* DBImpl::NewIterator(const ReadOptions& options) { - SequenceNumber latest_snapshot; - uint32_t seed; - Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); - return NewDBIterator( - this, user_comparator(), iter, - (options.snapshot != NULL - ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_ - : latest_snapshot), - seed); -} - -void DBImpl::RecordReadSample(Slice key) { - MutexLock l(&mutex_); - if (versions_->current()->RecordReadSample(key)) { - MaybeScheduleCompaction(); - } -} - -const Snapshot* DBImpl::GetSnapshot() { - MutexLock l(&mutex_); - return snapshots_.New(versions_->LastSequence()); -} - -void DBImpl::ReleaseSnapshot(const Snapshot* s) { - MutexLock l(&mutex_); - snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s)); -} - -// Convenience methods -Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { - return DB::Put(o, key, val); -} - -Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { - return DB::Delete(options, key); -} - -Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { - Writer w(&mutex_); - w.batch = my_batch; - w.sync = options.sync; - w.done = false; - - MutexLock l(&mutex_); - writers_.push_back(&w); - while (!w.done && &w != writers_.front()) { - w.cv.Wait(); - } - if (w.done) { - return w.status; - } - - // May temporarily unlock and wait. - Status status = MakeRoomForWrite(my_batch == NULL); - uint64_t last_sequence = versions_->LastSequence(); - Writer* last_writer = &w; - if (status.ok() && my_batch != NULL) { // NULL batch is for compactions - WriteBatch* updates = BuildBatchGroup(&last_writer); - WriteBatchInternal::SetSequence(updates, last_sequence + 1); - last_sequence += WriteBatchInternal::Count(updates); - - // Add to log and apply to memtable. We can release the lock - // during this phase since &w is currently responsible for logging - // and protects against concurrent loggers and concurrent writes - // into mem_. - { - mutex_.Unlock(); - status = log_->AddRecord(WriteBatchInternal::Contents(updates)); - bool sync_error = false; - if (status.ok() && options.sync) { - status = logfile_->Sync(); - if (!status.ok()) { - sync_error = true; - } - } - if (status.ok()) { - status = WriteBatchInternal::InsertInto(updates, mem_); - } - mutex_.Lock(); - if (sync_error) { - // The state of the log file is indeterminate: the log record we - // just added may or may not show up when the DB is re-opened. - // So we force the DB into a mode where all future writes fail. - RecordBackgroundError(status); - } - } - if (updates == tmp_batch_) tmp_batch_->Clear(); - - versions_->SetLastSequence(last_sequence); - } - - while (true) { - Writer* ready = writers_.front(); - writers_.pop_front(); - if (ready != &w) { - ready->status = status; - ready->done = true; - ready->cv.Signal(); - } - if (ready == last_writer) break; - } - - // Notify new head of write queue - if (!writers_.empty()) { - writers_.front()->cv.Signal(); - } - - return status; -} - -// REQUIRES: Writer list must be non-empty -// REQUIRES: First writer must have a non-NULL batch -WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { - assert(!writers_.empty()); - Writer* first = writers_.front(); - WriteBatch* result = first->batch; - assert(result != NULL); - - size_t size = WriteBatchInternal::ByteSize(first->batch); - - // Allow the group to grow up to a maximum size, but if the - // original write is small, limit the growth so we do not slow - // down the small write too much. - size_t max_size = 1 << 20; - if (size <= (128<<10)) { - max_size = size + (128<<10); - } - - *last_writer = first; - std::deque<Writer*>::iterator iter = writers_.begin(); - ++iter; // Advance past "first" - for (; iter != writers_.end(); ++iter) { - Writer* w = *iter; - if (w->sync && !first->sync) { - // Do not include a sync write into a batch handled by a non-sync write. - break; - } - - if (w->batch != NULL) { - size += WriteBatchInternal::ByteSize(w->batch); - if (size > max_size) { - // Do not make batch too big - break; - } - - // Append to *result - if (result == first->batch) { - // Switch to temporary batch instead of disturbing caller's batch - result = tmp_batch_; - assert(WriteBatchInternal::Count(result) == 0); - WriteBatchInternal::Append(result, first->batch); - } - WriteBatchInternal::Append(result, w->batch); - } - *last_writer = w; - } - return result; -} - -// REQUIRES: mutex_ is held -// REQUIRES: this thread is currently at the front of the writer queue -Status DBImpl::MakeRoomForWrite(bool force) { - mutex_.AssertHeld(); - assert(!writers_.empty()); - bool allow_delay = !force; - Status s; - while (true) { - if (!bg_error_.ok()) { - // Yield previous error - s = bg_error_; - break; - } else if ( - allow_delay && - versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) { - // We are getting close to hitting a hard limit on the number of - // L0 files. Rather than delaying a single write by several - // seconds when we hit the hard limit, start delaying each - // individual write by 1ms to reduce latency variance. Also, - // this delay hands over some CPU to the compaction thread in - // case it is sharing the same core as the writer. - mutex_.Unlock(); - env_->SleepForMicroseconds(1000); - allow_delay = false; // Do not delay a single write more than once - mutex_.Lock(); - } else if (!force && - (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { - // There is room in current memtable - break; - } else if (imm_ != NULL) { - // We have filled up the current memtable, but the previous - // one is still being compacted, so we wait. - Log(options_.info_log, "Current memtable full; waiting...\n"); - bg_cv_.Wait(); - } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { - // There are too many level-0 files. - Log(options_.info_log, "Too many L0 files; waiting...\n"); - bg_cv_.Wait(); - } else { - // Attempt to switch to a new memtable and trigger compaction of old - assert(versions_->PrevLogNumber() == 0); - uint64_t new_log_number = versions_->NewFileNumber(); - WritableFile* lfile = NULL; - s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); - if (!s.ok()) { - // Avoid chewing through file number space in a tight loop. - versions_->ReuseFileNumber(new_log_number); - break; - } - delete log_; - delete logfile_; - logfile_ = lfile; - logfile_number_ = new_log_number; - log_ = new log::Writer(lfile); - imm_ = mem_; - has_imm_.Release_Store(imm_); - mem_ = new MemTable(internal_comparator_); - mem_->Ref(); - force = false; // Do not force another compaction if have room - MaybeScheduleCompaction(); - } - } - return s; -} - -bool DBImpl::GetProperty(const Slice& property, std::string* value) { - value->clear(); - - MutexLock l(&mutex_); - Slice in = property; - Slice prefix("leveldb."); - if (!in.starts_with(prefix)) return false; - in.remove_prefix(prefix.size()); - - if (in.starts_with("num-files-at-level")) { - in.remove_prefix(strlen("num-files-at-level")); - uint64_t level; - bool ok = ConsumeDecimalNumber(&in, &level) && in.empty(); - if (!ok || level >= config::kNumLevels) { - return false; - } else { - char buf[100]; -#ifdef _WIN32 - snprintf(buf, sizeof(buf), "%Iu", -#else - snprintf(buf, sizeof(buf), "%zu", -#endif - versions_->NumLevelFiles(static_cast<int>(level))); - *value = buf; - return true; - } - } else if (in == "stats") { - char buf[200]; - snprintf(buf, sizeof(buf), - " Compactions\n" - "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n" - "--------------------------------------------------\n" - ); - value->append(buf); - for (int level = 0; level < config::kNumLevels; level++) { - size_t files = versions_->NumLevelFiles(level); - if (stats_[level].micros > 0 || files > 0) { - snprintf( - buf, sizeof(buf), -#ifdef _WIN32 - "%3d %8Iu %8.0f %9.0f %8.0f %9.0f\n", -#else - "%3d %8zu %8.0f %9.0f %8.0f %9.0f\n", -#endif - level, - files, - versions_->NumLevelBytes(level) / 1048576.0, - stats_[level].micros / 1e6, - stats_[level].bytes_read / 1048576.0, - stats_[level].bytes_written / 1048576.0); - value->append(buf); - } - } - return true; - } else if (in == "sstables") { - *value = versions_->current()->DebugString(); - return true; - } - - return false; -} - -void DBImpl::GetApproximateSizes( - const Range* range, int n, - uint64_t* sizes) { - // TODO(opt): better implementation - Version* v; - { - MutexLock l(&mutex_); - versions_->current()->Ref(); - v = versions_->current(); - } - - for (int i = 0; i < n; i++) { - // Convert user_key into a corresponding internal key. - InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek); - InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek); - uint64_t start = versions_->ApproximateOffsetOf(v, k1); - uint64_t limit = versions_->ApproximateOffsetOf(v, k2); - sizes[i] = (limit >= start ? limit - start : 0); - } - - { - MutexLock l(&mutex_); - v->Unref(); - } -} - -// Default implementations of convenience methods that subclasses of DB -// can call if they wish -Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { - WriteBatch batch; - batch.Put(key, value); - return Write(opt, &batch); -} - -Status DB::Delete(const WriteOptions& opt, const Slice& key) { - WriteBatch batch; - batch.Delete(key); - return Write(opt, &batch); -} - -DB::~DB() { } - -Status DB::Open(const Options& options, const std::string& dbname, - DB** dbptr) { - *dbptr = NULL; - - DBImpl* impl = new DBImpl(options, dbname); - impl->mutex_.Lock(); - VersionEdit edit; - Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists - if (s.ok()) { - uint64_t new_log_number = impl->versions_->NewFileNumber(); - WritableFile* lfile; - s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), - &lfile); - if (s.ok()) { - edit.SetLogNumber(new_log_number); - impl->logfile_ = lfile; - impl->logfile_number_ = new_log_number; - impl->log_ = new log::Writer(lfile); - s = impl->versions_->LogAndApply(&edit, &impl->mutex_); - } - if (s.ok()) { - impl->DeleteObsoleteFiles(); - impl->MaybeScheduleCompaction(); - } - } - impl->mutex_.Unlock(); - if (s.ok()) { - *dbptr = impl; - } else { - delete impl; - } - return s; -} - -Snapshot::~Snapshot() { -} - -Status DestroyDB(const std::string& dbname, const Options& options) { - Env* env = options.env; - std::vector<std::string> filenames; - // Ignore error in case directory does not exist - env->GetChildren(dbname, &filenames); - if (filenames.empty()) { - return Status::OK(); - } - - FileLock* lock; - const std::string lockname = LockFileName(dbname); - Status result = env->LockFile(lockname, &lock); - if (result.ok()) { - uint64_t number; - FileType type; - for (size_t i = 0; i < filenames.size(); i++) { - if (ParseFileName(filenames[i], &number, &type) && - type != kDBLockFile) { // Lock file will be deleted at end - Status del = env->DeleteFile(dbname + "/" + filenames[i]); - if (result.ok() && !del.ok()) { - result = del; - } - } - } - env->UnlockFile(lock); // Ignore error since state is already gone - env->DeleteFile(lockname); - env->DeleteDir(dbname); // Ignore error in case dir contains other files - } - return result; -} - -} // namespace leveldb
