http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util.h b/be/src/kudu/util/pb_util.h
new file mode 100644
index 0000000..6c132a6
--- /dev/null
+++ b/be/src/kudu/util/pb_util.h
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utilities for dealing with protocol buffers.
+// Most of these functions mirror ones found in the protobuf library itself,
+// but operate on kudu::faststring instances instead of STL strings.
+#ifndef KUDU_UTIL_PB_UTIL_H
+#define KUDU_UTIL_PB_UTIL_H
+
+#include <cstdint>
+#include <iosfwd>
+#include <memory>
+#include <string>
+
+#include <boost/optional/optional.hpp>
+#include <google/protobuf/message.h>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/debug/trace_event_impl.h"
+
+namespace google {
+namespace protobuf {
+class DescriptorPool;
+class FileDescriptor;
+class FileDescriptorSet;
+class MessageLite;
+class SimpleDescriptorDatabase;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Env;
+class RandomAccessFile;
+class SequentialFile;
+class Slice;
+class Status;
+class RWFile;
+class faststring;
+
+namespace pb_util {
+
+enum SyncMode {
+  SYNC,    // Changes are made durable before the call returns.
+  NO_SYNC  // Presumably skips the durability step -- confirm in pb_util.cc.
+};
+
+enum CreateMode {
+  OVERWRITE,    // Presumably replaces an existing file -- confirm in pb_util.cc.
+  NO_OVERWRITE  // The write fails if the target path already exists.
+};
+
+enum class FileState {  // Held in 'state_' by both PB container file classes.
+  NOT_INITIALIZED,
+  OPEN,
+  CLOSED
+};
+
+// The minimum valid length of a PBC file.
+extern const int kPBContainerMinimumValidLength;
+
+// See MessageLite::AppendToString
+void AppendToString(const google::protobuf::MessageLite &msg, faststring 
*output);
+
+// See MessageLite::AppendPartialToString
+void AppendPartialToString(const google::protobuf::MessageLite &msg, 
faststring *output);
+
+// See MessageLite::SerializeToString.
+void SerializeToString(const google::protobuf::MessageLite &msg, faststring 
*output);
+
+// See MessageLite::ParseFromZeroCopyStream
+Status ParseFromSequentialFile(google::protobuf::MessageLite *msg, 
SequentialFile *rfile);
+
+// Similar to MessageLite::ParseFromArray, with the difference that it returns
+// Status::Corruption() if the message could not be parsed.
+Status ParseFromArray(google::protobuf::MessageLite* msg, const uint8_t* data, 
uint32_t length);
+
+// Load a protobuf from the given path.
+Status ReadPBFromPath(Env* env, const std::string& path, 
google::protobuf::MessageLite* msg);
+
+// Serialize a protobuf to the given path.
+//
+// If SyncMode SYNC is provided, ensures the changes are made durable.
+Status WritePBToPath(Env* env, const std::string& path,
+                     const google::protobuf::MessageLite& msg, SyncMode sync);
+
+// Truncate any 'bytes' or 'string' fields of this message to max_len.
+// The text "<truncated>" is appended to any such truncated fields.
+void TruncateFields(google::protobuf::Message* message, int max_len);
+
+// Redaction-sensitive variant of Message::DebugString.
+//
+// For most protobufs, this has identical output to Message::DebugString. However,
+// a field with string or binary type may be tagged with the 'kudu.REDACT' option,
+// available by importing 'pb_util.proto'. When such a field is encountered by this
+// method, its contents will be redacted using the 'KUDU_REDACT' macro as documented
+// in kudu/util/logging.h.
+std::string SecureDebugString(const google::protobuf::Message& msg);
+
+// Same as SecureDebugString() above, but equivalent to Message::ShortDebugString.
+std::string SecureShortDebugString(const google::protobuf::Message& msg);
+
+// A protobuf "container" has the following format (all integers in
+// little-endian byte order).
+//
+// <file header>
+// <1 or more records>
+//
+// Note: There are two versions (version 1 and version 2) of the record format.
+// Version 2 of the file format differs from version 1 in the following ways:
+//
+//   * Version 2 has a file header checksum.
+//   * Version 2 has separate checksums for the record length and record data
+//     fields.
+//
+// File header format
+// ------------------
+//
+// Each protobuf container file contains a file header identifying the file.
+// This includes:
+//
+// magic number: 8 byte string identifying the file format.
+//
+//    Included so that we have a minimal guarantee that this file is of the
+//    type we expect and that we are not just reading garbage.
+//
+// container_version: 4 byte unsigned integer indicating the "version" of the
+//                    container format. May be set to 1 or 2.
+//
+//    Included so that this file format may be extended at some later date
+//    while maintaining backwards compatibility.
+//
+// file_header_checksum (version 2+ only): 4 byte unsigned integer with a CRC32C
+//                                         of the magic and version fields.
+//
+//    Included so that we can validate the container version number.
+//
+// The remaining container fields are considered part of a "record". There may
+// be 1 or more records in a valid protobuf container file.
+//
+// Record format
+// -------------
+//
+// data length: 4 byte unsigned integer indicating the size of the encoded data.
+//
+//    Included because PB messages aren't self-delimiting, and thus
+//    writing a stream of messages to the same file requires
+//    delimiting each with its size.
+//
+//    See https://developers.google.com/protocol-buffers/docs/techniques?hl=zh-cn#streaming
+//    for more details.
+//
+// length checksum (version 2+ only): 4-byte unsigned integer containing the
+//                                    CRC32C checksum of "data length".
+//
+//    Included so that we may discern the difference between a truncated file
+//    and a corrupted length field.
+//
+// data: "data length" bytes of protobuf data encoded according to the schema.
+//
+//    Our payload.
+//
+// data checksum: 4 byte unsigned integer containing the CRC32C checksum of "data".
+//
+//    Included to ensure validity of the data on-disk.
+//    Note: In version 1 of the file format, this is a checksum of both the
+//    "data length" and "data" fields. In version 2+, this is only a checksum
+//    of the "data" field.
+//
+// Supplemental header
+// -------------------
+//
+// A valid container must have at least one record, the first of
+// which is known as the "supplemental header". The supplemental header
+// contains additional container-level information, including the protobuf
+// schema used for the records following it. See pb_util.proto for details. As
+// a containerized PB message, the supplemental header is protected by a CRC32C
+// checksum like any other message.
+//
+// Error detection and tolerance
+// -----------------------------
+//
+// It is worth describing the kinds of errors that can be detected by the
+// protobuf container and the kinds that cannot.
+//
+// The checksums in the container are independent, not rolling. As such,
+// they won't detect the disappearance or reordering of entire protobuf
+// messages, which can happen if a range of the file is collapsed (see
+// man fallocate(2)) or if the file is otherwise manually manipulated.
+//
+// In version 1, the checksums do not protect against corruption in the data
+// length field. However, version 2 of the format resolves that problem. The
+// benefit is that version 2 files can tell the difference between a record
+// with a corrupted length field and a record that was only partially written.
+// See ReadablePBContainerFile::ReadNextPB() for discussion on how this
+// difference is expressed via the API.
+//
+// In version 1 of the format, corruption of the version field in the file
+// header is not detectable. However, version 2 of the format addresses that
+// limitation as well.
+//
+// Corruption of the protobuf data itself is detected in all versions of the
+// file format (subject to CRC32C limitations).
+//
+// The container does not include footers or periodic checkpoints. As such, it
+// will not detect if entire records are truncated.
+//
+// The design and implementation relies on data ordering guarantees provided by
+// the file system to ensure that bytes are written to a file before the file
+// metadata (file size) is updated. A partially-written record (the result of a
+// failed append) is identified by one of the following criteria:
+// 1. Too few bytes remain in the file to constitute a valid record. For
+//    version 2, that would be fewer than 12 bytes (data len, data len
+//    checksum, and data checksum), or
+// 2. Assuming a record's data length field is valid, then fewer bytes remain
+//    in the file than are specified in the data length field (plus enough for
+//    checksums).
+// In the above scenarios, it is assumed that the system faulted while in the
+// middle of appending a record, and it is considered safe to truncate the file
+// at the beginning of the partial record.
+//
+// If filesystem preallocation is used (at the time of this writing, the
+// implementation does not support preallocation) then even version 2 of the
+// format cannot safely support culling trailing partially-written records.
+// This is because it is not possible to reliably tell the difference between a
+// partially-written record that did not complete fsync (resulting in a bad
+// checksum) vs. a record that successfully was written to disk but then fell
+// victim to bit-level disk corruption. See also KUDU-1414.
+//
+// These tradeoffs in error detection are reasonable given the failure
+// environment that Kudu operates within. We tolerate failures such as
+// "kill -9" of the Kudu process, machine power loss, or fsync/fdatasync
+// failure, but not failures like runaway processes mangling data files
+// in arbitrary ways or attackers crafting malicious data files.
+//
+// In short, no version of the file format will detect truncation of entire
+// protobuf records. Version 2 relies on ordered data flushing semantics for
+// automatic recoverability from partial record writes. Version 1 of the file
+// format cannot support automatic recoverability from partial record writes.
+//
+// For further reading on what files might look like following a normal
+// filesystem failure or disk corruption, and the likelihood of various types
+// of disk errors, see the following papers:
+//
+// https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-pillai.pdf
+// https://www.usenix.org/legacy/event/fast08/tech/full_papers/bairavasundaram/bairavasundaram.pdf
+
+// Protobuf container file opened for writing. Can be built around an existing
+// file or a completely new file.
+//
+// Every function is thread-safe unless indicated otherwise.
+class WritablePBContainerFile {
+ public:
+
+  // Initializes the class instance; writer must be open.
+  explicit WritablePBContainerFile(std::shared_ptr<RWFile> writer);
+
+  // Closes the container if not already closed.
+  ~WritablePBContainerFile();
+
+  // Writes the file header to disk and initializes the write offset to the
+  // byte after the file header. This method should NOT be called when opening
+  // an existing file for append; use OpenExisting() for that.
+  //
+  // 'msg' need not be populated; its type is used to "lock" the container
+  // to a particular protobuf message type in Append().
+  //
+  // Not thread-safe.
+  Status CreateNew(const google::protobuf::Message& msg);
+
+  // Opens an existing protobuf container file for append. The file must
+  // already have a valid file header. To initialize a new blank file for
+  // writing, use CreateNew() instead.
+  //
+  // The file header is read and the version specified there is used as the
+  // format version. The length of the file is also read and is used as the
+  // write offset for subsequent Append() calls. WritablePBContainerFile caches
+  // the write offset instead of constantly calling stat() on the file each
+  // time append is called.
+  //
+  // Not thread-safe.
+  Status OpenExisting();
+
+  // Writes a protobuf message to the container, beginning with its size
+  // and ending with its CRC32C checksum. One of CreateNew() or OpenExisting()
+  // must be called prior to calling Append(), i.e. the file must be open.
+  Status Append(const google::protobuf::Message& msg);
+
+  // Asynchronously flushes all dirty container data to the filesystem.
+  // The file must be open.
+  Status Flush();
+
+  // Synchronizes all dirty container data to the filesystem.
+  // The file must be open.
+  //
+  // Note: the parent directory is _not_ synchronized. Because the
+  // container file was provided during construction, we don't know whether
+  // it was created or reopened, and parent directory synchronization is
+  // only needed in the former case.
+  Status Sync();
+
+  // Closes the container.
+  //
+  // Not thread-safe.
+  Status Close();
+
+  // Returns the path to the container's underlying file handle.
+  const std::string& filename() const;
+
+ private:
+  friend class TestPBUtil;
+  FRIEND_TEST(TestPBUtil, TestPopulateDescriptorSet);
+
+  // Set the file format version. Only used for testing.
+  // Must be called before CreateNew().
+  Status SetVersionForTests(int version);
+
+  // Write the protobuf schemas belonging to 'desc' and all of its
+  // dependencies to 'output'.
+  //
+  // Schemas are written in dependency order (e.g. if A depends on B which
+  // depends on C, the order is C, B, A).
+  static void PopulateDescriptorSet(const google::protobuf::FileDescriptor* 
desc,
+                                    google::protobuf::FileDescriptorSet* 
output);
+
+  // Serialize the contents of 'msg' into 'buf' along with additional metadata
+  // to aid in deserialization.
+  Status AppendMsgToBuffer(const google::protobuf::Message& msg, faststring* 
buf);
+
+  // Append bytes to the file.
+  Status AppendBytes(const Slice& data);
+
+  // State of the file.
+  FileState state_;
+
+  // Protects offset_.
+  Mutex offset_lock_;
+
+  // Current write offset into the file.
+  uint64_t offset_;
+
+  // Protobuf container file version.
+  int version_;
+
+  // File writer.
+  std::shared_ptr<RWFile> writer_;
+};
+
+// Protobuf container file opened for reading.
+//
+// Can be built around a file with existing contents or an empty file (in
+// which case it's safe to interleave with WritablePBContainerFile).
+class ReadablePBContainerFile {
+ public:
+
+  // Initializes the class instance; reader must be open.
+  explicit ReadablePBContainerFile(std::shared_ptr<RandomAccessFile> reader);
+
+  // Closes the file if not already closed.
+  ~ReadablePBContainerFile();
+
+  // Reads the header information from the container and validates it.
+  // Must be called before any of the other methods.
+  Status Open();
+
+  // Reads a protobuf message from the container, validating its size and
+  // data using a CRC32C checksum. File must be open.
+  //
+  // Return values:
+  // * If there are no more records in the file, returns Status::EndOfFile.
+  // * If there is a partial record, but it is not long enough to be a full
+  //   record or the written length of the record is greater than the remaining
+  //   bytes in the file, returns Status::Incomplete. If Status::Incomplete
+  //   is returned, calling offset() will return the point in the file where
+  //   the invalid partial record begins. In order to append additional records
+  //   to the file, the file must first be truncated at that offset.
+  //   Note: Version 1 of this file format will never return
+  //   Status::Incomplete() from this method.
+  // * If a corrupt record is encountered, returns Status::Corruption.
+  // * On success, stores the result in '*msg' and returns OK.
+  Status ReadNextPB(google::protobuf::Message* msg);
+
+  // Dumps any unread protobuf messages in the container to 'os'. Each
+  // message's DebugString() method is invoked to produce its textual form.
+  // File must be open.
+  enum class Format {
+    // Print each message on multiple lines, with intervening headers.
+    DEFAULT,
+    // Same as DEFAULT but includes additional metadata information.
+    DEBUG,
+    // Print each message on its own line.
+    ONELINE,
+    // Dump in JSON.
+    JSON
+  };
+  Status Dump(std::ostream* os, Format format);
+
+  // Closes the container.
+  Status Close();
+
+  // Expected PB type and schema for each message to be read.
+  //
+  // Only valid after a successful call to Open().
+  const std::string& pb_type() const { return pb_type_; }
+  const google::protobuf::FileDescriptorSet* protos() const {
+    return protos_.get();
+  }
+
+  // Get the prototype instance for the type of messages stored in this
+  // file. The returned Message is owned by this ReadablePBContainerFile 
instance.
+  Status GetPrototype(const google::protobuf::Message** prototype);
+
+  // Return the protobuf container file format version.
+  // File must be open.
+  int version() const;
+
+  // Return current read offset.
+  // File must be open.
+  uint64_t offset() const;
+
+ private:
+  FileState state_;  // State of the file.
+  int version_;      // Container format version; see version().
+  uint64_t offset_;  // Current read offset into the file; see offset().
+
+  // The size of the file we are reading, or 'none' if it hasn't yet been
+  // read.
+  boost::optional<uint64_t> cached_file_size_;
+
+  // The fully-qualified PB type name of the messages in the container.
+  std::string pb_type_;
+
+  // Wrapped in a unique_ptr so that clients need not include PB headers.
+  std::unique_ptr<google::protobuf::FileDescriptorSet> protos_;
+
+  // Protobuf infrastructure which owns the message prototype 'prototype_'.
+  std::unique_ptr<google::protobuf::SimpleDescriptorDatabase> db_;
+  std::unique_ptr<google::protobuf::DescriptorPool> descriptor_pool_;
+  std::unique_ptr<google::protobuf::MessageFactory> message_factory_;
+  const google::protobuf::Message* prototype_ = nullptr;
+
+  std::shared_ptr<RandomAccessFile> reader_;  // File reader.
+};
+
+// Convenience functions for protobuf containers holding just one record.
+
+// Load a "containerized" protobuf from the given path.
+// If the file does not exist, returns Status::NotFound(). Otherwise, may
+// return other Status error codes such as Status::IOError.
+Status ReadPBContainerFromPath(Env* env, const std::string& path,
+                               google::protobuf::Message* msg);
+
+// Serialize a "containerized" protobuf to the given path.
+//
+// If create == NO_OVERWRITE and 'path' already exists, the function will fail.
+// If sync == SYNC, the newly created file will be fsynced before returning.
+Status WritePBContainerToPath(Env* env, const std::string& path,
+                              const google::protobuf::Message& msg,
+                              CreateMode create,
+                              SyncMode sync);
+
+// Wrapper for a protobuf message which lazily converts to JSON when
+// the trace buffer is dumped.
+//
+// When tracing, an instance of this class can be associated with
+// a given trace, instead of a stringified PB, thus avoiding doing
+// stringification inline and moving that work to the tracing process.
+//
+// Example usage:
+//  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+//                         "response", 
pb_util::PbTracer::TracePb(*response_pb_),
+//                         ...);
+//
+class PbTracer : public debug::ConvertableToTraceFormat {
+ public:
+  enum {
+    kMaxFieldLengthToTrace = 100  // Presumably TruncateFields()'s max_len -- confirm.
+  };
+
+  // Static helper to be called when adding a stringified PB to a trace.
+  // This does not actually stringify 'msg', that will be done later
+  // when/if AppendAsTraceFormat() is called on the returned object.
+  static scoped_refptr<debug::ConvertableToTraceFormat> TracePb(
+      const google::protobuf::Message& msg);
+
+  explicit PbTracer(const google::protobuf::Message& msg);
+
+  // Actually stringifies the PB and appends the string to 'out'.
+  void AppendAsTraceFormat(std::string* out) const override;
+ private:
+  const std::unique_ptr<google::protobuf::Message> msg_;  // Owned PB, stringified lazily.
+};
+
+} // namespace pb_util
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util.proto b/be/src/kudu/util/pb_util.proto
new file mode 100644
index 0000000..b78c0cf
--- /dev/null
+++ b/be/src/kudu/util/pb_util.proto
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+option java_package = "org.apache.kudu";
+
+import "google/protobuf/descriptor.proto";
+
+// ============================================================================
+//  Protobuf container metadata
+// ============================================================================
+
+// Supplemental protobuf container header, after the main header (see
+// pb_util.h for details).
+message ContainerSupHeaderPB {
+  // The protobuf schema for the messages expected in this container.
+  //
+  // This schema is complete, that is, it includes all of its dependencies
+  // (i.e. other schemas defined in .proto files imported by this schema's
+  // .proto file).
+  required google.protobuf.FileDescriptorSet protos = 1;
+
+  // The PB message type expected in each data entry in this container. Must
+  // be fully qualified (e.g. kudu.tablet.TabletSuperBlockPB).
+  required string pb_type = 2;
+}
+
+extend google.protobuf.FieldOptions {  // Defines the 'kudu.REDACT' field option.
+  optional bool REDACT = 50001 [default=false];
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util_test.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util_test.proto 
b/be/src/kudu/util/pb_util_test.proto
new file mode 100644
index 0000000..bac0be0
--- /dev/null
+++ b/be/src/kudu/util/pb_util_test.proto
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+import "kudu/util/pb_util.proto";
+
+message TestSecurePrintingPB {  // Mixes plain and REDACT-tagged string fields.
+  optional string insecure1 = 1;
+  optional string secure1 = 2 [(kudu.REDACT) = true];
+  optional string insecure2 = 3;
+  optional string secure2 = 4 [(kudu.REDACT) = true];
+  repeated string repeated_secure = 5 [(kudu.REDACT) = true];
+  optional string insecure3 = 6;
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/process_memory-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/process_memory-test.cc 
b/be/src/kudu/util/process_memory-test.cc
new file mode 100644
index 0000000..36df1a9
--- /dev/null
+++ b/be/src/kudu/util/process_memory-test.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <cstdint>
+#include <ostream>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/process_memory.h"
+
+using std::atomic;
+using std::thread;
+using std::vector;
+
+namespace kudu {
+
+// Microbenchmark for our new/delete hooks which track process-wide
+// memory consumption.
+TEST(ProcessMemory, BenchmarkConsumptionTracking) {
+  const int kNumThreads = 200;
+  vector<thread> threads;
+  atomic<bool> done(false);        // Set after the timed window; stops every worker.
+  atomic<int64_t> total_count(0);  // Sum of outer-loop iterations across all workers.
+
+  // We start many threads, each of which performs a 10:1 ratio of
+  // new/delete pairs to consumption lookups. The high number
+  // of threads highlights when there is contention on central
+  // tcmalloc locks.
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&]() {
+        int64_t local_count = 0;
+        while (!done) {
+          for (int a = 0; a < 10; a++) {
+            // Mark 'x' volatile so that the compiler does not optimize out the
+            // allocation.
+            char* volatile x = new char[8000];
+            delete[] x;
+          }
+          process_memory::CurrentConsumption();
+          local_count++;
+        }
+        total_count += local_count;  // One atomic add per thread, after its loop ends.
+      });
+  }
+  double secs = 3;  // Length of the measurement window, in seconds.
+  SleepFor(MonoDelta::FromSeconds(secs));
+  done = true;
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  LOG(INFO) << "Performed " << total_count / secs << " iters/sec";
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/process_memory.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/process_memory.cc 
b/be/src/kudu/util/process_memory.cc
new file mode 100644
index 0000000..d2f3653
--- /dev/null
+++ b/be/src/kudu/util/process_memory.cc
@@ -0,0 +1,287 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstddef>
+#include <ostream>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#ifdef TCMALLOC_ENABLED
+#include <gperftools/malloc_extension.h>  // IWYU pragma: keep
+#endif
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"          // IWYU pragma: keep
+#include "kudu/util/debug/trace_event.h"  // IWYU pragma: keep
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/mem_tracker.h"        // IWYU pragma: keep
+#include "kudu/util/process_memory.h"
+#include "kudu/util/random.h"
+#include "kudu/util/status.h"
+
+// Hard cap on process memory. The percentage flags below are all expressed
+// relative to this value.
+DEFINE_int64(memory_limit_hard_bytes, 0,
+             "Maximum amount of memory this daemon should use, in bytes. "
+             "A value of 0 autosizes based on the total system memory. "
+             "A value of -1 disables all memory limiting.");
+TAG_FLAG(memory_limit_hard_bytes, stable);
+
+DEFINE_int32(memory_pressure_percentage, 60,
+             "Percentage of the hard memory limit that this daemon may "
+             "consume before flushing of in-memory data becomes prioritized.");
+TAG_FLAG(memory_pressure_percentage, advanced);
+
+DEFINE_int32(memory_limit_soft_percentage, 80,
+             "Percentage of the hard memory limit that this daemon may "
+             "consume before memory throttling of writes begins. The greater "
+             "the excess, the higher the chance of throttling. In general, a "
+             "lower soft limit leads to smoother write latencies but "
+             "decreased throughput, and vice versa for a higher soft limit.");
+TAG_FLAG(memory_limit_soft_percentage, advanced);
+
+DEFINE_int32(memory_limit_warn_threshold_percentage, 98,
+             "Percentage of the hard memory limit that this daemon may "
+             "consume before WARNING level messages are periodically logged.");
+TAG_FLAG(memory_limit_warn_threshold_percentage, advanced);
+
+#ifdef TCMALLOC_ENABLED
+// Only defined when building against tcmalloc; read by GcTcmalloc().
+DEFINE_int32(tcmalloc_max_free_bytes_percentage, 10,
+             "Maximum percentage of the RSS that tcmalloc is allowed to use for "
+             "reserved but unallocated memory.");
+TAG_FLAG(tcmalloc_max_free_bytes_percentage, advanced);
+#endif
+
+using strings::Substitute;
+
+namespace kudu {
+namespace process_memory {
+
+namespace {
+// Limits computed by DoInitLimits() from the memory_limit_* flags.
+int64_t g_hard_limit;
+int64_t g_soft_limit;
+int64_t g_pressure_threshold;
+
+// Random source consulted by SoftLimitExceeded(); allocated in DoInitLimits().
+ThreadSafeRandom* g_rand = nullptr;
+
+#ifdef TCMALLOC_ENABLED
+// Total amount of memory released since the last GC. If this
+// is greater than kGcReleaseSize, this will trigger a tcmalloc gc.
+Atomic64 g_released_memory_since_gc;
+
+// Size, in bytes, that is considered a large value for Release() (or Consume() with
+// a negative value). If tcmalloc is used, this can trigger it to GC.
+// A higher value will make us call into tcmalloc less often (and therefore more
+// efficient). A lower value will mean our memory overhead is lower.
+// TODO(todd): this is a stopgap.
+const int64_t kGcReleaseSize = 128 * 1024L * 1024L;
+
+#endif // TCMALLOC_ENABLED
+
+} // anonymous namespace
+
+
+// Flag validation
+// ------------------------------------------------------------
+// Validate that various flags are percentages.
+static bool ValidatePercentage(const char* flagname, int value) {
+  // A percentage flag is valid only within the closed range [0, 100].
+  const bool in_range = (0 <= value) && (value <= 100);
+  if (!in_range) {
+    LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid",
+                             flagname, value);
+  }
+  return in_range;
+}
+
+// Registers ValidatePercentage for the percentage flags during static
+// initialization; the array exists only to hold the registration results.
+// NOTE(review): memory_pressure_percentage is not registered here -- confirm
+// whether that omission is intentional.
+static bool dummy[] = {
+  google::RegisterFlagValidator(&FLAGS_memory_limit_soft_percentage, &ValidatePercentage),
+  google::RegisterFlagValidator(&FLAGS_memory_limit_warn_threshold_percentage, &ValidatePercentage)
+#ifdef TCMALLOC_ENABLED
+  ,google::RegisterFlagValidator(&FLAGS_tcmalloc_max_free_bytes_percentage, &ValidatePercentage)
+#endif
+};
+
+
+// Wrappers around tcmalloc functionality
+// ------------------------------------------------------------
+#ifdef TCMALLOC_ENABLED
+// Returns the numeric tcmalloc property named 'prop', as reported by
+// MallocExtension::GetNumericProperty(). If the lookup fails, logs at DFATAL
+// and returns 0.
+static int64_t GetTCMallocProperty(const char* prop) {
+  // Start from a defined value: when the lookup fails in a build where DFATAL
+  // only logs, execution continues to the return below, which must not read
+  // an indeterminate size_t.
+  size_t value = 0;
+  if (!MallocExtension::instance()->GetNumericProperty(prop, &value)) {
+    LOG(DFATAL) << "Failed to get tcmalloc property " << prop;
+  }
+  return value;
+}
+
+int64_t GetTCMallocCurrentAllocatedBytes() {
+  // Property name under which tcmalloc reports bytes currently allocated.
+  const char* const allocated_bytes_prop = "generic.current_allocated_bytes";
+  return GetTCMallocProperty(allocated_bytes_prop);
+}
+
+// Releases tcmalloc's free page-heap memory back to the system whenever it
+// exceeds FLAGS_tcmalloc_max_free_bytes_percentage percent of the bytes
+// currently allocated by the application.
+void GcTcmalloc() {
+  TRACE_EVENT0("process", "GcTcmalloc");
+
+  // Number of bytes in the 'NORMAL' free list (i.e reserved by tcmalloc but
+  // not in use).
+  int64_t bytes_overhead = GetTCMallocProperty("tcmalloc.pageheap_free_bytes");
+  // Bytes allocated by the application.
+  int64_t bytes_used = GetTCMallocCurrentAllocatedBytes();
+
+  int64_t max_overhead = bytes_used * FLAGS_tcmalloc_max_free_bytes_percentage / 100.0;
+  if (bytes_overhead > max_overhead) {
+    int64_t extra = bytes_overhead - max_overhead;
+    while (extra > 0) {
+      // Release 1MB at a time, so that tcmalloc releases its page heap lock
+      // allowing other threads to make progress. This still disrupts the current
+      // thread, but is better than disrupting all.
+      MallocExtension::instance()->ReleaseToSystem(1024 * 1024);
+      extra -= 1024 * 1024;
+    }
+  }
+}
+#endif // TCMALLOC_ENABLED
+
+
+// Consumption and soft memory limit behavior
+// ------------------------------------------------------------
+namespace {
+// Computes the hard limit, soft limit and pressure threshold from the flags
+// and allocates the random source used by SoftLimitExceeded().
+void DoInitLimits() {
+  int64_t hard_limit = FLAGS_memory_limit_hard_bytes;
+  if (hard_limit == 0) {
+    // A flag value of 0 requests autosizing: use 80% (4/5) of system RAM.
+    int64_t system_ram;
+    CHECK_OK(Env::Default()->GetTotalRAMBytes(&system_ram));
+    hard_limit = system_ram * 4 / 5;
+  }
+
+  // The soft limit and pressure threshold are percentages of the hard limit.
+  g_hard_limit = hard_limit;
+  g_soft_limit = FLAGS_memory_limit_soft_percentage * g_hard_limit / 100;
+  g_pressure_threshold = FLAGS_memory_pressure_percentage * g_hard_limit / 100;
+
+  g_rand = new ThreadSafeRandom(1);
+}
+
+// Runs DoInitLimits() through GoogleOnceInit so the limits are set up once.
+void InitLimits() {
+  static GoogleOnceType init_once;
+  GoogleOnceInit(&init_once, &DoInitLimits);
+}
+
+} // anonymous namespace
+
+// Returns the process's current memory consumption.
+//
+// With tcmalloc, the value is a cached reading of tcmalloc's allocated-bytes
+// property that is refreshed at most once per kReadIntervalMicros, by
+// whichever caller wins 'read_lock'; all other callers return the cached value
+// without blocking. Without tcmalloc, the root MemTracker's consumption is
+// returned instead.
+int64_t CurrentConsumption() {
+#ifdef TCMALLOC_ENABLED
+  const int64_t kReadIntervalMicros = 50000;
+  static Atomic64 last_read_time = 0;
+  static simple_spinlock read_lock;
+  static Atomic64 consumption = 0;
+  uint64_t time = GetMonoTimeMicros();
+  // 'last_read_time' is written with NoBarrier_Store below, so read it with the
+  // matching NoBarrier_Load rather than as a plain variable.
+  if (time > base::subtle::NoBarrier_Load(&last_read_time) + kReadIntervalMicros &&
+      read_lock.try_lock()) {
+    base::subtle::NoBarrier_Store(&consumption, GetTCMallocCurrentAllocatedBytes());
+    // Re-fetch the time after getting the consumption. This way, in case fetching
+    // consumption is extremely slow for some reason (eg due to lots of contention
+    // in tcmalloc) we at least ensure that we wait at least another full interval
+    // before fetching the information again.
+    time = GetMonoTimeMicros();
+    base::subtle::NoBarrier_Store(&last_read_time, time);
+    read_lock.unlock();
+  }
+
+  return base::subtle::NoBarrier_Load(&consumption);
+#else
+  // Without tcmalloc, we have no reliable way of determining our own heap
+  // size (e.g. mallinfo doesn't work in ASAN builds). So, we'll fall back
+  // to just looking at the sum of our tracked memory.
+  return MemTracker::GetRootTracker()->consumption();
+#endif
+}
+
+int64_t HardLimit() {
+  InitLimits();  // Make sure DoInitLimits() has populated g_hard_limit.
+  return g_hard_limit;
+}
+
+int64_t SoftLimit() {
+  InitLimits();  // Make sure DoInitLimits() has populated g_soft_limit.
+  return g_soft_limit;
+}
+
+int64_t MemoryPressureThreshold() {
+  InitLimits();  // Make sure DoInitLimits() has populated g_pressure_threshold.
+  return g_pressure_threshold;
+}
+
+// Returns true when current consumption has reached the memory pressure
+// threshold. In that case, and if 'current_capacity_pct' is non-null, the
+// consumption as a percentage of the hard limit is stored through it; the
+// pointer is left untouched when false is returned.
+bool UnderMemoryPressure(double* current_capacity_pct) {
+  InitLimits();
+  int64_t consumption = CurrentConsumption();
+  if (consumption < g_pressure_threshold) {
+    return false;
+  }
+  if (current_capacity_pct) {
+    *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100;
+  }
+  return true;
+}
+
+// Decides whether the caller should treat the soft memory limit as exceeded.
+//
+// Always true above the hard limit and always false below the soft limit (or
+// when the two limits are equal). In between, the answer is randomized: a
+// random offset drawn from the soft-to-hard gap is added to the consumption
+// and the result is compared against the hard limit.
+//
+// When true is returned and 'current_capacity_pct' is non-null, the
+// consumption as a percentage of the hard limit is stored through it.
+bool SoftLimitExceeded(double* current_capacity_pct) {
+  InitLimits();
+  const int64_t consumption = CurrentConsumption();
+
+  bool exceeded;
+  if (consumption > g_hard_limit) {
+    // Did we exceed the actual limit?
+    exceeded = true;
+  } else if (g_hard_limit == g_soft_limit) {
+    // No soft limit defined.
+    exceeded = false;
+  } else if (consumption < g_soft_limit) {
+    // We are under the soft limit threshold.
+    exceeded = false;
+  } else {
+    // We're over the threshold; were we randomly chosen to be over the soft limit?
+    exceeded =
+        consumption + g_rand->Uniform64(g_hard_limit - g_soft_limit) > g_hard_limit;
+  }
+
+  if (exceeded && current_capacity_pct) {
+    *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100;
+  }
+  return exceeded;
+}
+
+// Adjusts the running "released since last GC" counter by -released_bytes and,
+// once the counter exceeds kGcReleaseSize, resets it to zero and runs
+// GcTcmalloc(). Does nothing in builds without tcmalloc.
+//
+// NOTE(review): the counter only grows when 'released_bytes' is negative. If
+// callers pass the released amount as a positive number, the counter can never
+// exceed kGcReleaseSize -- confirm the sign convention against the callers.
+void MaybeGCAfterRelease(int64_t released_bytes) {
+#ifdef TCMALLOC_ENABLED
+  int64_t now_released = base::subtle::NoBarrier_AtomicIncrement(
+      &g_released_memory_since_gc, -released_bytes);
+  if (PREDICT_FALSE(now_released > kGcReleaseSize)) {
+    base::subtle::NoBarrier_Store(&g_released_memory_since_gc, 0);
+    GcTcmalloc();
+  }
+#endif
+}
+
+} // namespace process_memory
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/process_memory.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/process_memory.h 
b/be/src/kudu/util/process_memory.h
new file mode 100644
index 0000000..cba7046
--- /dev/null
+++ b/be/src/kudu/util/process_memory.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_PROCESS_MEMORY_H
+#define KUDU_UTIL_PROCESS_MEMORY_H
+
+#include <cstdint>
+
+namespace kudu {
+namespace process_memory {
+
+// Probabilistically returns true if the process-wide soft memory limit is exceeded.
+// The greater the excess, the higher the chance that it returns true.
+//
+// If the soft limit is exceeded and 'current_capacity_pct' is not NULL, the percentage
+// of the hard limit consumed is written to it.
+bool SoftLimitExceeded(double* current_capacity_pct);
+
+// Return true if we are under memory pressure (i.e if we are nearing the point at which
+// SoftLimitExceeded will begin to return true).
+//
+// If the process is under memory pressure, and 'current_capacity_pct' is not NULL,
+// the percentage of the hard limit consumed is written to it.
+bool UnderMemoryPressure(double* current_capacity_pct);
+
+// Potentially trigger a call to release tcmalloc memory back to the
+// OS, after the given amount of memory was released.
+void MaybeGCAfterRelease(int64_t released_bytes);
+
+// Return the total current memory consumption of the process.
+int64_t CurrentConsumption();
+
+// Return the configured hard limit for the process.
+int64_t HardLimit();
+
+// Return the configured soft limit for the process.
+int64_t SoftLimit();
+
+// Return the configured memory pressure threshold for the process.
+int64_t MemoryPressureThreshold();
+
+#ifdef TCMALLOC_ENABLED
+// Get the current amount of allocated memory, according to tcmalloc.
+//
+// This should be equal to CurrentConsumption(), but is made available so that tests
+// can verify the correctness of CurrentConsumption().
+int64_t GetTCMallocCurrentAllocatedBytes();
+#endif
+
+} // namespace process_memory
+} // namespace kudu
+
+#endif /* KUDU_UTIL_PROCESS_MEMORY_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/promise.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/promise.h b/be/src/kudu/util/promise.h
new file mode 100644
index 0000000..17f8cec
--- /dev/null
+++ b/be/src/kudu/util/promise.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_PROMISE_H
+#define KUDU_UTIL_PROMISE_H
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/countdown_latch.h"
+
+namespace kudu {
+
+// A promise boxes a value which is to be provided at some time in the future.
+// A single producer calls Set(...), and any number of consumers can call Get()
+// to retrieve the produced value.
+//
+// In Guava terms, this is a SettableFuture<T>.
+template<typename T>
+class Promise {
+ public:
+  // Creates an unfulfilled promise: the latch starts at a count of one and is
+  // counted down by Set().
+  Promise() : latch_(1) {}
+  ~Promise() {}
+
+  // Re-arm the promise so that it can be fulfilled again.
+  // This is only safe when external synchronization guarantees that no thread
+  // is still using the value from the previous incarnation of the promise.
+  void Reset() {
+    latch_.Reset(1);
+    val_ = T();
+  }
+
+  // Wait on the latch, then return a reference to the stored value.
+  const T& Get() const {
+    latch_.Wait();
+    return val_;
+  }
+
+  // Wait on the latch for at most 'delta'.
+  //
+  // Returns a pointer to the stored value if the latch wait succeeded, or a
+  // null pointer otherwise. The pointer refers to a member of this Promise,
+  // so it is only valid while the Promise is alive.
+  const T* WaitFor(const MonoDelta& delta) const {
+    const bool ready = latch_.WaitFor(delta);
+    return ready ? &val_ : nullptr;
+  }
+
+  // Store the value and count the latch down.
+  // This may be called at most once (per Reset()).
+  void Set(const T& val) {
+    DCHECK_EQ(latch_.count(), 1) << "Already set!";
+    val_ = val;
+    latch_.CountDown();
+  }
+
+ private:
+  CountDownLatch latch_;
+  T val_;
+  DISALLOW_COPY_AND_ASSIGN(Promise);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_PROMISE_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/proto_container_test.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/proto_container_test.proto 
b/be/src/kudu/util/proto_container_test.proto
new file mode 100644
index 0000000..4707c08
--- /dev/null
+++ b/be/src/kudu/util/proto_container_test.proto
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+// Arbitrary protobuf to test writing a containerized protobuf.
+message ProtoContainerTestPB {
+  required string name = 1;   // Must be set.
+  required int32 value = 2;   // Must be set.
+  optional string note = 3;   // May be left unset.
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/proto_container_test2.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/proto_container_test2.proto 
b/be/src/kudu/util/proto_container_test2.proto
new file mode 100644
index 0000000..74a1ea3
--- /dev/null
+++ b/be/src/kudu/util/proto_container_test2.proto
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+// Dependency chain:
+//
+// this file --> proto_container_test.proto
+
+import "kudu/util/proto_container_test.proto";
+
+// Arbitrary protobuf that has one PB dependency.
+message ProtoContainerTest2PB {
+  required kudu.ProtoContainerTestPB record = 1;  // Type comes from the imported proto_container_test.proto.
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/proto_container_test3.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/proto_container_test3.proto 
b/be/src/kudu/util/proto_container_test3.proto
new file mode 100644
index 0000000..1ed1c31
--- /dev/null
+++ b/be/src/kudu/util/proto_container_test3.proto
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+// Dependency chain:
+//
+// this file --> proto_container_test.proto
+//           --> proto_container_test2.proto --> proto_container_test.proto
+
+import "kudu/util/proto_container_test.proto";
+import "kudu/util/proto_container_test2.proto";
+
+// Arbitrary protobuf that has two PB dependencies.
+message ProtoContainerTest3PB {
+  required kudu.ProtoContainerTestPB record_one = 1;   // Type from proto_container_test.proto.
+  required kudu.ProtoContainerTest2PB record_two = 2;  // Type from proto_container_test2.proto.
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/protobuf-annotations.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/protobuf-annotations.h 
b/be/src/kudu/util/protobuf-annotations.h
new file mode 100644
index 0000000..7fdc961
--- /dev/null
+++ b/be/src/kudu/util/protobuf-annotations.h
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Simple header which is inserted into all of our generated protobuf code.
+// We use this to hook protobuf code up to TSAN annotations.
+#ifndef KUDU_UTIL_PROTOBUF_ANNOTATIONS_H
+#define KUDU_UTIL_PROTOBUF_ANNOTATIONS_H
+
+#include "kudu/gutil/dynamic_annotations.h"
+
+// The protobuf internal headers are included before this, so we have to undefine
+// the empty definitions first.
+#undef GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN
+#undef GOOGLE_SAFE_CONCURRENT_WRITES_END
+
+// Map protobuf's concurrent-write markers onto the dynamic-annotation macros.
+#define GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN ANNOTATE_IGNORE_WRITES_BEGIN
+#define GOOGLE_SAFE_CONCURRENT_WRITES_END ANNOTATE_IGNORE_WRITES_END
+
+#endif /* KUDU_UTIL_PROTOBUF_ANNOTATIONS_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/protobuf_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/protobuf_util.h b/be/src/kudu/util/protobuf_util.h
new file mode 100644
index 0000000..cc88eda
--- /dev/null
+++ b/be/src/kudu/util/protobuf_util.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_PROTOBUF_UTIL_H
+#define KUDU_UTIL_PROTOBUF_UTIL_H
+
+#include <google/protobuf/message_lite.h>
+
+namespace kudu {
+
+// Serializes 'msg' and appends the encoded bytes to the end of 'output',
+// leaving the existing contents of 'output' in place.
+//
+// CHECK-fails if the number of bytes written differs from the size reported
+// by msg.ByteSize(); otherwise always returns true.
+//
+// Declared inline because it is defined in a header: without it, including
+// this header from more than one translation unit produces multiple
+// definitions of the function at link time.
+inline bool AppendPBToString(const google::protobuf::MessageLite &msg, faststring *output) {
+  int old_size = output->size();
+  int byte_size = msg.ByteSize();
+  // Grow the buffer up front so the message can be serialized directly into it.
+  output->resize(old_size + byte_size);
+  uint8* start = reinterpret_cast<uint8*>(output->data() + old_size);
+  uint8* end = msg.SerializeWithCachedSizesToArray(start);
+  CHECK(end - start == byte_size)
+    << "Error in serialization. byte_size=" << byte_size
+    << " new ByteSize()=" << msg.ByteSize()
+    << " end-start=" << (end-start);
+  return true;
+}
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/protoc-gen-insertions.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/protoc-gen-insertions.cc 
b/be/src/kudu/util/protoc-gen-insertions.cc
new file mode 100644
index 0000000..5d1097e
--- /dev/null
+++ b/be/src/kudu/util/protoc-gen-insertions.cc
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Simple protoc plugin which inserts some code at the top of each generated 
protobuf.
+// Currently, this just adds an include of protobuf-annotations.h, a file 
which hooks up
+// the protobuf concurrency annotations to our TSAN annotations.
+
+#include <string>
+
+#include <google/protobuf/compiler/code_generator.h>
+#include <google/protobuf/compiler/plugin.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/printer.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using google::protobuf::io::ZeroCopyOutputStream;
+using google::protobuf::io::Printer;
+using std::string;
+
+namespace kudu {
+
+// Text inserted at the "includes" insertion point of each generated .pb.cc file.
+static const char* const kIncludeToInsert = "#include \"kudu/util/protobuf-annotations.h\"\n";
+// Suffix that every input file name is expected to end with.
+static const char* const kProtoExtension = ".proto";
+
+// Code generator that adds kIncludeToInsert to the "includes" insertion point
+// of the .pb.cc file generated for each input .proto file.
+class InsertAnnotations : public ::google::protobuf::compiler::CodeGenerator {
+  // Inserts the include into the .pb.cc file corresponding to 'file'.
+  //
+  // Returns false and sets '*error' if the file name does not end in
+  // kProtoExtension or if printing to the output fails; returns true otherwise.
+  virtual bool Generate(const google::protobuf::FileDescriptor *file,
+                        const std::string &/*param*/,
+                        google::protobuf::compiler::GeneratorContext *gen_context,
+                        std::string *error) const OVERRIDE {
+
+    // Determine the file name we will substitute into.
+    string path_no_extension;
+    if (!TryStripSuffixString(file->name(), kProtoExtension, &path_no_extension)) {
+      *error = strings::Substitute("file name $0 did not end in $1",
+                                   file->name(), kProtoExtension);
+      return false;
+    }
+    string pb_file = path_no_extension + ".pb.cc";
+
+    // Actually insert the new #include
+    gscoped_ptr<ZeroCopyOutputStream> inserter(
+        gen_context->OpenForInsert(pb_file, "includes"));
+    Printer printer(inserter.get(), '$');
+    printer.Print(kIncludeToInsert);
+
+    if (printer.failed()) {
+      *error = "Failed to print to output file";
+      return false;
+    }
+
+    return true;
+  }
+};
+
+} // namespace kudu
+
+// Plugin entry point: hands the InsertAnnotations generator to protobuf's
+// PluginMain() and returns its result as the exit status.
+int main(int argc, char *argv[]) {
+  kudu::InsertAnnotations generator;
+  return google::protobuf::compiler::PluginMain(argc, argv, &generator);
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pstack_watcher-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pstack_watcher-test.cc 
b/be/src/kudu/util/pstack_watcher-test.cc
new file mode 100644
index 0000000..a993d66
--- /dev/null
+++ b/be/src/kudu/util/pstack_watcher-test.cc
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/pstack_watcher.h"
+
+#include <unistd.h>
+
+#include <cerrno>
+#include <cstdio>
+#include <memory>
+#include <string>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::shared_ptr;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+
+// Creates a watcher with a very long timeout and shuts it down right away.
+TEST(TestPstackWatcher, TestPstackWatcherCancellation) {
+  PstackWatcher watcher(MonoDelta::FromSeconds(1000000));
+  watcher.Shutdown();
+}
+
+// Creates a watcher with a 10ms timeout and calls Wait() on it.
+TEST(TestPstackWatcher, TestWait) {
+  PstackWatcher watcher(MonoDelta::FromMilliseconds(10));
+  watcher.Wait();
+}
+
+// Checks that the static DumpStacks() call reports an OK status.
+TEST(TestPstackWatcher, TestDumpStacks) {
+  ASSERT_OK(PstackWatcher::DumpStacks());
+}
+
+// Reopens stdout onto a per-process file inside the test directory.
+//
+// The chosen path is stored in '*temp_path'. Returns the result of freopen().
+static FILE* RedirectStdout(string *temp_path) {
+  string test_dir;
+  CHECK_OK(Env::Default()->GetTestDirectory(&test_dir));
+  // The pid is part of the file name.
+  *temp_path = Substitute("$0/pstack_watcher-dump.$1.txt", test_dir, getpid());
+
+  FILE* stream;
+  POINTER_RETRY_ON_EINTR(stream, freopen(temp_path->c_str(), "w", stdout));
+  return stream;
+}
+
+// Lets a watcher with a 500ms timeout run to completion while stdout is
+// redirected to a file, then checks that the file contains "BEGIN STACKS".
+TEST(TestPstackWatcher, TestPstackWatcherRunning) {
+  string stdout_file;
+  // Keep a duplicate of the stdout descriptor so it can be restored below.
+  int old_stdout;
+  RETRY_ON_EINTR(old_stdout, dup(STDOUT_FILENO));
+  CHECK_ERR(old_stdout);
+  {
+    FILE* out_fp = RedirectStdout(&stdout_file);
+    PCHECK(out_fp != nullptr);
+    // Close the redirected stream when this scope ends.
+    SCOPED_CLEANUP({
+        int err;
+        RETRY_ON_EINTR(err, fclose(out_fp));
+      });
+    PstackWatcher watcher(MonoDelta::FromMilliseconds(500));
+    // Poll until the watcher reports that it is no longer running.
+    while (watcher.IsRunning()) {
+      SleepFor(MonoDelta::FromMilliseconds(1));
+    }
+  }
+  // Point STDOUT_FILENO back at the saved descriptor and reopen stdout on it.
+  int dup2_ret;
+  RETRY_ON_EINTR(dup2_ret, dup2(old_stdout, STDOUT_FILENO));
+  CHECK_ERR(dup2_ret);
+  PCHECK(stdout = fdopen(STDOUT_FILENO, "w"));
+
+  faststring contents;
+  CHECK_OK(ReadFileToString(Env::Default(), stdout_file, &contents));
+  ASSERT_STR_CONTAINS(contents.ToString(), "BEGIN STACKS");
+  CHECK_ERR(unlink(stdout_file.c_str()));
+  // Echo the captured output to the restored stdout.
+  ASSERT_GE(fprintf(stdout, "%s\n", contents.ToString().c_str()), 0)
+      << "errno=" << errno << ": " << ErrnoToString(errno);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pstack_watcher.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pstack_watcher.cc 
b/be/src/kudu/util/pstack_watcher.cc
new file mode 100644
index 0000000..2c4481a
--- /dev/null
+++ b/be/src/kudu/util/pstack_watcher.cc
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/pstack_watcher.h"
+
+#include <unistd.h>
+
+#include <cerrno>
+#include <cstdio>
+#include <string>
+#include <vector>
+
+#include <boost/bind.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/status.h"
+#include "kudu/util/subprocess.h"
+#include "kudu/util/thread.h"
+
+namespace kudu {
+
+using std::string;
+using std::vector;
+using strings::SkipEmpty;
+using strings::SkipWhitespace;
+using strings::Split;
+using strings::Substitute;
+
+// Starts the background "pstack_watcher" thread, which executes Run().
+// Crashes (CHECK) if the thread cannot be created.
+PstackWatcher::PstackWatcher(MonoDelta timeout)
+    : timeout_(timeout),
+      running_(true),
+      cond_(&lock_) {
+  const Status create_status = Thread::Create(
+      "pstack_watcher", "pstack_watcher",
+      boost::bind(&PstackWatcher::Run, this), &thread_);
+  CHECK_OK(create_status);
+}
+
+// Stops and joins the watcher thread before the object goes away.
+PstackWatcher::~PstackWatcher() {
+  this->Shutdown();
+}
+
+// Clears the running flag, wakes every waiter on the condition variable and
+// joins the watcher thread if it has not been joined yet.
+void PstackWatcher::Shutdown() {
+  {
+    MutexLock l(lock_);
+    running_ = false;
+    cond_.Broadcast();
+  }
+  // The lock is dropped before joining: Run() holds it for its whole body.
+  if (thread_) {
+    ThreadJoiner joiner(thread_.get());
+    CHECK_OK(joiner.Join());
+    thread_.reset();
+  }
+}
+
+// Returns the current value of the running flag, read under the lock.
+bool PstackWatcher::IsRunning() const {
+  MutexLock l(lock_);
+  const bool still_running = running_;
+  return still_running;
+}
+
+// Blocks the caller until the running flag has been cleared, either by Run()
+// finishing or by Shutdown().
+void PstackWatcher::Wait() const {
+  MutexLock l(lock_);
+  for (;;) {
+    if (!running_) {
+      return;
+    }
+    cond_.Wait();
+  }
+}
+
+// Body of the watcher thread. Waits on the condition variable for up to
+// 'timeout_'; if the watcher is still marked as running afterwards, dumps the
+// stacks (full dump) and then clears the running flag and wakes all waiters.
+// The lock is held for the whole function, including the dump itself.
+void PstackWatcher::Run() {
+  MutexLock l(lock_);
+  if (running_) {
+    cond_.WaitFor(timeout_);
+    // Shutdown() may have cleared the flag while we were waiting.
+    if (running_) {
+      WARN_NOT_OK(DumpStacks(DUMP_FULL), "Unable to print pstack from watcher");
+      running_ = false;
+      cond_.Broadcast();
+    }
+  }
+}
+
+// Checks whether 'progname' can be found by running 'which <progname>' with
+// the child's stdout and stderr disabled.
+//
+// Returns OK if 'which' exits with status 0, NotFound (carrying the exit
+// description) if it exits with any other status, or the underlying error if
+// the subprocess could not be started, waited on or queried.
+Status PstackWatcher::HasProgram(const char* progname) {
+  Subprocess proc({ "which", progname } );
+  proc.DisableStderr();
+  proc.DisableStdout();
+  RETURN_NOT_OK_PREPEND(proc.Start(),
+      Substitute("HasProgram($0): error running 'which'", progname));
+  RETURN_NOT_OK(proc.Wait());
+  int exit_status;
+  string exit_info;
+  RETURN_NOT_OK(proc.GetExitStatus(&exit_status, &exit_info));
+  if (exit_status == 0) {
+    return Status::OK();
+  }
+  return Status::NotFound(Substitute("can't find $0: $1", progname, exit_info));
+}
+
+// Verifies that gdb is installed and that its reported version is not older
+// than 7.7.
+//
+// Returns OK when gdb is usable, the error from HasProgram() or from running
+// 'gdb --version' when gdb cannot be found or run, Incomplete when the version
+// output cannot be parsed, and NotSupported when the version is too old.
+Status PstackWatcher::HasGoodGdb() {
+  // Bail out early if gdb is not installed at all.
+  RETURN_NOT_OK(HasProgram("gdb"));
+
+  // Ask gdb for its version. The first line of the output looks like one of:
+  //
+  //   GNU gdb (GDB) Red Hat Enterprise Linux (7.2-75.el6)
+  //   GNU gdb (Ubuntu 7.11.1-0ubuntu1~16.5) 7.11.1
+  //
+  // so the version is taken from the last word of that line, minus any
+  // enclosing parentheses.
+  string version_output;
+  RETURN_NOT_OK(Subprocess::Call({"gdb", "--version"}, "", &version_output));
+  vector<string> output_lines = Split(version_output, "\n", SkipEmpty());
+  if (output_lines.empty()) {
+    return Status::Incomplete("gdb version not found");
+  }
+  vector<string> tokens = Split(output_lines.front(), " ", SkipWhitespace());
+  if (tokens.empty()) {
+    return Status::Incomplete("could not parse gdb version");
+  }
+  string version = StripPrefixString(tokens.back(), "(");
+  version = StripSuffixString(version, ")");
+
+  // Older gdb versions have a buggy pretty printer: it reads the values of
+  // all local variables, including uninitialized ones. For types with an
+  // embedded length (such as std::string or std::vector) this can cause
+  // incorrect memory accesses, leading to deadlocks or seemingly infinite
+  // loops within gdb.
+  //
+  // It is not clear exactly when this was fixed, so the oldest known good
+  // version is whitelisted: the one found in Ubuntu 14.04.
+  //
+  // See the following gdb bug reports for more information:
+  // - https://sourceware.org/bugzilla/show_bug.cgi?id=11868
+  // - https://sourceware.org/bugzilla/show_bug.cgi?id=12127
+  // - https://sourceware.org/bugzilla/show_bug.cgi?id=16196
+  // - https://sourceware.org/bugzilla/show_bug.cgi?id=16286
+  autodigit_less version_less;
+  const bool too_old = version_less(version, "7.7");
+  if (too_old) {
+    return Status::NotSupported("gdb version too old", version);
+  }
+
+  return Status::OK();
+}
+
+// Dumps the stacks of the calling process by delegating to DumpPidStacks().
+Status PstackWatcher::DumpStacks(int flags) {
+  const pid_t self_pid = getpid();
+  return DumpPidStacks(self_pid, flags);
+}
+
+// Dumps the stacks of process 'pid'.
+//
+// gdb is preferred when HasGoodGdb() accepts it; otherwise the first of
+// 'pstack' and 'gstack' found by HasProgram() is used. Every tool that turns
+// out to be unavailable is logged as a warning. 'flags' only affects the gdb
+// path; it is not passed to pstack/gstack.
+//
+// Returns the result of the chosen tool, or ServiceUnavailable if none of the
+// tools can be used.
+Status PstackWatcher::DumpPidStacks(pid_t pid, int flags) {
+
+  // Prefer GDB if available; it gives us line numbers and thread names.
+  Status s = HasGoodGdb();
+  if (s.ok()) {
+    return RunGdbStackDump(pid, flags);
+  }
+  WARN_NOT_OK(s, "gdb not available");
+
+  // Otherwise, try to use pstack or gstack.
+  for (const auto& p : { "pstack", "gstack" }) {
+    s = HasProgram(p);
+    if (s.ok()) {
+      return RunPstack(p, pid);
+    }
+    WARN_NOT_OK(s, Substitute("$0 not available", p));
+  }
+
+  return Status::ServiceUnavailable("Neither gdb, pstack, nor gstack appear to be installed.");
+}
+
+// Builds and runs the gdb command line that prints the threads and
+// backtraces of process 'pid':
+//
+//   gdb -quiet -batch -nx -ex cmd1 -ex cmd2 ... <executable> <pid>
+//
+// With DUMP_FULL set in 'flags', 'thread apply all bt full' is run in addition
+// to the plain backtrace.
+//
+// NOTE(review): the executable path comes from Env::GetExecutablePath(), which
+// presumably describes the calling process; confirm that this is intended when
+// 'pid' refers to a different process.
+Status PstackWatcher::RunGdbStackDump(pid_t pid, int flags) {
+  vector<string> argv = {
+    "gdb",
+    // Don't print introductory version/copyright messages.
+    "-quiet",
+    // Exit after processing all of the commands below.
+    "-batch",
+    // Don't run commands from .gdbinit
+    "-nx",
+    "-ex", "set print pretty on",
+    "-ex", "info threads",
+    "-ex", "thread apply all bt",
+  };
+  const bool want_full_dump = (flags & DUMP_FULL) != 0;
+  if (want_full_dump) {
+    argv.emplace_back("-ex");
+    argv.emplace_back("thread apply all bt full");
+  }
+  string exe_path;
+  RETURN_NOT_OK(Env::Default()->GetExecutablePath(&exe_path));
+  argv.push_back(exe_path);
+  argv.push_back(Substitute("$0", pid));
+  return RunStackDump(argv);
+}
+
+// Runs '<progname> <pid>' (pstack or gstack) through RunStackDump().
+Status PstackWatcher::RunPstack(const std::string& progname, pid_t pid) {
+  const vector<string> argv = { progname, Substitute("$0", pid) };
+  return RunStackDump(argv);
+}
+
+// Runs the stack dump command 'argv' as a subprocess and waits for it,
+// printing a BEGIN STACKS banner to stdout before the child starts and an
+// END STACKS banner after it has exited with status 0. stdout is flushed
+// after each banner.
+//
+// Returns IOError if stdout cannot be flushed or the child's stdin cannot be
+// closed, RuntimeError if the child exits with a non-zero status (in which
+// case no END banner is printed), and any Subprocess error prefixed with the
+// step that failed.
+Status PstackWatcher::RunStackDump(const vector<string>& argv) {
+  printf("************************ BEGIN STACKS **************************\n");
+  if (fflush(stdout) == EOF) {
+    return Status::IOError("Unable to flush stdout", ErrnoToString(errno), errno);
+  }
+  Subprocess pstack_proc(argv);
+  RETURN_NOT_OK_PREPEND(pstack_proc.Start(), "RunStackDump proc.Start() failed");
+  // Take the descriptor out of the Subprocess exactly once, outside the retry
+  // macro: a retry must only repeat the close(), not release the fd again.
+  const int child_stdin_fd = pstack_proc.ReleaseChildStdinFd();
+  int ret;
+  RETRY_ON_EINTR(ret, ::close(child_stdin_fd));
+  if (ret == -1) {
+    return Status::IOError("Unable to close child stdin", ErrnoToString(errno), errno);
+  }
+  RETURN_NOT_OK_PREPEND(pstack_proc.Wait(), "RunStackDump proc.Wait() failed");
+  int exit_code;
+  string exit_info;
+  RETURN_NOT_OK_PREPEND(pstack_proc.GetExitStatus(&exit_code, &exit_info),
+                        "RunStackDump proc.GetExitStatus() failed");
+  if (exit_code != 0) {
+    return Status::RuntimeError("RunStackDump proc.Wait() error", exit_info);
+  }
+  printf("************************* END STACKS ***************************\n");
+  if (fflush(stdout) == EOF) {
+    return Status::IOError("Unable to flush stdout", ErrnoToString(errno), errno);
+  }
+
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pstack_watcher.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pstack_watcher.h b/be/src/kudu/util/pstack_watcher.h
new file mode 100644
index 0000000..882e6d2
--- /dev/null
+++ b/be/src/kudu/util/pstack_watcher.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_PSTACK_WATCHER_H
+#define KUDU_UTIL_PSTACK_WATCHER_H
+
+#include <sys/types.h>
+
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Thread;
+
+// PstackWatcher is an object which will pstack the current process and print
+// the results to stdout.  It does this after a certain timeout has occurred,
+// unless it is shut down first.
+class PstackWatcher {
+ public:
+
+  enum Flags {
+    NO_FLAGS = 0,
+
+    // Run 'thread apply all bt full', which is very verbose output
+    DUMP_FULL = 1
+  };
+
+  // Static method to collect and write stack dump output to stdout of the
+  // current process.
+  static Status DumpStacks(int flags = NO_FLAGS);
+
+  // Like the above but for any process, not just the current one.
+  static Status DumpPidStacks(pid_t pid, int flags = NO_FLAGS);
+
+  // Instantiate a watcher that writes a pstack to stdout after the given
+  // timeout expires.
+  explicit PstackWatcher(MonoDelta timeout);
+
+  // Shuts the watcher down (see Shutdown()).
+  ~PstackWatcher();
+
+  // Shut down the watcher and do not log a pstack.
+  // This method is not thread-safe.
+  void Shutdown();
+
+  // Test whether the watcher is still running or has shut down.
+  // Thread-safe.
+  bool IsRunning() const;
+
+  // Wait until the watcher is no longer running, i.e. until the timeout has
+  // expired and the watcher has logged a pstack, or until it has been shut
+  // down.
+  // Thread-safe.
+  void Wait() const;
+
+ private:
+  // Test for the existence of the given program in the system path.
+  static Status HasProgram(const char* progname);
+
+  // Check whether the system path has 'gdb' and whether it is modern enough
+  // for safe stack dump usage.
+  static Status HasGoodGdb();
+
+  // Get a stack dump using GDB directly.
+  static Status RunGdbStackDump(pid_t pid, int flags);
+
+  // Get a stack dump using the pstack or gstack program.
+  static Status RunPstack(const std::string& progname, pid_t pid);
+
+  // Invoke and wait for the stack dump program.
+  static Status RunStackDump(const std::vector<std::string>& argv);
+
+  // Run the thread that waits for the specified duration before logging a
+  // pstack.
+  void Run();
+
+  // How long Run() waits before dumping stacks.
+  const MonoDelta timeout_;
+  // True until the watcher has fired or been shut down.
+  bool running_;
+  // The thread executing Run().
+  scoped_refptr<Thread> thread_;
+  // Guards the wait on 'cond_' and the accesses to 'running_'.
+  mutable Mutex lock_;
+  // Signalled whenever 'running_' is cleared.
+  mutable ConditionVariable cond_;
+};
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/random-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random-test.cc b/be/src/kudu/util/random-test.cc
new file mode 100644
index 0000000..26c7ab5
--- /dev/null
+++ b/be/src/kudu/util/random-test.cc
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cmath>
+#include <cstdint>
+#include <limits>
+#include <unordered_set>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/random.h"
+#include "kudu/util/test_util.h"
+
+using std::numeric_limits;
+using std::unordered_set;
+using std::vector;
+
+namespace kudu {
+
+// Fixture providing an RNG seeded with the value returned by SeedRandom().
+class RandomTest : public KuduTest {
+ public:
+  RandomTest() : rng_(SeedRandom()) {}
+
+ protected:
+  // Generator shared by the test bodies.
+  Random rng_;
+};
+
+// Draws kNumIters samples from Normal(kMean, kStdDev) and checks that the
+// sample mean differs from kMean by no more than kStdDev.
+TEST_F(RandomTest, TestNormalDist) {
+  const double kMean = 5.0;
+  const double kStdDev = 0.01;
+  const int kNumIters = 100000;
+
+  double total = 0.0;
+  int remaining = kNumIters;
+  while (remaining-- > 0) {
+    total += rng_.Normal(kMean, kStdDev);
+  }
+
+  const double sample_mean = total / static_cast<double>(kNumIters);
+  const double deviation = fabs(sample_mean - kMean);
+  ASSERT_LE(deviation, kStdDev);
+}
+
+// Tests that after a large number of invocations of Next32() and Next64(), we
+// have flipped all the bits we claim we should have.
+//
+// This is a regression test for a bug where we were incorrectly bit-shifting
+// in Next64().
+//
+// Note: Our RNG actually only generates 31 bits of randomness for 32 bit
+// integers. If all bits need to be randomized, callers must use
+// Random::Next64(). This test reflects that, and if we change the RNG algo
+// this test should also change.
+TEST_F(RandomTest, TestUseOfBits) {
+  // For Next32(): 'ones32' keeps the bits that were 1 in every sample,
+  // 'zeroes32' collects the bits that were 1 in at least one sample.
+  uint32_t ones32 = numeric_limits<uint32_t>::max();
+  uint32_t zeroes32 = 0;
+  // For Next64(): same bookkeeping.
+  uint64_t ones64 = numeric_limits<uint64_t>::max();
+  uint64_t zeroes64 = 0;
+
+  for (int i = 0; i < 10000000; i++) {
+    uint32_t r32 = rng_.Next32();
+    ones32 &= r32;
+    zeroes32 |= r32;
+
+    uint64_t r64 = rng_.Next64();
+    ones64 &= r64;
+    zeroes64 |= r64;
+  }
+
+  // At the end, we should have flipped 31 and 64 bits, respectively. One
+  // detail of the current RNG impl is that Next32() always returns a number
+  // with MSB set to 0.
+  uint32_t expected_bits_31 = numeric_limits<uint32_t>::max() >> 1;
+  uint64_t expected_bits_64 = numeric_limits<uint64_t>::max();
+
+  ASSERT_EQ(0, ones32);
+  ASSERT_EQ(expected_bits_31, zeroes32);
+  ASSERT_EQ(0, ones64);
+  ASSERT_EQ(expected_bits_64, zeroes64);
+}
+
+// Resetting to the same seed must reproduce the same first Next64() value.
+TEST_F(RandomTest, TestResetSeed) {
+  const uint32_t kSeed = 1;
+  rng_.Reset(kSeed);
+  const uint64_t before = rng_.Next64();
+  rng_.Reset(kSeed);
+  const uint64_t after = rng_.Next64();
+  ASSERT_EQ(before, after);
+}
+
+// Samples 5 of 100 elements 1000 times and checks that every element is
+// picked a plausible number of times, then repeats with an 'avoid' set and
+// checks that the avoided elements are never picked.
+TEST_F(RandomTest, TestReservoirSample) {
+  // A fixed seed makes the tallies below reproducible.
+  rng_.Reset(12345);
+
+  const int kPopulationSize = 100;
+  const int kSampleSize = 5;
+  const int kNumTrials = 1000;
+
+  vector<int> population;
+  for (int value = 0; value < kPopulationSize; value++) {
+    population.push_back(value);
+  }
+
+  vector<int> sample;
+  vector<int> hits(population.size());
+  unordered_set<int> avoid;
+
+  // Runs kNumTrials samples and tallies how often each element was picked.
+  const auto run_trials = [&]() {
+    for (int t = 0; t < kNumTrials; t++) {
+      rng_.ReservoirSample(population, kSampleSize, avoid, &sample);
+      for (int picked : sample) {
+        hits[picked]++;
+      }
+    }
+  };
+
+  run_trials();
+
+  // Each element is expected to be selected 50 times on average. The exact
+  // tallies vary per element, but the constant seed keeps them stable from
+  // run to run.
+  for (int tally : hits) {
+    ASSERT_GE(tally, 25);
+    ASSERT_LE(tally, 75);
+  }
+
+  // Second pass: exclude a few particular entries.
+  for (int excluded : {3, 10, 20}) {
+    avoid.insert(excluded);
+  }
+  hits.assign(kPopulationSize, 0);
+  run_trials();
+
+  // The avoided elements must never have been picked.
+  ASSERT_EQ(0, hits[3]);
+  ASSERT_EQ(0, hits[10]);
+  ASSERT_EQ(0, hits[20]);
+}
+
+// When the requested sample size is at least the population size, the sample
+// must be the whole population in its original order.
+TEST_F(RandomTest, TestReservoirSamplePopulationTooSmall) {
+  vector<int> population;
+  for (int value = 0; value < 10; value++) {
+    population.push_back(value);
+  }
+
+  vector<int> sample;
+  unordered_set<int> avoid;
+  // First ask for more elements than exist, then for exactly as many.
+  for (int requested : {20, 10}) {
+    rng_.ReservoirSample(population, requested, avoid, &sample);
+    ASSERT_EQ(population.size(), sample.size());
+    ASSERT_EQ(population, sample);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/random.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random.h b/be/src/kudu/util/random.h
new file mode 100644
index 0000000..e31e475
--- /dev/null
+++ b/be/src/kudu/util/random.h
@@ -0,0 +1,252 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#ifndef KUDU_UTIL_RANDOM_H_
+#define KUDU_UTIL_RANDOM_H_
+
+#include <cmath>
+#include <cstdint>
+#include <mutex>
+#include <random>
+#include <vector>
+
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/util/locks.h"
+
+namespace kudu {
+
+namespace random_internal {
+
+// Modulus used by Random::Next(): 2^31-1.
+static const uint32_t M = 0x7fffffffU;
+
+} // namespace random_internal
+
+template<class R>
+class StdUniformRNG;
+
+// A very simple random number generator.  Not especially good at
+// generating truly random bits, but good enough for our needs in this
+// package. This implementation is not thread-safe.
+class Random {
+ private:
+  uint32_t seed_;
+ public:
+  explicit Random(uint32_t s) {
+    Reset(s);
+  }
+
+  // Reset the RNG to the given seed value. Only the low 31 bits of 's' are
+  // used, and the degenerate values 0 and M are replaced by 1.
+  void Reset(uint32_t s) {
+    seed_ = s & 0x7fffffffu;
+    // Avoid bad seeds.
+    if (seed_ == 0 || seed_ == random_internal::M) {
+      seed_ = 1;
+    }
+  }
+
+  // Next pseudo-random 32-bit unsigned integer.
+  // FIXME: This currently only generates 31 bits of randomness.
+  // The MSB will always be zero.
+  uint32_t Next() {
+    static const uint64_t A = 16807;  // bits 14, 8, 7, 5, 2, 1, 0
+    // We are computing
+    //       seed_ = (seed_ * A) % M,    where M = 2^31-1
+    //
+    // seed_ must not be zero or M, or else all subsequent computed values
+    // will be zero or M respectively.  For all other values, seed_ will end
+    // up cycling through every number in [1,M-1]
+    uint64_t product = seed_ * A;
+
+    // Compute (product % M) using the fact that ((x << 31) % M) == x.
+    seed_ = static_cast<uint32_t>((product >> 31) + (product & random_internal::M));
+    // The first reduction may overflow by 1 bit, so we may need to
+    // repeat.  mod == M is not possible; using > allows the faster
+    // sign-bit-based test.
+    if (seed_ > random_internal::M) {
+      seed_ -= random_internal::M;
+    }
+    return seed_;
+  }
+
+  // Alias for consistency with Next64
+  uint32_t Next32() { return Next(); }
+
+  // Next pseudo-random 64-bit unsigned integer, assembled from three calls to
+  // the 31-bit generator.
+  uint64_t Next64() {
+    uint64_t large = Next();
+    large <<= 31;
+    large |= Next();
+    // Fill in the highest two MSBs.
+    large |= implicit_cast<uint64_t>(Next32()) << 62;
+    return large;
+  }
+
+  // Returns a uniformly distributed value in the range [0..n-1]
+  // REQUIRES: n > 0
+  uint32_t Uniform(uint32_t n) { return Next() % n; }
+
+  // Alias for consistency with Uniform64
+  uint32_t Uniform32(uint32_t n) { return Uniform(n); }
+
+  // Returns a uniformly distributed 64-bit value in the range [0..n-1]
+  // REQUIRES: n > 0
+  uint64_t Uniform64(uint64_t n) { return Next64() % n; }
+
+  // Randomly returns true ~"1/n" of the time, and false otherwise.
+  // REQUIRES: n > 0
+  bool OneIn(int n) { return (Next() % n) == 0; }
+
+  // Skewed: pick "base" uniformly from range [0,max_log] and then
+  // return "base" random bits.  The effect is to pick a number in the
+  // range [0,2^max_log-1] with exponential bias towards smaller numbers.
+  // REQUIRES: 0 <= max_log <= 30, since the bound is computed as an 'int'
+  // shift of 1 by up to 'max_log' bits.
+  uint32_t Skewed(int max_log) {
+    return Uniform(1 << Uniform(max_log + 1));
+  }
+
+  // Samples a random number from the given normal distribution.
+  double Normal(double mean, double std_dev);
+
+  // Return a random number strictly between 0.0 and 1.0: Next() yields values
+  // in [1,M-1], which are divided by M+1 (2^31).
+  double NextDoubleFraction() {
+    return Next() / static_cast<double>(random_internal::M + 1.0);
+  }
+
+  // Sample 'k' random elements from the collection 'c' into 'result', taking
+  // care not to sample any elements that are already present in 'avoid'.
+  // Any previous contents of 'result' are discarded.
+  //
+  // In the case that 'c' has fewer than 'k' eligible elements then all of
+  // them will be selected, in their original order.
+  //
+  // 'c' should be an iterable STL collection such as a vector, set, or list.
+  // 'avoid' should be an STL-compatible set.
+  // REQUIRES: k >= 0
+  //
+  // The results are not shuffled: the first 'k' eligible elements fill the
+  // reservoir in input order, and each later element may overwrite a randomly
+  // chosen slot. Callers should therefore rely neither on a random ordering
+  // nor, once replacement has happened, on the input ordering.
+  template<class Collection, class Set, class T>
+  void ReservoirSample(const Collection& c, int k, const Set& avoid,
+                       std::vector<T>* result) {
+    result->clear();
+    result->reserve(k);
+    // Number of eligible (non-avoided) elements seen so far.
+    int i = 0;
+    for (const T& elem : c) {
+      if (ContainsKey(avoid, elem)) {
+        continue;
+      }
+      i++;
+      // Fill the reservoir if there is available space.
+      if (result->size() < k) {
+        result->push_back(elem);
+        continue;
+      }
+      // Otherwise replace existing elements with decreasing probability.
+      int j = Uniform(i);
+      if (j < k) {
+        (*result)[j] = elem;
+      }
+    }
+  }
+};
+
+// Thread-safe wrapper around Random: every method takes the spinlock and
+// forwards to the same-named method of the wrapped generator.
+class ThreadSafeRandom {
+ public:
+  explicit ThreadSafeRandom(uint32_t s) : random_(s) {}
+
+  void Reset(uint32_t s) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    random_.Reset(s);
+  }
+
+  uint32_t Next() {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Next();
+  }
+
+  uint32_t Next32() {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Next32();
+  }
+
+  uint64_t Next64() {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Next64();
+  }
+
+  uint32_t Uniform(uint32_t n) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Uniform(n);
+  }
+
+  uint32_t Uniform32(uint32_t n) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Uniform32(n);
+  }
+
+  uint64_t Uniform64(uint64_t n) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Uniform64(n);
+  }
+
+  bool OneIn(int n) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.OneIn(n);
+  }
+
+  uint32_t Skewed(int max_log) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Skewed(max_log);
+  }
+
+  double Normal(double mean, double std_dev) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Normal(mean, std_dev);
+  }
+
+  double NextDoubleFraction() {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.NextDoubleFraction();
+  }
+
+  // The lock is held for the whole sampling pass over 'c'.
+  template<class Collection, class Set, class T>
+  void ReservoirSample(const Collection& c, int k, const Set& avoid,
+                       std::vector<T>* result) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    random_.ReservoirSample(c, k, avoid, result);
+  }
+
+ private:
+  // Serializes all access to 'random_'.
+  simple_spinlock lock_;
+  Random random_;
+};
+
+// Wraps either Random or ThreadSafeRandom as a C++ standard library
+// compliant UniformRandomNumberGenerator:
+//   http://en.cppreference.com/w/cpp/concept/UniformRandomNumberGenerator
+//
+// The wrapper stores the pointer it is given and does not take ownership.
+template<class R>
+class StdUniformRNG {
+ public:
+  typedef uint32_t result_type;
+
+  explicit StdUniformRNG(R* r) : r_(r) {}
+
+  // Draws the next value by forwarding to R::Next32().
+  uint32_t operator()() {
+    return r_->Next32();
+  }
+
+  constexpr static uint32_t min() { return 0; }
+
+  // 2^31-1. The shift is performed on a 32-bit unsigned value; shifting a
+  // plain 'long' by 31 would overflow on platforms where long is 32 bits wide.
+  constexpr static uint32_t max() { return (static_cast<uint32_t>(1) << 31) - 1; }
+
+ private:
+  R* r_;
+};
+
+// Defined outside the class to make use of StdUniformRNG above.
+//
+// Draws one sample from a std::normal_distribution with the given mean and
+// standard deviation, using this generator as the source of randomness.
+inline double Random::Normal(double mean, double std_dev) {
+  StdUniformRNG<Random> engine(this);
+  std::normal_distribution<double> dist(mean, std_dev);
+  return dist(engine);
+}
+
+}  // namespace kudu
+
+#endif  // KUDU_UTIL_RANDOM_H_

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/random_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random_util-test.cc b/be/src/kudu/util/random_util-test.cc
new file mode 100644
index 0000000..993ef15
--- /dev/null
+++ b/be/src/kudu/util/random_util-test.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/random_util.h"
+
+#include <cstring>
+#include <ostream>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/random.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+// Fixture for the random_util tests: an RNG seeded via SeedRandom() plus the
+// buffer size and trial count shared by the tests.
+class RandomUtilTest : public KuduTest {
+ protected:
+  // Size of the scratch buffer and number of randomized trials.
+  static const int kLenMax = 100;
+  static const int kNumTrials = 100;
+
+  RandomUtilTest() : rng_(SeedRandom()) {}
+
+  Random rng_;
+};
+
+namespace {
+
+// Checks that the 'stop'-byte buffer at 'start' is '\0' everywhere except
+// possibly within the half-open index range [from, to).
+// REQUIRES: 0 <= from <= to <= stop (DCHECKed).
+void CheckEmpty(char* start, int from, int to, int stop) {
+  DCHECK_LE(0, from);
+  DCHECK_LE(from, to);
+  DCHECK_LE(to, stop);
+  for (int j = 0; j < stop; ++j) {
+    // Indices inside [from, to) are allowed to hold anything; skip them.
+    if (j >= from && j < to) {
+      continue;
+    }
+    CHECK_EQ(start[j], '\0') << "Index " << j << " not null after defining "
+                             << "indices [" << from << "," << to << ") of "
+                             << "a nulled string [0," << stop << ").";
+  }
+}
+
+} // anonymous namespace
+
+// Makes sure that RandomString writes nothing outside the range it was asked
+// to fill: every byte of the zeroed buffer outside [begin, end) must still be
+// '\0' afterwards.
+TEST_F(RandomUtilTest, TestRandomString) {
+  char buf[kLenMax];
+
+  for (int trial = 0; trial < kNumTrials; ++trial) {
+    memset(buf, '\0', kLenMax);
+    // Pick a random sub-range [begin, end) of the buffer.
+    const int end = rng_.Uniform(kLenMax + 1);
+    const int begin = rng_.Uniform(end + 1);
+    RandomString(buf + begin, end - begin, &rng_);
+    CheckEmpty(buf, begin, end, kLenMax);
+  }
+
+  // Corner case: a zero-length request must leave the buffer untouched.
+  memset(buf, '\0', kLenMax);
+  RandomString(buf, 0, &rng_);
+  CheckEmpty(buf, 0, 0, kLenMax);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/random_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random_util.cc b/be/src/kudu/util/random_util.cc
new file mode 100644
index 0000000..fa7ef12
--- /dev/null
+++ b/be/src/kudu/util/random_util.cc
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/random_util.h"
+
+#include <unistd.h>
+
+#include <cstdlib>
+#include <cstring>
+#include <string>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/random.h"
+
+using std::string;
+
+namespace kudu {
+
+// Fills the 'n' bytes at 'dest' with output from 'rng'.
+//
+// The buffer is written one 32-bit word from rng->Next() at a time; a final
+// partial word covers whatever is left. Next() is called once up front and
+// once more after every full word written.
+void RandomString(void* dest, size_t n, Random* rng) {
+  const size_t kWordSize = sizeof(uint32_t);
+  char* out = static_cast<char*>(dest);
+  uint32_t word = rng->Next();
+  size_t offset = 0;
+  // Copy whole words while at least one full word still fits.
+  while (n - offset >= kWordSize) {
+    memcpy(out + offset, &word, kWordSize);
+    word = rng->Next();
+    offset += kWordSize;
+  }
+  // Copy the leading bytes of the last word into the remaining tail, if any.
+  memcpy(out + offset, &word, n - offset);
+}
+
+// Returns a string of 'n' bytes produced by the buffer-filling overload of
+// RandomString().
+string RandomString(size_t n, Random* rng) {
+  faststring buf;
+  buf.resize(n);
+  RandomString(buf.data(), n, rng);
+  string result = buf.ToString();
+  return result;
+}
+
+// Builds a 32-bit seed by multiplying the current time in microseconds
+// (truncated to 32 bits) by the process id and by the thread id. The
+// multiplications are allowed to wrap around, hence the sanitizer attribute.
+ATTRIBUTE_NO_SANITIZE_INTEGER
+uint32_t GetRandomSeed32() {
+  const auto pid = getpid();
+  const auto tid = Env::Default()->gettid();
+  uint32_t seed = static_cast<uint32_t>(GetCurrentTimeMicros());
+  seed *= pid;
+  seed *= tid;
+  return seed;
+}
+
+} // namespace kudu

Reply via email to