IMPALA-1972/IMPALA-3882: Fix client_request_state_map_lock_ contention Holding client_request_state_map_lock_ and CRS::lock_ together in certain paths could potentially block the impalad from registering new queries. The most common occurrence of this is while loading the webpage of a query while the query planning is still in progress. Since we hold the CRS::lock_ during planning, it blocks the web page from loading, which in turn blocks incoming queries by holding client_request_state_map_lock_.
This patch makes client_request_state_map_lock_ a terminal lock so that we don't have interleaving locking with CRS::lock_. Testing: Tested it locally by adding a long sleep in JniFrontend.createExecRequest() and still was able to refresh the web UI and run parallel queries. Also added a custom cluster test that does the same sequence of actions by injecting a metadata loading pause. Change-Id: Ie44daa93e3ae4d04d091261f3ec4891caffe8026 Reviewed-on: http://gerrit.cloudera.org:8080/6707 Reviewed-by: Bharath Vissapragada <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/aa076491 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/aa076491 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/aa076491 Branch: refs/heads/master Commit: aa076491b999020b173cd21a53b28f27ee0f3b5d Parents: ee0fc26 Author: Bharath Vissapragada <[email protected]> Authored: Wed Apr 19 16:28:13 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed May 24 03:36:02 2017 +0000 ---------------------------------------------------------------------- be/src/service/impala-beeswax-server.cc | 23 +++--- be/src/service/impala-hs2-server.cc | 15 ++-- be/src/service/impala-http-handler.cc | 19 +++-- be/src/service/impala-server.cc | 47 ++++++----- be/src/service/impala-server.h | 14 ++-- tests/custom_cluster/test_query_concurrency.py | 92 +++++++++++++++++++++ www/query_plan.tmpl | 8 +- 7 files changed, 164 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa076491/be/src/service/impala-beeswax-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc index 67ecc79..2f80fcd 100644 --- 
a/be/src/service/impala-beeswax-server.cc +++ b/be/src/service/impala-beeswax-server.cc @@ -95,7 +95,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query, RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR); shared_ptr<ClientRequestState> request_state; - DCHECK(session != NULL); // The session should exist. + DCHECK(session != nullptr); // The session should exist. { // The session is created when the client connects. Depending on the underlying // transport, the username may be known at that time. If the username hasn't been set @@ -188,15 +188,14 @@ void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata, TUniqueId query_id; QueryHandleToTUniqueId(handle, &query_id); VLOG_QUERY << "get_results_metadata(): query_id=" << PrintId(query_id); - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); if (UNLIKELY(request_state.get() == nullptr)) { RaiseBeeswaxException(Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); } { - // make sure we release the lock on request_state if we see any error - lock_guard<mutex> l(*request_state->lock(), adopt_lock_t()); + lock_guard<mutex> l(*request_state->lock()); // Convert TResultSetMetadata to Beeswax.ResultsMetadata const TResultSetMetadata* result_set_md = request_state->result_metadata(); @@ -277,8 +276,8 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) { TUniqueId query_id; QueryHandleToTUniqueId(handle, &query_id); - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); - if (request_state.get() == NULL) { + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); + if (request_state.get() == nullptr) { stringstream str; str << "unknown query id: " << query_id; LOG(ERROR) << str.str(); @@ -294,7 +293,7 @@ void 
ImpalaServer::get_log(string& log, const LogContextId& context) { error_log_ss << join(request_state->GetAnalysisWarnings(), "\n"); // Add warnings from execution - if (request_state->coord() != NULL) { + if (request_state->coord() != nullptr) { if (!request_state->query_status().ok()) error_log_ss << "\n\n"; error_log_ss << request_state->coord()->GetErrorLog(); } @@ -404,7 +403,7 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query, shared_ptr<SessionState> session; const TUniqueId& session_id = ThriftServer::GetThreadConnectionId(); RETURN_IF_ERROR(GetSessionState(session_id, &session)); - DCHECK(session != NULL); + DCHECK(session != nullptr); { // The session is created when the client connects. Depending on the underlying // transport, the username may be known at that time. If the username hasn't been @@ -455,7 +454,7 @@ inline void ImpalaServer::QueryHandleToTUniqueId(const QueryHandle& handle, Status ImpalaServer::FetchInternal(const TUniqueId& query_id, const bool start_over, const int32_t fetch_size, beeswax::Results* query_results) { - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); if (UNLIKELY(request_state == nullptr)) { return Status(Substitute("Invalid query handle: $0", PrintId(query_id))); } @@ -516,14 +515,14 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id, Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id, TInsertResult* insert_result) { - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); if (UNLIKELY(request_state == nullptr)) { return Status(Substitute("Invalid query handle: $0", PrintId(query_id))); } Status query_status; { - lock_guard<mutex> l(*request_state->lock(), adopt_lock_t()); + lock_guard<mutex> l(*request_state->lock()); query_status = 
request_state->query_status(); if (query_status.ok()) { // Coord may be NULL for a SELECT with LIMIT 0. @@ -532,7 +531,7 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id, // coordinator, depending on how we choose to drive the table sink. int64_t num_row_errors = 0; bool has_kudu_stats = false; - if (request_state->coord() != NULL) { + if (request_state->coord() != nullptr) { for (const PartitionStatusMap::value_type& v: request_state->coord()->per_partition_status()) { const pair<string, TInsertPartitionStatus> partition_status = v; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa076491/be/src/service/impala-hs2-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index b428512..ea8c624 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -179,7 +179,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size, bool fetch_first, TFetchResultsResp* fetch_results) { - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); if (UNLIKELY(request_state == nullptr)) { return Status(Substitute("Invalid query handle: $0", PrintId(query_id))); } @@ -634,7 +634,7 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val, request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR); VLOG_ROW << "GetOperationStatus(): query_id=" << PrintId(query_id); - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); if (UNLIKELY(request_state.get() == nullptr)) { // No handle was found HS2_RETURN_ERROR(return_val, @@ -670,7 +670,7 @@ 
void ImpalaServer::CancelOperation(TCancelOperationResp& return_val, request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR); VLOG_QUERY << "CancelOperation(): query_id=" << PrintId(query_id); - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); if (UNLIKELY(request_state.get() == nullptr)) { // No handle was found HS2_RETURN_ERROR(return_val, @@ -692,7 +692,7 @@ void ImpalaServer::CloseOperation(TCloseOperationResp& return_val, request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR); VLOG_QUERY << "CloseOperation(): query_id=" << PrintId(query_id); - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); if (UNLIKELY(request_state.get() == nullptr)) { // No handle was found HS2_RETURN_ERROR(return_val, @@ -731,7 +731,7 @@ void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val, HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id), SQLSTATE_GENERAL_ERROR); - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); if (UNLIKELY(request_state.get() == nullptr)) { VLOG_QUERY << "GetResultSetMetadata(): invalid query handle"; // No handle was found @@ -739,8 +739,7 @@ void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val, Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR); } { - // make sure we release the lock on request_state if we see any error - lock_guard<mutex> l(*request_state->lock(), adopt_lock_t()); + lock_guard<mutex> l(*request_state->lock()); // Convert TResultSetMetadata to TGetResultSetMetadataResp const TResultSetMetadata* result_set_md = request_state->result_metadata(); 
@@ -807,7 +806,7 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) { HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId( request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR); - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); if (UNLIKELY(request_state.get() == nullptr)) { // No handle was found HS2_RETURN_ERROR(return_val, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa076491/be/src/service/impala-http-handler.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index 6ba8429..f441c4d 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -41,7 +41,6 @@ #include "common/names.h" -using boost::adopt_lock_t; using namespace apache::thrift; using namespace beeswax; using namespace impala; @@ -231,7 +230,6 @@ void ImpalaHttpHandler::QueryProfileEncodedHandler(const Webserver::ArgumentMap& ss.str(Substitute("Could not obtain runtime profile: $0", status.GetDetail())); } } - document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator()); Value profile(ss.str().c_str(), document->GetAllocator()); document->AddMember("contents", profile, document->GetAllocator()); @@ -696,6 +694,8 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include const Webserver::ArgumentMap& args, Document* document) { TUniqueId query_id; Status status = ParseIdFromArguments(args, &query_id, "query_id"); + Value query_id_val(PrintId(query_id).c_str(), document->GetAllocator()); + document->AddMember("query_id", query_id_val, document->GetAllocator()); if (!status.ok()) { // Redact the error message, it may contain part or all of the query. 
Value json_error(RedactCopy(status.GetDetail()).c_str(), document->GetAllocator()); @@ -713,10 +713,19 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include // Search the in-flight queries first, followed by the archived ones. { shared_ptr<ClientRequestState> request_state = - server_->GetClientRequestState(query_id, true); + server_->GetClientRequestState(query_id); if (request_state != NULL) { found = true; - lock_guard<mutex> l(*request_state->lock(), adopt_lock_t()); + // If the query plan isn't generated, avoid waiting for the request + // state lock to be acquired, since it could potentially be an expensive + // call, if the table Catalog metadata loading is in progress. Instead + // update the caller that the plan information is unavailable. + if (request_state->query_state() == beeswax::QueryState::CREATED) { + document->AddMember( + "plan_metadata_unavailable", "true", document->GetAllocator()); + return; + } + lock_guard<mutex> l(*request_state->lock()); if (request_state->coord() == NULL) { const string& err = Substitute("Invalid query id: $0", PrintId(query_id)); Value json_error(err.c_str(), document->GetAllocator()); @@ -780,8 +789,6 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include Value json_status(query_status.ok() ? 
"OK" : RedactCopy(query_status.GetDetail()).c_str(), document->GetAllocator()); document->AddMember("status", json_status, document->GetAllocator()); - Value json_id(PrintId(query_id).c_str(), document->GetAllocator()); - document->AddMember("query_id", json_id, document->GetAllocator()); } void ImpalaHttpHandler::BackendsHandler(const Webserver::ArgumentMap& args, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa076491/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index da09eb1..07f6c38 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -185,6 +185,12 @@ DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and co DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query " "fragments."); +#ifndef NDEBUG + DEFINE_int64(stress_metadata_loading_pause_injection_ms, 0, "Simulates metadata loading" + "for a given query by injecting a sleep equivalent to this configuration in " + "milliseconds. Only used for testing."); +#endif + // TODO: Remove for Impala 3.0. DEFINE_string(local_nodemanager_url, "", "Deprecated"); @@ -591,8 +597,12 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id, DCHECK(output != nullptr); // Search for the query id in the active query map { - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); if (request_state.get() != nullptr) { + // For queries in CREATED state, the profile information isn't populated yet. 
+ if (request_state->query_state() == beeswax::QueryState::CREATED) { + return Status("Query plan is not ready."); + } lock_guard<mutex> l(*request_state->lock()); if (base64_encoded) { request_state->profile().SerializeToArchiveString(output); @@ -624,12 +634,11 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id, Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, TExecSummary* result) { // Search for the query id in the active query map. { - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); if (request_state != nullptr) { - lock_guard<mutex> l(*request_state->lock(), adopt_lock_t()); + lock_guard<mutex> l(*request_state->lock()); if (request_state->coord() != nullptr) { request_state->coord()->GetTExecSummary(result); - TExecProgress progress; progress.__set_num_completed_scan_ranges( request_state->coord()->progress().num_complete()); @@ -803,15 +812,6 @@ Status ImpalaServer::ExecuteInternal( { // Keep a lock on request_state so that registration and setting // result_metadata are atomic. - // - // Note: this acquires the request_state lock *before* the - // client_request_state_map_ lock. This is the opposite of - // GetClientRequestState(..., true), and therefore looks like a - // candidate for deadlock. The reason this works here is that - // GetClientRequestState cannot find request_state (under the exec state - // map lock) and take it's lock until RegisterQuery has - // finished. By that point, the exec state map lock will have been - // given up, so the classic deadlock interleaving is not possible. 
lock_guard<mutex> l(*(*request_state)->lock()); // register exec state as early as possible so that queries that @@ -820,8 +820,18 @@ Status ImpalaServer::ExecuteInternal( RETURN_IF_ERROR(RegisterQuery(session_state, *request_state)); *registered_request_state = true; + +#ifndef NDEBUG + // Inject a sleep to simulate metadata loading pauses for tables. This + // is only used for testing. + if (FLAGS_stress_metadata_loading_pause_injection_ms > 0) { + SleepForMs(FLAGS_stress_metadata_loading_pause_injection_ms); + } +#endif + RETURN_IF_ERROR((*request_state)->UpdateQueryStatus( exec_env_->frontend()->GetExecRequest(query_ctx, &result))); + (*request_state)->query_events()->MarkEvent("Planning finished"); (*request_state)->summary_profile()->AddEventSequence( result.timeline.name, result.timeline); @@ -1010,7 +1020,7 @@ Status ImpalaServer::UpdateCatalogMetrics() { Status ImpalaServer::CancelInternal(const TUniqueId& query_id, bool check_inflight, const Status* cause) { VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id); - shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false); + shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id); if (request_state == nullptr) return Status("Invalid or unknown query handle"); RETURN_IF_ERROR(request_state->Cancel(check_inflight, cause)); return Status::OK(); @@ -1100,7 +1110,7 @@ void ImpalaServer::ReportExecStatus( // every report (assign each query a local int32_t id and use that to index into a // vector of ClientRequestStates, w/o lookup or locking?) shared_ptr<ClientRequestState> request_state = - GetClientRequestState(params.query_id, false); + GetClientRequestState(params.query_id); if (request_state.get() == nullptr) { // This is expected occasionally (since a report RPC might be in flight while // cancellation is happening). Return an error to the caller to get it to stop. 
@@ -1830,7 +1840,7 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) { // break here and sleep. if (expiration_event->first > now) break; shared_ptr<ClientRequestState> query_state = - GetClientRequestState(expiration_event->second, false); + GetClientRequestState(expiration_event->second); if (query_state.get() == nullptr) { // Query was deleted some other way. queries_by_timestamp_.erase(expiration_event++); @@ -1992,13 +2002,12 @@ bool ImpalaServer::GetSessionIdForQuery(const TUniqueId& query_id, } shared_ptr<ClientRequestState> ImpalaServer::GetClientRequestState( - const TUniqueId& query_id, bool lock) { + const TUniqueId& query_id) { lock_guard<mutex> l(client_request_state_map_lock_); ClientRequestStateMap::iterator i = client_request_state_map_.find(query_id); if (i == client_request_state_map_.end()) { return shared_ptr<ClientRequestState>(); } else { - if (lock) i->second->lock()->lock(); return i->second; } } @@ -2008,7 +2017,7 @@ void ImpalaServer::UpdateFilter(TUpdateFilterResult& result, DCHECK(params.__isset.query_id); DCHECK(params.__isset.filter_id); shared_ptr<ClientRequestState> client_request_state = - GetClientRequestState(params.query_id, false); + GetClientRequestState(params.query_id); if (client_request_state.get() == nullptr) { LOG(INFO) << "Could not find client request state: " << params.query_id; return; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa076491/be/src/service/impala-server.h ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 88ee934..7cdb319 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -83,11 +83,10 @@ class ClientRequestState; /// 1. session_state_map_lock_ /// 2. SessionState::lock /// 3. query_expiration_lock_ -/// 4. client_request_state_map_lock_ -/// 5. ClientRequestState::fetch_rows_lock -/// 6. ClientRequestState::lock -/// 7. 
ClientRequestState::expiration_data_lock_ -/// 8. Coordinator::exec_summary_lock +/// 4. ClientRequestState::fetch_rows_lock +/// 5. ClientRequestState::lock +/// 6. ClientRequestState::expiration_data_lock_ +/// 7. Coordinator::exec_summary_lock /// /// Coordinator::lock_ should not be acquired at the same time as the /// ImpalaServer/SessionState/ClientRequestState locks. Aside from @@ -101,6 +100,7 @@ class ClientRequestState; /// * uuid_lock_ /// * catalog_version_lock_ /// * connection_to_sessions_map_lock_ +/// * client_request_state_map_lock_ /// /// TODO: The state of a running query is currently not cleaned up if the /// query doesn't experience any errors at runtime and close() doesn't get called. @@ -397,10 +397,8 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf, static const char* SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED; /// Return exec state for given query_id, or NULL if not found. - /// If 'lock' is true, the returned exec state's lock() will be acquired before - /// the client_request_state_map_lock_ is released. std::shared_ptr<ClientRequestState> GetClientRequestState( - const TUniqueId& query_id, bool lock); + const TUniqueId& query_id); /// Writes the session id, if found, for the given query to the output /// parameter. Returns false if no query with the given ID is found. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa076491/tests/custom_cluster/test_query_concurrency.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_query_concurrency.py b/tests/custom_cluster/test_query_concurrency.py new file mode 100644 index 0000000..53bc72b --- /dev/null +++ b/tests/custom_cluster/test_query_concurrency.py @@ -0,0 +1,92 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest +import requests +import time +from time import localtime, strftime +from threading import Thread +from tests.beeswax.impala_beeswax import ImpalaBeeswaxException +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_cluster import ImpalaCluster +from tests.common.skip import SkipIfBuildType + +@SkipIfBuildType.not_dev_build +class TestQueryConcurrency(CustomClusterTestSuite): + """Tests if multiple queries are registered on the coordinator when + submitted in parallel along with clients trying to access the web UI. + The intention here is to check that the web server call paths don't hold + global locks that can conflict with other requests and prevent the impalad + from servicing them. 
It is done by simulating a metadata loading pause + using the configuration key --metadata_loading_pause_injection_ms that + makes the frontend hold the ClientRequestState::lock_ for longer duration.""" + + TEST_QUERY = "select count(*) from tpch.supplier" + POLLING_TIMEOUT_S = 15 + + @classmethod + def get_workload(self): + return 'functional-query' + + @classmethod + def setup_class(cls): + if cls.exploration_strategy() != 'exhaustive': + pytest.skip('Runs only in exhaustive mode.') + super(TestQueryConcurrency, cls).setup_class() + + def poll_query_page(self, impalad, query_id): + """Polls the debug plan page of a given query id in a loop till the timeout + of POLLING_TIMEOUT_S is hit.""" + start = time.time() + while time.time() - start < self.POLLING_TIMEOUT_S: + try: + impalad.service.read_debug_webpage("query_plan?query_id=" + query_id) + except e: + pass + time.sleep(1) + + def check_registered_queries(self, impalad, count): + """Asserts that the registered query count on a given impalad matches 'count' + before POLLING_TIMEOUT_S is hit.""" + start = time.time() + while time.time() - start < self.POLLING_TIMEOUT_S: + inflight_query_ids = impalad.service.get_in_flight_queries() + if inflight_query_ids is not None and len(inflight_query_ids) == count: + return inflight_query_ids + time.sleep(1) + assert False, "Registered query count doesn't match: " + str(count) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args("--stress_metadata_loading_pause_injection_ms=100000") + def test_query_concurrency(self, vector): + impalad = self.cluster.get_any_impalad() + client1 = impalad.service.create_beeswax_client() + client2 = impalad.service.create_beeswax_client() + q1 = Thread(target = client1.execute_async, args = (self.TEST_QUERY,)) + q2 = Thread(target = client2.execute_async, args = (self.TEST_QUERY,)) + q1.start() + inflight_query_ids = self.check_registered_queries(impalad, 1) + Thread(target = self.poll_query_page,\ + args = (impalad, 
inflight_query_ids[0]['query_id'],)).start() + time.sleep(2) + q2.start() + inflight_query_ids = self.check_registered_queries(impalad, 2) + result = impalad.service.read_debug_webpage("query_profile_encoded?query_id="\ + + inflight_query_ids[1]['query_id']) + assert result.startswith("Could not obtain runtime profile") + client1.close() + client2.close() http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/aa076491/www/query_plan.tmpl ---------------------------------------------------------------------- diff --git a/www/query_plan.tmpl b/www/query_plan.tmpl index 0487529..ffb5c75 100644 --- a/www/query_plan.tmpl +++ b/www/query_plan.tmpl @@ -38,8 +38,13 @@ under the License. {{> www/query_detail_tabs.tmpl }} -<h3>Plan</h3> +{{?plan_metadata_unavailable}} +<h3>Plan not yet available. Page will update when query planning completes.</h3> +{{/plan_metadata_unavailable}} + +{{^plan_metadata_unavailable}} +<h3>Plan</h3> <div> <label> <input type="checkbox" checked="true" id="colour_scheme"/> @@ -48,6 +53,7 @@ under the License. </div> <svg style="border: 1px solid darkgray" width=1200 height=600 class="panel"><g/></svg> +{{/plan_metadata_unavailable}} {{> www/common-footer.tmpl }}
