This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 85425b81f04c856d7d5ec375242303f78ec7964e
Author: Sahil Takiar <[email protected]>
AuthorDate: Wed Nov 13 21:13:53 2019 -0800

    IMPALA-9124: ImpalaServer and ClientRequestState refactoring
    
    Re-factoring several areas of the ImpalaServer and ClientRequestState
    in preparation for future work required for transparent query retries.
    
    Re-factored TExecRequest ownership in ClientRequestState. Previously,
    the TExecRequest was passed to the ClientRequestState via the
    Exec(TExecRequest) method. Now it is created and owned by the
    ClientRequestState and only exposed as a constant reference. It is
    stored as a plain member variable.
    
    Re-factored ImpalaServer::UnregisterQuery into multiple methods. Made
    ClientRequestStateMap a dedicated class that extends ShardedQueryMap
    with additional methods to add and delete a ClientRequestState. The
    re-factoring breaks up the large UnregisterQuery method into multiple
    smaller methods and adds some additional code documentation.
    
    Moved the ClientRequestStateMap type, previously a typedef inside
    ImpalaServer, into its own header (service/client-request-state-map.h).
    
    Testing:
    * Ran core tests
    
    Change-Id: Ib92c932a69802225af3fd9bf15f85c85317acaca
    Reviewed-on: http://gerrit.cloudera.org:8080/14755
    Reviewed-by: Sahil Takiar <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/client-request-state-map.h | 74 +++++++++++++++++++++++++++++++
 be/src/service/client-request-state.cc    | 22 +++++----
 be/src/service/client-request-state.h     | 31 ++++++++++---
 be/src/service/impala-server.cc           | 67 +++++++++++-----------------
 be/src/service/impala-server.h            | 20 ++++++---
 5 files changed, 152 insertions(+), 62 deletions(-)

diff --git a/be/src/service/client-request-state-map.h b/be/src/service/client-request-state-map.h
new file mode 100644
index 0000000..20c5ccc
--- /dev/null
+++ b/be/src/service/client-request-state-map.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "gutil/strings/substitute.h"
+#include "util/sharded-query-map-util.h"
+
+namespace impala {
+
+class ClientRequestState;
+
+/// A ShardedQueryMap for ClientRequestStates. Maps a query_id to its corresponding
+/// ClientRequestState. Provides helper methods to easily add and delete
+/// ClientRequestStates from a ShardedQueryMap. The ClientRequestStates are non-const, so
+/// users of this class need to synchronize access to the ClientRequestStates either by
+/// creating a ScopedShardedMapRef or by locking on the ClientRequestState::lock().
+class ClientRequestStateMap
+    : public ShardedQueryMap<std::shared_ptr<ClientRequestState>> {
+ public:
+  /// Adds the given (query_id, request_state) pair to the map. Returns an error Status
+  /// if the query id already exists in the map, in which case the map is not modified.
+  /// 'request_state' is taken by value and moved into the map; callers that no longer
+  /// need their own reference may pass it with std::move().
+  Status AddClientRequestState(
+      const TUniqueId& query_id, std::shared_ptr<ClientRequestState> request_state) {
+    // The same map_ref covers both the duplicate check and the insert below (the class
+    // comment names ScopedShardedMapRef as the synchronization mechanism).
+    ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id, this);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto entry = map_ref->find(query_id);
+    if (entry != map_ref->end()) {
+      // There shouldn't be an active query with that same id.
+      // (query_id is globally unique)
+      return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR,
+          strings::Substitute("query id $0 already exists", PrintId(query_id))));
+    }
+    // Move instead of copy: a shared_ptr copy costs an atomic increment here plus an
+    // atomic decrement when the by-value parameter is destroyed.
+    map_ref->insert(std::make_pair(query_id, std::move(request_state)));
+    return Status::OK();
+  }
+
+  /// Deletes the specified (query_id, request_state) pair from the map and sets the given
+  /// request_state pointer to the ClientRequestState associated with the given query_id.
+  /// If request_state == nullptr, it is not set. Returns an error Status if the query_id
+  /// cannot be found in the map; '*request_state' is then left untouched.
+  Status DeleteClientRequestState(const TUniqueId& query_id,
+      std::shared_ptr<ClientRequestState>* request_state = nullptr) {
+    ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id, this);
+    DCHECK(map_ref.get() != nullptr);
+    auto entry = map_ref->find(query_id);
+    if (entry == map_ref->end()) {
+      std::string error_msg =
+          strings::Substitute("Invalid or unknown query handle $0", PrintId(query_id));
+      VLOG(1) << error_msg;
+      return Status::Expected(error_msg);
+    }
+    // The entry is erased right below, so its shared_ptr can be moved out rather than
+    // copied, saving a redundant atomic refcount increment/decrement pair.
+    if (request_state != nullptr) *request_state = std::move(entry->second);
+    map_ref->erase(entry);
+    return Status::OK();
+  }
+};
+} // namespace impala
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 13bb433..8619060 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -177,9 +177,8 @@ void 
ClientRequestState::SetFrontendProfile(TRuntimeProfileNode profile) {
   frontend_profile_->Update(prof_tree);
 }
 
-Status ClientRequestState::Exec(TExecRequest* exec_request) {
+Status ClientRequestState::Exec() {
   MarkActive();
-  exec_request_ = *exec_request;
 
   profile_->AddChild(server_profile_);
   summary_profile_->AddInfoString("Query Type", PrintThriftEnum(stmt_type()));
@@ -188,7 +187,7 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) 
{
   summary_profile_->AddInfoString("Query Options (set by configuration and 
planner)",
       DebugQueryOptions(exec_request_.query_options));
 
-  switch (exec_request->stmt_type) {
+  switch (exec_request_.stmt_type) {
     case TStmtType::QUERY:
     case TStmtType::DML:
       DCHECK(exec_request_.__isset.query_exec_request);
@@ -1115,19 +1114,19 @@ Status ClientRequestState::Cancel(bool check_inflight, 
const Status* cause) {
 }
 
 Status ClientRequestState::UpdateCatalog() {
-  if (!exec_request().__isset.query_exec_request ||
-      exec_request().query_exec_request.stmt_type != TStmtType::DML) {
+  if (!exec_request_.__isset.query_exec_request ||
+      exec_request_.query_exec_request.stmt_type != TStmtType::DML) {
     return Status::OK();
   }
 
   query_events_->MarkEvent("DML data written");
   SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer"));
 
-  TQueryExecRequest query_exec_request = exec_request().query_exec_request;
+  TQueryExecRequest query_exec_request = exec_request_.query_exec_request;
   if (query_exec_request.__isset.finalize_params) {
     const TFinalizeParams& finalize_params = 
query_exec_request.finalize_params;
     TUpdateCatalogRequest catalog_update;
-    catalog_update.__set_sync_ddl(exec_request().query_options.sync_ddl);
+    catalog_update.__set_sync_ddl(exec_request_.query_options.sync_ddl);
     catalog_update.__set_header(TCatalogServiceRequestHeader());
     catalog_update.header.__set_requesting_user(effective_user());
     catalog_update.header.__set_client_ip(session()->network_address.hostname);
@@ -1385,6 +1384,11 @@ bool ClientRequestState::GetDmlStats(TDmlResult* 
dml_result, Status* query_statu
   return true;
 }
 
+Status ClientRequestState::InitExecRequest(const TQueryCtx& query_ctx) {
+  // The frontend plans the query and writes the plan straight into exec_request_; the
+  // returned planning Status is then recorded through UpdateQueryStatus().
+  auto* fe = exec_env_->frontend();
+  return UpdateQueryStatus(fe->GetExecRequest(query_ctx, &exec_request_));
+}
+
 void ClientRequestState::UpdateEndTime() {
   // Update the query's end time only if it isn't set previously.
   if (end_time_us_.CompareAndSwap(0, UnixMicros())) {
@@ -1459,7 +1463,7 @@ void ClientRequestState::LogQueryEvents() {
 }
 
 Status ClientRequestState::LogAuditRecord(const Status& query_status) {
-  const TExecRequest& request = exec_request();
+  const TExecRequest& request = exec_request_;
   stringstream ss;
   rapidjson::StringBuffer buffer;
   rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
@@ -1535,7 +1539,7 @@ Status ClientRequestState::LogAuditRecord(const Status& 
query_status) {
 }
 
 Status ClientRequestState::LogLineageRecord() {
-  const TExecRequest& request = exec_request();
+  const TExecRequest& request = exec_request_;
   if (request.stmt_type == TStmtType::EXPLAIN || 
(!request.__isset.query_exec_request &&
       !request.__isset.catalog_op_request)) {
     return Status::OK();
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 97ea18c..57f7a03 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -50,7 +50,7 @@ class TRuntimeProfileTree;
 class TupleRow;
 enum class AdmissionOutcome;
 
-/// Execution state of the client-facing side a query. This captures everything
+/// Execution state of the client-facing side of a query. This captures 
everything
 /// necessary to convert row batches received by the coordinator into results
 /// we can return to the client. It also captures all state required for
 /// servicing query-related requests from the client.
@@ -73,12 +73,13 @@ class ClientRequestState {
   /// which then sets the frontend profile.
   void SetFrontendProfile(TRuntimeProfileNode profile);
 
-  /// Based on query type, this either initiates execution of a exec_request 
or submits
-  /// the query to the Admission controller for asynchronous admission 
control. When this
-  /// returns the operation state is either RUNNING_STATE or PENDING_STATE.
+  /// Based on query type, this either initiates execution of this 
ClientRequestState's
+  /// TExecRequest or submits the query to the Admission controller for 
asynchronous
+  /// admission control. When this returns the operation state is either 
RUNNING_STATE or
+  /// PENDING_STATE.
   /// Non-blocking.
   /// Must *not* be called with lock_ held.
-  Status Exec(TExecRequest* exec_request) WARN_UNUSED_RESULT;
+  Status Exec() WARN_UNUSED_RESULT;
 
   /// Execute a HiveServer2 metadata operation
   /// TODO: This is likely a superset of GetTableNames/GetDbs. Coalesce these 
different
@@ -184,6 +185,13 @@ class ClientRequestState {
   /// Caller must not hold 'lock()'.
   bool GetDmlStats(TDmlResult* dml_result, Status* query_status);
 
+  /// Creates and sets the TExecRequest for the query associated with this
+  /// ClientRequestState. The TExecRequest is created by the Impala frontend 
via the
+  /// method Frontend::GetExecRequest(TQueryCtx, TExecRequest). The TQueryCtx 
is created
+  /// by the ImpalaServer and contains the full query string
+  /// (TQueryCtx::TClientRequest::stmt).
+  Status InitExecRequest(const TQueryCtx& query_ctx);
+
   ImpalaServer::SessionState* session() const { return session_.get(); }
 
   /// Queries are run and authorized on behalf of the effective_user.
@@ -219,7 +227,12 @@ class ClientRequestState {
   bool returns_result_set() { return !result_metadata_.columns.empty(); }
   const TResultSetMetadata* result_metadata() const { return 
&result_metadata_; }
   const TUniqueId& query_id() const { return query_ctx_.query_id; }
-  const TExecRequest& exec_request() const { return exec_request_; }
+  /// Returns the TExecRequest for the query associated with this 
ClientRequestState.
+  /// Contents are only valid after InitExecRequest(TQueryCtx) initializes the
+  /// TExecRequest.
+  const TExecRequest& exec_request() const {
+    return exec_request_;
+  }
   TStmtType::type stmt_type() const { return exec_request_.stmt_type; }
   TCatalogOpType::type catalog_op_type() const {
     return exec_request_.catalog_op_request.op_type;
@@ -452,7 +465,13 @@ protected:
   apache::hive::service::cli::thrift::TOperationState::type operation_state_ =
       apache::hive::service::cli::thrift::TOperationState::INITIALIZED_STATE;
 
+  /// The current status of the query tracked by this ClientRequestState. 
Updated by
+  /// UpdateQueryStatus(Status).
   Status query_status_;
+
+  /// The TExecRequest for the query tracked by this ClientRequestState. The 
TExecRequest
+  /// is initialized in InitExecRequest(TQueryCtx). It should not be used until
+  /// InitExecRequest(TQueryCtx) has been called.
   TExecRequest exec_request_;
 
   /// If true, effective_user() has access to the runtime profile and execution
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index c79d708..76219be 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -907,12 +907,11 @@ Status ImpalaServer::ExecuteInternal(
   DCHECK(session_state != nullptr);
   *registered_request_state = false;
 
-  request_state->reset(new ClientRequestState(query_ctx, exec_env_, 
exec_env_->frontend(),
-      this, session_state));
+  request_state->reset(new ClientRequestState(
+      query_ctx, exec_env_, exec_env_->frontend(), this, session_state));
 
   (*request_state)->query_events()->MarkEvent("Query submitted");
 
-  TExecRequest result;
   {
     // Keep a lock on request_state so that registration and setting
     // result_metadata are atomic.
@@ -940,9 +939,11 @@ Status ImpalaServer::ExecuteInternal(
           statement_length, max_statement_length));
     }
 
-    RETURN_IF_ERROR((*request_state)->UpdateQueryStatus(
-        exec_env_->frontend()->GetExecRequest(query_ctx, &result)));
+    // Takes the TQueryCtx and calls into the frontend to initialize the 
TExecRequest for
+    // this query.
+    RETURN_IF_ERROR((*request_state)->InitExecRequest(query_ctx));
 
+    const TExecRequest& result = (*request_state)->exec_request();
     (*request_state)->query_events()->MarkEvent("Planning finished");
     (*request_state)->set_user_profile_access(result.user_has_profile_access);
     (*request_state)->summary_profile()->AddEventSequence(
@@ -952,10 +953,11 @@ Status ImpalaServer::ExecuteInternal(
       (*request_state)->set_result_metadata(result.result_set_metadata);
     }
   }
-  VLOG(2) << "Execution request: " << ThriftDebugString(result);
+  VLOG(2) << "Execution request: "
+          << ThriftDebugString((*request_state)->exec_request());
 
   // start execution of query; also starts fragment status reports
-  RETURN_IF_ERROR((*request_state)->Exec(&result));
+  RETURN_IF_ERROR((*request_state)->Exec());
   Status status = UpdateCatalogMetrics();
   if (!status.ok()) {
     VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetDetail();
@@ -1030,22 +1032,10 @@ Status 
ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
     return Status::Expected("Session has been closed, ignoring query.");
   }
   const TUniqueId& query_id = request_state->query_id();
-  {
-    DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server());
-    ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id,
-        &client_request_state_map_);
-    DCHECK(map_ref.get() != nullptr);
-
-    auto entry = map_ref->find(query_id);
-    if (entry != map_ref->end()) {
-      // There shouldn't be an active query with that same id.
-      // (query_id is globally unique)
-      stringstream ss;
-      ss << "query id " << PrintId(query_id) << " already exists";
-      return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, ss.str()));
-    }
-    map_ref->insert(make_pair(query_id, request_state));
-  }
+  DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server());
+  RETURN_IF_ERROR(
+      client_request_state_map_.AddClientRequestState(query_id, 
request_state));
+
   // Metric is decremented in UnregisterQuery().
   ImpaladMetrics::NUM_QUERIES_REGISTERED->Increment(1L);
   VLOG_QUERY << "Registered query query_id=" << PrintId(query_id)
@@ -1128,26 +1118,23 @@ void ImpalaServer::UpdateExecSummary(
 Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool 
check_inflight,
     const Status* cause) {
   VLOG_QUERY << "UnregisterQuery(): query_id=" << PrintId(query_id);
-
+  // Cancel the query.
   RETURN_IF_ERROR(CancelInternal(query_id, check_inflight, cause));
 
+  // Delete it from the client_request_state_map_ and from the http_handler_.
+  DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server());
   shared_ptr<ClientRequestState> request_state;
-  {
-    DCHECK_EQ(this, ExecEnv::GetInstance()->impala_server());
-    ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id,
-        &client_request_state_map_);
-    DCHECK(map_ref.get() != nullptr);
-
-    auto entry = map_ref->find(query_id);
-    if (entry == map_ref->end()) {
-      VLOG(1) << "Invalid or unknown query handle " << PrintId(query_id);
-      return Status::Expected("Invalid or unknown query handle");
-    } else {
-      request_state = entry->second;
-    }
-    map_ref->erase(entry);
-  }
+  RETURN_IF_ERROR(
+      client_request_state_map_.DeleteClientRequestState(query_id, 
&request_state));
+
+  // Close and delete the ClientRequestState.
+  RETURN_IF_ERROR(CloseClientRequestState(request_state));
+
+  return Status::OK();
+}
 
+Status ImpalaServer::CloseClientRequestState(
+    const std::shared_ptr<ClientRequestState>& request_state) {
   request_state->Done();
 
   int64_t duration_us = request_state->end_time_us() - 
request_state->start_time_us();
@@ -1163,7 +1150,7 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& 
query_id, bool check_infli
   }
   {
     lock_guard<mutex> l(request_state->session()->lock);
-    request_state->session()->inflight_queries.erase(query_id);
+    
request_state->session()->inflight_queries.erase(request_state->query_id());
   }
 
   if (request_state->GetCoordinator() != nullptr) {
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index a05bddc..7e78532 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -39,12 +39,12 @@
 #include "runtime/timestamp-value.h"
 #include "runtime/types.h"
 #include "scheduling/query-schedule.h"
+#include "service/client-request-state-map.h"
 #include "service/query-options.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/condition-variable.h"
 #include "util/container-util.h"
 #include "util/runtime-profile.h"
-#include "util/sharded-query-map-util.h"
 #include "util/simple-logger.h"
 #include "util/thread-pool.h"
 #include "util/time.h"
@@ -696,12 +696,19 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// Unregister the query by cancelling it, removing exec_state from
   /// client_request_state_map_, and removing the query id from session state's
-  /// in-flight query list.  If check_inflight is true, then return an error 
if the query
-  /// is not yet in-flight.  Otherwise, proceed even if the query isn't yet 
in-flight (for
+  /// in-flight query list. If check_inflight is true, then return an error if 
the query
+  /// is not yet in-flight. Otherwise, proceed even if the query isn't yet 
in-flight (for
   /// cleaning up after an error on the query issuing path).
   Status UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
       const Status* cause = NULL) WARN_UNUSED_RESULT;
 
+  /// Performs any final cleanup necessary before the given ClientRequestState 
goes out
+  /// of scope and is deleted. Marks the given ClientRequestState as done, 
removes the
+  /// query from the inflight queries list, updates query_locations_, and 
archives the
+  /// query. Used when unregistering the query.
+  Status CloseClientRequestState(
+      const std::shared_ptr<ClientRequestState>& request_state);
+
   /// Initiates query cancellation reporting the given cause as the query 
status.
   /// Assumes deliberate cancellation by the user if the cause is NULL.  
Returns an
   /// error if query_id is not found.  If check_inflight is true, then return 
an error
@@ -1080,10 +1087,9 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Thread that runs UnresponsiveBackendThread().
   std::unique_ptr<Thread> unresponsive_backend_thread_;
 
-  /// maps from query id to exec state; ClientRequestState is owned by us and 
referenced
-  /// as a shared_ptr to allow asynchronous deletion
-  typedef class ShardedQueryMap<std::shared_ptr<ClientRequestState>>
-      ClientRequestStateMap;
+  /// A ClientRequestStateMap maps query ids to ClientRequestStates. The
+  /// ClientRequestStates are owned by the ImpalaServer and 
ClientRequestStateMap
+  /// references them using shared_ptr to allow asynchronous deletion.
   ClientRequestStateMap client_request_state_map_;
 
   /// Default query options in the form of TQueryOptions and 
beeswax::ConfigVariable

Reply via email to