This is an automated email from the ASF dual-hosted git repository. stakiar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 85425b81f04c856d7d5ec375242303f78ec7964e Author: Sahil Takiar <[email protected]> AuthorDate: Wed Nov 13 21:13:53 2019 -0800 IMPALA-9124: ImpalaServer and ClientRequestState refactoring Re-factoring several areas of the ImpalaServer and ClientRequestState in preparation for future work required for transparent query retries. Re-factored TExecRequest ownership in ClientRequestState. Previously, the TExecRequest was passed to the ClientRequestState via the Exec(TExecRequest) method. Now it is created and owned by the ClientRequestState and only exposed as a constant reference. It is stored as a plain member variable. Re-factored ImpalaServer::UnregisterQuery into multiple methods. Made ClientRequestStateMap a dedicated class that extends ShardedQueryMap with additional methods to add and delete a ClientRequestState. The re-factoring breaks up the large UnregisterQuery method into multiple smaller methods and adds some additional code documentation. Re-factored ImpalaServer::client_request_state_map_ into a separate class. Testing: * Ran core tests Change-Id: Ib92c932a69802225af3fd9bf15f85c85317acaca Reviewed-on: http://gerrit.cloudera.org:8080/14755 Reviewed-by: Sahil Takiar <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/client-request-state-map.h | 74 +++++++++++++++++++++++++++++++ be/src/service/client-request-state.cc | 22 +++++---- be/src/service/client-request-state.h | 31 ++++++++++--- be/src/service/impala-server.cc | 67 +++++++++++----------------- be/src/service/impala-server.h | 20 ++++++--- 5 files changed, 152 insertions(+), 62 deletions(-) diff --git a/be/src/service/client-request-state-map.h b/be/src/service/client-request-state-map.h new file mode 100644 index 0000000..20c5ccc --- /dev/null +++ b/be/src/service/client-request-state-map.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor 
license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "gutil/strings/substitute.h" +#include "util/sharded-query-map-util.h" + +namespace impala { + +class ClientRequestState; + +/// A ShardedQueryMap for ClientRequestStates. Maps a query_id to its corresponding +/// ClientRequestState. Provides helper methods to easily add and delete +/// ClientRequestStates from a ShardedQueryMap. The ClientRequestStates are non-const, so +/// users of this class need to synchronize access to the ClientRequestStates either by +/// creating a ScopedShardedMapRef or by locking on the ClientRequestState::lock(). +class ClientRequestStateMap + : public ShardedQueryMap<std::shared_ptr<ClientRequestState>> { + public: + /// Adds the given (query_id, request_state) pair to the map. Returns an error Status + /// if the query id already exists in the map. + Status AddClientRequestState( + const TUniqueId& query_id, std::shared_ptr<ClientRequestState> request_state) { + ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id, this); + DCHECK(map_ref.get() != nullptr); + + auto entry = map_ref->find(query_id); + if (entry != map_ref->end()) { + // There shouldn't be an active query with that same id. 
+ // (query_id is globally unique) + return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, + strings::Substitute("query id $0 already exists", PrintId(query_id)))); + } + map_ref->insert(make_pair(query_id, request_state)); + return Status::OK(); + } + + /// Deletes the specified (query_id, request_state) pair from the map and sets the given + /// request_state pointer to the ClientRequestState associated with the given query_id. + /// If request_state == nullptr, it is not set. Returns an error Status if the query_id + /// cannot be found in the map. + Status DeleteClientRequestState(const TUniqueId& query_id, + std::shared_ptr<ClientRequestState>* request_state = nullptr) { + ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id, this); + DCHECK(map_ref.get() != nullptr); + auto entry = map_ref->find(query_id); + if (entry == map_ref->end()) { + string error_msg = + strings::Substitute("Invalid or unknown query handle $0", PrintId(query_id)); + VLOG(1) << error_msg; + return Status::Expected(error_msg); + } else if (request_state != nullptr) { + *request_state = entry->second; + } + map_ref->erase(entry); + return Status::OK(); + } +}; +} diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 13bb433..8619060 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -177,9 +177,8 @@ void ClientRequestState::SetFrontendProfile(TRuntimeProfileNode profile) { frontend_profile_->Update(prof_tree); } -Status ClientRequestState::Exec(TExecRequest* exec_request) { +Status ClientRequestState::Exec() { MarkActive(); - exec_request_ = *exec_request; profile_->AddChild(server_profile_); summary_profile_->AddInfoString("Query Type", PrintThriftEnum(stmt_type())); @@ -188,7 +187,7 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) { summary_profile_->AddInfoString("Query Options (set by configuration and planner)", DebugQueryOptions(exec_request_.query_options)); 
- switch (exec_request->stmt_type) { + switch (exec_request_.stmt_type) { case TStmtType::QUERY: case TStmtType::DML: DCHECK(exec_request_.__isset.query_exec_request); @@ -1115,19 +1114,19 @@ Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) { } Status ClientRequestState::UpdateCatalog() { - if (!exec_request().__isset.query_exec_request || - exec_request().query_exec_request.stmt_type != TStmtType::DML) { + if (!exec_request_.__isset.query_exec_request || + exec_request_.query_exec_request.stmt_type != TStmtType::DML) { return Status::OK(); } query_events_->MarkEvent("DML data written"); SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer")); - TQueryExecRequest query_exec_request = exec_request().query_exec_request; + TQueryExecRequest query_exec_request = exec_request_.query_exec_request; if (query_exec_request.__isset.finalize_params) { const TFinalizeParams& finalize_params = query_exec_request.finalize_params; TUpdateCatalogRequest catalog_update; - catalog_update.__set_sync_ddl(exec_request().query_options.sync_ddl); + catalog_update.__set_sync_ddl(exec_request_.query_options.sync_ddl); catalog_update.__set_header(TCatalogServiceRequestHeader()); catalog_update.header.__set_requesting_user(effective_user()); catalog_update.header.__set_client_ip(session()->network_address.hostname); @@ -1385,6 +1384,11 @@ bool ClientRequestState::GetDmlStats(TDmlResult* dml_result, Status* query_statu return true; } +Status ClientRequestState::InitExecRequest(const TQueryCtx& query_ctx) { + return UpdateQueryStatus( + exec_env_->frontend()->GetExecRequest(query_ctx, &exec_request_)); +} + void ClientRequestState::UpdateEndTime() { // Update the query's end time only if it isn't set previously. 
if (end_time_us_.CompareAndSwap(0, UnixMicros())) { @@ -1459,7 +1463,7 @@ void ClientRequestState::LogQueryEvents() { } Status ClientRequestState::LogAuditRecord(const Status& query_status) { - const TExecRequest& request = exec_request(); + const TExecRequest& request = exec_request_; stringstream ss; rapidjson::StringBuffer buffer; rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); @@ -1535,7 +1539,7 @@ Status ClientRequestState::LogAuditRecord(const Status& query_status) { } Status ClientRequestState::LogLineageRecord() { - const TExecRequest& request = exec_request(); + const TExecRequest& request = exec_request_; if (request.stmt_type == TStmtType::EXPLAIN || (!request.__isset.query_exec_request && !request.__isset.catalog_op_request)) { return Status::OK(); diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index 97ea18c..57f7a03 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -50,7 +50,7 @@ class TRuntimeProfileTree; class TupleRow; enum class AdmissionOutcome; -/// Execution state of the client-facing side a query. This captures everything +/// Execution state of the client-facing side of a query. This captures everything /// necessary to convert row batches received by the coordinator into results /// we can return to the client. It also captures all state required for /// servicing query-related requests from the client. @@ -73,12 +73,13 @@ class ClientRequestState { /// which then sets the frontend profile. void SetFrontendProfile(TRuntimeProfileNode profile); - /// Based on query type, this either initiates execution of a exec_request or submits - /// the query to the Admission controller for asynchronous admission control. When this - /// returns the operation state is either RUNNING_STATE or PENDING_STATE. 
+ /// Based on query type, this either initiates execution of this ClientRequestState's + /// TExecRequest or submits the query to the Admission controller for asynchronous + /// admission control. When this returns the operation state is either RUNNING_STATE or + /// PENDING_STATE. /// Non-blocking. /// Must *not* be called with lock_ held. - Status Exec(TExecRequest* exec_request) WARN_UNUSED_RESULT; + Status Exec() WARN_UNUSED_RESULT; /// Execute a HiveServer2 metadata operation /// TODO: This is likely a superset of GetTableNames/GetDbs. Coalesce these different @@ -184,6 +185,13 @@ class ClientRequestState { /// Caller must not hold 'lock()'. bool GetDmlStats(TDmlResult* dml_result, Status* query_status); + /// Creates and sets the TExecRequest for the query associated with this + /// ClientRequestState. The TExecRequest is created by the Impala frontend via the + /// method Frontend::GetExecRequest(TQueryCtx, TExecRequest). The TQueryCtx is created + /// by the ImpalaServer and contains the full query string + /// (TQueryCtx::TClientRequest::stmt). + Status InitExecRequest(const TQueryCtx& query_ctx); + ImpalaServer::SessionState* session() const { return session_.get(); } /// Queries are run and authorized on behalf of the effective_user. @@ -219,7 +227,12 @@ class ClientRequestState { bool returns_result_set() { return !result_metadata_.columns.empty(); } const TResultSetMetadata* result_metadata() const { return &result_metadata_; } const TUniqueId& query_id() const { return query_ctx_.query_id; } - const TExecRequest& exec_request() const { return exec_request_; } + /// Returns the TExecRequest for the query associated with this ClientRequestState. + /// Contents are only valid after InitExecRequest(TQueryCtx) initializes the + /// TExecRequest. 
+ const TExecRequest& exec_request() const { + return exec_request_; + } TStmtType::type stmt_type() const { return exec_request_.stmt_type; } TCatalogOpType::type catalog_op_type() const { return exec_request_.catalog_op_request.op_type; @@ -452,7 +465,13 @@ protected: apache::hive::service::cli::thrift::TOperationState::type operation_state_ = apache::hive::service::cli::thrift::TOperationState::INITIALIZED_STATE; + /// The current status of the query tracked by this ClientRequestState. Updated by + /// UpdateQueryStatus(Status). Status query_status_; + + /// The TExecRequest for the query tracked by this ClientRequestState. The TExecRequest + /// is initialized in InitExecRequest(TQueryCtx). It should not be used until + /// InitExecRequest(TQueryCtx) has been called. TExecRequest exec_request_; /// If true, effective_user() has access to the runtime profile and execution diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index c79d708..76219be 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -907,12 +907,11 @@ Status ImpalaServer::ExecuteInternal( DCHECK(session_state != nullptr); *registered_request_state = false; - request_state->reset(new ClientRequestState(query_ctx, exec_env_, exec_env_->frontend(), - this, session_state)); + request_state->reset(new ClientRequestState( + query_ctx, exec_env_, exec_env_->frontend(), this, session_state)); (*request_state)->query_events()->MarkEvent("Query submitted"); - TExecRequest result; { // Keep a lock on request_state so that registration and setting // result_metadata are atomic. @@ -940,9 +939,11 @@ Status ImpalaServer::ExecuteInternal( statement_length, max_statement_length)); } - RETURN_IF_ERROR((*request_state)->UpdateQueryStatus( - exec_env_->frontend()->GetExecRequest(query_ctx, &result))); + // Takes the TQueryCtx and calls into the frontend to initialize the TExecRequest for + // this query. 
+ RETURN_IF_ERROR((*request_state)->InitExecRequest(query_ctx)); + const TExecRequest& result = (*request_state)->exec_request(); (*request_state)->query_events()->MarkEvent("Planning finished"); (*request_state)->set_user_profile_access(result.user_has_profile_access); (*request_state)->summary_profile()->AddEventSequence( @@ -952,10 +953,11 @@ Status ImpalaServer::ExecuteInternal( (*request_state)->set_result_metadata(result.result_set_metadata); } } - VLOG(2) << "Execution request: " << ThriftDebugString(result); + VLOG(2) << "Execution request: " + << ThriftDebugString((*request_state)->exec_request()); // start execution of query; also starts fragment status reports - RETURN_IF_ERROR((*request_state)->Exec(&result)); + RETURN_IF_ERROR((*request_state)->Exec()); Status status = UpdateCatalogMetrics(); if (!status.ok()) { VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetDetail(); @@ -1030,22 +1032,10 @@ Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state, return Status::Expected("Session has been closed, ignoring query."); } const TUniqueId& query_id = request_state->query_id(); - { - DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server()); - ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id, - &client_request_state_map_); - DCHECK(map_ref.get() != nullptr); - - auto entry = map_ref->find(query_id); - if (entry != map_ref->end()) { - // There shouldn't be an active query with that same id. - // (query_id is globally unique) - stringstream ss; - ss << "query id " << PrintId(query_id) << " already exists"; - return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, ss.str())); - } - map_ref->insert(make_pair(query_id, request_state)); - } + DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server()); + RETURN_IF_ERROR( + client_request_state_map_.AddClientRequestState(query_id, request_state)); + // Metric is decremented in UnregisterQuery(). 
ImpaladMetrics::NUM_QUERIES_REGISTERED->Increment(1L); VLOG_QUERY << "Registered query query_id=" << PrintId(query_id) @@ -1128,26 +1118,23 @@ void ImpalaServer::UpdateExecSummary( Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_inflight, const Status* cause) { VLOG_QUERY << "UnregisterQuery(): query_id=" << PrintId(query_id); - + // Cancel the query. RETURN_IF_ERROR(CancelInternal(query_id, check_inflight, cause)); + // Delete it from the client_request_state_map_ and from the http_handler_. + DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server()); shared_ptr<ClientRequestState> request_state; - { - DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server()); - ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id, - &client_request_state_map_); - DCHECK(map_ref.get() != nullptr); - - auto entry = map_ref->find(query_id); - if (entry == map_ref->end()) { - VLOG(1) << "Invalid or unknown query handle " << PrintId(query_id); - return Status::Expected("Invalid or unknown query handle"); - } else { - request_state = entry->second; - } - map_ref->erase(entry); - } + RETURN_IF_ERROR( + client_request_state_map_.DeleteClientRequestState(query_id, &request_state)); + + // Close and delete the ClientRequestState. 
+ RETURN_IF_ERROR(CloseClientRequestState(request_state)); + + return Status::OK(); +} +Status ImpalaServer::CloseClientRequestState( + const std::shared_ptr<ClientRequestState>& request_state) { request_state->Done(); int64_t duration_us = request_state->end_time_us() - request_state->start_time_us(); @@ -1163,7 +1150,7 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli } { lock_guard<mutex> l(request_state->session()->lock); - request_state->session()->inflight_queries.erase(query_id); + request_state->session()->inflight_queries.erase(request_state->query_id()); } if (request_state->GetCoordinator() != nullptr) { diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index a05bddc..7e78532 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -39,12 +39,12 @@ #include "runtime/timestamp-value.h" #include "runtime/types.h" #include "scheduling/query-schedule.h" +#include "service/client-request-state-map.h" #include "service/query-options.h" #include "statestore/statestore-subscriber.h" #include "util/condition-variable.h" #include "util/container-util.h" #include "util/runtime-profile.h" -#include "util/sharded-query-map-util.h" #include "util/simple-logger.h" #include "util/thread-pool.h" #include "util/time.h" @@ -696,12 +696,19 @@ class ImpalaServer : public ImpalaServiceIf, /// Unregister the query by cancelling it, removing exec_state from /// client_request_state_map_, and removing the query id from session state's - /// in-flight query list. If check_inflight is true, then return an error if the query - /// is not yet in-flight. Otherwise, proceed even if the query isn't yet in-flight (for + /// in-flight query list. If check_inflight is true, then return an error if the query + /// is not yet in-flight. Otherwise, proceed even if the query isn't yet in-flight (for /// cleaning up after an error on the query issuing path). 
Status UnregisterQuery(const TUniqueId& query_id, bool check_inflight, const Status* cause = NULL) WARN_UNUSED_RESULT; + /// Performs any final cleanup necessary before the given ClientRequestState goes out + /// of scope and is deleted. Marks the given ClientRequestState as done, removes the + /// query from the inflight queries list, updates query_locations_, and archives the + /// query. Used when unregistering the query. + Status CloseClientRequestState( + const std::shared_ptr<ClientRequestState>& request_state); + /// Initiates query cancellation reporting the given cause as the query status. /// Assumes deliberate cancellation by the user if the cause is NULL. Returns an /// error if query_id is not found. If check_inflight is true, then return an error @@ -1080,10 +1087,9 @@ class ImpalaServer : public ImpalaServiceIf, /// Thread that runs UnresponsiveBackendThread(). std::unique_ptr<Thread> unresponsive_backend_thread_; - /// maps from query id to exec state; ClientRequestState is owned by us and referenced - /// as a shared_ptr to allow asynchronous deletion - typedef class ShardedQueryMap<std::shared_ptr<ClientRequestState>> - ClientRequestStateMap; + /// A ClientRequestStateMap maps query ids to ClientRequestStates. The + /// ClientRequestStates are owned by the ImpalaServer and ClientRequestStateMap + /// references them using shared_ptr to allow asynchronous deletion. ClientRequestStateMap client_request_state_map_; /// Default query options in the form of TQueryOptions and beeswax::ConfigVariable
