http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/maintenance_manager.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/maintenance_manager.cc b/be/src/kudu/util/maintenance_manager.cc new file mode 100644 index 0000000..18ed8b8 --- /dev/null +++ b/be/src/kudu/util/maintenance_manager.cc @@ -0,0 +1,503 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/maintenance_manager.h" + +#include <gflags/gflags.h> +#include <memory> +#include <stdint.h> +#include <string> +#include <utility> + +#include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/debug/trace_logging.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/logging.h" +#include "kudu/util/metrics.h" +#include "kudu/util/process_memory.h" +#include "kudu/util/random_util.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/thread.h" +#include "kudu/util/trace.h" + +using std::pair; +using std::shared_ptr; +using strings::Substitute; + +DEFINE_int32(maintenance_manager_num_threads, 1, + "Size of the maintenance manager thread pool. 
" + "For spinning disks, the number of threads should " + "not be above the number of devices."); +TAG_FLAG(maintenance_manager_num_threads, stable); + +DEFINE_int32(maintenance_manager_polling_interval_ms, 250, + "Polling interval for the maintenance manager scheduler, " + "in milliseconds."); +TAG_FLAG(maintenance_manager_polling_interval_ms, hidden); + +DEFINE_int32(maintenance_manager_history_size, 8, + "Number of completed operations the manager is keeping track of."); +TAG_FLAG(maintenance_manager_history_size, hidden); + +DEFINE_bool(enable_maintenance_manager, true, + "Enable the maintenance manager, runs compaction and tablet cleaning tasks."); +TAG_FLAG(enable_maintenance_manager, unsafe); + +DEFINE_int64(log_target_replay_size_mb, 1024, + "The target maximum size of logs to be replayed at startup. If a tablet " + "has in-memory operations that are causing more than this size of logs " + "to be retained, then the maintenance manager will prioritize flushing " + "these operations to disk."); +TAG_FLAG(log_target_replay_size_mb, experimental); + +DEFINE_int64(data_gc_min_size_mb, 0, + "The (exclusive) minimum number of megabytes of ancient data on " + "disk, per tablet, needed to prioritize deletion of that data."); +TAG_FLAG(data_gc_min_size_mb, experimental); + +DEFINE_double(data_gc_prioritization_prob, 0.5, + "The probability that we will prioritize data GC over performance " + "improvement operations. 
If set to 1.0, we will always prefer to " + "delete old data before running performance improvement operations " + "such as delta compaction."); +TAG_FLAG(data_gc_prioritization_prob, experimental); + +namespace kudu { + +MaintenanceOpStats::MaintenanceOpStats() { + Clear(); +} + +void MaintenanceOpStats::Clear() { + valid_ = false; + runnable_ = false; + ram_anchored_ = 0; + logs_retained_bytes_ = 0; + data_retained_bytes_ = 0; + perf_improvement_ = 0; + last_modified_ = MonoTime(); +} + +MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage) + : name_(std::move(name)), + running_(0), + cancel_(false), + io_usage_(io_usage) { +} + +MaintenanceOp::~MaintenanceOp() { + CHECK(!manager_.get()) << "You must unregister the " << name_ + << " Op before destroying it."; +} + +void MaintenanceOp::Unregister() { + CHECK(manager_.get()) << "Op " << name_ << " was never registered."; + manager_->UnregisterOp(this); +} + +const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = { + .num_threads = 0, + .polling_interval_ms = 0, + .history_size = 0, +}; + +MaintenanceManager::MaintenanceManager(const Options& options) + : num_threads_(options.num_threads <= 0 ? + FLAGS_maintenance_manager_num_threads : options.num_threads), + cond_(&lock_), + shutdown_(false), + running_ops_(0), + polling_interval_ms_(options.polling_interval_ms <= 0 ? + FLAGS_maintenance_manager_polling_interval_ms : + options.polling_interval_ms), + completed_ops_count_(0), + rand_(GetRandomSeed32()), + memory_pressure_func_(&process_memory::UnderMemoryPressure) { + CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_) + .set_max_threads(num_threads_).Build(&thread_pool_)); + uint32_t history_size = options.history_size == 0 ? 
+ FLAGS_maintenance_manager_history_size : + options.history_size; + completed_ops_.resize(history_size); +} + +MaintenanceManager::~MaintenanceManager() { + Shutdown(); +} + +Status MaintenanceManager::Init(string server_uuid) { + server_uuid_ = std::move(server_uuid); + RETURN_NOT_OK(Thread::Create("maintenance", "maintenance_scheduler", + boost::bind(&MaintenanceManager::RunSchedulerThread, this), + &monitor_thread_)); + return Status::OK(); +} + +void MaintenanceManager::Shutdown() { + { + std::lock_guard<Mutex> guard(lock_); + if (shutdown_) { + return; + } + shutdown_ = true; + cond_.Broadcast(); + } + if (monitor_thread_.get()) { + CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join()); + monitor_thread_.reset(); + thread_pool_->Shutdown(); + } +} + +void MaintenanceManager::RegisterOp(MaintenanceOp* op) { + CHECK(op); + std::lock_guard<Mutex> guard(lock_); + CHECK(!op->manager_) << "Tried to register " << op->name() + << ", but it was already registered."; + pair<OpMapTy::iterator, bool> val + (ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats()))); + CHECK(val.second) + << "Tried to register " << op->name() + << ", but it already exists in ops_."; + op->manager_ = shared_from_this(); + op->cond_.reset(new ConditionVariable(&lock_)); + VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Registered " << op->name(); +} + +void MaintenanceManager::UnregisterOp(MaintenanceOp* op) { + { + std::lock_guard<Mutex> guard(lock_); + CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name() + << ", but it is not currently registered with this maintenance manager."; + auto iter = ops_.find(op); + CHECK(iter != ops_.end()) << "Tried to unregister " << op->name() + << ", but it was never registered"; + // While the op is running, wait for it to be finished. 
+ if (iter->first->running_ > 0) { + VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Waiting for op " << op->name() + << " to finish so we can unregister it."; + } + op->CancelAndDisable(); + while (iter->first->running_ > 0) { + op->cond_->Wait(); + iter = ops_.find(op); + CHECK(iter != ops_.end()) << "Tried to unregister " << op->name() + << ", but another thread unregistered it while we were " + << "waiting for it to complete"; + } + ops_.erase(iter); + } + LOG_WITH_PREFIX(INFO) << "Unregistered op " << op->name(); + op->cond_.reset(); + // Remove the op's shared_ptr reference to us. This might 'delete this'. + op->manager_.reset(); +} + +void MaintenanceManager::RunSchedulerThread() { + if (!FLAGS_enable_maintenance_manager) { + LOG(INFO) << "Maintenance manager is disabled. Stopping thread."; + return; + } + + MonoDelta polling_interval = MonoDelta::FromMilliseconds(polling_interval_ms_); + + std::unique_lock<Mutex> guard(lock_); + + // Set to true if the scheduler runs and finds that there is no work to do. + bool prev_iter_found_no_work = false; + + while (true) { + // We'll keep sleeping if: + // 1) there are no free threads available to perform a maintenance op. + // or 2) we just tried to schedule an op but found nothing to run. + // However, if it's time to shut down, we want to do so immediately. + while ((running_ops_ >= num_threads_ || prev_iter_found_no_work) && !shutdown_) { + cond_.TimedWait(polling_interval); + prev_iter_found_no_work = false; + } + if (shutdown_) { + VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Shutting down maintenance manager."; + return; + } + + // Find the best op. + MaintenanceOp* op = FindBestOp(); + // If we found no work to do, then we should sleep before trying again to schedule. + // Otherwise, we can go right into trying to find the next op. 
+ prev_iter_found_no_work = (op == nullptr); + if (!op) { + VLOG_AND_TRACE("maintenance", 2) << LogPrefix() + << "No maintenance operations look worth doing."; + continue; + } + + // Prepare the maintenance operation. + op->running_++; + running_ops_++; + guard.unlock(); + bool ready = op->Prepare(); + guard.lock(); + if (!ready) { + LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name() + << ". Re-running scheduler."; + op->running_--; + op->cond_->Signal(); + continue; + } + + // Run the maintenance operation. + Status s = thread_pool_->SubmitFunc(boost::bind( + &MaintenanceManager::LaunchOp, this, op)); + CHECK(s.ok()); + } +} + +// Finding the best operation goes through four filters: +// - If there's an Op that we can run quickly that frees log retention, we run it. +// - If we've hit the overall process memory limit (note: this includes memory that the Ops cannot +// free), we run the Op with the highest RAM usage. +// - If there are Ops that are retaining logs past our target replay size, we run the one that has +// the highest retention (and if many qualify, then we run the one that also frees up the +// most RAM). +// - Finally, if there's nothing else that we really need to do, we run the Op that will improve +// performance the most. +// +// The reason it's done this way is that we want to prioritize limiting the amount of resources we +// hold on to. Low IO Ops go first since we can quickly run them, then we can look at memory usage. +// Reversing those can starve the low IO Ops when the system is under intense memory pressure. +// +// In the third priority we're at a point where nothing's urgent and there's nothing we can run +// quickly. +// TODO We currently optimize for freeing log retention but we could consider having some sort of +// sliding priority between log retention and RAM usage. For example, is an Op that frees +// 128MB of log retention and 12MB of RAM always better than an op that frees 12MB of log retention +// and 128MB of RAM? 
Maybe a more holistic approach would be better. +MaintenanceOp* MaintenanceManager::FindBestOp() { + TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp"); + + size_t free_threads = num_threads_ - running_ops_; + if (free_threads == 0) { + VLOG_AND_TRACE("maintenance", 1) << LogPrefix() + << "There are no free threads, so we can't run anything."; + return nullptr; + } + + int64_t low_io_most_logs_retained_bytes = 0; + MaintenanceOp* low_io_most_logs_retained_bytes_op = nullptr; + + uint64_t most_mem_anchored = 0; + MaintenanceOp* most_mem_anchored_op = nullptr; + + int64_t most_logs_retained_bytes = 0; + int64_t most_logs_retained_bytes_ram_anchored = 0; + MaintenanceOp* most_logs_retained_bytes_op = nullptr; + + int64_t most_data_retained_bytes = 0; + MaintenanceOp* most_data_retained_bytes_op = nullptr; + + double best_perf_improvement = 0; + MaintenanceOp* best_perf_improvement_op = nullptr; + for (OpMapTy::value_type &val : ops_) { + MaintenanceOp* op(val.first); + MaintenanceOpStats& stats(val.second); + VLOG_WITH_PREFIX(3) << "Considering MM op " << op->name(); + // Update op stats. + stats.Clear(); + op->UpdateStats(&stats); + if (op->cancelled() || !stats.valid() || !stats.runnable()) { + continue; + } + if (stats.logs_retained_bytes() > low_io_most_logs_retained_bytes && + op->io_usage() == MaintenanceOp::LOW_IO_USAGE) { + low_io_most_logs_retained_bytes_op = op; + low_io_most_logs_retained_bytes = stats.logs_retained_bytes(); + VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() << " can free " + << stats.logs_retained_bytes() << " bytes of logs"; + } + + if (stats.ram_anchored() > most_mem_anchored) { + most_mem_anchored_op = op; + most_mem_anchored = stats.ram_anchored(); + } + // We prioritize ops that can free more logs, but when it's the same we pick the one that + // also frees up the most memory. 
+ if (stats.logs_retained_bytes() > 0 && + (stats.logs_retained_bytes() > most_logs_retained_bytes || + (stats.logs_retained_bytes() == most_logs_retained_bytes && + stats.ram_anchored() > most_logs_retained_bytes_ram_anchored))) { + most_logs_retained_bytes_op = op; + most_logs_retained_bytes = stats.logs_retained_bytes(); + most_logs_retained_bytes_ram_anchored = stats.ram_anchored(); + } + + if (stats.data_retained_bytes() > most_data_retained_bytes) { + most_data_retained_bytes_op = op; + most_data_retained_bytes = stats.data_retained_bytes(); + VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() << " can free " + << stats.data_retained_bytes() << " bytes of data"; + } + + if ((!best_perf_improvement_op) || + (stats.perf_improvement() > best_perf_improvement)) { + best_perf_improvement_op = op; + best_perf_improvement = stats.perf_improvement(); + } + } + + // Look at ops that we can run quickly that free up log retention. + if (low_io_most_logs_retained_bytes_op) { + if (low_io_most_logs_retained_bytes > 0) { + VLOG_AND_TRACE("maintenance", 1) << LogPrefix() + << "Performing " << low_io_most_logs_retained_bytes_op->name() << ", " + << "because it can free up more logs " + << "at " << low_io_most_logs_retained_bytes + << " bytes with a low IO cost"; + return low_io_most_logs_retained_bytes_op; + } + } + + // Look at free memory. If it is dangerously low, we must select something + // that frees memory-- the op with the most anchored memory. + double capacity_pct; + if (memory_pressure_func_(&capacity_pct)) { + if (!most_mem_anchored_op) { + string msg = StringPrintf("we have exceeded our soft memory limit " + "(current capacity is %.2f%%). However, there are no ops currently " + "runnable which would free memory.", capacity_pct); + LOG_WITH_PREFIX(INFO) << msg; + return nullptr; + } + VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "We have exceeded our soft memory limit " + << "(current capacity is " << capacity_pct << "%). 
Running the op " + << "which anchors the most memory: " << most_mem_anchored_op->name(); + return most_mem_anchored_op; + } + + if (most_logs_retained_bytes_op && + most_logs_retained_bytes / 1024 / 1024 >= FLAGS_log_target_replay_size_mb) { + VLOG_AND_TRACE("maintenance", 1) << LogPrefix() + << "Performing " << most_logs_retained_bytes_op->name() << ", " + << "because it can free up more logs (" << most_logs_retained_bytes + << " bytes)"; + return most_logs_retained_bytes_op; + } + + // Look at ops that we can run quickly that free up data on disk. + if (most_data_retained_bytes_op && + most_data_retained_bytes > FLAGS_data_gc_min_size_mb * 1024 * 1024) { + if (!best_perf_improvement_op || best_perf_improvement <= 0 || + rand_.NextDoubleFraction() <= FLAGS_data_gc_prioritization_prob) { + VLOG_AND_TRACE("maintenance", 1) << LogPrefix() + << "Performing " << most_data_retained_bytes_op->name() << ", " + << "because it can free up more data " + << "at " << most_data_retained_bytes << " bytes"; + return most_data_retained_bytes_op; + } + VLOG(1) << "Skipping data GC due to prioritizing perf improvement"; + } + + if (best_perf_improvement_op && best_perf_improvement > 0) { + VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Performing " + << best_perf_improvement_op->name() << ", " + << "because it had the best perf_improvement score, " + << "at " << best_perf_improvement; + return best_perf_improvement_op; + } + return nullptr; +} + +void MaintenanceManager::LaunchOp(MaintenanceOp* op) { + MonoTime start_time = MonoTime::Now(); + op->RunningGauge()->Increment(); + + scoped_refptr<Trace> trace(new Trace); + LOG_TIMING(INFO, Substitute("running $0", op->name())) { + ADOPT_TRACE(trace.get()); + TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp", + "name", op->name()); + op->Perform(); + } + LOG_WITH_PREFIX(INFO) << op->name() << " metrics: " << trace->MetricsAsJSON(); + + op->RunningGauge()->Decrement(); + MonoDelta delta = MonoTime::Now() - start_time; + + 
std::lock_guard<Mutex> l(lock_); + CompletedOp& completed_op = completed_ops_[completed_ops_count_ % completed_ops_.size()]; + completed_op.name = op->name(); + completed_op.duration = delta; + completed_op.start_mono_time = start_time; + completed_ops_count_++; + + op->DurationHistogram()->Increment(delta.ToMilliseconds()); + + running_ops_--; + op->running_--; + op->cond_->Signal(); + cond_.Signal(); // wake up scheduler +} + +void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) { + DCHECK(out_pb != nullptr); + std::lock_guard<Mutex> guard(lock_); + MaintenanceOp* best_op = FindBestOp(); + for (MaintenanceManager::OpMapTy::value_type& val : ops_) { + MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations(); + MaintenanceOp* op(val.first); + MaintenanceOpStats& stat(val.second); + op_pb->set_name(op->name()); + op_pb->set_running(op->running()); + if (stat.valid()) { + op_pb->set_runnable(stat.runnable()); + op_pb->set_ram_anchored_bytes(stat.ram_anchored()); + op_pb->set_logs_retained_bytes(stat.logs_retained_bytes()); + op_pb->set_perf_improvement(stat.perf_improvement()); + } else { + op_pb->set_runnable(false); + op_pb->set_ram_anchored_bytes(0); + op_pb->set_logs_retained_bytes(0); + op_pb->set_perf_improvement(0.0); + } + + if (best_op == op) { + out_pb->mutable_best_op()->CopyFrom(*op_pb); + } + } + + for (int n = 1; n <= completed_ops_.size(); n++) { + int i = completed_ops_count_ - n; + if (i < 0) break; + const auto& completed_op = completed_ops_[i % completed_ops_.size()]; + + if (!completed_op.name.empty()) { + MaintenanceManagerStatusPB_CompletedOpPB* completed_pb = out_pb->add_completed_operations(); + completed_pb->set_name(completed_op.name); + completed_pb->set_duration_millis(completed_op.duration.ToMilliseconds()); + + MonoDelta delta(MonoTime::Now().GetDeltaSince(completed_op.start_mono_time)); + completed_pb->set_secs_since_start(delta.ToSeconds()); + } + } +} + 
+std::string MaintenanceManager::LogPrefix() const { + return Substitute("P $0: ", server_uuid_); +} + +} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/maintenance_manager.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/maintenance_manager.h b/be/src/kudu/util/maintenance_manager.h new file mode 100644 index 0000000..6070e2d --- /dev/null +++ b/be/src/kudu/util/maintenance_manager.h @@ -0,0 +1,324 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <stdint.h> + +#include <functional> +#include <map> +#include <memory> +#include <set> +#include <string> +#include <vector> + +#include "kudu/gutil/macros.h" +#include "kudu/util/atomic.h" +#include "kudu/util/condition_variable.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/maintenance_manager.pb.h" +#include "kudu/util/monotime.h" +#include "kudu/util/mutex.h" +#include "kudu/util/random.h" +#include "kudu/util/thread.h" +#include "kudu/util/threadpool.h" + +namespace kudu { + +template<class T> +class AtomicGauge; +class Histogram; +class MaintenanceManager; + +class MaintenanceOpStats { + public: + MaintenanceOpStats(); + + // Zero all stats. They are invalid until the first setter is called. 
+ void Clear(); + + bool runnable() const { + DCHECK(valid_); + return runnable_; + } + + void set_runnable(bool runnable) { + UpdateLastModified(); + runnable_ = runnable; + } + + uint64_t ram_anchored() const { + DCHECK(valid_); + return ram_anchored_; + } + + void set_ram_anchored(uint64_t ram_anchored) { + UpdateLastModified(); + ram_anchored_ = ram_anchored; + } + + int64_t logs_retained_bytes() const { + DCHECK(valid_); + return logs_retained_bytes_; + } + + void set_logs_retained_bytes(int64_t logs_retained_bytes) { + UpdateLastModified(); + logs_retained_bytes_ = logs_retained_bytes; + } + + int64_t data_retained_bytes() const { + DCHECK(valid_); + return data_retained_bytes_; + } + + void set_data_retained_bytes(int64_t data_retained_bytes) { + UpdateLastModified(); + data_retained_bytes_ = data_retained_bytes; + } + + double perf_improvement() const { + DCHECK(valid_); + return perf_improvement_; + } + + void set_perf_improvement(double perf_improvement) { + UpdateLastModified(); + perf_improvement_ = perf_improvement; + } + + const MonoTime& last_modified() const { + DCHECK(valid_); + return last_modified_; + } + + bool valid() const { + return valid_; + } + + private: + void UpdateLastModified() { + valid_ = true; + last_modified_ = MonoTime::Now(); + } + + // Important: Update Clear() when adding fields to this class. + + // True if these stats are valid. + bool valid_; + + // True if this op can be run now. + bool runnable_; + + // The approximate amount of memory that not doing this operation keeps + // around. This number is used to decide when to start freeing memory, so it + // should be fairly accurate. May be 0. + uint64_t ram_anchored_; + + // Approximate amount of disk space in WAL files that would be freed if this + // operation ran. May be 0. + int64_t logs_retained_bytes_; + + // Approximate amount of disk space in data blocks that would be freed if + // this operation ran. May be 0. 
+ int64_t data_retained_bytes_; + + // The estimated performance improvement-- how good it is to do this on some + // absolute scale (yet TBD). + double perf_improvement_; + + // The last time that the stats were modified. + MonoTime last_modified_; +}; + +// MaintenanceOp objects represent background operations that the +// MaintenanceManager can schedule. Once a MaintenanceOp is registered, the +// manager will periodically poll it for statistics. The registrant is +// responsible for managing the memory associated with the MaintenanceOp object. +// Op objects should be unregistered before being de-allocated. +class MaintenanceOp { + public: + friend class MaintenanceManager; + + // General indicator of how much IO the Op will use. + enum IOUsage { + LOW_IO_USAGE, // Low impact operations like removing a file, updating metadata. + HIGH_IO_USAGE // Everything else. + }; + + explicit MaintenanceOp(std::string name, IOUsage io_usage); + virtual ~MaintenanceOp(); + + // Unregister this op, if it is currently registered. + void Unregister(); + + // Update the op statistics. This will be called every scheduling period + // (about a few times a second), so it should not be too expensive. It's + // possible for the returned statistics to be invalid; the caller should + // call MaintenanceOpStats::valid() before using them. This will be run + // under the MaintenanceManager lock. + virtual void UpdateStats(MaintenanceOpStats* stats) = 0; + + // Prepare to perform the operation. This will be run without holding the + // maintenance manager lock. It should be short, since it is run from the + // context of the maintenance op scheduler thread rather than a worker thread. + // If this returns false, we will abort the operation. + virtual bool Prepare() = 0; + + // Perform the operation. This will be run without holding the maintenance + // manager lock, and may take a long time. + virtual void Perform() = 0; + + // Returns the histogram for this op that tracks duration. 
Cannot be NULL. + virtual scoped_refptr<Histogram> DurationHistogram() const = 0; + + // Returns the gauge for this op that tracks when this op is running. Cannot be NULL. + virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const = 0; + + uint32_t running() { return running_; } + + std::string name() const { return name_; } + + IOUsage io_usage() const { return io_usage_; } + + // Return true if the operation has been cancelled due to Unregister() pending. + bool cancelled() const { + return cancel_.Load(); + } + + // Cancel this operation, which prevents new instances of it from being scheduled + // regardless of whether the statistics indicate it is runnable. Instances may also + // optionally poll 'cancelled()' on a periodic basis to know if they should abort a + // lengthy operation in the middle of Perform(). + void CancelAndDisable() { + cancel_.Store(true); + } + + private: + DISALLOW_COPY_AND_ASSIGN(MaintenanceOp); + + // The name of the operation. Op names must be unique. + const std::string name_; + + // The number of times that this op is currently running. + uint32_t running_; + + // Set when we are trying to unregister the maintenance operation. + // Ongoing operations could read this boolean and cancel themselves. + // New operations will not be scheduled when this boolean is set. + AtomicBool cancel_; + + // Condition variable which the UnregisterOp function can wait on. + // + // Note: 'cond_' is used with the MaintenanceManager's mutex. As such, + // it only exists when the op is registered. + gscoped_ptr<ConditionVariable> cond_; + + // The MaintenanceManager with which this op is registered, or null + // if it is not registered. 
+ std::shared_ptr<MaintenanceManager> manager_; + + IOUsage io_usage_; +}; + +struct MaintenanceOpComparator { + bool operator() (const MaintenanceOp* lhs, + const MaintenanceOp* rhs) const { + return lhs->name().compare(rhs->name()) < 0; + } +}; + +// Holds the information regarding a recently completed operation. +struct CompletedOp { + std::string name; + MonoDelta duration; + MonoTime start_mono_time; +}; + +// The MaintenanceManager manages the scheduling of background operations such +// as flushes or compactions. It runs these operations in the background, in a +// thread pool. It uses information provided in MaintenanceOpStats objects to +// decide which operations, if any, to run. +class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManager> { + public: + struct Options { + int32_t num_threads; + int32_t polling_interval_ms; + uint32_t history_size; + }; + + explicit MaintenanceManager(const Options& options); + ~MaintenanceManager(); + + Status Init(std::string server_uuid); + void Shutdown(); + + // Register an op with the manager. + void RegisterOp(MaintenanceOp* op); + + // Unregister an op with the manager. + // If the Op is currently running, it will not be interrupted. However, this + // function will block until the Op is finished. 
+ void UnregisterOp(MaintenanceOp* op); + + void GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb); + + void set_memory_pressure_func_for_tests(std::function<bool(double*)> f) { + std::lock_guard<Mutex> guard(lock_); + memory_pressure_func_ = std::move(f); + } + + static const Options DEFAULT_OPTIONS; + + private: + FRIEND_TEST(MaintenanceManagerTest, TestLogRetentionPrioritization); + typedef std::map<MaintenanceOp*, MaintenanceOpStats, + MaintenanceOpComparator> OpMapTy; + + void RunSchedulerThread(); + + // find the best op, or null if there is nothing we want to run + MaintenanceOp* FindBestOp(); + + void LaunchOp(MaintenanceOp* op); + + std::string LogPrefix() const; + + const int32_t num_threads_; + OpMapTy ops_; // registered operations + Mutex lock_; + scoped_refptr<kudu::Thread> monitor_thread_; + gscoped_ptr<ThreadPool> thread_pool_; + ConditionVariable cond_; + bool shutdown_; + uint64_t running_ops_; + int32_t polling_interval_ms_; + // Vector used as a circular buffer for recently completed ops. Elements need to be added at + // the completed_ops_count_ % the vector's size and then the count needs to be incremented. + std::vector<CompletedOp> completed_ops_; + int64_t completed_ops_count_; + std::string server_uuid_; + Random rand_; + + // Function which should return true if the server is under global memory pressure. + // This is indirected for testing purposes. 
+ std::function<bool(double*)> memory_pressure_func_; + + DISALLOW_COPY_AND_ASSIGN(MaintenanceManager); +}; + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/maintenance_manager.proto ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/maintenance_manager.proto b/be/src/kudu/util/maintenance_manager.proto new file mode 100644 index 0000000..75b0ab3 --- /dev/null +++ b/be/src/kudu/util/maintenance_manager.proto @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +syntax = "proto2"; +package kudu; + +option java_package = "org.apache.kudu"; + +// Used to present the maintenance manager's internal state. +message MaintenanceManagerStatusPB { + message MaintenanceOpPB { + required string name = 1; + // Number of times this operation is currently running. + required uint32 running = 2; + required bool runnable = 3; + required uint64 ram_anchored_bytes = 4; + required int64 logs_retained_bytes = 5; + required double perf_improvement = 6; + } + + message CompletedOpPB { + required string name = 1; + required int32 duration_millis = 2; + // Number of seconds since this operation started. 
+ required int32 secs_since_start = 3; + } + + // The next operation that would run. + optional MaintenanceOpPB best_op = 1; + + // List of all the operations. + repeated MaintenanceOpPB registered_operations = 2; + + // This list isn't in order of anything. Can contain the same operation mutiple times. + repeated CompletedOpPB completed_operations = 3; +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/make_shared.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/make_shared.h b/be/src/kudu/util/make_shared.h new file mode 100644 index 0000000..af254df --- /dev/null +++ b/be/src/kudu/util/make_shared.h @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_UTIL_MAKE_SHARED_H_ +#define KUDU_UTIL_MAKE_SHARED_H_ + +#include <memory> + +// It isn't possible to use std::make_shared() with a class that has private +// constructors. Moreover, the standard workarounds are inelegant when said +// class has non-default constructors. As such, we employ a simple solution: +// declare the class as a friend to std::make_shared()'s internal allocator. 
+// This approach is non-portable and must be implemented separately for each +// supported STL implementation. +// +// Note: due to friendship restrictions on partial template specialization, +// it isn't possible to befriend just the allocation function; the entire +// allocator class must be befriended. +// +// See http://stackoverflow.com/q/8147027 for a longer discussion. + +#ifdef __GLIBCXX__ + // In libstdc++, new_allocator is defined as a class (ext/new_allocator.h) + // but forward declared as a struct (ext/alloc_traits.h). Clang complains + // about this when -Wmismatched-tags is set, which gcc doesn't support + // (which probably explains why the discrepancy exists in the first place). + // We can temporarily disable this warning via pragmas [1], but we must + // not expose them to gcc due to its poor handling of the _Pragma() C99 + // operator [2]. + // + // 1. http://clang.llvm.org/docs/UsersManual.html#controlling-diagnostics-via-pragmas + // 2. https://gcc.gnu.org/bugzilla/show_bug.cgi?id=60875 + #ifdef __clang__ + #define ALLOW_MAKE_SHARED(T) \ + _Pragma("clang diagnostic push") \ + _Pragma("clang diagnostic ignored \"-Wmismatched-tags\"") \ + friend class __gnu_cxx::new_allocator<T> \ + _Pragma("clang diagnostic pop") + #else + #define ALLOW_MAKE_SHARED(T) \ + friend class __gnu_cxx::new_allocator<T> + #endif +#elif defined(_LIBCPP_VERSION) + #define ALLOW_MAKE_SHARED(T) \ + friend class std::__1::__libcpp_compressed_pair_imp<std::__1::allocator<T>, T, 1> +#else + #error "Need to implement ALLOW_MAKE_SHARED for your platform!" 
+#endif + +#endif // KUDU_UTIL_MAKE_SHARED_H_ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/malloc.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/malloc.cc b/be/src/kudu/util/malloc.cc new file mode 100644 index 0000000..3fec2db --- /dev/null +++ b/be/src/kudu/util/malloc.cc @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#include "kudu/util/malloc.h" + +#if defined(__linux__) +#include <malloc.h> +#else +#include <malloc/malloc.h> +#endif // defined(__linux__) + +namespace kudu { + +int64_t kudu_malloc_usable_size(const void* obj) { +#if defined(__linux__) + return malloc_usable_size(const_cast<void*>(obj)); +#else + return malloc_size(obj); +#endif // defined(__linux__) +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/malloc.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/malloc.h b/be/src/kudu/util/malloc.h new file mode 100644 index 0000000..e8a27c5 --- /dev/null +++ b/be/src/kudu/util/malloc.h @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_MALLOC_H +#define KUDU_UTIL_MALLOC_H + +#include <stdint.h> + +namespace kudu { + +// Simple wrapper for malloc_usable_size(). +// +// Really just centralizes the const_cast, as this function is often called +// on const pointers (i.e. "this" in a const method). 
+int64_t kudu_malloc_usable_size(const void* obj); + +} // namespace kudu + +#endif // KUDU_UTIL_MALLOC_H http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/map-util-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/map-util-test.cc b/be/src/kudu/util/map-util-test.cc new file mode 100644 index 0000000..1e818a2 --- /dev/null +++ b/be/src/kudu/util/map-util-test.cc @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This unit test belongs in gutil, but it depends on test_main which is +// part of util. 
+#include "kudu/gutil/map-util.h" + +#include <gtest/gtest.h> +#include <map> +#include <memory> + +using std::map; +using std::string; +using std::shared_ptr; +using std::unique_ptr; + +namespace kudu { + +TEST(FloorTest, TestMapUtil) { + map<int, int> my_map; + + ASSERT_EQ(nullptr, FindFloorOrNull(my_map, 5)); + + my_map[5] = 5; + ASSERT_EQ(5, *FindFloorOrNull(my_map, 6)); + ASSERT_EQ(5, *FindFloorOrNull(my_map, 5)); + ASSERT_EQ(nullptr, FindFloorOrNull(my_map, 4)); + + my_map[1] = 1; + ASSERT_EQ(5, *FindFloorOrNull(my_map, 6)); + ASSERT_EQ(5, *FindFloorOrNull(my_map, 5)); + ASSERT_EQ(1, *FindFloorOrNull(my_map, 4)); + ASSERT_EQ(1, *FindFloorOrNull(my_map, 1)); + ASSERT_EQ(nullptr, FindFloorOrNull(my_map, 0)); +} + +TEST(ComputeIfAbsentTest, TestComputeIfAbsent) { + map<string, string> my_map; + auto result = ComputeIfAbsent(&my_map, "key", []{ return "hello_world"; }); + ASSERT_EQ(*result, "hello_world"); + auto result2 = ComputeIfAbsent(&my_map, "key", [] { return "hello_world2"; }); + ASSERT_EQ(*result2, "hello_world"); +} + +TEST(ComputeIfAbsentTest, TestComputeIfAbsentAndReturnAbsense) { + map<string, string> my_map; + auto result = ComputeIfAbsentReturnAbsense(&my_map, "key", []{ return "hello_world"; }); + ASSERT_TRUE(result.second); + ASSERT_EQ(*result.first, "hello_world"); + auto result2 = ComputeIfAbsentReturnAbsense(&my_map, "key", [] { return "hello_world2"; }); + ASSERT_FALSE(result2.second); + ASSERT_EQ(*result2.first, "hello_world"); +} + +TEST(FindPointeeOrNullTest, TestFindPointeeOrNull) { + map<string, unique_ptr<string>> my_map; + auto iter = my_map.emplace("key", unique_ptr<string>(new string("hello_world"))); + ASSERT_TRUE(iter.second); + string* value = FindPointeeOrNull(my_map, "key"); + ASSERT_TRUE(value != nullptr); + ASSERT_EQ(*value, "hello_world"); + my_map.erase(iter.first); + value = FindPointeeOrNull(my_map, "key"); + ASSERT_TRUE(value == nullptr); +} + +TEST(EraseKeyReturnValuePtrTest, TestRawAndSmartSmartPointers) { + map<string, 
unique_ptr<string>> my_map; + unique_ptr<string> value = EraseKeyReturnValuePtr(&my_map, "key"); + ASSERT_TRUE(value.get() == nullptr); + my_map.emplace("key", unique_ptr<string>(new string("hello_world"))); + value = EraseKeyReturnValuePtr(&my_map, "key"); + ASSERT_EQ(*value, "hello_world"); + value.reset(); + value = EraseKeyReturnValuePtr(&my_map, "key"); + ASSERT_TRUE(value.get() == nullptr); + map<string, shared_ptr<string>> my_map2; + shared_ptr<string> value2 = EraseKeyReturnValuePtr(&my_map2, "key"); + ASSERT_TRUE(value2.get() == nullptr); + my_map2.emplace("key", shared_ptr<string>(new string("hello_world"))); + value2 = EraseKeyReturnValuePtr(&my_map2, "key"); + ASSERT_EQ(*value2, "hello_world"); + map<string, string*> my_map_raw; + my_map_raw.emplace("key", new string("hello_world")); + value.reset(EraseKeyReturnValuePtr(&my_map_raw, "key")); + ASSERT_EQ(*value, "hello_world"); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mem_tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/mem_tracker-test.cc b/be/src/kudu/util/mem_tracker-test.cc new file mode 100644 index 0000000..7e78cbe --- /dev/null +++ b/be/src/kudu/util/mem_tracker-test.cc @@ -0,0 +1,278 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/mem_tracker.h" + +#include <atomic> +#include <string> +#include <thread> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/test_util.h" + +namespace kudu { + +using std::equal_to; +using std::hash; +using std::pair; +using std::shared_ptr; +using std::string; +using std::unordered_map; +using std::vector; +using strings::Substitute; + +TEST(MemTrackerTest, SingleTrackerNoLimit) { + shared_ptr<MemTracker> t = MemTracker::CreateTracker(-1, "t"); + EXPECT_FALSE(t->has_limit()); + t->Consume(10); + EXPECT_EQ(t->consumption(), 10); + t->Consume(10); + EXPECT_EQ(t->consumption(), 20); + t->Release(15); + EXPECT_EQ(t->consumption(), 5); + EXPECT_FALSE(t->LimitExceeded()); + t->Release(5); + EXPECT_EQ(t->consumption(), 0); +} + +TEST(MemTrackerTest, SingleTrackerWithLimit) { + shared_ptr<MemTracker> t = MemTracker::CreateTracker(11, "t"); + EXPECT_TRUE(t->has_limit()); + t->Consume(10); + EXPECT_EQ(t->consumption(), 10); + EXPECT_FALSE(t->LimitExceeded()); + t->Consume(10); + EXPECT_EQ(t->consumption(), 20); + EXPECT_TRUE(t->LimitExceeded()); + t->Release(15); + EXPECT_EQ(t->consumption(), 5); + EXPECT_FALSE(t->LimitExceeded()); + t->Release(5); +} + +TEST(MemTrackerTest, TrackerHierarchy) { + shared_ptr<MemTracker> p = MemTracker::CreateTracker(100, "p"); + shared_ptr<MemTracker> c1 = MemTracker::CreateTracker(80, "c1", p); + shared_ptr<MemTracker> c2 = MemTracker::CreateTracker(50, "c2", p); + + // everything below limits + c1->Consume(60); + 
EXPECT_EQ(c1->consumption(), 60); + EXPECT_FALSE(c1->LimitExceeded()); + EXPECT_FALSE(c1->AnyLimitExceeded()); + EXPECT_EQ(c2->consumption(), 0); + EXPECT_FALSE(c2->LimitExceeded()); + EXPECT_FALSE(c2->AnyLimitExceeded()); + EXPECT_EQ(p->consumption(), 60); + EXPECT_FALSE(p->LimitExceeded()); + EXPECT_FALSE(p->AnyLimitExceeded()); + + // p goes over limit + c2->Consume(50); + EXPECT_EQ(c1->consumption(), 60); + EXPECT_FALSE(c1->LimitExceeded()); + EXPECT_TRUE(c1->AnyLimitExceeded()); + EXPECT_EQ(c2->consumption(), 50); + EXPECT_FALSE(c2->LimitExceeded()); + EXPECT_TRUE(c2->AnyLimitExceeded()); + EXPECT_EQ(p->consumption(), 110); + EXPECT_TRUE(p->LimitExceeded()); + + // c2 goes over limit, p drops below limit + c1->Release(20); + c2->Consume(10); + EXPECT_EQ(c1->consumption(), 40); + EXPECT_FALSE(c1->LimitExceeded()); + EXPECT_FALSE(c1->AnyLimitExceeded()); + EXPECT_EQ(c2->consumption(), 60); + EXPECT_TRUE(c2->LimitExceeded()); + EXPECT_TRUE(c2->AnyLimitExceeded()); + EXPECT_EQ(p->consumption(), 100); + EXPECT_FALSE(p->LimitExceeded()); + c1->Release(40); + c2->Release(60); +} + +class GcFunctionHelper { + public: + static const int NUM_RELEASE_BYTES = 1; + + explicit GcFunctionHelper(MemTracker* tracker) : tracker_(tracker) { } + + void GcFunc() { tracker_->Release(NUM_RELEASE_BYTES); } + + private: + MemTracker* tracker_; +}; + +TEST(MemTrackerTest, STLContainerAllocator) { + shared_ptr<MemTracker> t = MemTracker::CreateTracker(-1, "t"); + MemTrackerAllocator<int> vec_alloc(t); + MemTrackerAllocator<pair<const int, int>> map_alloc(t); + + // Simple test: use the allocator in a vector. + { + vector<int, MemTrackerAllocator<int> > v(vec_alloc); + ASSERT_EQ(0, t->consumption()); + v.reserve(5); + ASSERT_EQ(5 * sizeof(int), t->consumption()); + v.reserve(10); + ASSERT_EQ(10 * sizeof(int), t->consumption()); + } + ASSERT_EQ(0, t->consumption()); + + // Complex test: use it in an unordered_map, where it must be rebound in + // order to allocate the map's buckets. 
+ { + unordered_map<int, int, hash<int>, equal_to<int>, MemTrackerAllocator<pair<const int, int>>> um( + 10, + hash<int>(), + equal_to<int>(), + map_alloc); + + // Don't care about the value (it depends on map internals). + ASSERT_GT(t->consumption(), 0); + } + ASSERT_EQ(0, t->consumption()); +} + +TEST(MemTrackerTest, FindFunctionsTakeOwnership) { + // In each test, ToString() would crash if the MemTracker is destroyed when + // 'm' goes out of scope. + + shared_ptr<MemTracker> ref; + { + shared_ptr<MemTracker> m = MemTracker::CreateTracker(-1, "test"); + ASSERT_TRUE(MemTracker::FindTracker(m->id(), &ref)); + } + LOG(INFO) << ref->ToString(); + ref.reset(); + + { + shared_ptr<MemTracker> m = MemTracker::CreateTracker(-1, "test"); + ref = MemTracker::FindOrCreateGlobalTracker(-1, m->id()); + } + LOG(INFO) << ref->ToString(); + ref.reset(); + + vector<shared_ptr<MemTracker> > refs; + { + shared_ptr<MemTracker> m = MemTracker::CreateTracker(-1, "test"); + MemTracker::ListTrackers(&refs); + } + for (const shared_ptr<MemTracker>& r : refs) { + LOG(INFO) << r->ToString(); + } + refs.clear(); +} + +TEST(MemTrackerTest, ScopedTrackedConsumption) { + shared_ptr<MemTracker> m = MemTracker::CreateTracker(-1, "test"); + ASSERT_EQ(0, m->consumption()); + { + ScopedTrackedConsumption consumption(m, 1); + ASSERT_EQ(1, m->consumption()); + + consumption.Reset(3); + ASSERT_EQ(3, m->consumption()); + } + ASSERT_EQ(0, m->consumption()); +} + +TEST(MemTrackerTest, CollisionDetection) { + shared_ptr<MemTracker> p = MemTracker::CreateTracker(-1, "parent"); + shared_ptr<MemTracker> c = MemTracker::CreateTracker(-1, "child", p); + vector<shared_ptr<MemTracker>> all; + + // Three trackers: root, parent, and child. + MemTracker::ListTrackers(&all); + ASSERT_EQ(3, all.size()); + + // Now only two because the child has been destroyed. 
+ c.reset(); + MemTracker::ListTrackers(&all); + ASSERT_EQ(2, all.size()); + shared_ptr<MemTracker> not_found; + ASSERT_FALSE(MemTracker::FindTracker("child", ¬_found, p)); + + // Let's duplicate the parent. It's not recommended, but it's allowed. + shared_ptr<MemTracker> p2 = MemTracker::CreateTracker(-1, "parent"); + ASSERT_EQ(p->ToString(), p2->ToString()); + + // Only when we do a Find() operation do we crash. +#ifndef NDEBUG + const string kDeathMsg = "Multiple memtrackers with same id"; + EXPECT_DEATH({ + shared_ptr<MemTracker> found; + MemTracker::FindTracker("parent", &found); + }, kDeathMsg); + EXPECT_DEATH({ + MemTracker::FindOrCreateGlobalTracker(-1, "parent"); + }, kDeathMsg); +#endif +} + +TEST(MemTrackerTest, TestMultiThreadedRegisterAndDestroy) { + std::atomic<bool> done(false); + vector<std::thread> threads; + for (int i = 0; i < 10; i++) { + threads.emplace_back([&done]{ + while (!done.load()) { + shared_ptr<MemTracker> t = MemTracker::FindOrCreateGlobalTracker( + 1000, "foo"); + } + }); + } + + SleepFor(MonoDelta::FromSeconds(AllowSlowTests() ? 
5 : 1)); + done.store(true); + for (auto& t : threads) { + t.join(); + } +} + +TEST(MemTrackerTest, TestMultiThreadedCreateFind) { + shared_ptr<MemTracker> p = MemTracker::CreateTracker(-1, "p"); + shared_ptr<MemTracker> c1 = MemTracker::CreateTracker(-1, "c1", p); + std::atomic<bool> done(false); + vector<std::thread> threads; + threads.emplace_back([&]{ + while (!done.load()) { + shared_ptr<MemTracker> c1_copy; + CHECK(MemTracker::FindTracker(c1->id(), &c1_copy, p)); + } + }); + for (int i = 0; i < 5; i++) { + threads.emplace_back([&, i]{ + while (!done.load()) { + shared_ptr<MemTracker> c2 = + MemTracker::CreateTracker(-1, Substitute("ci-$0", i), p); + } + }); + } + + SleepFor(MonoDelta::FromMilliseconds(500)); + done.store(true); + for (auto& t : threads) { + t.join(); + } +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mem_tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/mem_tracker.cc b/be/src/kudu/util/mem_tracker.cc new file mode 100644 index 0000000..88e1d23 --- /dev/null +++ b/be/src/kudu/util/mem_tracker.cc @@ -0,0 +1,291 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/mem_tracker.h" + +#include <algorithm> +#include <deque> +#include <limits> +#include <list> +#include <memory> +#include <mutex> + +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/once.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/mutex.h" +#include "kudu/util/process_memory.h" +#include "kudu/util/status.h" + +namespace kudu { + +// NOTE: this class has been adapted from Impala, so the code style varies +// somewhat from kudu. + +using std::deque; +using std::list; +using std::shared_ptr; +using std::string; +using std::vector; +using std::weak_ptr; + +using strings::Substitute; + +// The ancestor for all trackers. Every tracker is visible from the root down. +static shared_ptr<MemTracker> root_tracker; +static GoogleOnceType root_tracker_once = GOOGLE_ONCE_INIT; + +void MemTracker::CreateRootTracker() { + root_tracker.reset(new MemTracker(-1, "root", shared_ptr<MemTracker>())); + root_tracker->Init(); +} + +shared_ptr<MemTracker> MemTracker::CreateTracker(int64_t byte_limit, + const string& id, + const shared_ptr<MemTracker>& parent) { + shared_ptr<MemTracker> real_parent = parent ? 
parent : GetRootTracker(); + shared_ptr<MemTracker> tracker(new MemTracker(byte_limit, id, real_parent)); + real_parent->AddChildTracker(tracker); + tracker->Init(); + + return tracker; +} + +MemTracker::MemTracker(int64_t byte_limit, const string& id, shared_ptr<MemTracker> parent) + : limit_(byte_limit), + id_(id), + descr_(Substitute("memory consumption for $0", id)), + parent_(std::move(parent)), + consumption_(0) { + VLOG(1) << "Creating tracker " << ToString(); +} + +MemTracker::~MemTracker() { + VLOG(1) << "Destroying tracker " << ToString(); + if (parent_) { + DCHECK(consumption() == 0) << "Memory tracker " << ToString() + << " has unreleased consumption " << consumption(); + parent_->Release(consumption()); + + MutexLock l(parent_->child_trackers_lock_); + if (child_tracker_it_ != parent_->child_trackers_.end()) { + parent_->child_trackers_.erase(child_tracker_it_); + child_tracker_it_ = parent_->child_trackers_.end(); + } + } +} + +string MemTracker::ToString() const { + string s; + const MemTracker* tracker = this; + while (tracker) { + if (s != "") { + s += "->"; + } + s += tracker->id(); + tracker = tracker->parent_.get(); + } + return s; +} + +bool MemTracker::FindTracker(const string& id, + shared_ptr<MemTracker>* tracker, + const shared_ptr<MemTracker>& parent) { + return FindTrackerInternal(id, tracker, parent ? parent : GetRootTracker()); +} + +bool MemTracker::FindTrackerInternal(const string& id, + shared_ptr<MemTracker>* tracker, + const shared_ptr<MemTracker>& parent) { + DCHECK(parent != NULL); + + list<weak_ptr<MemTracker>> children; + { + MutexLock l(parent->child_trackers_lock_); + children = parent->child_trackers_; + } + + // Search for the matching child without holding the parent's lock. + // + // If the lock were held while searching, it'd be possible for 'child' to be + // the last live ref to a tracker, which would lead to a recursive + // acquisition of the parent lock during the 'child' destructor call. 
+ vector<shared_ptr<MemTracker>> found; + for (const auto& child_weak : children) { + shared_ptr<MemTracker> child = child_weak.lock(); + if (child && child->id() == id) { + found.emplace_back(std::move(child)); + } + } + if (PREDICT_TRUE(found.size() == 1)) { + *tracker = found[0]; + return true; + } else if (found.size() > 1) { + LOG(DFATAL) << + Substitute("Multiple memtrackers with same id ($0) found on parent $1", + id, parent->ToString()); + *tracker = found[0]; + return true; + } + return false; +} + +shared_ptr<MemTracker> MemTracker::FindOrCreateGlobalTracker( + int64_t byte_limit, + const string& id) { + // The calls below comprise a critical section, but we can't use the root + // tracker's child_trackers_lock_ to synchronize it as the lock must be + // released during FindTrackerInternal(). Since this function creates + // globally-visible MemTrackers which are the exception rather than the rule, + // it's reasonable to synchronize their creation on a singleton lock. + static Mutex find_or_create_lock; + MutexLock l(find_or_create_lock); + + shared_ptr<MemTracker> found; + if (FindTrackerInternal(id, &found, GetRootTracker())) { + return found; + } + return CreateTracker(byte_limit, id, GetRootTracker()); +} + +void MemTracker::ListTrackers(vector<shared_ptr<MemTracker>>* trackers) { + trackers->clear(); + deque<shared_ptr<MemTracker> > to_process; + to_process.push_front(GetRootTracker()); + while (!to_process.empty()) { + shared_ptr<MemTracker> t = to_process.back(); + to_process.pop_back(); + + trackers->push_back(t); + { + MutexLock l(t->child_trackers_lock_); + for (const auto& child_weak : t->child_trackers_) { + shared_ptr<MemTracker> child = child_weak.lock(); + if (child) { + to_process.emplace_back(std::move(child)); + } + } + } + } +} + +void MemTracker::Consume(int64_t bytes) { + if (bytes < 0) { + Release(-bytes); + return; + } + + if (bytes == 0) { + return; + } + for (auto& tracker : all_trackers_) { + 
tracker->consumption_.IncrementBy(bytes); + } +} + +bool MemTracker::TryConsume(int64_t bytes) { + if (bytes <= 0) { + Release(-bytes); + return true; + } + + int i = 0; + // Walk the tracker tree top-down, consuming memory from each in turn. + for (i = all_trackers_.size() - 1; i >= 0; --i) { + MemTracker *tracker = all_trackers_[i]; + if (tracker->limit_ < 0) { + tracker->consumption_.IncrementBy(bytes); + } else { + if (!tracker->consumption_.TryIncrementBy(bytes, tracker->limit_)) { + break; + } + } + } + // Everyone succeeded, return. + if (i == -1) { + return true; + } + + // Someone failed, roll back the ones that succeeded. + // TODO(todd): this doesn't roll it back completely since the max values for + // the updated trackers aren't decremented. The max values are only used + // for error reporting so this is probably okay. Rolling those back is + // pretty hard; we'd need something like 2PC. + for (int j = all_trackers_.size() - 1; j > i; --j) { + all_trackers_[j]->consumption_.IncrementBy(-bytes); + } + return false; +} + +void MemTracker::Release(int64_t bytes) { + if (bytes < 0) { + Consume(-bytes); + return; + } + + if (bytes == 0) { + return; + } + + for (auto& tracker : all_trackers_) { + tracker->consumption_.IncrementBy(-bytes); + } + process_memory::MaybeGCAfterRelease(bytes); +} + +bool MemTracker::AnyLimitExceeded() { + for (const auto& tracker : limit_trackers_) { + if (tracker->LimitExceeded()) { + return true; + } + } + return false; +} + +int64_t MemTracker::SpareCapacity() const { + int64_t result = std::numeric_limits<int64_t>::max(); + for (const auto& tracker : limit_trackers_) { + int64_t mem_left = tracker->limit() - tracker->consumption(); + result = std::min(result, mem_left); + } + return result; +} + + +void MemTracker::Init() { + // populate all_trackers_ and limit_trackers_ + MemTracker* tracker = this; + while (tracker) { + all_trackers_.push_back(tracker); + if (tracker->has_limit()) limit_trackers_.push_back(tracker); + 
tracker = tracker->parent_.get(); + } + DCHECK_GT(all_trackers_.size(), 0); + DCHECK_EQ(all_trackers_[0], this); +} + +void MemTracker::AddChildTracker(const shared_ptr<MemTracker>& tracker) { + MutexLock l(child_trackers_lock_); + tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), tracker); +} + +shared_ptr<MemTracker> MemTracker::GetRootTracker() { + GoogleOnceInit(&root_tracker_once, &MemTracker::CreateRootTracker); + return root_tracker; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/mem_tracker.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/mem_tracker.h b/be/src/kudu/util/mem_tracker.h new file mode 100644 index 0000000..a43e9a2 --- /dev/null +++ b/be/src/kudu/util/mem_tracker.h @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#ifndef KUDU_UTIL_MEM_TRACKER_H +#define KUDU_UTIL_MEM_TRACKER_H + +#include <list> +#include <memory> +#include <stdint.h> +#include <string> +#include <vector> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/high_water_mark.h" +#include "kudu/util/locks.h" +#include "kudu/util/mutex.h" + +namespace kudu { + +class Status; +class MemTracker; + +// A MemTracker tracks memory consumption; it contains an optional limit and is +// arranged into a tree structure such that the consumption tracked by a +// MemTracker is also tracked by its ancestors. +// +// The MemTracker hierarchy is rooted in a single static MemTracker. +// The root MemTracker always exists, and it is the common +// ancestor to all MemTrackers. All operations that discover MemTrackers begin +// at the root and work their way down the tree, while operations that deal +// with adjusting memory consumption begin at a particular MemTracker and work +// their way up the tree to the root. All MemTrackers (except the root) must +// have a parent. As a rule, all children belonging to a parent should have +// unique ids, but this is only enforced during a Find() operation to allow for +// transient duplicates (e.g. the web UI grabbing very short-lived references +// to all MemTrackers while rendering a web page). This also means id +// uniqueness only exists where it's actually needed. +// +// When a MemTracker begins its life, it has a strong reference to its parent +// and the parent has a weak reference to it. Both remain for the lifetime of +// the MemTracker. +// +// Memory consumption is tracked via calls to Consume()/Release(), either to +// the tracker itself or to one of its descendants. +// +// This class is thread-safe. +class MemTracker : public std::enable_shared_from_this<MemTracker> { + public: + ~MemTracker(); + + // Creates and adds the tracker to the tree so that it can be retrieved with + // FindTracker/FindOrCreateTracker. 
+ // + // byte_limit < 0 means no limit; 'id' is used as a label to uniquely identify + // the MemTracker for the below Find...() calls as well as the web UI. + // + // Use the two-argument form if there is no parent. + static std::shared_ptr<MemTracker> CreateTracker( + int64_t byte_limit, + const std::string& id, + const std::shared_ptr<MemTracker>& parent = std::shared_ptr<MemTracker>()); + + // If a tracker with the specified 'id' and 'parent' exists in the tree, sets + // 'tracker' to reference that instance. Returns false if no such tracker + // exists. + // + // Use the two-argument form if there is no parent. + // + // Note: this function will enforce that 'id' is unique amongst the children + // of 'parent'. + static bool FindTracker( + const std::string& id, + std::shared_ptr<MemTracker>* tracker, + const std::shared_ptr<MemTracker>& parent = std::shared_ptr<MemTracker>()); + + // If a global tracker with the specified 'id' exists in the tree, returns a + // shared_ptr to that instance. Otherwise, creates a new MemTracker with the + // specified byte_limit and id, parented to the root MemTracker. + // + // Note: this function will enforce that 'id' is unique amongst the children + // of the root MemTracker. + static std::shared_ptr<MemTracker> FindOrCreateGlobalTracker( + int64_t byte_limit, const std::string& id); + + // Returns a list of all the valid trackers. + static void ListTrackers(std::vector<std::shared_ptr<MemTracker> >* trackers); + + // Gets a shared_ptr to the "root" tracker, creating it if necessary. + static std::shared_ptr<MemTracker> GetRootTracker(); + + // Increases consumption of this tracker and its ancestors by 'bytes'. + void Consume(int64_t bytes); + + // Increases consumption of this tracker and its ancestors by 'bytes' only if + // they can all consume 'bytes'. If this brings any of them over, none of them + // are updated. + // Returns true if the try succeeded.
+ bool TryConsume(int64_t bytes); + + // Decreases consumption of this tracker and its ancestors by 'bytes'. + // + // This will also cause the process to periodically trigger tcmalloc "ReleaseMemory" + // to ensure that memory is released to the OS. + void Release(int64_t bytes); + + // Returns true if a valid limit of this tracker or one of its ancestors is + // exceeded. + bool AnyLimitExceeded(); + + // Returns true if this tracker has a limit and its current consumption is + // strictly greater than that limit. Returns false if there is no limit. + // This is a pure check; it does not attempt to free any memory. + bool LimitExceeded() { + return limit_ >= 0 && limit_ < consumption(); + } + + // Returns the maximum consumption that can be made without exceeding the limit on + // this tracker or any of its parents. Returns std::numeric_limits<int64_t>::max() if there are no + // limits and a negative value if any limit is already exceeded. + int64_t SpareCapacity() const; + + + int64_t limit() const { return limit_; } + bool has_limit() const { return limit_ >= 0; } + const std::string& id() const { return id_; } + + // Returns the memory consumed in bytes. + int64_t consumption() const { + return consumption_.current_value(); + } + + int64_t peak_consumption() const { return consumption_.max_value(); } + + // Retrieves the parent tracker, or an empty shared_ptr if one is not set. + std::shared_ptr<MemTracker> parent() const { return parent_; } + + // Returns a textual representation of the tracker that is likely (but not + // guaranteed) to be globally unique. + std::string ToString() const; + + private: + // byte_limit < 0 means no limit + // 'id' is the label for LogUsage() and web UI. + MemTracker(int64_t byte_limit, const std::string& id, std::shared_ptr<MemTracker> parent); + + // Further initializes the tracker. + void Init(); + + // Adds tracker to child_trackers_.
+ void AddChildTracker(const std::shared_ptr<MemTracker>& tracker); + + // Variant of FindTracker() that must be called with a non-NULL parent. + static bool FindTrackerInternal( + const std::string& id, + std::shared_ptr<MemTracker>* tracker, + const std::shared_ptr<MemTracker>& parent); + + // Creates the root tracker. + static void CreateRootTracker(); + + int64_t limit_; + const std::string id_; + const std::string descr_; + std::shared_ptr<MemTracker> parent_; + + HighWaterMark consumption_; + + // this tracker plus all of its ancestors + std::vector<MemTracker*> all_trackers_; + // all_trackers_ with valid limits + std::vector<MemTracker*> limit_trackers_; + + // All the child trackers of this tracker. Used for error reporting and + // listing only (i.e. updating the consumption of a parent tracker does not + // update that of its children). + mutable Mutex child_trackers_lock_; + std::list<std::weak_ptr<MemTracker>> child_trackers_; + + // Iterator into parent_->child_trackers_ for this object. Stored to have O(1) + // remove. + std::list<std::weak_ptr<MemTracker>>::iterator child_tracker_it_; +}; + +// An std::allocator that manipulates a MemTracker during allocation +// and deallocation. +template<typename T, typename Alloc = std::allocator<T> > +class MemTrackerAllocator : public Alloc { + public: + typedef typename Alloc::pointer pointer; + typedef typename Alloc::const_pointer const_pointer; + typedef typename Alloc::size_type size_type; + + explicit MemTrackerAllocator(std::shared_ptr<MemTracker> mem_tracker) + : mem_tracker_(std::move(mem_tracker)) {} + + // This constructor is used for rebinding. + template <typename U> + MemTrackerAllocator(const MemTrackerAllocator<U>& allocator) + : Alloc(allocator), + mem_tracker_(allocator.mem_tracker()) { + } + + ~MemTrackerAllocator() { + } + + pointer allocate(size_type n, const_pointer hint = 0) { + // Ideally we'd use TryConsume() here to enforce the tracker's limit. 
+ // However, that means throwing bad_alloc if the limit is exceeded, and + // it's not clear that the rest of Kudu can handle that. + mem_tracker_->Consume(n * sizeof(T)); + return Alloc::allocate(n, hint); + } + + void deallocate(pointer p, size_type n) { + Alloc::deallocate(p, n); + mem_tracker_->Release(n * sizeof(T)); + } + + // This allows an allocator<T> to be used for a different type. + template <class U> + struct rebind { + typedef MemTrackerAllocator<U, typename Alloc::template rebind<U>::other> other; + }; + + const std::shared_ptr<MemTracker>& mem_tracker() const { return mem_tracker_; } + + private: + std::shared_ptr<MemTracker> mem_tracker_; +}; + +// Convenience class that adds memory consumption to a tracker when declared, +// releasing it when the end of scope is reached. +class ScopedTrackedConsumption { + public: + ScopedTrackedConsumption(std::shared_ptr<MemTracker> tracker, + int64_t to_consume) + : tracker_(std::move(tracker)), consumption_(to_consume) { + DCHECK(tracker_); + tracker_->Consume(consumption_); + } + + void Reset(int64_t new_consumption) { + // Consume(-x) is the same as Release(x). 
+ tracker_->Consume(new_consumption - consumption_); + consumption_ = new_consumption; + } + + ~ScopedTrackedConsumption() { + tracker_->Release(consumption_); + } + + int64_t consumption() const { return consumption_; } + + private: + std::shared_ptr<MemTracker> tracker_; + int64_t consumption_; +}; + +} // namespace kudu + +#endif // KUDU_UTIL_MEM_TRACKER_H http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memcmpable_varint-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memcmpable_varint-test.cc b/be/src/kudu/util/memcmpable_varint-test.cc new file mode 100644 index 0000000..3e5b5e0 --- /dev/null +++ b/be/src/kudu/util/memcmpable_varint-test.cc @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/util/hexdump.h" +#include "kudu/util/memcmpable_varint.h" +#include "kudu/util/random.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/test_util.h" + +// Add operator<< to print pairs, used in a test below. +// This has to be done in the 'std' namespace due to the way that +// template resolution works. 
+namespace std { +template<typename T1, typename T2> +ostream &operator <<(ostream &os, const pair<T1, T2> &pair) { + return os << "(" << pair.first << ", " << pair.second << ")"; +} +} + +namespace kudu { + +class TestMemcmpableVarint : public KuduTest { + protected: + TestMemcmpableVarint() : random_(SeedRandom()) {} + + // Random number generator that generates different length integers + // with equal probability -- i.e it is equally as likely to generate + // a number with 8 bits as it is to generate one with 64 bits. + // This is useful for testing varint implementations, where a uniform + // random is skewed towards generating longer integers. + uint64_t Rand64WithRandomBitLength() { + return random_.Next64() >> random_.Uniform(64); + } + + Random random_; +}; + +static void DoRoundTripTest(uint64_t to_encode) { + static faststring buf; + buf.clear(); + PutMemcmpableVarint64(&buf, to_encode); + + uint64_t decoded; + Slice slice(buf); + bool success = GetMemcmpableVarint64(&slice, &decoded); + ASSERT_TRUE(success); + ASSERT_EQ(to_encode, decoded); + ASSERT_TRUE(slice.empty()); +} + + +TEST_F(TestMemcmpableVarint, TestRoundTrip) { + // Test the first 100K integers + // (exercises the special cases for <= 67823 in the code) + for (int i = 0; i < 100000; i++) { + DoRoundTripTest(i); + } + + // Test a bunch of random integers (which are likely to be many bytes) + for (int i = 0; i < 100000; i++) { + DoRoundTripTest(random_.Next64()); + } +} + + +// Test that a composite key can be made up of multiple memcmpable +// varints strung together, and that the resulting key compares the +// same as the original pair of integers (i.e left-to-right). 
+TEST_F(TestMemcmpableVarint, TestCompositeKeys) { + faststring buf1; + faststring buf2; + + const int n_trials = 1000; + + for (int i = 0; i < n_trials; i++) { + buf1.clear(); + buf2.clear(); + + pair<uint64_t, uint64_t> p1 = + make_pair(Rand64WithRandomBitLength(), Rand64WithRandomBitLength()); + PutMemcmpableVarint64(&buf1, p1.first); + PutMemcmpableVarint64(&buf1, p1.second); + + pair<uint64_t, uint64_t> p2 = + make_pair(Rand64WithRandomBitLength(), Rand64WithRandomBitLength()); + PutMemcmpableVarint64(&buf2, p2.first); + PutMemcmpableVarint64(&buf2, p2.second); + + SCOPED_TRACE(testing::Message() << p1 << "\n" << HexDump(Slice(buf1)) + << " vs\n" << p2 << "\n" << HexDump(Slice(buf2))); + if (p1 < p2) { + ASSERT_LT(Slice(buf1).compare(Slice(buf2)), 0); + } else if (p1 > p2) { + ASSERT_GT(Slice(buf1).compare(Slice(buf2)), 0); + } else { + ASSERT_EQ(Slice(buf1).compare(Slice(buf2)), 0); + } + } +} + +// Similar to the above test, but instead of being randomized, specifically +// tests "interesting" values -- i.e values around the boundaries of where +// the encoding changes its number of bytes. 
+TEST_F(TestMemcmpableVarint, TestInterestingCompositeKeys) { + vector<uint64_t> interesting_values = { 0, 1, 240, // 1 byte + 241, 2000, 2287, // 2 bytes + 2288, 40000, 67823, // 3 bytes + 67824, 1ULL << 23, (1ULL << 24) - 1, // 4 bytes + 1ULL << 24, 1ULL << 30, (1ULL << 32) - 1 }; // 5 bytes + + faststring buf1; + faststring buf2; + + for (uint64_t v1 : interesting_values) { + for (uint64_t v2 : interesting_values) { + buf1.clear(); + pair<uint64_t, uint64_t> p1 = make_pair(v1, v2); + PutMemcmpableVarint64(&buf1, p1.first); + PutMemcmpableVarint64(&buf1, p1.second); + + for (uint64_t v3 : interesting_values) { + for (uint64_t v4 : interesting_values) { + buf2.clear(); + pair<uint64_t, uint64_t> p2 = make_pair(v3, v4); + PutMemcmpableVarint64(&buf2, p2.first); + PutMemcmpableVarint64(&buf2, p2.second); + + SCOPED_TRACE(testing::Message() << p1 << "\n" << HexDump(Slice(buf1)) + << " vs\n" << p2 << "\n" << HexDump(Slice(buf2))); + if (p1 < p2) { + ASSERT_LT(Slice(buf1).compare(Slice(buf2)), 0); + } else if (p1 > p2) { + ASSERT_GT(Slice(buf1).compare(Slice(buf2)), 0); + } else { + ASSERT_EQ(Slice(buf1).compare(Slice(buf2)), 0); + } + } + } + } + } +} + +//////////////////////////////////////////////////////////// +// Benchmarks +//////////////////////////////////////////////////////////// + +#ifdef NDEBUG +TEST_F(TestMemcmpableVarint, BenchmarkEncode) { + faststring buf; + + int sum_sizes = 0; // need to do something with results to force evaluation + + LOG_TIMING(INFO, "Encoding integers") { + for (int trial = 0; trial < 100; trial++) { + for (uint64_t i = 0; i < 1000000; i++) { + buf.clear(); + PutMemcmpableVarint64(&buf, i); + sum_sizes += buf.size(); + } + } + } + ASSERT_GT(sum_sizes, 1); // use 'sum_sizes' to avoid optimizing it out. 
+} + +TEST_F(TestMemcmpableVarint, BenchmarkDecode) { + faststring buf; + + // Encode 1M integers into the buffer + for (uint64_t i = 0; i < 1000000; i++) { + PutMemcmpableVarint64(&buf, i); + } + + // Decode the whole buffer 100 times. + LOG_TIMING(INFO, "Decoding integers") { + uint64_t sum_vals = 0; + for (int trial = 0; trial < 100; trial++) { + Slice s(buf); + while (!s.empty()) { + uint64_t decoded; + CHECK(GetMemcmpableVarint64(&s, &decoded)); + sum_vals += decoded; + } + } + ASSERT_GT(sum_vals, 1); // use 'sum_vals' to avoid optimizing it out. + } +} + +#endif + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/memcmpable_varint.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/memcmpable_varint.cc b/be/src/kudu/util/memcmpable_varint.cc new file mode 100644 index 0000000..e55addf --- /dev/null +++ b/be/src/kudu/util/memcmpable_varint.cc @@ -0,0 +1,257 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// This file contains code derived from sqlite4, distributed in the public domain. +// +// A variable length integer is an encoding of 64-bit unsigned integers +// into between 1 and 9 bytes. 
// The encoding is designed so that small
// (and common) values take much less space than larger values. Additional
// properties:
//
// * The length of the varint can be determined after examining just
//   the first byte of the encoding.
//
// * Varints compare in numerical order using memcmp().
//
//************************************************************************
//
// Treat each byte of the encoding as an unsigned integer between 0 and 255.
// Let the bytes of the encoding be called A0, A1, A2, ..., A8.
//
// DECODE
//
// If A0 is between 0 and 240 inclusive, then the result is the value of A0.
//
// If A0 is between 241 and 248 inclusive, then the result is
// 240+256*(A0-241)+A1.
//
// If A0 is 249 then the result is 2288+256*A1+A2.
//
// If A0 is 250 then the result is A1..A3 as a 3-byte big-endian integer.
//
// If A0 is 251 then the result is A1..A4 as a 4-byte big-endian integer.
//
// If A0 is 252 then the result is A1..A5 as a 5-byte big-endian integer.
//
// If A0 is 253 then the result is A1..A6 as a 6-byte big-endian integer.
//
// If A0 is 254 then the result is A1..A7 as a 7-byte big-endian integer.
//
// If A0 is 255 then the result is A1..A8 as an 8-byte big-endian integer.
//
// ENCODE
//
// Let the input value be V.
//
// If V<=240 then output a single byte A0 equal to V.
//
// If V<=2287 then output A0 as (V-240)/256 + 241 and A1 as (V-240)%256.
//
// If V<=67823 then output A0 as 249, A1 as (V-2288)/256, and A2
// as (V-2288)%256.
//
// If V<=16777215 then output A0 as 250 and A1 through A3 as a big-endian
// 3-byte integer.
//
// If V<=4294967295 then output A0 as 251 and A1..A4 as a big-endian
// 4-byte integer.
//
// If V<=1099511627775 then output A0 as 252 and A1..A5 as a big-endian
// 5-byte integer.
//
// If V<=281474976710655 then output A0 as 253 and A1..A6 as a big-endian
// 6-byte integer.
+// +// If V<=72057594037927935 then output A0 as 254 and A1..A7 as a +// big-ending 7-byte integer. +// +// Otherwise then output A0 as 255 and A1..A8 as a big-ending 8-byte integer. +// +// SUMMARY +// +// Bytes Max Value Digits +// ------- --------- --------- +// 1 240 2.3 +// 2 2287 3.3 +// 3 67823 4.8 +// 4 2**24-1 7.2 +// 5 2**32-1 9.6 +// 6 2**40-1 12.0 +// 7 2**48-1 14.4 +// 8 2**56-1 16.8 +// 9 2**64-1 19.2 +// + +#include <glog/logging.h> + +#include "kudu/gutil/endian.h" +#include "kudu/util/faststring.h" +#include "kudu/util/memcmpable_varint.h" +#include "kudu/util/slice.h" + +namespace kudu { + +//////////////////////////////////////////////////////////// +// Begin code ripped from sqlite4 +//////////////////////////////////////////////////////////// + +// This function is borrowed from sqlite4/varint.c +static void varintWrite32(uint8_t *z, uint32_t y) { + z[0] = (uint8_t)(y>>24); + z[1] = (uint8_t)(y>>16); + z[2] = (uint8_t)(y>>8); + z[3] = (uint8_t)(y); +} + + +// Write a varint into z[]. The buffer z[] must be at least 9 characters +// long to accommodate the largest possible varint. Return the number of +// bytes of z[] used. 
+// +// This function is borrowed from sqlite4/varint.c +static size_t sqlite4PutVarint64(uint8_t *z, uint64_t x) { + uint64_t w, y; + if (x <= 240) { + z[0] = (uint8_t)x; + return 1; + } + if (x <= 2287) { + y = (uint64_t)(x - 240); + z[0] = (uint8_t)(y/256 + 241); + z[1] = (uint8_t)(y%256); + return 2; + } + if (x <= 67823) { + y = (uint64_t)(x - 2288); + z[0] = 249; + z[1] = (uint8_t)(y/256); + z[2] = (uint8_t)(y%256); + return 3; + } + y = (uint64_t)x; + w = (uint64_t)(x>>32); + if (w == 0) { + if (y <= 16777215) { + z[0] = 250; + z[1] = (uint8_t)(y>>16); + z[2] = (uint8_t)(y>>8); + z[3] = (uint8_t)(y); + return 4; + } + z[0] = 251; + varintWrite32(z+1, y); + return 5; + } + if (w <= 255) { + z[0] = 252; + z[1] = (uint8_t)w; + varintWrite32(z+2, y); + return 6; + } + if (w <= 65535) { + z[0] = 253; + z[1] = (uint8_t)(w>>8); + z[2] = (uint8_t)w; + varintWrite32(z+3, y); + return 7; + } + if (w <= 16777215) { + z[0] = 254; + z[1] = (uint8_t)(w>>16); + z[2] = (uint8_t)(w>>8); + z[3] = (uint8_t)w; + varintWrite32(z+4, y); + return 8; + } + z[0] = 255; + varintWrite32(z+1, w); + varintWrite32(z+5, y); + return 9; +} + +// Decode the varint in the first n bytes z[]. Write the integer value +// into *pResult and return the number of bytes in the varint. 
+// +// If the decode fails because there are not enough bytes in z[] then +// return 0; +// +// Borrowed from sqlite4 varint.c +static int sqlite4GetVarint64( + const uint8_t *z, + int n, + uint64_t *pResult) { + unsigned int x; + if ( n < 1) return 0; + if (z[0] <= 240) { + *pResult = z[0]; + return 1; + } + if (z[0] <= 248) { + if ( n < 2) return 0; + *pResult = (z[0]-241)*256 + z[1] + 240; + return 2; + } + if (n < z[0]-246 ) return 0; + if (z[0] == 249) { + *pResult = 2288 + 256*z[1] + z[2]; + return 3; + } + if (z[0] == 250) { + *pResult = (z[1]<<16) + (z[2]<<8) + z[3]; + return 4; + } + x = (z[1]<<24) + (z[2]<<16) + (z[3]<<8) + z[4]; + if (z[0] == 251) { + *pResult = x; + return 5; + } + if (z[0] == 252) { + *pResult = (((uint64_t)x)<<8) + z[5]; + return 6; + } + if (z[0] == 253) { + *pResult = (((uint64_t)x)<<16) + (z[5]<<8) + z[6]; + return 7; + } + if (z[0] == 254) { + *pResult = (((uint64_t)x)<<24) + (z[5]<<16) + (z[6]<<8) + z[7]; + return 8; + } + *pResult = (((uint64_t)x)<<32) + + (0xffffffff & ((z[5]<<24) + (z[6]<<16) + (z[7]<<8) + z[8])); + return 9; +} + +//////////////////////////////////////////////////////////// +// End code ripped from sqlite4 +//////////////////////////////////////////////////////////// + +void PutMemcmpableVarint64(faststring *dst, uint64_t value) { + uint8_t buf[9]; + int used = sqlite4PutVarint64(buf, value); + DCHECK_LE(used, sizeof(buf)); + dst->append(buf, used); +} + +bool GetMemcmpableVarint64(Slice *input, uint64_t *value) { + size_t size = sqlite4GetVarint64(input->data(), input->size(), value); + input->remove_prefix(size); + return size > 0; +} + + +} // namespace kudu
