http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/disk-io-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc index 089ada1..0f93875 100644 --- a/be/src/runtime/disk-io-mgr-test.cc +++ b/be/src/runtime/disk-io-mgr-test.cc @@ -635,7 +635,7 @@ TEST_F(DiskIoMgrTest, MemLimits) { DiskIoMgr io_mgr(1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE); ASSERT_OK(io_mgr.Init(&root_mem_tracker)); - MemTracker reader_mem_tracker(-1, -1, "Reader", &root_mem_tracker); + MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker); DiskIoRequestContext* reader; ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker)); @@ -950,7 +950,7 @@ TEST_F(DiskIoMgrTest, Buffers) { ASSERT_OK(io_mgr.Init(&root_mem_tracker)); ASSERT_EQ(root_mem_tracker.consumption(), 0); - MemTracker reader_mem_tracker(-1, -1, "Reader", &root_mem_tracker); + MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker); DiskIoRequestContext* reader; ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
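For context before the next file: every MemTracker call-site change in this commit follows mechanically from dropping the rm_reserved_limit parameter from the MemTracker constructors (see the mem-tracker.h hunk further down), so each caller simply loses its second -1 argument. A minimal, self-contained sketch of the new constructor shape — simplified stand-in names, not the real Impala class:

#include <cstdint>
#include <iostream>
#include <string>

// Simplified stand-in for the post-patch MemTracker constructor:
// (byte_limit, rm_reserved_limit, label, parent) has become
// (byte_limit, label, parent).
class MemTrackerSketch {
 public:
  explicit MemTrackerSketch(int64_t byte_limit = -1,
      const std::string& label = std::string(),
      MemTrackerSketch* parent = nullptr)
      : limit_(byte_limit), label_(label), parent_(parent) {}

  int64_t limit() const { return limit_; }
  const std::string& label() const { return label_; }

 private:
  int64_t limit_;
  std::string label_;
  MemTrackerSketch* parent_;  // unowned, as in the real tracker hierarchy
};

int main() {
  MemTrackerSketch root(-1, "Process");
  // Mirrors the updated test call sites above: the second -1 is gone.
  MemTrackerSketch reader(-1, "Reader", &root);
  std::cout << reader.label() << " limit=" << reader.limit() << std::endl;
  return 0;
}
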
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 5df69ed..dd45932 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -370,9 +370,9 @@ DiskIoMgr::~DiskIoMgr() { Status DiskIoMgr::Init(MemTracker* process_mem_tracker) { DCHECK(process_mem_tracker != NULL); free_buffer_mem_tracker_.reset( - new MemTracker(-1, -1, "Free Disk IO Buffers", process_mem_tracker, false)); + new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false)); unowned_buffer_mem_tracker_.reset( - new MemTracker(-1, -1, "Untracked Disk IO Buffers", process_mem_tracker, false)); + new MemTracker(-1, "Untracked Disk IO Buffers", process_mem_tracker, false)); // If we hit the process limit, see if we can reclaim some memory by removing // previously allocated (but unused) io buffers. process_mem_tracker->AddGcFunction(bind(&DiskIoMgr::GcIoBuffers, this)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 38e9f96..1b3fa14 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -24,7 +24,6 @@ #include <gutil/strings/substitute.h> #include "common/logging.h" -#include "resourcebroker/resource-broker.h" #include "runtime/backend-client.h" #include "runtime/client-cache.h" #include "runtime/coordinator.h" @@ -51,7 +50,6 @@ #include "util/webserver.h" #include "util/mem-info.h" #include "util/debug-util.h" -#include "util/cgroups-mgr.h" #include "util/memory-metrics.h" #include "util/pretty-printer.h" #include "util/thread-pool.h" @@ -84,49 +82,21 @@ DECLARE_int32(num_cores); DECLARE_int32(be_port); DECLARE_string(mem_limit); -DEFINE_bool(enable_rm, false, "Whether to enable resource management. If enabled, " - "-fair_scheduler_allocation_path is required."); -DEFINE_int32(llama_callback_port, 28000, - "Port where Llama notification callback should be started"); -// TODO: Deprecate llama_host and llama_port in favor of the new llama_hostports. -// This needs to be coordinated with CM. -DEFINE_string(llama_host, "", - "Host of Llama service that the resource broker should connect to"); -DEFINE_int32(llama_port, 15000, - "Port of Llama service that the resource broker should connect to"); -DEFINE_string(llama_addresses, "", - "Llama availability group given as a comma-separated list of hostports."); -DEFINE_int64(llama_registration_timeout_secs, 30, - "Maximum number of seconds that Impala will attempt to (re-)register " - "with Llama before aborting the triggering action with an error " - "(e.g. Impalad startup or a Llama RPC request). " - "A setting of -1 means try indefinitely."); -DEFINE_int64(llama_registration_wait_secs, 3, - "Number of seconds to wait between attempts during Llama registration."); -DEFINE_int64(llama_max_request_attempts, 5, - "Maximum number of times a non-registration Llama RPC request " - "(reserve/expand/release, etc.) is retried until the request is aborted. 
" - "An attempt is counted once Impala is registered with Llama, i.e., a " - "request survives at most llama_max_request_attempts-1 re-registrations."); -DEFINE_string(cgroup_hierarchy_path, "", "If Resource Management is enabled, this must " - "be set to the Impala-writeable root of the cgroups hierarchy into which execution " - "threads are assigned."); -DEFINE_string(staging_cgroup, "impala_staging", "Name of the cgroup that a query's " - "execution threads are moved into once the query completes."); - -// Use a low default value because the reconnection logic is performed manually -// for the purpose of faster Llama failover (otherwise we may try to reconnect to the -// inactive Llama for a long time). -DEFINE_int32(resource_broker_cnxn_attempts, 1, "The number of times to retry an " - "RPC connection to Llama. A setting of 0 means retry indefinitely"); -DEFINE_int32(resource_broker_cnxn_retry_interval_ms, 3000, "The interval, in ms, " - "to wait between attempts to make an RPC connection to the Llama."); -DEFINE_int32(resource_broker_send_timeout, 0, "Time to wait, in ms, " - "for the underlying socket of an RPC to Llama to successfully send data. " - "A setting of 0 means the socket will wait indefinitely."); -DEFINE_int32(resource_broker_recv_timeout, 0, "Time to wait, in ms, " - "for the underlying socket of an RPC to Llama to successfully receive data. " - "A setting of 0 means the socket will wait indefinitely."); +// TODO: Remove the following RM-related flags in Impala 3.0. +DEFINE_bool(enable_rm, false, "Deprecated"); +DEFINE_int32(llama_callback_port, 28000, "Deprecated"); +DEFINE_string(llama_host, "", "Deprecated"); +DEFINE_int32(llama_port, 15000, "Deprecated"); +DEFINE_string(llama_addresses, "", "Deprecated"); +DEFINE_int64(llama_registration_timeout_secs, 30, "Deprecated"); +DEFINE_int64(llama_registration_wait_secs, 3, "Deprecated"); +DEFINE_int64(llama_max_request_attempts, 5, "Deprecated"); +DEFINE_string(cgroup_hierarchy_path, "", "Deprecated"); +DEFINE_string(staging_cgroup, "impala_staging", "Deprecated"); +DEFINE_int32(resource_broker_cnxn_attempts, 1, "Deprecated"); +DEFINE_int32(resource_broker_cnxn_retry_interval_ms, 3000, "Deprecated"); +DEFINE_int32(resource_broker_send_timeout, 0, "Deprecated"); +DEFINE_int32(resource_broker_recv_timeout, 0, "Deprecated"); DEFINE_int32(coordinator_rpc_threads, 12, "(Advanced) Number of threads available to " "start fragments on remote Impala daemons."); @@ -145,11 +115,6 @@ DEFINE_int32(catalog_client_connection_num_retries, 3, "Retry catalog connection DEFINE_int32(catalog_client_rpc_timeout_ms, 0, "(Advanced) The underlying TSocket " "send/recv timeout in milliseconds for a catalog client RPC."); -// The key for a variable set in Impala's test environment only, to allow the -// resource-broker to correctly map node addresses into a form that Llama understand. 
-const static string PSEUDO_DISTRIBUTED_CONFIG_KEY = - "yarn.scheduler.include-port-in-node-name"; - const static string DEFAULT_FS = "fs.defaultFS"; namespace impala { @@ -160,35 +125,29 @@ ExecEnv::ExecEnv() : metrics_(new MetricGroup("impala-metrics")), stream_mgr_(new DataStreamMgr(metrics_.get())), impalad_client_cache_( - new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, - 0, FLAGS_backend_client_rpc_timeout_ms, - FLAGS_backend_client_rpc_timeout_ms, - "", !FLAGS_ssl_client_ca_certificate.empty())), + new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0, + FLAGS_backend_client_rpc_timeout_ms, FLAGS_backend_client_rpc_timeout_ms, "", + !FLAGS_ssl_client_ca_certificate.empty())), catalogd_client_cache_( - new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries, - 0, FLAGS_catalog_client_rpc_timeout_ms, - FLAGS_catalog_client_rpc_timeout_ms, - "", !FLAGS_ssl_client_ca_certificate.empty())), + new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries, 0, + FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "", + !FLAGS_ssl_client_ca_certificate.empty())), htable_factory_(new HBaseTableFactory()), disk_io_mgr_(new DiskIoMgr()), webserver_(new Webserver()), mem_tracker_(NULL), thread_mgr_(new ThreadResourceMgr), - cgroups_mgr_(NULL), hdfs_op_thread_pool_( CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)), tmp_file_mgr_(new TmpFileMgr), request_pool_service_(new RequestPoolService(metrics_.get())), frontend_(new Frontend()), - fragment_exec_thread_pool_( - new CallableThreadPool("coordinator-fragment-rpc", "worker", - FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())), + fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc", + "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())), async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), enable_webserver_(FLAGS_enable_webserver), is_fe_tests_(false), - backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)), - is_pseudo_distributed_llama_(false) { - if (FLAGS_enable_rm) InitRm(); + backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) { // Initialize the scheduler either dynamically (with a statestore) or statically (with // a standalone single backend) if (FLAGS_use_statestore) { @@ -202,33 +161,30 @@ ExecEnv::ExecEnv() subscriber_address, statestore_address, metrics_.get())); scheduler_.reset(new SimpleScheduler(statestore_subscriber_.get(), - statestore_subscriber_->id(), backend_address_, metrics_.get(), - webserver_.get(), resource_broker_.get(), request_pool_service_.get())); + statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(), + request_pool_service_.get())); } else { vector<TNetworkAddress> addresses; addresses.push_back(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)); - scheduler_.reset(new SimpleScheduler(addresses, metrics_.get(), webserver_.get(), - resource_broker_.get(), request_pool_service_.get())); + scheduler_.reset(new SimpleScheduler( + addresses, metrics_.get(), webserver_.get(), request_pool_service_.get())); } if (exec_env_ == NULL) exec_env_ = this; - if (FLAGS_enable_rm) resource_broker_->set_scheduler(scheduler_.get()); } // TODO: Need refactor to get rid of duplicated code. 
ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, - int webserver_port, const string& statestore_host, int statestore_port) + int webserver_port, const string& statestore_host, int statestore_port) : metrics_(new MetricGroup("impala-metrics")), stream_mgr_(new DataStreamMgr(metrics_.get())), impalad_client_cache_( - new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, - 0, FLAGS_backend_client_rpc_timeout_ms, - FLAGS_backend_client_rpc_timeout_ms, - "", !FLAGS_ssl_client_ca_certificate.empty())), + new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0, + FLAGS_backend_client_rpc_timeout_ms, FLAGS_backend_client_rpc_timeout_ms, "", + !FLAGS_ssl_client_ca_certificate.empty())), catalogd_client_cache_( - new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries, - 0, FLAGS_catalog_client_rpc_timeout_ms, - FLAGS_catalog_client_rpc_timeout_ms, - "", !FLAGS_ssl_client_ca_certificate.empty())), + new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries, 0, + FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "", + !FLAGS_ssl_client_ca_certificate.empty())), htable_factory_(new HBaseTableFactory()), disk_io_mgr_(new DiskIoMgr()), webserver_(new Webserver(webserver_port)), @@ -238,16 +194,13 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)), tmp_file_mgr_(new TmpFileMgr), frontend_(new Frontend()), - fragment_exec_thread_pool_( - new CallableThreadPool("coordinator-fragment-rpc", "worker", - FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())), + fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc", + "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())), async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), enable_webserver_(FLAGS_enable_webserver && webserver_port > 0), is_fe_tests_(false), - backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)), - is_pseudo_distributed_llama_(false) { + backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) { request_pool_service_.reset(new RequestPoolService(metrics_.get())); - if (FLAGS_enable_rm) InitRm(); if (FLAGS_use_statestore && statestore_port > 0) { TNetworkAddress subscriber_address = @@ -260,73 +213,23 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, subscriber_address, statestore_address, metrics_.get())); scheduler_.reset(new SimpleScheduler(statestore_subscriber_.get(), - statestore_subscriber_->id(), backend_address_, metrics_.get(), - webserver_.get(), resource_broker_.get(), request_pool_service_.get())); + statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(), + request_pool_service_.get())); } else { vector<TNetworkAddress> addresses; addresses.push_back(MakeNetworkAddress(hostname, backend_port)); - scheduler_.reset(new SimpleScheduler(addresses, metrics_.get(), webserver_.get(), - resource_broker_.get(), request_pool_service_.get())); + scheduler_.reset(new SimpleScheduler( + addresses, metrics_.get(), webserver_.get(), request_pool_service_.get())); } if (exec_env_ == NULL) exec_env_ = this; - if (FLAGS_enable_rm) resource_broker_->set_scheduler(scheduler_.get()); } -void ExecEnv::InitRm() { - // Unique addresses from FLAGS_llama_addresses and FLAGS_llama_host/FLAGS_llama_port. 
- vector<TNetworkAddress> llama_addresses; - if (!FLAGS_llama_addresses.empty()) { - vector<string> components; - split(components, FLAGS_llama_addresses, is_any_of(","), token_compress_on); - for (int i = 0; i < components.size(); ++i) { - to_lower(components[i]); - TNetworkAddress llama_address = MakeNetworkAddress(components[i]); - if (find(llama_addresses.begin(), llama_addresses.end(), llama_address) - == llama_addresses.end()) { - llama_addresses.push_back(llama_address); - } - } - } - // Add Llama hostport from deprecated flags (if it does not already exist). - if (!FLAGS_llama_host.empty()) { - to_lower(FLAGS_llama_host); - TNetworkAddress llama_address = - MakeNetworkAddress(FLAGS_llama_host, FLAGS_llama_port); - if (find(llama_addresses.begin(), llama_addresses.end(), llama_address) - == llama_addresses.end()) { - llama_addresses.push_back(llama_address); - } - } - for (int i = 0; i < llama_addresses.size(); ++i) { - LOG(INFO) << "Llama address " << i << ": " << llama_addresses[i]; - } - - TNetworkAddress llama_callback_address = - MakeNetworkAddress(FLAGS_hostname, FLAGS_llama_callback_port); - resource_broker_.reset(new ResourceBroker(llama_addresses, llama_callback_address, - metrics_.get())); - cgroups_mgr_.reset(new CgroupsMgr(metrics_.get())); - - TGetHadoopConfigRequest config_request; - config_request.__set_name(PSEUDO_DISTRIBUTED_CONFIG_KEY); - TGetHadoopConfigResponse config_response; - frontend_->GetHadoopConfig(config_request, &config_response); - if (config_response.__isset.value) { - to_lower(config_response.value); - is_pseudo_distributed_llama_ = (config_response.value == "true"); - } else { - is_pseudo_distributed_llama_ = false; - } - if (is_pseudo_distributed_llama_) { - LOG(INFO) << "Pseudo-distributed Llama cluster detected"; - } -} ExecEnv::~ExecEnv() { } Status ExecEnv::InitForFeTests() { - mem_tracker_.reset(new MemTracker(-1, -1, "Process")); + mem_tracker_.reset(new MemTracker(-1, "Process")); is_fe_tests_ = true; return Status::OK(); } @@ -334,15 +237,6 @@ Status ExecEnv::InitForFeTests() { Status ExecEnv::StartServices() { LOG(INFO) << "Starting global services"; - if (FLAGS_enable_rm) { - // Initialize the resource broker to make sure the Llama is up and reachable. - DCHECK(resource_broker_.get() != NULL); - RETURN_IF_ERROR(resource_broker_->Init()); - DCHECK(cgroups_mgr_.get() != NULL); - RETURN_IF_ERROR( - cgroups_mgr_->Init(FLAGS_cgroup_hierarchy_path, FLAGS_staging_cgroup)); - } - // Initialize global memory limit. // Depending on the system configuration, we will have to calculate the process // memory limit either based on the available physical memory, or if overcommitting @@ -397,7 +291,7 @@ Status ExecEnv::StartServices() { #ifndef ADDRESS_SANITIZER // Limit of -1 means no memory limit. mem_tracker_.reset(new MemTracker(TcmallocMetric::PHYSICAL_BYTES_RESERVED, - bytes_limit > 0 ? bytes_limit : -1, -1, "Process")); + bytes_limit > 0 ? bytes_limit : -1, "Process")); // Since tcmalloc does not free unused memory, we may exceed the process mem limit even // if Impala is not actually using that much memory. Add a callback to free any unused @@ -407,7 +301,7 @@ Status ExecEnv::StartServices() { #else // tcmalloc metrics aren't defined in ASAN builds, just use the default behavior to // track process memory usage (sum of all children trackers). - mem_tracker_.reset(new MemTracker(bytes_limit > 0 ? bytes_limit : -1, -1, "Process")); + mem_tracker_.reset(new MemTracker(bytes_limit > 0 ? 
bytes_limit : -1, "Process")); #endif mem_tracker_->RegisterMetrics(metrics_.get(), "mem-tracker.process"); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/exec-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index 1f37572..303876f 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -29,7 +29,6 @@ namespace impala { class CallableThreadPool; -class CgroupsMgr; class DataStreamMgr; class DiskIoMgr; class FragmentMgr; @@ -42,7 +41,6 @@ class MemTracker; class MetricGroup; class QueryResourceMgr; class RequestPoolService; -class ResourceBroker; class Scheduler; class StatestoreSubscriber; class TestExecEnv; @@ -89,7 +87,6 @@ class ExecEnv { MetricGroup* metrics() { return metrics_.get(); } MemTracker* process_mem_tracker() { return mem_tracker_.get(); } ThreadResourceMgr* thread_mgr() { return thread_mgr_.get(); } - CgroupsMgr* cgroups_mgr() { return cgroups_mgr_.get(); } HdfsOpThreadPool* hdfs_op_thread_pool() { return hdfs_op_thread_pool_.get(); } TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); } CallableThreadPool* fragment_exec_thread_pool() { @@ -102,7 +99,6 @@ class ExecEnv { void set_enable_webserver(bool enable) { enable_webserver_ = enable; } - ResourceBroker* resource_broker() { return resource_broker_.get(); } Scheduler* scheduler() { return scheduler_.get(); } StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); } @@ -119,11 +115,6 @@ class ExecEnv { /// differently. bool is_fe_tests() { return is_fe_tests_; } - /// Returns true if the Llama in use is pseudo-distributed, used for development - /// purposes. The pseudo-distributed version has special requirements for specifying - /// resource locations. - bool is_pseudo_distributed_llama() { return is_pseudo_distributed_llama_; } - /// Returns the configured defaultFs set in core-site.xml string default_fs() { return default_fs_; } @@ -131,7 +122,6 @@ class ExecEnv { /// Leave protected so that subclasses can override boost::scoped_ptr<MetricGroup> metrics_; boost::scoped_ptr<DataStreamMgr> stream_mgr_; - boost::scoped_ptr<ResourceBroker> resource_broker_; boost::scoped_ptr<Scheduler> scheduler_; boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_; boost::scoped_ptr<ImpalaBackendClientCache> impalad_client_cache_; @@ -141,7 +131,6 @@ class ExecEnv { boost::scoped_ptr<Webserver> webserver_; boost::scoped_ptr<MemTracker> mem_tracker_; boost::scoped_ptr<ThreadResourceMgr> thread_mgr_; - boost::scoped_ptr<CgroupsMgr> cgroups_mgr_; boost::scoped_ptr<HdfsOpThreadPool> hdfs_op_thread_pool_; boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_; boost::scoped_ptr<RequestPoolService> request_pool_service_; @@ -161,16 +150,8 @@ class ExecEnv { /// Address of the Impala backend server instance TNetworkAddress backend_address_; - /// True if the cluster has set 'yarn.scheduler.include-port-in-node-name' to true, - /// indicating that this cluster is pseudo-distributed. Should not be true in real - /// deployments. - bool is_pseudo_distributed_llama_; - /// fs.defaultFs value set in core-site.xml std::string default_fs_; - - /// Initialise cgroups manager, detect test RM environment and init resource broker. 
- void InitRm(); }; } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/mem-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc index 99cec9a..604923c 100644 --- a/be/src/runtime/mem-pool-test.cc +++ b/be/src/runtime/mem-pool-test.cc @@ -224,8 +224,8 @@ TEST(MemPoolTest, ReturnPartial) { TEST(MemPoolTest, Limits) { MemTracker limit3(4 * MemPoolTest::INITIAL_CHUNK_SIZE); - MemTracker limit1(2 * MemPoolTest::INITIAL_CHUNK_SIZE, -1, "", &limit3); - MemTracker limit2(3 * MemPoolTest::INITIAL_CHUNK_SIZE, -1, "", &limit3); + MemTracker limit1(2 * MemPoolTest::INITIAL_CHUNK_SIZE, "", &limit3); + MemTracker limit2(3 * MemPoolTest::INITIAL_CHUNK_SIZE, "", &limit3); MemPool* p1 = new MemPool(&limit1); EXPECT_FALSE(limit1.AnyLimitExceeded()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/mem-tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker-test.cc b/be/src/runtime/mem-tracker-test.cc index 87eec14..546b8ab 100644 --- a/be/src/runtime/mem-tracker-test.cc +++ b/be/src/runtime/mem-tracker-test.cc @@ -64,7 +64,7 @@ TEST(MemTestTest, ConsumptionMetric) { UIntGauge metric(md, 0); EXPECT_EQ(metric.value(), 0); - MemTracker t(&metric, 100, -1, ""); + MemTracker t(&metric, 100, ""); EXPECT_TRUE(t.has_limit()); EXPECT_EQ(t.consumption(), 0); @@ -112,8 +112,8 @@ TEST(MemTestTest, ConsumptionMetric) { TEST(MemTestTest, TrackerHierarchy) { MemTracker p(100); - MemTracker c1(80, -1, "", &p); - MemTracker c2(50, -1, "", &p); + MemTracker c1(80, "", &p); + MemTracker c2(50, "", &p); // everything below limits c1.Consume(60); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/mem-tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc index a9ceb76..a9de160 100644 --- a/be/src/runtime/mem-tracker.cc +++ b/be/src/runtime/mem-tracker.cc @@ -23,10 +23,8 @@ #include <gutil/strings/substitute.h> #include "bufferpool/reservation-tracker-counters.h" -#include "resourcebroker/resource-broker.h" #include "runtime/exec-env.h" #include "runtime/runtime-state.h" -#include "scheduling/query-resource-mgr.h" #include "util/debug-util.h" #include "util/mem-info.h" #include "util/pretty-printer.h" @@ -49,10 +47,9 @@ AtomicInt64 MemTracker::released_memory_since_gc_; // Name for request pool MemTrackers. '$0' is replaced with the pool name. 
const string REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT = "RequestPool=$0"; -MemTracker::MemTracker(int64_t byte_limit, int64_t rm_reserved_limit, const string& label, - MemTracker* parent, bool log_usage_if_zero) +MemTracker::MemTracker( + int64_t byte_limit, const string& label, MemTracker* parent, bool log_usage_if_zero) : limit_(byte_limit), - rm_reserved_limit_(rm_reserved_limit), label_(label), parent_(parent), consumption_(&local_counter_), @@ -60,7 +57,6 @@ MemTracker::MemTracker(int64_t byte_limit, int64_t rm_reserved_limit, const stri consumption_metric_(NULL), auto_unregister_(false), log_usage_if_zero_(log_usage_if_zero), - query_resource_mgr_(NULL), num_gcs_metric_(NULL), bytes_freed_by_last_gc_metric_(NULL), bytes_over_limit_metric_(NULL), @@ -69,11 +65,9 @@ MemTracker::MemTracker(int64_t byte_limit, int64_t rm_reserved_limit, const stri Init(); } -MemTracker::MemTracker( - RuntimeProfile* profile, int64_t byte_limit, int64_t rm_reserved_limit, +MemTracker::MemTracker(RuntimeProfile* profile, int64_t byte_limit, const std::string& label, MemTracker* parent) : limit_(byte_limit), - rm_reserved_limit_(rm_reserved_limit), label_(label), parent_(parent), consumption_(profile->AddHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES)), @@ -81,7 +75,6 @@ MemTracker::MemTracker( consumption_metric_(NULL), auto_unregister_(false), log_usage_if_zero_(true), - query_resource_mgr_(NULL), num_gcs_metric_(NULL), bytes_freed_by_last_gc_metric_(NULL), bytes_over_limit_metric_(NULL), @@ -90,10 +83,9 @@ MemTracker::MemTracker( Init(); } -MemTracker::MemTracker(UIntGauge* consumption_metric, - int64_t byte_limit, int64_t rm_reserved_limit, const string& label) +MemTracker::MemTracker( + UIntGauge* consumption_metric, int64_t byte_limit, const string& label) : limit_(byte_limit), - rm_reserved_limit_(rm_reserved_limit), label_(label), parent_(NULL), consumption_(&local_counter_), @@ -101,7 +93,6 @@ MemTracker::MemTracker(UIntGauge* consumption_metric, consumption_metric_(consumption_metric), auto_unregister_(false), log_usage_if_zero_(true), - query_resource_mgr_(NULL), num_gcs_metric_(NULL), bytes_freed_by_last_gc_metric_(NULL), bytes_over_limit_metric_(NULL), @@ -111,7 +102,6 @@ MemTracker::MemTracker(UIntGauge* consumption_metric, void MemTracker::Init() { DCHECK_GE(limit_, -1); - DCHECK(rm_reserved_limit_ == -1 || limit_ == -1 || rm_reserved_limit_ <= limit_); // populate all_trackers_ and limit_trackers_ MemTracker* tracker = this; while (tracker != NULL) { @@ -173,9 +163,8 @@ MemTracker* MemTracker::GetRequestPoolMemTracker(const string& pool_name, } else { if (parent == NULL) return NULL; // First time this pool_name registered, make a new object. 
- MemTracker* tracker = new MemTracker(-1, -1, - Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, pool_name), - parent); + MemTracker* tracker = new MemTracker( + -1, Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, pool_name), parent); tracker->auto_unregister_ = true; tracker->pool_name_ = pool_name; pool_to_mem_trackers_[pool_name] = tracker; @@ -184,8 +173,7 @@ MemTracker* MemTracker::GetRequestPoolMemTracker(const string& pool_name, } shared_ptr<MemTracker> MemTracker::GetQueryMemTracker( - const TUniqueId& id, int64_t byte_limit, int64_t rm_reserved_limit, MemTracker* parent, - QueryResourceMgr* res_mgr) { + const TUniqueId& id, int64_t byte_limit, MemTracker* parent) { if (byte_limit != -1) { if (byte_limit > MemInfo::physical_mem()) { LOG(WARNING) << "Memory limit " @@ -210,12 +198,11 @@ shared_ptr<MemTracker> MemTracker::GetQueryMemTracker( } else { // First time this id registered, make a new object. Give a shared ptr to // the caller and put a weak ptr in the map. - shared_ptr<MemTracker> tracker = make_shared<MemTracker>(byte_limit, - rm_reserved_limit, Substitute("Query($0)", lexical_cast<string>(id)), parent); + shared_ptr<MemTracker> tracker = make_shared<MemTracker>( + byte_limit, Substitute("Query($0)", lexical_cast<string>(id)), parent); tracker->auto_unregister_ = true; tracker->query_id_ = id; request_to_mem_trackers_[id] = tracker; - if (res_mgr != NULL) tracker->SetQueryResourceMgr(res_mgr); return tracker; } } @@ -278,9 +265,6 @@ string MemTracker::LogUsage(const string& prefix) const { ss << prefix << label_ << ":"; if (CheckLimitExceeded()) ss << " memory limit exceeded."; if (limit_ > 0) ss << " Limit=" << PrettyPrinter::Print(limit_, TUnit::BYTES); - if (rm_reserved_limit_ > 0) { - ss << " RM Limit=" << PrettyPrinter::Print(rm_reserved_limit_, TUnit::BYTES); - } int64_t total = consumption(); int64_t peak = consumption_->value(); @@ -358,45 +342,4 @@ void MemTracker::GcTcmalloc() { #endif } -bool MemTracker::ExpandRmReservation(int64_t bytes) { - if (query_resource_mgr_ == NULL || rm_reserved_limit_ == -1) return false; - // TODO: Make this asynchronous after IO mgr changes to use TryConsume() are done. - lock_guard<mutex> l(resource_acquisition_lock_); - int64_t requested = consumption_->current_value() + bytes; - // Can't exceed the hard limit under any circumstance - if (requested >= limit_ && limit_ != -1) return false; - // Test to see if we can satisfy the limit anyhow; maybe a different request was already - // in flight. - if (requested < rm_reserved_limit_) return true; - - int64_t bytes_allocated; - Status status = query_resource_mgr_->RequestMemExpansion(bytes, &bytes_allocated); - if (!status.ok()) { - LOG(INFO) << "Failed to expand memory limit by " - << PrettyPrinter::Print(bytes, TUnit::BYTES) << ": " - << status.GetDetail(); - return false; - } - - for (const MemTracker* tracker: limit_trackers_) { - if (tracker == this) continue; - if (tracker->consumption_->current_value() + bytes_allocated > tracker->limit_) { - // TODO: Allocation may be larger than needed and might exceed some parent - // tracker limit. IMPALA-2182. - VLOG_RPC << "Failed to use " << bytes_allocated << " bytes allocated over " - << tracker->label() << " tracker limit=" << tracker->limit_ - << " consumption=" << tracker->consumption(); - // Don't adjust our limit; rely on query tear-down to release the resource. 
- return false; - } - } - - rm_reserved_limit_ += bytes_allocated; - // Resource broker might give us more than we ask for - if (limit_ != -1) rm_reserved_limit_ = min(rm_reserved_limit_, limit_); - VLOG_RPC << "Reservation bytes_allocated=" << bytes_allocated << " rm_reserved_limit=" - << rm_reserved_limit_ << " limit=" << limit_; - return true; -} - } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/mem-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h index e3548cc..a2c3e9b 100644 --- a/be/src/runtime/mem-tracker.h +++ b/be/src/runtime/mem-tracker.h @@ -39,7 +39,6 @@ namespace impala { class ReservationTrackerCounters; class MemTracker; -class QueryResourceMgr; /// A MemTracker tracks memory consumption; it contains an optional limit /// and can be arranged into a tree structure such that the consumption tracked @@ -66,19 +65,17 @@ class MemTracker { /// 'label' is the label used in the usage string (LogUsage()) /// If 'log_usage_if_zero' is false, this tracker (and its children) will not be included /// in LogUsage() output if consumption is 0. - MemTracker(int64_t byte_limit = -1, int64_t rm_reserved_limit = -1, - const std::string& label = std::string(), MemTracker* parent = NULL, - bool log_usage_if_zero = true); + MemTracker(int64_t byte_limit = -1, const std::string& label = std::string(), + MemTracker* parent = NULL, bool log_usage_if_zero = true); /// C'tor for tracker for which consumption counter is created as part of a profile. /// The counter is created with name COUNTER_NAME. - MemTracker(RuntimeProfile* profile, int64_t byte_limit, int64_t rm_reserved_limit = -1, + MemTracker(RuntimeProfile* profile, int64_t byte_limit, const std::string& label = std::string(), MemTracker* parent = NULL); /// C'tor for tracker that uses consumption_metric as the consumption value. /// Consume()/Release() can still be called. This is used for the process tracker. - MemTracker(UIntGauge* consumption_metric, - int64_t byte_limit = -1, int64_t rm_reserved_limit = -1, + MemTracker(UIntGauge* consumption_metric, int64_t byte_limit = -1, const std::string& label = std::string()); ~MemTracker(); @@ -98,9 +95,8 @@ class MemTracker { /// 'parent' as the parent tracker. /// byte_limit and parent must be the same for all GetMemTracker() calls with the /// same id. - static std::shared_ptr<MemTracker> GetQueryMemTracker(const TUniqueId& id, - int64_t byte_limit, int64_t rm_reserved_limit, MemTracker* parent, - QueryResourceMgr* res_mgr); + static std::shared_ptr<MemTracker> GetQueryMemTracker( + const TUniqueId& id, int64_t byte_limit, MemTracker* parent); /// Returns a MemTracker object for request pool 'pool_name'. Calling this with the same /// 'pool_name' will return the same MemTracker object. This is used to track the local @@ -112,14 +108,6 @@ class MemTracker { static MemTracker* GetRequestPoolMemTracker(const std::string& pool_name, MemTracker* parent); - /// Returns the minimum of limit and rm_reserved_limit - int64_t effective_limit() const { - // TODO: maybe no limit should be MAX_LONG? - DCHECK(rm_reserved_limit_ <= limit_ || limit_ == -1); - if (rm_reserved_limit_ == -1) return limit_; - return rm_reserved_limit_; - } - /// Increases consumption of this tracker and its ancestors by 'bytes'. 
void Consume(int64_t bytes) { if (bytes <= 0) { @@ -166,49 +154,33 @@ class MemTracker { if (consumption_metric_ != NULL) RefreshConsumptionFromMetric(); if (UNLIKELY(bytes <= 0)) return true; int i; - // Walk the tracker tree top-down, to avoid expanding a limit on a child whose parent - // won't accommodate the change. + // Walk the tracker tree top-down. for (i = all_trackers_.size() - 1; i >= 0; --i) { MemTracker* tracker = all_trackers_[i]; - int64_t limit = tracker->effective_limit(); + const int64_t limit = tracker->limit(); if (limit < 0) { tracker->consumption_->Add(bytes); // No limit at this tracker. } else { - // If TryConsume fails, we can try to GC or expand the RM reservation, but we may - // need to try several times if there are concurrent consumers because we don't - // take a lock before trying to update consumption_. + // If TryConsume fails, we can try to GC, but we may need to try several times if + // there are concurrent consumers because we don't take a lock before trying to + // update consumption_. while (true) { if (LIKELY(tracker->consumption_->TryAdd(bytes, limit))) break; VLOG_RPC << "TryConsume failed, bytes=" << bytes << " consumption=" << tracker->consumption_->current_value() - << " limit=" << limit << " attempting to GC and expand reservation"; - // TODO: This may not be right if more than one tracker can actually change its - // RM reservation limit. - if (UNLIKELY(tracker->GcMemory(limit - bytes) && - !tracker->ExpandRmReservation(bytes))) { + << " limit=" << limit << " attempting to GC"; + if (UNLIKELY(tracker->GcMemory(limit - bytes))) { DCHECK_GE(i, 0); // Failed for this mem tracker. Roll back the ones that succeeded. - // TODO: this doesn't roll it back completely since the max values for - // the updated trackers aren't decremented. The max values are only used - // for error reporting so this is probably okay. Rolling those back is - // pretty hard; we'd need something like 2PC. - // - // TODO: This might leave us with an allocated resource that we can't use. - // Specifically, the RM reservation of some ancestors' trackers may have been - // expanded only to fail at the current tracker. This may be wasteful as - // subsequent TryConsume() never gets to use the reserved resources. Consider - // adjusting the reservation of the ancestors' trackers. for (int j = all_trackers_.size() - 1; j > i; --j) { all_trackers_[j]->consumption_->Add(-bytes); } return false; } - VLOG_RPC << "GC or expansion succeeded, TryConsume bytes=" << bytes + VLOG_RPC << "GC succeeded, TryConsume bytes=" << bytes << " consumption=" << tracker->consumption_->current_value() - << " new limit=" << tracker->effective_limit() << " prev=" << limit; - // Need to update the limit if the RM reservation was expanded. - limit = tracker->effective_limit(); + << " limit=" << limit; } } } @@ -363,11 +335,6 @@ class MemTracker { /// can cause us to go way over mem limits. void GcTcmalloc(); - /// Set the resource mgr to allow expansion of limits (if NULL, no expansion is possible) - void SetQueryResourceMgr(QueryResourceMgr* context) { - query_resource_mgr_ = context; - } - /// Walks the MemTracker hierarchy and populates all_trackers_ and /// limit_trackers_ void Init(); @@ -378,11 +345,6 @@ class MemTracker { static std::string LogUsage(const std::string& prefix, const std::list<MemTracker*>& trackers); - /// Try to expand the limit (by asking the resource broker for more memory) by at least - /// 'bytes'. Returns false if not possible, true if the request succeeded. 
May allocate - /// more memory than was requested. - bool ExpandRmReservation(int64_t bytes); - /// Size, in bytes, that is considered a large value for Release() (or Consume() with /// a negative value). If tcmalloc is used, this can trigger it to GC. /// A higher value will make us call into tcmalloc less often (and therefore more @@ -425,11 +387,6 @@ class MemTracker { /// there is no consumption limit. int64_t limit_; - /// If > -1, when RM is enabled this is the limit after which this memtracker needs to - /// acquire more memory from Llama. - /// This limit is always less than or equal to the hard limit. - int64_t rm_reserved_limit_; - std::string label_; MemTracker* parent_; @@ -476,14 +433,6 @@ class MemTracker { /// if consumption is 0. bool log_usage_if_zero_; - /// Lock is taken during ExpandRmReservation() to prevent concurrent acquisition of new - /// resources. - boost::mutex resource_acquisition_lock_; - - /// If non-NULL, contains all the information required to expand resource reservations if - /// required. - QueryResourceMgr* query_resource_mgr_; - /// The number of times the GcFunctions were called. IntCounter* num_gcs_metric_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/plan-fragment-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc index 1e52d08..7300f44 100644 --- a/be/src/runtime/plan-fragment-executor.cc +++ b/be/src/runtime/plan-fragment-executor.cc @@ -32,26 +32,21 @@ #include "exec/hdfs-scan-node.h" #include "exec/hbase-table-scanner.h" #include "exprs/expr.h" -#include "resourcebroker/resource-broker.h" #include "runtime/descriptors.h" #include "runtime/data-stream-mgr.h" #include "runtime/row-batch.h" #include "runtime/runtime-filter-bank.h" #include "runtime/mem-tracker.h" -#include "scheduling/query-resource-mgr.h" -#include "util/cgroups-mgr.h" #include "util/cpu-info.h" #include "util/debug-util.h" #include "util/container-util.h" #include "util/parse-util.h" #include "util/mem-info.h" #include "util/periodic-counter-updater.h" -#include "util/llama-util.h" #include "util/pretty-printer.h" DEFINE_bool(serialize_batch, false, "serialize and deserialize each returned row batch"); DEFINE_int32(status_report_interval, 5, "interval between profile reports; in seconds"); -DECLARE_bool(enable_rm); #include "common/names.h" @@ -76,9 +71,6 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env, PlanFragmentExecutor::~PlanFragmentExecutor() { Close(); - if (is_prepared_ && runtime_state_->query_resource_mgr() != NULL) { - exec_env_->resource_broker()->UnregisterQueryResourceMgr(query_id_); - } // at this point, the report thread should have been stopped DCHECK(!report_thread_active_); } @@ -100,58 +92,15 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(fragment_instance_ctx); DCHECK(request.__isset.fragment_ctx); - bool request_has_reserved_resource = - request.fragment_instance_ctx.__isset.reserved_resource; - if (request_has_reserved_resource) { - VLOG_QUERY << "Executing fragment in reserved resource:\n" - << request.fragment_instance_ctx.reserved_resource; - } - - string cgroup = ""; - if (FLAGS_enable_rm && request_has_reserved_resource) { - cgroup = exec_env_->cgroups_mgr()->UniqueIdToCgroup(PrintId(query_id_, "_")); - } // Prepare() must not return before runtime_state_ is set if is_prepared_ 
was // set. Having runtime_state_.get() != NULL is a postcondition of this method in that // case. Do not call RETURN_IF_ERROR or explicitly return before this line. - runtime_state_.reset(new RuntimeState(request, cgroup, exec_env_)); + runtime_state_.reset(new RuntimeState(request, exec_env_)); // total_time_counter() is in the runtime_state_ so start it up now. SCOPED_TIMER(profile()->total_time_counter()); - // Register after setting runtime_state_ to ensure proper cleanup. - if (FLAGS_enable_rm && !cgroup.empty() && request_has_reserved_resource) { - bool is_first; - RETURN_IF_ERROR(exec_env_->cgroups_mgr()->RegisterFragment( - request.fragment_instance_ctx.fragment_instance_id, cgroup, &is_first)); - // The first fragment using cgroup sets the cgroup's CPU shares based on the reserved - // resource. - if (is_first) { - DCHECK(request_has_reserved_resource); - int32_t cpu_shares = exec_env_->cgroups_mgr()->VirtualCoresToCpuShares( - request.fragment_instance_ctx.reserved_resource.v_cpu_cores); - RETURN_IF_ERROR(exec_env_->cgroups_mgr()->SetCpuShares(cgroup, cpu_shares)); - } - } - - // TODO: Find the reservation id when the resource request is not set - if (FLAGS_enable_rm && request_has_reserved_resource) { - TUniqueId reservation_id; - reservation_id << request.fragment_instance_ctx.reserved_resource.reservation_id; - - // TODO: Combine this with RegisterFragment() etc. - QueryResourceMgr* res_mgr; - bool is_first = exec_env_->resource_broker()->GetQueryResourceMgr(query_id_, - reservation_id, request.fragment_instance_ctx.local_resource_address, &res_mgr); - DCHECK(res_mgr != NULL); - runtime_state_->SetQueryResourceMgr(res_mgr); - if (is_first) { - runtime_state_->query_resource_mgr()->InitVcoreAcquisition( - request.fragment_instance_ctx.reserved_resource.v_cpu_cores); - } - } - // reservation or a query option. int64_t bytes_limit = -1; if (runtime_state_->query_options().__isset.mem_limit && @@ -161,36 +110,14 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { << PrettyPrinter::Print(bytes_limit, TUnit::BYTES); } - int64_t rm_reservation_size_bytes = -1; - if (request_has_reserved_resource && - request.fragment_instance_ctx.reserved_resource.memory_mb > 0) { - int64_t rm_reservation_size_mb = - static_cast<int64_t>(request.fragment_instance_ctx.reserved_resource.memory_mb); - rm_reservation_size_bytes = rm_reservation_size_mb * 1024L * 1024L; - // Queries that use more than the hard limit will be killed, so it's not useful to - // have a reservation larger than the hard limit. Clamp reservation bytes limit to the - // hard limit (if it exists). 
- if (rm_reservation_size_bytes > bytes_limit && bytes_limit != -1) { - runtime_state_->LogError(ErrorMsg(TErrorCode::FRAGMENT_EXECUTOR, - PrettyPrinter::PrintBytes(rm_reservation_size_bytes), - PrettyPrinter::PrintBytes(bytes_limit))); - rm_reservation_size_bytes = bytes_limit; - } - VLOG_QUERY << "Using RM reservation memory limit from resource reservation: " - << PrettyPrinter::Print(rm_reservation_size_bytes, TUnit::BYTES); - } - DCHECK(!fragment_instance_ctx.request_pool.empty()); - runtime_state_->InitMemTrackers(query_id_, &fragment_instance_ctx.request_pool, - bytes_limit, rm_reservation_size_bytes); + runtime_state_->InitMemTrackers( + query_id_, &fragment_instance_ctx.request_pool, bytes_limit); RETURN_IF_ERROR(runtime_state_->CreateBlockMgr()); runtime_state_->InitFilterBank(); // Reserve one main thread from the pool runtime_state_->resource_pool()->AcquireThreadToken(); - if (runtime_state_->query_resource_mgr() != NULL) { - runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1); - } has_thread_token_ = true; average_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens", @@ -266,8 +193,8 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) { obj_pool(), request.fragment_ctx.fragment.output_sink, request.fragment_ctx.fragment.output_exprs, fragment_instance_ctx, row_desc(), &sink_)); - sink_mem_tracker_.reset(new MemTracker(-1, -1, sink_->GetName(), - runtime_state_->instance_mem_tracker(), true)); + sink_mem_tracker_.reset(new MemTracker( + -1, sink_->GetName(), runtime_state_->instance_mem_tracker(), true)); RETURN_IF_ERROR(sink_->Prepare(runtime_state(), sink_mem_tracker_.get())); RuntimeProfile* sink_profile = sink_->profile(); @@ -565,9 +492,6 @@ void PlanFragmentExecutor::ReleaseThreadToken() { if (has_thread_token_) { has_thread_token_ = false; runtime_state_->resource_pool()->ReleaseThreadToken(true); - if (runtime_state_->query_resource_mgr() != NULL) { - runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1); - } PeriodicCounterUpdater::StopSamplingCounter(average_thread_tokens_); PeriodicCounterUpdater::StopTimeSeriesCounter( thread_usage_sampled_counter_); @@ -583,10 +507,6 @@ void PlanFragmentExecutor::Close() { } // Prepare may not have been called, which sets runtime_state_ if (runtime_state_.get() != NULL) { - if (runtime_state_->query_resource_mgr() != NULL) { - exec_env_->cgroups_mgr()->UnregisterFragment( - runtime_state_->fragment_instance_id(), runtime_state_->cgroup()); - } if (plan_ != NULL) plan_->Close(runtime_state_.get()); for (DiskIoRequestContext* context: *runtime_state_->reader_contexts()) { runtime_state_->io_mgr()->UnregisterContext(context); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/runtime-filter-bank.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc index 8d47431..a25bf8d 100644 --- a/be/src/runtime/runtime-filter-bank.cc +++ b/be/src/runtime/runtime-filter-bank.cc @@ -66,8 +66,8 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s default_filter_size_ = BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_)); - filter_mem_tracker_.reset(new MemTracker(-1, -1, "Runtime Filter Bank", - state->instance_mem_tracker(), false)); + filter_mem_tracker_.reset( + new MemTracker(-1, "Runtime Filter Bank", state->instance_mem_tracker(), false)); } RuntimeFilter* 
RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc, @@ -226,4 +226,3 @@ void RuntimeFilterBank::Close() { filter_mem_tracker_->Release(memory_allocated_->value()); filter_mem_tracker_->UnregisterFromParent(); } - http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 57aae58..5249076 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -67,18 +67,15 @@ static const int64_t BLOCK_MGR_MEM_MIN_REMAINING = 100 * 1024 * 1024; namespace impala { -RuntimeState::RuntimeState(const TExecPlanFragmentParams& fragment_params, - const string& cgroup, ExecEnv* exec_env) +RuntimeState::RuntimeState( + const TExecPlanFragmentParams& fragment_params, ExecEnv* exec_env) : obj_pool_(new ObjectPool()), fragment_params_(fragment_params), - now_(new TimestampValue(query_ctx().now_string.c_str(), - query_ctx().now_string.size())), - cgroup_(cgroup), + now_(new TimestampValue( + query_ctx().now_string.c_str(), query_ctx().now_string.size())), codegen_expr_(false), - profile_(obj_pool_.get(), - "Fragment " + PrintId(fragment_ctx().fragment_instance_id)), + profile_(obj_pool_.get(), "Fragment " + PrintId(fragment_ctx().fragment_instance_id)), is_cancelled_(false), - query_resource_mgr_(NULL), root_node_id_(-1) { Status status = Init(exec_env); DCHECK(status.ok()) << status.GetDetail(); @@ -92,7 +89,6 @@ RuntimeState::RuntimeState(const TQueryCtx& query_ctx) codegen_expr_(false), profile_(obj_pool_.get(), "<unnamed>"), is_cancelled_(false), - query_resource_mgr_(NULL), root_node_id_(-1) { fragment_params_.__set_query_ctx(query_ctx); fragment_params_.query_ctx.request.query_options.__set_batch_size(DEFAULT_BATCH_SIZE); @@ -147,18 +143,17 @@ Status RuntimeState::Init(ExecEnv* exec_env) { return Status::OK(); } -void RuntimeState::InitMemTrackers(const TUniqueId& query_id, const string* pool_name, - int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes) { +void RuntimeState::InitMemTrackers( + const TUniqueId& query_id, const string* pool_name, int64_t query_bytes_limit) { MemTracker* query_parent_tracker = exec_env_->process_mem_tracker(); if (pool_name != NULL) { query_parent_tracker = MemTracker::GetRequestPoolMemTracker(*pool_name, query_parent_tracker); } query_mem_tracker_ = - MemTracker::GetQueryMemTracker(query_id, query_bytes_limit, - query_rm_reservation_limit_bytes, query_parent_tracker, query_resource_mgr()); - instance_mem_tracker_.reset(new MemTracker(runtime_profile(), -1, -1, - runtime_profile()->name(), query_mem_tracker_.get())); + MemTracker::GetQueryMemTracker(query_id, query_bytes_limit, query_parent_tracker); + instance_mem_tracker_.reset(new MemTracker( + runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_.get())); } void RuntimeState::InitFilterBank() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index b5f7882..0bf9db5 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -65,8 +65,7 @@ typedef std::map<std::string, std::string> FileMoveMap; /// query and shared across all execution nodes of that query. 
class RuntimeState { public: - RuntimeState(const TExecPlanFragmentParams& fragment_params, - const std::string& cgroup, ExecEnv* exec_env); + RuntimeState(const TExecPlanFragmentParams& fragment_params, ExecEnv* exec_env); /// RuntimeState for executing expr in fe-support. RuntimeState(const TQueryCtx& query_ctx); @@ -81,7 +80,7 @@ class RuntimeState { /// tracker (in the fifth level). If 'request_pool' is null, no request pool mem /// tracker is set up, i.e. query pools will have the process mem pool as the parent. void InitMemTrackers(const TUniqueId& query_id, const std::string* request_pool, - int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes = -1); + int64_t query_bytes_limit); /// Initializes the runtime filter bank. Must be called after InitMemTrackers(). void InitFilterBank(); @@ -124,7 +123,6 @@ class RuntimeState { const TUniqueId& fragment_instance_id() const { return fragment_ctx().fragment_instance_id; } - const std::string& cgroup() const { return cgroup_; } ExecEnv* exec_env() { return exec_env_; } DataStreamMgr* stream_mgr() { return exec_env_->stream_mgr(); } HBaseTableFactory* htable_factory() { return exec_env_->htable_factory(); } @@ -262,9 +260,6 @@ class RuntimeState { /// execution doesn't continue if the query terminates abnormally. Status CheckQueryState(); - QueryResourceMgr* query_resource_mgr() const { return query_resource_mgr_; } - void SetQueryResourceMgr(QueryResourceMgr* res_mgr) { query_resource_mgr_ = res_mgr; } - private: /// Allow TestEnv to set block_mgr manually for testing. friend class TestEnv; @@ -301,9 +296,6 @@ class RuntimeState { /// Use pointer to avoid inclusion of timestampvalue.h and avoid clang issues. boost::scoped_ptr<TimestampValue> now_; - /// The Impala-internal cgroup into which execution threads are assigned. - /// If empty, no RM is enabled. - std::string cgroup_; ExecEnv* exec_env_; boost::scoped_ptr<LlvmCodeGen> codegen_; @@ -351,10 +343,6 @@ class RuntimeState { SpinLock query_status_lock_; Status query_status_; - /// Query-wide resource manager for resource expansion etc. Not owned by us; owned by - /// the ResourceBroker instead. - QueryResourceMgr* query_resource_mgr_; - /// Reader contexts that need to be closed when the fragment is closed. 
std::vector<DiskIoRequestContext*> reader_contexts_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/test-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc index a5818af..8690e39 100644 --- a/be/src/runtime/test-env.cc +++ b/be/src/runtime/test-env.cc @@ -70,7 +70,7 @@ RuntimeState* TestEnv::CreateRuntimeState(int64_t query_id) { TExecPlanFragmentParams plan_params = TExecPlanFragmentParams(); plan_params.query_ctx.query_id.hi = 0; plan_params.query_ctx.query_id.lo = query_id; - return new RuntimeState(plan_params, "", exec_env_.get()); + return new RuntimeState(plan_params, exec_env_.get()); } Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt index c5b4eb4..9cfb672 100644 --- a/be/src/scheduling/CMakeLists.txt +++ b/be/src/scheduling/CMakeLists.txt @@ -26,7 +26,6 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/scheduling") add_library(Scheduling STATIC admission-controller.cc backend-config.cc - query-resource-mgr.cc query-schedule.cc request-pool-service.cc simple-scheduler.cc http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/query-resource-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-resource-mgr.cc b/be/src/scheduling/query-resource-mgr.cc deleted file mode 100644 index abfe085..0000000 --- a/be/src/scheduling/query-resource-mgr.cc +++ /dev/null @@ -1,271 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "scheduling/query-resource-mgr.h" - -#include <boost/uuid/uuid.hpp> -#include <boost/uuid/uuid_generators.hpp> -#include <gutil/strings/substitute.h> -#include <sstream> - -#include "runtime/exec-env.h" -#include "resourcebroker/resource-broker.h" -#include "util/bit-util.h" -#include "util/cgroups-mgr.h" -#include "util/container-util.h" -#include "util/network-util.h" -#include "util/promise.h" -#include "util/time.h" - -#include "common/names.h" - -using boost::uuids::random_generator; -using boost::uuids::uuid; -using namespace impala; -using namespace strings; - -DEFINE_int64(rm_mem_expansion_timeout_ms, 5000, "The amount of time to wait (ms) " - "for a memory expansion request."); -DEFINE_double(max_vcore_oversubscription_ratio, 2.5, "(Advanced) The maximum ratio " - "allowed between running threads and acquired VCore resources for a query's fragments" - " on a single node"); - -ResourceResolver::ResourceResolver(const unordered_set<TNetworkAddress>& unique_hosts) { - if (ExecEnv::GetInstance()->is_pseudo_distributed_llama()) { - CreateLocalLlamaNodeMapping(unique_hosts); - } -} - -void ResourceResolver::GetResourceHostport(const TNetworkAddress& src, - TNetworkAddress* dest) { - if (ExecEnv::GetInstance()->is_pseudo_distributed_llama()) { - *dest = impalad_to_dn_[src]; - } else { - dest->hostname = src.hostname; - dest->port = 0; - } -} - -void ResourceResolver::CreateLocalLlamaNodeMapping( - const unordered_set<TNetworkAddress>& unique_hosts) { - DCHECK(ExecEnv::GetInstance()->is_pseudo_distributed_llama()); - const vector<string>& llama_nodes = - ExecEnv::GetInstance()->resource_broker()->llama_nodes(); - DCHECK(!llama_nodes.empty()); - int llama_node_ix = 0; - for (const TNetworkAddress& host: unique_hosts) { - TNetworkAddress dn_hostport = MakeNetworkAddress(llama_nodes[llama_node_ix]); - impalad_to_dn_[host] = dn_hostport; - dn_to_impalad_[dn_hostport] = host; - LOG(INFO) << "Mapping Datanode " << dn_hostport << " to Impalad: " << host; - // Round robin the registered Llama nodes. - llama_node_ix = (llama_node_ix + 1) % llama_nodes.size(); - } -} - -QueryResourceMgr::QueryResourceMgr(const TUniqueId& reservation_id, - const TNetworkAddress& local_resource_location, const TUniqueId& query_id) - : reservation_id_(reservation_id), query_id_(query_id), - local_resource_location_(local_resource_location), exit_(false), callback_count_(0), - threads_running_(0), vcores_(0) { - max_vcore_oversubscription_ratio_ = FLAGS_max_vcore_oversubscription_ratio; -} - -void QueryResourceMgr::InitVcoreAcquisition(int32_t init_vcores) { - LOG(INFO) << "Initialising vcore acquisition thread for query " << PrintId(query_id_) - << " (" << init_vcores << " initial vcores)"; - DCHECK(acquire_vcore_thread_.get() == NULL) - << "Double initialisation of QueryResourceMgr::InitCpuAcquisition()"; - vcores_ = init_vcores; - - // These shared pointers to atomic values are used to communicate between the vcore - // acquisition thread and the class destructor. If the acquisition thread is in the - // middle of an Expand() call, the destructor might have to wait 5s (the default - // timeout) to return. This holds up query close operations. So instead check to see if - // the thread is in Expand(), and if so we set a synchronised flag early_exit_ which it - // inspects immediately after exiting Expand(), and if true, exits before touching any - // of the class-wide state (because the destructor may have finished before this point). 
- - thread_in_expand_.reset(new AtomicInt32()); - early_exit_.reset(new AtomicInt32()); - acquire_vcore_thread_.reset( - new Thread("resource-mgmt", Substitute("acquire-cpu-$0", PrintId(query_id_)), - bind<void>(mem_fn(&QueryResourceMgr::AcquireVcoreResources), this, - thread_in_expand_, early_exit_))); -} - -llama::TResource QueryResourceMgr::CreateResource(int64_t memory_mb, int64_t vcores) { - DCHECK(memory_mb > 0 || vcores > 0); - DCHECK(reservation_id_ != TUniqueId()) << "Expansion requires existing reservation"; - - unordered_set<TNetworkAddress> hosts; - hosts.insert(local_resource_location_); - ResourceResolver resolver(hosts); - llama::TResource res; - res.memory_mb = memory_mb; - res.v_cpu_cores = vcores; - TNetworkAddress res_address; - resolver.GetResourceHostport(local_resource_location_, &res_address); - res.__set_askedLocation(TNetworkAddressToString(res_address)); - - random_generator uuid_generator; - uuid id = uuid_generator(); - res.client_resource_id.hi = *reinterpret_cast<uint64_t*>(&id.data[0]); - res.client_resource_id.lo = *reinterpret_cast<uint64_t*>(&id.data[8]); - res.enforcement = llama::TLocationEnforcement::MUST; - return res; -} - -bool QueryResourceMgr::AboveVcoreSubscriptionThreshold() { - return threads_running_ > vcores_ * (max_vcore_oversubscription_ratio_ * 0.8); -} - -void QueryResourceMgr::NotifyThreadUsageChange(int delta) { - lock_guard<mutex> l(threads_running_lock_); - threads_running_ += delta; - DCHECK(threads_running_ >= 0L); - if (AboveVcoreSubscriptionThreshold()) threads_changed_cv_.notify_all(); -} - -int32_t QueryResourceMgr::AddVcoreAvailableCb(const VcoreAvailableCb& callback) { - lock_guard<mutex> l(callbacks_lock_); - callbacks_[callback_count_] = callback; - callbacks_it_ = callbacks_.begin(); - return callback_count_++; -} - -void QueryResourceMgr::RemoveVcoreAvailableCb(int32_t callback_id) { - lock_guard<mutex> l(callbacks_lock_); - CallbackMap::iterator it = callbacks_.find(callback_id); - DCHECK(it != callbacks_.end()) << "Could not find callback with id: " << callback_id; - callbacks_.erase(it); - callbacks_it_ = callbacks_.begin(); -} - -Status QueryResourceMgr::RequestMemExpansion(int64_t requested_bytes, - int64_t* allocated_bytes) { - DCHECK(allocated_bytes != NULL); - *allocated_bytes = 0; - int64_t requested_mb = BitUtil::Ceil(requested_bytes, 1024L * 1024L); - llama::TResource res = CreateResource(max<int64_t>(1, requested_mb), 0); - llama::TUniqueId expansion_id; - llama::TAllocatedResource resource; - RETURN_IF_ERROR(ExecEnv::GetInstance()->resource_broker()->Expand(reservation_id_, - res, FLAGS_rm_mem_expansion_timeout_ms, &expansion_id, &resource)); - - DCHECK_EQ(resource.v_cpu_cores, 0L) << "Unexpected VCPUs returned by Llama"; - *allocated_bytes = resource.memory_mb * 1024L * 1024L; - return Status::OK(); -} - -void QueryResourceMgr::AcquireVcoreResources( - shared_ptr<AtomicInt32> thread_in_expand, - shared_ptr<AtomicInt32> early_exit) { - // Take a copy because we'd like to print it in some cases after the destructor. 
- TUniqueId reservation_id = reservation_id_; - VLOG_QUERY << "Starting Vcore acquisition for: " << reservation_id; - while (!ShouldExit()) { - { - unique_lock<mutex> l(threads_running_lock_); - while (!AboveVcoreSubscriptionThreshold() && !ShouldExit()) { - threads_changed_cv_.wait(l); - } - } - if (ShouldExit()) break; - - llama::TResource res = CreateResource(0L, 1); - VLOG_QUERY << "Expanding VCore allocation: " << reservation_id_; - - // First signal that we are about to enter a blocking Expand() call. - thread_in_expand->Add(1L); - - // TODO: Could cause problems if called during or after a system-wide shutdown - llama::TAllocatedResource resource; - llama::TUniqueId expansion_id; - Status status = ExecEnv::GetInstance()->resource_broker()->Expand(reservation_id, - res, -1, &expansion_id, &resource); - thread_in_expand->Add(-1L); - // If signalled to exit quickly by the destructor, exit the loop now. It's important - // to do so without accessing any class variables since they may no longer be valid. - // Need to check after setting thread_in_expand to avoid a race. - if (early_exit->Add(0L) != 0) { - VLOG_QUERY << "Fragment finished during Expand(): " << reservation_id; - break; - } - if (!status.ok()) { - VLOG_QUERY << "Could not expand CPU resources for query " << PrintId(query_id_) - << ", reservation: " << PrintId(reservation_id_) << ". Error was: " - << status.GetDetail(); - // Sleep to avoid flooding the resource broker, particularly if requests are being - // rejected quickly (and therefore we stay oversubscribed) - // TODO: configurable timeout - SleepForMs(250); - continue; - } - - DCHECK(resource.v_cpu_cores == 1) - << "Asked for 1 core, got: " << resource.v_cpu_cores; - vcores_ += resource.v_cpu_cores; - - ExecEnv* exec_env = ExecEnv::GetInstance(); - const string& cgroup = - exec_env->cgroups_mgr()->UniqueIdToCgroup(PrintId(query_id_, "_")); - int32_t num_shares = exec_env->cgroups_mgr()->VirtualCoresToCpuShares(vcores_); - exec_env->cgroups_mgr()->SetCpuShares(cgroup, num_shares); - - // TODO: Only call one callback no matter how many VCores we just added; maybe call - // all of them? - { - lock_guard<mutex> l(callbacks_lock_); - if (callbacks_.size() != 0) { - callbacks_it_->second(); - if (++callbacks_it_ == callbacks_.end()) callbacks_it_ = callbacks_.begin(); - } - } - } - VLOG_QUERY << "Leaving VCore acquisition thread: " << reservation_id; -} - -bool QueryResourceMgr::ShouldExit() { - lock_guard<mutex> l(exit_lock_); - return exit_; -} - -void QueryResourceMgr::Shutdown() { - { - lock_guard<mutex> l(exit_lock_); - if (exit_) return; - exit_ = true; - } - { - lock_guard<mutex> l(callbacks_lock_); - callbacks_.clear(); - } - threads_changed_cv_.notify_all(); -} - -QueryResourceMgr::~QueryResourceMgr() { - if (acquire_vcore_thread_.get() == NULL) return; - if (!ShouldExit()) Shutdown(); - // First, set the early exit flag. Then check to see if the thread is in Expand(). If - // so, the acquisition thread is guaranteed to see early_exit_ == 1L once it finishes - // Expand(), and will exit immediately. It's therefore safe not to wait for it. 
- early_exit_->Add(1L); - if (thread_in_expand_->Add(0L) == 0L) { - acquire_vcore_thread_->Join(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/query-resource-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-resource-mgr.h b/be/src/scheduling/query-resource-mgr.h deleted file mode 100644 index 10da312..0000000 --- a/be/src/scheduling/query-resource-mgr.h +++ /dev/null @@ -1,227 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef SCHEDULING_QUERY_RESOURCE_MGR_H -#define SCHEDULING_QUERY_RESOURCE_MGR_H - -#include "common/atomic.h" -#include "common/status.h" -#include "gen-cpp/Types_types.h" -#include "gen-cpp/ResourceBrokerService.h" -#include "gen-cpp/ImpalaInternalService.h" -#include "gen-cpp/Frontend_types.h" -#include "util/promise.h" -#include "util/thread.h" - -#include <boost/function.hpp> -#include <boost/thread/mutex.hpp> -#include <boost/unordered_map.hpp> -#include <boost/unordered_set.hpp> -#include <string> - -namespace impala { - -class ResourceBroker; - -/// Utility class to map hosts to the Llama-registered resource-holding hosts -/// (i.e. datanodes). -class ResourceResolver { - public: - ResourceResolver(const boost::unordered_set<TNetworkAddress>& unique_hosts); - - /// Translates src into a network address suitable for identifying resources across - /// interactions with the Llama. The MiniLlama expects resources to be requested on - /// IP:port addresses of Hadoop DNs, whereas the regular Llama only deals with the - /// hostnames of Yarn NMs. For MiniLlama setups this translation uses the - /// impalad_to_dn_ mapping to populate dest. When using the regular Llama, this - /// translation sets a fixed port of 0 in dest because the Llama strips away the port - /// of resource locations. - void GetResourceHostport(const TNetworkAddress& src, TNetworkAddress* dst); - - private: - /// Impala mini clusters using the Mini Llama require translating the impalad hostports - /// to Hadoop DN hostports registered with the Llama during resource requests - /// (and then in reverse for translating granted resources to impalads). - /// These maps form a bi-directional hostport mapping Hadoop DN <-> impalad. - boost::unordered_map<TNetworkAddress, TNetworkAddress> impalad_to_dn_; - boost::unordered_map<TNetworkAddress, TNetworkAddress> dn_to_impalad_; - - /// Called only in pseudo-distributed setups (i.e. testing only) to populate - /// impalad_to_dn_ and dn_to_impalad_ - void CreateLocalLlamaNodeMapping( - const boost::unordered_set<TNetworkAddress>& unique_hosts); -}; - -/// Tracks all the state necessary to create expansion requests for all fragments of a -/// single query on a single node. 
Code that might need to expand the memory reservation -/// for this query (i.e. MemTracker) can use this class to construct expansion requests -/// that may then be submitted to the ResourceBroker. -// -/// If InitVcoreAcquisition() is called, this class will monitor the thread token to VCore -/// ratio (thread consumers must use NotifyThreadUsageChange() to update the thread -/// consumption count). If the ratio gets too high (see AboveVcoreSubscriptionThreshold() -/// for details), we will try to acquire more VCore resources from Llama asynchronously. -/// If the ratio passes a higher threshold (see IsVcoreOverSubscribed()), we say that the -/// query fragments are currently oversubscribing their VCore resources. -// -/// Threads are typically handed to a fragment by the thread resource manager, which deals -/// in tokens. When a fragment wants to use a token to start a thread, it should only do so -/// if the ratio of threads to VCores (which map directly onto cgroup shares) is not too -/// large. If it is too large - i.e. the VCores are oversubscribed - the fragment should -/// wait to spin up new threads until more VCore resources are acquired as above. To help -/// with this, each fragment may register one or more callbacks with its -/// QueryResourceMgr; when more VCore resources are acquired the callbacks are invoked in -/// round-robin fashion. The callback should try to re-acquire the previously untaken -/// thread token, and then a new thread may be started. -// -/// Only CPU-heavy threads need be managed using this class. -// -/// TODO: Handle reducing the number of VCores when threads finish. -/// TODO: Consider combining more closely with ThreadResourceMgr. -/// TODO: Add counters to RuntimeProfile to track resources. -class QueryResourceMgr { - public: - QueryResourceMgr(const TUniqueId& reservation_id, - const TNetworkAddress& local_resource_location, const TUniqueId& query_id); - - /// Must be called only once. Starts a separate thread to monitor thread consumption, - /// which asks for more VCores from Llama periodically. - void InitVcoreAcquisition(int32_t init_vcores); - - /// Should be used to check if another thread token may be acquired by this - /// query. Fragments may ignore this when acquiring a new CPU token, but the result will - /// be a larger thread:VCore ratio. - // - /// Note that this threshold is larger than the one in - /// AboveVcoreSubscriptionThreshold(). We want to start acquiring more VCore allocations - /// before we get so oversubscribed that adding new threads is considered a bad idea. - inline bool IsVcoreOverSubscribed() { - boost::lock_guard<boost::mutex> l(threads_running_lock_); - return threads_running_ > vcores_ * max_vcore_oversubscription_ratio_; - } - - /// Called when thread consumption goes up or down. If the total consumption goes above a - /// subscription threshold, the acquisition thread will be woken to ask for more VCores. - void NotifyThreadUsageChange(int delta); - - /// All callbacks registered here are called in round-robin fashion when more VCores are - /// acquired. Returns a unique ID that can be used as an argument to - /// RemoveVcoreAvailableCb(). - typedef boost::function<void ()> VcoreAvailableCb; - int32_t AddVcoreAvailableCb(const VcoreAvailableCb& callback); - - /// Removes the callback with the given ID. - void RemoveVcoreAvailableCb(int32_t callback_id); - - /// Request an expansion of requested_bytes. 
If the expansion can be fulfilled within - /// the timeout period, the number of bytes allocated is returned in allocated_bytes - /// (which may be more than requested). Otherwise an error status is returned. - Status RequestMemExpansion(int64_t requested_bytes, int64_t* allocated_bytes); - - /// Sets the exit flag for the VCore acquisition thread, but does not block. Also clears - /// the set of callbacks, so that after Shutdown() has returned, no callback will be - /// invoked. - void Shutdown(); - - /// Waits for the VCore acquisition thread to stop. - ~QueryResourceMgr(); - - const TUniqueId& reservation_id() const { return reservation_id_; } - - private: - /// ID of the single reservation corresponding to this query. - TUniqueId reservation_id_; - - /// Query ID of the query this class manages resources for. - TUniqueId query_id_; - - /// Network address of the local service registered with Llama. Usually corresponds to - /// <local-address>:0, unless a pseudo-distributed Llama is being used (see - /// ResourceResolver::CreateLocalLlamaNodeMapping()). - TNetworkAddress local_resource_location_; - - /// Used to control shutdown of AcquireVcoreResources(). - boost::mutex exit_lock_; - bool exit_; - - /// Protects callbacks_ and callbacks_it_. - boost::mutex callbacks_lock_; - - /// List of callbacks to notify when a new VCore resource is available. - typedef boost::unordered_map<int32_t, VcoreAvailableCb> CallbackMap; - CallbackMap callbacks_; - - /// Round-robin iterator to notify callbacks about new VCores one at a time. - CallbackMap::iterator callbacks_it_; - - /// Total number of callbacks that were ever registered. Used to give each callback a - /// unique ID so that they can be removed. - int32_t callback_count_; - - /// Protects threads_running_, threads_changed_cv_ and vcores_. - boost::mutex threads_running_lock_; - - /// Waited on by AcquireVcoreResources(), and notified by NotifyThreadUsageChange(). - boost::condition_variable threads_changed_cv_; - - /// The number of threads we know to be running on behalf of this query. - int64_t threads_running_; - - /// The number of VCores acquired for this node for this query. - int64_t vcores_; - - /// Set to FLAGS_max_vcore_oversubscription_ratio in the constructor. If the ratio of - /// threads to VCores exceeds this number, no more threads may be executed by this query - /// until more VCore resources are acquired. - float max_vcore_oversubscription_ratio_; - - /// Runs AcquireVcoreResources() after InitVcoreAcquisition() is called. - boost::scoped_ptr<Thread> acquire_vcore_thread_; - - /// Signals to the vcore acquisition thread that it should exit after it exits from any - /// pending Expand() call. Is a shared_ptr so that it will remain valid even after the - /// parent QueryResourceMgr has been destroyed. - /// TODO: Combine with ShouldExit(), and replace with AtomicBool when we have such a - /// thing. - std::shared_ptr<AtomicInt32> early_exit_; - - /// Signals to the destructor that the vcore acquisition thread is currently in an - /// Expand() RPC. If so, the destructor does not need to wait for the acquisition thread - /// to exit. - std::shared_ptr<AtomicInt32> thread_in_expand_; - - /// Creates the llama resource for the memory and/or cores specified, associated with - /// the reservation context. - llama::TResource CreateResource(int64_t memory_mb, int64_t vcores); - - /// Run as a thread owned by acquire_vcore_thread_. 
Waits for notification from - /// NotifyThreadUsageChange(), then checks the subscription level to decide if more - /// VCores are needed, and starts a new expansion request if so. - void AcquireVcoreResources(std::shared_ptr<AtomicInt32> thread_in_expand, - std::shared_ptr<AtomicInt32> early_exit); - - /// True if thread:VCore subscription is too high, meaning more VCores are required. - /// Must be called while holding threads_running_lock_. - bool AboveVcoreSubscriptionThreshold(); - - /// Returns true once Shutdown() has been called, signalling that the acquisition - /// thread should terminate. Does not block. - bool ShouldExit(); -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/query-schedule.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc index f893f1c..b5745c4 100644 --- a/be/src/scheduling/query-schedule.cc +++ b/be/src/scheduling/query-schedule.cc @@ -28,7 +28,6 @@ #include "util/uid-util.h" #include "util/debug-util.h" #include "util/parse-util.h" -#include "util/llama-util.h" #include "common/names.h" @@ -36,28 +35,13 @@ using boost::uuids::random_generator; using boost::uuids::uuid; using namespace impala; -DEFINE_bool(rm_always_use_defaults, false, "If true, all queries use the same initial" - " resource requests regardless of their computed resource estimates. Only meaningful " - "if --enable_rm is set."); -DEFINE_string(rm_default_memory, "4G", "The initial amount of memory that" - " a query should reserve on each node if either it does not have an available " - "estimate, or if --rm_always_use_defaults is set."); -DEFINE_int32(rm_default_cpu_vcores, 2, "The initial number of virtual cores that" - " a query should reserve on each node if either it does not have an available " - "estimate, or if --rm_always_use_defaults is set."); - +// TODO: Remove for Impala 3.0. +DEFINE_bool(rm_always_use_defaults, false, "Deprecated"); +DEFINE_string(rm_default_memory, "4G", "Deprecated"); +DEFINE_int32(rm_default_cpu_vcores, 2, "Deprecated"); namespace impala { -// Default value for the request_timeout in a reservation request. The timeout is the -// max time in milliseconds to wait for a resource request to be fulfilled by Llama. -// The default value of five minutes was determined to be reasonable based on -// experiments on a 20-node cluster with TPCDS 15TB and 8 concurrent clients. -// Over 30% of queries timed out with a reservation timeout of 1 minute but less -// than 5% timed out when using 5 minutes. Still, the default value is somewhat -// arbitrary and a good value is workload dependent. 
-const int64_t DEFAULT_REQUEST_TIMEOUT_MS = 5 * 60 * 1000; - QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request, const TQueryOptions& query_options, RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events) @@ -97,10 +81,9 @@ int64_t QuerySchedule::GetClusterMemoryEstimate() const { int64_t QuerySchedule::GetPerHostMemoryEstimate() const { // Precedence of different estimate sources is: // user-supplied RM query option > - // server-side defaults (if rm_always_use_defaults == true) > // query option limit > // estimate > - // server-side defaults (if rm_always_use_defaults == false) + // server-side defaults int64_t query_option_memory_limit = numeric_limits<int64_t>::max(); bool has_query_option = false; if (query_options_.__isset.mem_limit && query_options_.mem_limit > 0) { @@ -116,12 +99,10 @@ int64_t QuerySchedule::GetPerHostMemoryEstimate() const { } int64_t per_host_mem = 0L; + // TODO: Remove rm_initial_mem and associated logic when we're sure that clients won't + // be affected. if (query_options_.__isset.rm_initial_mem && query_options_.rm_initial_mem > 0) { per_host_mem = query_options_.rm_initial_mem; - } else if (FLAGS_rm_always_use_defaults) { - bool ignored; - per_host_mem = ParseUtil::ParseMemSpec(FLAGS_rm_default_memory, - &ignored, 0); } else if (has_query_option) { per_host_mem = query_option_memory_limit; } else if (has_estimate) { @@ -134,115 +115,11 @@ int64_t QuerySchedule::GetPerHostMemoryEstimate() const { } // Cap the memory estimate at the amount of physical memory available. The user's // provided value or the estimate from planning can each be unreasonable. - // TODO: Get this limit from Llama (Yarn sets it). return min(per_host_mem, MemInfo::physical_mem()); } -int16_t QuerySchedule::GetPerHostVCores() const { - // Precedence of different estimate sources is: - // server-side defaults (if rm_always_use_defaults == true) > - // computed estimates - // server-side defaults (if rm_always_use_defaults == false) - int16_t v_cpu_cores = FLAGS_rm_default_cpu_vcores; - if (!FLAGS_rm_always_use_defaults && query_options_.__isset.v_cpu_cores && - query_options_.v_cpu_cores > 0) { - v_cpu_cores = query_options_.v_cpu_cores; - } - - return v_cpu_cores; -} - -void QuerySchedule::GetResourceHostport(const TNetworkAddress& src, - TNetworkAddress* dst) { - DCHECK(dst != NULL); - DCHECK(resource_resolver_.get() != NULL) - << "resource_resolver_ is NULL, didn't call SetUniqueHosts()?"; - resource_resolver_->GetResourceHostport(src, dst); -} - void QuerySchedule::SetUniqueHosts(const unordered_set<TNetworkAddress>& unique_hosts) { unique_hosts_ = unique_hosts; - resource_resolver_.reset(new ResourceResolver(unique_hosts_)); -} - -void QuerySchedule::PrepareReservationRequest(const string& pool, const string& user) { - reservation_request_.resources.clear(); - reservation_request_.version = TResourceBrokerServiceVersion::V1; - reservation_request_.queue = pool; - reservation_request_.gang = true; - // Convert the user name to a short name (e.g. 'user1@domain' to 'user1') because - // Llama checks group membership based on the short name of the principal. - reservation_request_.user = llama::GetShortName(user); - - // Set optional request timeout from query options. 
- if (query_options_.__isset.reservation_request_timeout) { - DCHECK_GT(query_options_.reservation_request_timeout, 0); - reservation_request_.__set_request_timeout( - query_options_.reservation_request_timeout); - } - - // Set the reservation timeout from the query options or use a default. - int64_t timeout = DEFAULT_REQUEST_TIMEOUT_MS; - if (query_options_.__isset.reservation_request_timeout) { - timeout = query_options_.reservation_request_timeout; - } - reservation_request_.__set_request_timeout(timeout); - - int32_t memory_mb = GetPerHostMemoryEstimate() / 1024 / 1024; - int32_t v_cpu_cores = GetPerHostVCores(); - // The memory_mb and v_cpu_cores estimates may legitimately be zero, - // e.g., for constant selects. Do not reserve any resources in those cases. - if (memory_mb == 0 && v_cpu_cores == 0) return; - - DCHECK(resource_resolver_.get() != NULL) - << "resource_resolver_ is NULL, didn't call SetUniqueHosts()?"; - random_generator uuid_generator; - for (const TNetworkAddress& host: unique_hosts_) { - reservation_request_.resources.push_back(llama::TResource()); - llama::TResource& resource = reservation_request_.resources.back(); - uuid id = uuid_generator(); - resource.client_resource_id.hi = *reinterpret_cast<uint64_t*>(&id.data[0]); - resource.client_resource_id.lo = *reinterpret_cast<uint64_t*>(&id.data[8]); - resource.enforcement = llama::TLocationEnforcement::MUST; - - TNetworkAddress resource_hostport; - resource_resolver_->GetResourceHostport(host, &resource_hostport); - stringstream ss; - ss << resource_hostport; - resource.askedLocation = ss.str(); - resource.memory_mb = memory_mb; - resource.v_cpu_cores = v_cpu_cores; - } -} - -Status QuerySchedule::ValidateReservation() { - if (!HasReservation()) return Status("Query schedule does not have a reservation."); - vector<TNetworkAddress> hosts_missing_resources; - ResourceResolver resolver(unique_hosts_); - for (const FragmentExecParams& params: fragment_exec_params_) { - for (const TNetworkAddress& host: params.hosts) { - // Ignore the coordinator host which is not contained in unique_hosts_. - if (unique_hosts_.find(host) == unique_hosts_.end()) continue; - TNetworkAddress resource_hostport; - resolver.GetResourceHostport(host, &resource_hostport); - if (reservation_.allocated_resources.find(resource_hostport) == - reservation_.allocated_resources.end()) { - hosts_missing_resources.push_back(host); - } - } - } - if (!hosts_missing_resources.empty()) { - stringstream ss; - ss << "Failed to validate reservation " << reservation_.reservation_id << "." << endl - << "Missing resources for hosts ["; - for (int i = 0; i < hosts_missing_resources.size(); ++i) { - ss << hosts_missing_resources[i]; - if (i + 1 != hosts_missing_resources.size()) ss << ", "; - } - ss << "]"; - return Status(ss.str()); - } - return Status::OK(); } }
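With the RM paths removed, the per-host memory selection in GetPerHostMemoryEstimate() reduces to the precedence chain noted in the surviving comment. Below is a hedged sketch of just that chain, as a hypothetical free function with made-up parameters rather than the actual QuerySchedule API:

#include <algorithm>
#include <cstdint>

// Precedence: rm_initial_mem query option > mem_limit query option >
// planner estimate > server-side default, finally capped at physical memory.
int64_t PerHostMemoryEstimate(int64_t rm_initial_mem,   // <= 0 if unset
                              int64_t mem_limit,        // <= 0 if unset
                              bool has_estimate, int64_t estimate,
                              int64_t server_default, int64_t physical_mem) {
  int64_t per_host_mem = server_default;
  if (rm_initial_mem > 0) {
    per_host_mem = rm_initial_mem;  // user-supplied RM option wins
  } else if (mem_limit > 0) {
    per_host_mem = mem_limit;       // then the mem_limit query option
  } else if (has_estimate) {
    per_host_mem = estimate;        // then the estimate from planning
  }
  // Either the user's value or the planner's estimate can be unreasonable;
  // never report more than the physical memory available.
  return std::min(per_host_mem, physical_mem);
}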

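For reference, the removed VCore-available callback mechanism amounts to a registry keyed by a monotonically increasing id plus a round-robin cursor, so that each newly acquired VCore wakes exactly one waiting fragment. A simplified sketch under those assumptions, with std types in place of boost and illustrative names:

#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>

// Mirrors the shape of AddVcoreAvailableCb()/RemoveVcoreAvailableCb():
// callbacks receive unique ids from a counter, and NotifyOne() invokes a
// single callback, advancing a round-robin cursor across notifications.
class CallbackRegistry {
 public:
  int32_t Add(std::function<void()> cb) {
    std::lock_guard<std::mutex> l(lock_);
    callbacks_[next_id_] = std::move(cb);
    it_ = callbacks_.begin();  // reset the cursor, as the original does
    return next_id_++;
  }

  void Remove(int32_t id) {
    std::lock_guard<std::mutex> l(lock_);
    callbacks_.erase(id);
    it_ = callbacks_.begin();
  }

  // Called when a new VCore becomes available: notify one waiter only.
  void NotifyOne() {
    std::lock_guard<std::mutex> l(lock_);
    if (callbacks_.empty()) return;
    it_->second();
    if (++it_ == callbacks_.end()) it_ = callbacks_.begin();
  }

 private:
  std::mutex lock_;
  std::map<int32_t, std::function<void()>> callbacks_;
  std::map<int32_t, std::function<void()>>::iterator it_ = callbacks_.begin();
  int32_t next_id_ = 0;
};

int main() {
  CallbackRegistry registry;
  registry.Add([] { std::cout << "fragment 1 resumes a thread\n"; });
  registry.Add([] { std::cout << "fragment 2 resumes a thread\n"; });
  registry.NotifyOne();  // fragment 1
  registry.NotifyOne();  // fragment 2
  registry.NotifyOne();  // wraps around to fragment 1
}

Notifying one callback per VCore (rather than broadcasting) keeps the thread count from overshooting the just-acquired capacity, which is the open question flagged in the deleted TODO above.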