http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/maintenance_manager.cc 
b/be/src/kudu/util/maintenance_manager.cc
new file mode 100644
index 0000000..18ed8b8
--- /dev/null
+++ b/be/src/kudu/util/maintenance_manager.cc
@@ -0,0 +1,503 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/maintenance_manager.h"
+
+#include <gflags/gflags.h>
+#include <memory>
+#include <stdint.h>
+#include <string>
+#include <utility>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/debug/trace_logging.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/process_memory.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/trace.h"
+
+using std::pair;
+using std::shared_ptr;
+using strings::Substitute;
+
+DEFINE_int32(maintenance_manager_num_threads, 1,
+             "Size of the maintenance manager thread pool. "
+             "For spinning disks, the number of threads should "
+             "not be above the number of devices.");
+TAG_FLAG(maintenance_manager_num_threads, stable);
+
+DEFINE_int32(maintenance_manager_polling_interval_ms, 250,
+       "Polling interval for the maintenance manager scheduler, "
+       "in milliseconds.");
+TAG_FLAG(maintenance_manager_polling_interval_ms, hidden);
+
+DEFINE_int32(maintenance_manager_history_size, 8,
+       "Number of completed operations the manager is keeping track of.");
+TAG_FLAG(maintenance_manager_history_size, hidden);
+
+DEFINE_bool(enable_maintenance_manager, true,
+       "Enable the maintenance manager, runs compaction and tablet cleaning 
tasks.");
+TAG_FLAG(enable_maintenance_manager, unsafe);
+
+DEFINE_int64(log_target_replay_size_mb, 1024,
+             "The target maximum size of logs to be replayed at startup. If a 
tablet "
+             "has in-memory operations that are causing more than this size of 
logs "
+             "to be retained, then the maintenance manager will prioritize 
flushing "
+             "these operations to disk.");
+TAG_FLAG(log_target_replay_size_mb, experimental);
+
+DEFINE_int64(data_gc_min_size_mb, 0,
+             "The (exclusive) minimum number of megabytes of ancient data on "
+             "disk, per tablet, needed to prioritize deletion of that data.");
+TAG_FLAG(data_gc_min_size_mb, experimental);
+
+DEFINE_double(data_gc_prioritization_prob, 0.5,
+             "The probability that we will prioritize data GC over performance 
"
+             "improvement operations. If set to 1.0, we will always prefer to "
+             "delete old data before running performance improvement 
operations "
+             "such as delta compaction.");
+TAG_FLAG(data_gc_prioritization_prob, experimental);
+
+namespace kudu {
+
+MaintenanceOpStats::MaintenanceOpStats() {
+  Clear();
+}
+
+void MaintenanceOpStats::Clear() {
+  valid_ = false;
+  runnable_ = false;
+  ram_anchored_ = 0;
+  logs_retained_bytes_ = 0;
+  data_retained_bytes_ = 0;
+  perf_improvement_ = 0;
+  last_modified_ = MonoTime();
+}
+
+MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
+    : name_(std::move(name)),
+      running_(0),
+      cancel_(false),
+      io_usage_(io_usage) {
+}
+
+MaintenanceOp::~MaintenanceOp() {
+  CHECK(!manager_.get()) << "You must unregister the " << name_
+         << " Op before destroying it.";
+}
+
+void MaintenanceOp::Unregister() {
+  CHECK(manager_.get()) << "Op " << name_ << " was never registered.";
+  manager_->UnregisterOp(this);
+}
+
+const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = {
+  .num_threads = 0,
+  .polling_interval_ms = 0,
+  .history_size = 0,
+};
+
+MaintenanceManager::MaintenanceManager(const Options& options)
+  : num_threads_(options.num_threads <= 0 ?
+      FLAGS_maintenance_manager_num_threads : options.num_threads),
+    cond_(&lock_),
+    shutdown_(false),
+    running_ops_(0),
+    polling_interval_ms_(options.polling_interval_ms <= 0 ?
+          FLAGS_maintenance_manager_polling_interval_ms :
+          options.polling_interval_ms),
+    completed_ops_count_(0),
+    rand_(GetRandomSeed32()),
+    memory_pressure_func_(&process_memory::UnderMemoryPressure) {
+  CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
+               .set_max_threads(num_threads_).Build(&thread_pool_));
+  uint32_t history_size = options.history_size == 0 ?
+                          FLAGS_maintenance_manager_history_size :
+                          options.history_size;
+  completed_ops_.resize(history_size);
+}
+
+MaintenanceManager::~MaintenanceManager() {
+  Shutdown();
+}
+
+Status MaintenanceManager::Init(string server_uuid) {
+  server_uuid_ = std::move(server_uuid);
+  RETURN_NOT_OK(Thread::Create("maintenance", "maintenance_scheduler",
+      boost::bind(&MaintenanceManager::RunSchedulerThread, this),
+      &monitor_thread_));
+  return Status::OK();
+}
+
+void MaintenanceManager::Shutdown() {
+  {
+    std::lock_guard<Mutex> guard(lock_);
+    if (shutdown_) {
+      return;
+    }
+    shutdown_ = true;
+    cond_.Broadcast();
+  }
+  if (monitor_thread_.get()) {
+    CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join());
+    monitor_thread_.reset();
+    thread_pool_->Shutdown();
+  }
+}
+
+void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
+  CHECK(op);
+  std::lock_guard<Mutex> guard(lock_);
+  CHECK(!op->manager_) << "Tried to register " << op->name()
+          << ", but it was already registered.";
+  pair<OpMapTy::iterator, bool> val
+    (ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats())));
+  CHECK(val.second)
+      << "Tried to register " << op->name()
+      << ", but it already exists in ops_.";
+  op->manager_ = shared_from_this();
+  op->cond_.reset(new ConditionVariable(&lock_));
+  VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Registered " << 
op->name();
+}
+
+void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
+  {
+    std::lock_guard<Mutex> guard(lock_);
+    CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name()
+          << ", but it is not currently registered with this maintenance 
manager.";
+    auto iter = ops_.find(op);
+    CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
+        << ", but it was never registered";
+    // While the op is running, wait for it to be finished.
+    if (iter->first->running_ > 0) {
+      VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Waiting for op " << 
op->name()
+                                       << " to finish so we can unregister 
it.";
+    }
+    op->CancelAndDisable();
+    while (iter->first->running_ > 0) {
+      op->cond_->Wait();
+      iter = ops_.find(op);
+      CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
+          << ", but another thread unregistered it while we were "
+          << "waiting for it to complete";
+    }
+    ops_.erase(iter);
+  }
+  LOG_WITH_PREFIX(INFO) << "Unregistered op " << op->name();
+  op->cond_.reset();
+  // Remove the op's shared_ptr reference to us.  This might 'delete this'.
+  op->manager_.reset();
+}
+
+void MaintenanceManager::RunSchedulerThread() {
+  if (!FLAGS_enable_maintenance_manager) {
+    LOG(INFO) << "Maintenance manager is disabled. Stopping thread.";
+    return;
+  }
+
+  MonoDelta polling_interval = 
MonoDelta::FromMilliseconds(polling_interval_ms_);
+
+  std::unique_lock<Mutex> guard(lock_);
+
+  // Set to true if the scheduler runs and finds that there is no work to do.
+  bool prev_iter_found_no_work = false;
+
+  while (true) {
+    // We'll keep sleeping if:
+    //    1) there are no free threads available to perform a maintenance op.
+    // or 2) we just tried to schedule an op but found nothing to run.
+    // However, if it's time to shut down, we want to do so immediately.
+    while ((running_ops_ >= num_threads_ || prev_iter_found_no_work) && 
!shutdown_) {
+      cond_.TimedWait(polling_interval);
+      prev_iter_found_no_work = false;
+    }
+    if (shutdown_) {
+      VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Shutting down 
maintenance manager.";
+      return;
+    }
+
+    // Find the best op.
+    MaintenanceOp* op = FindBestOp();
+    // If we found no work to do, then we should sleep before trying again to 
schedule.
+    // Otherwise, we can go right into trying to find the next op.
+    prev_iter_found_no_work = (op == nullptr);
+    if (!op) {
+      VLOG_AND_TRACE("maintenance", 2) << LogPrefix()
+                                       << "No maintenance operations look 
worth doing.";
+      continue;
+    }
+
+    // Prepare the maintenance operation.
+    op->running_++;
+    running_ops_++;
+    guard.unlock();
+    bool ready = op->Prepare();
+    guard.lock();
+    if (!ready) {
+      LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name()
+                            << ".  Re-running scheduler.";
+      op->running_--;
+      op->cond_->Signal();
+      continue;
+    }
+
+    // Run the maintenance operation.
+    Status s = thread_pool_->SubmitFunc(boost::bind(
+          &MaintenanceManager::LaunchOp, this, op));
+    CHECK(s.ok());
+  }
+}
+
+// Finding the best operation goes through four filters:
+// - If there's an Op that we can run quickly that frees log retention, we run 
it.
+// - If we've hit the overall process memory limit (note: this includes memory 
that the Ops cannot
+//   free), we run the Op with the highest RAM usage.
+// - If there are Ops that are retaining logs past our target replay size, we 
run the one that has
+//   the highest retention (and if many qualify, then we run the one that also 
frees up the
+//   most RAM).
+// - Finally, if there's nothing else that we really need to do, we run the Op 
that will improve
+//   performance the most.
+//
+// The reason it's done this way is that we want to prioritize limiting the 
amount of resources we
+// hold on to. Low IO Ops go first since we can quickly run them, then we can 
look at memory usage.
+// Reversing those can starve the low IO Ops when the system is under intense 
memory pressure.
+//
+// In the third priority we're at a point where nothing's urgent and there's 
nothing we can run
+// quickly.
+// TODO We currently optimize for freeing log retention but we could consider 
having some sort of
+// sliding priority between log retention and RAM usage. For example, is an Op 
that frees
+// 128MB of log retention and 12MB of RAM always better than an op that frees 
12MB of log retention
+// and 128MB of RAM? Maybe a more holistic approach would be better.
+MaintenanceOp* MaintenanceManager::FindBestOp() {
+  TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
+
+  size_t free_threads = num_threads_ - running_ops_;
+  if (free_threads == 0) {
+    VLOG_AND_TRACE("maintenance", 1) << LogPrefix()
+                                     << "There are no free threads, so we 
can't run anything.";
+    return nullptr;
+  }
+
+  int64_t low_io_most_logs_retained_bytes = 0;
+  MaintenanceOp* low_io_most_logs_retained_bytes_op = nullptr;
+
+  uint64_t most_mem_anchored = 0;
+  MaintenanceOp* most_mem_anchored_op = nullptr;
+
+  int64_t most_logs_retained_bytes = 0;
+  int64_t most_logs_retained_bytes_ram_anchored = 0;
+  MaintenanceOp* most_logs_retained_bytes_op = nullptr;
+
+  int64_t most_data_retained_bytes = 0;
+  MaintenanceOp* most_data_retained_bytes_op = nullptr;
+
+  double best_perf_improvement = 0;
+  MaintenanceOp* best_perf_improvement_op = nullptr;
+  for (OpMapTy::value_type &val : ops_) {
+    MaintenanceOp* op(val.first);
+    MaintenanceOpStats& stats(val.second);
+    VLOG_WITH_PREFIX(3) << "Considering MM op " << op->name();
+    // Update op stats.
+    stats.Clear();
+    op->UpdateStats(&stats);
+    if (op->cancelled() || !stats.valid() || !stats.runnable()) {
+      continue;
+    }
+    if (stats.logs_retained_bytes() > low_io_most_logs_retained_bytes &&
+        op->io_usage() == MaintenanceOp::LOW_IO_USAGE) {
+      low_io_most_logs_retained_bytes_op = op;
+      low_io_most_logs_retained_bytes = stats.logs_retained_bytes();
+      VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() 
<< " can free "
+                                       << stats.logs_retained_bytes() << " 
bytes of logs";
+    }
+
+    if (stats.ram_anchored() > most_mem_anchored) {
+      most_mem_anchored_op = op;
+      most_mem_anchored = stats.ram_anchored();
+    }
+    // We prioritize ops that can free more logs, but when it's the same we 
pick the one that
+    // also frees up the most memory.
+    if (stats.logs_retained_bytes() > 0 &&
+        (stats.logs_retained_bytes() > most_logs_retained_bytes ||
+            (stats.logs_retained_bytes() == most_logs_retained_bytes &&
+                stats.ram_anchored() > 
most_logs_retained_bytes_ram_anchored))) {
+      most_logs_retained_bytes_op = op;
+      most_logs_retained_bytes = stats.logs_retained_bytes();
+      most_logs_retained_bytes_ram_anchored = stats.ram_anchored();
+    }
+
+    if (stats.data_retained_bytes() > most_data_retained_bytes) {
+      most_data_retained_bytes_op = op;
+      most_data_retained_bytes = stats.data_retained_bytes();
+      VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() 
<< " can free "
+                                       << stats.data_retained_bytes() << " 
bytes of data";
+    }
+
+    if ((!best_perf_improvement_op) ||
+        (stats.perf_improvement() > best_perf_improvement)) {
+      best_perf_improvement_op = op;
+      best_perf_improvement = stats.perf_improvement();
+    }
+  }
+
+  // Look at ops that we can run quickly that free up log retention.
+  if (low_io_most_logs_retained_bytes_op) {
+    if (low_io_most_logs_retained_bytes > 0) {
+      VLOG_AND_TRACE("maintenance", 1) << LogPrefix()
+                    << "Performing " << 
low_io_most_logs_retained_bytes_op->name() << ", "
+                    << "because it can free up more logs "
+                    << "at " << low_io_most_logs_retained_bytes
+                    << " bytes with a low IO cost";
+      return low_io_most_logs_retained_bytes_op;
+    }
+  }
+
+  // Look at free memory. If it is dangerously low, we must select something
+  // that frees memory-- the op with the most anchored memory.
+  double capacity_pct;
+  if (memory_pressure_func_(&capacity_pct)) {
+    if (!most_mem_anchored_op) {
+      string msg = StringPrintf("we have exceeded our soft memory limit "
+          "(current capacity is %.2f%%).  However, there are no ops currently "
+          "runnable which would free memory.", capacity_pct);
+      LOG_WITH_PREFIX(INFO) << msg;
+      return nullptr;
+    }
+    VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "We have exceeded our 
soft memory limit "
+            << "(current capacity is " << capacity_pct << "%).  Running the op 
"
+            << "which anchors the most memory: " << 
most_mem_anchored_op->name();
+    return most_mem_anchored_op;
+  }
+
+  if (most_logs_retained_bytes_op &&
+      most_logs_retained_bytes / 1024 / 1024 >= 
FLAGS_log_target_replay_size_mb) {
+    VLOG_AND_TRACE("maintenance", 1) << LogPrefix()
+            << "Performing " << most_logs_retained_bytes_op->name() << ", "
+            << "because it can free up more logs (" << most_logs_retained_bytes
+            << " bytes)";
+    return most_logs_retained_bytes_op;
+  }
+
+  // Look at ops that we can run quickly that free up data on disk.
+  if (most_data_retained_bytes_op &&
+      most_data_retained_bytes > FLAGS_data_gc_min_size_mb * 1024 * 1024) {
+    if (!best_perf_improvement_op || best_perf_improvement <= 0 ||
+        rand_.NextDoubleFraction() <= FLAGS_data_gc_prioritization_prob) {
+      VLOG_AND_TRACE("maintenance", 1) << LogPrefix()
+                    << "Performing " << most_data_retained_bytes_op->name() << 
", "
+                    << "because it can free up more data "
+                    << "at " << most_data_retained_bytes << " bytes";
+      return most_data_retained_bytes_op;
+    }
+    VLOG(1) << "Skipping data GC due to prioritizing perf improvement";
+  }
+
+  if (best_perf_improvement_op && best_perf_improvement > 0) {
+    VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Performing "
+                << best_perf_improvement_op->name() << ", "
+                << "because it had the best perf_improvement score, "
+                << "at " << best_perf_improvement;
+    return best_perf_improvement_op;
+  }
+  return nullptr;
+}
+
+void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
+  MonoTime start_time = MonoTime::Now();
+  op->RunningGauge()->Increment();
+
+  scoped_refptr<Trace> trace(new Trace);
+  LOG_TIMING(INFO, Substitute("running $0", op->name())) {
+    ADOPT_TRACE(trace.get());
+    TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp",
+                 "name", op->name());
+    op->Perform();
+  }
+  LOG_WITH_PREFIX(INFO) << op->name() << " metrics: " << 
trace->MetricsAsJSON();
+
+  op->RunningGauge()->Decrement();
+  MonoDelta delta = MonoTime::Now() - start_time;
+
+  std::lock_guard<Mutex> l(lock_);
+  CompletedOp& completed_op = completed_ops_[completed_ops_count_ % 
completed_ops_.size()];
+  completed_op.name = op->name();
+  completed_op.duration = delta;
+  completed_op.start_mono_time = start_time;
+  completed_ops_count_++;
+
+  op->DurationHistogram()->Increment(delta.ToMilliseconds());
+
+  running_ops_--;
+  op->running_--;
+  op->cond_->Signal();
+  cond_.Signal(); // wake up scheduler
+}
+
+void 
MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* 
out_pb) {
+  DCHECK(out_pb != nullptr);
+  std::lock_guard<Mutex> guard(lock_);
+  MaintenanceOp* best_op = FindBestOp();
+  for (MaintenanceManager::OpMapTy::value_type& val : ops_) {
+    MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = 
out_pb->add_registered_operations();
+    MaintenanceOp* op(val.first);
+    MaintenanceOpStats& stat(val.second);
+    op_pb->set_name(op->name());
+    op_pb->set_running(op->running());
+    if (stat.valid()) {
+      op_pb->set_runnable(stat.runnable());
+      op_pb->set_ram_anchored_bytes(stat.ram_anchored());
+      op_pb->set_logs_retained_bytes(stat.logs_retained_bytes());
+      op_pb->set_perf_improvement(stat.perf_improvement());
+    } else {
+      op_pb->set_runnable(false);
+      op_pb->set_ram_anchored_bytes(0);
+      op_pb->set_logs_retained_bytes(0);
+      op_pb->set_perf_improvement(0.0);
+    }
+
+    if (best_op == op) {
+      out_pb->mutable_best_op()->CopyFrom(*op_pb);
+    }
+  }
+
+  for (int n = 1; n <= completed_ops_.size(); n++) {
+    int i = completed_ops_count_ - n;
+    if (i < 0) break;
+    const auto& completed_op = completed_ops_[i % completed_ops_.size()];
+
+    if (!completed_op.name.empty()) {
+      MaintenanceManagerStatusPB_CompletedOpPB* completed_pb = 
out_pb->add_completed_operations();
+      completed_pb->set_name(completed_op.name);
+      
completed_pb->set_duration_millis(completed_op.duration.ToMilliseconds());
+
+      MonoDelta 
delta(MonoTime::Now().GetDeltaSince(completed_op.start_mono_time));
+      completed_pb->set_secs_since_start(delta.ToSeconds());
+    }
+  }
+}
+
+std::string MaintenanceManager::LogPrefix() const {
+  return Substitute("P $0: ", server_uuid_);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/maintenance_manager.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/maintenance_manager.h 
b/be/src/kudu/util/maintenance_manager.h
new file mode 100644
index 0000000..6070e2d
--- /dev/null
+++ b/be/src/kudu/util/maintenance_manager.h
@@ -0,0 +1,324 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/maintenance_manager.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/random.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+
+template<class T>
+class AtomicGauge;
+class Histogram;
+class MaintenanceManager;
+
+class MaintenanceOpStats {
+ public:
+  MaintenanceOpStats();
+
+  // Zero all stats. They are invalid until the first setter is called.
+  void Clear();
+
+  bool runnable() const {
+    DCHECK(valid_);
+    return runnable_;
+  }
+
+  void set_runnable(bool runnable) {
+    UpdateLastModified();
+    runnable_ = runnable;
+  }
+
+  uint64_t ram_anchored() const {
+    DCHECK(valid_);
+    return ram_anchored_;
+  }
+
+  void set_ram_anchored(uint64_t ram_anchored) {
+    UpdateLastModified();
+    ram_anchored_ = ram_anchored;
+  }
+
+  int64_t logs_retained_bytes() const {
+    DCHECK(valid_);
+    return logs_retained_bytes_;
+  }
+
+  void set_logs_retained_bytes(int64_t logs_retained_bytes) {
+    UpdateLastModified();
+    logs_retained_bytes_ = logs_retained_bytes;
+  }
+
+  int64_t data_retained_bytes() const {
+    DCHECK(valid_);
+    return data_retained_bytes_;
+  }
+
+  void set_data_retained_bytes(int64_t data_retained_bytes) {
+    UpdateLastModified();
+    data_retained_bytes_ = data_retained_bytes;
+  }
+
+  double perf_improvement() const {
+    DCHECK(valid_);
+    return perf_improvement_;
+  }
+
+  void set_perf_improvement(double perf_improvement) {
+    UpdateLastModified();
+    perf_improvement_ = perf_improvement;
+  }
+
+  const MonoTime& last_modified() const {
+    DCHECK(valid_);
+    return last_modified_;
+  }
+
+  bool valid() const {
+    return valid_;
+  }
+
+ private:
+  void UpdateLastModified() {
+    valid_ = true;
+    last_modified_ = MonoTime::Now();
+  }
+
+  // Important: Update Clear() when adding fields to this class.
+
+  // True if these stats are valid.
+  bool valid_;
+
+  // True if this op can be run now.
+  bool runnable_;
+
+  // The approximate amount of memory that not doing this operation keeps
+  // around.  This number is used to decide when to start freeing memory, so it
+  // should be fairly accurate.  May be 0.
+  uint64_t ram_anchored_;
+
+  // Approximate amount of disk space in WAL files that would be freed if this
+  // operation ran. May be 0.
+  int64_t logs_retained_bytes_;
+
+  // Approximate amount of disk space in data blocks that would be freed if
+  // this operation ran. May be 0.
+  int64_t data_retained_bytes_;
+
+  // The estimated performance improvement-- how good it is to do this on some
+  // absolute scale (yet TBD).
+  double perf_improvement_;
+
+  // The last time that the stats were modified.
+  MonoTime last_modified_;
+};
+
+// MaintenanceOp objects represent background operations that the
+// MaintenanceManager can schedule.  Once a MaintenanceOp is registered, the
+// manager will periodically poll it for statistics.  The registrant is
+// responsible for managing the memory associated with the MaintenanceOp 
object.
+// Op objects should be unregistered before being de-allocated.
+class MaintenanceOp {
+ public:
+  friend class MaintenanceManager;
+
+  // General indicator of how much IO the Op will use.
+  enum IOUsage {
+    LOW_IO_USAGE, // Low impact operations like removing a file, updating 
metadata.
+    HIGH_IO_USAGE // Everything else.
+  };
+
+  explicit MaintenanceOp(std::string name, IOUsage io_usage);
+  virtual ~MaintenanceOp();
+
+  // Unregister this op, if it is currently registered.
+  void Unregister();
+
+  // Update the op statistics.  This will be called every scheduling period
+  // (about a few times a second), so it should not be too expensive.  It's
+  // possible for the returned statistics to be invalid; the caller should
+  // call MaintenanceOpStats::valid() before using them.  This will be run
+  // under the MaintenanceManager lock.
+  virtual void UpdateStats(MaintenanceOpStats* stats) = 0;
+
+  // Prepare to perform the operation.  This will be run without holding the
+  // maintenance manager lock.  It should be short, since it is run from the
+  // context of the maintenance op scheduler thread rather than a worker 
thread.
+  // If this returns false, we will abort the operation.
+  virtual bool Prepare() = 0;
+
+  // Perform the operation.  This will be run without holding the maintenance
+  // manager lock, and may take a long time.
+  virtual void Perform() = 0;
+
+  // Returns the histogram for this op that tracks duration. Cannot be NULL.
+  virtual scoped_refptr<Histogram> DurationHistogram() const = 0;
+
+  // Returns the gauge for this op that tracks when this op is running. Cannot 
be NULL.
+  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const = 0;
+
+  uint32_t running() { return running_; }
+
+  std::string name() const { return name_; }
+
+  IOUsage io_usage() const { return io_usage_; }
+
+  // Return true if the operation has been cancelled due to Unregister() 
pending.
+  bool cancelled() const {
+    return cancel_.Load();
+  }
+
+  // Cancel this operation, which prevents new instances of it from being 
scheduled
+  // regardless of whether the statistics indicate it is runnable. Instances 
may also
+  // optionally poll 'cancelled()' on a periodic basis to know if they should 
abort a
+  // lengthy operation in the middle of Perform().
+  void CancelAndDisable() {
+    cancel_.Store(true);
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(MaintenanceOp);
+
+  // The name of the operation.  Op names must be unique.
+  const std::string name_;
+
+  // The number of times that this op is currently running.
+  uint32_t running_;
+
+  // Set when we are trying to unregister the maintenance operation.
+  // Ongoing operations could read this boolean and cancel themselves.
+  // New operations will not be scheduled when this boolean is set.
+  AtomicBool cancel_;
+
+  // Condition variable which the UnregisterOp function can wait on.
+  //
+  // Note: 'cond_' is used with the MaintenanceManager's mutex. As such,
+  // it only exists when the op is registered.
+  gscoped_ptr<ConditionVariable> cond_;
+
+  // The MaintenanceManager with which this op is registered, or null
+  // if it is not registered.
+  std::shared_ptr<MaintenanceManager> manager_;
+
+  IOUsage io_usage_;
+};
+
+struct MaintenanceOpComparator {
+  bool operator() (const MaintenanceOp* lhs,
+                   const MaintenanceOp* rhs) const {
+    return lhs->name().compare(rhs->name()) < 0;
+  }
+};
+
+// Holds the information regarding a recently completed operation.
+struct CompletedOp {
+  std::string name;
+  MonoDelta duration;
+  MonoTime start_mono_time;
+};
+
+// The MaintenanceManager manages the scheduling of background operations such
+// as flushes or compactions.  It runs these operations in the background, in a
+// thread pool.  It uses information provided in MaintenanceOpStats objects to
+// decide which operations, if any, to run.
+class MaintenanceManager : public 
std::enable_shared_from_this<MaintenanceManager> {
+ public:
+  struct Options {
+    int32_t num_threads;
+    int32_t polling_interval_ms;
+    uint32_t history_size;
+  };
+
+  explicit MaintenanceManager(const Options& options);
+  ~MaintenanceManager();
+
+  Status Init(std::string server_uuid);
+  void Shutdown();
+
+  // Register an op with the manager.
+  void RegisterOp(MaintenanceOp* op);
+
+  // Unregister an op with the manager.
+  // If the Op is currently running, it will not be interrupted.  However, this
+  // function will block until the Op is finished.
+  void UnregisterOp(MaintenanceOp* op);
+
+  void GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb);
+
+  void set_memory_pressure_func_for_tests(std::function<bool(double*)> f) {
+    std::lock_guard<Mutex> guard(lock_);
+    memory_pressure_func_ = std::move(f);
+  }
+
+  static const Options DEFAULT_OPTIONS;
+
+ private:
+  FRIEND_TEST(MaintenanceManagerTest, TestLogRetentionPrioritization);
+  typedef std::map<MaintenanceOp*, MaintenanceOpStats,
+          MaintenanceOpComparator> OpMapTy;
+
+  void RunSchedulerThread();
+
+  // find the best op, or null if there is nothing we want to run
+  MaintenanceOp* FindBestOp();
+
+  void LaunchOp(MaintenanceOp* op);
+
+  std::string LogPrefix() const;
+
+  const int32_t num_threads_;
+  OpMapTy ops_; // registered operations
+  Mutex lock_;
+  scoped_refptr<kudu::Thread> monitor_thread_;
+  gscoped_ptr<ThreadPool> thread_pool_;
+  ConditionVariable cond_;
+  bool shutdown_;
+  uint64_t running_ops_;
+  int32_t polling_interval_ms_;
+  // Vector used as a circular buffer for recently completed ops. Elements 
need to be added at
+  // the completed_ops_count_ % the vector's size and then the count needs to 
be incremented.
+  std::vector<CompletedOp> completed_ops_;
+  int64_t completed_ops_count_;
+  std::string server_uuid_;
+  Random rand_;
+
+  // Function which should return true if the server is under global memory 
pressure.
+  // This is indirected for testing purposes.
+  std::function<bool(double*)> memory_pressure_func_;
+
+  DISALLOW_COPY_AND_ASSIGN(MaintenanceManager);
+};
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/maintenance_manager.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/maintenance_manager.proto 
b/be/src/kudu/util/maintenance_manager.proto
new file mode 100644
index 0000000..75b0ab3
--- /dev/null
+++ b/be/src/kudu/util/maintenance_manager.proto
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+option java_package = "org.apache.kudu";
+
+// Used to present the maintenance manager's internal state.
+message MaintenanceManagerStatusPB {
+  message MaintenanceOpPB {
+    required string name = 1;
+    // Number of times this operation is currently running.
+    required uint32 running = 2;
+    required bool runnable = 3;
+    required uint64 ram_anchored_bytes = 4;
+    required int64 logs_retained_bytes = 5;
+    required double perf_improvement = 6;
+  }
+
+  message CompletedOpPB {
+    required string name = 1;
+    required int32 duration_millis = 2;
+    // Number of seconds since this operation started.
+    required int32 secs_since_start = 3;
+  }
+
+  // The next operation that would run.
+  optional MaintenanceOpPB best_op = 1;
+
+  // List of all the operations.
+  repeated MaintenanceOpPB registered_operations = 2;
+
+  // This list isn't in order of anything. Can contain the same operation 
mutiple times.
+  repeated CompletedOpPB completed_operations = 3;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/make_shared.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/make_shared.h b/be/src/kudu/util/make_shared.h
new file mode 100644
index 0000000..af254df
--- /dev/null
+++ b/be/src/kudu/util/make_shared.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_MAKE_SHARED_H_
+#define KUDU_UTIL_MAKE_SHARED_H_
+
+#include <memory>
+
+// It isn't possible to use std::make_shared() with a class that has private
+// constructors. Moreover, the standard workarounds are inelegant when said
+// class has non-default constructors. As such, we employ a simple solution:
+// declare the class as a friend to std::make_shared()'s internal allocator.
+// This approach is non-portable and must be implemented separately for each
+// supported STL implementation.
+//
+// Note: due to friendship restrictions on partial template specialization,
+// it isn't possible to befriend just the allocation function; the entire
+// allocator class must be befriended.
+//
+// See http://stackoverflow.com/q/8147027 for a longer discussion.
+
+#ifdef __GLIBCXX__
+  // In libstdc++, new_allocator is defined as a class (ext/new_allocator.h)
+  // but forward declared as a struct (ext/alloc_traits.h). Clang complains
+  // about this when -Wmismatched-tags is set, which gcc doesn't support
+  // (which probably explains why the discrepancy exists in the first place).
+  // We can temporarily disable this warning via pragmas [1], but we must
+  // not expose them to gcc due to its poor handling of the _Pragma() C99
+  // operator [2].
+  //
+  // 1. 
http://clang.llvm.org/docs/UsersManual.html#controlling-diagnostics-via-pragmas
+  // 2. https://gcc.gnu.org/bugzilla/show_bug.cgi?id=60875
+  #ifdef __clang__
+    #define ALLOW_MAKE_SHARED(T)                                \
+      _Pragma("clang diagnostic push")                          \
+      _Pragma("clang diagnostic ignored \"-Wmismatched-tags\"") \
+      friend class __gnu_cxx::new_allocator<T>                  \
+      _Pragma("clang diagnostic pop")
+  #else
+    #define ALLOW_MAKE_SHARED(T) \
+      friend class __gnu_cxx::new_allocator<T>
+  #endif
+#elif defined(_LIBCPP_VERSION)
+  #define ALLOW_MAKE_SHARED(T) \
+    friend class 
std::__1::__libcpp_compressed_pair_imp<std::__1::allocator<T>, T, 1>
+#else
+  #error "Need to implement ALLOW_MAKE_SHARED for your platform!"
+#endif
+
+#endif // KUDU_UTIL_MAKE_SHARED_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/malloc.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/malloc.cc b/be/src/kudu/util/malloc.cc
new file mode 100644
index 0000000..3fec2db
--- /dev/null
+++ b/be/src/kudu/util/malloc.cc
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/malloc.h"
+
+#if defined(__linux__)
+#include <malloc.h>
+#else
+#include <malloc/malloc.h>
+#endif // defined(__linux__)
+
+namespace kudu {
+
+int64_t kudu_malloc_usable_size(const void* obj) {
+#if defined(__linux__)
+  return malloc_usable_size(const_cast<void*>(obj));
+#else
+  return malloc_size(obj);
+#endif // defined(__linux__)
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/malloc.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/malloc.h b/be/src/kudu/util/malloc.h
new file mode 100644
index 0000000..e8a27c5
--- /dev/null
+++ b/be/src/kudu/util/malloc.h
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_MALLOC_H
+#define KUDU_UTIL_MALLOC_H
+
+#include <stdint.h>
+
+namespace kudu {
+
+// Simple wrapper for malloc_usable_size().
+//
+// Really just centralizes the const_cast, as this function is often called
+// on const pointers (i.e. "this" in a const method).
+int64_t kudu_malloc_usable_size(const void* obj);
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_MALLOC_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/map-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/map-util-test.cc 
b/be/src/kudu/util/map-util-test.cc
new file mode 100644
index 0000000..1e818a2
--- /dev/null
+++ b/be/src/kudu/util/map-util-test.cc
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This unit test belongs in gutil, but it depends on test_main which is
+// part of util.
+#include "kudu/gutil/map-util.h"
+
+#include <gtest/gtest.h>
+#include <map>
+#include <memory>
+
+using std::map;
+using std::string;
+using std::shared_ptr;
+using std::unique_ptr;
+
+namespace kudu {
+
+TEST(FloorTest, TestMapUtil) {
+  map<int, int> my_map;
+
+  ASSERT_EQ(nullptr, FindFloorOrNull(my_map, 5));
+
+  my_map[5] = 5;
+  ASSERT_EQ(5, *FindFloorOrNull(my_map, 6));
+  ASSERT_EQ(5, *FindFloorOrNull(my_map, 5));
+  ASSERT_EQ(nullptr, FindFloorOrNull(my_map, 4));
+
+  my_map[1] = 1;
+  ASSERT_EQ(5, *FindFloorOrNull(my_map, 6));
+  ASSERT_EQ(5, *FindFloorOrNull(my_map, 5));
+  ASSERT_EQ(1, *FindFloorOrNull(my_map, 4));
+  ASSERT_EQ(1, *FindFloorOrNull(my_map, 1));
+  ASSERT_EQ(nullptr, FindFloorOrNull(my_map, 0));
+}
+
+TEST(ComputeIfAbsentTest, TestComputeIfAbsent) {
+  map<string, string> my_map;
+  auto result = ComputeIfAbsent(&my_map, "key", []{ return "hello_world"; });
+  ASSERT_EQ(*result, "hello_world");
+  auto result2 = ComputeIfAbsent(&my_map, "key", [] { return "hello_world2"; 
});
+  ASSERT_EQ(*result2, "hello_world");
+}
+
+TEST(ComputeIfAbsentTest, TestComputeIfAbsentAndReturnAbsense) {
+  map<string, string> my_map;
+  auto result = ComputeIfAbsentReturnAbsense(&my_map, "key", []{ return 
"hello_world"; });
+  ASSERT_TRUE(result.second);
+  ASSERT_EQ(*result.first, "hello_world");
+  auto result2 = ComputeIfAbsentReturnAbsense(&my_map, "key", [] { return 
"hello_world2"; });
+  ASSERT_FALSE(result2.second);
+  ASSERT_EQ(*result2.first, "hello_world");
+}
+
+TEST(FindPointeeOrNullTest, TestFindPointeeOrNull) {
+  map<string, unique_ptr<string>> my_map;
+  auto iter = my_map.emplace("key", unique_ptr<string>(new 
string("hello_world")));
+  ASSERT_TRUE(iter.second);
+  string* value = FindPointeeOrNull(my_map, "key");
+  ASSERT_TRUE(value != nullptr);
+  ASSERT_EQ(*value, "hello_world");
+  my_map.erase(iter.first);
+  value = FindPointeeOrNull(my_map, "key");
+  ASSERT_TRUE(value == nullptr);
+}
+
+TEST(EraseKeyReturnValuePtrTest, TestRawAndSmartSmartPointers) {
+  map<string, unique_ptr<string>> my_map;
+  unique_ptr<string> value = EraseKeyReturnValuePtr(&my_map, "key");
+  ASSERT_TRUE(value.get() == nullptr);
+  my_map.emplace("key", unique_ptr<string>(new string("hello_world")));
+  value = EraseKeyReturnValuePtr(&my_map, "key");
+  ASSERT_EQ(*value, "hello_world");
+  value.reset();
+  value = EraseKeyReturnValuePtr(&my_map, "key");
+  ASSERT_TRUE(value.get() == nullptr);
+  map<string, shared_ptr<string>> my_map2;
+  shared_ptr<string> value2 = EraseKeyReturnValuePtr(&my_map2, "key");
+  ASSERT_TRUE(value2.get() == nullptr);
+  my_map2.emplace("key", shared_ptr<string>(new string("hello_world")));
+  value2 = EraseKeyReturnValuePtr(&my_map2, "key");
+  ASSERT_EQ(*value2, "hello_world");
+  map<string, string*> my_map_raw;
+  my_map_raw.emplace("key", new string("hello_world"));
+  value.reset(EraseKeyReturnValuePtr(&my_map_raw, "key"));
+  ASSERT_EQ(*value, "hello_world");
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mem_tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mem_tracker-test.cc 
b/be/src/kudu/util/mem_tracker-test.cc
new file mode 100644
index 0000000..7e78cbe
--- /dev/null
+++ b/be/src/kudu/util/mem_tracker-test.cc
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/mem_tracker.h"
+
+#include <atomic>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+using std::equal_to;
+using std::hash;
+using std::pair;
+using std::shared_ptr;
+using std::string;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+
+TEST(MemTrackerTest, SingleTrackerNoLimit) {
+  shared_ptr<MemTracker> t = MemTracker::CreateTracker(-1, "t");
+  EXPECT_FALSE(t->has_limit());
+  t->Consume(10);
+  EXPECT_EQ(t->consumption(), 10);
+  t->Consume(10);
+  EXPECT_EQ(t->consumption(), 20);
+  t->Release(15);
+  EXPECT_EQ(t->consumption(), 5);
+  EXPECT_FALSE(t->LimitExceeded());
+  t->Release(5);
+  EXPECT_EQ(t->consumption(), 0);
+}
+
+TEST(MemTrackerTest, SingleTrackerWithLimit) {
+  shared_ptr<MemTracker> t = MemTracker::CreateTracker(11, "t");
+  EXPECT_TRUE(t->has_limit());
+  t->Consume(10);
+  EXPECT_EQ(t->consumption(), 10);
+  EXPECT_FALSE(t->LimitExceeded());
+  t->Consume(10);
+  EXPECT_EQ(t->consumption(), 20);
+  EXPECT_TRUE(t->LimitExceeded());
+  t->Release(15);
+  EXPECT_EQ(t->consumption(), 5);
+  EXPECT_FALSE(t->LimitExceeded());
+  t->Release(5);
+}
+
+TEST(MemTrackerTest, TrackerHierarchy) {
+  shared_ptr<MemTracker> p = MemTracker::CreateTracker(100, "p");
+  shared_ptr<MemTracker> c1 = MemTracker::CreateTracker(80, "c1", p);
+  shared_ptr<MemTracker> c2 = MemTracker::CreateTracker(50, "c2", p);
+
+  // everything below limits
+  c1->Consume(60);
+  EXPECT_EQ(c1->consumption(), 60);
+  EXPECT_FALSE(c1->LimitExceeded());
+  EXPECT_FALSE(c1->AnyLimitExceeded());
+  EXPECT_EQ(c2->consumption(), 0);
+  EXPECT_FALSE(c2->LimitExceeded());
+  EXPECT_FALSE(c2->AnyLimitExceeded());
+  EXPECT_EQ(p->consumption(), 60);
+  EXPECT_FALSE(p->LimitExceeded());
+  EXPECT_FALSE(p->AnyLimitExceeded());
+
+  // p goes over limit
+  c2->Consume(50);
+  EXPECT_EQ(c1->consumption(), 60);
+  EXPECT_FALSE(c1->LimitExceeded());
+  EXPECT_TRUE(c1->AnyLimitExceeded());
+  EXPECT_EQ(c2->consumption(), 50);
+  EXPECT_FALSE(c2->LimitExceeded());
+  EXPECT_TRUE(c2->AnyLimitExceeded());
+  EXPECT_EQ(p->consumption(), 110);
+  EXPECT_TRUE(p->LimitExceeded());
+
+  // c2 goes over limit, p drops below limit
+  c1->Release(20);
+  c2->Consume(10);
+  EXPECT_EQ(c1->consumption(), 40);
+  EXPECT_FALSE(c1->LimitExceeded());
+  EXPECT_FALSE(c1->AnyLimitExceeded());
+  EXPECT_EQ(c2->consumption(), 60);
+  EXPECT_TRUE(c2->LimitExceeded());
+  EXPECT_TRUE(c2->AnyLimitExceeded());
+  EXPECT_EQ(p->consumption(), 100);
+  EXPECT_FALSE(p->LimitExceeded());
+  c1->Release(40);
+  c2->Release(60);
+}
+
+class GcFunctionHelper {
+ public:
+  static const int NUM_RELEASE_BYTES = 1;
+
+  explicit GcFunctionHelper(MemTracker* tracker) : tracker_(tracker) { }
+
+  void GcFunc() { tracker_->Release(NUM_RELEASE_BYTES); }
+
+ private:
+  MemTracker* tracker_;
+};
+
+TEST(MemTrackerTest, STLContainerAllocator) {
+  shared_ptr<MemTracker> t = MemTracker::CreateTracker(-1, "t");
+  MemTrackerAllocator<int> vec_alloc(t);
+  MemTrackerAllocator<pair<const int, int>> map_alloc(t);
+
+  // Simple test: use the allocator in a vector.
+  {
+    vector<int, MemTrackerAllocator<int> > v(vec_alloc);
+    ASSERT_EQ(0, t->consumption());
+    v.reserve(5);
+    ASSERT_EQ(5 * sizeof(int), t->consumption());
+    v.reserve(10);
+    ASSERT_EQ(10 * sizeof(int), t->consumption());
+  }
+  ASSERT_EQ(0, t->consumption());
+
+  // Complex test: use it in an unordered_map, where it must be rebound in
+  // order to allocate the map's buckets.
+  {
+    unordered_map<int, int, hash<int>, equal_to<int>, 
MemTrackerAllocator<pair<const int, int>>> um(
+        10,
+        hash<int>(),
+        equal_to<int>(),
+        map_alloc);
+
+    // Don't care about the value (it depends on map internals).
+    ASSERT_GT(t->consumption(), 0);
+  }
+  ASSERT_EQ(0, t->consumption());
+}
+
+TEST(MemTrackerTest, FindFunctionsTakeOwnership) {
+  // In each test, ToString() would crash if the MemTracker is destroyed when
+  // 'm' goes out of scope.
+
+  shared_ptr<MemTracker> ref;
+  {
+    shared_ptr<MemTracker> m = MemTracker::CreateTracker(-1, "test");
+    ASSERT_TRUE(MemTracker::FindTracker(m->id(), &ref));
+  }
+  LOG(INFO) << ref->ToString();
+  ref.reset();
+
+  {
+    shared_ptr<MemTracker> m = MemTracker::CreateTracker(-1, "test");
+    ref = MemTracker::FindOrCreateGlobalTracker(-1, m->id());
+  }
+  LOG(INFO) << ref->ToString();
+  ref.reset();
+
+  vector<shared_ptr<MemTracker> > refs;
+  {
+    shared_ptr<MemTracker> m = MemTracker::CreateTracker(-1, "test");
+    MemTracker::ListTrackers(&refs);
+  }
+  for (const shared_ptr<MemTracker>& r : refs) {
+    LOG(INFO) << r->ToString();
+  }
+  refs.clear();
+}
+
+TEST(MemTrackerTest, ScopedTrackedConsumption) {
+  shared_ptr<MemTracker> m = MemTracker::CreateTracker(-1, "test");
+  ASSERT_EQ(0, m->consumption());
+  {
+    ScopedTrackedConsumption consumption(m, 1);
+    ASSERT_EQ(1, m->consumption());
+
+    consumption.Reset(3);
+    ASSERT_EQ(3, m->consumption());
+  }
+  ASSERT_EQ(0, m->consumption());
+}
+
+TEST(MemTrackerTest, CollisionDetection) {
+  shared_ptr<MemTracker> p = MemTracker::CreateTracker(-1, "parent");
+  shared_ptr<MemTracker> c = MemTracker::CreateTracker(-1, "child", p);
+  vector<shared_ptr<MemTracker>> all;
+
+  // Three trackers: root, parent, and child.
+  MemTracker::ListTrackers(&all);
+  ASSERT_EQ(3, all.size());
+
+  // Now only two because the child has been destroyed.
+  c.reset();
+  MemTracker::ListTrackers(&all);
+  ASSERT_EQ(2, all.size());
+  shared_ptr<MemTracker> not_found;
+  ASSERT_FALSE(MemTracker::FindTracker("child", &not_found, p));
+
+  // Let's duplicate the parent. It's not recommended, but it's allowed.
+  shared_ptr<MemTracker> p2 = MemTracker::CreateTracker(-1, "parent");
+  ASSERT_EQ(p->ToString(), p2->ToString());
+
+  // Only when we do a Find() operation do we crash.
+#ifndef NDEBUG
+  const string kDeathMsg = "Multiple memtrackers with same id";
+  EXPECT_DEATH({
+    shared_ptr<MemTracker> found;
+    MemTracker::FindTracker("parent", &found);
+  }, kDeathMsg);
+  EXPECT_DEATH({
+    MemTracker::FindOrCreateGlobalTracker(-1, "parent");
+  }, kDeathMsg);
+#endif
+}
+
+TEST(MemTrackerTest, TestMultiThreadedRegisterAndDestroy) {
+  std::atomic<bool> done(false);
+  vector<std::thread> threads;
+  for (int i = 0; i < 10; i++) {
+    threads.emplace_back([&done]{
+        while (!done.load()) {
+          shared_ptr<MemTracker> t = MemTracker::FindOrCreateGlobalTracker(
+              1000, "foo");
+        }
+      });
+  }
+
+  SleepFor(MonoDelta::FromSeconds(AllowSlowTests() ? 5 : 1));
+  done.store(true);
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+TEST(MemTrackerTest, TestMultiThreadedCreateFind) {
+  shared_ptr<MemTracker> p = MemTracker::CreateTracker(-1, "p");
+  shared_ptr<MemTracker> c1 = MemTracker::CreateTracker(-1, "c1", p);
+  std::atomic<bool> done(false);
+  vector<std::thread> threads;
+  threads.emplace_back([&]{
+    while (!done.load()) {
+      shared_ptr<MemTracker> c1_copy;
+      CHECK(MemTracker::FindTracker(c1->id(), &c1_copy, p));
+    }
+  });
+  for (int i = 0; i < 5; i++) {
+    threads.emplace_back([&, i]{
+      while (!done.load()) {
+        shared_ptr<MemTracker> c2 =
+            MemTracker::CreateTracker(-1, Substitute("ci-$0", i), p);
+      }
+    });
+  }
+
+  SleepFor(MonoDelta::FromMilliseconds(500));
+  done.store(true);
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mem_tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mem_tracker.cc b/be/src/kudu/util/mem_tracker.cc
new file mode 100644
index 0000000..88e1d23
--- /dev/null
+++ b/be/src/kudu/util/mem_tracker.cc
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/mem_tracker.h"
+
+#include <algorithm>
+#include <deque>
+#include <limits>
+#include <list>
+#include <memory>
+#include <mutex>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/process_memory.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// NOTE: this class has been adapted from Impala, so the code style varies
+// somewhat from kudu.
+
+using std::deque;
+using std::list;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using std::weak_ptr;
+
+using strings::Substitute;
+
+// The ancestor for all trackers. Every tracker is visible from the root down.
+static shared_ptr<MemTracker> root_tracker;
+static GoogleOnceType root_tracker_once = GOOGLE_ONCE_INIT;
+
+void MemTracker::CreateRootTracker() {
+  root_tracker.reset(new MemTracker(-1, "root", shared_ptr<MemTracker>()));
+  root_tracker->Init();
+}
+
+shared_ptr<MemTracker> MemTracker::CreateTracker(int64_t byte_limit,
+                                                 const string& id,
+                                                 const shared_ptr<MemTracker>& 
parent) {
+  shared_ptr<MemTracker> real_parent = parent ? parent : GetRootTracker();
+  shared_ptr<MemTracker> tracker(new MemTracker(byte_limit, id, real_parent));
+  real_parent->AddChildTracker(tracker);
+  tracker->Init();
+
+  return tracker;
+}
+
+MemTracker::MemTracker(int64_t byte_limit, const string& id, 
shared_ptr<MemTracker> parent)
+    : limit_(byte_limit),
+      id_(id),
+      descr_(Substitute("memory consumption for $0", id)),
+      parent_(std::move(parent)),
+      consumption_(0) {
+  VLOG(1) << "Creating tracker " << ToString();
+}
+
+MemTracker::~MemTracker() {
+  VLOG(1) << "Destroying tracker " << ToString();
+  if (parent_) {
+    DCHECK(consumption() == 0) << "Memory tracker " << ToString()
+        << " has unreleased consumption " << consumption();
+    parent_->Release(consumption());
+
+    MutexLock l(parent_->child_trackers_lock_);
+    if (child_tracker_it_ != parent_->child_trackers_.end()) {
+      parent_->child_trackers_.erase(child_tracker_it_);
+      child_tracker_it_ = parent_->child_trackers_.end();
+    }
+  }
+}
+
+string MemTracker::ToString() const {
+  string s;
+  const MemTracker* tracker = this;
+  while (tracker) {
+    if (s != "") {
+      s += "->";
+    }
+    s += tracker->id();
+    tracker = tracker->parent_.get();
+  }
+  return s;
+}
+
+bool MemTracker::FindTracker(const string& id,
+                             shared_ptr<MemTracker>* tracker,
+                             const shared_ptr<MemTracker>& parent) {
+  return FindTrackerInternal(id, tracker, parent ? parent : GetRootTracker());
+}
+
+bool MemTracker::FindTrackerInternal(const string& id,
+                                     shared_ptr<MemTracker>* tracker,
+                                     const shared_ptr<MemTracker>& parent) {
+  DCHECK(parent != NULL);
+
+  list<weak_ptr<MemTracker>> children;
+  {
+    MutexLock l(parent->child_trackers_lock_);
+    children = parent->child_trackers_;
+  }
+
+  // Search for the matching child without holding the parent's lock.
+  //
+  // If the lock were held while searching, it'd be possible for 'child' to be
+  // the last live ref to a tracker, which would lead to a recursive
+  // acquisition of the parent lock during the 'child' destructor call.
+  vector<shared_ptr<MemTracker>> found;
+  for (const auto& child_weak : children) {
+    shared_ptr<MemTracker> child = child_weak.lock();
+    if (child && child->id() == id) {
+      found.emplace_back(std::move(child));
+    }
+  }
+  if (PREDICT_TRUE(found.size() == 1)) {
+    *tracker = found[0];
+    return true;
+  } else if (found.size() > 1) {
+    LOG(DFATAL) <<
+        Substitute("Multiple memtrackers with same id ($0) found on parent $1",
+                   id, parent->ToString());
+    *tracker = found[0];
+    return true;
+  }
+  return false;
+}
+
+shared_ptr<MemTracker> MemTracker::FindOrCreateGlobalTracker(
+    int64_t byte_limit,
+    const string& id) {
+  // The calls below comprise a critical section, but we can't use the root
+  // tracker's child_trackers_lock_ to synchronize it as the lock must be
+  // released during FindTrackerInternal(). Since this function creates
+  // globally-visible MemTrackers which are the exception rather than the rule,
+  // it's reasonable to synchronize their creation on a singleton lock.
+  static Mutex find_or_create_lock;
+  MutexLock l(find_or_create_lock);
+
+  shared_ptr<MemTracker> found;
+  if (FindTrackerInternal(id, &found, GetRootTracker())) {
+    return found;
+  }
+  return CreateTracker(byte_limit, id, GetRootTracker());
+}
+
+void MemTracker::ListTrackers(vector<shared_ptr<MemTracker>>* trackers) {
+  trackers->clear();
+  deque<shared_ptr<MemTracker> > to_process;
+  to_process.push_front(GetRootTracker());
+  while (!to_process.empty()) {
+    shared_ptr<MemTracker> t = to_process.back();
+    to_process.pop_back();
+
+    trackers->push_back(t);
+    {
+      MutexLock l(t->child_trackers_lock_);
+      for (const auto& child_weak : t->child_trackers_) {
+        shared_ptr<MemTracker> child = child_weak.lock();
+        if (child) {
+          to_process.emplace_back(std::move(child));
+        }
+      }
+    }
+  }
+}
+
+void MemTracker::Consume(int64_t bytes) {
+  if (bytes < 0) {
+    Release(-bytes);
+    return;
+  }
+
+  if (bytes == 0) {
+    return;
+  }
+  for (auto& tracker : all_trackers_) {
+    tracker->consumption_.IncrementBy(bytes);
+  }
+}
+
+bool MemTracker::TryConsume(int64_t bytes) {
+  if (bytes <= 0) {
+    Release(-bytes);
+    return true;
+  }
+
+  int i = 0;
+  // Walk the tracker tree top-down, consuming memory from each in turn.
+  for (i = all_trackers_.size() - 1; i >= 0; --i) {
+    MemTracker *tracker = all_trackers_[i];
+    if (tracker->limit_ < 0) {
+      tracker->consumption_.IncrementBy(bytes);
+    } else {
+      if (!tracker->consumption_.TryIncrementBy(bytes, tracker->limit_)) {
+        break;
+      }
+    }
+  }
+  // Everyone succeeded, return.
+  if (i == -1) {
+    return true;
+  }
+
+  // Someone failed, roll back the ones that succeeded.
+  // TODO(todd): this doesn't roll it back completely since the max values for
+  // the updated trackers aren't decremented. The max values are only used
+  // for error reporting so this is probably okay. Rolling those back is
+  // pretty hard; we'd need something like 2PC.
+  for (int j = all_trackers_.size() - 1; j > i; --j) {
+    all_trackers_[j]->consumption_.IncrementBy(-bytes);
+  }
+  return false;
+}
+
+void MemTracker::Release(int64_t bytes) {
+  if (bytes < 0) {
+    Consume(-bytes);
+    return;
+  }
+
+  if (bytes == 0) {
+    return;
+  }
+
+  for (auto& tracker : all_trackers_) {
+    tracker->consumption_.IncrementBy(-bytes);
+  }
+  process_memory::MaybeGCAfterRelease(bytes);
+}
+
+bool MemTracker::AnyLimitExceeded() {
+  for (const auto& tracker : limit_trackers_) {
+    if (tracker->LimitExceeded()) {
+      return true;
+    }
+  }
+  return false;
+}
+
+int64_t MemTracker::SpareCapacity() const {
+  int64_t result = std::numeric_limits<int64_t>::max();
+  for (const auto& tracker : limit_trackers_) {
+    int64_t mem_left = tracker->limit() - tracker->consumption();
+    result = std::min(result, mem_left);
+  }
+  return result;
+}
+
+
+void MemTracker::Init() {
+  // populate all_trackers_ and limit_trackers_
+  MemTracker* tracker = this;
+  while (tracker) {
+    all_trackers_.push_back(tracker);
+    if (tracker->has_limit()) limit_trackers_.push_back(tracker);
+    tracker = tracker->parent_.get();
+  }
+  DCHECK_GT(all_trackers_.size(), 0);
+  DCHECK_EQ(all_trackers_[0], this);
+}
+
+void MemTracker::AddChildTracker(const shared_ptr<MemTracker>& tracker) {
+  MutexLock l(child_trackers_lock_);
+  tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), 
tracker);
+}
+
+shared_ptr<MemTracker> MemTracker::GetRootTracker() {
+  GoogleOnceInit(&root_tracker_once, &MemTracker::CreateRootTracker);
+  return root_tracker;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mem_tracker.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mem_tracker.h b/be/src/kudu/util/mem_tracker.h
new file mode 100644
index 0000000..a43e9a2
--- /dev/null
+++ b/be/src/kudu/util/mem_tracker.h
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_MEM_TRACKER_H
+#define KUDU_UTIL_MEM_TRACKER_H
+
+#include <list>
+#include <memory>
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/high_water_mark.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+
+class Status;
+class MemTracker;
+
+// A MemTracker tracks memory consumption; it contains an optional limit and is
+// arranged into a tree structure such that the consumption tracked by a
+// MemTracker is also tracked by its ancestors.
+//
+// The MemTracker hierarchy is rooted in a single static MemTracker.
+// The root MemTracker always exists, and it is the common
+// ancestor to all MemTrackers. All operations that discover MemTrackers begin
+// at the root and work their way down the tree, while operations that deal
+// with adjusting memory consumption begin at a particular MemTracker and work
+// their way up the tree to the root. All MemTrackers (except the root) must
+// have a parent. As a rule, all children belonging to a parent should have
+// unique ids, but this is only enforced during a Find() operation to allow for
+// transient duplicates (e.g. the web UI grabbing very short-lived references
+// to all MemTrackers while rendering a web page). This also means id
+// uniqueness only exists where it's actually needed.
+//
+// When a MemTracker begins its life, it has a strong reference to its parent
+// and the parent has a weak reference to it. Both remain for the lifetime of
+// the MemTracker.
+//
+// Memory consumption is tracked via calls to Consume()/Release(), either to
+// the tracker itself or to one of its descendants.
+//
+// This class is thread-safe.
+class MemTracker : public std::enable_shared_from_this<MemTracker> {
+ public:
+  ~MemTracker();
+
+  // Creates and adds the tracker to the tree so that it can be retrieved with
+  // FindTracker/FindOrCreateTracker.
+  //
+  // byte_limit < 0 means no limit; 'id' is a used as a label to uniquely 
identify
+  // the MemTracker for the below Find...() calls as well as the web UI.
+  //
+  // Use the two-argument form if there is no parent.
+  static std::shared_ptr<MemTracker> CreateTracker(
+      int64_t byte_limit,
+      const std::string& id,
+      const std::shared_ptr<MemTracker>& parent = 
std::shared_ptr<MemTracker>());
+
+  // If a tracker with the specified 'id' and 'parent' exists in the tree, sets
+  // 'tracker' to reference that instance. Returns false if no such tracker
+  // exists.
+  //
+  // Use the two-argument form if there is no parent.
+  //
+  // Note: this function will enforce that 'id' is unique amongst the children
+  // of 'parent'.
+  static bool FindTracker(
+      const std::string& id,
+      std::shared_ptr<MemTracker>* tracker,
+      const std::shared_ptr<MemTracker>& parent = 
std::shared_ptr<MemTracker>());
+
+  // If a global tracker with the specified 'id' exists in the tree, returns a
+  // shared_ptr to that instance. Otherwise, creates a new MemTracker with the
+  // specified byte_limit and id, parented to the root MemTracker.
+  //
+  // Note: this function will enforce that 'id' is unique amongst the children
+  // of the root MemTracker.
+  static std::shared_ptr<MemTracker> FindOrCreateGlobalTracker(
+      int64_t byte_limit, const std::string& id);
+
+  // Returns a list of all the valid trackers.
+  static void ListTrackers(std::vector<std::shared_ptr<MemTracker> >* 
trackers);
+
+  // Gets a shared_ptr to the "root" tracker, creating it if necessary.
+  static std::shared_ptr<MemTracker> GetRootTracker();
+
+  // Increases consumption of this tracker and its ancestors by 'bytes'.
+  void Consume(int64_t bytes);
+
+  // Increases consumption of this tracker and its ancestors by 'bytes' only if
+  // they can all consume 'bytes'. If this brings any of them over, none of 
them
+  // are updated.
+  // Returns true if the try succeeded.
+  bool TryConsume(int64_t bytes);
+
+  // Decreases consumption of this tracker and its ancestors by 'bytes'.
+  //
+  // This will also cause the process to periodically trigger tcmalloc 
"ReleaseMemory"
+  // to ensure that memory is released to the OS.
+  void Release(int64_t bytes);
+
+  // Returns true if a valid limit of this tracker or one of its ancestors is
+  // exceeded.
+  bool AnyLimitExceeded();
+
+  // If this tracker has a limit, checks the limit and attempts to free up 
some memory if
+  // the limit is exceeded by calling any added GC functions. Returns true if 
the limit is
+  // exceeded after calling the GC functions. Returns false if there is no 
limit.
+  bool LimitExceeded() {
+    return limit_ >= 0 && limit_ < consumption();
+  }
+
+  // Returns the maximum consumption that can be made without exceeding the 
limit on
+  // this tracker or any of its parents. Returns int64_t::max() if there are no
+  // limits and a negative value if any limit is already exceeded.
+  int64_t SpareCapacity() const;
+
+
+  int64_t limit() const { return limit_; }
+  bool has_limit() const { return limit_ >= 0; }
+  const std::string& id() const { return id_; }
+
+  // Returns the memory consumed in bytes.
+  int64_t consumption() const {
+    return consumption_.current_value();
+  }
+
+  int64_t peak_consumption() const { return consumption_.max_value(); }
+
+  // Retrieve the parent tracker, or NULL If one is not set.
+  std::shared_ptr<MemTracker> parent() const { return parent_; }
+
+  // Returns a textual representation of the tracker that is likely (but not
+  // guaranteed) to be globally unique.
+  std::string ToString() const;
+
+ private:
+  // byte_limit < 0 means no limit
+  // 'id' is the label for LogUsage() and web UI.
+  MemTracker(int64_t byte_limit, const std::string& id, 
std::shared_ptr<MemTracker> parent);
+
+  // Further initializes the tracker.
+  void Init();
+
+  // Adds tracker to child_trackers_.
+  void AddChildTracker(const std::shared_ptr<MemTracker>& tracker);
+
+  // Variant of FindTracker() that must be called with a non-NULL parent.
+  static bool FindTrackerInternal(
+      const std::string& id,
+      std::shared_ptr<MemTracker>* tracker,
+      const std::shared_ptr<MemTracker>& parent);
+
+  // Creates the root tracker.
+  static void CreateRootTracker();
+
+  int64_t limit_;
+  const std::string id_;
+  const std::string descr_;
+  std::shared_ptr<MemTracker> parent_;
+
+  HighWaterMark consumption_;
+
+  // this tracker plus all of its ancestors
+  std::vector<MemTracker*> all_trackers_;
+  // all_trackers_ with valid limits
+  std::vector<MemTracker*> limit_trackers_;
+
+  // All the child trackers of this tracker. Used for error reporting and
+  // listing only (i.e. updating the consumption of a parent tracker does not
+  // update that of its children).
+  mutable Mutex child_trackers_lock_;
+  std::list<std::weak_ptr<MemTracker>> child_trackers_;
+
+  // Iterator into parent_->child_trackers_ for this object. Stored to have 
O(1)
+  // remove.
+  std::list<std::weak_ptr<MemTracker>>::iterator child_tracker_it_;
+};
+
+// An std::allocator that manipulates a MemTracker during allocation
+// and deallocation.
+template<typename T, typename Alloc = std::allocator<T> >
+class MemTrackerAllocator : public Alloc {
+ public:
+  typedef typename Alloc::pointer pointer;
+  typedef typename Alloc::const_pointer const_pointer;
+  typedef typename Alloc::size_type size_type;
+
+  explicit MemTrackerAllocator(std::shared_ptr<MemTracker> mem_tracker)
+      : mem_tracker_(std::move(mem_tracker)) {}
+
+  // This constructor is used for rebinding.
+  template <typename U>
+  MemTrackerAllocator(const MemTrackerAllocator<U>& allocator)
+      : Alloc(allocator),
+        mem_tracker_(allocator.mem_tracker()) {
+  }
+
+  ~MemTrackerAllocator() {
+  }
+
+  pointer allocate(size_type n, const_pointer hint = 0) {
+    // Ideally we'd use TryConsume() here to enforce the tracker's limit.
+    // However, that means throwing bad_alloc if the limit is exceeded, and
+    // it's not clear that the rest of Kudu can handle that.
+    mem_tracker_->Consume(n * sizeof(T));
+    return Alloc::allocate(n, hint);
+  }
+
+  void deallocate(pointer p, size_type n) {
+    Alloc::deallocate(p, n);
+    mem_tracker_->Release(n * sizeof(T));
+  }
+
+  // This allows an allocator<T> to be used for a different type.
+  template <class U>
+  struct rebind {
+    typedef MemTrackerAllocator<U, typename Alloc::template rebind<U>::other> 
other;
+  };
+
+  const std::shared_ptr<MemTracker>& mem_tracker() const { return 
mem_tracker_; }
+
+ private:
+  std::shared_ptr<MemTracker> mem_tracker_;
+};
+
+// Convenience class that adds memory consumption to a tracker when declared,
+// releasing it when the end of scope is reached.
+class ScopedTrackedConsumption {
+ public:
+  ScopedTrackedConsumption(std::shared_ptr<MemTracker> tracker,
+                           int64_t to_consume)
+      : tracker_(std::move(tracker)), consumption_(to_consume) {
+    DCHECK(tracker_);
+    tracker_->Consume(consumption_);
+  }
+
+  void Reset(int64_t new_consumption) {
+    // Consume(-x) is the same as Release(x).
+    tracker_->Consume(new_consumption - consumption_);
+    consumption_ = new_consumption;
+  }
+
+  ~ScopedTrackedConsumption() {
+    tracker_->Release(consumption_);
+  }
+
+  int64_t consumption() const { return consumption_; }
+
+ private:
+  std::shared_ptr<MemTracker> tracker_;
+  int64_t consumption_;
+};
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_MEM_TRACKER_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memcmpable_varint-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memcmpable_varint-test.cc 
b/be/src/kudu/util/memcmpable_varint-test.cc
new file mode 100644
index 0000000..3e5b5e0
--- /dev/null
+++ b/be/src/kudu/util/memcmpable_varint-test.cc
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/hexdump.h"
+#include "kudu/util/memcmpable_varint.h"
+#include "kudu/util/random.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+// Add operator<< to print pairs, used in a test below.
+// This has to be done in the 'std' namespace due to the way that
+// template resolution works.
+namespace std {
+template<typename T1, typename T2>
+ostream &operator <<(ostream &os, const pair<T1, T2> &pair) {
+  return os << "(" << pair.first << ", " << pair.second << ")";
+}
+}
+
+namespace kudu {
+
+class TestMemcmpableVarint : public KuduTest {
+ protected:
+  TestMemcmpableVarint() : random_(SeedRandom()) {}
+
+  // Random number generator that generates different length integers
+  // with equal probability -- i.e it is equally as likely to generate
+  // a number with 8 bits as it is to generate one with 64 bits.
+  // This is useful for testing varint implementations, where a uniform
+  // random is skewed towards generating longer integers.
+  uint64_t Rand64WithRandomBitLength() {
+    return random_.Next64() >> random_.Uniform(64);
+  }
+
+  Random random_;
+};
+
+static void DoRoundTripTest(uint64_t to_encode) {
+  static faststring buf;
+  buf.clear();
+  PutMemcmpableVarint64(&buf, to_encode);
+
+  uint64_t decoded;
+  Slice slice(buf);
+  bool success = GetMemcmpableVarint64(&slice, &decoded);
+  ASSERT_TRUE(success);
+  ASSERT_EQ(to_encode, decoded);
+  ASSERT_TRUE(slice.empty());
+}
+
+
+TEST_F(TestMemcmpableVarint, TestRoundTrip) {
+  // Test the first 100K integers
+  // (exercises the special cases for <= 67823 in the code)
+  for (int i = 0; i < 100000; i++) {
+    DoRoundTripTest(i);
+  }
+
+  // Test a bunch of random integers (which are likely to be many bytes)
+  for (int i = 0; i < 100000; i++) {
+    DoRoundTripTest(random_.Next64());
+  }
+}
+
+
+// Test that a composite key can be made up of multiple memcmpable
+// varints strung together, and that the resulting key compares the
+// same as the original pair of integers (i.e left-to-right).
+TEST_F(TestMemcmpableVarint, TestCompositeKeys) {
+  faststring buf1;
+  faststring buf2;
+
+  const int n_trials = 1000;
+
+  for (int i = 0; i < n_trials; i++) {
+    buf1.clear();
+    buf2.clear();
+
+    pair<uint64_t, uint64_t> p1 =
+      make_pair(Rand64WithRandomBitLength(), Rand64WithRandomBitLength());
+    PutMemcmpableVarint64(&buf1, p1.first);
+    PutMemcmpableVarint64(&buf1, p1.second);
+
+    pair<uint64_t, uint64_t> p2 =
+      make_pair(Rand64WithRandomBitLength(), Rand64WithRandomBitLength());
+    PutMemcmpableVarint64(&buf2, p2.first);
+    PutMemcmpableVarint64(&buf2, p2.second);
+
+    SCOPED_TRACE(testing::Message() << p1 << "\n" << HexDump(Slice(buf1))
+                 << "  vs\n" << p2 << "\n" << HexDump(Slice(buf2)));
+    if (p1 < p2) {
+      ASSERT_LT(Slice(buf1).compare(Slice(buf2)), 0);
+    } else if (p1 > p2) {
+      ASSERT_GT(Slice(buf1).compare(Slice(buf2)), 0);
+    } else {
+      ASSERT_EQ(Slice(buf1).compare(Slice(buf2)), 0);
+    }
+  }
+}
+
+// Similar to the above test, but instead of being randomized, specifically
+// tests "interesting" values -- i.e values around the boundaries of where
+// the encoding changes its number of bytes.
+TEST_F(TestMemcmpableVarint, TestInterestingCompositeKeys) {
+  vector<uint64_t> interesting_values = { 0, 1, 240, // 1 byte
+                                          241, 2000, 2287, // 2 bytes
+                                          2288, 40000, 67823, // 3 bytes
+                                          67824, 1ULL << 23, (1ULL << 24) - 1, 
// 4 bytes
+                                          1ULL << 24, 1ULL << 30, (1ULL << 32) 
- 1 }; // 5 bytes
+
+  faststring buf1;
+  faststring buf2;
+
+  for (uint64_t v1 : interesting_values) {
+    for (uint64_t v2 : interesting_values) {
+      buf1.clear();
+      pair<uint64_t, uint64_t> p1 = make_pair(v1, v2);
+      PutMemcmpableVarint64(&buf1, p1.first);
+      PutMemcmpableVarint64(&buf1, p1.second);
+
+      for (uint64_t v3 : interesting_values) {
+        for (uint64_t v4 : interesting_values) {
+          buf2.clear();
+          pair<uint64_t, uint64_t> p2 = make_pair(v3, v4);
+          PutMemcmpableVarint64(&buf2, p2.first);
+          PutMemcmpableVarint64(&buf2, p2.second);
+
+          SCOPED_TRACE(testing::Message() << p1 << "\n" << HexDump(Slice(buf1))
+                       << "  vs\n" << p2 << "\n" << HexDump(Slice(buf2)));
+          if (p1 < p2) {
+            ASSERT_LT(Slice(buf1).compare(Slice(buf2)), 0);
+          } else if (p1 > p2) {
+            ASSERT_GT(Slice(buf1).compare(Slice(buf2)), 0);
+          } else {
+            ASSERT_EQ(Slice(buf1).compare(Slice(buf2)), 0);
+          }
+        }
+      }
+    }
+  }
+}
+
+////////////////////////////////////////////////////////////
+// Benchmarks
+////////////////////////////////////////////////////////////
+
+#ifdef NDEBUG
+TEST_F(TestMemcmpableVarint, BenchmarkEncode) {
+  faststring buf;
+
+  int sum_sizes = 0; // need to do something with results to force evaluation
+
+  LOG_TIMING(INFO, "Encoding integers") {
+    for (int trial = 0; trial < 100; trial++) {
+      for (uint64_t i = 0; i < 1000000; i++) {
+        buf.clear();
+        PutMemcmpableVarint64(&buf, i);
+        sum_sizes += buf.size();
+      }
+    }
+  }
+  ASSERT_GT(sum_sizes, 1); // use 'sum_sizes' to avoid optimizing it out.
+}
+
+TEST_F(TestMemcmpableVarint, BenchmarkDecode) {
+  faststring buf;
+
+  // Encode 1M integers into the buffer
+  for (uint64_t i = 0; i < 1000000; i++) {
+    PutMemcmpableVarint64(&buf, i);
+  }
+
+  // Decode the whole buffer 100 times.
+  LOG_TIMING(INFO, "Decoding integers") {
+    uint64_t sum_vals = 0;
+    for (int trial = 0; trial < 100; trial++) {
+      Slice s(buf);
+      while (!s.empty()) {
+        uint64_t decoded;
+        CHECK(GetMemcmpableVarint64(&s, &decoded));
+        sum_vals += decoded;
+      }
+    }
+    ASSERT_GT(sum_vals, 1); // use 'sum_vals' to avoid optimizing it out.
+  }
+}
+
+#endif
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memcmpable_varint.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memcmpable_varint.cc 
b/be/src/kudu/util/memcmpable_varint.cc
new file mode 100644
index 0000000..e55addf
--- /dev/null
+++ b/be/src/kudu/util/memcmpable_varint.cc
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This file contains code derived from sqlite4, distributed in the public 
domain.
+//
+// A variable length integer is an encoding of 64-bit unsigned integers
+// into between 1 and 9 bytes.  The encoding is designed so that small
+// (and common) values take much less space that larger values.  Additional
+// properties:
+//
+//    *  The length of the varint can be determined after examining just
+//       the first byte of the encoding.
+//
+//    *  Varints compare in numerical order using memcmp().
+//
+//************************************************************************
+//
+// Treat each byte of the encoding as an unsigned integer between 0 and 255.
+// Let the bytes of the encoding be called A0, A1, A2, ..., A8.
+//
+// DECODE
+//
+// If A0 is between 0 and 240 inclusive, then the result is the value of A0.
+//
+// If A0 is between 241 and 248 inclusive, then the result is
+// 240+256*(A0-241)+A1.
+//
+// If A0 is 249 then the result is 2288+256*A1+A2.
+//
+// If A0 is 250 then the result is A1..A3 as a 3-byte big-ending integer.
+//
+// If A0 is 251 then the result is A1..A4 as a 4-byte big-ending integer.
+//
+// If A0 is 252 then the result is A1..A5 as a 5-byte big-ending integer.
+//
+// If A0 is 253 then the result is A1..A6 as a 6-byte big-ending integer.
+//
+// If A0 is 254 then the result is A1..A7 as a 7-byte big-ending integer.
+//
+// If A0 is 255 then the result is A1..A8 as a 8-byte big-ending integer.
+//
+// ENCODE
+//
+// Let the input value be V.
+//
+// If V<=240 then output a single by A0 equal to V.
+//
+// If V<=2287 then output A0 as (V-240)/256 + 241 and A1 as (V-240)%256.
+//
+// If V<=67823 then output A0 as 249, A1 as (V-2288)/256, and A2
+// as (V-2288)%256.
+//
+// If V<=16777215 then output A0 as 250 and A1 through A3 as a big-endian
+// 3-byte integer.
+//
+// If V<=4294967295 then output A0 as 251 and A1..A4 as a big-ending
+// 4-byte integer.
+//
+// If V<=1099511627775 then output A0 as 252 and A1..A5 as a big-ending
+// 5-byte integer.
+//
+// If V<=281474976710655 then output A0 as 253 and A1..A6 as a big-ending
+// 6-byte integer.
+//
+// If V<=72057594037927935 then output A0 as 254 and A1..A7 as a
+// big-ending 7-byte integer.
+//
+// Otherwise then output A0 as 255 and A1..A8 as a big-ending 8-byte integer.
+//
+// SUMMARY
+//
+//    Bytes    Max Value    Digits
+//    -------  ---------    ---------
+//      1      240           2.3
+//      2      2287          3.3
+//      3      67823         4.8
+//      4      2**24-1       7.2
+//      5      2**32-1       9.6
+//      6      2**40-1      12.0
+//      7      2**48-1      14.4
+//      8      2**56-1      16.8
+//      9      2**64-1      19.2
+//
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/memcmpable_varint.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+////////////////////////////////////////////////////////////
+// Begin code ripped from sqlite4
+////////////////////////////////////////////////////////////
+
+// This function is borrowed from sqlite4/varint.c
+static void varintWrite32(uint8_t *z, uint32_t y) {
+  z[0] = (uint8_t)(y>>24);
+  z[1] = (uint8_t)(y>>16);
+  z[2] = (uint8_t)(y>>8);
+  z[3] = (uint8_t)(y);
+}
+
+
+// Write a varint into z[].  The buffer z[] must be at least 9 characters
+// long to accommodate the largest possible varint.  Return the number of
+// bytes of z[] used.
+//
+// This function is borrowed from sqlite4/varint.c
+static size_t sqlite4PutVarint64(uint8_t *z, uint64_t x) {
+  uint64_t w, y;
+  if (x <= 240) {
+    z[0] = (uint8_t)x;
+    return 1;
+  }
+  if (x <= 2287) {
+    y = (uint64_t)(x - 240);
+    z[0] = (uint8_t)(y/256 + 241);
+    z[1] = (uint8_t)(y%256);
+    return 2;
+  }
+  if (x <= 67823) {
+    y = (uint64_t)(x - 2288);
+    z[0] = 249;
+    z[1] = (uint8_t)(y/256);
+    z[2] = (uint8_t)(y%256);
+    return 3;
+  }
+  y = (uint64_t)x;
+  w = (uint64_t)(x>>32);
+  if (w == 0) {
+    if (y <= 16777215) {
+      z[0] = 250;
+      z[1] = (uint8_t)(y>>16);
+      z[2] = (uint8_t)(y>>8);
+      z[3] = (uint8_t)(y);
+      return 4;
+    }
+    z[0] = 251;
+    varintWrite32(z+1, y);
+    return 5;
+  }
+  if (w <= 255) {
+    z[0] = 252;
+    z[1] = (uint8_t)w;
+    varintWrite32(z+2, y);
+    return 6;
+  }
+  if (w <= 65535) {
+    z[0] = 253;
+    z[1] = (uint8_t)(w>>8);
+    z[2] = (uint8_t)w;
+    varintWrite32(z+3, y);
+    return 7;
+  }
+  if (w <= 16777215) {
+    z[0] = 254;
+    z[1] = (uint8_t)(w>>16);
+    z[2] = (uint8_t)(w>>8);
+    z[3] = (uint8_t)w;
+    varintWrite32(z+4, y);
+    return 8;
+  }
+  z[0] = 255;
+  varintWrite32(z+1, w);
+  varintWrite32(z+5, y);
+  return 9;
+}
+
+// Decode the varint in the first n bytes z[].  Write the integer value
+// into *pResult and return the number of bytes in the varint.
+//
+// If the decode fails because there are not enough bytes in z[] then
+// return 0;
+//
+// Borrowed from sqlite4 varint.c
+static int sqlite4GetVarint64(
+  const uint8_t *z,
+  int n,
+  uint64_t *pResult) {
+  unsigned int x;
+  if ( n < 1) return 0;
+  if (z[0] <= 240) {
+    *pResult = z[0];
+    return 1;
+  }
+  if (z[0] <= 248) {
+    if ( n < 2) return 0;
+    *pResult = (z[0]-241)*256 + z[1] + 240;
+    return 2;
+  }
+  if (n < z[0]-246 ) return 0;
+  if (z[0] == 249) {
+    *pResult = 2288 + 256*z[1] + z[2];
+    return 3;
+  }
+  if (z[0] == 250) {
+    *pResult = (z[1]<<16) + (z[2]<<8) + z[3];
+    return 4;
+  }
+  x = (z[1]<<24) + (z[2]<<16) + (z[3]<<8) + z[4];
+  if (z[0] == 251) {
+    *pResult = x;
+    return 5;
+  }
+  if (z[0] == 252) {
+    *pResult = (((uint64_t)x)<<8) + z[5];
+    return 6;
+  }
+  if (z[0] == 253) {
+    *pResult = (((uint64_t)x)<<16) + (z[5]<<8) + z[6];
+    return 7;
+  }
+  if (z[0] == 254) {
+    *pResult = (((uint64_t)x)<<24) + (z[5]<<16) + (z[6]<<8) + z[7];
+    return 8;
+  }
+  *pResult = (((uint64_t)x)<<32) +
+               (0xffffffff & ((z[5]<<24) + (z[6]<<16) + (z[7]<<8) + z[8]));
+  return 9;
+}
+
+////////////////////////////////////////////////////////////
+// End code ripped from sqlite4
+////////////////////////////////////////////////////////////
+
+void PutMemcmpableVarint64(faststring *dst, uint64_t value) {
+  uint8_t buf[9];
+  int used = sqlite4PutVarint64(buf, value);
+  DCHECK_LE(used, sizeof(buf));
+  dst->append(buf, used);
+}
+
+bool GetMemcmpableVarint64(Slice *input, uint64_t *value) {
+  size_t size = sqlite4GetVarint64(input->data(), input->size(), value);
+  input->remove_prefix(size);
+  return size > 0;
+}
+
+
+} // namespace kudu


Reply via email to