http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/log_reader.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/log_reader.cc b/thirdparty/rocksdb/db/log_reader.cc new file mode 100644 index 0000000..cae5d8e --- /dev/null +++ b/thirdparty/rocksdb/db/log_reader.cc @@ -0,0 +1,432 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/log_reader.h" + +#include <stdio.h> +#include "rocksdb/env.h" +#include "util/coding.h" +#include "util/crc32c.h" +#include "util/file_reader_writer.h" + +namespace rocksdb { +namespace log { + +Reader::Reporter::~Reporter() { +} + +Reader::Reader(std::shared_ptr<Logger> info_log, + unique_ptr<SequentialFileReader>&& _file, Reporter* reporter, + bool checksum, uint64_t initial_offset, uint64_t log_num) + : info_log_(info_log), + file_(std::move(_file)), + reporter_(reporter), + checksum_(checksum), + backing_store_(new char[kBlockSize]), + buffer_(), + eof_(false), + read_error_(false), + eof_offset_(0), + last_record_offset_(0), + end_of_buffer_offset_(0), + initial_offset_(initial_offset), + log_number_(log_num), + recycled_(false) {} + +Reader::~Reader() { + delete[] backing_store_; +} + +bool Reader::SkipToInitialBlock() { + size_t initial_offset_in_block = initial_offset_ % kBlockSize; + uint64_t block_start_location = initial_offset_ - initial_offset_in_block; + + // Don't search a block if we'd be in the trailer + if (initial_offset_in_block > kBlockSize - 6) { + block_start_location += kBlockSize; + } + + end_of_buffer_offset_ = block_start_location; + + // Skip to start of first block that can contain the initial record + if (block_start_location > 0) { + Status skip_status = file_->Skip(block_start_location); + if (!skip_status.ok()) { + ReportDrop(static_cast<size_t>(block_start_location), skip_status); + return false; + } + } + + return true; +} + +// For kAbsoluteConsistency, on clean shutdown we don't expect any error +// in the log files. For other modes, we can ignore only incomplete records +// in the last log file, which are presumably due to a write in progress +// during restart (or from log recycling). 
+// +// TODO krad: Evaluate if we need to move to a more strict mode where we +// restrict the inconsistency to only the last log +bool Reader::ReadRecord(Slice* record, std::string* scratch, + WALRecoveryMode wal_recovery_mode) { + if (last_record_offset_ < initial_offset_) { + if (!SkipToInitialBlock()) { + return false; + } + } + + scratch->clear(); + record->clear(); + bool in_fragmented_record = false; + // Record offset of the logical record that we're reading + // 0 is a dummy value to make compilers happy + uint64_t prospective_record_offset = 0; + + Slice fragment; + while (true) { + uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); + size_t drop_size = 0; + const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size); + switch (record_type) { + case kFullType: + case kRecyclableFullType: + if (in_fragmented_record && !scratch->empty()) { + // Handle bug in earlier versions of log::Writer where + // it could emit an empty kFirstType record at the tail end + // of a block followed by a kFullType or kFirstType record + // at the beginning of the next block. + ReportCorruption(scratch->size(), "partial record without end(1)"); + } + prospective_record_offset = physical_record_offset; + scratch->clear(); + *record = fragment; + last_record_offset_ = prospective_record_offset; + return true; + + case kFirstType: + case kRecyclableFirstType: + if (in_fragmented_record && !scratch->empty()) { + // Handle bug in earlier versions of log::Writer where + // it could emit an empty kFirstType record at the tail end + // of a block followed by a kFullType or kFirstType record + // at the beginning of the next block. + ReportCorruption(scratch->size(), "partial record without end(2)"); + } + prospective_record_offset = physical_record_offset; + scratch->assign(fragment.data(), fragment.size()); + in_fragmented_record = true; + break; + + case kMiddleType: + case kRecyclableMiddleType: + if (!in_fragmented_record) { + ReportCorruption(fragment.size(), + "missing start of fragmented record(1)"); + } else { + scratch->append(fragment.data(), fragment.size()); + } + break; + + case kLastType: + case kRecyclableLastType: + if (!in_fragmented_record) { + ReportCorruption(fragment.size(), + "missing start of fragmented record(2)"); + } else { + scratch->append(fragment.data(), fragment.size()); + *record = Slice(*scratch); + last_record_offset_ = prospective_record_offset; + return true; + } + break; + + case kBadHeader: + if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) { + // in clean shutdown we don't expect any error in the log files + ReportCorruption(drop_size, "truncated header"); + } + // fall-thru + + case kEof: + if (in_fragmented_record) { + if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) { + // in clean shutdown we don't expect any error in the log files + ReportCorruption(scratch->size(), "error reading trailing data"); + } + // This can be caused by the writer dying immediately after + // writing a physical record but before completing the next; don't + // treat it as a corruption, just ignore the entire logical record. + scratch->clear(); + } + return false; + + case kOldRecord: + if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) { + // Treat a record from a previous instance of the log as EOF. 
+ if (in_fragmented_record) { + if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) { + // in clean shutdown we don't expect any error in the log files + ReportCorruption(scratch->size(), "error reading trailing data"); + } + // This can be caused by the writer dying immediately after + // writing a physical record but before completing the next; don't + // treat it as a corruption, just ignore the entire logical record. + scratch->clear(); + } + return false; + } + // fall-thru + + case kBadRecord: + if (in_fragmented_record) { + ReportCorruption(scratch->size(), "error in middle of record"); + in_fragmented_record = false; + scratch->clear(); + } + break; + + case kBadRecordLen: + case kBadRecordChecksum: + if (recycled_ && + wal_recovery_mode == + WALRecoveryMode::kTolerateCorruptedTailRecords) { + scratch->clear(); + return false; + } + if (record_type == kBadRecordLen) { + ReportCorruption(drop_size, "bad record length"); + } else { + ReportCorruption(drop_size, "checksum mismatch"); + } + if (in_fragmented_record) { + ReportCorruption(scratch->size(), "error in middle of record"); + in_fragmented_record = false; + scratch->clear(); + } + break; + + default: { + char buf[40]; + snprintf(buf, sizeof(buf), "unknown record type %u", record_type); + ReportCorruption( + (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), + buf); + in_fragmented_record = false; + scratch->clear(); + break; + } + } + } + return false; +} + +uint64_t Reader::LastRecordOffset() { + return last_record_offset_; +} + +void Reader::UnmarkEOF() { + if (read_error_) { + return; + } + + eof_ = false; + + if (eof_offset_ == 0) { + return; + } + + // If the EOF was in the middle of a block (a partial block was read) we have + // to read the rest of the block as ReadPhysicalRecord can only read full + // blocks and expects the file position indicator to be aligned to the start + // of a block. + // + // consumed_bytes + buffer_size() + remaining == kBlockSize + + size_t consumed_bytes = eof_offset_ - buffer_.size(); + size_t remaining = kBlockSize - eof_offset_; + + // backing_store_ is used to concatenate what is left in buffer_ and + // the remainder of the block. If buffer_ already uses backing_store_, + // we just append the new data. + if (buffer_.data() != backing_store_ + consumed_bytes) { + // Buffer_ does not use backing_store_ for storage. + // Copy what is left in buffer_ to backing_store. 
+ memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size()); + } + + Slice read_buffer; + Status status = file_->Read(remaining, &read_buffer, + backing_store_ + eof_offset_); + + size_t added = read_buffer.size(); + end_of_buffer_offset_ += added; + + if (!status.ok()) { + if (added > 0) { + ReportDrop(added, status); + } + + read_error_ = true; + return; + } + + if (read_buffer.data() != backing_store_ + eof_offset_) { + // Read did not write to backing_store_ + memmove(backing_store_ + eof_offset_, read_buffer.data(), + read_buffer.size()); + } + + buffer_ = Slice(backing_store_ + consumed_bytes, + eof_offset_ + added - consumed_bytes); + + if (added < remaining) { + eof_ = true; + eof_offset_ += added; + } else { + eof_offset_ = 0; + } +} + +void Reader::ReportCorruption(size_t bytes, const char* reason) { + ReportDrop(bytes, Status::Corruption(reason)); +} + +void Reader::ReportDrop(size_t bytes, const Status& reason) { + if (reporter_ != nullptr && + end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) { + reporter_->Corruption(bytes, reason); + } +} + +bool Reader::ReadMore(size_t* drop_size, int *error) { + if (!eof_ && !read_error_) { + // Last read was a full read, so this is a trailer to skip + buffer_.clear(); + Status status = file_->Read(kBlockSize, &buffer_, backing_store_); + end_of_buffer_offset_ += buffer_.size(); + if (!status.ok()) { + buffer_.clear(); + ReportDrop(kBlockSize, status); + read_error_ = true; + *error = kEof; + return false; + } else if (buffer_.size() < (size_t)kBlockSize) { + eof_ = true; + eof_offset_ = buffer_.size(); + } + return true; + } else { + // Note that if buffer_ is non-empty, we have a truncated header at the + // end of the file, which can be caused by the writer crashing in the + // middle of writing the header. Unless explicitly requested we don't + // considering this an error, just report EOF. + if (buffer_.size()) { + *drop_size = buffer_.size(); + buffer_.clear(); + *error = kBadHeader; + return false; + } + buffer_.clear(); + *error = kEof; + return false; + } +} + +unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) { + while (true) { + // We need at least the minimum header size + if (buffer_.size() < (size_t)kHeaderSize) { + int r; + if (!ReadMore(drop_size, &r)) { + return r; + } + continue; + } + + // Parse the header + const char* header = buffer_.data(); + const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff; + const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff; + const unsigned int type = header[6]; + const uint32_t length = a | (b << 8); + int header_size = kHeaderSize; + if (type >= kRecyclableFullType && type <= kRecyclableLastType) { + if (end_of_buffer_offset_ - buffer_.size() == 0) { + recycled_ = true; + } + header_size = kRecyclableHeaderSize; + // We need enough for the larger header + if (buffer_.size() < (size_t)kRecyclableHeaderSize) { + int r; + if (!ReadMore(drop_size, &r)) { + return r; + } + continue; + } + const uint32_t log_num = DecodeFixed32(header + 7); + if (log_num != log_number_) { + return kOldRecord; + } + } + if (header_size + length > buffer_.size()) { + *drop_size = buffer_.size(); + buffer_.clear(); + if (!eof_) { + return kBadRecordLen; + } + // If the end of the file has been reached without reading |length| bytes + // of payload, assume the writer died in the middle of writing the record. + // Don't report a corruption unless requested. 
+ if (*drop_size) { + return kBadHeader; + } + return kEof; + } + + if (type == kZeroType && length == 0) { + // Skip zero length record without reporting any drops since + // such records are produced by the mmap based writing code in + // env_posix.cc that preallocates file regions. + // NOTE: this should never happen in DB written by new RocksDB versions, + // since we turn off mmap writes to manifest and log files + buffer_.clear(); + return kBadRecord; + } + + // Check crc + if (checksum_) { + uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); + uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6); + if (actual_crc != expected_crc) { + // Drop the rest of the buffer since "length" itself may have + // been corrupted and if we trust it, we could find some + // fragment of a real log record that just happens to look + // like a valid log record. + *drop_size = buffer_.size(); + buffer_.clear(); + return kBadRecordChecksum; + } + } + + buffer_.remove_prefix(header_size + length); + + // Skip physical record that started before initial_offset_ + if (end_of_buffer_offset_ - buffer_.size() - header_size - length < + initial_offset_) { + result->clear(); + return kBadRecord; + } + + *result = Slice(header + header_size, length); + return type; + } +} + +} // namespace log +} // namespace rocksdb
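For orientation, the following is an editorial sketch (not part of the patch) of the physical record header that ReadPhysicalRecord() above decodes. The struct and function names are invented for illustration; the sizes kHeaderSize = 7 and kRecyclableHeaderSize = 11 come from db/log_format.h, which is not included in this excerpt. The four CRC bytes hold the masked value that the reader passes through crc32c::Unmask() before comparison.

#include <cstdint>

struct DecodedHeader {
  uint32_t masked_crc;  // bytes 0..3, little-endian, stored masked
  uint32_t length;      // bytes 4..5, little-endian payload length
  uint8_t type;         // byte 6, record type
  uint32_t log_number;  // bytes 7..10, recyclable records only
};

// buf must hold at least 11 bytes for recyclable records, 7 otherwise.
static DecodedHeader DecodeHeader(const unsigned char* buf, bool recyclable) {
  DecodedHeader h{};
  h.masked_crc = buf[0] | (buf[1] << 8) | (buf[2] << 16) |
                 (static_cast<uint32_t>(buf[3]) << 24);
  h.length = buf[4] | (buf[5] << 8);
  h.type = buf[6];
  if (recyclable) {
    h.log_number = buf[7] | (buf[8] << 8) | (buf[9] << 16) |
                   (static_cast<uint32_t>(buf[10]) << 24);
  }
  return h;
}

The reader then checks crc32c::Value(header + 6, length + header_size - 6) against the unmasked CRC, i.e. the checksum covers the type byte, the optional log number, and the payload.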
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/log_reader.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/log_reader.h b/thirdparty/rocksdb/db/log_reader.h new file mode 100644 index 0000000..c6a471c --- /dev/null +++ b/thirdparty/rocksdb/db/log_reader.h @@ -0,0 +1,160 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include <memory> +#include <stdint.h> + +#include "db/log_format.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "rocksdb/options.h" + +namespace rocksdb { + +class SequentialFileReader; +class Logger; +using std::unique_ptr; + +namespace log { + +/** + * Reader is a general purpose log stream reader implementation. The actual job + * of reading from the device is implemented by the SequentialFile interface. + * + * Please see Writer for details on the file and record layout. + */ +class Reader { + public: + // Interface for reporting errors. + class Reporter { + public: + virtual ~Reporter(); + + // Some corruption was detected. "size" is the approximate number + // of bytes dropped due to the corruption. + virtual void Corruption(size_t bytes, const Status& status) = 0; + }; + + // Create a reader that will return log records from "*file". + // "*file" must remain live while this Reader is in use. + // + // If "reporter" is non-nullptr, it is notified whenever some data is + // dropped due to a detected corruption. "*reporter" must remain + // live while this Reader is in use. + // + // If "checksum" is true, verify checksums if available. + // + // The Reader will start reading at the first record located at physical + // position >= initial_offset within the file. + Reader(std::shared_ptr<Logger> info_log, + unique_ptr<SequentialFileReader>&& file, + Reporter* reporter, bool checksum, uint64_t initial_offset, + uint64_t log_num); + + ~Reader(); + + // Read the next record into *record. Returns true if read + // successfully, false if we hit end of the input. May use + // "*scratch" as temporary storage. The contents filled in *record + // will only be valid until the next mutating operation on this + // reader or the next mutation to *scratch. + bool ReadRecord(Slice* record, std::string* scratch, + WALRecoveryMode wal_recovery_mode = + WALRecoveryMode::kTolerateCorruptedTailRecords); + + // Returns the physical offset of the last record returned by ReadRecord. + // + // Undefined before the first call to ReadRecord. + uint64_t LastRecordOffset(); + + // returns true if the reader has encountered an eof condition. + bool IsEOF() { + return eof_; + } + + // when we know more data has been written to the file. we can use this + // function to force the reader to look again in the file. + // Also aligns the file position indicator to the start of the next block + // by reading the rest of the data from the EOF position to the end of the + // block that was partially read. 
+ void UnmarkEOF(); + + SequentialFileReader* file() { return file_.get(); } + + private: + std::shared_ptr<Logger> info_log_; + const unique_ptr<SequentialFileReader> file_; + Reporter* const reporter_; + bool const checksum_; + char* const backing_store_; + Slice buffer_; + bool eof_; // Last Read() indicated EOF by returning < kBlockSize + bool read_error_; // Error occurred while reading from file + + // Offset of the file position indicator within the last block when an + // EOF was detected. + size_t eof_offset_; + + // Offset of the last record returned by ReadRecord. + uint64_t last_record_offset_; + // Offset of the first location past the end of buffer_. + uint64_t end_of_buffer_offset_; + + // Offset at which to start looking for the first record to return + uint64_t const initial_offset_; + + // which log number this is + uint64_t const log_number_; + + // Whether this is a recycled log file + bool recycled_; + + // Extend record types with the following special values + enum { + kEof = kMaxRecordType + 1, + // Returned whenever we find an invalid physical record. + // Currently there are three situations in which this happens: + // * The record has an invalid CRC (ReadPhysicalRecord reports a drop) + // * The record is a 0-length record (No drop is reported) + // * The record is below constructor's initial_offset (No drop is reported) + kBadRecord = kMaxRecordType + 2, + // Returned when we fail to read a valid header. + kBadHeader = kMaxRecordType + 3, + // Returned when we read an old record from a previous user of the log. + kOldRecord = kMaxRecordType + 4, + // Returned when we get a bad record length + kBadRecordLen = kMaxRecordType + 5, + // Returned when we get a bad record checksum + kBadRecordChecksum = kMaxRecordType + 6, + }; + + // Skips all blocks that are completely before "initial_offset_". + // + // Returns true on success. Handles reporting. + bool SkipToInitialBlock(); + + // Return type, or one of the preceding special values + unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size); + + // Read some more + bool ReadMore(size_t* drop_size, int *error); + + // Reports dropped bytes to the reporter. + // buffer_ must be updated to remove the dropped bytes prior to invocation. + void ReportCorruption(size_t bytes, const char* reason); + void ReportDrop(size_t bytes, const Status& reason); + + // No copying allowed + Reader(const Reader&); + void operator=(const Reader&); +}; + +} // namespace log +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/log_writer.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/log_writer.cc b/thirdparty/rocksdb/db/log_writer.cc new file mode 100644 index 0000000..b02eec8 --- /dev/null +++ b/thirdparty/rocksdb/db/log_writer.cc @@ -0,0 +1,142 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#include "db/log_writer.h" + +#include <stdint.h> +#include "rocksdb/env.h" +#include "util/coding.h" +#include "util/crc32c.h" +#include "util/file_reader_writer.h" + +namespace rocksdb { +namespace log { + +Writer::Writer(unique_ptr<WritableFileWriter>&& dest, uint64_t log_number, + bool recycle_log_files, bool manual_flush) + : dest_(std::move(dest)), + block_offset_(0), + log_number_(log_number), + recycle_log_files_(recycle_log_files), + manual_flush_(manual_flush) { + for (int i = 0; i <= kMaxRecordType; i++) { + char t = static_cast<char>(i); + type_crc_[i] = crc32c::Value(&t, 1); + } +} + +Writer::~Writer() { WriteBuffer(); } + +Status Writer::WriteBuffer() { return dest_->Flush(); } + +Status Writer::AddRecord(const Slice& slice) { + const char* ptr = slice.data(); + size_t left = slice.size(); + + // Header size varies depending on whether we are recycling or not. + const int header_size = + recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize; + + // Fragment the record if necessary and emit it. Note that if slice + // is empty, we still want to iterate once to emit a single + // zero-length record + Status s; + bool begin = true; + do { + const int64_t leftover = kBlockSize - block_offset_; + assert(leftover >= 0); + if (leftover < header_size) { + // Switch to a new block + if (leftover > 0) { + // Fill the trailer (literal below relies on kHeaderSize and + // kRecyclableHeaderSize being <= 11) + assert(header_size <= 11); + dest_->Append( + Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", leftover)); + } + block_offset_ = 0; + } + + // Invariant: we never leave < header_size bytes in a block. + assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size); + + const size_t avail = kBlockSize - block_offset_ - header_size; + const size_t fragment_length = (left < avail) ? left : avail; + + RecordType type; + const bool end = (left == fragment_length); + if (begin && end) { + type = recycle_log_files_ ? kRecyclableFullType : kFullType; + } else if (begin) { + type = recycle_log_files_ ? kRecyclableFirstType : kFirstType; + } else if (end) { + type = recycle_log_files_ ? kRecyclableLastType : kLastType; + } else { + type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType; + } + + s = EmitPhysicalRecord(type, ptr, fragment_length); + ptr += fragment_length; + left -= fragment_length; + begin = false; + } while (s.ok() && left > 0); + return s; +} + +Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { + assert(n <= 0xffff); // Must fit in two bytes + + size_t header_size; + char buf[kRecyclableHeaderSize]; + + // Format the header + buf[4] = static_cast<char>(n & 0xff); + buf[5] = static_cast<char>(n >> 8); + buf[6] = static_cast<char>(t); + + uint32_t crc = type_crc_[t]; + if (t < kRecyclableFullType) { + // Legacy record format + assert(block_offset_ + kHeaderSize + n <= kBlockSize); + header_size = kHeaderSize; + } else { + // Recyclable record format + assert(block_offset_ + kRecyclableHeaderSize + n <= kBlockSize); + header_size = kRecyclableHeaderSize; + + // Only encode low 32-bits of the 64-bit log number. This means + // we will fail to detect an old record if we recycled a log from + // ~4 billion logs ago, but that is effectively impossible, and + // even if it were we'dbe far more likely to see a false positive + // on the 32-bit CRC. + EncodeFixed32(buf + 7, static_cast<uint32_t>(log_number_)); + crc = crc32c::Extend(crc, buf + 7, 4); + } + + // Compute the crc of the record type and the payload. 
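+  // At this point crc already covers the type byte (via type_crc_[t]) and,
+  // for recyclable records, the encoded log number. It is now extended over
+  // the payload and masked before being stored in the first four bytes of
+  // the header, so readers must crc32c::Unmask() it before comparing.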
+ crc = crc32c::Extend(crc, ptr, n); + crc = crc32c::Mask(crc); // Adjust for storage + EncodeFixed32(buf, crc); + + // Write the header and the payload + Status s = dest_->Append(Slice(buf, header_size)); + if (s.ok()) { + s = dest_->Append(Slice(ptr, n)); + if (s.ok()) { + if (!manual_flush_) { + s = dest_->Flush(); + } + } + } + block_offset_ += header_size + n; + return s; +} + +} // namespace log +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/log_writer.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/log_writer.h b/thirdparty/rocksdb/db/log_writer.h new file mode 100644 index 0000000..a3a8799 --- /dev/null +++ b/thirdparty/rocksdb/db/log_writer.h @@ -0,0 +1,111 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include <stdint.h> + +#include <memory> + +#include "db/log_format.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +class WritableFileWriter; + +using std::unique_ptr; + +namespace log { + +/** + * Writer is a general purpose log stream writer. It provides an append-only + * abstraction for writing data. The details of the how the data is written is + * handled by the WriteableFile sub-class implementation. + * + * File format: + * + * File is broken down into variable sized records. The format of each record + * is described below. + * +-----+-------------+--+----+----------+------+-- ... ----+ + * File | r0 | r1 |P | r2 | r3 | r4 | | + * +-----+-------------+--+----+----------+------+-- ... ----+ + * <--- kBlockSize ------>|<-- kBlockSize ------>| + * rn = variable size records + * P = Padding + * + * Data is written out in kBlockSize chunks. If next record does not fit + * into the space left, the leftover space will be padded with \0. + * + * Legacy record format: + * + * +---------+-----------+-----------+--- ... ---+ + * |CRC (4B) | Size (2B) | Type (1B) | Payload | + * +---------+-----------+-----------+--- ... ---+ + * + * CRC = 32bit hash computed over the payload using CRC + * Size = Length of the payload data + * Type = Type of record + * (kZeroType, kFullType, kFirstType, kLastType, kMiddleType ) + * The type is used to group a bunch of records together to represent + * blocks that are larger than kBlockSize + * Payload = Byte stream as long as specified by the payload size + * + * Recyclable record format: + * + * +---------+-----------+-----------+----------------+--- ... ---+ + * |CRC (4B) | Size (2B) | Type (1B) | Log number (4B)| Payload | + * +---------+-----------+-----------+----------------+--- ... ---+ + * + * Same as above, with the addition of + * Log number = 32bit log file number, so that we can distinguish between + * records written by the most recent log writer vs a previous one. + */ +class Writer { + public: + // Create a writer that will append data to "*dest". + // "*dest" must be initially empty. + // "*dest" must remain live while this Writer is in use. 
+ explicit Writer(unique_ptr<WritableFileWriter>&& dest, uint64_t log_number, + bool recycle_log_files, bool manual_flush = false); + ~Writer(); + + Status AddRecord(const Slice& slice); + + WritableFileWriter* file() { return dest_.get(); } + const WritableFileWriter* file() const { return dest_.get(); } + + uint64_t get_log_number() const { return log_number_; } + + Status WriteBuffer(); + + private: + unique_ptr<WritableFileWriter> dest_; + size_t block_offset_; // Current offset in block + uint64_t log_number_; + bool recycle_log_files_; + + // crc32c values for all supported record types. These are + // pre-computed to reduce the overhead of computing the crc of the + // record type stored in the header. + uint32_t type_crc_[kMaxRecordType + 1]; + + Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length); + + // If true, it does not flush after each write. Instead it relies on the upper + // layer to manually does the flush by calling ::WriteBuffer() + bool manual_flush_; + + // No copying allowed + Writer(const Writer&); + void operator=(const Writer&); +}; + +} // namespace log +} // namespace rocksdb http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/malloc_stats.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/malloc_stats.cc b/thirdparty/rocksdb/db/malloc_stats.cc new file mode 100644 index 0000000..7acca65 --- /dev/null +++ b/thirdparty/rocksdb/db/malloc_stats.cc @@ -0,0 +1,52 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/malloc_stats.h" + +#ifndef ROCKSDB_LITE +#include <memory> +#include <string.h> + +namespace rocksdb { + +#ifdef ROCKSDB_JEMALLOC +#include "jemalloc/jemalloc.h" + +typedef struct { + char* cur; + char* end; +} MallocStatus; + +static void GetJemallocStatus(void* mstat_arg, const char* status) { + MallocStatus* mstat = reinterpret_cast<MallocStatus*>(mstat_arg); + size_t status_len = status ? strlen(status) : 0; + size_t buf_size = (size_t)(mstat->end - mstat->cur); + if (!status_len || status_len > buf_size) { + return; + } + + snprintf(mstat->cur, buf_size, "%s", status); + mstat->cur += status_len; +} +#endif // ROCKSDB_JEMALLOC + +void DumpMallocStats(std::string* stats) { +#ifdef ROCKSDB_JEMALLOC + MallocStatus mstat; + const unsigned int kMallocStatusLen = 1000000; + std::unique_ptr<char[]> buf{new char[kMallocStatusLen + 1]}; + mstat.cur = buf.get(); + mstat.end = buf.get() + kMallocStatusLen; + je_malloc_stats_print(GetJemallocStatus, &mstat, ""); + stats->append(buf.get()); +#endif // ROCKSDB_JEMALLOC +} + +} +#endif // !ROCKSDB_LITE http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/malloc_stats.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/malloc_stats.h b/thirdparty/rocksdb/db/malloc_stats.h new file mode 100644 index 0000000..a2f324f --- /dev/null +++ b/thirdparty/rocksdb/db/malloc_stats.h @@ -0,0 +1,22 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#ifndef ROCKSDB_LITE + +#include <string> + +namespace rocksdb { + +void DumpMallocStats(std::string*); + +} + +#endif // !ROCKSDB_LITE http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/managed_iterator.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/managed_iterator.cc b/thirdparty/rocksdb/db/managed_iterator.cc new file mode 100644 index 0000000..c393eb5 --- /dev/null +++ b/thirdparty/rocksdb/db/managed_iterator.cc @@ -0,0 +1,262 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "db/managed_iterator.h" + +#include <limits> +#include <string> +#include <utility> + +#include "db/column_family.h" +#include "db/db_impl.h" +#include "db/db_iter.h" +#include "db/dbformat.h" +#include "rocksdb/env.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" +#include "table/merging_iterator.h" + +namespace rocksdb { + +namespace { +// Helper class that locks a mutex on construction and unlocks the mutex when +// the destructor of the MutexLock object is invoked. +// +// Typical usage: +// +// void MyClass::MyMethod() { +// MILock l(&mu_); // mu_ is an instance variable +// ... some complex code, possibly with multiple return paths ... +// } + +class MILock { + public: + explicit MILock(std::mutex* mu, ManagedIterator* mi) : mu_(mu), mi_(mi) { + this->mu_->lock(); + } + ~MILock() { + this->mu_->unlock(); + } + ManagedIterator* GetManagedIterator() { return mi_; } + + private: + std::mutex* const mu_; + ManagedIterator* mi_; + // No copying allowed + MILock(const MILock&) = delete; + void operator=(const MILock&) = delete; +}; +} // anonymous namespace + +// +// Synchronization between modifiers, releasers, creators +// If iterator operation, wait till (!in_use), set in_use, do op, reset in_use +// if modifying mutable_iter, atomically exchange in_use: +// return if in_use set / otherwise set in use, +// atomically replace new iter with old , reset in use +// The releaser is the new operation and it holds a lock for a very short time +// The existing non-const iterator operations are supposed to be single +// threaded and hold the lock for the duration of the operation +// The existing const iterator operations use the cached key/values +// and don't do any locking. 
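+// How callers reach this class (editorial note, inferred from the constructor
+// below and from ReadOptions::managed, which is not shown in this excerpt):
+// a ManagedIterator is normally created by DBImpl::NewIterator() when the
+// caller sets ReadOptions::managed = true; the constructor clears that flag
+// on its private copy of the options so the internal iterator it builds is
+// an ordinary one.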
+ManagedIterator::ManagedIterator(DBImpl* db, const ReadOptions& read_options, + ColumnFamilyData* cfd) + : db_(db), + read_options_(read_options), + cfd_(cfd), + svnum_(cfd->GetSuperVersionNumber()), + mutable_iter_(nullptr), + valid_(false), + snapshot_created_(false), + release_supported_(true) { + read_options_.managed = false; + if ((!read_options_.tailing) && (read_options_.snapshot == nullptr)) { + assert(nullptr != (read_options_.snapshot = db_->GetSnapshot())); + snapshot_created_ = true; + } + cfh_.SetCFD(cfd); + mutable_iter_ = unique_ptr<Iterator>(db->NewIterator(read_options_, &cfh_)); +} + +ManagedIterator::~ManagedIterator() { + Lock(); + if (snapshot_created_) { + db_->ReleaseSnapshot(read_options_.snapshot); + snapshot_created_ = false; + read_options_.snapshot = nullptr; + } + UnLock(); +} + +bool ManagedIterator::Valid() const { return valid_; } + +void ManagedIterator::SeekToLast() { + MILock l(&in_use_, this); + if (NeedToRebuild()) { + RebuildIterator(); + } + assert(mutable_iter_ != nullptr); + mutable_iter_->SeekToLast(); + if (mutable_iter_->status().ok()) { + UpdateCurrent(); + } +} + +void ManagedIterator::SeekToFirst() { + MILock l(&in_use_, this); + SeekInternal(Slice(), true); +} + +void ManagedIterator::Seek(const Slice& user_key) { + MILock l(&in_use_, this); + SeekInternal(user_key, false); +} + +void ManagedIterator::SeekForPrev(const Slice& user_key) { + MILock l(&in_use_, this); + if (NeedToRebuild()) { + RebuildIterator(); + } + assert(mutable_iter_ != nullptr); + mutable_iter_->SeekForPrev(user_key); + UpdateCurrent(); +} + +void ManagedIterator::SeekInternal(const Slice& user_key, bool seek_to_first) { + if (NeedToRebuild()) { + RebuildIterator(); + } + assert(mutable_iter_ != nullptr); + if (seek_to_first) { + mutable_iter_->SeekToFirst(); + } else { + mutable_iter_->Seek(user_key); + } + UpdateCurrent(); +} + +void ManagedIterator::Prev() { + if (!valid_) { + status_ = Status::InvalidArgument("Iterator value invalid"); + return; + } + MILock l(&in_use_, this); + if (NeedToRebuild()) { + std::string current_key = key().ToString(); + Slice old_key(current_key); + RebuildIterator(); + SeekInternal(old_key, false); + UpdateCurrent(); + if (!valid_) { + return; + } + if (key().compare(old_key) != 0) { + valid_ = false; + status_ = Status::Incomplete("Cannot do Prev now"); + return; + } + } + mutable_iter_->Prev(); + if (mutable_iter_->status().ok()) { + UpdateCurrent(); + status_ = Status::OK(); + } else { + status_ = mutable_iter_->status(); + } +} + +void ManagedIterator::Next() { + if (!valid_) { + status_ = Status::InvalidArgument("Iterator value invalid"); + return; + } + MILock l(&in_use_, this); + if (NeedToRebuild()) { + std::string current_key = key().ToString(); + Slice old_key(current_key.data(), cached_key_.Size()); + RebuildIterator(); + SeekInternal(old_key, false); + UpdateCurrent(); + if (!valid_) { + return; + } + if (key().compare(old_key) != 0) { + valid_ = false; + status_ = Status::Incomplete("Cannot do Next now"); + return; + } + } + mutable_iter_->Next(); + UpdateCurrent(); +} + +Slice ManagedIterator::key() const { + assert(valid_); + return cached_key_.GetUserKey(); +} + +Slice ManagedIterator::value() const { + assert(valid_); + return cached_value_.GetUserKey(); +} + +Status ManagedIterator::status() const { return status_; } + +void ManagedIterator::RebuildIterator() { + svnum_ = cfd_->GetSuperVersionNumber(); + mutable_iter_ = unique_ptr<Iterator>(db_->NewIterator(read_options_, &cfh_)); +} + +void 
ManagedIterator::UpdateCurrent() { + assert(mutable_iter_ != nullptr); + + valid_ = mutable_iter_->Valid(); + if (!valid_) { + status_ = mutable_iter_->status(); + return; + } + + status_ = Status::OK(); + cached_key_.SetUserKey(mutable_iter_->key()); + cached_value_.SetUserKey(mutable_iter_->value()); +} + +void ManagedIterator::ReleaseIter(bool only_old) { + if ((mutable_iter_ == nullptr) || (!release_supported_)) { + return; + } + if (svnum_ != cfd_->GetSuperVersionNumber() || !only_old) { + if (!TryLock()) { // Don't release iter if in use + return; + } + mutable_iter_ = nullptr; // in_use for a very short time + UnLock(); + } +} + +bool ManagedIterator::NeedToRebuild() { + if ((mutable_iter_ == nullptr) || (status_.IsIncomplete()) || + (!only_drop_old_ && (svnum_ != cfd_->GetSuperVersionNumber()))) { + return true; + } + return false; +} + +void ManagedIterator::Lock() { + in_use_.lock(); + return; +} + +bool ManagedIterator::TryLock() { return in_use_.try_lock(); } + +void ManagedIterator::UnLock() { + in_use_.unlock(); +} + +} // namespace rocksdb + +#endif // ROCKSDB_LITE http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/managed_iterator.h ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/managed_iterator.h b/thirdparty/rocksdb/db/managed_iterator.h new file mode 100644 index 0000000..8e962f7 --- /dev/null +++ b/thirdparty/rocksdb/db/managed_iterator.h @@ -0,0 +1,85 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#ifndef ROCKSDB_LITE + +#include <mutex> +#include <queue> +#include <string> +#include <vector> + +#include "db/column_family.h" +#include "rocksdb/db.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "util/arena.h" + +namespace rocksdb { + +class DBImpl; +struct SuperVersion; +class ColumnFamilyData; + +/** + * ManagedIterator is a special type of iterator that supports freeing the + * underlying iterator and still being able to access the current key/value + * pair. This is done by copying the key/value pair so that clients can + * continue to access the data without getting a SIGSEGV. + * The underlying iterator can be freed manually through the call to + * ReleaseIter or automatically (as needed on space pressure or age.) + * The iterator is recreated using the saved original arguments. 
+ */ +class ManagedIterator : public Iterator { + public: + ManagedIterator(DBImpl* db, const ReadOptions& read_options, + ColumnFamilyData* cfd); + virtual ~ManagedIterator(); + + virtual void SeekToLast() override; + virtual void Prev() override; + virtual bool Valid() const override; + void SeekToFirst() override; + virtual void Seek(const Slice& target) override; + virtual void SeekForPrev(const Slice& target) override; + virtual void Next() override; + virtual Slice key() const override; + virtual Slice value() const override; + virtual Status status() const override; + void ReleaseIter(bool only_old); + void SetDropOld(bool only_old) { + only_drop_old_ = read_options_.tailing || only_old; + } + + private: + void RebuildIterator(); + void UpdateCurrent(); + void SeekInternal(const Slice& user_key, bool seek_to_first); + bool NeedToRebuild(); + void Lock(); + bool TryLock(); + void UnLock(); + DBImpl* const db_; + ReadOptions read_options_; + ColumnFamilyData* const cfd_; + ColumnFamilyHandleInternal cfh_; + + uint64_t svnum_; + std::unique_ptr<Iterator> mutable_iter_; + // internal iterator status + Status status_; + bool valid_; + + IterKey cached_key_; + IterKey cached_value_; + + bool only_drop_old_ = true; + bool snapshot_created_; + bool release_supported_; + std::mutex in_use_; // is managed iterator in use +}; + +} // namespace rocksdb +#endif // !ROCKSDB_LITE http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/48867732/thirdparty/rocksdb/db/memtable.cc ---------------------------------------------------------------------- diff --git a/thirdparty/rocksdb/db/memtable.cc b/thirdparty/rocksdb/db/memtable.cc new file mode 100644 index 0000000..efea619 --- /dev/null +++ b/thirdparty/rocksdb/db/memtable.cc @@ -0,0 +1,885 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#include "db/memtable.h" + +#include <memory> +#include <algorithm> + +#include "db/dbformat.h" +#include "db/merge_context.h" +#include "db/merge_helper.h" +#include "db/pinned_iterators_manager.h" +#include "monitoring/perf_context_imp.h" +#include "monitoring/statistics.h" +#include "port/port.h" +#include "rocksdb/comparator.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice_transform.h" +#include "rocksdb/write_buffer_manager.h" +#include "table/internal_iterator.h" +#include "table/iterator_wrapper.h" +#include "table/merging_iterator.h" +#include "util/arena.h" +#include "util/autovector.h" +#include "util/coding.h" +#include "util/memory_usage.h" +#include "util/murmurhash.h" +#include "util/mutexlock.h" + +namespace rocksdb { + +MemTableOptions::MemTableOptions(const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options) + : write_buffer_size(mutable_cf_options.write_buffer_size), + arena_block_size(mutable_cf_options.arena_block_size), + memtable_prefix_bloom_bits( + static_cast<uint32_t>( + static_cast<double>(mutable_cf_options.write_buffer_size) * + mutable_cf_options.memtable_prefix_bloom_size_ratio) * + 8u), + memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size), + inplace_update_support(ioptions.inplace_update_support), + inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks), + inplace_callback(ioptions.inplace_callback), + max_successive_merges(mutable_cf_options.max_successive_merges), + statistics(ioptions.statistics), + merge_operator(ioptions.merge_operator), + info_log(ioptions.info_log) {} + +MemTable::MemTable(const InternalKeyComparator& cmp, + const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options, + WriteBufferManager* write_buffer_manager, + SequenceNumber latest_seq, uint32_t column_family_id) + : comparator_(cmp), + moptions_(ioptions, mutable_cf_options), + refs_(0), + kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)), + mem_tracker_(write_buffer_manager), + arena_( + moptions_.arena_block_size, + (write_buffer_manager != nullptr && write_buffer_manager->enabled()) + ? &mem_tracker_ + : nullptr, + mutable_cf_options.memtable_huge_page_size), + table_(ioptions.memtable_factory->CreateMemTableRep( + comparator_, &arena_, ioptions.prefix_extractor, ioptions.info_log, + column_family_id)), + range_del_table_(SkipListFactory().CreateMemTableRep( + comparator_, &arena_, nullptr /* transform */, ioptions.info_log, + column_family_id)), + is_range_del_table_empty_(true), + data_size_(0), + num_entries_(0), + num_deletes_(0), + flush_in_progress_(false), + flush_completed_(false), + file_number_(0), + first_seqno_(0), + earliest_seqno_(latest_seq), + creation_seq_(latest_seq), + mem_next_logfile_number_(0), + min_prep_log_referenced_(0), + locks_(moptions_.inplace_update_support + ? 
moptions_.inplace_update_num_locks + : 0), + prefix_extractor_(ioptions.prefix_extractor), + flush_state_(FLUSH_NOT_REQUESTED), + env_(ioptions.env), + insert_with_hint_prefix_extractor_( + ioptions.memtable_insert_with_hint_prefix_extractor) { + UpdateFlushState(); + // something went wrong if we need to flush before inserting anything + assert(!ShouldScheduleFlush()); + + if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) { + prefix_bloom_.reset(new DynamicBloom( + &arena_, moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality, + 6 /* hard coded 6 probes */, nullptr, moptions_.memtable_huge_page_size, + ioptions.info_log)); + } +} + +MemTable::~MemTable() { + mem_tracker_.FreeMem(); + assert(refs_ == 0); +} + +size_t MemTable::ApproximateMemoryUsage() { + autovector<size_t> usages = {arena_.ApproximateMemoryUsage(), + table_->ApproximateMemoryUsage(), + range_del_table_->ApproximateMemoryUsage(), + rocksdb::ApproximateMemoryUsage(insert_hints_)}; + size_t total_usage = 0; + for (size_t usage : usages) { + // If usage + total_usage >= kMaxSizet, return kMaxSizet. + // the following variation is to avoid numeric overflow. + if (usage >= port::kMaxSizet - total_usage) { + return port::kMaxSizet; + } + total_usage += usage; + } + // otherwise, return the actual usage + return total_usage; +} + +bool MemTable::ShouldFlushNow() const { + // In a lot of times, we cannot allocate arena blocks that exactly matches the + // buffer size. Thus we have to decide if we should over-allocate or + // under-allocate. + // This constant variable can be interpreted as: if we still have more than + // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over + // allocate one more block. + const double kAllowOverAllocationRatio = 0.6; + + // If arena still have room for new block allocation, we can safely say it + // shouldn't flush. + auto allocated_memory = table_->ApproximateMemoryUsage() + + range_del_table_->ApproximateMemoryUsage() + + arena_.MemoryAllocatedBytes(); + + // if we can still allocate one more block without exceeding the + // over-allocation ratio, then we should not flush. + if (allocated_memory + kArenaBlockSize < + moptions_.write_buffer_size + + kArenaBlockSize * kAllowOverAllocationRatio) { + return false; + } + + // if user keeps adding entries that exceeds moptions.write_buffer_size, + // we need to flush earlier even though we still have much available + // memory left. + if (allocated_memory > moptions_.write_buffer_size + + kArenaBlockSize * kAllowOverAllocationRatio) { + return true; + } + + // In this code path, Arena has already allocated its "last block", which + // means the total allocatedmemory size is either: + // (1) "moderately" over allocated the memory (no more than `0.6 * arena + // block size`. Or, + // (2) the allocated memory is less than write buffer size, but we'll stop + // here since if we allocate a new arena block, we'll over allocate too much + // more (half of the arena block size) memory. + // + // In either case, to avoid over-allocate, the last block will stop allocation + // when its usage reaches a certain ratio, which we carefully choose "0.75 + // full" as the stop condition because it addresses the following issue with + // great simplicity: What if the next inserted entry's size is + // bigger than AllocatedAndUnused()? 
+ // + // The answer is: if the entry size is also bigger than 0.25 * + // kArenaBlockSize, a dedicated block will be allocated for it; otherwise + // arena will anyway skip the AllocatedAndUnused() and allocate a new, empty + // and regular block. In either case, we *overly* over-allocated. + // + // Therefore, setting the last block to be at most "0.75 full" avoids both + // cases. + // + // NOTE: the average percentage of waste space of this approach can be counted + // as: "arena block size * 0.25 / write buffer size". User who specify a small + // write buffer size and/or big arena block size may suffer. + return arena_.AllocatedAndUnused() < kArenaBlockSize / 4; +} + +void MemTable::UpdateFlushState() { + auto state = flush_state_.load(std::memory_order_relaxed); + if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) { + // ignore CAS failure, because that means somebody else requested + // a flush + flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED, + std::memory_order_relaxed, + std::memory_order_relaxed); + } +} + +int MemTable::KeyComparator::operator()(const char* prefix_len_key1, + const char* prefix_len_key2) const { + // Internal keys are encoded as length-prefixed strings. + Slice k1 = GetLengthPrefixedSlice(prefix_len_key1); + Slice k2 = GetLengthPrefixedSlice(prefix_len_key2); + return comparator.Compare(k1, k2); +} + +int MemTable::KeyComparator::operator()(const char* prefix_len_key, + const Slice& key) + const { + // Internal keys are encoded as length-prefixed strings. + Slice a = GetLengthPrefixedSlice(prefix_len_key); + return comparator.Compare(a, key); +} + +Slice MemTableRep::UserKey(const char* key) const { + Slice slice = GetLengthPrefixedSlice(key); + return Slice(slice.data(), slice.size() - 8); +} + +KeyHandle MemTableRep::Allocate(const size_t len, char** buf) { + *buf = allocator_->Allocate(len); + return static_cast<KeyHandle>(*buf); +} + +// Encode a suitable internal key target for "target" and return it. +// Uses *scratch as scratch space, and the returned pointer will point +// into this scratch space. +const char* EncodeKey(std::string* scratch, const Slice& target) { + scratch->clear(); + PutVarint32(scratch, static_cast<uint32_t>(target.size())); + scratch->append(target.data(), target.size()); + return scratch->data(); +} + +class MemTableIterator : public InternalIterator { + public: + MemTableIterator(const MemTable& mem, const ReadOptions& read_options, + Arena* arena, bool use_range_del_table = false) + : bloom_(nullptr), + prefix_extractor_(mem.prefix_extractor_), + comparator_(mem.comparator_), + valid_(false), + arena_mode_(arena != nullptr), + value_pinned_(!mem.GetMemTableOptions()->inplace_update_support) { + if (use_range_del_table) { + iter_ = mem.range_del_table_->GetIterator(arena); + } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek) { + bloom_ = mem.prefix_bloom_.get(); + iter_ = mem.table_->GetDynamicPrefixIterator(arena); + } else { + iter_ = mem.table_->GetIterator(arena); + } + } + + ~MemTableIterator() { +#ifndef NDEBUG + // Assert that the MemTableIterator is never deleted while + // Pinning is Enabled. 
+ assert(!pinned_iters_mgr_ || + (pinned_iters_mgr_ && !pinned_iters_mgr_->PinningEnabled())); +#endif + if (arena_mode_) { + iter_->~Iterator(); + } else { + delete iter_; + } + } + +#ifndef NDEBUG + virtual void SetPinnedItersMgr( + PinnedIteratorsManager* pinned_iters_mgr) override { + pinned_iters_mgr_ = pinned_iters_mgr; + } + PinnedIteratorsManager* pinned_iters_mgr_ = nullptr; +#endif + + virtual bool Valid() const override { return valid_; } + virtual void Seek(const Slice& k) override { + PERF_TIMER_GUARD(seek_on_memtable_time); + PERF_COUNTER_ADD(seek_on_memtable_count, 1); + if (bloom_ != nullptr) { + if (!bloom_->MayContain( + prefix_extractor_->Transform(ExtractUserKey(k)))) { + PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); + valid_ = false; + return; + } else { + PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); + } + } + iter_->Seek(k, nullptr); + valid_ = iter_->Valid(); + } + virtual void SeekForPrev(const Slice& k) override { + PERF_TIMER_GUARD(seek_on_memtable_time); + PERF_COUNTER_ADD(seek_on_memtable_count, 1); + if (bloom_ != nullptr) { + if (!bloom_->MayContain( + prefix_extractor_->Transform(ExtractUserKey(k)))) { + PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); + valid_ = false; + return; + } else { + PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); + } + } + iter_->Seek(k, nullptr); + valid_ = iter_->Valid(); + if (!Valid()) { + SeekToLast(); + } + while (Valid() && comparator_.comparator.Compare(k, key()) < 0) { + Prev(); + } + } + virtual void SeekToFirst() override { + iter_->SeekToFirst(); + valid_ = iter_->Valid(); + } + virtual void SeekToLast() override { + iter_->SeekToLast(); + valid_ = iter_->Valid(); + } + virtual void Next() override { + PERF_COUNTER_ADD(next_on_memtable_count, 1); + assert(Valid()); + iter_->Next(); + valid_ = iter_->Valid(); + } + virtual void Prev() override { + PERF_COUNTER_ADD(prev_on_memtable_count, 1); + assert(Valid()); + iter_->Prev(); + valid_ = iter_->Valid(); + } + virtual Slice key() const override { + assert(Valid()); + return GetLengthPrefixedSlice(iter_->key()); + } + virtual Slice value() const override { + assert(Valid()); + Slice key_slice = GetLengthPrefixedSlice(iter_->key()); + return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); + } + + virtual Status status() const override { return Status::OK(); } + + virtual bool IsKeyPinned() const override { + // memtable data is always pinned + return true; + } + + virtual bool IsValuePinned() const override { + // memtable value is always pinned, except if we allow inplace update. 
+ return value_pinned_; + } + + private: + DynamicBloom* bloom_; + const SliceTransform* const prefix_extractor_; + const MemTable::KeyComparator comparator_; + MemTableRep::Iterator* iter_; + bool valid_; + bool arena_mode_; + bool value_pinned_; + + // No copying allowed + MemTableIterator(const MemTableIterator&); + void operator=(const MemTableIterator&); +}; + +InternalIterator* MemTable::NewIterator(const ReadOptions& read_options, + Arena* arena) { + assert(arena != nullptr); + auto mem = arena->AllocateAligned(sizeof(MemTableIterator)); + return new (mem) MemTableIterator(*this, read_options, arena); +} + +InternalIterator* MemTable::NewRangeTombstoneIterator( + const ReadOptions& read_options) { + if (read_options.ignore_range_deletions || is_range_del_table_empty_) { + return nullptr; + } + return new MemTableIterator(*this, read_options, nullptr /* arena */, + true /* use_range_del_table */); +} + +port::RWMutex* MemTable::GetLock(const Slice& key) { + static murmur_hash hash; + return &locks_[hash(key) % locks_.size()]; +} + +MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey, + const Slice& end_ikey) { + uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey); + entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey); + if (entry_count == 0) { + return {0, 0}; + } + uint64_t n = num_entries_.load(std::memory_order_relaxed); + if (n == 0) { + return {0, 0}; + } + if (entry_count > n) { + // (range_del_)table_->ApproximateNumEntries() is just an estimate so it can + // be larger than actual entries we have. Cap it to entries we have to limit + // the inaccuracy. + entry_count = n; + } + uint64_t data_size = data_size_.load(std::memory_order_relaxed); + return {entry_count * (data_size / n), entry_count}; +} + +void MemTable::Add(SequenceNumber s, ValueType type, + const Slice& key, /* user key */ + const Slice& value, bool allow_concurrent, + MemTablePostProcessInfo* post_process_info) { + // Format of an entry is concatenation of: + // key_size : varint32 of internal_key.size() + // key bytes : char[internal_key.size()] + // value_size : varint32 of value.size() + // value bytes : char[value.size()] + uint32_t key_size = static_cast<uint32_t>(key.size()); + uint32_t val_size = static_cast<uint32_t>(value.size()); + uint32_t internal_key_size = key_size + 8; + const uint32_t encoded_len = VarintLength(internal_key_size) + + internal_key_size + VarintLength(val_size) + + val_size; + char* buf = nullptr; + std::unique_ptr<MemTableRep>& table = + type == kTypeRangeDeletion ? range_del_table_ : table_; + KeyHandle handle = table->Allocate(encoded_len, &buf); + + char* p = EncodeVarint32(buf, internal_key_size); + memcpy(p, key.data(), key_size); + Slice key_slice(p, key_size); + p += key_size; + uint64_t packed = PackSequenceAndType(s, type); + EncodeFixed64(p, packed); + p += 8; + p = EncodeVarint32(p, val_size); + memcpy(p, value.data(), val_size); + assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len); + if (!allow_concurrent) { + // Extract prefix for insert with hint. 
+    if (insert_with_hint_prefix_extractor_ != nullptr &&
+        insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
+      Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
+      table->InsertWithHint(handle, &insert_hints_[prefix]);
+    } else {
+      table->Insert(handle);
+    }
+
+    // this is a bit ugly, but is the way to avoid locked instructions
+    // when incrementing an atomic
+    num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1,
+                       std::memory_order_relaxed);
+    data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len,
+                     std::memory_order_relaxed);
+    if (type == kTypeDeletion) {
+      num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
+                         std::memory_order_relaxed);
+    }
+
+    if (prefix_bloom_) {
+      assert(prefix_extractor_);
+      prefix_bloom_->Add(prefix_extractor_->Transform(key));
+    }
+
+    // The first sequence number inserted into the memtable
+    assert(first_seqno_ == 0 || s > first_seqno_);
+    if (first_seqno_ == 0) {
+      first_seqno_.store(s, std::memory_order_relaxed);
+
+      if (earliest_seqno_ == kMaxSequenceNumber) {
+        earliest_seqno_.store(GetFirstSequenceNumber(),
+                              std::memory_order_relaxed);
+      }
+      assert(first_seqno_.load() >= earliest_seqno_.load());
+    }
+    assert(post_process_info == nullptr);
+    UpdateFlushState();
+  } else {
+    table->InsertConcurrently(handle);
+
+    assert(post_process_info != nullptr);
+    post_process_info->num_entries++;
+    post_process_info->data_size += encoded_len;
+    if (type == kTypeDeletion) {
+      post_process_info->num_deletes++;
+    }
+
+    if (prefix_bloom_) {
+      assert(prefix_extractor_);
+      prefix_bloom_->AddConcurrently(prefix_extractor_->Transform(key));
+    }
+
+    // atomically update first_seqno_ and earliest_seqno_.
+    uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed);
+    while ((cur_seq_num == 0 || s < cur_seq_num) &&
+           !first_seqno_.compare_exchange_weak(cur_seq_num, s)) {
+    }
+    uint64_t cur_earliest_seqno =
+        earliest_seqno_.load(std::memory_order_relaxed);
+    while (
+        (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) &&
+        !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
+    }
+  }
+  if (is_range_del_table_empty_ && type == kTypeRangeDeletion) {
+    is_range_del_table_empty_ = false;
+  }
+}
+
+// Callback from MemTable::Get()
+namespace {
+
+struct Saver {
+  Status* status;
+  const LookupKey* key;
+  bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
+  bool* merge_in_progress;
+  std::string* value;
+  SequenceNumber seq;
+  const MergeOperator* merge_operator;
+  // the merge operations encountered;
+  MergeContext* merge_context;
+  RangeDelAggregator* range_del_agg;
+  MemTable* mem;
+  Logger* logger;
+  Statistics* statistics;
+  bool inplace_update_support;
+  Env* env_;
+};
+}  // namespace
+
+static bool SaveValue(void* arg, const char* entry) {
+  Saver* s = reinterpret_cast<Saver*>(arg);
+  MergeContext* merge_context = s->merge_context;
+  RangeDelAggregator* range_del_agg = s->range_del_agg;
+  const MergeOperator* merge_operator = s->merge_operator;
+
+  assert(s != nullptr && merge_context != nullptr && range_del_agg != nullptr);
+
+  // entry format is:
+  //    klength  varint32
+  //    userkey  char[klength-8]
+  //    tag      uint64
+  //    vlength  varint32
+  //    value    char[vlength]
+  // Check that it belongs to same user key. We do not check the
+  // sequence number since the Seek() call above should have skipped
+  // all entries with overly large sequence numbers.
+ uint32_t key_length; + const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); + if (s->mem->GetInternalKeyComparator().user_comparator()->Equal( + Slice(key_ptr, key_length - 8), s->key->user_key())) { + // Correct user key + const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); + ValueType type; + UnPackSequenceAndType(tag, &s->seq, &type); + + if ((type == kTypeValue || type == kTypeMerge) && + range_del_agg->ShouldDelete(Slice(key_ptr, key_length))) { + type = kTypeRangeDeletion; + } + switch (type) { + case kTypeValue: { + if (s->inplace_update_support) { + s->mem->GetLock(s->key->user_key())->ReadLock(); + } + Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + *(s->status) = Status::OK(); + if (*(s->merge_in_progress)) { + *(s->status) = MergeHelper::TimedFullMerge( + merge_operator, s->key->user_key(), &v, + merge_context->GetOperands(), s->value, s->logger, s->statistics, + s->env_, nullptr /* result_operand */, true); + } else if (s->value != nullptr) { + s->value->assign(v.data(), v.size()); + } + if (s->inplace_update_support) { + s->mem->GetLock(s->key->user_key())->ReadUnlock(); + } + *(s->found_final_value) = true; + return false; + } + case kTypeDeletion: + case kTypeSingleDeletion: + case kTypeRangeDeletion: { + if (*(s->merge_in_progress)) { + *(s->status) = MergeHelper::TimedFullMerge( + merge_operator, s->key->user_key(), nullptr, + merge_context->GetOperands(), s->value, s->logger, s->statistics, + s->env_, nullptr /* result_operand */, true); + } else { + *(s->status) = Status::NotFound(); + } + *(s->found_final_value) = true; + return false; + } + case kTypeMerge: { + if (!merge_operator) { + *(s->status) = Status::InvalidArgument( + "merge_operator is not properly initialized."); + // Normally we continue the loop (return true) when we see a merge + // operand. But in case of an error, we should stop the loop + // immediately and pretend we have found the value to stop further + // seek. Otherwise, the later call will override this error status. + *(s->found_final_value) = true; + return false; + } + Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + *(s->merge_in_progress) = true; + merge_context->PushOperand( + v, s->inplace_update_support == false /* operand_pinned */); + return true; + } + default: + assert(false); + return true; + } + } + + // s->state could be Corrupt, merge or notfound + return false; +} + +bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, + MergeContext* merge_context, + RangeDelAggregator* range_del_agg, SequenceNumber* seq, + const ReadOptions& read_opts) { + // The sequence number is updated synchronously in version_set.h + if (IsEmpty()) { + // Avoiding recording stats for speed. + return false; + } + PERF_TIMER_GUARD(get_from_memtable_time); + + Slice user_key = key.user_key(); + bool found_final_value = false; + bool merge_in_progress = s->IsMergeInProgress(); + bool const may_contain = + nullptr == prefix_bloom_ + ? 
false + : prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key)); + if (prefix_bloom_ && !may_contain) { + // iter is null if prefix bloom says the key does not exist + PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); + *seq = kMaxSequenceNumber; + } else { + if (prefix_bloom_) { + PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); + } + std::unique_ptr<InternalIterator> range_del_iter( + NewRangeTombstoneIterator(read_opts)); + Status status = range_del_agg->AddTombstones(std::move(range_del_iter)); + if (!status.ok()) { + *s = status; + return false; + } + Saver saver; + saver.status = s; + saver.found_final_value = &found_final_value; + saver.merge_in_progress = &merge_in_progress; + saver.key = &key; + saver.value = value; + saver.seq = kMaxSequenceNumber; + saver.mem = this; + saver.merge_context = merge_context; + saver.range_del_agg = range_del_agg; + saver.merge_operator = moptions_.merge_operator; + saver.logger = moptions_.info_log; + saver.inplace_update_support = moptions_.inplace_update_support; + saver.statistics = moptions_.statistics; + saver.env_ = env_; + table_->Get(key, &saver, SaveValue); + + *seq = saver.seq; + } + + // No change to value, since we have not yet found a Put/Delete + if (!found_final_value && merge_in_progress) { + *s = Status::MergeInProgress(); + } + PERF_COUNTER_ADD(get_from_memtable_count, 1); + return found_final_value; +} + +void MemTable::Update(SequenceNumber seq, + const Slice& key, + const Slice& value) { + LookupKey lkey(key, seq); + Slice mem_key = lkey.memtable_key(); + + std::unique_ptr<MemTableRep::Iterator> iter( + table_->GetDynamicPrefixIterator()); + iter->Seek(lkey.internal_key(), mem_key.data()); + + if (iter->Valid()) { + // entry format is: + // key_length varint32 + // userkey char[klength-8] + // tag uint64 + // vlength varint32 + // value char[vlength] + // Check that it belongs to same user key. We do not check the + // sequence number since the Seek() call above should have skipped + // all entries with overly large sequence numbers. 
+ const char* entry = iter->key(); + uint32_t key_length = 0; + const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); + if (comparator_.comparator.user_comparator()->Equal( + Slice(key_ptr, key_length - 8), lkey.user_key())) { + // Correct user key + const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); + ValueType type; + SequenceNumber unused; + UnPackSequenceAndType(tag, &unused, &type); + if (type == kTypeValue) { + Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); + uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); + uint32_t new_size = static_cast<uint32_t>(value.size()); + + // Update value, if new value size <= previous value size + if (new_size <= prev_size) { + char* p = + EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size); + WriteLock wl(GetLock(lkey.user_key())); + memcpy(p, value.data(), value.size()); + assert((unsigned)((p + value.size()) - entry) == + (unsigned)(VarintLength(key_length) + key_length + + VarintLength(value.size()) + value.size())); + return; + } + } + } + } + + // key doesn't exist + Add(seq, kTypeValue, key, value); +} + +bool MemTable::UpdateCallback(SequenceNumber seq, + const Slice& key, + const Slice& delta) { + LookupKey lkey(key, seq); + Slice memkey = lkey.memtable_key(); + + std::unique_ptr<MemTableRep::Iterator> iter( + table_->GetDynamicPrefixIterator()); + iter->Seek(lkey.internal_key(), memkey.data()); + + if (iter->Valid()) { + // entry format is: + // key_length varint32 + // userkey char[klength-8] + // tag uint64 + // vlength varint32 + // value char[vlength] + // Check that it belongs to same user key. We do not check the + // sequence number since the Seek() call above should have skipped + // all entries with overly large sequence numbers. + const char* entry = iter->key(); + uint32_t key_length = 0; + const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); + if (comparator_.comparator.user_comparator()->Equal( + Slice(key_ptr, key_length - 8), lkey.user_key())) { + // Correct user key + const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); + ValueType type; + uint64_t unused; + UnPackSequenceAndType(tag, &unused, &type); + switch (type) { + case kTypeValue: { + Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); + uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); + + char* prev_buffer = const_cast<char*>(prev_value.data()); + uint32_t new_prev_size = prev_size; + + std::string str_value; + WriteLock wl(GetLock(lkey.user_key())); + auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size, + delta, &str_value); + if (status == UpdateStatus::UPDATED_INPLACE) { + // Value already updated by callback. + assert(new_prev_size <= prev_size); + if (new_prev_size < prev_size) { + // overwrite the new prev_size + char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length, + new_prev_size); + if (VarintLength(new_prev_size) < VarintLength(prev_size)) { + // shift the value buffer as well. + memcpy(p, prev_buffer, new_prev_size); + } + } + RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); + UpdateFlushState(); + return true; + } else if (status == UpdateStatus::UPDATED) { + Add(seq, kTypeValue, key, Slice(str_value)); + RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN); + UpdateFlushState(); + return true; + } else if (status == UpdateStatus::UPDATE_FAILED) { + // No action required. Return. 
+ UpdateFlushState(); + return true; + } + } + default: + break; + } + } + } + // If the latest value is not kTypeValue + // or key doesn't exist + return false; +} + +size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) { + Slice memkey = key.memtable_key(); + + // A total ordered iterator is costly for some memtablerep (prefix aware + // reps). By passing in the user key, we allow efficient iterator creation. + // The iterator only needs to be ordered within the same user key. + std::unique_ptr<MemTableRep::Iterator> iter( + table_->GetDynamicPrefixIterator()); + iter->Seek(key.internal_key(), memkey.data()); + + size_t num_successive_merges = 0; + + for (; iter->Valid(); iter->Next()) { + const char* entry = iter->key(); + uint32_t key_length = 0; + const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); + if (!comparator_.comparator.user_comparator()->Equal( + Slice(iter_key_ptr, key_length - 8), key.user_key())) { + break; + } + + const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8); + ValueType type; + uint64_t unused; + UnPackSequenceAndType(tag, &unused, &type); + if (type != kTypeMerge) { + break; + } + + ++num_successive_merges; + } + + return num_successive_merges; +} + +void MemTableRep::Get(const LookupKey& k, void* callback_args, + bool (*callback_func)(void* arg, const char* entry)) { + auto iter = GetDynamicPrefixIterator(); + for (iter->Seek(k.internal_key(), k.memtable_key().data()); + iter->Valid() && callback_func(callback_args, iter->key()); + iter->Next()) { + } +} + +void MemTable::RefLogContainingPrepSection(uint64_t log) { + assert(log > 0); + auto cur = min_prep_log_referenced_.load(); + while ((log < cur || cur == 0) && + !min_prep_log_referenced_.compare_exchange_strong(cur, log)) { + cur = min_prep_log_referenced_.load(); + } +} + +uint64_t MemTable::GetMinLogContainingPrepSection() { + return min_prep_log_referenced_.load(); +} + +} // namespace rocksdb
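Editor's note on the entry layout: the format comments in MemTable::Add, SaveValue, Update and UpdateCallback above all describe the same on-arena encoding (varint32 internal key size, user key bytes, an 8-byte tag that packs sequence number and value type, varint32 value size, value bytes). The following is a minimal, self-contained sketch of that layout for illustration only; PutVarint32, GetVarint32 and EncodeEntry are simplified stand-ins written here, not the helpers from RocksDB's util/coding.h, and the tag packing (seq << 8 | type) is stated as an assumption.

#include <cassert>
#include <cstdint>
#include <string>

namespace {

// Simplified varint32 encoder (7 payload bits per byte, high bit = "more").
void PutVarint32(std::string* dst, uint32_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// Simplified varint32 decoder; returns a pointer just past the varint.
const char* GetVarint32(const char* p, uint32_t* value) {
  uint32_t result = 0;
  for (int shift = 0; shift <= 28; shift += 7, ++p) {
    uint32_t byte = static_cast<unsigned char>(*p);
    result |= (byte & 0x7f) << shift;
    if ((byte & 0x80) == 0) {
      *value = result;
      return p + 1;
    }
  }
  return nullptr;  // malformed varint
}

// Entry layout as documented in MemTable::Add:
//   varint32 internal_key_size | user_key | fixed64 (seq << 8 | type)
//   | varint32 value_size | value
std::string EncodeEntry(const std::string& user_key, uint64_t seq,
                        uint8_t type, const std::string& value) {
  std::string buf;
  PutVarint32(&buf, static_cast<uint32_t>(user_key.size()) + 8);
  buf.append(user_key);
  uint64_t tag = (seq << 8) | type;  // assumed PackSequenceAndType layout
  for (int i = 0; i < 8; ++i) {      // little-endian fixed64
    buf.push_back(static_cast<char>((tag >> (8 * i)) & 0xff));
  }
  PutVarint32(&buf, static_cast<uint32_t>(value.size()));
  buf.append(value);
  return buf;
}

}  // namespace

int main() {
  std::string entry = EncodeEntry("user_key", /*seq=*/42, /*type=*/1, "value");
  uint32_t internal_key_size = 0;
  const char* p = GetVarint32(entry.data(), &internal_key_size);
  // Recover the user key by stripping the trailing 8-byte tag, exactly as
  // SaveValue does with Slice(key_ptr, key_length - 8).
  std::string user_key(p, internal_key_size - 8);
  assert(user_key == "user_key");
  return 0;
}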
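A second pattern worth calling out: the concurrent branch of MemTable::Add and RefLogContainingPrepSection both maintain a running minimum in an atomic without taking a lock, by retrying compare_exchange until either the store wins or another thread has already installed an equal-or-smaller value. A stripped-down sketch of that idea is below; StoreMin is a hypothetical helper written for illustration, not RocksDB API, and it assumes a sentinel value means "unset".

#include <atomic>
#include <cassert>
#include <cstdint>

// Lowers `target` to `candidate` if `candidate` is smaller, or if `target`
// still holds the `unset` sentinel. compare_exchange_weak reloads `cur` on
// failure, so each retry re-checks the condition against the freshest value.
void StoreMin(std::atomic<uint64_t>& target, uint64_t candidate,
              uint64_t unset) {
  uint64_t cur = target.load(std::memory_order_relaxed);
  while ((cur == unset || candidate < cur) &&
         !target.compare_exchange_weak(cur, candidate)) {
  }
}

int main() {
  std::atomic<uint64_t> min_log{0};    // 0 == "no log referenced yet"
  StoreMin(min_log, 7, /*unset=*/0);
  StoreMin(min_log, 12, /*unset=*/0);  // larger value is ignored
  StoreMin(min_log, 3, /*unset=*/0);
  assert(min_log.load() == 3);
  return 0;
}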
