http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pb_util.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/pb_util.cc b/be/src/kudu/util/pb_util.cc new file mode 100644 index 0000000..a83c309 --- /dev/null +++ b/be/src/kudu/util/pb_util.cc @@ -0,0 +1,956 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Some portions copyright (C) 2008, Google, inc. +// +// Utilities for working with protobufs. +// Some of this code is cribbed from the protobuf source, +// but modified to work with kudu's 'faststring' instead of STL strings. 
+ +#include "kudu/util/pb_util.h" + +#include <deque> +#include <initializer_list> +#include <memory> +#include <mutex> +#include <ostream> +#include <sstream> +#include <string> +#include <unordered_set> +#include <vector> + +#include <glog/logging.h> +#include <google/protobuf/descriptor.h> +#include <google/protobuf/descriptor.pb.h> +#include <google/protobuf/descriptor_database.h> +#include <google/protobuf/dynamic_message.h> +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> +#include <google/protobuf/message.h> +#include <google/protobuf/message_lite.h> +#include <google/protobuf/text_format.h> + +#include "kudu/gutil/bind.h" +#include "kudu/gutil/callback.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/escaping.h" +#include "kudu/gutil/strings/fastmem.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/coding-inl.h" +#include "kudu/util/coding.h" +#include "kudu/util/crc.h" +#include "kudu/util/debug/sanitizer_scopes.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/env.h" +#include "kudu/util/env_util.h" +#include "kudu/util/jsonwriter.h" +#include "kudu/util/logging.h" +#include "kudu/util/mutex.h" +#include "kudu/util/path_util.h" +#include "kudu/util/pb_util-internal.h" +#include "kudu/util/pb_util.pb.h" +#include "kudu/util/status.h" + +using google::protobuf::Descriptor; +using google::protobuf::DescriptorPool; +using google::protobuf::DynamicMessageFactory; +using google::protobuf::FieldDescriptor; +using google::protobuf::FileDescriptor; +using google::protobuf::FileDescriptorProto; +using google::protobuf::FileDescriptorSet; +using google::protobuf::io::ArrayInputStream; +using google::protobuf::io::CodedInputStream; +using google::protobuf::Message; +using google::protobuf::MessageLite; +using google::protobuf::Reflection; +using google::protobuf::SimpleDescriptorDatabase; +using 
google::protobuf::TextFormat; +using kudu::crc::Crc; +using kudu::pb_util::internal::SequentialFileFileInputStream; +using kudu::pb_util::internal::WritableFileOutputStream; +using std::deque; +using std::endl; +using std::initializer_list; +using std::ostream; +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using std::unordered_set; +using std::vector; +using strings::Substitute; +using strings::Utf8SafeCEscape; + +namespace std { + +// Allow the use of FileState with DCHECK_EQ. +std::ostream& operator<< (std::ostream& os, const kudu::pb_util::FileState& state) { + os << static_cast<int>(state); + return os; +} + +} // namespace std + +namespace kudu { +namespace pb_util { + +static const char* const kTmpTemplateSuffix = ".XXXXXX"; + +// Protobuf container constants. +static const uint32_t kPBContainerInvalidVersion = 0; +static const uint32_t kPBContainerDefaultVersion = 2; +static const int kPBContainerChecksumLen = sizeof(uint32_t); +static const char kPBContainerMagic[] = "kuducntr"; +static const int kPBContainerMagicLen = 8; +static const int kPBContainerV1HeaderLen = + kPBContainerMagicLen + sizeof(uint32_t); // Magic number + version. +static const int kPBContainerV2HeaderLen = + kPBContainerV1HeaderLen + kPBContainerChecksumLen; // Same as V1 plus a checksum. + +const int kPBContainerMinimumValidLength = kPBContainerV1HeaderLen; + +static_assert(arraysize(kPBContainerMagic) - 1 == kPBContainerMagicLen, + "kPBContainerMagic does not match expected length"); + +namespace { + +// When serializing, we first compute the byte size, then serialize the message. +// If serialization produces a different number of bytes than expected, we +// call this function, which crashes. The problem could be due to a bug in the +// protobuf implementation but is more likely caused by concurrent modification +// of the message. This function attempts to distinguish between the two and +// provide a useful error message. 
+void ByteSizeConsistencyError(int byte_size_before_serialization, + int byte_size_after_serialization, + int bytes_produced_by_serialization) { + CHECK_EQ(byte_size_before_serialization, byte_size_after_serialization) + << "Protocol message was modified concurrently during serialization."; + CHECK_EQ(bytes_produced_by_serialization, byte_size_before_serialization) + << "Byte size calculation and serialization were inconsistent. This " + "may indicate a bug in protocol buffers or it may be caused by " + "concurrent modification of the message."; + LOG(FATAL) << "This shouldn't be called if all the sizes are equal."; +} + +string InitializationErrorMessage(const char* action, + const MessageLite& message) { + // Note: We want to avoid depending on strutil in the lite library, otherwise + // we'd use: + // + // return strings::Substitute( + // "Can't $0 message of type \"$1\" because it is missing required " + // "fields: $2", + // action, message.GetTypeName(), + // message.InitializationErrorString()); + + string result; + result += "Can't "; + result += action; + result += " message of type \""; + result += message.GetTypeName(); + result += "\" because it is missing required fields: "; + result += message.InitializationErrorString(); + return result; +} + +// Returns true iff the specified protobuf container file version is supported +// by this implementation. +bool IsSupportedContainerVersion(uint32_t version) { + if (version == 1 || version == 2) { + return true; + } + return false; +} + +// Reads exactly 'length' bytes from the container file into 'scratch', +// validating that there is sufficient data in the file to read this length +// before attempting to do so, and validating that it has read that length +// after performing the read. +// +// If the file size is less than the requested size of the read, returns +// Status::Incomplete. +// If there is an unexpected short read, returns Status::Corruption. 
+// +// A Slice of the bytes read into 'scratch' is returned in 'result'. +template<typename ReadableFileType> +Status ValidateAndReadData(ReadableFileType* reader, uint64_t file_size, + uint64_t* offset, uint64_t length, + Slice* result, unique_ptr<uint8_t[]>* scratch) { + // Validate the read length using the file size. + if (*offset + length > file_size) { + return Status::Incomplete("File size not large enough to be valid", + Substitute("Proto container file $0: " + "Tried to read $1 bytes at offset " + "$2 but file size is only $3 bytes", + reader->filename(), length, + *offset, file_size)); + } + + // Perform the read. + unique_ptr<uint8_t[]> local_scratch(new uint8_t[length]); + Slice s(local_scratch.get(), length); + RETURN_NOT_OK(reader->Read(*offset, &s)); + CHECK_EQ(length, s.size()) // Should never trigger due to contract with reader APIs. + << Substitute("Unexpected short read: Proto container file $0: Tried to read $1 bytes " + "but only read $2 bytes", + reader->filename(), length, s.size()); + + *offset += length; + *result = s; + scratch->swap(local_scratch); + return Status::OK(); +} + +// Helper macro for use with ParseAndCompareChecksum(). Example usage: +// RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { data }), +// CHECKSUM_ERR_MSG("Data checksum does not match", filename, offset)); +#define CHECKSUM_ERR_MSG(prefix, filename, cksum_offset) \ + Substitute("$0: Incorrect checksum in file $1 at offset $2", prefix, filename, cksum_offset) + +// Parses a checksum from the specified buffer and compares it to the bytes +// given in 'slices' by calculating a rolling CRC32 checksum of the bytes in +// the 'slices'. +// If they match, returns OK. Otherwise, returns Status::Corruption. 
+Status ParseAndCompareChecksum(const uint8_t* checksum_buf, + const initializer_list<Slice>& slices) { + uint32_t written_checksum = DecodeFixed32(checksum_buf); + uint64_t actual_checksum = 0; + Crc* crc32c = crc::GetCrc32cInstance(); + for (Slice s : slices) { + crc32c->Compute(s.data(), s.size(), &actual_checksum); + } + if (PREDICT_FALSE(actual_checksum != written_checksum)) { + return Status::Corruption(Substitute("Checksum does not match. Expected: $0. Actual: $1", + written_checksum, actual_checksum)); + } + return Status::OK(); +} + +// Read and parse a message of the specified format at the given offset in the +// format documented in pb_util.h. 'offset' is an in-out parameter and will be +// updated with the new offset on success. On failure, 'offset' is not modified. +template<typename ReadableFileType> +Status ReadPBStartingAt(ReadableFileType* reader, int version, uint64_t* offset, Message* msg) { + uint64_t tmp_offset = *offset; + VLOG(1) << "Reading PB with version " << version << " starting at offset " << *offset; + + uint64_t file_size; + RETURN_NOT_OK(reader->Size(&file_size)); + if (tmp_offset == file_size) { + return Status::EndOfFile("Reached end of file"); + } + + // Read the data length from the file. + // Version 2+ includes a checksum for the length field. + uint64_t length_buflen = (version == 1) ? sizeof(uint32_t) + : sizeof(uint32_t) + kPBContainerChecksumLen; + Slice len_and_cksum_slice; + unique_ptr<uint8_t[]> length_scratch; + RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, length_buflen, + &len_and_cksum_slice, &length_scratch), + Substitute("Could not read data length from proto container file $0 " + "at offset $1", reader->filename(), *offset)); + Slice length(len_and_cksum_slice.data(), sizeof(uint32_t)); + + // Versions >= 2 have an individual checksum for the data length. 
+ if (version >= 2) { + Slice length_checksum(len_and_cksum_slice.data() + sizeof(uint32_t), kPBContainerChecksumLen); + RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(length_checksum.data(), { length }), + CHECKSUM_ERR_MSG("Data length checksum does not match", + reader->filename(), tmp_offset - kPBContainerChecksumLen)); + } + uint32_t data_length = DecodeFixed32(length.data()); + + // Read body and checksum into buffer for checksum & parsing. + uint64_t data_and_cksum_buflen = data_length + kPBContainerChecksumLen; + Slice body_and_cksum_slice; + unique_ptr<uint8_t[]> body_scratch; + RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, data_and_cksum_buflen, + &body_and_cksum_slice, &body_scratch), + Substitute("Could not read PB message data from proto container file $0 " + "at offset $1", + reader->filename(), tmp_offset)); + Slice body(body_and_cksum_slice.data(), data_length); + Slice record_checksum(body_and_cksum_slice.data() + data_length, kPBContainerChecksumLen); + + // Version 1 has a single checksum for length, body. + // Version 2+ has individual checksums for length and body, respectively. + if (version == 1) { + RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { length, body }), + CHECKSUM_ERR_MSG("Length and data checksum does not match", + reader->filename(), tmp_offset - kPBContainerChecksumLen)); + } else { + RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { body }), + CHECKSUM_ERR_MSG("Data checksum does not match", + reader->filename(), tmp_offset - kPBContainerChecksumLen)); + } + + // The checksum is correct. Time to decode the body. + // + // We could compare pb_type_ against msg.GetTypeName(), but: + // 1. pb_type_ is not available when reading the supplemental header, + // 2. ParseFromArray() should fail if the data cannot be parsed into the + // provided message type. 
+ + // To permit parsing of very large PB messages, we must use parse through a + // CodedInputStream and bump the byte limit. The SetTotalBytesLimit() docs + // say that 512MB is the shortest theoretical message length that may produce + // integer overflow warnings, so that's what we'll use. + ArrayInputStream ais(body.data(), body.size()); + CodedInputStream cis(&ais); + cis.SetTotalBytesLimit(512 * 1024 * 1024, -1); + if (PREDICT_FALSE(!msg->ParseFromCodedStream(&cis))) { + return Status::IOError("Unable to parse PB from path", reader->filename()); + } + + *offset = tmp_offset; + return Status::OK(); +} + +// Wrapper around ReadPBStartingAt() to enforce that we don't return +// Status::Incomplete() for V1 format files. +template<typename ReadableFileType> +Status ReadFullPB(ReadableFileType* reader, int version, uint64_t* offset, Message* msg) { + Status s = ReadPBStartingAt(reader, version, offset, msg); + if (PREDICT_FALSE(s.IsIncomplete() && version == 1)) { + return Status::Corruption("Unrecoverable incomplete record", s.ToString()); + } + return s; +} + +// Read and parse the protobuf container file-level header documented in pb_util.h. +template<typename ReadableFileType> +Status ParsePBFileHeader(ReadableFileType* reader, uint64_t* offset, int* version) { + uint64_t file_size; + RETURN_NOT_OK(reader->Size(&file_size)); + + // We initially read enough data for a V2+ file header. This optimizes for + // V2+ and is valid on a V1 file because we don't consider these files valid + // unless they contain a record in addition to the file header. The + // additional 4 bytes required by a V2+ header (vs V1) is still less than the + // minimum number of bytes required for a V1 format data record. 
+ uint64_t tmp_offset = *offset; + Slice header; + unique_ptr<uint8_t[]> scratch; + RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, kPBContainerV2HeaderLen, + &header, &scratch), + Substitute("Could not read header for proto container file $0", + reader->filename())); + Slice magic_and_version(header.data(), kPBContainerMagicLen + sizeof(uint32_t)); + Slice checksum(header.data() + kPBContainerMagicLen + sizeof(uint32_t), kPBContainerChecksumLen); + + // Validate magic number. + if (PREDICT_FALSE(!strings::memeq(kPBContainerMagic, header.data(), kPBContainerMagicLen))) { + string file_magic(reinterpret_cast<const char*>(header.data()), kPBContainerMagicLen); + return Status::Corruption("Invalid magic number", + Substitute("Expected: $0, found: $1", + Utf8SafeCEscape(kPBContainerMagic), + Utf8SafeCEscape(file_magic))); + } + + // Validate container file version. + uint32_t tmp_version = DecodeFixed32(header.data() + kPBContainerMagicLen); + if (PREDICT_FALSE(!IsSupportedContainerVersion(tmp_version))) { + return Status::NotSupported( + Substitute("Protobuf container has unsupported version: $0. Default version: $1", + tmp_version, kPBContainerDefaultVersion)); + } + + // Versions >= 2 have a checksum after the magic number and encoded version + // to ensure the integrity of these fields. + if (tmp_version >= 2) { + RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { magic_and_version }), + CHECKSUM_ERR_MSG("File header checksum does not match", + reader->filename(), tmp_offset - kPBContainerChecksumLen)); + } else { + // Version 1 doesn't have a header checksum. Rewind our read offset so this + // data will be read again when we next attempt to read a data record. + tmp_offset -= kPBContainerChecksumLen; + } + + *offset = tmp_offset; + *version = tmp_version; + return Status::OK(); +} + +// Read and parse the supplemental header from the container file. 
+template<typename ReadableFileType> +Status ReadSupplementalHeader(ReadableFileType* reader, int version, uint64_t* offset, + ContainerSupHeaderPB* sup_header) { + RETURN_NOT_OK_PREPEND(ReadFullPB(reader, version, offset, sup_header), + Substitute("Could not read supplemental header from proto container file $0 " + "with version $1 at offset $2", + reader->filename(), version, *offset)); + return Status::OK(); +} + +} // anonymous namespace + +void AppendToString(const MessageLite &msg, faststring *output) { + DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg); + AppendPartialToString(msg, output); +} + +void AppendPartialToString(const MessageLite &msg, faststring* output) { + size_t old_size = output->size(); + int byte_size = msg.ByteSize(); + // Messages >2G cannot be serialized due to overflow computing ByteSize. + DCHECK_GE(byte_size, 0) << "Error computing ByteSize"; + + output->resize(old_size + static_cast<size_t>(byte_size)); + + uint8* start = &((*output)[old_size]); + uint8* end = msg.SerializeWithCachedSizesToArray(start); + if (end - start != byte_size) { + ByteSizeConsistencyError(byte_size, msg.ByteSize(), end - start); + } +} + +void SerializeToString(const MessageLite &msg, faststring *output) { + output->clear(); + AppendToString(msg, output); +} + +Status ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile) { + SequentialFileFileInputStream istream(rfile); + if (!msg->ParseFromZeroCopyStream(&istream)) { + RETURN_NOT_OK(istream.status()); + + // If it's not a file IO error then it's a parsing error. + // Probably, we read wrong or damaged data here. 
+ return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg)); + } + return Status::OK(); +} + +Status ParseFromArray(MessageLite* msg, const uint8_t* data, uint32_t length) { + if (!msg->ParseFromArray(data, length)) { + return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg)); + } + return Status::OK(); +} + +Status WritePBToPath(Env* env, const std::string& path, + const MessageLite& msg, + SyncMode sync) { + const string tmp_template = path + kTmpInfix + kTmpTemplateSuffix; + string tmp_path; + + unique_ptr<WritableFile> file; + RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), tmp_template, &tmp_path, &file)); + env_util::ScopedFileDeleter tmp_deleter(env, tmp_path); + + WritableFileOutputStream ostream(file.get()); + bool res = msg.SerializeToZeroCopyStream(&ostream); + if (!res || !ostream.Flush()) { + return Status::IOError("Unable to serialize PB to file"); + } + + if (sync == pb_util::SYNC) { + RETURN_NOT_OK_PREPEND(file->Sync(), "Failed to Sync() " + tmp_path); + } + RETURN_NOT_OK_PREPEND(file->Close(), "Failed to Close() " + tmp_path); + RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path), "Failed to rename tmp file to " + path); + tmp_deleter.Cancel(); + if (sync == pb_util::SYNC) { + RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)), "Failed to SyncDir() parent of " + path); + } + return Status::OK(); +} + +Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg) { + shared_ptr<SequentialFile> rfile; + RETURN_NOT_OK(env_util::OpenFileForSequential(env, path, &rfile)); + RETURN_NOT_OK(ParseFromSequentialFile(msg, rfile.get())); + return Status::OK(); +} + +static void TruncateString(string* s, int max_len) { + if (s->size() > max_len) { + s->resize(max_len); + s->append("<truncated>"); + } +} + +void TruncateFields(Message* message, int max_len) { + const Reflection* reflection = message->GetReflection(); + vector<const FieldDescriptor*> fields; + 
reflection->ListFields(*message, &fields); + for (const FieldDescriptor* field : fields) { + if (field->is_repeated()) { + for (int i = 0; i < reflection->FieldSize(*message, field); i++) { + switch (field->cpp_type()) { + case FieldDescriptor::CPPTYPE_STRING: { + const string& s_const = reflection->GetRepeatedStringReference(*message, field, i, + nullptr); + TruncateString(const_cast<string*>(&s_const), max_len); + break; + } + case FieldDescriptor::CPPTYPE_MESSAGE: { + TruncateFields(reflection->MutableRepeatedMessage(message, field, i), max_len); + break; + } + default: + break; + } + } + } else { + switch (field->cpp_type()) { + case FieldDescriptor::CPPTYPE_STRING: { + const string& s_const = reflection->GetStringReference(*message, field, nullptr); + TruncateString(const_cast<string*>(&s_const), max_len); + break; + } + case FieldDescriptor::CPPTYPE_MESSAGE: { + TruncateFields(reflection->MutableMessage(message, field), max_len); + break; + } + default: + break; + } + } + } +} + +namespace { +class SecureFieldPrinter : public TextFormat::FieldValuePrinter { + public: + using super = TextFormat::FieldValuePrinter; + + string PrintFieldName(const Message& message, + const Reflection* reflection, + const FieldDescriptor* field) const override { + hide_next_string_ = field->cpp_type() == FieldDescriptor::CPPTYPE_STRING && + field->options().GetExtension(REDACT); + return super::PrintFieldName(message, reflection, field); + } + + string PrintString(const string& val) const override { + if (hide_next_string_) { + hide_next_string_ = false; + return KUDU_REDACT(super::PrintString(val)); + } + return super::PrintString(val); + } + string PrintBytes(const string& val) const override { + if (hide_next_string_) { + hide_next_string_ = false; + return KUDU_REDACT(super::PrintBytes(val)); + } + return super::PrintBytes(val); + } + + mutable bool hide_next_string_ = false; +}; +} // anonymous namespace + +string SecureDebugString(const Message& msg) { + string 
debug_string; + TextFormat::Printer printer; + printer.SetDefaultFieldValuePrinter(new SecureFieldPrinter()); + printer.PrintToString(msg, &debug_string); + return debug_string; +} + +string SecureShortDebugString(const Message& msg) { + string debug_string; + + TextFormat::Printer printer; + printer.SetSingleLineMode(true); + printer.SetDefaultFieldValuePrinter(new SecureFieldPrinter()); + + printer.PrintToString(msg, &debug_string); + // Single line mode currently might have an extra space at the end. + if (!debug_string.empty() && + debug_string[debug_string.size() - 1] == ' ') { + debug_string.resize(debug_string.size() - 1); + } + + return debug_string; +} + + +WritablePBContainerFile::WritablePBContainerFile(shared_ptr<RWFile> writer) + : state_(FileState::NOT_INITIALIZED), + offset_(0), + version_(kPBContainerDefaultVersion), + writer_(std::move(writer)) { +} + +WritablePBContainerFile::~WritablePBContainerFile() { + WARN_NOT_OK(Close(), "Could not Close() when destroying file"); +} + +Status WritablePBContainerFile::SetVersionForTests(int version) { + DCHECK_EQ(FileState::NOT_INITIALIZED, state_); + if (!IsSupportedContainerVersion(version)) { + return Status::NotSupported(Substitute("Version $0 is not supported", version)); + } + version_ = version; + return Status::OK(); +} + +Status WritablePBContainerFile::CreateNew(const Message& msg) { + DCHECK_EQ(FileState::NOT_INITIALIZED, state_); + + const uint64_t kHeaderLen = (version_ == 1) ? kPBContainerV1HeaderLen + : kPBContainerV1HeaderLen + kPBContainerChecksumLen; + + faststring buf; + buf.resize(kHeaderLen); + + // Serialize the magic. + strings::memcpy_inlined(buf.data(), kPBContainerMagic, kPBContainerMagicLen); + uint64_t offset = kPBContainerMagicLen; + + // Serialize the version. 
+ InlineEncodeFixed32(buf.data() + offset, version_); + offset += sizeof(uint32_t); + DCHECK_EQ(kPBContainerV1HeaderLen, offset) + << "Serialized unexpected number of total bytes"; + + // Versions >= 2: Checksum the magic and version. + if (version_ >= 2) { + uint32_t header_checksum = crc::Crc32c(buf.data(), offset); + InlineEncodeFixed32(buf.data() + offset, header_checksum); + offset += sizeof(uint32_t); + } + + // Serialize the supplemental header. + ContainerSupHeaderPB sup_header; + PopulateDescriptorSet(msg.GetDescriptor()->file(), + sup_header.mutable_protos()); + sup_header.set_pb_type(msg.GetTypeName()); + RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(sup_header, &buf), + "Failed to prepare supplemental header for writing"); + + // Write the serialized buffer to the file. + RETURN_NOT_OK_PREPEND(AppendBytes(buf), + "Failed to append header to file"); + state_ = FileState::OPEN; + return Status::OK(); +} + +Status WritablePBContainerFile::OpenExisting() { + DCHECK_EQ(FileState::NOT_INITIALIZED, state_); + RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &offset_, &version_)); + ContainerSupHeaderPB sup_header; + RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &offset_, &sup_header)); + RETURN_NOT_OK(writer_->Size(&offset_)); // Reset the write offset to the end of the file. 
+ state_ = FileState::OPEN; + return Status::OK(); +} + +Status WritablePBContainerFile::AppendBytes(const Slice& data) { + std::lock_guard<Mutex> l(offset_lock_); + RETURN_NOT_OK(writer_->Write(offset_, data)); + offset_ += data.size(); + return Status::OK(); +} + +Status WritablePBContainerFile::Append(const Message& msg) { + DCHECK_EQ(FileState::OPEN, state_); + + faststring buf; + RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(msg, &buf), + "Failed to prepare buffer for writing"); + RETURN_NOT_OK_PREPEND(AppendBytes(buf), "Failed to append data to file"); + + return Status::OK(); +} + +Status WritablePBContainerFile::Flush() { + DCHECK_EQ(FileState::OPEN, state_); + + // TODO: Flush just the dirty bytes. + RETURN_NOT_OK_PREPEND(writer_->Flush(RWFile::FLUSH_ASYNC, 0, 0), "Failed to Flush() file"); + + return Status::OK(); +} + +Status WritablePBContainerFile::Sync() { + DCHECK_EQ(FileState::OPEN, state_); + + RETURN_NOT_OK_PREPEND(writer_->Sync(), "Failed to Sync() file"); + + return Status::OK(); +} + +Status WritablePBContainerFile::Close() { + if (state_ != FileState::CLOSED) { + state_ = FileState::CLOSED; + Status s = writer_->Close(); + writer_.reset(); + RETURN_NOT_OK_PREPEND(s, "Failed to Close() file"); + } + return Status::OK(); +} + +const string& WritablePBContainerFile::filename() const { + return writer_->filename(); +} + +Status WritablePBContainerFile::AppendMsgToBuffer(const Message& msg, faststring* buf) { + DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg); + int data_len = msg.ByteSize(); + // Messages >2G cannot be serialized due to overflow computing ByteSize. + DCHECK_GE(data_len, 0) << "Error computing ByteSize"; + uint64_t record_buflen = sizeof(uint32_t) + data_len + sizeof(uint32_t); + if (version_ >= 2) { + record_buflen += sizeof(uint32_t); // Additional checksum just for the length. + } + + // Grow the buffer to hold the new data. 
+ uint64_t record_offset = buf->size(); + buf->resize(record_offset + record_buflen); + uint8_t* dst = buf->data() + record_offset; + + // Serialize the data length. + size_t cur_offset = 0; + InlineEncodeFixed32(dst + cur_offset, static_cast<uint32_t>(data_len)); + cur_offset += sizeof(uint32_t); + + // For version >= 2: Serialize the checksum of the data length. + if (version_ >= 2) { + uint32_t length_checksum = crc::Crc32c(&data_len, sizeof(data_len)); + InlineEncodeFixed32(dst + cur_offset, length_checksum); + cur_offset += sizeof(uint32_t); + } + + // Serialize the data. + uint64_t data_offset = cur_offset; + if (PREDICT_FALSE(!msg.SerializeWithCachedSizesToArray(dst + cur_offset))) { + return Status::IOError("Failed to serialize PB to array"); + } + cur_offset += data_len; + + // Calculate and serialize the data checksum. + // For version 1, this is the checksum of the len + data. + // For version >= 2, this is only the checksum of the data. + uint32_t data_checksum; + if (version_ == 1) { + data_checksum = crc::Crc32c(dst, cur_offset); + } else { + data_checksum = crc::Crc32c(dst + data_offset, data_len); + } + InlineEncodeFixed32(dst + cur_offset, data_checksum); + cur_offset += sizeof(uint32_t); + + DCHECK_EQ(record_buflen, cur_offset) << "Serialized unexpected number of total bytes"; + return Status::OK(); +} + +void WritablePBContainerFile::PopulateDescriptorSet( + const FileDescriptor* desc, FileDescriptorSet* output) { + // Because we don't compile protobuf with TSAN enabled, copying the + // static PB descriptors in this function ends up triggering a lot of + // race reports. We suppress the reports, but TSAN still has to walk + // the stack, etc, and this function becomes very slow. So, we ignore + // TSAN here. + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; + + FileDescriptorSet all_descs; + + // Tracks all schemas that have been added to 'unemitted' at one point + // or another. Is a superset of 'unemitted' and only ever grows. 
+ unordered_set<const FileDescriptor*> processed; + + // Tracks all remaining unemitted schemas. + deque<const FileDescriptor*> unemitted; + + InsertOrDie(&processed, desc); + unemitted.push_front(desc); + while (!unemitted.empty()) { + const FileDescriptor* proto = unemitted.front(); + + // The current schema is emitted iff we've processed (i.e. emitted) all + // of its dependencies. + bool emit = true; + for (int i = 0; i < proto->dependency_count(); i++) { + const FileDescriptor* dep = proto->dependency(i); + if (InsertIfNotPresent(&processed, dep)) { + unemitted.push_front(dep); + emit = false; + } + } + if (emit) { + unemitted.pop_front(); + proto->CopyTo(all_descs.mutable_file()->Add()); + } + } + all_descs.Swap(output); +} + +ReadablePBContainerFile::ReadablePBContainerFile(shared_ptr<RandomAccessFile> reader) + : state_(FileState::NOT_INITIALIZED), + version_(kPBContainerInvalidVersion), + offset_(0), + reader_(std::move(reader)) { +} + +ReadablePBContainerFile::~ReadablePBContainerFile() { + Close(); +} + +Status ReadablePBContainerFile::Open() { + DCHECK_EQ(FileState::NOT_INITIALIZED, state_); + RETURN_NOT_OK(ParsePBFileHeader(reader_.get(), &offset_, &version_)); + ContainerSupHeaderPB sup_header; + RETURN_NOT_OK(ReadSupplementalHeader(reader_.get(), version_, &offset_, &sup_header)); + protos_.reset(sup_header.release_protos()); + pb_type_ = sup_header.pb_type(); + state_ = FileState::OPEN; + return Status::OK(); +} + +Status ReadablePBContainerFile::ReadNextPB(Message* msg) { + DCHECK_EQ(FileState::OPEN, state_); + return ReadFullPB(reader_.get(), version_, &offset_, msg); +} + +Status ReadablePBContainerFile::Dump(ostream* os, bool oneline) { + DCHECK_EQ(FileState::OPEN, state_); + + // Use the embedded protobuf information from the container file to + // create the appropriate kind of protobuf Message. 
+ // + // Loading the schemas into a DescriptorDatabase (and not directly into + // a DescriptorPool) defers resolution until FindMessageTypeByName() + // below, allowing for schemas to be loaded in any order. + SimpleDescriptorDatabase db; + for (int i = 0; i < protos()->file_size(); i++) { + if (!db.Add(protos()->file(i))) { + return Status::Corruption("Descriptor not loaded", Substitute( + "Could not load descriptor for PB type $0 referenced in container file", + pb_type())); + } + } + DescriptorPool pool(&db); + const Descriptor* desc = pool.FindMessageTypeByName(pb_type()); + if (!desc) { + return Status::NotFound("Descriptor not found", Substitute( + "Could not find descriptor for PB type $0 referenced in container file", + pb_type())); + } + DynamicMessageFactory factory; + const Message* prototype = factory.GetPrototype(desc); + if (!prototype) { + return Status::NotSupported("Descriptor not supported", Substitute( + "Descriptor $0 referenced in container file not supported", + pb_type())); + } + unique_ptr<Message> msg(prototype->New()); + + // Dump each message in the container file. + int count = 0; + Status s; + for (s = ReadNextPB(msg.get()); + s.ok(); + s = ReadNextPB(msg.get())) { + if (oneline) { + *os << count++ << "\t" << SecureShortDebugString(*msg) << endl; + } else { + *os << "Message " << count << endl; + *os << "-------" << endl; + *os << SecureDebugString(*msg) << endl; + count++; + } + } + return s.IsEndOfFile() ? 
s.OK() : s; +} + +Status ReadablePBContainerFile::Close() { + state_ = FileState::CLOSED; + reader_.reset(); + return Status::OK(); +} + +int ReadablePBContainerFile::version() const { + DCHECK_EQ(FileState::OPEN, state_); + return version_; +} + +uint64_t ReadablePBContainerFile::offset() const { + DCHECK_EQ(FileState::OPEN, state_); + return offset_; +} + +Status ReadPBContainerFromPath(Env* env, const std::string& path, Message* msg) { + unique_ptr<RandomAccessFile> file; + RETURN_NOT_OK(env->NewRandomAccessFile(path, &file)); + + ReadablePBContainerFile pb_file(std::move(file)); + RETURN_NOT_OK(pb_file.Open()); + RETURN_NOT_OK(pb_file.ReadNextPB(msg)); + return pb_file.Close(); +} + +Status WritePBContainerToPath(Env* env, const std::string& path, + const Message& msg, + CreateMode create, + SyncMode sync) { + TRACE_EVENT2("io", "WritePBContainerToPath", + "path", path, + "msg_type", msg.GetTypeName()); + + if (create == NO_OVERWRITE && env->FileExists(path)) { + return Status::AlreadyPresent(Substitute("File $0 already exists", path)); + } + + const string tmp_template = path + kTmpInfix + kTmpTemplateSuffix; + string tmp_path; + + unique_ptr<RWFile> file; + RETURN_NOT_OK(env->NewTempRWFile(RWFileOptions(), tmp_template, &tmp_path, &file)); + env_util::ScopedFileDeleter tmp_deleter(env, tmp_path); + + WritablePBContainerFile pb_file(std::move(file)); + RETURN_NOT_OK(pb_file.CreateNew(msg)); + RETURN_NOT_OK(pb_file.Append(msg)); + if (sync == pb_util::SYNC) { + RETURN_NOT_OK(pb_file.Sync()); + } + RETURN_NOT_OK(pb_file.Close()); + RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path), + "Failed to rename tmp file to " + path); + tmp_deleter.Cancel(); + if (sync == pb_util::SYNC) { + RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)), + "Failed to SyncDir() parent of " + path); + } + return Status::OK(); +} + + +scoped_refptr<debug::ConvertableToTraceFormat> PbTracer::TracePb(const Message& msg) { + return make_scoped_refptr(new PbTracer(msg)); +} + 
+PbTracer::PbTracer(const Message& msg) : msg_(msg.New()) { + msg_->CopyFrom(msg); +} + +void PbTracer::AppendAsTraceFormat(std::string* out) const { + pb_util::TruncateFields(msg_.get(), kMaxFieldLengthToTrace); + std::ostringstream ss; + JsonWriter jw(&ss, JsonWriter::COMPACT); + jw.Protobuf(*msg_); + out->append(ss.str()); +} + +} // namespace pb_util +} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pb_util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/pb_util.h b/be/src/kudu/util/pb_util.h new file mode 100644 index 0000000..24119c9 --- /dev/null +++ b/be/src/kudu/util/pb_util.h @@ -0,0 +1,492 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Utilities for dealing with protocol buffers. +// These are mostly just functions similar to what are found in the protobuf +// library itself, but using kudu::faststring instances instead of STL strings. 
+#ifndef KUDU_UTIL_PB_UTIL_H +#define KUDU_UTIL_PB_UTIL_H + +#include <memory> +#include <string> + +#include <gtest/gtest_prod.h> + +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/faststring.h" +#include "kudu/util/mutex.h" + +namespace google { +namespace protobuf { +class FileDescriptor; +class FileDescriptorSet; +class MessageLite; +class Message; +} +} + +namespace kudu { + +class Env; +class RandomAccessFile; +class SequentialFile; +class Slice; +class Status; +class RWFile; + +namespace pb_util { + +using google::protobuf::MessageLite; + +enum SyncMode { + SYNC, + NO_SYNC +}; + +enum CreateMode { + OVERWRITE, + NO_OVERWRITE +}; + +enum class FileState { + NOT_INITIALIZED, + OPEN, + CLOSED +}; + +// The minimum valid length of a PBC file. +extern const int kPBContainerMinimumValidLength; + +// See MessageLite::AppendToString +void AppendToString(const MessageLite &msg, faststring *output); + +// See MessageLite::AppendPartialToString +void AppendPartialToString(const MessageLite &msg, faststring *output); + +// See MessageLite::SerializeToString. +void SerializeToString(const MessageLite &msg, faststring *output); + +// See MessageLite::ParseFromZeroCopyStream +Status ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile); + +// Similar to MessageLite::ParseFromArray, with the difference that it returns +// Status::Corruption() if the message could not be parsed. +Status ParseFromArray(MessageLite* msg, const uint8_t* data, uint32_t length); + +// Load a protobuf from the given path. +Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg); + +// Serialize a protobuf to the given path. +// +// If SyncMode SYNC is provided, ensures the changes are made durable. +Status WritePBToPath(Env* env, const std::string& path, const MessageLite& msg, SyncMode sync); + +// Truncate any 'bytes' or 'string' fields of this message to max_len. +// The text "<truncated>" is appended to any such truncated fields. 
+void TruncateFields(google::protobuf::Message* message, int max_len); + +// Redaction-sensitive variant of Message::DebugString. +// +// For most protobufs, this has identical output to Message::DebugString. However, +// a field with string or binary type may be tagged with the 'kudu.REDACT' option, +// available by importing 'pb_util.proto'. When such a field is encountered by this +// method, its contents will be redacted using the 'KUDU_REDACT' macro as documented +// in kudu/util/logging.h. +std::string SecureDebugString(const google::protobuf::Message& msg); + +// Same as SecureDebugString() above, but equivalent to Message::ShortDebugString. +std::string SecureShortDebugString(const google::protobuf::Message& msg); + +// A protobuf "container" has the following format (all integers in +// little-endian byte order). +// +// <file header> +// <1 or more records> +// +// Note: There are two versions (version 1 and version 2) of the record format. +// Version 2 of the file format differs from version 1 in the following ways: +// +// * Version 2 has a file header checksum. +// * Version 2 has separate checksums for the record length and record data +// fields. +// +// File header format +// ------------------ +// +// Each protobuf container file contains a file header identifying the file. +// This includes: +// +// magic number: 8 byte string identifying the file format. +// +// Included so that we have a minimal guarantee that this file is of the +// type we expect and that we are not just reading garbage. +// +// container_version: 4 byte unsigned integer indicating the "version" of the +// container format. May be set to 1 or 2. +// +// Included so that this file format may be extended at some later date +// while maintaining backwards compatibility. +// +// file_header_checksum (version 2+ only): 4 byte unsigned integer with a CRC32C +// of the magic and version fields. +// +// Included so that we can validate the container version number. 
+// +// The remaining container fields are considered part of a "record". There may +// be 1 or more records in a valid protobuf container file. +// +// Record format +// ------------- +// +// data length: 4 byte unsigned integer indicating the size of the encoded data. +// +// Included because PB messages aren't self-delimiting, and thus +// writing a stream of messages to the same file requires +// delimiting each with its size. +// +// See https://developers.google.com/protocol-buffers/docs/techniques?hl=zh-cn#streaming +// for more details. +// +// length checksum (version 2+ only): 4-byte unsigned integer containing the +// CRC32C checksum of "data length". +// +// Included so that we may discern the difference between a truncated file +// and a corrupted length field. +// +// data: "size" bytes of protobuf data encoded according to the schema. +// +// Our payload. +// +// data checksum: 4 byte unsigned integer containing the CRC32C checksum of "data". +// +// Included to ensure validity of the data on-disk. +// Note: In version 1 of the file format, this is a checksum of both the +// "data length" and "data" fields. In version 2+, this is only a checksum +// of the "data" field. +// +// Supplemental header +// ------------------- +// +// A valid container must have at least one record, the first of +// which is known as the "supplemental header". The supplemental header +// contains additional container-level information, including the protobuf +// schema used for the records following it. See pb_util.proto for details. As +// a containerized PB message, the supplemental header is protected by a CRC32C +// checksum like any other message. +// +// Error detection and tolerance +// ----------------------------- +// +// It is worth describing the kinds of errors that can be detected by the +// protobuf container and the kinds that cannot. +// +// The checksums in the container are independent, not rolling. 
As such, +// they won't detect the disappearance or reordering of entire protobuf +// messages, which can happen if a range of the file is collapsed (see +// man fallocate(2)) or if the file is otherwise manually manipulated. +// +// In version 1, the checksums do not protect against corruption in the data +// length field. However, version 2 of the format resolves that problem. The +// benefit is that version 2 files can tell the difference between a record +// with a corrupted length field and a record that was only partially written. +// See ReadablePBContainerFile::ReadNextPB() for discussion on how this +// difference is expressed via the API. +// +// In version 1 of the format, corruption of the version field in the file +// header is not detectable. However, version 2 of the format addresses that +// limitation as well. +// +// Corruption of the protobuf data itself is detected in all versions of the +// file format (subject to CRC32 limitations). +// +// The container does not include footers or periodic checkpoints. As such, it +// will not detect if entire records are truncated. +// +// The design and implementation relies on data ordering guarantees provided by +// the file system to ensure that bytes are written to a file before the file +// metadata (file size) is updated. A partially-written record (the result of a +// failed append) is identified by one of the following criteria: +// 1. Too-few bytes remain in the file to constitute a valid record. For +// version 2, that would be fewer than 12 bytes (data len, data len +// checksum, and data checksum), or +// 2. Assuming a record's data length field is valid, then fewer bytes remain +// in the file than are specified in the data length field (plus enough for +// checksums). +// In the above scenarios, it is assumed that the system faulted while in the +// middle of appending a record, and it is considered safe to truncate the file +// at the beginning of the partial record. 
+// +// If filesystem preallocation is used (at the time of this writing, the +// implementation does not support preallocation) then even version 2 of the +// format cannot safely support culling trailing partially-written records. +// This is because it is not possible to reliably tell the difference between a +// partially-written record that did not complete fsync (resulting in a bad +// checksum) vs. a record that successfully was written to disk but then fell +// victim to bit-level disk corruption. See also KUDU-1414. +// +// These tradeoffs in error detection are reasonable given the failure +// environment that Kudu operates within. We tolerate failures such as +// "kill -9" of the Kudu process, machine power loss, or fsync/fdatasync +// failure, but not failures like runaway processes mangling data files +// in arbitrary ways or attackers crafting malicious data files. +// +// In short, no version of the file format will detect truncation of entire +// protobuf records. Version 2 relies on ordered data flushing semantics for +// automatic recoverability from partial record writes. Version 1 of the file +// format cannot support automatic recoverability from partial record writes. +// +// For further reading on what files might look like following a normal +// filesystem failure or disk corruption, and the likelihood of various types +// of disk errors, see the following papers: +// +// https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-pillai.pdf +// https://www.usenix.org/legacy/event/fast08/tech/full_papers/bairavasundaram/bairavasundaram.pdf + +// Protobuf container file opened for writing. Can be built around an existing +// file or a completely new file. +// +// Every function is thread-safe unless indicated otherwise. +class WritablePBContainerFile { + public: + + // Initializes the class instance; writer must be open. 
+ explicit WritablePBContainerFile(std::shared_ptr<RWFile> writer); + + // Closes the container if not already closed. + ~WritablePBContainerFile(); + + // Writes the file header to disk and initializes the write offset to the + // byte after the file header. This method should NOT be called when opening + // an existing file for append; use OpenExisting() for that. + // + // 'msg' need not be populated; its type is used to "lock" the container + // to a particular protobuf message type in Append(). + // + // Not thread-safe. + Status CreateNew(const google::protobuf::Message& msg); + + // Opens an existing protobuf container file for append. The file must + // already have a valid file header. To initialize a new blank file for + // writing, use CreateNew() instead. + // + // The file header is read and the version specified there is used as the + // format version. The length of the file is also read and is used as the + // write offset for subsequent Append() calls. WritablePBContainerFile caches + // the write offset instead of constantly calling stat() on the file each + // time append is called. + // + // Not thread-safe. + Status OpenExisting(); + + // Writes a protobuf message to the container, beginning with its size + // and ending with its CRC32 checksum. One of CreateNew() or OpenExisting() + // must be called prior to calling Append(), i.e. the file must be open. + Status Append(const google::protobuf::Message& msg); + + // Asynchronously flushes all dirty container data to the filesystem. + // The file must be open. + Status Flush(); + + // Synchronizes all dirty container data to the filesystem. + // The file must be open. + // + // Note: the parent directory is _not_ synchronized. Because the + // container file was provided during construction, we don't know whether + // it was created or reopened, and parent directory synchronization is + // only needed in the former case. + Status Sync(); + + // Closes the container. + // + // Not thread-safe. 
+ Status Close(); + + // Returns the path to the container's underlying file handle. + const std::string& filename() const; + + private: + friend class TestPBUtil; + FRIEND_TEST(TestPBUtil, TestPopulateDescriptorSet); + + // Set the file format version. Only used for testing. + // Must be called before CreateNew(). + Status SetVersionForTests(int version); + + // Write the protobuf schemas belonging to 'desc' and all of its + // dependencies to 'output'. + // + // Schemas are written in dependency order (i.e. if A depends on B which + // depends on C, the order is C, B, A). + static void PopulateDescriptorSet(const google::protobuf::FileDescriptor* desc, + google::protobuf::FileDescriptorSet* output); + + // Serialize the contents of 'msg' into 'buf' along with additional metadata + // to aid in deserialization. + Status AppendMsgToBuffer(const google::protobuf::Message& msg, faststring* buf); + + // Append bytes to the file. + Status AppendBytes(const Slice& data); + + // State of the file. + FileState state_; + + // Protects offset_. + Mutex offset_lock_; + + // Current write offset into the file. + uint64_t offset_; + + // Protobuf container file version. + int version_; + + // File writer. + std::shared_ptr<RWFile> writer_; +}; + +// Protobuf container file opened for reading. +// +// Can be built around a file with existing contents or an empty file (in +// which case it's safe to interleave with WritablePBContainerFile). +class ReadablePBContainerFile { + public: + + // Initializes the class instance; reader must be open. + explicit ReadablePBContainerFile(std::shared_ptr<RandomAccessFile> reader); + + // Closes the file if not already closed. + ~ReadablePBContainerFile(); + + // Reads the header information from the container and validates it. + // Must be called before any of the other methods. + Status Open(); + + // Reads a protobuf message from the container, validating its size and + // data using a CRC32 checksum. File must be open. 
+ // + // Return values: + // * If there are no more records in the file, returns Status::EndOfFile. + // * If there is a partial record, but it is not long enough to be a full + // record or the written length of the record is less than the remaining + // bytes in the file, returns Status::Incomplete. If Status::Incomplete + // is returned, calling offset() will return the point in the file where + // the invalid partial record begins. In order to append additional records + // to the file, the file must first be truncated at that offset. + // Note: Version 1 of this file format will never return + // Status::Incomplete() from this method. + // * If a corrupt record is encountered, returns Status::Corruption. + // * On success, stores the result in '*msg' and returns OK. + Status ReadNextPB(google::protobuf::Message* msg); + + // Dumps any unread protobuf messages in the container to 'os'. Each + // message's DebugString() method is invoked to produce its textual form. + // File must be open. + // + // If 'oneline' is true, prints each message on a single line. + Status Dump(std::ostream* os, bool oneline); + + // Closes the container. + Status Close(); + + // Expected PB type and schema for each message to be read. + // + // Only valid after a successful call to Open(). + const std::string& pb_type() const { return pb_type_; } + const google::protobuf::FileDescriptorSet* protos() const { + return protos_.get(); + } + + // Return the protobuf container file format version. + // File must be open. + int version() const; + + // Return current read offset. + // File must be open. + uint64_t offset() const; + + private: + FileState state_; + int version_; + uint64_t offset_; + + // The fully-qualified PB type name of the messages in the container. + std::string pb_type_; + + // Wrapped in a unique_ptr so that clients need not include PB headers. 
+ std::unique_ptr<google::protobuf::FileDescriptorSet> protos_; + + std::shared_ptr<RandomAccessFile> reader_; +}; + +// Convenience functions for protobuf containers holding just one record. + +// Load a "containerized" protobuf from the given path. +// If the file does not exist, returns Status::NotFound(). Otherwise, may +// return other Status error codes such as Status::IOError. +Status ReadPBContainerFromPath(Env* env, const std::string& path, + google::protobuf::Message* msg); + +// Serialize a "containerized" protobuf to the given path. +// +// If create == NO_OVERWRITE and 'path' already exists, the function will fail. +// If sync == SYNC, the newly created file will be fsynced before returning. +Status WritePBContainerToPath(Env* env, const std::string& path, + const google::protobuf::Message& msg, + CreateMode create, + SyncMode sync); + +// Wrapper for a protobuf message which lazily converts to JSON when +// the trace buffer is dumped. +// +// When tracing, an instance of this class can be associated with +// a given trace, instead of a stringified PB, thus avoiding doing +// stringification inline and moving that work to the tracing process. +// +// Example usage: +// TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this, +// "response", pb_util::PbTracer::TracePb(*response_pb_), +// ...); +// +class PbTracer : public debug::ConvertableToTraceFormat { + public: + enum { + kMaxFieldLengthToTrace = 100 + }; + + // Static helper to be called when adding a stringified PB to a trace. + // This does not actually stringify 'msg', that will be done later + // when/if AppendAsTraceFormat() is called on the returned object. + static scoped_refptr<debug::ConvertableToTraceFormat> TracePb( + const google::protobuf::Message& msg); + + explicit PbTracer(const google::protobuf::Message& msg); + + // Actually stringifies the PB and appends the string to 'out'. 
+ void AppendAsTraceFormat(std::string* out) const override; + private: + const std::unique_ptr<google::protobuf::Message> msg_; +}; + +} // namespace pb_util + +// TODO(todd) Replacing all Message::ToString call sites for KUDU-1812 +// is much easier if these are available in the 'kudu' namespace. We should +// consider removing these imports and move them to all call sites. +using pb_util::SecureDebugString; // NOLINT +using pb_util::SecureShortDebugString; // NOLINT + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pb_util.proto ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/pb_util.proto b/be/src/kudu/util/pb_util.proto new file mode 100644 index 0000000..b78c0cf --- /dev/null +++ b/be/src/kudu/util/pb_util.proto @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+syntax = "proto2"; +package kudu; + +option java_package = "org.apache.kudu"; + +import "google/protobuf/descriptor.proto"; + +// ============================================================================ +// Protobuf container metadata +// ============================================================================ + +// Supplemental protobuf container header, after the main header (see +// pb_util.h for details). +message ContainerSupHeaderPB { + // The protobuf schema for the messages expected in this container. + // + // This schema is complete, that is, it includes all of its dependencies + // (i.e. other schemas defined in .proto files imported by this schema's + // .proto file). + required google.protobuf.FileDescriptorSet protos = 1; + + // The PB message type expected in each data entry in this container. Must + // be fully qualified (i.e. kudu.tablet.TabletSuperBlockPB). + required string pb_type = 2; +} + +extend google.protobuf.FieldOptions { + optional bool REDACT = 50001 [default=false]; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/pb_util_test.proto ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/pb_util_test.proto b/be/src/kudu/util/pb_util_test.proto new file mode 100644 index 0000000..bac0be0 --- /dev/null +++ b/be/src/kudu/util/pb_util_test.proto @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +syntax = "proto2"; +package kudu; + +import "kudu/util/pb_util.proto"; + +message TestSecurePrintingPB { + optional string insecure1 = 1; + optional string secure1 = 2 [(kudu.REDACT) = true]; + optional string insecure2 = 3; + optional string secure2 = 4 [(kudu.REDACT) = true]; + repeated string repeated_secure = 5 [(kudu.REDACT) = true]; + optional string insecure3 = 6; +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/process_memory-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/process_memory-test.cc b/be/src/kudu/util/process_memory-test.cc new file mode 100644 index 0000000..38922f5 --- /dev/null +++ b/be/src/kudu/util/process_memory-test.cc @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <atomic> +#include <thread> +#include <vector> + +#include "kudu/util/monotime.h" +#include "kudu/util/process_memory.h" +#include "kudu/util/test_util.h" + +using std::atomic; +using std::thread; +using std::vector; + +namespace kudu { + +// Microbenchmark for our new/delete hooks which track process-wide +// memory consumption. +TEST(ProcessMemory, BenchmarkConsumptionTracking) { + const int kNumThreads = 200; + vector<thread> threads; + atomic<bool> done(false); + atomic<int64_t> total_count(0); + + // We start many threads, each of which performs 10:1 ratio of + // new/delete pairs to consumption lookups. The high number + // of threads highlights when there is contention on central + // tcmalloc locks. + for (int i = 0; i < kNumThreads; i++) { + threads.emplace_back([&]() { + int64_t local_count = 0; + while (!done) { + for (int a = 0; a < 10; a++) { + // Mark 'x' volatile so that the compiler does not optimize out the + // allocation. + char* volatile x = new char[8000]; + delete[] x; + } + process_memory::CurrentConsumption(); + local_count++; + } + total_count += local_count; + }); + } + double secs = 3; + SleepFor(MonoDelta::FromSeconds(secs)); + done = true; + + for (auto& t : threads) { + t.join(); + } + + LOG(INFO) << "Performed " << total_count / secs << " iters/sec"; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/process_memory.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/process_memory.cc b/be/src/kudu/util/process_memory.cc new file mode 100644 index 0000000..986e15a --- /dev/null +++ b/be/src/kudu/util/process_memory.cc @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <sys/resource.h> + +#include <gflags/gflags.h> +#include <gperftools/malloc_extension.h> + +#include "kudu/gutil/once.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/walltime.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/env.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/mem_tracker.h" +#include "kudu/util/process_memory.h" +#include "kudu/util/random.h" +#include "kudu/util/striped64.h" + +DEFINE_int64(memory_limit_hard_bytes, 0, + "Maximum amount of memory this daemon should use, in bytes. " + "A value of 0 autosizes based on the total system memory. " + "A value of -1 disables all memory limiting."); +TAG_FLAG(memory_limit_hard_bytes, stable); + +DEFINE_int32(memory_pressure_percentage, 60, + "Percentage of the hard memory limit that this daemon may " + "consume before flushing of in-memory data becomes prioritized."); +TAG_FLAG(memory_pressure_percentage, advanced); + +DEFINE_int32(memory_limit_soft_percentage, 80, + "Percentage of the hard memory limit that this daemon may " + "consume before memory throttling of writes begins. The greater " + "the excess, the higher the chance of throttling. 
In general, a " + "lower soft limit leads to smoother write latencies but " + "decreased throughput, and vice versa for a higher soft limit."); +TAG_FLAG(memory_limit_soft_percentage, advanced); + +DEFINE_int32(memory_limit_warn_threshold_percentage, 98, + "Percentage of the hard memory limit that this daemon may " + "consume before WARNING level messages are periodically logged."); +TAG_FLAG(memory_limit_warn_threshold_percentage, advanced); + +#ifdef TCMALLOC_ENABLED +DEFINE_int32(tcmalloc_max_free_bytes_percentage, 10, + "Maximum percentage of the RSS that tcmalloc is allowed to use for " + "reserved but unallocated memory."); +TAG_FLAG(tcmalloc_max_free_bytes_percentage, advanced); +#endif + +using strings::Substitute; + +namespace kudu { +namespace process_memory { + +namespace { +int64_t g_hard_limit; +int64_t g_soft_limit; +int64_t g_pressure_threshold; + +ThreadSafeRandom* g_rand = nullptr; + +#ifdef TCMALLOC_ENABLED +// Total amount of memory released since the last GC. If this +// is greater than GC_RELEASE_SIZE, this will trigger a tcmalloc gc. +Atomic64 g_released_memory_since_gc; + +// Size, in bytes, that is considered a large value for Release() (or Consume() with +// a negative value). If tcmalloc is used, this can trigger it to GC. +// A higher value will make us call into tcmalloc less often (and therefore more +// efficient). A lower value will mean our memory overhead is lower. +// TODO(todd): this is a stopgap. +const int64_t GC_RELEASE_SIZE = 128 * 1024L * 1024L; + +#endif // TCMALLOC_ENABLED + +} // anonymous namespace + + +// Flag validation +// ------------------------------------------------------------ +// Validate that various flags are percentages. 
+static bool ValidatePercentage(const char* flagname, int value) { + if (value >= 0 && value <= 100) { + return true; + } + LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid", + flagname, value); + return false; +} + +static bool dummy[] = { + google::RegisterFlagValidator(&FLAGS_memory_limit_soft_percentage, &ValidatePercentage), + google::RegisterFlagValidator(&FLAGS_memory_limit_warn_threshold_percentage, &ValidatePercentage) +#ifdef TCMALLOC_ENABLED + ,google::RegisterFlagValidator(&FLAGS_tcmalloc_max_free_bytes_percentage, &ValidatePercentage) +#endif +}; + + +// Wrappers around tcmalloc functionality +// ------------------------------------------------------------ +#ifdef TCMALLOC_ENABLED +static int64_t GetTCMallocProperty(const char* prop) { + size_t value; + if (!MallocExtension::instance()->GetNumericProperty(prop, &value)) { + LOG(DFATAL) << "Failed to get tcmalloc property " << prop; + } + return value; +} + +int64_t GetTCMallocCurrentAllocatedBytes() { + return GetTCMallocProperty("generic.current_allocated_bytes"); +} + +void GcTcmalloc() { + TRACE_EVENT0("process", "GcTcmalloc"); + + // Number of bytes in the 'NORMAL' free list (i.e reserved by tcmalloc but + // not in use). + int64_t bytes_overhead = GetTCMallocProperty("tcmalloc.pageheap_free_bytes"); + // Bytes allocated by the application. + int64_t bytes_used = GetTCMallocCurrentAllocatedBytes(); + + int64_t max_overhead = bytes_used * FLAGS_tcmalloc_max_free_bytes_percentage / 100.0; + if (bytes_overhead > max_overhead) { + int64_t extra = bytes_overhead - max_overhead; + while (extra > 0) { + // Release 1MB at a time, so that tcmalloc releases its page heap lock + // allowing other threads to make progress. This still disrupts the current + // thread, but is better than disrupting all. 
+ MallocExtension::instance()->ReleaseToSystem(1024 * 1024); + extra -= 1024 * 1024; + } + } +} +#endif // TCMALLOC_ENABLED + + +// Consumption and soft memory limit behavior +// ------------------------------------------------------------ +namespace { +void DoInitLimits() { + int64_t limit = FLAGS_memory_limit_hard_bytes; + if (limit == 0) { + // If no limit is provided, we'll use 80% of system RAM. + int64_t total_ram; + CHECK_OK(Env::Default()->GetTotalRAMBytes(&total_ram)); + limit = total_ram * 4; + limit /= 5; + } + g_hard_limit = limit; + g_soft_limit = FLAGS_memory_limit_soft_percentage * g_hard_limit / 100; + g_pressure_threshold = FLAGS_memory_pressure_percentage * g_hard_limit / 100; + + g_rand = new ThreadSafeRandom(1); + + LOG(INFO) << StringPrintf("Process hard memory limit is %.6f GB", + (static_cast<float>(g_hard_limit) / (1024.0 * 1024.0 * 1024.0))); + LOG(INFO) << StringPrintf("Process soft memory limit is %.6f GB", + (static_cast<float>(g_soft_limit) / + (1024.0 * 1024.0 * 1024.0))); + LOG(INFO) << StringPrintf("Process memory pressure threshold is %.6f GB", + (static_cast<float>(g_pressure_threshold) / + (1024.0 * 1024.0 * 1024.0))); +} + +void InitLimits() { + static GoogleOnceType once; + GoogleOnceInit(&once, &DoInitLimits); +} + +} // anonymous namespace + +int64_t CurrentConsumption() { +#ifdef TCMALLOC_ENABLED + const int64_t kReadIntervalMicros = 50000; + static Atomic64 last_read_time = 0; + static simple_spinlock read_lock; + static Atomic64 consumption = 0; + uint64_t time = GetMonoTimeMicros(); + if (time > last_read_time + kReadIntervalMicros && read_lock.try_lock()) { + base::subtle::NoBarrier_Store(&consumption, GetTCMallocCurrentAllocatedBytes()); + // Re-fetch the time after getting the consumption. This way, in case fetching + // consumption is extremely slow for some reason (eg due to lots of contention + // in tcmalloc) we at least ensure that we wait at least another full interval + // before fetching the information again. 
+ time = GetMonoTimeMicros(); + base::subtle::NoBarrier_Store(&last_read_time, time); + read_lock.unlock(); + } + + return base::subtle::NoBarrier_Load(&consumption); +#else + // Without tcmalloc, we have no reliable way of determining our own heap + // size (e.g. mallinfo doesn't work in ASAN builds). So, we'll fall back + // to just looking at the sum of our tracked memory. + return MemTracker::GetRootTracker()->consumption(); +#endif +} + +int64_t HardLimit() { + return g_hard_limit; +} + +bool UnderMemoryPressure(double* current_capacity_pct) { + InitLimits(); + int64_t consumption = CurrentConsumption(); + if (consumption < g_pressure_threshold) { + return false; + } + if (current_capacity_pct) { + *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100; + } + return true; +} + +bool SoftLimitExceeded(double* current_capacity_pct) { + InitLimits(); + int64_t consumption = CurrentConsumption(); + // Did we exceed the actual limit? + if (consumption > g_hard_limit) { + if (current_capacity_pct) { + *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100; + } + return true; + } + + // No soft limit defined. + if (g_hard_limit == g_soft_limit) { + return false; + } + + // Are we under the soft limit threshold? + if (consumption < g_soft_limit) { + return false; + } + + // We're over the threshold; were we randomly chosen to be over the soft limit? 
+ if (consumption + g_rand->Uniform64(g_hard_limit - g_soft_limit) > g_hard_limit) { + if (current_capacity_pct) { + *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100; + } + return true; + } + return false; +} + +void MaybeGCAfterRelease(int64_t released_bytes) { +#ifdef TCMALLOC_ENABLED + int64_t now_released = base::subtle::NoBarrier_AtomicIncrement( + &g_released_memory_since_gc, -released_bytes); + if (PREDICT_FALSE(now_released > GC_RELEASE_SIZE)) { + base::subtle::NoBarrier_Store(&g_released_memory_since_gc, 0); + GcTcmalloc(); + } +#endif +} + +} // namespace process_memory +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/process_memory.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/process_memory.h b/be/src/kudu/util/process_memory.h new file mode 100644 index 0000000..1545dc7 --- /dev/null +++ b/be/src/kudu/util/process_memory.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cstdint> + +namespace kudu { +namespace process_memory { + +// Probabilistically returns true if the process-wide soft memory limit is exceeded. 
// The greater the excess, the higher the chance that it returns true.
//
// If the soft limit is exceeded and 'current_capacity_pct' is not NULL, the percentage
// of the hard limit consumed is written to it.
//
// NOTE(review): no include guard or '#pragma once' is visible for this
// header in this view — confirm one exists.
bool SoftLimitExceeded(double* current_capacity_pct);

// Return true if we are under memory pressure (i.e if we are nearing the point at which
// SoftLimitExceeded will begin to return true).
//
// If the process is under memory pressure, and 'current_capacity_pct' is not NULL,
// the percentage of the hard limit consumed is written to it.
bool UnderMemoryPressure(double* current_capacity_pct);

// Potentially trigger a call to release tcmalloc memory back to the
// OS, after the given amount of memory was released.
void MaybeGCAfterRelease(int64_t released_bytes);

// Return the total current memory consumption of the process.
int64_t CurrentConsumption();

// Return the configured hard limit for the process.
int64_t HardLimit();

#ifdef TCMALLOC_ENABLED
// Get the current amount of allocated memory, according to tcmalloc.
//
// This should be equal to CurrentConsumption(), but is made available so that tests
// can verify the correctness of CurrentConsumption().
int64_t GetTCMallocCurrentAllocatedBytes();
#endif

} // namespace process_memory
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/promise.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/promise.h b/be/src/kudu/util/promise.h
new file mode 100644
index 0000000..17f8cec
--- /dev/null
+++ b/be/src/kudu/util/promise.h
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_PROMISE_H +#define KUDU_UTIL_PROMISE_H + +#include "kudu/gutil/macros.h" +#include "kudu/util/countdown_latch.h" + +namespace kudu { + +// A promise boxes a value which is to be provided at some time in the future. +// A single producer calls Set(...), and any number of consumers can call Get() +// to retrieve the produced value. +// +// In Guava terms, this is a SettableFuture<T>. +template<typename T> +class Promise { + public: + Promise() : latch_(1) {} + ~Promise() {} + + // Reset the promise to be used again. + // For this to be safe, there must be some kind of external synchronization + // ensuring that no threads are still accessing the value from the previous + // incarnation of the promise. + void Reset() { + latch_.Reset(1); + val_ = T(); + } + + // Block until a value is available, and return a reference to it. + const T& Get() const { + latch_.Wait(); + return val_; + } + + // Wait for the promised value to become available with the given timeout. + // + // Returns NULL if the timeout elapses before a value is available. + // Otherwise returns a pointer to the value. This pointer's lifetime is + // tied to the lifetime of the Promise object. + const T* WaitFor(const MonoDelta& delta) const { + if (latch_.WaitFor(delta)) { + return &val_; + } else { + return NULL; + } + } + + // Set the value of this promise. + // This may be called at most once. 
+ void Set(const T& val) { + DCHECK_EQ(latch_.count(), 1) << "Already set!"; + val_ = val; + latch_.CountDown(); + } + + private: + CountDownLatch latch_; + T val_; + DISALLOW_COPY_AND_ASSIGN(Promise); +}; + +} // namespace kudu +#endif /* KUDU_UTIL_PROMISE_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/proto_container_test.proto ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/proto_container_test.proto b/be/src/kudu/util/proto_container_test.proto new file mode 100644 index 0000000..4707c08 --- /dev/null +++ b/be/src/kudu/util/proto_container_test.proto @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +syntax = "proto2"; +package kudu; + +// Arbitrary protobuf to test writing a containerized protobuf. 
message ProtoContainerTestPB {
  // Arbitrary string field (test payload).
  required string name = 1;
  // Arbitrary integer field (test payload).
  required int32 value = 2;
  // Optional free-form string (test payload).
  optional string note = 3;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/proto_container_test2.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/proto_container_test2.proto b/be/src/kudu/util/proto_container_test2.proto
new file mode 100644
index 0000000..74a1ea3
--- /dev/null
+++ b/be/src/kudu/util/proto_container_test2.proto
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
syntax = "proto2";
package kudu;

// Dependency chain:
//
// this file --> proto_container_test.proto

import "kudu/util/proto_container_test.proto";

// Arbitrary protobuf that has one PB dependency.
message ProtoContainerTest2PB {
  // Nested record imported from proto_container_test.proto (the direct dependency).
  required kudu.ProtoContainerTestPB record = 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/proto_container_test3.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/proto_container_test3.proto b/be/src/kudu/util/proto_container_test3.proto
new file mode 100644
index 0000000..1ed1c31
--- /dev/null
+++ b/be/src/kudu/util/proto_container_test3.proto
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
syntax = "proto2";
package kudu;

// Dependency chain:
//
// this file --> proto_container_test.proto
//           --> proto_container_test2.proto --> proto_container_test.proto

import "kudu/util/proto_container_test.proto";
import "kudu/util/proto_container_test2.proto";

// Arbitrary protobuf that has two PB dependencies.
message ProtoContainerTest3PB {
  // Direct dependency on proto_container_test.proto.
  required kudu.ProtoContainerTestPB record_one = 1;
  // Transitive dependency via proto_container_test2.proto.
  required kudu.ProtoContainerTest2PB record_two = 2;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/protobuf-annotations.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/protobuf-annotations.h b/be/src/kudu/util/protobuf-annotations.h
new file mode 100644
index 0000000..7fdc961
--- /dev/null
+++ b/be/src/kudu/util/protobuf-annotations.h
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// Simple header which is inserted into all of our generated protobuf code.
// We use this to hook protobuf code up to TSAN annotations.
#ifndef KUDU_UTIL_PROTOBUF_ANNOTATIONS_H
#define KUDU_UTIL_PROTOBUF_ANNOTATIONS_H

#include "kudu/gutil/dynamic_annotations.h"

// The protobuf internal headers are included before this, so we have to undefine
// the empty definitions first.
+#undef GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN +#undef GOOGLE_SAFE_CONCURRENT_WRITES_END + +#define GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN ANNOTATE_IGNORE_WRITES_BEGIN +#define GOOGLE_SAFE_CONCURRENT_WRITES_END ANNOTATE_IGNORE_WRITES_END + +#endif /* KUDU_UTIL_PROTOBUF_ANNOTATIONS_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/protobuf_util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/protobuf_util.h b/be/src/kudu/util/protobuf_util.h new file mode 100644 index 0000000..cc88eda --- /dev/null +++ b/be/src/kudu/util/protobuf_util.h @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_PROTOBUF_UTIL_H +#define KUDU_UTIL_PROTOBUF_UTIL_H + +#include <google/protobuf/message_lite.h> + +namespace kudu { + +bool AppendPBToString(const google::protobuf::MessageLite &msg, faststring *output) { + int old_size = output->size(); + int byte_size = msg.ByteSize(); + output->resize(old_size + byte_size); + uint8* start = reinterpret_cast<uint8*>(output->data() + old_size); + uint8* end = msg.SerializeWithCachedSizesToArray(start); + CHECK(end - start == byte_size) + << "Error in serialization. 
byte_size=" << byte_size + << " new ByteSize()=" << msg.ByteSize() + << " end-start=" << (end-start); + return true; +} + +} // namespace kudu + +#endif
