IMPALA-4160: Remove Llama support. Alas, poor Llama! I knew him, Impala: a system of infinite jest, of most excellent fancy: we have borne him on our back a thousand times; and now, how abhorred in my imagination it is!
Done: * Removed QueryResourceMgr, ResourceBroker, CGroupsMgr * Removed untested 'offline' mode and NM failure detection from ImpalaServer * Removed all Llama-related Thrift files * Removed RM-related arguments to MemTracker constructors * Deprecated all RM-related flags, printing a warning if enable_rm is set * Removed expansion logic from MemTracker * Removed VCore logic from QuerySchedule * Removed all reservation-related logic from Scheduler * Removed RM metric descriptions * Various misc. small class changes Not done: * Remove RM flags (--enable_rm etc.) * Remove RM query options * Changes to RequestPoolService (see IMPALA-4159) * Remove estimates of VCores / memory from plan Change-Id: Icfb14209e31f6608bb7b8a33789e00411a6447ef Reviewed-on: http://gerrit.cloudera.org:8080/4445 Tested-by: Internal Jenkins Reviewed-by: Henry Robinson <he...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/19de09ab Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/19de09ab Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/19de09ab Branch: refs/heads/master Commit: 19de09ab7db4498fa3dd6e0775e32581139dd336 Parents: 3be61f9 Author: Henry Robinson <he...@cloudera.com> Authored: Thu Sep 15 18:09:46 2016 -0700 Committer: Henry Robinson <he...@cloudera.com> Committed: Tue Sep 20 23:50:43 2016 +0000 ---------------------------------------------------------------------- be/CMakeLists.txt | 4 - be/generated-sources/gen-cpp/CMakeLists.txt | 8 - be/src/bufferpool/reservation-tracker-test.cc | 6 +- be/src/exec/blocking-join-node.cc | 8 - be/src/exec/data-sink.cc | 2 +- be/src/exec/exec-node.cc | 5 +- be/src/exec/hash-join-node.cc | 1 - be/src/exec/hdfs-scan-node-base.cc | 1 - be/src/exec/hdfs-scan-node.cc | 37 - be/src/exec/kudu-scan-node-test.cc | 2 +- be/src/exec/kudu-scan-node.cc | 5 - be/src/exec/kudu-table-sink-test.cc | 3 +- be/src/exprs/expr-test.cc 
| 4 +- be/src/resourcebroker/CMakeLists.txt | 31 - be/src/resourcebroker/resource-broker.cc | 850 ------------------- be/src/resourcebroker/resource-broker.h | 424 --------- be/src/runtime/buffered-block-mgr-test.cc | 4 +- be/src/runtime/buffered-block-mgr.cc | 4 +- be/src/runtime/collection-value-builder-test.cc | 2 +- be/src/runtime/coordinator.cc | 23 +- be/src/runtime/data-stream-recvr.cc | 2 +- be/src/runtime/data-stream-test.cc | 8 +- be/src/runtime/disk-io-mgr-test.cc | 4 +- be/src/runtime/disk-io-mgr.cc | 4 +- be/src/runtime/exec-env.cc | 196 +---- be/src/runtime/exec-env.h | 19 - be/src/runtime/mem-pool-test.cc | 4 +- be/src/runtime/mem-tracker-test.cc | 6 +- be/src/runtime/mem-tracker.cc | 77 +- be/src/runtime/mem-tracker.h | 81 +- be/src/runtime/plan-fragment-executor.cc | 90 +- be/src/runtime/runtime-filter-bank.cc | 5 +- be/src/runtime/runtime-state.cc | 25 +- be/src/runtime/runtime-state.h | 16 +- be/src/runtime/test-env.cc | 2 +- be/src/scheduling/CMakeLists.txt | 1 - be/src/scheduling/query-resource-mgr.cc | 271 ------ be/src/scheduling/query-resource-mgr.h | 227 ----- be/src/scheduling/query-schedule.cc | 137 +-- be/src/scheduling/query-schedule.h | 38 +- be/src/scheduling/request-pool-service.cc | 9 +- be/src/scheduling/scheduler.h | 16 - be/src/scheduling/simple-scheduler-test.cc | 4 +- be/src/scheduling/simple-scheduler.cc | 171 +--- be/src/scheduling/simple-scheduler.h | 41 +- be/src/service/impala-server.cc | 92 +- be/src/service/impala-server.h | 24 - be/src/service/impalad-main.cc | 8 + be/src/service/query-exec-state.cc | 20 - be/src/util/CMakeLists.txt | 2 - be/src/util/cgroups-mgr.cc | 238 ------ be/src/util/cgroups-mgr.h | 175 ---- be/src/util/debug-util.h | 1 - be/src/util/llama-util.cc | 152 ---- be/src/util/llama-util.h | 75 -- be/src/util/thread-pool.h | 4 - be/src/util/thread.cc | 14 - be/src/util/thread.h | 21 - be/src/util/uid-util.h | 5 +- bin/bootstrap_toolchain.py | 2 +- bin/create-test-configuration.sh | 2 +- 
bin/generate_minidump_collection_testdata.py | 1 - bin/start-impala-cluster.py | 23 +- common/thrift/CMakeLists.txt | 2 - common/thrift/Frontend.thrift | 1 + common/thrift/ImpalaInternalService.thrift | 7 - common/thrift/Llama.thrift | 276 ------ common/thrift/ResourceBrokerService.thrift | 119 --- common/thrift/metrics.json | 210 ----- .../com/cloudera/impala/planner/Planner.java | 1 + testdata/cluster/admin | 12 +- .../cdh5/etc/hadoop/conf/yarn-site.xml.tmpl | 13 +- .../cdh5/etc/init.d/llama-application | 38 - .../etc/llama/conf/llama-log4j.properties.tmpl | 29 - .../cdh5/etc/llama/conf/llama-site.xml.tmpl | 86 -- .../common/etc/hadoop/conf/core-site.xml.tmpl | 10 - 76 files changed, 165 insertions(+), 4376 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 6df394f..2546bd0 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -167,7 +167,6 @@ if (DOXYGEN_FOUND) ${CMAKE_SOURCE_DIR}/be/src/common/ ${CMAKE_SOURCE_DIR}/be/src/exec/ ${CMAKE_SOURCE_DIR}/be/src/exprs/ - ${CMAKE_SOURCE_DIR}/be/src/resourcebroker/ ${CMAKE_SOURCE_DIR}/be/src/runtime/ ${CMAKE_SOURCE_DIR}/be/src/scheduling/ ${CMAKE_SOURCE_DIR}/be/src/service/ @@ -267,7 +266,6 @@ set (IMPALA_LINK_LIBS Exprs GlobalFlags ImpalaThrift - ResourceBroker Rpc Runtime Scheduling @@ -295,7 +293,6 @@ if (BUILD_SHARED_LIBS) Statestore Scheduling Catalog - ResourceBroker ImpalaThrift GlobalFlags Common @@ -423,7 +420,6 @@ add_subdirectory(src/catalog) add_subdirectory(src/codegen) add_subdirectory(src/common) add_subdirectory(src/exec) -add_subdirectory(src/resourcebroker) add_subdirectory(src/exprs) add_subdirectory(src/runtime) add_subdirectory(src/scheduling) 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/generated-sources/gen-cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/generated-sources/gen-cpp/CMakeLists.txt b/be/generated-sources/gen-cpp/CMakeLists.txt index ac2907e..35f7d65 100644 --- a/be/generated-sources/gen-cpp/CMakeLists.txt +++ b/be/generated-sources/gen-cpp/CMakeLists.txt @@ -37,10 +37,6 @@ set(SRC_FILES ImpalaService_constants.cpp ImpalaService_types.cpp ImpalaHiveServer2Service.cpp - Llama_constants.cpp - Llama_types.cpp - LlamaAMService.cpp - LlamaNotificationService.cpp beeswax_constants.cpp beeswax_types.cpp BeeswaxService.cpp @@ -79,10 +75,6 @@ set(SRC_FILES NetworkTestService.cpp PlanNodes_constants.cpp PlanNodes_types.cpp - ResourceBrokerNotificationService.cpp - ResourceBrokerService_constants.cpp - ResourceBrokerService_types.cpp - ResourceBrokerService.cpp Results_constants.cpp Results_types.cpp Partitions_constants.cpp http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/bufferpool/reservation-tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/bufferpool/reservation-tracker-test.cc b/be/src/bufferpool/reservation-tracker-test.cc index e43efb8..8dd1e41 100644 --- a/be/src/bufferpool/reservation-tracker-test.cc +++ b/be/src/bufferpool/reservation-tracker-test.cc @@ -235,8 +235,8 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationTwoLevel) { // of different code paths. 
root_.InitRootTracker(NewProfile(), MIN_BUFFER_LEN * 100); MemTracker root_mem_tracker; - MemTracker child_mem_tracker1(-1, -1, "Child 1", &root_mem_tracker); - MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, -1, "Child 2", &root_mem_tracker); + MemTracker child_mem_tracker1(-1, "Child 1", &root_mem_tracker); + MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, "Child 2", &root_mem_tracker); ReservationTracker child_reservations1, child_reservations2; child_reservations1.InitChildTracker( NewProfile(), &root_, &child_mem_tracker1, 500 * MIN_BUFFER_LEN); @@ -317,7 +317,7 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) { reservations[0].InitRootTracker(NewProfile(), 500); for (int i = 1; i < HIERARCHY_DEPTH; ++i) { mem_trackers[i].reset(new MemTracker( - mem_limits[i], -1, Substitute("Child $0", i), mem_trackers[i - 1].get())); + mem_limits[i], Substitute("Child $0", i), mem_trackers[i - 1].get())); reservations[i].InitChildTracker( NewProfile(), &reservations[i - 1], mem_trackers[i].get(), 500); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/blocking-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc index 2c17d13..309bde4 100644 --- a/be/src/exec/blocking-join-node.cc +++ b/be/src/exec/blocking-join-node.cc @@ -25,7 +25,6 @@ #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "runtime/tuple-row.h" -#include "util/cgroups-mgr.h" #include "util/debug-util.h" #include "util/runtime-profile-counters.h" #include "util/time.h" @@ -196,13 +195,6 @@ Status BlockingJoinNode::ConstructBuildAndOpenProbe(RuntimeState* state, Thread build_thread( node_name_, "build thread", bind(&BlockingJoinNode::ProcessBuildInputAsync, this, state, build_sink, &build_side_status)); - if (!state->cgroup().empty()) { - Status status = state->exec_env()->cgroups_mgr()->AssignThreadToCgroup( - 
build_thread, state->cgroup()); - // If AssignThreadToCgroup() failed, we still need to wait for the build-side - // thread to complete before returning, so just log that error. - if (!status.ok()) state->LogError(status.msg()); - } // Open the left child so that it may perform any initialisation in parallel. // Don't exit even if we see an error, we still need to wait for the build thread // to finish. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/data-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc index a6c8fcd..8c8b2dc 100644 --- a/be/src/exec/data-sink.cc +++ b/be/src/exec/data-sink.cc @@ -153,7 +153,7 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) { DCHECK(mem_tracker != NULL); profile_ = state->obj_pool()->Add(new RuntimeProfile(state->obj_pool(), GetName())); mem_tracker_ = mem_tracker; - expr_mem_tracker_.reset(new MemTracker(-1, -1, "Exprs", mem_tracker, false)); + expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker, false)); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/exec-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index 59ac997..2dce0d3 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -147,10 +147,9 @@ Status ExecNode::Prepare(RuntimeState* state) { DCHECK(runtime_profile_.get() != NULL); rows_returned_counter_ = ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT); - mem_tracker_.reset(new MemTracker( - runtime_profile_.get(), -1, -1, runtime_profile_->name(), + mem_tracker_.reset(new MemTracker(runtime_profile_.get(), -1, runtime_profile_->name(), state->instance_mem_tracker())); - expr_mem_tracker_.reset(new MemTracker(-1, -1, "Exprs", mem_tracker_.get(), false)); + expr_mem_tracker_.reset(new 
MemTracker(-1, "Exprs", mem_tracker_.get(), false)); rows_returned_rate_ = runtime_profile()->AddDerivedCounter( ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc index 946ab48..52ba1d1 100644 --- a/be/src/exec/hash-join-node.cc +++ b/be/src/exec/hash-join-node.cc @@ -39,7 +39,6 @@ #include "common/names.h" -DECLARE_string(cgroup_hierarchy_path); DEFINE_bool(enable_probe_side_filtering, true, "Deprecated."); using namespace impala; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index de1dad0..c03817b 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -43,7 +43,6 @@ #include "runtime/raw-value.h" #include "runtime/row-batch.h" #include "runtime/string-buffer.h" -#include "scheduling/query-resource-mgr.h" #include "util/bit-util.h" #include "util/container-util.h" #include "util/debug-util.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 83f6452..03f81ed 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -27,7 +27,6 @@ #include "runtime/runtime-state.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" -#include "scheduling/query-resource-mgr.h" #include "util/debug-util.h" #include "util/disk-info.h" #include "util/runtime-profile-counters.h" @@ -154,12 +153,6 @@ Status HdfsScanNode::Prepare(RuntimeState* state) { 
SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state)); - // Assign scanner thread group to cgroup, if any. - if (!state->cgroup().empty()) { - scanner_threads_.SetCgroupsMgr(state->exec_env()->cgroups_mgr()); - scanner_threads_.SetCgroup(state->cgroup()); - } - // Compute the minimum bytes required to start a new thread. This is based on the // file format. // The higher the estimate, the less likely it is the query will fail but more likely @@ -212,12 +205,6 @@ Status HdfsScanNode::Open(RuntimeState* state) { thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb( bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1)); - if (runtime_state_->query_resource_mgr() != NULL) { - rm_callback_id_ = runtime_state_->query_resource_mgr()->AddVcoreAvailableCb( - bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, - runtime_state_->resource_pool())); - } - return Status::OK(); } @@ -228,9 +215,6 @@ void HdfsScanNode::Close(RuntimeState* state) { if (thread_avail_cb_id_ != -1) { state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_); } - if (state->query_resource_mgr() != NULL && rm_callback_id_ != -1) { - state->query_resource_mgr()->RemoveVcoreAvailableCb(rm_callback_id_); - } scanner_threads_.JoinAll(); @@ -326,8 +310,6 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) // 6. Don't start up a thread if there isn't enough memory left to run it. // 7. Don't start up more than maximum number of scanner threads configured. // 8. Don't start up if there are no thread tokens. - // 9. Don't start up if we are running too many threads for our vcore allocation - // (unless the thread is reserved, in which case it has to run). // Case 4. We have not issued the initial ranges so don't start a scanner thread. // Issuing ranges will call this function and we'll start the scanner threads then. 
@@ -360,25 +342,12 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) break; } - // Case 9. - if (!is_reserved) { - if (runtime_state_->query_resource_mgr() != NULL && - runtime_state_->query_resource_mgr()->IsVcoreOverSubscribed()) { - pool->ReleaseThreadToken(false); - break; - } - } - COUNTER_ADD(&active_scanner_thread_counter_, 1); COUNTER_ADD(num_scanner_threads_started_counter_, 1); stringstream ss; ss << "scanner-thread(" << num_scanner_threads_started_counter_->value() << ")"; scanner_threads_.AddThread( new Thread("hdfs-scan-node", ss.str(), &HdfsScanNode::ScannerThread, this)); - - if (runtime_state_->query_resource_mgr() != NULL) { - runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1); - } } } @@ -411,9 +380,6 @@ void HdfsScanNode::ScannerThread() { // Unlock before releasing the thread token to avoid deadlock in // ThreadTokenAvailableCb(). l.unlock(); - if (runtime_state_->query_resource_mgr() != NULL) { - runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1); - } runtime_state_->resource_pool()->ReleaseThreadToken(false); if (filter_status.ok()) { for (auto& ctx: filter_ctxs) { @@ -495,9 +461,6 @@ void HdfsScanNode::ScannerThread() { } COUNTER_ADD(&active_scanner_thread_counter_, -1); - if (runtime_state_->query_resource_mgr() != NULL) { - runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1); - } runtime_state_->resource_pool()->ReleaseThreadToken(false); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/kudu-scan-node-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node-test.cc b/be/src/exec/kudu-scan-node-test.cc index 8324628..a0eabef 100644 --- a/be/src/exec/kudu-scan-node-test.cc +++ b/be/src/exec/kudu-scan-node-test.cc @@ -85,7 +85,7 @@ class KuduScanNodeTest : public testing::Test { exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool()); } - 
runtime_state_.reset(new RuntimeState(TExecPlanFragmentParams(), "", exec_env_.get())); + runtime_state_.reset(new RuntimeState(TExecPlanFragmentParams(), exec_env_.get())); runtime_state_->InitMemTrackers(TUniqueId(), NULL, -1); TKuduScanNode kudu_scan_node_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/kudu-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc index e171afa..827631a 100644 --- a/be/src/exec/kudu-scan-node.cc +++ b/be/src/exec/kudu-scan-node.cc @@ -35,7 +35,6 @@ #include "runtime/row-batch.h" #include "runtime/string-value.h" #include "runtime/tuple-row.h" -#include "scheduling/query-resource-mgr.h" #include "util/disk-info.h" #include "util/jni-util.h" #include "util/periodic-counter-updater.h" @@ -248,10 +247,6 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) { VLOG_RPC << "Thread started: " << name; scanner_threads_.AddThread(new Thread("kudu-scan-node", name, &KuduScanNode::RunScannerThread, this, name, token)); - - if (runtime_state_->query_resource_mgr() != NULL) { - runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1); - } } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/kudu-table-sink-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-table-sink-test.cc b/be/src/exec/kudu-table-sink-test.cc index 96aec55..a1cbb68 100644 --- a/be/src/exec/kudu-table-sink-test.cc +++ b/be/src/exec/kudu-table-sink-test.cc @@ -49,8 +49,7 @@ static const int THIRD_SLOT_ID = 3; class KuduTableSinkTest : public testing::Test { public: - KuduTableSinkTest() - : runtime_state_(TExecPlanFragmentParams(), "", &exec_env_) {} + KuduTableSinkTest() : runtime_state_(TExecPlanFragmentParams(), &exec_env_) {} virtual void SetUp() { // Create a Kudu client and the table (this will abort the test 
here http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exprs/expr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc index 8a19603..8064de0 100644 --- a/be/src/exprs/expr-test.cc +++ b/be/src/exprs/expr-test.cc @@ -1025,7 +1025,7 @@ template <typename T> void TestSingleLiteralConstruction( const ColumnType& type, const T& value, const string& string_val) { ObjectPool pool; RowDescriptor desc; - RuntimeState state(TExecPlanFragmentParams(), "", NULL); + RuntimeState state(TExecPlanFragmentParams(), NULL); MemTracker tracker; Expr* expr = pool.Add(new Literal(type, value)); @@ -1041,7 +1041,7 @@ TEST_F(ExprTest, NullLiteral) { for (int type = TYPE_BOOLEAN; type != TYPE_DATE; ++type) { NullLiteral expr(static_cast<PrimitiveType>(type)); ExprContext ctx(&expr); - RuntimeState state(TExecPlanFragmentParams(), "", NULL); + RuntimeState state(TExecPlanFragmentParams(), NULL); MemTracker tracker; EXPECT_OK(ctx.Prepare(&state, RowDescriptor(), &tracker)); EXPECT_OK(ctx.Open(&state)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/resourcebroker/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/resourcebroker/CMakeLists.txt b/be/src/resourcebroker/CMakeLists.txt deleted file mode 100644 index 776152c..0000000 --- a/be/src/resourcebroker/CMakeLists.txt +++ /dev/null @@ -1,31 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -# where to put generated libraries -set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/resourcebroker") - -# where to put generated binaries -set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/resourcebroker") - -add_library(ResourceBroker - resource-broker.cc -) -add_dependencies(ResourceBroker thrift-deps) - -# TODO: Add resource broker BE test -# ADD_BE_TEST(resource-broker-test) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/resourcebroker/resource-broker.cc ---------------------------------------------------------------------- diff --git a/be/src/resourcebroker/resource-broker.cc b/be/src/resourcebroker/resource-broker.cc deleted file mode 100644 index 4690c59..0000000 --- a/be/src/resourcebroker/resource-broker.cc +++ /dev/null @@ -1,850 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "resourcebroker/resource-broker.h" - -#include <boost/algorithm/string/join.hpp> -#include <boost/uuid/uuid.hpp> -#include <boost/uuid/uuid_generators.hpp> -#include <boost/uuid/uuid_io.hpp> -#include <boost/lexical_cast.hpp> -#include <gutil/strings/substitute.h> -#include <thrift/Thrift.h> - -#include "common/status.h" -#include "rpc/thrift-util.h" -#include "rpc/thrift-server.h" -#include "scheduling/query-resource-mgr.h" -#include "scheduling/scheduler.h" -#include "util/debug-util.h" -#include "util/uid-util.h" -#include "util/network-util.h" -#include "util/llama-util.h" -#include "util/time.h" -#include "gen-cpp/ResourceBrokerService.h" -#include "gen-cpp/Llama_types.h" - -#include "common/names.h" - -using boost::algorithm::join; -using boost::algorithm::to_lower; -using boost::uuids::random_generator; -using namespace ::apache::thrift::server; -using namespace ::apache::thrift; -using namespace impala; -using namespace strings; - -DECLARE_int64(llama_registration_timeout_secs); -DECLARE_int64(llama_registration_wait_secs); -DECLARE_int64(llama_max_request_attempts); - -DECLARE_int32(resource_broker_cnxn_attempts); -DECLARE_int32(resource_broker_cnxn_retry_interval_ms); -DECLARE_int32(resource_broker_send_timeout); -DECLARE_int32(resource_broker_recv_timeout); - -static const string LLAMA_KERBEROS_SERVICE_NAME = "llama"; - -namespace impala { - -// String to search for in Llama error messages to detect that Llama has restarted, -// and hence the resource broker must re-register. 
-const string LLAMA_RESTART_SEARCH_STRING = "unknown handle"; - -class LlamaNotificationThriftIf : public llama::LlamaNotificationServiceIf { - public: - LlamaNotificationThriftIf(ResourceBroker* resource_broker) - : resource_broker_(resource_broker) {} - - virtual void AMNotification(llama::TLlamaAMNotificationResponse& response, - const llama::TLlamaAMNotificationRequest& request) { - resource_broker_->AMNotification(request, response); - } - - virtual void NMNotification(llama::TLlamaNMNotificationResponse& response, - const llama::TLlamaNMNotificationRequest& request) { - LOG(WARNING) << "Ignoring node-manager notification. Handling not yet implemented."; - response.status.__set_status_code(llama::TStatusCode::OK); - } - - virtual ~LlamaNotificationThriftIf() {} - - private: - ResourceBroker* resource_broker_; -}; - -ResourceBroker::ResourceBroker(const vector<TNetworkAddress>& llama_addresses, - const TNetworkAddress& llama_callback_address, MetricGroup* metrics) : - llama_addresses_(llama_addresses), - active_llama_addr_idx_(-1), - llama_callback_address_(llama_callback_address), - metrics_(metrics), - scheduler_(NULL), - llama_callback_thrift_iface_(new LlamaNotificationThriftIf(this)), - llama_client_cache_(new ClientCache<llama::LlamaAMServiceClient>( - FLAGS_resource_broker_cnxn_attempts, - FLAGS_resource_broker_cnxn_retry_interval_ms, - FLAGS_resource_broker_send_timeout, - FLAGS_resource_broker_recv_timeout, - LLAMA_KERBEROS_SERVICE_NAME)) { - DCHECK(metrics != NULL); - llama_client_cache_->InitMetrics(metrics, "resource-broker"); - active_llama_metric_ = metrics->AddProperty<string>( - "resource-broker.active-llama", "none"); - active_llama_handle_metric_ = metrics->AddProperty<string>( - "resource-broker.active-llama-handle", "none"); - - reservation_rpc_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics, - "resource-broker.reservation-request-rpc-time"); - reservation_response_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics, 
- "resource-broker.reservation-request-response-time"); - reservation_requests_total_metric_ = metrics->AddCounter<int64_t>( - "resource-broker.reservation-requests-total", 0); - reservation_requests_fulfilled_metric_ = metrics->AddCounter<int64_t>( - "resource-broker.reservation-requests-fulfilled", 0); - reservation_requests_failed_metric_ = metrics->AddCounter<int64_t>( - "resource-broker.reservation-requests-failed", 0); - reservation_requests_rejected_metric_ = metrics->AddCounter<int64_t>( - "resource-broker.reservation-requests-rejected", 0); - reservation_requests_timedout_metric_ = metrics->AddCounter<int64_t>( - "resource-broker.reservation-requests-timedout", 0); - - expansion_rpc_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics, - "resource-broker.expansion-request-rpc-time"); - expansion_response_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics, - "resource-broker.expansion-request-response-time"); - expansion_requests_total_metric_ = metrics->AddCounter<int64_t>( - "resource-broker.expansion-requests-total", 0); - expansion_requests_fulfilled_metric_ = metrics->AddCounter<int64_t>( - "resource-broker.expansion-requests-fulfilled", 0); - expansion_requests_failed_metric_ = metrics->AddCounter<int64_t>( - "resource-broker.expansion-requests-failed", 0); - expansion_requests_rejected_metric_ = metrics->AddCounter<int64_t>( - "resource-broker.expansion-requests-rejected", 0); - expansion_requests_timedout_metric_ = metrics->AddCounter<int64_t>( - "resource-broker.expansion-requests-timedout", 0); - - requests_released_metric_ = metrics->AddCounter<int64_t>( - "resource-broker.requests-released", 0); - allocated_memory_metric_ = metrics->AddGauge<uint64_t>( - "resource-broker.memory-resources-in-use", 0L); - allocated_vcpus_metric_ = metrics->AddGauge<uint64_t>( - "resource-broker.vcpu-resources-in-use", 0); -} - -Status ResourceBroker::Init() { - // The scheduler must have been set before calling Init(). 
- DCHECK(scheduler_ != NULL); - DCHECK(llama_callback_thrift_iface_ != NULL); - if (llama_addresses_.size() == 0) { - return Status("No Llama addresses configured (see --llama_addresses)"); - } - - boost::shared_ptr<TProcessor> llama_callback_proc( - new llama::LlamaNotificationServiceProcessor(llama_callback_thrift_iface_)); - llama_callback_server_.reset(new ThriftServer("llama-callback", llama_callback_proc, - llama_callback_address_.port, NULL, metrics_, 5)); - RETURN_IF_ERROR(llama_callback_server_->Start()); - - // Generate client id for registration with Llama, and register with LLama. - random_generator uuid_generator; - llama_client_id_ = uuid_generator(); - RETURN_IF_ERROR(RegisterWithLlama()); - RETURN_IF_ERROR(RefreshLlamaNodes()); - return Status::OK(); -} - -Status ResourceBroker::RegisterWithLlama() { - // Remember the current llama_handle_ to detect if another thread has already - // completed the registration successfully. - llama::TUniqueId current_llama_handle = llama_handle_; - - // Start time that this thread attempted registration. Used to limit the time that a - // query will wait for re-registration with the Llama to succeed. - int64_t start = MonotonicSeconds(); - lock_guard<mutex> l(llama_registration_lock_); - if (llama_handle_ != current_llama_handle) return Status::OK(); - - active_llama_metric_->set_value("none"); - active_llama_handle_metric_->set_value("none"); - - int llama_addr_idx = (active_llama_addr_idx_ + 1) % llama_addresses_.size(); - int64_t now = MonotonicSeconds(); - while (FLAGS_llama_registration_timeout_secs == -1 || - (now - start) < FLAGS_llama_registration_timeout_secs) { - // Connect to the Llama at llama_address. - const TNetworkAddress& llama_address = llama_addresses_[llama_addr_idx]; - // Client status will be ok if a Thrift connection could be successfully established - // for the returned client at some point in the past. 
Hence, there is no guarantee - // that the connection is still valid now and we must check for broken pipes, etc. - Status client_status; - ClientConnection<llama::LlamaAMServiceClient> llama_client(llama_client_cache_.get(), - llama_address, &client_status); - if (client_status.ok()) { - // Register this resource broker with Llama. - llama::TLlamaAMRegisterRequest request; - request.__set_version(llama::TLlamaServiceVersion::V1); - llama::TUniqueId llama_uuid; - UUIDToTUniqueId(llama_client_id_, &llama_uuid); - request.__set_client_id(llama_uuid); - - llama::TNetworkAddress callback_address; - callback_address << llama_callback_address_; - request.__set_notification_callback_service(callback_address); - llama::TLlamaAMRegisterResponse response; - LOG(INFO) << "Registering Resource Broker with Llama at " << llama_address; - Status rpc_status = - llama_client.DoRpc(&llama::LlamaAMServiceClient::Register, request, &response); - if (rpc_status.ok()) { - // TODO: Is there a period where an inactive Llama may respond to RPCs? - // If so, then we need to keep cycling through Llamas here and not - // return an error. - RETURN_IF_ERROR(LlamaStatusToImpalaStatus( - response.status, "Failed to register Resource Broker with Llama.")); - LOG(INFO) << "Received Llama client handle " << response.am_handle - << ((response.am_handle == llama_handle_) ? " (same as old)" : ""); - llama_handle_ = response.am_handle; - break; - } - } - // Cycle through the list of Llama addresses for Llama failover. - llama_addr_idx = (llama_addr_idx + 1) % llama_addresses_.size(); - LOG(INFO) << "Failed to connect to Llama at " << llama_address << "." << endl - << "Error: " << client_status.GetDetail() << endl - << "Retrying to connect to Llama at " - << llama_addresses_[llama_addr_idx] << " in " - << FLAGS_llama_registration_wait_secs << "s."; - // Sleep to give Llama time to recover/failover before the next attempt. 
- SleepForMs(FLAGS_llama_registration_wait_secs * 1000); - now = MonotonicSeconds(); - } - DCHECK(FLAGS_llama_registration_timeout_secs != -1); - if ((now - start) >= FLAGS_llama_registration_timeout_secs) { - return Status("Failed to (re-)register Resource Broker with Llama."); - } - - if (llama_addr_idx != active_llama_addr_idx_) { - // TODO: We've switched to a different Llama (failover). Cancel all queries - // coordinated by this Impalad to free up physical resources that are not - // accounted for anymore by Yarn. - } - - // If we reached this point, (re-)registration was successful. - active_llama_addr_idx_ = llama_addr_idx; - active_llama_metric_->set_value(lexical_cast<string>(llama_addresses_[llama_addr_idx])); - active_llama_handle_metric_->set_value(lexical_cast<string>(llama_handle_)); - return Status::OK(); -} - -bool ResourceBroker::LlamaHasRestarted(const llama::TStatus& status) const { - if (status.status_code == llama::TStatusCode::OK || !status.__isset.error_msgs) { - return false; - } - // Check whether one of the error messages contains LLAMA_RESTART_SEARCH_STRING. - for (int i = 0; i < status.error_msgs.size(); ++i) { - string error_msg = status.error_msgs[i]; - to_lower(error_msg); - if (error_msg.find(LLAMA_RESTART_SEARCH_STRING) != string::npos) { - LOG(INFO) << "Assuming Llama restart from error message: " << status.error_msgs[i]; - return true; - } - } - return false; -} - -void ResourceBroker::Close() { - // Close connections to all Llama addresses, not just the active one. 
- for (const TNetworkAddress& llama_address: llama_addresses_) { - llama_client_cache_->CloseConnections(llama_address); - } - llama_callback_server_->Join(); -} - -void ResourceBroker::CreateLlamaReservationRequest( - const TResourceBrokerReservationRequest& src, - llama::TLlamaAMReservationRequest& dest) { - dest.version = llama::TLlamaServiceVersion::V1; - dest.am_handle = llama_handle_; - dest.gang = src.gang; - // Queue is optional, so must be explicitly set for all versions of Thrift to work - // together. - dest.__set_queue(src.queue); - dest.user = src.user; - dest.resources = src.resources; - random_generator uuid_generator; - llama::TUniqueId request_id; - UUIDToTUniqueId(uuid_generator(), &request_id); - dest.__set_reservation_id(request_id); -} - -template <class F, typename LlamaReqType, typename LlamaRespType> -Status ResourceBroker::LlamaRpc(const F& f, LlamaReqType* request, - LlamaRespType* response, StatsMetric<double>* rpc_time_metric) { - int attempts = 0; - MonotonicStopWatch sw; - // Indicates whether to re-register with Llama before the next RPC attempt, - // e.g. because Llama has restarted or become unavailable. - bool register_with_llama = false; - while (attempts < FLAGS_llama_max_request_attempts) { - if (register_with_llama) { - RETURN_IF_ERROR(ReRegisterWithLlama(*request, response)); - // Set the new Llama handle received from re-registering. 
- request->__set_am_handle(llama_handle_); - VLOG_RPC << "Retrying Llama RPC after re-registration: " << *request; - register_with_llama = false; - } - ++attempts; - Status rpc_status; - ClientConnection<llama::LlamaAMServiceClient> llama_client(llama_client_cache_.get(), - llama_addresses_[active_llama_addr_idx_], &rpc_status); - if (!rpc_status.ok()) { - register_with_llama = true; - continue; - } - - sw.Start(); - Status status = llama_client.DoRpc(f, *request, response); - if (!status.ok()) { - VLOG_RPC << "Error making Llama RPC: " << status.GetDetail(); - register_with_llama = status.code() == TErrorCode::RPC_CLIENT_CONNECT_FAILURE; - continue; - } - if (rpc_time_metric != NULL) { - rpc_time_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0)); - } - - // Check whether Llama has been restarted. If so, re-register with it. - // Break out of the loop here upon success of the RPC. - if (!LlamaHasRestarted(response->status)) break; - register_with_llama = true; - } - if (attempts >= FLAGS_llama_max_request_attempts) { - return Status(Substitute( - "Request aborted after $0 attempts due to connectivity issues with Llama.", - FLAGS_llama_max_request_attempts)); - } - return Status::OK(); -} - -template <typename LlamaReqType, typename LlamaRespType> -Status ResourceBroker::ReRegisterWithLlama(const LlamaReqType& request, - LlamaRespType* response) { - RETURN_IF_ERROR(RegisterWithLlama()); - return RefreshLlamaNodes(); -} - -template <> -Status ResourceBroker::ReRegisterWithLlama(const llama::TLlamaAMGetNodesRequest& request, - llama::TLlamaAMGetNodesResponse* response) { - return RegisterWithLlama(); -} - -void ResourceBroker::PendingRequest::GetResources(ResourceMap* resources) { - resources->clear(); - for (const llama::TAllocatedResource& resource: allocated_resources_) { - TNetworkAddress host = MakeNetworkAddress(resource.location); - (*resources)[host] = resource; - VLOG_QUERY << "Getting allocated resource for reservation id " - << reservation_id_ 
<< " and location " << host; - } -} - -void ResourceBroker::PendingRequest::SetResources( - const vector<llama::TAllocatedResource>& resources) { - // TODO: Llama returns a dump of all resources that we need to manually group by - // reservation id. Can Llama do the grouping for us? - for (const llama::TAllocatedResource& resource: resources) { - // Ignore resources that don't belong to the given reservation id. - if (resource.reservation_id == request_id()) { - allocated_resources_.push_back(resource); - } - } -} - -bool ResourceBroker::WaitForNotification(int64_t timeout, ResourceMap* resources, - bool* timed_out, PendingRequest* pending_request) { - bool request_granted; - if (timeout <= 0) { - *timed_out = false; - request_granted = pending_request->promise()->Get(); - } else { - request_granted = pending_request->promise()->Get(timeout, timed_out); - } - - // Remove the promise from the pending-requests map. - const llama::TUniqueId& res_id = pending_request->reservation_id(); - { - lock_guard<mutex> l(pending_requests_lock_); - pending_requests_.erase(pending_request->request_id()); - if (pending_request->is_expansion()) { - PendingExpansionIdsMap::iterator it = pending_expansion_ids_.find(res_id); - if (it == pending_expansion_ids_.end()) { - // If the AMNotification was received as the reservation was being cleaned up, - // it's possible that the pending/allocated request structures were updated - // before this thread was able to acquire the lock. 
- VLOG_RPC << "Didn't find reservation=" << res_id << " in pending requests"; - return false; - } - it->second.erase(pending_request->request_id()); - } - } - - if (request_granted && !*timed_out) { - pending_request->GetResources(resources); - int64_t total_memory_mb = 0L; - int32_t total_vcpus = 0; - for (const ResourceMap::value_type& resource: *resources) { - total_memory_mb += resource.second.memory_mb; - total_vcpus += resource.second.v_cpu_cores; - } - { - lock_guard<mutex> l(allocated_requests_lock_); - AllocatedRequestMap::iterator it = allocated_requests_.find(res_id); - if (it == allocated_requests_.end()) { - // The reservation may have already been cleaned up. See above. - VLOG_RPC << "Didn't find reservation=" << res_id << " in allocated requests"; - return false; - } - it->second.push_back(AllocatedRequest(res_id, total_memory_mb, total_vcpus, - pending_request->is_expansion())); - allocated_memory_metric_->Increment(total_memory_mb * 1024L * 1024L); - allocated_vcpus_metric_->Increment(total_vcpus); - } - } - - return request_granted; -} - -Status ResourceBroker::Expand(const TUniqueId& reservation_id, - const llama::TResource& resource, int64_t timeout_ms, llama::TUniqueId* expansion_id, - llama::TAllocatedResource* allocated_resource) { - llama::TLlamaAMReservationExpansionRequest ll_request; - llama::TLlamaAMReservationExpansionResponse ll_response; - - ll_request.version = llama::TLlamaServiceVersion::V1; - ll_request.am_handle = llama_handle_; - ll_request.expansion_of << reservation_id; - random_generator uuid_generator; - llama::TUniqueId request_id; - UUIDToTUniqueId(uuid_generator(), &request_id); - ll_request.__set_expansion_id(request_id); - ll_request.resource = resource; - VLOG_RPC << "Sending expansion request for reservation_id=" << reservation_id - << " expansion_id=" << request_id - << " resource=" << resource; - - PendingRequest* pending_request; - { - lock_guard<mutex> l(pending_requests_lock_); - PendingExpansionIdsMap::iterator 
it = - pending_expansion_ids_.find(ll_request.expansion_of); - // If pending_expansion_ids_ doesn't contain the reservation id then the - // QueryResourceMgr has already been unregistered and the reservation has been - // released. - if (it == pending_expansion_ids_.end()) { - return Status(Substitute("Resource expansion request (expansion id=$0, " - "reservation id=$1) made after reservation released.", - PrintId(ll_request.expansion_id), PrintId(reservation_id))); - } - it->second.insert(request_id); - pending_request = new PendingRequest(ll_request.expansion_of, request_id, true); - pending_requests_.insert(make_pair(request_id, pending_request)); - } - - MonotonicStopWatch sw; - sw.Start(); - Status status = LlamaRpc(&llama::LlamaAMServiceClient::Expand, &ll_request, - &ll_response, expansion_rpc_time_metric_); - // Check the status of the response. - if (!status.ok()) { - expansion_requests_failed_metric_->Increment(1); - return status; - } - - Status request_status = LlamaStatusToImpalaStatus(ll_response.status); - if (!request_status.ok()) { - expansion_requests_failed_metric_->Increment(1); - return request_status; - } - - ResourceMap allocated_resources; - bool timed_out = false; - bool request_granted = WaitForNotification(timeout_ms, - &allocated_resources, &timed_out, pending_request); - - if (timed_out) { - expansion_requests_timedout_metric_->Increment(1); - Status release_status = ReleaseRequest(request_id); - if (!release_status.ok()) { - VLOG_QUERY << "Error releasing timed out expansion request, expansion_id=" - << request_id << " status: " << release_status.GetDetail(); - } - return Status(Substitute("Resource expansion request (expansion id=$0, " - "reservation id=$1) exceeded timeout of $2.", - PrintId(ll_request.expansion_id), - PrintId(reservation_id), - PrettyPrinter::Print(timeout_ms * 1000L * 1000L, TUnit::TIME_NS))); - } - expansion_response_time_metric_->Update( - sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0)); - - if (!request_granted) 
{ - if (pending_request->is_cancelled()) { - return Status(Substitute("Resource expansion request (expansion id=$0, " - "reservation id=$1) was cancelled.", PrintId(ll_request.expansion_id), - PrintId(reservation_id))); - } - expansion_requests_rejected_metric_->Increment(1); - return Status(Substitute("Resource expansion request (expansion id=$0, " - "reservation id=$1) was rejected.", PrintId(ll_request.expansion_id), - PrintId(reservation_id))); - } - - DCHECK_EQ(allocated_resources.size(), 1); - *allocated_resource = allocated_resources.begin()->second; - *expansion_id = request_id; - - VLOG_QUERY << "Fulfilled expansion for id=" << ll_response.expansion_id - << " resource=" << *allocated_resource; - expansion_requests_fulfilled_metric_->Increment(1); - return Status::OK(); -} - -Status ResourceBroker::Reserve(const TResourceBrokerReservationRequest& request, - TResourceBrokerReservationResponse* response) { - VLOG_QUERY << "Sending reservation request: " << request; - reservation_requests_total_metric_->Increment(1); - - llama::TLlamaAMReservationRequest ll_request; - llama::TLlamaAMReservationResponse ll_response; - CreateLlamaReservationRequest(request, ll_request); - const llama::TUniqueId& res_id = ll_request.reservation_id; - - PendingRequest* pending_request; - { - pending_request = new PendingRequest(res_id, res_id, false); - lock_guard<mutex> l(pending_requests_lock_); - pending_requests_.insert(make_pair(pending_request->request_id(), pending_request)); - } - { - lock_guard<mutex> l(allocated_requests_lock_); - DCHECK(allocated_requests_.find(res_id) == allocated_requests_.end()); - allocated_requests_[res_id] = vector<AllocatedRequest>(); - } - - MonotonicStopWatch sw; - sw.Start(); - Status status = LlamaRpc(&llama::LlamaAMServiceClient::Reserve, &ll_request, - &ll_response, reservation_rpc_time_metric_); - // Check the status of the response. 
- if (!status.ok()) { - reservation_requests_failed_metric_->Increment(1); - return status; - } - Status request_status = LlamaStatusToImpalaStatus(ll_response.status); - if (!request_status.ok()) { - reservation_requests_failed_metric_->Increment(1); - return request_status; - } - VLOG_RPC << "Received reservation response, waiting for notification on: " << res_id; - - bool timed_out = false; - bool request_granted = WaitForNotification(request.request_timeout, - &response->allocated_resources, &timed_out, pending_request); - - if (request_granted || timed_out) { - // Set the reservation_id to make sure it eventually gets released - even if when - // timed out, since the response may arrive later. - response->__set_reservation_id(CastTUniqueId<llama::TUniqueId, TUniqueId>(res_id)); - } - - if (timed_out) { - reservation_requests_timedout_metric_->Increment(1); - return Status(Substitute( - "Resource reservation request (id=$0) exceeded timeout of $1.", - PrintId(res_id), - PrettyPrinter::Print(request.request_timeout * 1000L * 1000L, - TUnit::TIME_NS))); - } - reservation_response_time_metric_->Update( - sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0)); - - if (!request_granted) { - reservation_requests_rejected_metric_->Increment(1); - return Status(Substitute("Resource reservation request (id=$0) was rejected.", - PrintId(res_id))); - } - - response->__set_reservation_id(CastTUniqueId<llama::TUniqueId, TUniqueId>(res_id)); - VLOG_QUERY << "Fulfilled reservation with id: " << res_id; - reservation_requests_fulfilled_metric_->Increment(1); - return Status::OK(); -} - -void ResourceBroker::ClearRequests(const TUniqueId& reservation_id) { - int64_t total_memory_bytes = 0L; - int32_t total_vcpus = 0L; - llama::TUniqueId llama_id = CastTUniqueId<TUniqueId, llama::TUniqueId>(reservation_id); - { - lock_guard<mutex> l(pending_requests_lock_); - PendingExpansionIdsMap::iterator it = pending_expansion_ids_.find(llama_id); - if (it != pending_expansion_ids_.end()) { - for 
(const llama::TUniqueId& id: it->second) { - PendingRequestMap::iterator request_it = pending_requests_.find(id); - DCHECK(request_it != pending_requests_.end()); - if (request_it == pending_requests_.end()) continue; - // It is possible that the AMNotification thread set the promise and the thread - // waiting on the promise hasn't had a chance to acquire the - // pending_requests_lock_ yet to remove it from pending_requests_. We don't need - // to do anything because it will be released with the reservation anyway. - if (request_it->second->promise()->IsSet()) continue; - request_it->second->SetCancelled(); - request_it->second->promise()->Set(false); - } - it->second.clear(); - pending_expansion_ids_.erase(it); - } - } - { - lock_guard<mutex> l(allocated_requests_lock_); - AllocatedRequestMap::iterator it = allocated_requests_.find(llama_id); - if (it == allocated_requests_.end()) return; - for (AllocatedRequest& allocated_req: it->second) { - DCHECK(allocated_req.reservation_id() == llama_id); - total_memory_bytes += (allocated_req.memory_mb() * 1024L * 1024L); - total_vcpus += allocated_req.vcpus(); - } - it->second.clear(); - allocated_requests_.erase(it); - allocated_memory_metric_->Increment(-total_memory_bytes); - allocated_vcpus_metric_->Increment(-total_vcpus); - } - - VLOG_QUERY << "Releasing " - << PrettyPrinter::Print(total_memory_bytes, TUnit::BYTES) - << " and " << total_vcpus << " cores for " << llama_id; -} - -Status ResourceBroker::ReleaseRequest(const llama::TUniqueId& request_id) { - llama::TLlamaAMReleaseRequest llama_request; - llama::TLlamaAMReleaseResponse llama_response; - llama_request.version = llama::TLlamaServiceVersion::V1; - llama_request.am_handle = llama_handle_; - llama_request.reservation_id = request_id; - - RETURN_IF_ERROR(LlamaRpc(&llama::LlamaAMServiceClient::Release, - &llama_request, &llama_response,reservation_rpc_time_metric_)); - RETURN_IF_ERROR(LlamaStatusToImpalaStatus(llama_response.status)); - return Status::OK(); -} 
- -Status ResourceBroker::ReleaseReservation(const impala::TUniqueId& reservation_id) { - VLOG_QUERY << "Releasing all resources for reservation: " << reservation_id; - llama::TUniqueId llama_id = CastTUniqueId<TUniqueId, llama::TUniqueId>(reservation_id); - - ClearRequests(reservation_id); - RETURN_IF_ERROR(ReleaseRequest(llama_id)); - requests_released_metric_->Increment(1); - return Status::OK(); -} - -void ResourceBroker::AMNotification(const llama::TLlamaAMNotificationRequest& request, - llama::TLlamaAMNotificationResponse& response) { - { - // This Impalad may have restarted, so it is possible Llama is sending notifications - // while this Impalad is registering with Llama. - lock_guard<mutex> l(llama_registration_lock_); - if (request.am_handle != llama_handle_) { - VLOG_QUERY << "Ignoring Llama AM notification with mismatched AM handle. " - << "Known handle: " << llama_handle_ << ". Received handle: " - << request.am_handle; - // Ignore all notifications with mismatched handles. - return; - } - } - // Nothing to be done for heartbeats. - if (request.heartbeat) return; - VLOG_QUERY << "Received non-heartbeat AM notification"; - - lock_guard<mutex> l(pending_requests_lock_); - - // Process granted allocations. - for (const llama::TUniqueId& res_id: request.allocated_reservation_ids) { - PendingRequestMap::iterator it = pending_requests_.find(res_id); - if (it == pending_requests_.end()) { - VLOG_RPC << "Allocation for " << res_id << " arrived after timeout or cleanup"; - continue; - } - if (it->second->promise()->IsSet()) { - // The promise should not have been set unless it was already cancelled. - DCHECK(it->second->is_cancelled()); - continue; - } - LOG(INFO) << "Received allocated resource for reservation id: " << res_id; - it->second->SetResources(request.allocated_resources); - it->second->promise()->Set(true); - } - - // Process rejected allocations. 
- for (const llama::TUniqueId& res_id: request.rejected_reservation_ids) { - PendingRequestMap::iterator it = pending_requests_.find(res_id); - if (it == pending_requests_.end()) { - VLOG_RPC << "Rejection for " << res_id << " arrived after timeout"; - continue; - } - if (it->second->promise()->IsSet()) { - DCHECK(it->second->is_cancelled()); - continue; - } - it->second->promise()->Set(false); - } - - // TODO: We maybe want a thread pool for handling preemptions to avoid - // blocking this function on query cancellations. - // Process preempted reservations. - for (const llama::TUniqueId& res_id: request.preempted_reservation_ids) { - TUniqueId impala_res_id; - impala_res_id << res_id; - scheduler_->HandlePreemptedReservation(impala_res_id); - } - - // Process preempted client resources. - for (const llama::TUniqueId& res_id: request.preempted_client_resource_ids) { - TUniqueId impala_res_id; - impala_res_id << res_id; - scheduler_->HandlePreemptedResource(impala_res_id); - } - - // Process lost client resources. 
- for (const llama::TUniqueId& res_id: request.lost_client_resource_ids) { - TUniqueId impala_res_id; - impala_res_id << res_id; - scheduler_->HandlePreemptedResource(impala_res_id); - } - - response.status.__set_status_code(llama::TStatusCode::OK); -} - -void ResourceBroker::NMNotification(const llama::TLlamaNMNotificationRequest& request, - llama::TLlamaNMNotificationResponse& response) { -} - -Status ResourceBroker::RefreshLlamaNodes() { - llama::TLlamaAMGetNodesRequest llama_request; - llama_request.__set_am_handle(llama_handle_); - llama_request.__set_version(llama::TLlamaServiceVersion::V1); - llama::TLlamaAMGetNodesResponse llama_response; - - RETURN_IF_ERROR(LlamaRpc(&llama::LlamaAMServiceClient::GetNodes, &llama_request, - &llama_response, NULL)); - RETURN_IF_ERROR(LlamaStatusToImpalaStatus(llama_response.status)); - llama_nodes_ = llama_response.nodes; - LOG(INFO) << "Llama Nodes [" << join(llama_nodes_, ", ") << "]"; - return Status::OK(); -} - -bool ResourceBroker::GetQueryResourceMgr(const TUniqueId& query_id, - const TUniqueId& reservation_id, const TNetworkAddress& local_resource_address, - QueryResourceMgr** mgr) { - lock_guard<mutex> l(query_resource_mgrs_lock_); - pair<int32_t, QueryResourceMgr*>* entry = &query_resource_mgrs_[query_id]; - if (entry->second == NULL) { - entry->second = - new QueryResourceMgr(reservation_id, local_resource_address, query_id); - DCHECK_EQ(entry->first, 0); - // Also create the per-query entries in the allocated_resources_ and - // pending_expansion_ids_ map. 
- llama::TUniqueId llama_id = CastTUniqueId<TUniqueId, llama::TUniqueId>(reservation_id); - { - lock_guard<mutex> pending_lock(pending_requests_lock_); - DCHECK(pending_expansion_ids_.find(llama_id) == pending_expansion_ids_.end()); - pending_expansion_ids_[llama_id] = boost::unordered_set<llama::TUniqueId>(); - } - { - lock_guard<mutex> allocated_lock(allocated_requests_lock_); - if (allocated_requests_.find(llama_id) == allocated_requests_.end()) { - allocated_requests_[llama_id] = vector<AllocatedRequest>(); - } - } - } - *mgr = entry->second; - // Return true if this is the first reference to this resource mgr. - return ++entry->first == 1L; -} - -void ResourceBroker::UnregisterQueryResourceMgr(const TUniqueId& query_id) { - lock_guard<mutex> l(query_resource_mgrs_lock_); - QueryResourceMgrsMap::iterator it = query_resource_mgrs_.find(query_id); - DCHECK(it != query_resource_mgrs_.end()) - << "UnregisterQueryResourceMgr() without corresponding GetQueryResourceMgr()"; - if (--it->second.first == 0) { - it->second.second->Shutdown(); - ClearRequests(it->second.second->reservation_id()); - delete it->second.second; - query_resource_mgrs_.erase(it); - } -} - -ostream& operator<<(ostream& os, - const map<TNetworkAddress, llama::TAllocatedResource>& resources) { - typedef map<TNetworkAddress, llama::TAllocatedResource> ResourceMap; - int count = 0; - for (const ResourceMap::value_type& resource: resources) { - os << "(" << resource.first << "," << resource.second << ")"; - if (++count != resources.size()) os << ","; - } - return os; -} - -ostream& operator<<(ostream& os, const TResourceBrokerReservationRequest& request) { - os << "Reservation Request(" - << "queue=" << request.queue << " " - << "user=" << request.user << " " - << "gang=" << request.gang << " " - << "request_timeout=" << request.request_timeout << " " - << "resources=["; - for (int i = 0; i < request.resources.size(); ++i) { - os << request.resources[i]; - if (i + 1 != request.resources.size()) os << 
","; - } - os << "])"; - return os; -} - -ostream& operator<<(ostream& os, const TResourceBrokerReservationResponse& reservation) { - os << "Granted Reservation(" - << "reservation id=" << reservation.reservation_id << " " - << "resources=[" << reservation.allocated_resources << "])"; - return os; -} - -ostream& operator<<(ostream& os, const TResourceBrokerExpansionRequest& request) { - os << "Expansion Request(" - << "reservation id=" << request.reservation_id << " " - << "resource=" << request.resource << " " - << "request_timeout=" << request.request_timeout << ")"; - return os; -} - -ostream& operator<<(ostream& os, const TResourceBrokerExpansionResponse& expansion) { - os << "Expansion Response(" - << "reservation id=" << expansion.reservation_id << " " - << "resources=[" << expansion.allocated_resources << "])"; - return os; -} - -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/resourcebroker/resource-broker.h ---------------------------------------------------------------------- diff --git a/be/src/resourcebroker/resource-broker.h b/be/src/resourcebroker/resource-broker.h deleted file mode 100644 index b9e0bd7..0000000 --- a/be/src/resourcebroker/resource-broker.h +++ /dev/null @@ -1,424 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef RESOURCE_BROKER_H_ -#define RESOURCE_BROKER_H_ - -#include <boost/unordered_map.hpp> -#include <boost/unordered_set.hpp> -#include <boost/scoped_ptr.hpp> -#include <boost/uuid/uuid.hpp> - -#include "runtime/client-cache.h" -#include "util/collection-metrics.h" -#include "util/promise.h" -#include "util/stopwatch.h" -#include "gen-cpp/LlamaAMService.h" -#include "gen-cpp/LlamaNotificationService.h" -#include "gen-cpp/ResourceBrokerService_types.h" - -namespace impala { - -class QueryResourceMgr; -class Status; -class MetricGroup; -class Scheduler; -class ResourceBrokerNotificationServiceClient; -class RuntimeProfile; - -/// Mediates resource-reservation requests between Impala and Yarn via the Llama service. -/// The resource broker requests resources via the Llama's thrift interface and exposes -/// a thrift server for the Llama to notify it of granted/denied/preempted resource -/// reservations. The reserve/release API of the resource broker is blocking. -/// The resource broker is configured with a list of Llama addresses that -/// are cycled through for failover. -/// TODO: Implement NM notification service. -class ResourceBroker { - public: - ResourceBroker(const std::vector<TNetworkAddress>& llama_addresses, - const TNetworkAddress& llama_callback_address, MetricGroup* metrics); - - /// Register this resource broker with LLama and starts the Llama callback service. - /// Returns a non-OK status if the callback service failed to start (e.g., port in use) - /// or if registration with the Llama failed (e.g., connection to Llama failed). - Status Init(); - - /// Closes the llama_client_cache_ and joins the llama_callback_server_. - void Close(); - - /// Requests resources from Llama. Blocks until the request has been granted or denied. 
- /// TODO: Remove thrift interface - Status Reserve(const TResourceBrokerReservationRequest& request, - TResourceBrokerReservationResponse* response); - - /// Requests more resources specified by 'resource' from Llama for an existing - /// reservation specified by the 'reservation_id'. Blocks until the request has been - /// granted or rejected, or no response is received within the timeout specified, in - /// which case a call to cancel the outstanding expansion is made and the call returns - /// with an error status. If timeout_ms <= 0, the call will not timeout. If the - /// expansion is successful, an OK status is returned and the 'expansion_id' and - /// 'allocated_resource' are set. An error status is returned if a timeout or an error - /// occurs. - Status Expand(const TUniqueId& reservation_id, const llama::TResource& resource, - int64_t timeout_ms, llama::TUniqueId* expansion_id, - llama::TAllocatedResource* allocated_resource); - - /// Removes the record of all resource requests associated with this - /// 'reservation_id', updating the per-node accounting of resources and cancels any - /// threads waiting on pending expansions. Does not communicate this to Llama, so the - /// coordinator should always call ReleaseReservation() to make sure that Llama knows - /// the resources should be released. - void ClearRequests(const TUniqueId& reservation_id); - - /// Releases resources acquired from Llama for this reservation and all associated - /// expansion requests across _all_ nodes. Should therefore only be called once per - /// query by the coordinator. - Status ReleaseReservation(const TUniqueId& reservation_id); - - /// Handles asynchronous Llama Application Master (AM) notifications including - /// granted/denied/preempted reservations and resources. 
- void AMNotification(const llama::TLlamaAMNotificationRequest& request, - llama::TLlamaAMNotificationResponse& response); - - /// Handles asynchronous notifications from the Llama Node Manager (NM) - /// auxiliary service, in particular, incoming Yarn container allocations - /// that are going to claim resources. - /// TODO: Implement once NM service is fully functional. - void NMNotification(const llama::TLlamaNMNotificationRequest& request, - llama::TLlamaNMNotificationResponse& response); - - const std::vector<std::string>& llama_nodes() { return llama_nodes_; } - - /// Retrieves the nodes known to Llama and stores them in llama_nodes_. - Status RefreshLlamaNodes(); - - void set_scheduler(Scheduler* scheduler) { scheduler_ = scheduler; }; - - /// Retrieves or creates a new QueryResourceMgr for the given query ID. Returns true if - /// this is the first 'checkout' of this QueryResourceMgr, false otherwise. The other - /// parameters are passed to the QueryResourceMgr constructor. - bool GetQueryResourceMgr(const TUniqueId& query_id, const TUniqueId& reservation_id, - const TNetworkAddress& local_resource_address, QueryResourceMgr** res_mgr); - - /// Decrements the reference count for a particular QueryResourceMgr. If this is the last - /// reference (i.e. the ref count goes to 0), the QueryResourceMgr is deleted. It's an - /// error to call this with a query_id that does not have a registered QueryResourceMgr. - void UnregisterQueryResourceMgr(const TUniqueId& query_id); - - private: - typedef std::map<TNetworkAddress, llama::TAllocatedResource> ResourceMap; - - bool has_standby_llama() { return llama_addresses_.size() > 1; } - - /// Registers this resource broker with the Llama. Cycles through the list of - /// Llama addresses to find the active Llama which is accepting requests (if any). - /// Returns a non-OK status if registration with any of the Llama's did not succeed - /// within FLAGS_llama_registration_timeout_s seconds. 
- /// Registration with the Llama is idempotent with respect to the llama_client_id_ - /// (see comment on llama_client_id_ for details). - Status RegisterWithLlama(); - - /// Issues the Llama RPC f where F is a thrift call taking LlamaReqType and returning - /// LlamaRespType. If failures occur, this function handles re-registering with Llama - /// if necessary and re-trying multiple times. If rpc_time_metric is non-NULL, the - /// metric is updated upon success of the RPC. Returns a non-OK status if the RPC - /// failed due to connectivity issues with the Llama. Returns OK if the RPC succeeded. - template <class F, typename LlamaReqType, typename LlamaRespType> - Status LlamaRpc(const F& f, LlamaReqType* request, LlamaRespType* response, - StatsMetric<double>* rpc_time_metric); - - /// Re-registers with Llama to recover from the Llama being unreachable. Handles both - /// Llama restart and failover. This function is a template to allow specialization on - /// the Llama request/response type. - template <typename LlamaReqType, typename LlamaRespType> - Status ReRegisterWithLlama(const LlamaReqType& request, LlamaRespType* response); - - /// Detects Llama restarts from the given return status of a Llama RPC. - bool LlamaHasRestarted(const llama::TStatus& status) const; - - /// Sends a Llama release RPC for the reservation or expansion with the specified - /// request_id. - Status ReleaseRequest(const llama::TUniqueId& request_id); - - /// Creates a Llama reservation request from a resource broker reservation request. - void CreateLlamaReservationRequest(const TResourceBrokerReservationRequest& src, - llama::TLlamaAMReservationRequest& dest); - - class PendingRequest; - /// Wait for a reservation or expansion request to be fulfilled by the Llama via an - /// async call into LlamaNotificationThriftIf::AMNotification(). If timeout_ms > 0, the - /// call will not wait longer than timeout_ms before returning false and *timed_out set - /// to true. 
If the request is fulfilled, resources and reservation_id are populated. - bool WaitForNotification(int64_t timeout_ms, ResourceMap* resources, bool* timed_out, - PendingRequest* reservation); - - /// Llama availability group. - std::vector<TNetworkAddress> llama_addresses_; - - /// Indexes into llama_addresses_ indicating the currently active Llama. - /// Protected by llama_registration_lock_. - int active_llama_addr_idx_; - - /// Address of thrift server started in this resource broker to handle - /// Llama notifications. - TNetworkAddress llama_callback_address_; - - MetricGroup* metrics_; - - Scheduler* scheduler_; - - /// Address of the active Llama. A Llama is considered active once we have successfully - /// registered with it. Set to "none" while registering with the Llama. - StringProperty* active_llama_metric_; - - /// Llama handle received from the active Llama upon registration. - /// Set to "none" while not registered with Llama. - StringProperty* active_llama_handle_metric_; - - /// Accumulated statistics on the time taken to RPC a reservation request and receive - /// an acknowledgement from Llama. - StatsMetric<double>* reservation_rpc_time_metric_; - - /// Accumulated statistics on the time taken to complete a reservation request - /// (granted or denied). The time includes the request RPC to Llama and the time - /// the requesting thread waits on the pending_requests_'s promise. - /// The metric does not include requests that timed out. - StatsMetric<double>* reservation_response_time_metric_; - - /// Total number of reservation requests. - IntCounter* reservation_requests_total_metric_; - - /// Number of fulfilled reservation requests. - IntCounter* reservation_requests_fulfilled_metric_; - - /// Reservation requests that failed due to a malformed request or an internal - /// error in Llama. - IntCounter* reservation_requests_failed_metric_; - - /// Number of well-formed reservation requests rejected by the central scheduler. 
- IntCounter* reservation_requests_rejected_metric_; - - /// Number of well-formed reservation requests that did not get fulfilled within - /// the timeout period. - IntCounter* reservation_requests_timedout_metric_; - - /// Accumulated statistics on the time taken to RPC an expansion request and receive an - /// acknowledgement from Llama. - StatsMetric<double>* expansion_rpc_time_metric_; - - /// Accumulated statistics on the time taken to complete an expansion request - /// (granted or denied). The time includes the request RPC to Llama and the time - /// the requesting thread waits on the pending_requests_'s promise. - /// The metric does not include requests that timed out. - StatsMetric<double>* expansion_response_time_metric_; - - /// Total number of expansion requests. - IntCounter* expansion_requests_total_metric_; - - /// Number of fulfilled expansion requests. - IntCounter* expansion_requests_fulfilled_metric_; - - /// Expansion requests that failed due to a malformed request or an internal - /// error in Llama. - IntCounter* expansion_requests_failed_metric_; - - /// Number of well-formed expansion requests rejected by the central scheduler. - IntCounter* expansion_requests_rejected_metric_; - - /// Number of well-formed expansion requests that did not get fulfilled within - /// the timeout period. - IntCounter* expansion_requests_timedout_metric_; - - /// Total amount of memory currently allocated by Llama to this node - UIntGauge* allocated_memory_metric_; - - /// Total number of vcpu cores currently allocated by Llama to this node - UIntGauge* allocated_vcpus_metric_; - - /// Total number of fulfilled reservation requests that have been released. - IntCounter* requests_released_metric_; - - /// Client id used to register with Llama. Set in Init(). Used to communicate to Llama - /// whether this Impalad has restarted. 
Registration with Llama is idempotent if the - /// same llama_client_id_ is passed, i.e., the same Llama handle is returned and - /// resource allocations are preserved. From Llama's perspective an unknown - /// llama_client_id_ indicates a new registration and all resources allocated by this - /// Impalad under a different llama_client_id_ are consider lost and will be released. - boost::uuids::uuid llama_client_id_; - - /// Thrift API implementation which proxies Llama notifications onto this ResourceBroker. - boost::shared_ptr<llama::LlamaNotificationServiceIf> llama_callback_thrift_iface_; - boost::scoped_ptr<ThriftServer> llama_callback_server_; - - /// Cache of Llama client connections. - boost::scoped_ptr<ClientCache<llama::LlamaAMServiceClient>> llama_client_cache_; - - /// Lock to ensure that only a single registration with Llama is sent, e.g., - /// when multiple concurrent requests realize that Llama has restarted. - boost::mutex llama_registration_lock_; - - /// Handle received from Llama during registration. Set in RegisterWithLlama(). - llama::TUniqueId llama_handle_; - - /// List of nodes registered with Llama. Set in RefreshLlamaNodes(). - std::vector<std::string> llama_nodes_; - - /// A PendingRequest tracks a single reservation or expansion request that is in flight - /// to Llama. A new PendingRequest is created in either Expand() or Reserve(), and its - /// promise() is blocked on there until a response is received for that request from - /// Llama via AMNotification(), or until a timeout occurs. - // - /// Every request has a unique request_id which is assigned by the resource broker. Each - /// request is also associated with exactly one reservation, via reservation_id(). This - /// allows us to track which resources belong to which reservation, and to make sure that - /// all are correctly accounted for when the reservation is released. Each reservation ID - /// will belong to exactly one reservation request, and 0 or more expansion requests. 
- class PendingRequest { - public: - PendingRequest(const llama::TUniqueId& reservation_id, - const llama::TUniqueId& request_id, bool is_expansion) - : reservation_id_(reservation_id), request_id_(request_id), - is_expansion_(is_expansion), is_cancelled_(false) { - DCHECK(is_expansion || reservation_id == request_id); - } - - /// Promise is set to true if the reservation or expansion request was granted, false - /// if it was rejected by Yarn. When promise()->Get() returns true, - /// allocated_resources_ will be populated and it will be safe to call GetResources(). - Promise<bool>* promise() { return &promise_; } - - /// Called by WaitForNotification() to populate a map of resources once the - /// corresponding request has returned successfully (and promise() therefore has - /// returned true). - void GetResources(ResourceMap* resources); - - /// Populates allocated_resources_ from all members of resources that match the given - /// reservation id. Called in AMNotification(). - void SetResources(const std::vector<llama::TAllocatedResource>& resources); - - const llama::TUniqueId& request_id() const { return request_id_; } - const llama::TUniqueId& reservation_id() const { return reservation_id_; } - - bool is_expansion() const { return is_expansion_; } - bool is_cancelled() const { return is_cancelled_; } - - /// Sets the cancelled flag to true. Is only called before the promise is set and - /// while the pending_requests_lock_ is held to avoid races. - void SetCancelled() { is_cancelled_ = true; } - - private: - /// Promise object that WaitForNotification() waits on and AMNotification() signals. - Promise<bool> promise_; - - /// Filled in by AMNotification(), so that WaitForNotification() can read the set of - /// allocated_resources without AMNotification() having to wait (hence the copy is - /// deliberate, since the original copy may go out of scope). 
- std::vector<llama::TAllocatedResource> allocated_resources_; - - /// The ID for the reservation associated with this request. There is always exactly - /// one reservation associated with every request. - llama::TUniqueId reservation_id_; - - /// The unique ID for this request. If this is a reservation request, request_id_ == - /// reservation_id_, otherwise this is generated during Expand(). - llama::TUniqueId request_id_; - - /// True if this is an expansion request, false if it is a reservation request - bool is_expansion_; - - /// Set if the request was cancelled. - bool is_cancelled_; - }; - - /// Protects pending_requests_ and pending_expansion_ids_ - boost::mutex pending_requests_lock_; - - /// Map from unique request ID provided to Llama (for both reservation and expansion - /// requests) to PendingRequest object used to coordinate when a response is received - /// from Llama. - typedef boost::unordered_map<llama::TUniqueId, PendingRequest*> PendingRequestMap; - PendingRequestMap pending_requests_; - - /// Map from reservation IDs to pending expansion IDs. All pending request IDs have a - /// PendingRequest in pending_requests_. - typedef boost::unordered_map<llama::TUniqueId, boost::unordered_set<llama::TUniqueId>> - PendingExpansionIdsMap; - PendingExpansionIdsMap pending_expansion_ids_; - - /// An AllocatedRequest tracks resources allocated in response to one reservation or - /// expansion request. - class AllocatedRequest { - public: - AllocatedRequest(const llama::TUniqueId& reservation_id, - uint64_t memory_mb, uint32_t vcpus, bool is_expansion) - : reservation_id_(reservation_id), memory_mb_(memory_mb), vcpus_(vcpus), - is_expansion_(is_expansion) { } - - const llama::TUniqueId reservation_id() const { return reservation_id_; } - uint64_t memory_mb() const { return memory_mb_; } - uint32_t vcpus() const { return vcpus_; } - bool is_expansion() const { return is_expansion_; } - - private: - /// The reservation ID for this request. 
Expansions all share the same reservation ID. - llama::TUniqueId reservation_id_; - - /// The total memory allocated to this request - uint64_t memory_mb_; - - /// The number of VCPUs allocated to this request - uint32_t vcpus_; - - /// True if this is an expansion request, false if it is a reservation request - bool is_expansion_; - }; - - /// Protectes allocated_requests_ - boost::mutex allocated_requests_lock_; - - /// Map from reservation ID to all satisfied requests - reservation and expansion - - /// associated with that reservation. Used only for bookkeeping so that Impala can report - /// on the current resource usage. - typedef boost::unordered_map<llama::TUniqueId, std::vector<AllocatedRequest>> - AllocatedRequestMap; - AllocatedRequestMap allocated_requests_; - - /// Protects query_resource_mgrs_ - boost::mutex query_resource_mgrs_lock_; - typedef boost::unordered_map<TUniqueId, std::pair<int32_t, QueryResourceMgr*>> - QueryResourceMgrsMap; - - /// Map from query ID to a (ref_count, QueryResourceMgr*) pair, i.e. one QueryResourceMgr - /// per query. The refererence count is always non-zero - once it hits zero the entry in - /// the map is removed and the QueryResourceMgr is deleted. 
- QueryResourceMgrsMap query_resource_mgrs_; -}; - -std::ostream& operator<<(std::ostream& os, - const TResourceBrokerReservationRequest& request); - -std::ostream& operator<<(std::ostream& os, - const TResourceBrokerReservationResponse& reservation); - -std::ostream& operator<<(std::ostream& os, - const TResourceBrokerExpansionRequest& request); - -std::ostream& operator<<(std::ostream& os, - const TResourceBrokerExpansionResponse& expansion); -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/buffered-block-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc index 5b5ee8a..169baf2 100644 --- a/be/src/runtime/buffered-block-mgr-test.cc +++ b/be/src/runtime/buffered-block-mgr-test.cc @@ -553,8 +553,8 @@ class BufferedBlockMgrTest : public ::testing::Test { const int num_threads = 8; thread_group workers; // Create a shared RuntimeState with no BufferedBlockMgr. 
- RuntimeState* shared_state = new RuntimeState(TExecPlanFragmentParams(), "", - test_env_->exec_env()); + RuntimeState* shared_state = + new RuntimeState(TExecPlanFragmentParams(), test_env_->exec_env()); for (int i = 0; i < num_threads; ++i) { thread* t = new thread(bind( &BufferedBlockMgrTest::CreateDestroyThread, this, shared_state)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/buffered-block-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc index 90c1041..d582c31 100644 --- a/be/src/runtime/buffered-block-mgr.cc +++ b/be/src/runtime/buffered-block-mgr.cc @@ -1317,8 +1317,8 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, RuntimeProfile* parent_profile, integrity_check_timer_ = ADD_TIMER(profile_.get(), "TotalIntegrityCheckTime"); // Create a new mem_tracker and allocate buffers. - mem_tracker_.reset(new MemTracker( - profile(), mem_limit, -1, "Block Manager", parent_tracker)); + mem_tracker_.reset( + new MemTracker(profile(), mem_limit, "Block Manager", parent_tracker)); initialized_ = true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/collection-value-builder-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/collection-value-builder-test.cc b/be/src/runtime/collection-value-builder-test.cc index c7843b7..b8f4b65 100644 --- a/be/src/runtime/collection-value-builder-test.cc +++ b/be/src/runtime/collection-value-builder-test.cc @@ -40,7 +40,7 @@ TEST(CollectionValueBuilderTest, MaxBufferSize) { CollectionValue coll_value; int64_t initial_capacity = (INT_MAX / 8) + 1; int64_t mem_limit = initial_capacity * 4 * 4; - MemTracker tracker(mem_limit, mem_limit); + MemTracker tracker(mem_limit); MemPool pool(&tracker); CollectionValueBuilder coll_value_builder( &coll_value, tuple_desc, &pool, NULL, initial_capacity); 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 4728866..e2dd1a4 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -62,7 +62,6 @@ #include "util/error-util.h" #include "util/hdfs-bulk-ops.h" #include "util/hdfs-util.h" -#include "util/llama-util.h" #include "util/network-util.h" #include "util/pretty-printer.h" #include "util/summary-util.h" @@ -486,7 +485,7 @@ Status Coordinator::Exec(QuerySchedule& schedule, runtime_state()->obj_pool(), request.fragments[0].output_exprs, output_expr_ctxs)); MemTracker* output_expr_tracker = runtime_state()->obj_pool()->Add(new MemTracker( - -1, -1, "Output exprs", runtime_state()->instance_mem_tracker(), false)); + -1, "Output exprs", runtime_state()->instance_mem_tracker(), false)); RETURN_IF_ERROR(Expr::Prepare( *output_expr_ctxs, runtime_state(), row_desc(), output_expr_tracker)); } else { @@ -503,12 +502,12 @@ Status Coordinator::Exec(QuerySchedule& schedule, MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker( schedule.request_pool(), exec_env_->process_mem_tracker()); query_mem_tracker_ = - MemTracker::GetQueryMemTracker(query_id_, query_limit, -1, pool_tracker, NULL); + MemTracker::GetQueryMemTracker(query_id_, query_limit, pool_tracker); executor_.reset(NULL); } - filter_mem_tracker_.reset(new MemTracker(-1, -1, "Runtime Filter (Coordinator)", - query_mem_tracker(), false)); + filter_mem_tracker_.reset( + new MemTracker(-1, "Runtime Filter (Coordinator)", query_mem_tracker(), false)); // Initialize the execution profile structures. 
InitExecProfile(request); @@ -1900,20 +1899,6 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule, SetExecPlanDescriptorTable(fragment, rpc_params); TNetworkAddress exec_host = params.hosts[fragment_instance_idx]; - if (schedule.HasReservation()) { - // The reservation has already have been validated at this point. - TNetworkAddress resource_hostport; - schedule.GetResourceHostport(exec_host, &resource_hostport); - map<TNetworkAddress, llama::TAllocatedResource>::const_iterator it = - schedule.reservation()->allocated_resources.find(resource_hostport); - // Only set reserved resource if we actually have one for this plan - // fragment. Otherwise, don't set it (usually this the coordinator fragment), and it - // won't participate in dynamic RM controls. - if (it != schedule.reservation()->allocated_resources.end()) { - fragment_instance_ctx.__set_reserved_resource(it->second); - fragment_instance_ctx.__set_local_resource_address(resource_hostport); - } - } FragmentScanRangeAssignment::const_iterator it = params.scan_range_assignment.find(exec_host); // Scan ranges may not always be set, so use an empty structure if so. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/data-stream-recvr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc index 9bd51af..50103ba 100644 --- a/be/src/runtime/data-stream-recvr.cc +++ b/be/src/runtime/data-stream-recvr.cc @@ -295,7 +295,7 @@ DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_t is_merging_(is_merging), num_buffered_bytes_(0), profile_(profile) { - mem_tracker_.reset(new MemTracker(-1, -1, "DataStreamRecvr", parent_tracker)); + mem_tracker_.reset(new MemTracker(-1, "DataStreamRecvr", parent_tracker)); // Create one queue per sender if is_merging is true. int num_queues = is_merging ? 
num_senders : 1; sender_queues_.reserve(num_queues); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/data-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc index dae4724..f5eb783 100644 --- a/be/src/runtime/data-stream-test.cc +++ b/be/src/runtime/data-stream-test.cc @@ -112,9 +112,7 @@ class ImpalaTestBackend : public ImpalaInternalServiceIf { class DataStreamTest : public testing::Test { protected: - DataStreamTest() - : runtime_state_(TExecPlanFragmentParams(), "", &exec_env_), - next_val_(0) { + DataStreamTest() : runtime_state_(TExecPlanFragmentParams(), &exec_env_), next_val_(0) { // Initialize Mem trackers for use by the data stream receiver. exec_env_.InitForFeTests(); runtime_state_.InitMemTrackers(TUniqueId(), NULL, -1); @@ -482,7 +480,7 @@ class DataStreamTest : public testing::Test { void Sender(int sender_num, int channel_buffer_size, TPartitionType::type partition_type) { - RuntimeState state(TExecPlanFragmentParams(), "", &exec_env_); + RuntimeState state(TExecPlanFragmentParams(), &exec_env_); state.set_desc_tbl(desc_tbl_); state.InitMemTrackers(TUniqueId(), NULL, -1); VLOG_QUERY << "create sender " << sender_num; @@ -596,7 +594,7 @@ TEST_F(DataStreamTest, BasicTest) { // TODO: Make lifecycle requirements more explicit. TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) { scoped_ptr<RuntimeState> runtime_state( - new RuntimeState(TExecPlanFragmentParams(), "", &exec_env_)); + new RuntimeState(TExecPlanFragmentParams(), &exec_env_)); runtime_state->InitMemTrackers(TUniqueId(), NULL, -1); scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver"));