This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ca88447 IMPALA-9930 (part 2): Introduce new admission control rpc
service
ca88447 is described below
commit ca884476bb9f3ed2de3927a8c7d8107f8551f8f8
Author: Thomas Tauber-Marshall <[email protected]>
AuthorDate: Tue Jul 14 10:35:42 2020 -0700
IMPALA-9930 (part 2): Introduce new admission control rpc service
This patch introduces a new krpc service, AdmissionControlService,
which coordinators can use to submit queries for admission.
This patch adds some simple configuration flags that make it possible
to have coordinators use this service to submit their queries for
admission to other coordinators. These flags are only to make this
patch testable and will be replaced when the separate admission
control daemon is introduced in IMPALA-9975.
The interface consists of the following RPCs:
- AdmitQuery: takes a TQueryExecRequest and a TQueryOptions
(serialized into sidecars), places the request on a queue to be
processed by a thread pool and then immediately returns.
- GetQueryStatus: takes a query id and returns the current admission
status, including the QuerySchedulePB if admission has completed
successfully but the query has not been released yet.
- ReleaseQueryBackends: called when individual backends complete but
the overall query is still running to release resources
incrementally. This RPC will be called at most O(log(# backends))
times per query due to BackendResourceState, which batches backends to
release together.
- ReleaseQuery: called when the query has completely finished.
Releases all remaining resources.
- CancelAdmission: called if a query is cancelled before an admission
decision has been made to indicate that it should no longer be
considered for admission.
The majority of the patch consists of two classes:
- AdmissionControlClient: used to abstract whether admission is being
performed locally or remotely. In the local case, it is basically
just a wrapper around AdmissionController. In the remote case, it
handles serializing/deserializing of RPC params, polling
GetQueryStatus() until a decision has been made, etc.
- AdmissionControlService: exports the RPC interface and acts as a
wrapper around AdmissionController.
Some notable changes involved:
- AdmissionController::SubmitForAdmission() no longer blocks while a
query is queued. Instead, a new function WaitOnQueued() can be used
to monitor the admission status of a queued query.
- Adding events to the query timeline is moved out of
AdmissionController and into the AdmissionControlClient classes, so
that it always happens on the coordinator.
- When a cluster is run in the new admission control service mode,
only the impalad that is performing admission control exposes the
/admission http endpoint. Observability will be cleaned up in a
subsequent patch.
Testing:
- Modified existing admission control tests to run both with and
without the admission control service enabled, including both the
functional and stress tests. The 'num_queries' param in the stress
test is modified to only use a single value to reduce the number of
tests that are run and keep the running time reasonable.
- Ran tpch10 on a local minicluster and observed no significant
regressions.
Change-Id: I594fc593a27b24b6952e381a9bc1a9a5c6b757ae
Reviewed-on: http://gerrit.cloudera.org:8080/16412
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/runtime/exec-env.cc | 15 ++
be/src/runtime/exec-env.h | 3 +
be/src/scheduling/CMakeLists.txt | 2 +
be/src/scheduling/admission-control-client.cc | 22 +-
be/src/scheduling/admission-control-client.h | 7 +-
be/src/scheduling/admission-control-service.cc | 300 +++++++++++++++++++++
be/src/scheduling/admission-control-service.h | 138 ++++++++++
be/src/scheduling/admission-controller-test.cc | 2 +-
be/src/scheduling/admission-controller.cc | 186 ++++++++-----
be/src/scheduling/admission-controller.h | 37 ++-
.../scheduling/local-admission-control-client.cc | 17 +-
be/src/scheduling/local-admission-control-client.h | 1 +
.../scheduling/remote-admission-control-client.cc | 241 +++++++++++++++++
...-client.h => remote-admission-control-client.h} | 31 ++-
be/src/scheduling/schedule-state.cc | 25 +-
be/src/scheduling/schedule-state.h | 11 +-
be/src/service/client-request-state.cc | 6 +-
be/src/service/impala-http-handler.cc | 63 +++--
be/src/util/sharded-query-map-util.cc | 11 +
common/protobuf/admission_control_service.proto | 97 +++++++
tests/common/resource_pool_config.py | 8 +-
tests/custom_cluster/test_admission_controller.py | 160 ++++++++---
tests/hs2/hs2_test_suite.py | 12 +
tests/util/web_pages_util.py | 6 +-
24 files changed, 1222 insertions(+), 179 deletions(-)
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 48dd888..ff87be7 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -43,6 +43,7 @@
#include "runtime/query-exec-mgr.h"
#include "runtime/thread-resource-mgr.h"
#include "runtime/tmp-file-mgr.h"
+#include "scheduling/admission-control-service.h"
#include "scheduling/admission-controller.h"
#include "scheduling/cluster-membership-mgr.h"
#include "scheduling/request-pool-service.h"
@@ -162,6 +163,12 @@ DEFINE_int32(metrics_webserver_port, 0,
DEFINE_string(metrics_webserver_interface, "",
"Interface to start metrics webserver on. If blank, webserver binds to
0.0.0.0");
+DEFINE_bool(is_admission_controller, false,
+ "(Experimental) If true, this impalad will export the
AdmissionControlService "
+ "interface, allowing it to perform admission control for coordinators that
have "
+ "--admission_control_service_addr set to point to this impalad. This flag
will be "
+ "removed in a future version, see IMPALA-9155.");
+
const static string DEFAULT_FS = "fs.defaultFS";
// The multiplier for how many queries a dedicated coordinator can run
compared to an
@@ -281,6 +288,8 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int
webserver_port,
if (FLAGS_is_coordinator) {
hdfs_op_thread_pool_.reset(
CreateHdfsOpThreadPool("hdfs-worker-pool",
FLAGS_num_hdfs_worker_threads, 1024));
+ }
+ if (FLAGS_is_coordinator || FLAGS_is_admission_controller) {
scheduler_.reset(new Scheduler(metrics_.get(),
request_pool_service_.get()));
}
@@ -410,6 +419,12 @@ Status ExecEnv::Init() {
data_svc_.reset(new DataStreamService(rpc_metrics_));
RETURN_IF_ERROR(data_svc_->Init());
RETURN_IF_ERROR(stream_mgr_->Init(data_svc_->mem_tracker()));
+ // Initialize the AdmissionControlService, if enabled.
+ if (FLAGS_is_admission_controller) {
+ admission_svc_.reset(new AdmissionControlService(rpc_metrics_));
+ RETURN_IF_ERROR(admission_svc_->Init());
+ }
+
// Bump thread cache to 1GB to reduce contention for TCMalloc central
// list's spinlock.
if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index c685613..0fed647 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -41,6 +41,7 @@ class KuduClient;
namespace impala {
class AdmissionController;
+class AdmissionControlService;
class BufferPool;
class CallableThreadPool;
class ClusterMembershipMgr;
@@ -143,6 +144,7 @@ class ExecEnv {
ClusterMembershipMgr* cluster_membership_mgr() { return
cluster_membership_mgr_.get(); }
Scheduler* scheduler() { return scheduler_.get(); }
AdmissionController* admission_controller() { return
admission_controller_.get(); }
+ AdmissionControlService* admission_control_service() { return
admission_svc_.get(); }
StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); }
const TNetworkAddress& configured_backend_address() const {
@@ -207,6 +209,7 @@ class ExecEnv {
boost::scoped_ptr<RpcMgr> rpc_mgr_;
boost::scoped_ptr<ControlService> control_svc_;
boost::scoped_ptr<DataStreamService> data_svc_;
+ boost::scoped_ptr<AdmissionControlService> admission_svc_;
/// Query-wide buffer pool and the root reservation tracker for the pool. The
/// reservation limit is equal to the maximum capacity of the pool. Created
in
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 90f5357..05a80de 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -30,12 +30,14 @@ add_library(Scheduling STATIC
${ADMISSION_CONTROL_SERVICE_PROTO_SRCS}
admission-controller.cc
admission-control-client.cc
+ admission-control-service.cc
executor-blacklist.cc
cluster-membership-mgr.cc
cluster-membership-test-util.cc
executor-group.cc
hash-ring.cc
local-admission-control-client.cc
+ remote-admission-control-client.cc
request-pool-service.cc
scheduler-test-util.cc
schedule-state.cc
diff --git a/be/src/scheduling/admission-control-client.cc
b/be/src/scheduling/admission-control-client.cc
index e5f4c51..bf1cc7a 100644
--- a/be/src/scheduling/admission-control-client.cc
+++ b/be/src/scheduling/admission-control-client.cc
@@ -17,14 +17,30 @@
#include "scheduling/admission-control-client.h"
-#include "common/names.h"
#include "scheduling/local-admission-control-client.h"
+#include "scheduling/remote-admission-control-client.h"
+
+#include "common/names.h"
+
+DECLARE_string(admission_control_service_addr);
+DECLARE_bool(is_admission_controller);
namespace impala {
+// Profile query events
+const string AdmissionControlClient::QUERY_EVENT_SUBMIT_FOR_ADMISSION =
+ "Submit for admission";
+const string AdmissionControlClient::QUERY_EVENT_QUEUED = "Queued";
+const string AdmissionControlClient::QUERY_EVENT_COMPLETED_ADMISSION =
+ "Completed admission";
+
void AdmissionControlClient::Create(
- const TUniqueId& query_id, unique_ptr<AdmissionControlClient>* client) {
- client->reset(new LocalAdmissionControlClient(query_id));
+ const TQueryCtx& query_ctx, unique_ptr<AdmissionControlClient>* client) {
+ if (FLAGS_is_admission_controller ||
FLAGS_admission_control_service_addr.empty()) {
+ client->reset(new LocalAdmissionControlClient(query_ctx.query_id));
+ } else {
+ client->reset(new RemoteAdmissionControlClient(query_ctx));
+ }
}
} // namespace impala
diff --git a/be/src/scheduling/admission-control-client.h
b/be/src/scheduling/admission-control-client.h
index 6d6dbb6..2622b41 100644
--- a/be/src/scheduling/admission-control-client.h
+++ b/be/src/scheduling/admission-control-client.h
@@ -32,14 +32,19 @@ namespace impala {
// controller running locally or to one running remotely.
class AdmissionControlClient {
public:
+ static const std::string QUERY_EVENT_SUBMIT_FOR_ADMISSION;
+ static const std::string QUERY_EVENT_QUEUED;
+ static const std::string QUERY_EVENT_COMPLETED_ADMISSION;
+
// Creates a new AdmissionControlClient and returns it in 'client'.
static void Create(
- const TUniqueId& query_id, std::unique_ptr<AdmissionControlClient>*
client);
+ const TQueryCtx& query_ctx, std::unique_ptr<AdmissionControlClient>*
client);
virtual ~AdmissionControlClient() {}
// Called to schedule and admit the query. Blocks until an admission
decision is made.
virtual Status SubmitForAdmission(const
AdmissionController::AdmissionRequest& request,
+ RuntimeProfile::EventSequence* query_events,
std::unique_ptr<QuerySchedulePB>* schedule_result) = 0;
// Called when the query has completed to release all of its resources.
diff --git a/be/src/scheduling/admission-control-service.cc
b/be/src/scheduling/admission-control-service.cc
new file mode 100644
index 0000000..5569cc3
--- /dev/null
+++ b/be/src/scheduling/admission-control-service.cc
@@ -0,0 +1,300 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/admission-control-service.h"
+
+#include "common/constant-strings.h"
+#include "gen-cpp/admission_control_service.pb.h"
+#include "gutil/strings/substitute.h"
+#include "kudu/rpc/rpc_context.h"
+#include "rpc/rpc-mgr.h"
+#include "rpc/rpc-mgr.inline.h"
+#include "rpc/sidecar-util.h"
+#include "rpc/thrift-util.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "scheduling/admission-controller.h"
+#include "util/cpu-info.h"
+#include "util/kudu-status-util.h"
+#include "util/memory-metrics.h"
+#include "util/parse-util.h"
+#include "util/promise.h"
+
+#include "common/names.h"
+
+using kudu::rpc::RpcContext;
+
+static const string QUEUE_LIMIT_MSG = "(Advanced) Limit on RPC payloads
consumption for "
+ "AdmissionControlService. "
+ + Substitute(MEM_UNITS_HELP_MSG, "the process memory limit");
+DEFINE_string(admission_control_service_queue_mem_limit, "50MB",
QUEUE_LIMIT_MSG.c_str());
+DEFINE_int32(admission_control_service_num_svc_threads, 0,
+ "Number of threads for processing admission control service's RPCs. if
left at "
+ "default value 0, it will be set to number of CPU cores. Set it to a
positive value "
+ "to change from the default.");
+DEFINE_int32(admission_thread_pool_size, 5,
+ "(Advanced) Size of the thread-pool processing AdmitQuery requests.");
+DEFINE_int32(max_admission_queue_size, 50,
+ "(Advanced) Max size of the queue for the AdmitQuery thread pool.");
+
+DEFINE_string(admission_control_service_addr, "",
+ "(Experimental) If provided, queries submitted to this impalad will be
scheduled and "
+ "admitted by contacting the admission control service at the specified
address. This "
+ "flag will be removed in a future version, see IMPALA-9155.");
+DEFINE_int32(admission_status_wait_time_ms, 100,
+ "(Advanced) The number of milliseconds the GetQueryStatus() rpc in the
admission "
+ "control service will wait for admission to complete before returning.");
+
+namespace impala {
+
+#define RESPOND_IF_ERROR(stmt) \
+ do { \
+ const Status& _status = (stmt); \
+ if (UNLIKELY(!_status.ok())) { \
+ RespondAndReleaseRpc(_status, resp, rpc_context); \
+ return; \
+ } \
+ } while (false)
+
+AdmissionControlService::AdmissionControlService(MetricGroup* metric_group)
+ :
AdmissionControlServiceIf(ExecEnv::GetInstance()->rpc_mgr()->metric_entity(),
+ ExecEnv::GetInstance()->rpc_mgr()->result_tracker()) {
+ MemTracker* process_mem_tracker =
ExecEnv::GetInstance()->process_mem_tracker();
+ bool is_percent; // not used
+ int64_t bytes_limit =
+ ParseUtil::ParseMemSpec(FLAGS_admission_control_service_queue_mem_limit,
+ &is_percent, process_mem_tracker->limit());
+ if (bytes_limit <= 0) {
+ CLEAN_EXIT_WITH_ERROR(
+ Substitute("Invalid mem limit for admission control service queue: "
+ "'$0'.",
+ FLAGS_admission_control_service_queue_mem_limit));
+ }
+ mem_tracker_.reset(new MemTracker(
+ bytes_limit, "Admission Control Service Queue", process_mem_tracker));
+ MemTrackerMetric::CreateMetrics(
+ metric_group, mem_tracker_.get(), "AdmissionControlService");
+}
+
+Status AdmissionControlService::Init() {
+ int num_svc_threads = FLAGS_admission_control_service_num_svc_threads > 0 ?
+ FLAGS_admission_control_service_num_svc_threads :
+ CpuInfo::num_cores();
+ // The maximum queue length is set to maximum 32-bit value. Its actual
capacity is
+ // bound by memory consumption against 'mem_tracker_'.
+ RETURN_IF_ERROR(ExecEnv::GetInstance()->rpc_mgr()->RegisterService(
+ num_svc_threads, std::numeric_limits<int32_t>::max(), this,
mem_tracker_.get()));
+
+ admission_thread_pool_.reset(
+ new ThreadPool<UniqueIdPB>("admission-control-service",
"admission-worker",
+ FLAGS_admission_thread_pool_size, FLAGS_max_admission_queue_size,
+ bind<void>(&AdmissionControlService::AdmitFromThreadPool, this,
_2)));
+ ABORT_IF_ERROR(admission_thread_pool_->Init());
+
+ return Status::OK();
+}
+
+Status AdmissionControlService::GetProxy(const TNetworkAddress& address,
+ const string& hostname, unique_ptr<AdmissionControlServiceProxy>* proxy) {
+ // Create a AdmissionControlService proxy to the destination.
+ RETURN_IF_ERROR(ExecEnv::GetInstance()->rpc_mgr()->GetProxy(address,
hostname, proxy));
+ return Status::OK();
+}
+
+void AdmissionControlService::AdmitQuery(
+ const AdmitQueryRequestPB* req, AdmitQueryResponsePB* resp, RpcContext*
rpc_context) {
+ VLOG(1) << "AdmitQuery: query_id=" << req->query_id()
+ << " coordinator=" << req->coord_id();
+
+ shared_ptr<AdmissionState> admission_state;
+ admission_state = make_shared<AdmissionState>(req->query_id(),
req->coord_id());
+
+ admission_state->summary_profile =
+ RuntimeProfile::Create(&admission_state->profile_pool, "Summary");
+
+ RESPOND_IF_ERROR(GetSidecar(req->query_exec_request_sidecar_idx(),
rpc_context,
+ &admission_state->query_exec_request));
+
+ for (const NetworkAddressPB& address :
req->blacklisted_executor_addresses()) {
+ admission_state->blacklisted_executor_addresses.emplace(address);
+ }
+
+ RESPOND_IF_ERROR(admission_state_map_.Add(req->query_id(), admission_state));
+ admission_thread_pool_->Offer(req->query_id());
+ RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
+}
+
+void AdmissionControlService::GetQueryStatus(const GetQueryStatusRequestPB*
req,
+ GetQueryStatusResponsePB* resp, kudu::rpc::RpcContext* rpc_context) {
+ VLOG(2) << "GetQueryStatus " << req->query_id();
+
+ shared_ptr<AdmissionState> admission_state;
+ RESPOND_IF_ERROR(admission_state_map_.Get(req->query_id(),
&admission_state));
+
+ Status status = Status::OK();
+ {
+ lock_guard<mutex> l(admission_state->lock);
+ if (admission_state->submitted) {
+ if (!admission_state->admission_done) {
+ bool timed_out;
+ admission_state->admit_status =
+
ExecEnv::GetInstance()->admission_controller()->WaitOnQueued(req->query_id(),
+ &admission_state->schedule,
FLAGS_admission_status_wait_time_ms,
+ &timed_out);
+ if (!timed_out) {
+ admission_state->admission_done = true;
+ if (admission_state->admit_status.ok()) {
+ for (const auto& entry :
admission_state->schedule->backend_exec_params()) {
+ admission_state->unreleased_backends.emplace(entry.address());
+ }
+ }
+ } else {
+ DCHECK(admission_state->admit_status.ok());
+ }
+ }
+
+ if (admission_state->admission_done) {
+ if (admission_state->admit_status.ok()) {
+ *resp->mutable_query_schedule() = *admission_state->schedule.get();
+ } else {
+ status = admission_state->admit_status;
+ }
+ }
+
+ // Always send the profile even if admission isn't done yet.
+ TRuntimeProfileTree tree;
+ admission_state->summary_profile->ToThrift(&tree);
+ int sidecar_idx;
+ Status sidecar_status = SetFaststringSidecar(tree, rpc_context,
&sidecar_idx);
+ if (!sidecar_status.ok()) {
+ // We don't need to fail the query just because we can't return the
profile, so
+ // just log the error.
+ LOG(WARNING) << "Failed to set profile sidecar in GetQueryStatus: "
+ << sidecar_status;
+ } else {
+ resp->set_summary_profile_sidecar_idx(sidecar_idx);
+ }
+ }
+ }
+
+ RespondAndReleaseRpc(status, resp, rpc_context);
+}
+
+void AdmissionControlService::ReleaseQuery(const ReleaseQueryRequestPB* req,
+ ReleaseQueryResponsePB* resp, RpcContext* rpc_context) {
+ VLOG(1) << "ReleaseQuery: query_id=" << req->query_id();
+ shared_ptr<AdmissionState> admission_state;
+ RESPOND_IF_ERROR(admission_state_map_.Get(req->query_id(),
&admission_state));
+
+ {
+ lock_guard<mutex> l(admission_state->lock);
+ if (!admission_state->released) {
+ ExecEnv::GetInstance()->admission_controller()->ReleaseQuery(
+ req->query_id(), req->peak_mem_consumption());
+ admission_state->released = true;
+ } else {
+ LOG(WARNING) << "Query " << req->query_id() << " was already released.";
+ }
+ }
+
+ RESPOND_IF_ERROR(admission_state_map_.Delete(req->query_id()));
+ RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
+}
+
+void AdmissionControlService::ReleaseQueryBackends(
+ const ReleaseQueryBackendsRequestPB* req, ReleaseQueryBackendsResponsePB*
resp,
+ RpcContext* rpc_context) {
+ VLOG(2) << "ReleaseQueryBackends: query_id=" << req->query_id();
+ shared_ptr<AdmissionState> admission_state;
+ RESPOND_IF_ERROR(admission_state_map_.Get(req->query_id(),
&admission_state));
+
+ {
+ lock_guard<mutex> l(admission_state->lock);
+ vector<NetworkAddressPB> host_addrs;
+ for (const NetworkAddressPB& host_addr : req->host_addr()) {
+ auto it = admission_state->unreleased_backends.find(host_addr);
+ if (it == admission_state->unreleased_backends.end()) {
+ string err = Substitute("Backend $0 was already released for $1",
+ NetworkAddressPBToString(host_addr), PrintId(req->query_id()));
+ LOG(WARNING) << err;
+ RespondAndReleaseRpc(Status(err), resp, rpc_context);
+ return;
+ }
+ host_addrs.push_back(host_addr);
+ admission_state->unreleased_backends.erase(it);
+ }
+
+ ExecEnv::GetInstance()->admission_controller()->ReleaseQueryBackends(
+ req->query_id(), host_addrs);
+ }
+
+ RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
+}
+
+void AdmissionControlService::CancelAdmission(const CancelAdmissionRequestPB*
req,
+ CancelAdmissionResponsePB* resp, kudu::rpc::RpcContext* rpc_context) {
+ VLOG(1) << "CancelAdmission: query_id=" << req->query_id();
+ shared_ptr<AdmissionState> admission_state;
+ RESPOND_IF_ERROR(admission_state_map_.Get(req->query_id(),
&admission_state));
+ admission_state->admit_outcome.Set(AdmissionOutcome::CANCELLED);
+ RespondAndReleaseRpc(Status::OK(), resp, rpc_context);
+}
+
+void AdmissionControlService::AdmitFromThreadPool(UniqueIdPB query_id) {
+ shared_ptr<AdmissionState> admission_state;
+ Status s = admission_state_map_.Get(query_id, &admission_state);
+ if (!s.ok()) {
+ LOG(ERROR) << s;
+ return;
+ }
+
+ {
+ lock_guard<mutex> l(admission_state->lock);
+ bool queued;
+ AdmissionController::AdmissionRequest request = {admission_state->query_id,
+ admission_state->coord_id, admission_state->query_exec_request,
+
admission_state->query_exec_request.query_ctx.client_request.query_options,
+ admission_state->summary_profile,
+ admission_state->blacklisted_executor_addresses};
+ admission_state->admit_status =
+
ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(request,
+ &admission_state->admit_outcome, &admission_state->schedule,
&queued,
+ &admission_state->request_pool);
+ admission_state->submitted = true;
+ if (!queued) {
+ admission_state->admission_done = true;
+ if (admission_state->admit_status.ok()) {
+ for (const auto& entry :
admission_state->schedule->backend_exec_params()) {
+ admission_state->unreleased_backends.emplace(entry.address());
+ }
+ }
+ } else {
+ DCHECK(admission_state->admit_status.ok());
+ }
+ }
+}
+
+template <typename ResponsePBType>
+void AdmissionControlService::RespondAndReleaseRpc(
+ const Status& status, ResponsePBType* response, RpcContext* rpc_context) {
+ status.ToProto(response->mutable_status());
+ // Release the memory against the control service's memory tracker.
+ mem_tracker_->Release(rpc_context->GetTransferSize());
+ rpc_context->RespondSuccess();
+}
+
+} // namespace impala
diff --git a/be/src/scheduling/admission-control-service.h
b/be/src/scheduling/admission-control-service.h
new file mode 100644
index 0000000..96ca8fa
--- /dev/null
+++ b/be/src/scheduling/admission-control-service.h
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/object-pool.h"
+#include "common/status.h"
+#include "gen-cpp/Frontend_types.h"
+#include "gen-cpp/admission_control_service.proxy.h"
+#include "gen-cpp/admission_control_service.service.h"
+#include "scheduling/admission-controller.h"
+#include "util/sharded-query-map-util.h"
+#include "util/thread-pool.h"
+#include "util/unique-id-hash.h"
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
+namespace impala {
+
+class MemTracker;
+class MetricGroup;
+class QuerySchedulePB;
+
+/// Singleton class that exports the RPC service used for submitting queries
remotely for
+/// admission.
+class AdmissionControlService : public AdmissionControlServiceIf,
+ public CacheLineAligned {
+ public:
+ AdmissionControlService(MetricGroup* metric_group);
+
+ /// Initializes the service by registering it with the singleton RPC manager.
+ /// This mustn't be called until RPC manager has been initialized.
+ Status Init();
+
+ virtual void AdmitQuery(const AdmitQueryRequestPB* req,
AdmitQueryResponsePB* resp,
+ kudu::rpc::RpcContext* context) override;
+ virtual void GetQueryStatus(const GetQueryStatusRequestPB* req,
+ GetQueryStatusResponsePB* resp, kudu::rpc::RpcContext* context) override;
+ virtual void ReleaseQuery(const ReleaseQueryRequestPB* req,
+ ReleaseQueryResponsePB* resp, kudu::rpc::RpcContext* context) override;
+ virtual void ReleaseQueryBackends(const ReleaseQueryBackendsRequestPB* req,
+ ReleaseQueryBackendsResponsePB* resp, kudu::rpc::RpcContext* context)
override;
+ virtual void CancelAdmission(const CancelAdmissionRequestPB* req,
+ CancelAdmissionResponsePB* resp, kudu::rpc::RpcContext* context)
override;
+
+ /// Gets a AdmissionControlService proxy to a server with 'address' and
'hostname'.
+ /// The newly created proxy is returned in 'proxy'. Returns error status on
failure.
+ static Status GetProxy(const TNetworkAddress& address, const std::string&
hostname,
+ std::unique_ptr<AdmissionControlServiceProxy>* proxy);
+
+ private:
+ friend class ImpalaHttpHandler;
+
+ struct AdmissionState {
+ public:
+ AdmissionState(const UniqueIdPB& query_id, const UniqueIdPB& coord_id)
+ : query_id(query_id), coord_id(coord_id) {}
+
+ // The following are copied from the AdmitQueryRequestPB for this query
and are valid
+ // at any point after this AdmissionState has been added to
'admission_state_map_'.
+ UniqueIdPB query_id;
+ UniqueIdPB coord_id;
+ TQueryExecRequest query_exec_request;
+ std::unordered_set<NetworkAddressPB> blacklisted_executor_addresses;
+
+ // Protects all of the following members.
+ std::mutex lock;
+
+ // True if SubmitForAdmission has been called for this query.
+ bool submitted = false;
+
+ // True if a final admission decision has been made for this query.
+ bool admission_done = false;
+
+ // If 'admission_done' is true, then this represents the final admission
outcome, i.e.
+ // an error indicates the query being rejected for admission.
+ Status admit_status;
+
+ // Used to indicate cancellation of admission to AdmissionController.
+ Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER> admit_outcome;
+
+ // If admission was successful, contains the results of admission.
+ std::unique_ptr<QuerySchedulePB> schedule;
+
+ // List of backends that have not been released yet.
+ std::unordered_set<NetworkAddressPB> unreleased_backends;
+
+ // True if ReleaseQuery() has been called for this query.
+ bool released = false;
+
+ // Runtime profile used to record admission related info. Passed into
+ // AdmissionController, which updates it.
+ ObjectPool profile_pool;
+ RuntimeProfile* summary_profile;
+
+ // The name of the request pool for this query. Valid if 'submitted' is
true.
+ std::string request_pool = "";
+ };
+
+ /// Tracks the memory usage of payload in the service queue.
+ std::unique_ptr<MemTracker> mem_tracker_;
+
+ /// Used to perform the actual work of scheduling and admitting queries, so
that
+ /// AdmitQuery() can return immediately.
+ std::unique_ptr<ThreadPool<UniqueIdPB>> admission_thread_pool_;
+
+ /// Thread-safe map from query ids to info about the query.
+ ShardedQueryPBMap<std::shared_ptr<AdmissionState>> admission_state_map_;
+
+ /// Callback for 'admission_thread_pool_'.
+ void AdmitFromThreadPool(UniqueIdPB query_id);
+
+ /// Helper for serializing 'status' as part of 'response'. Also releases
memory
+ /// of the RPC payload previously accounted towards the internal memory
tracker.
+ template <typename ResponsePBType>
+ void RespondAndReleaseRpc(
+ const Status& status, ResponsePBType* response, kudu::rpc::RpcContext*
rpc_context);
+};
+
+} // namespace impala
diff --git a/be/src/scheduling/admission-controller-test.cc
b/be/src/scheduling/admission-controller-test.cc
index c929c14..50547e1 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -101,7 +101,7 @@ class AdmissionControllerTest : public testing::Test {
TQueryOptions* query_options = pool_.Add(new TQueryOptions());
query_options->__set_mem_limit(mem_limit);
ScheduleState* schedule_state =
- pool_.Add(new ScheduleState(*query_id, *request, *query_options,
profile));
+ pool_.Add(new ScheduleState(*query_id, *request, *query_options,
profile, true));
schedule_state->set_executor_group(executor_group);
SetHostsInScheduleState(*schedule_state, num_hosts, is_dedicated_coord);
schedule_state->UpdateMemoryRequirements(config);
diff --git a/be/src/scheduling/admission-controller.cc
b/be/src/scheduling/admission-controller.cc
index fc635ee..9bba393 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -36,6 +36,7 @@
#include "util/metrics.h"
#include "util/pretty-printer.h"
#include "util/runtime-profile-counters.h"
+#include "util/scope-exit-trigger.h"
#include "util/thread.h"
#include "util/time.h"
#include "util/uid-util.h"
@@ -134,11 +135,6 @@ const string POOL_MIN_QUERY_MEM_LIMIT_METRIC_KEY_FORMAT =
const string POOL_CLAMP_MEM_LIMIT_QUERY_OPTION_METRIC_KEY_FORMAT =
"admission-controller.pool-clamp-mem-limit-query-option.$0";
-// Profile query events
-const string QUERY_EVENT_SUBMIT_FOR_ADMISSION = "Submit for admission";
-const string QUERY_EVENT_QUEUED = "Queued";
-const string QUERY_EVENT_COMPLETED_ADMISSION = "Completed admission";
-
// Profile info strings
const string AdmissionController::PROFILE_INFO_KEY_ADMISSION_RESULT =
"Admission result";
const string AdmissionController::PROFILE_INFO_VAL_ADMIT_IMMEDIATELY =
@@ -1128,7 +1124,9 @@ void
AdmissionController::PoolStats::UpdateConfigMetrics(const TPoolConfig& pool
Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
- unique_ptr<QuerySchedulePB>* schedule_result) {
+ unique_ptr<QuerySchedulePB>* schedule_result, bool* queued,
+ std::string* request_pool) {
+ *queued = false;
DebugActionNoFail(request.query_options, "AC_BEFORE_ADMISSION");
DCHECK(schedule_result->get() == nullptr);
@@ -1140,34 +1138,42 @@ Status AdmissionController::SubmitForAdmission(const
AdmissionRequest& request,
request.summary_profile->AddInfoString("Blacklisted Executors",
blacklist_str);
}
- // Note the queue_node will not exist in the queue when this method returns.
- QueueNode queue_node(request, admit_outcome, request.summary_profile);
+ QueueNode* queue_node;
+ {
+ lock_guard<mutex> lock(queue_nodes_lock_);
+ auto it = queue_nodes_.emplace(std::piecewise_construct,
+ std::forward_as_tuple(request.query_id),
+ std::forward_as_tuple(request, admit_outcome,
request.summary_profile));
+ if (!it.second) {
+ // The query_id already existed in queue_nodes_.
+ return Status("Cannot submit the same query for admission multiple
times.");
+ }
+ queue_node = &it.first->second;
+ }
+
+ const auto queue_node_deleter = MakeScopeExitTrigger([&]() {
+ if (!queued) queue_nodes_.erase(request.query_id);
+ });
// Re-resolve the pool name to propagate any resolution errors now that this
request is
// known to require a valid pool. All executor groups / schedules will use
the same pool
// name.
- string pool_name;
- TPoolConfig pool_cfg;
- RETURN_IF_ERROR(
- ResolvePoolAndGetConfig(request.request.query_ctx, &pool_name,
&pool_cfg));
- request.summary_profile->AddInfoString("Request Pool", pool_name);
-
- // We track this outside of the queue node so that it is still available
after the query
- // has been dequeued.
- string initial_queue_reason;
- ScopedEvent completedEvent(request.query_events,
QUERY_EVENT_COMPLETED_ADMISSION);
+ RETURN_IF_ERROR(ResolvePoolAndGetConfig(
+ request.request.query_ctx, &queue_node->pool_name,
&queue_node->pool_cfg));
+ request.summary_profile->AddInfoString("Request Pool",
queue_node->pool_name);
+
{
// Take lock to ensure the Dequeue thread does not modify the request
queue.
lock_guard<mutex> lock(admission_ctrl_lock_);
- request.query_events->MarkEvent(QUERY_EVENT_SUBMIT_FOR_ADMISSION);
- pool_config_map_[pool_name] = pool_cfg;
- PoolStats* stats = GetPoolStats(pool_name);
- stats->UpdateConfigMetrics(pool_cfg);
+ pool_config_map_[queue_node->pool_name] = queue_node->pool_cfg;
+ PoolStats* stats = GetPoolStats(queue_node->pool_name);
+ stats->UpdateConfigMetrics(queue_node->pool_cfg);
bool unused_bool;
- bool must_reject = !FindGroupToAdmitOrReject(membership_snapshot, pool_cfg,
- /* admit_from_queue=*/false, stats, &queue_node, unused_bool);
+ bool must_reject =
+ !FindGroupToAdmitOrReject(membership_snapshot, queue_node->pool_cfg,
+ /* admit_from_queue=*/false, stats, queue_node, unused_bool);
if (must_reject) {
AdmissionOutcome outcome =
admit_outcome->Set(AdmissionOutcome::REJECTED);
if (outcome != AdmissionOutcome::REJECTED) {
@@ -1179,15 +1185,15 @@ Status AdmissionController::SubmitForAdmission(const
AdmissionRequest& request,
request.summary_profile->AddInfoString(
PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_REJECTED);
stats->metrics()->total_rejected->Increment(1);
- const ErrorMsg& rejected_msg = ErrorMsg(
- TErrorCode::ADMISSION_REJECTED, pool_name,
queue_node.not_admitted_reason);
+ const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_REJECTED,
+ queue_node->pool_name, queue_node->not_admitted_reason);
VLOG_QUERY << rejected_msg.msg();
return Status::Expected(rejected_msg);
}
- if (queue_node.admitted_schedule.get() != nullptr) {
- DCHECK(queue_node.admitted_schedule->query_schedule_pb().get() !=
nullptr);
- const string& group_name =
queue_node.admitted_schedule->executor_group();
+ if (queue_node->admitted_schedule.get() != nullptr) {
+ DCHECK(queue_node->admitted_schedule->query_schedule_pb().get() !=
nullptr);
+ const string& group_name =
queue_node->admitted_schedule->executor_group();
VLOG(3) << "Can admit to group " << group_name << " (or cancelled)";
DCHECK_EQ(stats->local_stats().num_queued, 0);
AdmissionOutcome outcome =
admit_outcome->Set(AdmissionOutcome::ADMITTED);
@@ -1198,24 +1204,24 @@ Status AdmissionController::SubmitForAdmission(const
AdmissionRequest& request,
return Status::CANCELLED;
}
VLOG_QUERY << "Admitting query id=" << PrintId(request.query_id);
- AdmitQuery(queue_node.admitted_schedule.get(), false);
+ AdmitQuery(queue_node->admitted_schedule.get(), false);
stats->UpdateWaitTime(0);
VLOG_RPC << "Final: " << stats->DebugString();
- *schedule_result =
move(queue_node.admitted_schedule->query_schedule_pb());
+ *schedule_result =
move(queue_node->admitted_schedule->query_schedule_pb());
+ if (request_pool != nullptr) *request_pool = queue_node->pool_name;
return Status::OK();
}
// We cannot immediately admit but do not need to reject, so queue the
request
- RequestQueue* queue = &request_queue_map_[pool_name];
+ RequestQueue* queue = &request_queue_map_[queue_node->pool_name];
VLOG_QUERY << "Queuing, query id=" << PrintId(request.query_id)
- << " reason: " << queue_node.not_admitted_reason;
- if (queue_node.not_admitted_details.size() > 0) {
- VLOG_RPC << "Top mem consuming queries: "
- << queue_node.not_admitted_details;
+ << " reason: " << queue_node->not_admitted_reason;
+ if (queue_node->not_admitted_details.size() > 0) {
+ VLOG_RPC << "Top mem consuming queries: " <<
queue_node->not_admitted_details;
}
- initial_queue_reason = queue_node.not_admitted_reason;
+ queue_node->initial_queue_reason = queue_node->not_admitted_reason;
stats->Queue();
- queue->Enqueue(&queue_node);
+ queue->Enqueue(queue_node);
}
// Update the profile info before waiting. These properties will be updated
with
@@ -1223,69 +1229,104 @@ Status AdmissionController::SubmitForAdmission(const
AdmissionRequest& request,
request.summary_profile->AddInfoString(
PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_QUEUED);
request.summary_profile->AddInfoString(
- PROFILE_INFO_KEY_INITIAL_QUEUE_REASON, initial_queue_reason);
+ PROFILE_INFO_KEY_INITIAL_QUEUE_REASON, queue_node->initial_queue_reason);
request.summary_profile->AddInfoString(
- PROFILE_INFO_KEY_LAST_QUEUED_REASON, queue_node.not_admitted_reason);
- request.query_events->MarkEvent(QUERY_EVENT_QUEUED);
+ PROFILE_INFO_KEY_LAST_QUEUED_REASON, queue_node->not_admitted_reason);
+
+ queue_node->wait_start_ms = MonotonicMillis();
+ *queued = true;
+ return Status::OK();
+}
- int64_t queue_wait_timeout_ms = GetQueueTimeoutForPoolMs(pool_cfg);
- int64_t wait_start_ms = MonotonicMillis();
+Status AdmissionController::WaitOnQueued(const UniqueIdPB& query_id,
+ unique_ptr<QuerySchedulePB>* schedule_result, int64_t timeout_ms,
+ bool* wait_timed_out) {
+ if (wait_timed_out != nullptr) *wait_timed_out = false;
+
+ QueueNode* queue_node;
+ {
+ lock_guard<mutex> lock(queue_nodes_lock_);
+ auto it = queue_nodes_.find(query_id);
+ if (it == queue_nodes_.end()) {
+ return Status(
+ Substitute("WaitOnQueued failed: unknown query_id=$0",
PrintId(query_id)));
+ }
+ queue_node = &it->second;
+ }
+
+ int64_t queue_wait_timeout_ms =
GetQueueTimeoutForPoolMs(queue_node->pool_cfg);
// Block in Get() up to the time out, waiting for the promise to be set when
the query
// is admitted or cancelled.
- bool timed_out;
- admit_outcome->Get(queue_wait_timeout_ms, &timed_out);
- int64_t wait_time_ms = MonotonicMillis() - wait_start_ms;
- request.summary_profile->AddInfoString(PROFILE_INFO_KEY_INITIAL_QUEUE_REASON,
- Substitute(
- PROFILE_INFO_VAL_INITIAL_QUEUE_REASON, wait_time_ms,
initial_queue_reason));
+ bool get_timed_out = false;
+ queue_node->admit_outcome->Get(
+ (timeout_ms > 0 ? min(queue_wait_timeout_ms, timeout_ms) :
queue_wait_timeout_ms),
+ &get_timed_out);
+ int64_t wait_time_ms = MonotonicMillis() - queue_node->wait_start_ms;
+
+ queue_node->profile->AddInfoString(PROFILE_INFO_KEY_INITIAL_QUEUE_REASON,
+ Substitute(PROFILE_INFO_VAL_INITIAL_QUEUE_REASON, wait_time_ms,
+ queue_node->initial_queue_reason));
+
+ if (get_timed_out && wait_time_ms < queue_wait_timeout_ms) {
+ if (wait_timed_out != nullptr) *wait_timed_out = true;
+ // No admission decision has been made yet, so just return.
+ return Status::OK();
+ }
+
+ const auto queue_node_deleter =
+ MakeScopeExitTrigger([&]() { queue_nodes_.erase(query_id); });
// Disallow the FAIL action here. It would leave the queue in an
inconsistent state.
- DebugActionNoFail(request.query_options, "AC_AFTER_ADMISSION_OUTCOME");
+ DebugActionNoFail(
+ queue_node->admission_request.query_options,
"AC_AFTER_ADMISSION_OUTCOME");
{
lock_guard<mutex> lock(admission_ctrl_lock_);
// If the query has not been admitted or cancelled up till now, it will be
considered
// to be timed out.
- AdmissionOutcome outcome = admit_outcome->Set(AdmissionOutcome::TIMED_OUT);
- RequestQueue* queue = &request_queue_map_[pool_name];
- pools_for_updates_.insert(pool_name);
- PoolStats* pool_stats = GetPoolStats(pool_name);
+ AdmissionOutcome outcome =
+ queue_node->admit_outcome->Set(AdmissionOutcome::TIMED_OUT);
+ RequestQueue* queue = &request_queue_map_[queue_node->pool_name];
+ pools_for_updates_.insert(queue_node->pool_name);
+ PoolStats* pool_stats = GetPoolStats(queue_node->pool_name);
pool_stats->UpdateWaitTime(wait_time_ms);
if (outcome == AdmissionOutcome::REJECTED) {
- if (queue->Remove(&queue_node)) pool_stats->Dequeue(true);
- request.summary_profile->AddInfoString(
+ if (queue->Remove(queue_node)) pool_stats->Dequeue(true);
+ queue_node->profile->AddInfoString(
PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_REJECTED);
- const ErrorMsg& rejected_msg = ErrorMsg(
- TErrorCode::ADMISSION_REJECTED, pool_name,
queue_node.not_admitted_reason);
+ const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_REJECTED,
+ queue_node->pool_name, queue_node->not_admitted_reason);
VLOG_QUERY << rejected_msg.msg();
return Status::Expected(rejected_msg);
} else if (outcome == AdmissionOutcome::TIMED_OUT) {
- bool removed = queue->Remove(&queue_node);
+ bool removed = queue->Remove(queue_node);
DCHECK(removed);
pool_stats->Dequeue(true);
- request.summary_profile->AddInfoString(
+ queue_node->profile->AddInfoString(
PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_TIME_OUT);
- const ErrorMsg& rejected_msg =
- ErrorMsg(TErrorCode::ADMISSION_TIMED_OUT, queue_wait_timeout_ms,
pool_name,
- queue_node.not_admitted_reason, queue_node.not_admitted_details);
+ const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_TIMED_OUT,
+ queue_wait_timeout_ms, queue_node->pool_name,
queue_node->not_admitted_reason,
+ queue_node->not_admitted_details);
VLOG_QUERY << rejected_msg.msg();
return Status::Expected(rejected_msg);
} else if (outcome == AdmissionOutcome::CANCELLED) {
- if (queue->Remove(&queue_node)) pool_stats->Dequeue(false);
- request.summary_profile->AddInfoString(
+ if (queue->Remove(queue_node)) {
+ pool_stats->Dequeue(false);
+ }
+ queue_node->profile->AddInfoString(
PROFILE_INFO_KEY_ADMISSION_RESULT,
PROFILE_INFO_VAL_CANCELLED_IN_QUEUE);
VLOG_QUERY << PROFILE_INFO_VAL_CANCELLED_IN_QUEUE
- << ", query id=" << PrintId(request.query_id);
+ << ", query id=" << PrintId(query_id);
return Status::CANCELLED;
}
// The dequeue thread updates the stats (to avoid a race condition) so we
do
// not change them here.
DCHECK_ENUM_EQ(outcome, AdmissionOutcome::ADMITTED);
- DCHECK(queue_node.admitted_schedule.get() != nullptr);
- *schedule_result = move(queue_node.admitted_schedule->query_schedule_pb());
- DCHECK(!queue->Contains(&queue_node));
- VLOG_QUERY << "Admitted queued query id=" << PrintId(request.query_id);
+ DCHECK(queue_node->admitted_schedule.get() != nullptr);
+ *schedule_result =
move(queue_node->admitted_schedule->query_schedule_pb());
+ DCHECK(!queue->Contains(queue_node));
+ VLOG_QUERY << "Admitted queued query id=" << PrintId(query_id);
VLOG_RPC << "Final: " << pool_stats->DebugString();
return Status::OK();
}
@@ -1583,9 +1624,8 @@ Status AdmissionController::ComputeGroupScheduleStates(
executor_group = temp_executor_group.get();
}
- unique_ptr<ScheduleState> group_state =
- make_unique<ScheduleState>(request.query_id, request.request,
- request.query_options, request.summary_profile,
request.query_events);
+ unique_ptr<ScheduleState> group_state =
make_unique<ScheduleState>(request.query_id,
+ request.request, request.query_options, request.summary_profile,
false);
const string& group_name = executor_group->name();
VLOG(3) << "Scheduling for executor group: " << group_name << " with "
<< executor_group->NumExecutors() << " executors";
diff --git a/be/src/scheduling/admission-controller.h
b/be/src/scheduling/admission-controller.h
index 459b41b..6381b1d 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -314,22 +314,19 @@ class AdmissionController {
/// This struct contains all information needed to create a schedule and try
to
/// admit it. None of the members are owned by the instances of this class
(usually they
- /// are owned by the ClientRequestState).
+ /// are owned by the ClientRequestState or AdmissionControlService).
struct AdmissionRequest {
const UniqueIdPB& query_id;
const UniqueIdPB& coord_id;
const TQueryExecRequest& request;
const TQueryOptions& query_options;
RuntimeProfile* summary_profile;
- RuntimeProfile::EventSequence* query_events;
std::unordered_set<NetworkAddressPB>& blacklisted_executor_addresses;
};
- /// Submits the request for admission. May returns immediately if rejected,
but
- /// otherwise blocks until the request is either admitted, times out, gets
rejected
- /// later, or cancelled by the client (by setting 'admit_outcome' to
CANCELLED). When
- /// this method returns, the following <admit_outcome, Return Status> pairs
are
- /// possible:
+ /// Submits the request for admission. If the query is queued, 'queued' will
be true
+ /// and WaitOnQueued() must be called to block until a decision is made.
Otherwise, when
+ /// this method returns, the following <admit_outcome, Status> pairs are
possible:
/// - Admitted: <ADMITTED, Status::OK>
/// - Rejected or timed out: <REJECTED or TIMED_OUT, Status(msg: reason for
the same)>
/// - Cancelled: <CANCELLED, Status::CANCELLED>
@@ -337,7 +334,15 @@ class AdmissionController {
/// cancelled to ensure that the pool statistics are updated.
Status SubmitForAdmission(const AdmissionRequest& request,
Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
- std::unique_ptr<QuerySchedulePB>* schedule_result);
+ std::unique_ptr<QuerySchedulePB>* schedule_result, bool* queued,
+ std::string* request_pool = nullptr);
+
+ /// After SubmitForAdmission(), if the query was queued this must be called.
If
+ /// 'timeout_ms' is 0, it will block until a decision is made. Otherwise, if
the
+ /// function returns due to the timeout 'wait_timed_out' will be true.
+ Status WaitOnQueued(const UniqueIdPB& query_id,
+ std::unique_ptr<QuerySchedulePB>* schedule_result, int64_t timeout_ms =
0,
+ bool* wait_timed_out = nullptr);
/// Updates the pool statistics when a query completes (either successfully,
/// is cancelled or failed). This should be called for all requests that have
@@ -711,6 +716,10 @@ class AdmissionController {
/// Profile to be updated with information about admission.
RuntimeProfile* profile;
+ /// Name and config of the pool this query will be scheduled on.
+ string pool_name;
+ TPoolConfig pool_cfg;
+
/// END: Members that are valid for new objects after initialization
/////////////////////////////////////////
@@ -726,6 +735,12 @@ class AdmissionController {
/// this queue node.
std::vector<GroupScheduleState> group_states;
+ /// Info about why this query was queued.
+ string initial_queue_reason;
+
+ /// The MonotonicMillis() time when the query was queued.
+ int64_t wait_start_ms;
+
/// END: Members that are only valid while queued, but invalid once
dequeued.
/////////////////////////////////////////
@@ -753,6 +768,12 @@ class AdmissionController {
/////////////////////////////////////////
};
+ /// Protects 'queue_nodes_'. Should not be held with 'admission_ctrl_lock_'.
+ std::mutex queue_nodes_lock_;
+
+ /// Map from query id to the info needed to make admission decisions for
queued queries.
+ std::unordered_map<UniqueIdPB, QueueNode> queue_nodes_;
+
/// Queue for the queries waiting to be admitted for execution. Once the
/// maximum number of concurrently executing queries has been reached,
/// incoming queries are queued and admitted first come, first served.
diff --git a/be/src/scheduling/local-admission-control-client.cc
b/be/src/scheduling/local-admission-control-client.cc
index 0f785da..da62ee1 100644
--- a/be/src/scheduling/local-admission-control-client.cc
+++ b/be/src/scheduling/local-admission-control-client.cc
@@ -18,6 +18,7 @@
#include "scheduling/local-admission-control-client.h"
#include "runtime/exec-env.h"
+#include "util/runtime-profile-counters.h"
#include "util/uid-util.h"
#include "common/names.h"
@@ -30,9 +31,21 @@
LocalAdmissionControlClient::LocalAdmissionControlClient(const TUniqueId& query_
Status LocalAdmissionControlClient::SubmitForAdmission(
const AdmissionController::AdmissionRequest& request,
+ RuntimeProfile::EventSequence* query_events,
std::unique_ptr<QuerySchedulePB>* schedule_result) {
- return ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
- request, &admit_outcome_, schedule_result);
+ ScopedEvent completedEvent(
+ query_events, AdmissionControlClient::QUERY_EVENT_COMPLETED_ADMISSION);
+ query_events->MarkEvent(QUERY_EVENT_SUBMIT_FOR_ADMISSION);
+ bool queued;
+ Status status =
ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
+ request, &admit_outcome_, schedule_result, &queued);
+ if (queued) {
+ query_events->MarkEvent(QUERY_EVENT_QUEUED);
+ DCHECK(status.ok());
+ status = ExecEnv::GetInstance()->admission_controller()->WaitOnQueued(
+ request.query_id, schedule_result);
+ }
+ return status;
}
void LocalAdmissionControlClient::ReleaseQuery(int64_t peak_mem_consumption) {
diff --git a/be/src/scheduling/local-admission-control-client.h
b/be/src/scheduling/local-admission-control-client.h
index 8f3da3c..f8b7a73 100644
--- a/be/src/scheduling/local-admission-control-client.h
+++ b/be/src/scheduling/local-admission-control-client.h
@@ -36,6 +36,7 @@ class LocalAdmissionControlClient : public
AdmissionControlClient {
LocalAdmissionControlClient(const TUniqueId& query_id);
virtual Status SubmitForAdmission(const
AdmissionController::AdmissionRequest& request,
+ RuntimeProfile::EventSequence* query_events,
std::unique_ptr<QuerySchedulePB>* schedule_result) override;
virtual void ReleaseQuery(int64_t peak_mem_consumption) override;
virtual void ReleaseQueryBackends(
diff --git a/be/src/scheduling/remote-admission-control-client.cc
b/be/src/scheduling/remote-admission-control-client.cc
new file mode 100644
index 0000000..c24d854
--- /dev/null
+++ b/be/src/scheduling/remote-admission-control-client.cc
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scheduling/remote-admission-control-client.h"
+
+#include "gen-cpp/admission_control_service.pb.h"
+#include "gen-cpp/admission_control_service.proxy.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "rpc/rpc-mgr.inline.h"
+#include "rpc/sidecar-util.h"
+#include "runtime/exec-env.h"
+#include "scheduling/admission-control-service.h"
+#include "util/debug-util.h"
+#include "util/kudu-status-util.h"
+#include "util/runtime-profile-counters.h"
+#include "util/time.h"
+#include "util/uid-util.h"
+
+#include "common/names.h"
+
+DECLARE_string(admission_control_service_addr);
+
+DEFINE_int32(admission_status_retry_time_ms, 10,
+ "(Advanced) The number of milliseconds coordinators will wait before
retrying the "
+ "GetQueryStatus rpc.");
+
+using namespace strings;
+using namespace kudu::rpc;
+
+namespace impala {
+
+RemoteAdmissionControlClient::RemoteAdmissionControlClient(const TQueryCtx&
query_ctx)
+ : query_ctx_(query_ctx),
+ address_(MakeNetworkAddress(FLAGS_admission_control_service_addr)) {
+ TUniqueIdToUniqueIdPB(query_ctx.query_id, &query_id_);
+}
+
+Status RemoteAdmissionControlClient::SubmitForAdmission(
+ const AdmissionController::AdmissionRequest& request,
+ RuntimeProfile::EventSequence* query_events,
+ std::unique_ptr<QuerySchedulePB>* schedule_result) {
+ ScopedEvent completedEvent(
+ query_events, AdmissionControlClient::QUERY_EVENT_COMPLETED_ADMISSION);
+
+ std::unique_ptr<AdmissionControlServiceProxy> proxy;
+ RETURN_IF_ERROR(AdmissionControlService::GetProxy(address_,
address_.hostname, &proxy));
+ AdmitQueryRequestPB req;
+ AdmitQueryResponsePB resp;
+ RpcController rpc_controller;
+
+ *req.mutable_query_id() = request.query_id;
+ *req.mutable_coord_id() = ExecEnv::GetInstance()->backend_id();
+
+ KrpcSerializer serializer;
+ int sidecar_idx1;
+ RETURN_IF_ERROR(
+ serializer.SerializeToSidecar(&request.request, &rpc_controller,
&sidecar_idx1));
+ req.set_query_exec_request_sidecar_idx(sidecar_idx1);
+
+ for (const NetworkAddressPB& address :
request.blacklisted_executor_addresses) {
+ *req.add_blacklisted_executor_addresses() = address;
+ }
+
+ query_events->MarkEvent(QUERY_EVENT_SUBMIT_FOR_ADMISSION);
+ {
+ /// We hold 'lock_' for the duration of AdmitQuery to coordinate with
CancelAdmission
+ /// and avoid the scenario where the CancelAdmission rpc is sent first,
the admission
+ /// controller doesn't find the query because it wasn't submitted yet so
nothing is
+ /// cancelled, then the AdmitQuery rpc is sent and the query is scheduled
despite
+ /// already having been cancelled.
+ lock_guard<mutex> l(lock_);
+ if (cancelled_) {
+ return Status("Query already cancelled.");
+ }
+
+ KUDU_RETURN_IF_ERROR(
+ proxy->AdmitQuery(req, &resp, &rpc_controller), "AdmitQuery rpc
failed");
+ Status admit_status(resp.status());
+ RETURN_IF_ERROR(admit_status);
+
+ pending_admit_ = true;
+ }
+
+ Status admit_status = Status::OK();
+ while (true) {
+ RpcController rpc_controller2;
+ GetQueryStatusRequestPB get_status_req;
+ GetQueryStatusResponsePB get_status_resp;
+ *get_status_req.mutable_query_id() = request.query_id;
+ KUDU_RETURN_IF_ERROR(
+ proxy->GetQueryStatus(get_status_req, &get_status_resp,
&rpc_controller2),
+ "GetQueryStatus rpc failed");
+
+ if (get_status_resp.has_summary_profile_sidecar_idx()) {
+ TRuntimeProfileTree tree;
+ RETURN_IF_ERROR(GetSidecar(
+ get_status_resp.summary_profile_sidecar_idx(), &rpc_controller2,
&tree));
+ request.summary_profile->Update(tree);
+ }
+
+ if (get_status_resp.has_query_schedule()) {
+ schedule_result->reset(new QuerySchedulePB());
+ schedule_result->get()->Swap(get_status_resp.mutable_query_schedule());
+ break;
+ }
+ admit_status = Status(get_status_resp.status());
+ if (!admit_status.ok()) {
+ break;
+ }
+ query_events->MarkEvent(QUERY_EVENT_QUEUED);
+
+ SleepForMs(FLAGS_admission_status_retry_time_ms);
+ }
+
+ {
+ lock_guard<mutex> l(lock_);
+ pending_admit_ = false;
+ }
+
+ return admit_status;
+}
+
+void RemoteAdmissionControlClient::ReleaseQuery(int64_t peak_mem_consumption) {
+ std::unique_ptr<AdmissionControlServiceProxy> proxy;
+ Status get_proxy_status =
+ AdmissionControlService::GetProxy(address_, address_.hostname, &proxy);
+ if (!get_proxy_status.ok()) {
+ LOG(ERROR) << "ReleaseQuery for " << query_id_
+ << " failed to get proxy: " << get_proxy_status;
+ return;
+ }
+
+ ReleaseQueryRequestPB req;
+ ReleaseQueryResponsePB resp;
+ *req.mutable_query_id() = query_id_;
+ req.set_peak_mem_consumption(peak_mem_consumption);
+ Status rpc_status =
+ RpcMgr::DoRpcWithRetry(proxy,
&AdmissionControlServiceProxy::ReleaseQuery, req,
+ &resp, query_ctx_, "ReleaseQuery() RPC failed", RPC_NUM_RETRIES,
RPC_TIMEOUT_MS,
+ RPC_BACKOFF_TIME_MS, "REMOTE_AC_RELEASE_QUERY");
+
+ // Failure of this rpc is not considered a query failure, so we just log it.
+ // TODO: we need to be sure that the resources do in fact get cleaned up in
situation
+ // like these (IMPALA-9976).
+ if (!rpc_status.ok()) {
+ LOG(WARNING) << "ReleaseQuery rpc failed for " << query_id_ << ": " <<
rpc_status;
+ }
+ Status resp_status(resp.status());
+ if (!resp_status.ok()) {
+ LOG(WARNING) << "ReleaseQuery failed for " << query_id_ << ": " <<
resp_status;
+ }
+}
+
+void RemoteAdmissionControlClient::ReleaseQueryBackends(
+ const vector<NetworkAddressPB>& host_addrs) {
+ std::unique_ptr<AdmissionControlServiceProxy> proxy;
+ Status get_proxy_status =
+ AdmissionControlService::GetProxy(address_, address_.hostname, &proxy);
+ if (!get_proxy_status.ok()) {
+ LOG(ERROR) << "ReleaseQueryBackends for " << query_id_
+ << " failed to get proxy: " << get_proxy_status;
+ return;
+ }
+
+ ReleaseQueryBackendsRequestPB req;
+ ReleaseQueryBackendsResponsePB resp;
+ *req.mutable_query_id() = query_id_;
+ for (const NetworkAddressPB& addr : host_addrs) {
+ *req.add_host_addr() = addr;
+ }
+ Status rpc_status =
+ RpcMgr::DoRpcWithRetry(proxy,
&AdmissionControlServiceProxy::ReleaseQueryBackends,
+ req, &resp, query_ctx_, "ReleaseQueryBackends() RPC failed",
RPC_NUM_RETRIES,
+ RPC_TIMEOUT_MS, RPC_BACKOFF_TIME_MS, "REMOTE_AC_RELEASE_BACKENDS");
+
+ // Failure of this rpc is not considered a query failure, so we just log it.
+ // TODO: we need to be sure that the resources do in fact get cleaned up in
situation
+ // like these (IMPALA-9976).
+ if (!rpc_status.ok()) {
+ LOG(WARNING) << "ReleaseQueryBackends rpc failed for " << query_id_ << ": "
+ << rpc_status;
+ }
+ Status resp_status(resp.status());
+ if (!resp_status.ok()) {
+ LOG(WARNING) << "ReleaseQueryBackends failed for " << query_id_ << ": "
+ << resp_status;
+ }
+}
+
+void RemoteAdmissionControlClient::CancelAdmission() {
+ {
+ lock_guard<mutex> l(lock_);
+ cancelled_ = true;
+ if (!pending_admit_) {
+ // Nothing to cancel.
+ return;
+ }
+ }
+
+ std::unique_ptr<AdmissionControlServiceProxy> proxy;
+ Status get_proxy_status =
+ AdmissionControlService::GetProxy(address_, address_.hostname, &proxy);
+ if (!get_proxy_status.ok()) {
+ LOG(WARNING) << "CancelAdmission for " << query_id_
+ << " failed to get proxy: " << get_proxy_status;
+ }
+
+ CancelAdmissionRequestPB req;
+ CancelAdmissionResponsePB resp;
+ *req.mutable_query_id() = query_id_;
+ Status rpc_status =
+ RpcMgr::DoRpcWithRetry(proxy,
&AdmissionControlServiceProxy::CancelAdmission, req,
+ &resp, query_ctx_, "CancelAdmission() RPC failed", RPC_NUM_RETRIES,
+ RPC_TIMEOUT_MS, RPC_BACKOFF_TIME_MS, "REMOTE_AC_CANCEL_ADMISSION");
+
+ // Failure of this rpc is not considered a query failure, so we just log it.
+ if (!rpc_status.ok()) {
+ LOG(WARNING) << "CancelAdmission rpc failed for " << query_id_ << ": " <<
rpc_status;
+ }
+ Status resp_status(resp.status());
+ if (!resp_status.ok()) {
+ LOG(WARNING) << "CancelAdmission failed for " << query_id_ << ": " <<
resp_status;
+ }
+}
+
+} // namespace impala
diff --git a/be/src/scheduling/local-admission-control-client.h
b/be/src/scheduling/remote-admission-control-client.h
similarity index 64%
copy from be/src/scheduling/local-admission-control-client.h
copy to be/src/scheduling/remote-admission-control-client.h
index 8f3da3c..59fed78 100644
--- a/be/src/scheduling/local-admission-control-client.h
+++ b/be/src/scheduling/remote-admission-control-client.h
@@ -31,11 +31,12 @@ namespace impala {
/// Implementation of AdmissionControlClient used to submit queries for
admission to an
-/// AdmissionController running locally on the coordinator.
+/// AdmissionController running remotely.
-class LocalAdmissionControlClient : public AdmissionControlClient {
+class RemoteAdmissionControlClient : public AdmissionControlClient {
public:
- LocalAdmissionControlClient(const TUniqueId& query_id);
+ RemoteAdmissionControlClient(const TQueryCtx& query_ctx);
virtual Status SubmitForAdmission(const
AdmissionController::AdmissionRequest& request,
+ RuntimeProfile::EventSequence* query_events,
std::unique_ptr<QuerySchedulePB>* schedule_result) override;
virtual void ReleaseQuery(int64_t peak_mem_consumption) override;
virtual void ReleaseQueryBackends(
@@ -43,14 +44,30 @@ class LocalAdmissionControlClient : public
AdmissionControlClient {
virtual void CancelAdmission() override;
private:
+ /// Owned by the ClientRequestState.
+ const TQueryCtx& query_ctx_;
+
// The id of the query being considered for admission.
UniqueIdPB query_id_;
- /// Promise used by the admission controller.
AdmissionController:SubmitForAdmission()
- /// will block on this promise until the query is either rejected, admitted,
times out,
- /// or is cancelled. Can be set to CANCELLED by CancelAdmission() in order
to cancel,
- /// but otherwise is set by AdmissionController with the admission decision.
- Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER> admit_outcome_;
+ /// The address of the remote admission controller to use.
+ TNetworkAddress address_;
+
+ /// Protects 'pending_admit_' and 'cancelled_'.
+ std::mutex lock_;
+
+ /// If true, the AdmitQuery rpc has been sent but a final admission decision
has not yet
+ /// been received by GetQueryStatus().
+ bool pending_admit_ = false;
+
+ /// If true, CancelAdmission() was called. If SubmitForAdmission() is called
+ /// subsequently, it will not send the AdmitQuery rpc.
+ bool cancelled_ = false;
+
+ /// Constants related to retrying the idempotent rpcs.
+ static const int RPC_NUM_RETRIES = 3;
+ static const int64_t RPC_TIMEOUT_MS = 10 * MILLIS_PER_SEC;
+ static const int64_t RPC_BACKOFF_TIME_MS = 3 * MILLIS_PER_SEC;
};
} // namespace impala
diff --git a/be/src/scheduling/schedule-state.cc
b/be/src/scheduling/schedule-state.cc
index e004f3a..7985987 100644
--- a/be/src/scheduling/schedule-state.cc
+++ b/be/src/scheduling/schedule-state.cc
@@ -54,29 +54,20 @@ FragmentScheduleState::FragmentScheduleState(
}
ScheduleState::ScheduleState(const UniqueIdPB& query_id, const
TQueryExecRequest& request,
- const TQueryOptions& query_options, RuntimeProfile* summary_profile,
- RuntimeProfile::EventSequence* query_events)
+ const TQueryOptions& query_options, RuntimeProfile* summary_profile, bool
is_test)
: query_id_(query_id),
request_(request),
query_options_(query_options),
query_schedule_pb_(new QuerySchedulePB()),
summary_profile_(summary_profile),
- query_events_(query_events),
next_instance_id_(query_id) {
- Init();
-}
-
-ScheduleState::ScheduleState(const UniqueIdPB& query_id, const
TQueryExecRequest& request,
- const TQueryOptions& query_options, RuntimeProfile* summary_profile)
- : query_id_(query_id),
- request_(request),
- query_options_(query_options),
- query_schedule_pb_(new QuerySchedulePB()),
- summary_profile_(summary_profile),
- next_instance_id_(query_id),
- rng_(rand()) {
- // Init() is not called, this constructor is for white box testing only.
- DCHECK(TestInfo::is_test());
+ if (is_test) {
+ // For tests, don't call Init() and seed the random number generator for
deterministic
+ // results.
+ rng_.seed(rand());
+ } else {
+ Init();
+ }
}
void ScheduleState::Init() {
diff --git a/be/src/scheduling/schedule-state.h
b/be/src/scheduling/schedule-state.h
index 39181cb..233b256 100644
--- a/be/src/scheduling/schedule-state.h
+++ b/be/src/scheduling/schedule-state.h
@@ -147,13 +147,10 @@ struct FragmentScheduleState {
/// everything else is discarded.
class ScheduleState {
public:
+ /// For testing only: specify 'is_test=true' to build a ScheduleState object
without
+ /// running Init() and to seed the random number generator for deterministic
results.
ScheduleState(const UniqueIdPB& query_id, const TQueryExecRequest& request,
- const TQueryOptions& query_options, RuntimeProfile* summary_profile,
- RuntimeProfile::EventSequence* query_events);
-
- /// For testing only: build a ScheduleState object but do not run Init().
- ScheduleState(const UniqueIdPB& query_id, const TQueryExecRequest& request,
- const TQueryOptions& query_options, RuntimeProfile* summary_profile);
+ const TQueryOptions& query_options, RuntimeProfile* summary_profile,
bool is_test);
/// Verifies that the schedule is well-formed (and DCHECKs if it isn't):
/// - all fragments have a FragmentScheduleState
@@ -236,7 +233,6 @@ class ScheduleState {
RuntimeProfile* summary_profile() { return summary_profile_; }
- RuntimeProfile::EventSequence* query_events() { return query_events_; }
int64_t largest_min_reservation() const { return largest_min_reservation_; }
@@ -312,7 +308,6 @@ class ScheduleState {
/// TODO: move these into QueryState
RuntimeProfile* summary_profile_;
- RuntimeProfile::EventSequence* query_events_;
/// Maps from plan node id to its fragment idx. Filled in c'tor.
std::vector<int32_t> plan_node_to_fragment_idx_;
diff --git a/be/src/service/client-request-state.cc
b/be/src/service/client-request-state.cc
index 763b7f5..6a60e24 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -164,7 +164,7 @@ ClientRequestState::ClientRequestState(const TQueryCtx&
query_ctx, Frontend* fro
summary_profile_->AddChild(frontend_profile_);
- AdmissionControlClient::Create(query_ctx.query_id,
&admission_control_client_);
+ AdmissionControlClient::Create(query_ctx_, &admission_control_client_);
}
ClientRequestState::~ClientRequestState() {
@@ -575,8 +575,8 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() {
Status admit_status = admission_control_client_->SubmitForAdmission(
{query_id_pb, ExecEnv::GetInstance()->backend_id(),
exec_request_->query_exec_request, exec_request_->query_options,
- summary_profile_, query_events_, blacklisted_executor_addresses_},
- &schedule_);
+ summary_profile_, blacklisted_executor_addresses_},
+ query_events_, &schedule_);
{
lock_guard<mutex> l(lock_);
if (!UpdateQueryStatus(admit_status).ok()) return;
diff --git a/be/src/service/impala-http-handler.cc
b/be/src/service/impala-http-handler.cc
index 6f01ae5..65307d5 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -36,6 +36,7 @@
#include "runtime/query-state.h"
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
+#include "scheduling/admission-control-service.h"
#include "scheduling/admission-controller.h"
#include "scheduling/cluster-membership-mgr.h"
#include "service/client-request-state.h"
@@ -64,6 +65,8 @@ using namespace strings;
DECLARE_int32(query_log_size);
DECLARE_int32(query_stmt_size);
DECLARE_bool(use_local_catalog);
+DECLARE_bool(is_admission_controller);
+DECLARE_string(admission_control_service_addr);
namespace {
@@ -170,11 +173,16 @@ void ImpalaHttpHandler::RegisterHandlers(Webserver*
webserver, bool metrics_only
[this](const auto& req, auto* doc) {
this->QuerySummaryHandler(false, false, req, doc); }, false);
+ // Only enable the admission control endpoints for impalads that are running
admission
+ // control, which is all impalads if the admission control service is not
enabled,
+ // indicated by 'admission_control_service_addr' not being specified.
+ if (FLAGS_admission_control_service_addr.empty() ||
FLAGS_is_admission_controller) {
webserver->RegisterUrlCallback("/admission", "admission_controller.tmpl",
MakeCallback(this, &ImpalaHttpHandler::AdmissionStateHandler), true);
webserver->RegisterUrlCallback("/resource_pool_reset", "",
MakeCallback(this, &ImpalaHttpHandler::ResetResourcePoolStatsHandler),
false);
+ }
RegisterLogLevelCallbacks(webserver, true);
}
@@ -1069,22 +1077,45 @@ void ImpalaHttpHandler::AdmissionStateHandler(
unsigned long num_backends;
};
unordered_map<string, vector<QueryInfo>> running_queries;
- server_->query_driver_map_.DoFuncForAllEntries([&running_queries](
- const std::shared_ptr<QueryDriver>& query_driver) {
- // Make sure only queries past admission control are added.
- ClientRequestState* request_state =
query_driver->GetActiveClientRequestState();
- auto query_state = request_state->exec_state();
- if (query_state != ClientRequestState::ExecState::INITIALIZED
- && query_state != ClientRequestState::ExecState::PENDING
- && request_state->schedule() != nullptr)
- running_queries[request_state->request_pool()].push_back(
- {request_state->query_id(),
request_state->schedule()->per_backend_mem_limit(),
- request_state->schedule()->per_backend_mem_to_admit(),
- request_state->schedule()->coord_backend_mem_limit(),
- request_state->schedule()->coord_backend_mem_to_admit(),
- static_cast<unsigned long>(
- request_state->schedule()->backend_exec_params().size())});
- });
+ if (FLAGS_is_admission_controller) {
+ ExecEnv::GetInstance()
+ ->admission_control_service()
+ ->admission_state_map_.DoFuncForAllEntries(
+ [&running_queries](
+ const std::shared_ptr<AdmissionControlService::AdmissionState>&
+ query_info) {
+ lock_guard<mutex> l(query_info->lock);
+ if (query_info->schedule.get() != nullptr) {
+ TUniqueId query_id;
+ UniqueIdPBToTUniqueId(query_info->query_id, &query_id);
+ running_queries[query_info->request_pool].push_back(
+ {query_id, query_info->schedule->per_backend_mem_limit(),
+ query_info->schedule->per_backend_mem_to_admit(),
+ query_info->schedule->coord_backend_mem_limit(),
+ query_info->schedule->coord_backend_mem_to_admit(),
+ static_cast<unsigned long>(
+
query_info->schedule->backend_exec_params().size())});
+ };
+ });
+ } else {
+ server_->query_driver_map_.DoFuncForAllEntries(
+ [&running_queries](const std::shared_ptr<QueryDriver>& query_driver) {
+ // Make sure only queries past admission control are added.
+ ClientRequestState* request_state =
query_driver->GetActiveClientRequestState();
+ auto query_state = request_state->exec_state();
+ if (query_state != ClientRequestState::ExecState::INITIALIZED
+ && query_state != ClientRequestState::ExecState::PENDING
+ && request_state->schedule() != nullptr)
+ running_queries[request_state->request_pool()].push_back(
+ {request_state->query_id(),
+ request_state->schedule()->per_backend_mem_limit(),
+ request_state->schedule()->per_backend_mem_to_admit(),
+ request_state->schedule()->coord_backend_mem_limit(),
+ request_state->schedule()->coord_backend_mem_to_admit(),
+ static_cast<unsigned long>(
+
request_state->schedule()->backend_exec_params().size())});
+ });
+ }
// Add the running queries to the resource_pools json.
for (int i = 0; i < resource_pools.Size(); i++) {
diff --git a/be/src/util/sharded-query-map-util.cc
b/be/src/util/sharded-query-map-util.cc
index 5d5317e..a723b27 100644
--- a/be/src/util/sharded-query-map-util.cc
+++ b/be/src/util/sharded-query-map-util.cc
@@ -18,6 +18,7 @@
#include "util/sharded-query-map-util.h"
#include "runtime/query-driver.h"
+#include "scheduling/admission-control-service.h"
#include "util/debug-util.h"
namespace impala {
@@ -75,4 +76,14 @@ template Status GenericShardedQueryMap<TUniqueId,
std::shared_ptr<QueryDriver>>:
template Status GenericShardedQueryMap<TUniqueId,
std::shared_ptr<QueryDriver>>::Delete(
TUniqueId const&);
+// Needed by AdmissionControlService
+template Status GenericShardedQueryMap<UniqueIdPB,
+ std::shared_ptr<AdmissionControlService::AdmissionState>>::Add(UniqueIdPB
const&,
+ const std::shared_ptr<AdmissionControlService::AdmissionState>&);
+template Status GenericShardedQueryMap<UniqueIdPB,
+ std::shared_ptr<AdmissionControlService::AdmissionState>>::Get(UniqueIdPB
const&,
+ std::shared_ptr<AdmissionControlService::AdmissionState>*);
+template Status GenericShardedQueryMap<UniqueIdPB,
+
std::shared_ptr<AdmissionControlService::AdmissionState>>::Delete(UniqueIdPB
const&);
+
} // namespace impala
diff --git a/common/protobuf/admission_control_service.proto
b/common/protobuf/admission_control_service.proto
index 6b59af0..ee4cdb4 100644
--- a/common/protobuf/admission_control_service.proto
+++ b/common/protobuf/admission_control_service.proto
@@ -152,3 +152,100 @@ message QuerySchedulePB {
// successfully.
optional int64 coord_backend_mem_to_admit = 8;
}
+
+message AdmitQueryRequestPB {
+ optional UniqueIdPB query_id = 1;
+
+ // The BackendId of the coordinator for this query.
+ optional UniqueIdPB coord_id = 2;
+
+ // Idx of the TQueryExecRequest sidecar.
+ optional int32 query_exec_request_sidecar_idx = 3;
+
+ // List of backends this query should not be scheduled on.
+ repeated NetworkAddressPB blacklisted_executor_addresses = 4;
+}
+
+message AdmitQueryResponsePB {
+ // Ok if the request was successfully handed off to the admission thread
pool for
+  // processing.
+ optional StatusPB status = 1;
+}
+
+message GetQueryStatusRequestPB {
+ optional UniqueIdPB query_id = 1;
+}
+
+message GetQueryStatusResponsePB {
+ // Error if the query was rejected or retrieving the status failed.
+ optional StatusPB status = 1;
+
+  // The results of scheduling and admission control. Will only be set if
admission was
+ // successful and the query has not yet been released.
+ optional QuerySchedulePB query_schedule = 2;
+
+ // Idx of the TRuntimeProfileTree sidecar.
+ optional int32 summary_profile_sidecar_idx = 3;
+}
+
+message ReleaseQueryRequestPB {
+ optional UniqueIdPB query_id = 1;
+
+ // Corresponds to the 'peak_mem_consumption' parameter of
+ // AdmissionController::ReleaseQuery()
+ optional int64 peak_mem_consumption = 3;
+}
+
+message ReleaseQueryResponsePB {
+ optional StatusPB status = 1;
+}
+
+message ReleaseQueryBackendsRequestPB {
+ optional UniqueIdPB query_id = 1;
+
+ // List of backends that have completed. The resources for this query on
these backends
+ // will be released.
+ repeated NetworkAddressPB host_addr = 2;
+}
+
+message ReleaseQueryBackendsResponsePB {
+ optional StatusPB status = 1;
+}
+
+message CancelAdmissionRequestPB {
+ optional UniqueIdPB query_id = 1;
+}
+
+message CancelAdmissionResponsePB {
+ optional StatusPB status = 1;
+}
+
+service AdmissionControlService {
+ /// Called by the coordinator to start scheduling. The actual work is done
on a thread
+ /// pool, so this call returns immedately. TODO: there are some situations
where we can
+ /// return the admission result quickly, eg. if the query is rejected. We
should
+ /// evaluate the benefits of saving a call to GetQueryStatus() in those
situations.
+ rpc AdmitQuery(AdmitQueryRequestPB) returns (AdmitQueryResponsePB);
+
+ /// Called by the coordinator after AdmitQuery() to monitor the admission
status of the
+ /// query. The call will block for a configurable amount of time before
returning. This
+ /// call is idempotent and will return the schedule on each call between
successful
+ /// admission and the query getting released.
+ rpc GetQueryStatus(GetQueryStatusRequestPB) returns
(GetQueryStatusResponsePB);
+
+ /// Called by the coordinator when the query has completely finished,
releases all
+ /// remaining resources.
+ rpc ReleaseQuery(ReleaseQueryRequestPB) returns (ReleaseQueryResponsePB);
+
+ /// Called after individual backends have finished to release their
resources while
+ /// other backends are running. Due to the use of
Coordinator::BackendResourceState,
+ /// this will be called a max of log(# of backends) times per query. TODO:
we can save
+ /// an rpc if we combine the release of the final batch of backends with the
call to
+ /// ReleaseQuery.
+ rpc ReleaseQueryBackends(ReleaseQueryBackendsRequestPB)
+ returns (ReleaseQueryBackendsResponsePB);
+
+ /// Called by the coordinator to cancel scheduling of a query for which
GetQueryStatus
+ /// has not yet returned a schedule.
+ rpc CancelAdmission(CancelAdmissionRequestPB) returns
(CancelAdmissionResponsePB);
+}
diff --git a/tests/common/resource_pool_config.py
b/tests/common/resource_pool_config.py
index ee785b7..47a37b5 100644
--- a/tests/common/resource_pool_config.py
+++ b/tests/common/resource_pool_config.py
@@ -31,8 +31,12 @@ class ResourcePoolConfig(object):
# metrics debug page. Add to this dictionary if other configs are need for
tests.
CONFIG_TO_METRIC_STR_MAPPING = {'max-query-mem-limit':
'pool-max-query-mem-limit'}
- def __init__(self, impala_service, llama_site_path):
+ """'impala_service' should point to an impalad to be used for running
queries.
+ 'ac_service' should point to the service running the admission controller
and is used
+ for checking metrics values on the debug webui."""
+ def __init__(self, impala_service, ac_service, llama_site_path):
self.impala_service = impala_service
+ self.ac_service = ac_service
self.llama_site_path = llama_site_path
tree = ET.parse(llama_site_path)
self.root = tree.getroot()
@@ -63,7 +67,7 @@ class ResourcePoolConfig(object):
while (time() - start_time < timeout):
handle = client.execute_async("select 'wait_for_config_change'")
client.close_query(handle)
- current_val = str(self.impala_service.get_metric_value(metric_key))
+ current_val = str(self.ac_service.get_metric_value(metric_key))
if current_val == target_val:
return
sleep(0.1)
diff --git a/tests/custom_cluster/test_admission_controller.py
b/tests/custom_cluster/test_admission_controller.py
index 64521da..fbbb55b 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -83,11 +83,6 @@ STATESTORE_RPC_FREQUENCY_MS = 100
SUBMISSION_DELAY_MS = \
[0, STATESTORE_RPC_FREQUENCY_MS / 2, STATESTORE_RPC_FREQUENCY_MS * 3 / 2]
-# The number of queries to submit. The test does not support fewer queries than
-# MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES to keep some validation
logic
-# simple.
-NUM_QUERIES = [15, 30, 50]
-
# Whether we will submit queries to all available impalads (in a round-robin
fashion)
ROUND_ROBIN_SUBMISSION = [True, False]
@@ -282,6 +277,14 @@ class TestAdmissionController(TestAdmissionControllerBase,
HS2TestSuite):
finally:
client.close()
+ def get_ac_process(self):
+ """Returns the Process that is running the admission control service."""
+ return self.cluster.impalads[0]
+
+ def get_ac_log_name(self):
+ """Returns the prefix of the log files for the admission control
process."""
+ return "impalad"
+
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_config_args(
@@ -593,7 +596,8 @@ class TestAdmissionController(TestAdmissionControllerBase,
HS2TestSuite):
self.client.wait_for_finished_timeout(handle, 1000)
expected_mem_limits = self.__get_mem_limits_admission_debug_page()
actual_mem_limits =
self.__get_mem_limits_memz_debug_page(handle.get_handle().id)
- mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
+ mem_admitted =\
+ get_mem_admitted_backends_debug_page(self.cluster,
self.get_ac_process())
debug_string = " expected_mem_limits:" + str(
expected_mem_limits) + " actual_mem_limits:" + str(
actual_mem_limits) + " mem_admitted:" + str(mem_admitted)
@@ -622,7 +626,7 @@ class TestAdmissionController(TestAdmissionControllerBase,
HS2TestSuite):
and 'executor' and their respective mem values in bytes."""
# Based on how the cluster is setup, the first impalad in the cluster is
the
# coordinator.
- response_json =
self.cluster.impalads[0].service.get_debug_webpage_json("admission")
+ response_json =
self.get_ac_process().service.get_debug_webpage_json("admission")
assert 'resource_pools' in response_json
assert len(response_json['resource_pools']) == 1
assert response_json['resource_pools'][0]['running_queries']
@@ -752,7 +756,7 @@ class TestAdmissionController(TestAdmissionControllerBase,
HS2TestSuite):
handle = client.execute_async("select 1")
sleep(1)
client.close_query(handle)
- self.assert_impalad_log_contains('INFO',
+ self.assert_log_contains(self.get_ac_log_name(), 'INFO',
"Ready to be Rejected but already cancelled, query id=")
client.clear_configuration()
@@ -760,7 +764,7 @@ class TestAdmissionController(TestAdmissionControllerBase,
HS2TestSuite):
handle = client.execute_async("select 2")
sleep(1)
client.close_query(handle)
- self.assert_impalad_log_contains('INFO',
+ self.assert_log_contains(self.get_ac_log_name(), 'INFO',
"Ready to be Admitted immediately but already cancelled, query id=")
client.set_configuration_option("debug_action",
@@ -794,7 +798,8 @@ class TestAdmissionController(TestAdmissionControllerBase,
HS2TestSuite):
client.close_query(queued_query_handle)
queued_profile = client.get_runtime_profile(queued_query_handle)
assert "Admission result: Cancelled (queued)" in queued_profile,
queued_profile
- self.assert_impalad_log_contains('INFO', "Dequeued cancelled query=")
+ self.assert_log_contains(
+ self.get_ac_log_name(), 'INFO', "Dequeued cancelled query=")
client.clear_configuration()
handle = client.execute_async("select sleep(10000)")
@@ -807,12 +812,13 @@ class
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
queued_profile = client.get_runtime_profile(queued_query_handle)
assert "Admission result: Cancelled (queued)" in queued_profile
for i in self.cluster.impalads:
-
i.service.wait_for_metric_value("impala-server.num-fragments-in-flight", 0)
- assert self.cluster.impalads[0].service.get_metric_value(
+ i.service.wait_for_metric_value(
+ "impala-server.num-fragments-in-flight", 0, timeout=20)
+ assert self.get_ac_process().service.get_metric_value(
"admission-controller.agg-num-running.default-pool") == 0
- assert self.cluster.impalads[0].service.get_metric_value(
+ assert self.get_ac_process().service.get_metric_value(
"admission-controller.total-admitted.default-pool") == 4
- assert self.cluster.impalads[0].service.get_metric_value(
+ assert self.get_ac_process().service.get_metric_value(
"admission-controller.total-queued.default-pool") == 2
finally:
client.close()
@@ -964,7 +970,8 @@ class TestAdmissionController(TestAdmissionControllerBase,
HS2TestSuite):
NUM_QUERIES = 5
coordinator_limited_metric = \
"admission-controller.total-dequeue-failed-coordinator-limited"
- original_metric_value = self.get_metric(coordinator_limited_metric)
+ original_metric_value = self.get_ac_process().service.get_metric_value(
+ coordinator_limited_metric)
profiles = self._execute_and_collect_profiles([STMT for i in
xrange(NUM_QUERIES)],
TIMEOUT_S, config_options={"mt_dop": 4})
@@ -991,7 +998,8 @@ class TestAdmissionController(TestAdmissionControllerBase,
HS2TestSuite):
# The number of admission control slots on the coordinator is limited
# so the failures to dequeue should trigger a bump in the
coordinator_limited_metric.
- later_metric_value = self.get_metric(coordinator_limited_metric)
+ later_metric_value = self.get_ac_process().service.get_metric_value(
+ coordinator_limited_metric)
assert later_metric_value > original_metric_value, \
"Metric %s did not change" % coordinator_limited_metric
@@ -1017,7 +1025,7 @@ class
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
self.client.wait_for_admission_control(handle_running)
handle_queued = self.client.execute_async(query)
self.client.wait_for_admission_control(handle_queued)
- self.impalad_test_service.wait_for_metric_value(
+ self.get_ac_process().service.wait_for_metric_value(
"admission-controller.total-queued.default-pool", 1)
# Queued queries don't show up on backends
self.__assert_num_queries_accounted(1, 1)
@@ -1082,7 +1090,8 @@ class
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
# Change config to be invalid.
llama_site_path = os.path.join(RESOURCES_DIR,
"copy-mem-limit-test-llama-site.xml")
- config = ResourcePoolConfig(self.cluster.impalads[0].service,
llama_site_path)
+ config = ResourcePoolConfig(
+ self.cluster.impalads[0].service, self.get_ac_process().service,
llama_site_path)
config.set_config_value(pool_name, config_str, 1)
# Close running query so the queued one gets a chance.
self.client.close_query(sleep_query_handle)
@@ -1140,9 +1149,8 @@ class
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
self.wait_for_operation_state(queued_query_resp.operationHandle,
TCLIService.TOperationState.PENDING_STATE)
# Check whether the query log message correctly exposes the queuing status.
- get_log_req = TCLIService.TGetLogReq()
- get_log_req.operationHandle = queued_query_resp.operationHandle
- log = self.hs2_client.GetLog(get_log_req).log
+ log = self.wait_for_log_message(
+ queued_query_resp.operationHandle, "Admission result :")
assert "Admission result : Queued" in log, log
assert "Latest admission queue reason : number of running queries 1 is at
or over "
"limit 1" in log, log
@@ -1171,7 +1179,7 @@ class
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
"""Test behaviour with a failed statestore. Queries should continue to be
admitted
but we should generate diagnostics about the stale topic."""
self.cluster.statestored.kill()
- impalad = self.cluster.impalads[0]
+ impalad = self.get_ac_process()
# Sleep until the update should be definitely stale.
sleep(STALE_TOPIC_THRESHOLD_MS / 1000. * 1.5)
ac_json = impalad.service.get_debug_webpage_json('/admission')
@@ -1289,7 +1297,8 @@ class
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
in self.client.get_runtime_profile(handle))
get_num_completed_backends(self.cluster.impalads[0].service,
handle.get_handle().id) == 1
- mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
+ mem_admitted =\
+ get_mem_admitted_backends_debug_page(self.cluster,
self.get_ac_process())
num_executor_zero_admitted = 0
for executor_mem_admitted in mem_admitted['executor']:
if executor_mem_admitted == 0:
@@ -1304,6 +1313,36 @@ class
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
assert executor_mem_admitted == 0
+class TestAdmissionControllerWithACService(TestAdmissionController):
+ """Runs all of the tests from TestAdmissionController but with the second
impalad in the
+ minicluster configured to perform all admission control."""
+
+ def get_ac_process(self):
+ return self.cluster.impalads[1]
+
+ def get_ac_log_name(self):
+ return "impalad_node1"
+
+ def setup_method(self, method):
+ if self.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+
+ PER_IMPALAD_ACS_ARGS = [
+ '--is_admission_controller=false',
+ '--is_admission_controller=true',
+ '--is_admission_controller=false',
+ ]
+ if 'start_args' not in method.func_dict:
+ method.func_dict['start_args'] = list()
+ method.func_dict['start_args'].append(
+ "--per_impalad_args=" + ";".join(PER_IMPALAD_ACS_ARGS))
+ if 'impalad_args' not in method.func_dict:
+ method.func_dict["impalad_args"] = ""
+ method.func_dict["impalad_args"] +=\
+ " --admission_control_service_addr=127.0.0.1:27001 "
+ super(TestAdmissionController, self).setup_method(method)
+
+
class TestAdmissionControllerStress(TestAdmissionControllerBase):
"""Submits a number of queries (parameterized) with some delay between
submissions
(parameterized) and the ability to submit to one impalad or many in a
round-robin
@@ -1339,14 +1378,12 @@ class
TestAdmissionControllerStress(TestAdmissionControllerBase):
def add_test_dimensions(cls):
super(TestAdmissionControllerStress, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(
- ImpalaTestDimension('num_queries', *NUM_QUERIES))
- cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('round_robin_submission', *ROUND_ROBIN_SUBMISSION))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('submission_delay_ms', *SUBMISSION_DELAY_MS))
# Additional constraints for code coverage jobs and core.
- num_queries = None
+ num_queries = 50
if ImpalaTestClusterProperties.get_instance().has_code_coverage():
# Code coverage builds can't handle the increased concurrency.
num_queries = 15
@@ -1357,9 +1394,11 @@ class
TestAdmissionControllerStress(TestAdmissionControllerBase):
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('round_robin_submission'))
- if num_queries is not None:
- cls.ImpalaTestMatrix.add_constraint(
- lambda v: v.get_value('num_queries') == num_queries)
+ # The number of queries to submit. The test does not support fewer queries
than
+ # MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES to keep some
validation logic
+ # simple.
+ cls.ImpalaTestMatrix.add_dimension(
+ ImpalaTestDimension('num_queries', num_queries))
def setup(self):
# All threads are stored in this list and it's used just to make sure we
clean up
@@ -1395,6 +1434,12 @@ class
TestAdmissionControllerStress(TestAdmissionControllerBase):
LOG.debug("Join thread for query num %s %s", thread.query_num,
"TIMED OUT" if thread.isAlive() else "")
+ def get_ac_processes(self):
+ """Returns a list of all Processes which may be used to perform admission
control. If
+ round-robin submission is not being used, only the first Process in this
list will
+ perform admission control."""
+ return self.cluster.impalads
+
def get_admission_metrics(self):
"""
Returns a map of the admission metrics, aggregated across all of the
impalads.
@@ -1404,7 +1449,7 @@ class
TestAdmissionControllerStress(TestAdmissionControllerBase):
"""
metrics = {'admitted': 0, 'queued': 0, 'dequeued': 0, 'rejected': 0,
'released': 0, 'timed-out': 0}
- for impalad in self.impalads:
+ for impalad in self.ac_processes:
keys = [metric_key(self.pool_name, 'total-%s' % short_name)
for short_name in metrics.keys()]
values = impalad.service.get_metric_values(keys, [0] * len(keys))
@@ -1683,13 +1728,28 @@ class
TestAdmissionControllerStress(TestAdmissionControllerBase):
num_queued += 1
return num_queued
+ def wait_on_queries_page_num_queued(self, min_queued, max_queued):
+ start_time = time()
+ LOG.info("Waiting for %s <= queued queries <= %s" % (min_queued,
max_queued))
+ actual_queued = self._get_queries_page_num_queued()
+ while actual_queued < min_queued or actual_queued > max_queued:
+ assert (time() - start_time < STRESS_TIMEOUT), ("Timed out waiting %s
seconds for "
+ "%s <= queued queries <= %s, %s currently queued.",
+ STRESS_TIMEOUT, min_queued, max_queued, actual_queued)
+ sleep(0.1)
+ actual_queued = self._get_queries_page_num_queued()
+ LOG.info("Found %s queued queries after %s seconds", actual_queued,
+ round(time() - start_time, 1))
+
def run_admission_test(self, vector, additional_query_options):
LOG.info("Starting test case with parameters: %s", vector)
self.impalads = self.cluster.impalads
+ self.ac_processes = self.get_ac_processes()
round_robin_submission = vector.get_value('round_robin_submission')
submission_delay_ms = vector.get_value('submission_delay_ms')
if not round_robin_submission:
self.impalads = [self.impalads[0]]
+ self.ac_processes = [self.ac_processes[0]]
num_queries = vector.get_value('num_queries')
assert num_queries >= MAX_NUM_CONCURRENT_QUERIES + MAX_NUM_QUEUED_QUERIES
@@ -1738,10 +1798,9 @@ class
TestAdmissionControllerStress(TestAdmissionControllerBase):
initial_metric_deltas = metric_deltas
# Like above, check that the count from the queries webpage json is
reasonable.
- queries_page_num_queued = self._get_queries_page_num_queued()
- assert queries_page_num_queued >=\
- min(num_queries - metric_deltas['admitted'], MAX_NUM_QUEUED_QUERIES)
- assert queries_page_num_queued <= MAX_NUM_QUEUED_QUERIES *
len(self.impalads)
+ min_queued = min(num_queries - metric_deltas['admitted'],
MAX_NUM_QUEUED_QUERIES)
+ max_queued = MAX_NUM_QUEUED_QUERIES * len(self.impalads)
+ self.wait_on_queries_page_num_queued(min_queued, max_queued)
self._check_queries_page_resource_pools()
# Admit queries in waves until all queries are done. A new wave of
admission
@@ -1792,8 +1851,7 @@ class
TestAdmissionControllerStress(TestAdmissionControllerBase):
assert metric_deltas['rejected'] == num_queries - expected_admitted
# All queries should be completed by now.
- queries_page_num_queued = self._get_queries_page_num_queued()
- assert queries_page_num_queued == 0
+ self.wait_on_queries_page_num_queued(0, 0)
self._check_queries_page_resource_pools()
for thread in self.all_threads:
@@ -1865,3 +1923,33 @@ class
TestAdmissionControllerStress(TestAdmissionControllerBase):
query_mem_limit = (proc_limit / MAX_NUM_CONCURRENT_QUERIES / num_impalads)
- 1
self.run_admission_test(vector,
{'request_pool': self.pool_name, 'mem_limit': query_mem_limit})
+
+
+class
TestAdmissionControllerStressWithACService(TestAdmissionControllerStress):
+ """Runs all of the tests from TestAdmissionControllerStress but with the
second impalad
+ in the minicluster configured to perform all admission control."""
+
+ def get_ac_processes(self):
+ return [self.cluster.impalads[1]]
+
+ def get_ac_log_name(self):
+ return "impalad_node1"
+
+ def setup_method(self, method):
+ if self.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+
+ PER_IMPALAD_ACS_ARGS = [
+ '--is_admission_controller=false',
+ '--is_admission_controller=true',
+ '--is_admission_controller=false',
+ ]
+ if 'start_args' not in method.func_dict:
+ method.func_dict['start_args'] = list()
+ method.func_dict['start_args'].append(
+ "--per_impalad_args=" + ";".join(PER_IMPALAD_ACS_ARGS))
+ if 'impalad_args' not in method.func_dict:
+ method.func_dict["impalad_args"] = ""
+ method.func_dict["impalad_args"] +=\
+ " --admission_control_service_addr=127.0.0.1:27001 "
+ super(TestAdmissionControllerStress, self).setup_method(method)
diff --git a/tests/hs2/hs2_test_suite.py b/tests/hs2/hs2_test_suite.py
index 01bd574..c92b555 100644
--- a/tests/hs2/hs2_test_suite.py
+++ b/tests/hs2/hs2_test_suite.py
@@ -335,6 +335,18 @@ class HS2TestSuite(ImpalaTestSuite):
assert False, 'Did not complete admission control processing in time,
current ' \
'operation state of query: %s' %
(get_operation_status_resp.operationState)
+ def wait_for_log_message(self, operationHandle, expected_message,
timeout=10):
+ start_time = time()
+ while (time() - start_time < timeout):
+ get_log_req = TCLIService.TGetLogReq()
+ get_log_req.operationHandle = operationHandle
+ log = self.hs2_client.GetLog(get_log_req).log
+ if expected_message in log:
+ return log
+ sleep(0.05)
+ assert False, "Did not find expected log message '%s' in time, latest log:
'%s'" \
+ % (expected_message, log)
+
def execute_statement(self, statement, conf_overlay=None,
expected_status_code=TCLIService.TStatusCode.SUCCESS_STATUS,
expected_error_prefix=None):
diff --git a/tests/util/web_pages_util.py b/tests/util/web_pages_util.py
index 5bf97f7..317289e 100644
--- a/tests/util/web_pages_util.py
+++ b/tests/util/web_pages_util.py
@@ -30,7 +30,7 @@ def get_num_completed_backends(service, query_id):
return num_complete_backends
-def get_mem_admitted_backends_debug_page(cluster):
+def get_mem_admitted_backends_debug_page(cluster, ac_process=None):
"""Helper method assumes a cluster using a dedicated coordinator. Returns
the mem
admitted to the backends extracted from the backends debug page of the
coordinator
impala daemon. Returns a dictionary with the keys 'coordinator' and
'executor' and
@@ -38,7 +38,9 @@ def get_mem_admitted_backends_debug_page(cluster):
admitted for each executor."""
# Based on how the cluster is setup, the first impalad in the cluster is the
# coordinator.
- response_json =
cluster.impalads[0].service.get_debug_webpage_json('backends')
+ if ac_process is None:
+ ac_process = cluster.impalads[0]
+ response_json = ac_process.service.get_debug_webpage_json('backends')
assert 'backends' in response_json
assert len(response_json['backends']) >= 2
ret = dict()