This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1f9db9e05b2766d35c186e8e8b4e25b89f7d5793
Author: Andrew Sherman <[email protected]>
AuthorDate: Fri Mar 8 13:47:03 2024 -0800

    IMPALA-12264: Add limit on number of HS2 sessions per user.
    
    Add a new flag --max_hs2_sessions_per_user which sets a limit on the
    number of HiveServer2 (HS2) sessions that can be concurrently opened
    by a user on a coordinator. By default this value is -1, which
    disables the new feature (a value of 0 disables it as well). An
    attempt to open more sessions than the flag value allows results in
    an error. This is implemented by maintaining a map from each user to
    a count of that user's open HS2 sessions.
    
    If the per-user HS2 session counts are being limited in this way,
    then the per-user counts are visible on the /sessions page of the
    webui.
    
    Add a new test case in test_session_expiration.py.
    
    Change-Id: Idd28edc352102d89774f6ece5376e7c79ae41aa8
    Reviewed-on: http://gerrit.cloudera.org:8080/21128
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/impala-hs2-server.cc             | 62 +++++++++++++++++++++++--
 be/src/service/impala-http-handler.cc           | 30 ++++++++++--
 be/src/service/impala-http-handler.h            |  3 ++
 be/src/service/impala-server.cc                 | 30 ++++--------
 be/src/service/impala-server.h                  | 20 ++++++++
 tests/custom_cluster/test_session_expiration.py | 37 ++++++++++++++-
 www/sessions.tmpl                               | 32 +++++++++++++
 7 files changed, 184 insertions(+), 30 deletions(-)

diff --git a/be/src/service/impala-hs2-server.cc 
b/be/src/service/impala-hs2-server.cc
index 00a93d113..9392e1a02 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -20,23 +20,18 @@
 #include "service/impala-server.inline.h"
 
 #include <algorithm>
-#include <type_traits>
 
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/join.hpp>
 #include <boost/bind.hpp>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
 #include <boost/unordered_set.hpp>
-#include <jni.h>
 #include <gperftools/heap-profiler.h>
 #include <gperftools/malloc_extension.h>
 #include <gtest/gtest.h>
 #include <gutil/strings/substitute.h>
-#include <thrift/protocol/TDebugProtocol.h>
 
 #include "common/logging.h"
 #include "common/version.h"
-#include "rpc/thrift-util.h"
 #include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/raw-value.h"
@@ -96,6 +91,7 @@ DECLARE_string(hostname);
 DECLARE_int32(webserver_port);
 DECLARE_int32(idle_session_timeout);
 DECLARE_int32(disconnected_session_timeout);
+DECLARE_int32(max_hs2_sessions_per_user);
 DECLARE_bool(ping_expose_webserver_url);
 DECLARE_string(anonymous_user_name);
 
@@ -422,6 +418,11 @@ void ImpalaServer::OpenSession(TOpenSessionResp& 
return_val,
     }
   }
 
+  Status status = IncrementAndCheckSessionCount(state->connected_user);
+  HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
+
+  // At this point all checks are complete, store session status information.
+
   RegisterSessionTimeout(state->session_timeout);
   TQueryOptionsToMap(state->QueryOptions(), &return_val.configuration);
 
@@ -454,6 +455,57 @@ void ImpalaServer::OpenSession(TOpenSessionResp& 
return_val,
              << "<" << TNetworkAddressToString(state->network_address) << ">.";
 }
 
+// Decrements the count stored under 'key' in 'loads'. When the count is 1 the entry
+// is erased instead of being left behind with a zero count. A missing key is logged
+// and trips a DCHECK; a count below 1 is logged and left untouched.
+// A single find() is used so that the lookup itself can never insert 'key'.
+void ImpalaServer::DecrementCount(
+    std::map<std::string, int64>& loads, const std::string& key) {
+  auto entry = loads.find(key);
+  if (entry == loads.end()) {
+    string msg = Substitute("Missing key $0 when decrementing count", key);
+    LOG(WARNING) << msg;
+    DCHECK(false) << msg;
+    return;
+  }
+  int64& current_value = entry->second;
+  if (current_value == 1) {
+    // Remove the entry from the map if the current_value will go to zero.
+    loads.erase(entry);
+    return;
+  }
+  if (current_value < 1) {
+    // Don't allow decrement below zero.
+    string msg = Substitute("Attempt to decrement below zero with key $0 ", key);
+    LOG(WARNING) << msg;
+    return;
+  }
+  --current_value;
+}
+
+// Drops one HS2 session from the per-user count of 'user_name'. Does nothing unless
+// --max_hs2_sessions_per_user is positive.
+void ImpalaServer::DecrementSessionCount(const string& user_name) {
+  if (FLAGS_max_hs2_sessions_per_user <= 0) return;
+  lock_guard<mutex> count_guard(per_user_session_count_lock_);
+  DecrementCount(per_user_session_count_map_, user_name);
+}
+
+// Records one more HS2 session for 'user_name' and returns OK, unless the user's
+// recorded count is already at or above --max_hs2_sessions_per_user, in which case
+// the count is left unchanged and an error status is returned. Returns OK without
+// touching the map when the flag is not positive.
+Status ImpalaServer::IncrementAndCheckSessionCount(const string& user_name) {
+  if (FLAGS_max_hs2_sessions_per_user <= 0) return Status::OK();
+
+  lock_guard<mutex> count_guard(per_user_session_count_lock_);
+  // Look the user up without inserting: a user with no entry has no recorded
+  // sessions, so there is nothing to compare against the limit.
+  auto existing = per_user_session_count_map_.find(user_name);
+  if (existing != per_user_session_count_map_.end()
+      && existing->second >= FLAGS_max_hs2_sessions_per_user) {
+    const string& err_msg =
+        Substitute("Number of sessions for user $0 exceeds coordinator limit $1",
+            user_name, FLAGS_max_hs2_sessions_per_user);
+    VLOG(1) << err_msg;
+    return Status::Expected(err_msg);
+  }
+  per_user_session_count_map_[user_name]++;
+  return Status::OK();
+}
+
 void ImpalaServer::CloseSession(TCloseSessionResp& return_val,
     const TCloseSessionReq& request) {
   VLOG_QUERY << "CloseSession(): request=" << RedactedDebugString(request);
diff --git a/be/src/service/impala-http-handler.cc 
b/be/src/service/impala-http-handler.cc
index 27aba2618..c2def1203 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -20,7 +20,6 @@
 #include <algorithm>
 #include <mutex>
 #include <sstream>
-#include <boost/lexical_cast.hpp>
 #include <boost/unordered_set.hpp>
 #include <rapidjson/prettywriter.h>
 #include <rapidjson/stringbuffer.h>
@@ -45,7 +44,6 @@
 #include "service/impala-server.h"
 #include "service/query-state-record.h"
 #include "thrift/protocol/TDebugProtocol.h"
-#include "util/coding-util.h"
 #include "util/debug-util.h"
 #include "util/logging-support.h"
 #include "util/pretty-printer.h"
@@ -711,6 +709,9 @@ void ImpalaHttpHandler::SessionsHandler(const 
Webserver::WebRequest& req,
 
   VLOG(1) << "Step3: Fill the connections information into the document.";
   FillConnectionsInfo(document, connection_contexts);
+
+  VLOG(1) << "Step4: Fill the hs2 users information into the document.";
+  FillUsersInfo(document);
 }
 
 void ImpalaHttpHandler::FillSessionsInfo(Document* document) {
@@ -776,6 +777,29 @@ void ImpalaHttpHandler::FillSessionsInfo(Document* 
document) {
       document->GetAllocator());
 }
 
+// Ordering predicate for the users array: true when 'a' has a smaller
+// "session_count" than 'b'.
+bool SessionCountComparer(const Value& a, const Value& b) {
+  const auto lhs_count = a["session_count"].GetInt64();
+  const auto rhs_count = b["session_count"].GetInt64();
+  return lhs_count < rhs_count;
+}
+
+// Adds a "users" array to 'document' with one {"user", "session_count"} object per
+// entry of the server's per-user session count map, sorted by SessionCountComparer.
+void ImpalaHttpHandler::FillUsersInfo(Document* document) {
+  auto& allocator = document->GetAllocator();
+  Value users(kArrayType);
+  {
+    // The lock is held only while the counts are copied into JSON values.
+    lock_guard<mutex> l(server_->per_user_session_count_lock_);
+    for (const auto& entry : server_->per_user_session_count_map_) {
+      Value user_json(kObjectType);
+      Value user_name(entry.first.c_str(), allocator);
+      user_json.AddMember("user", user_name, allocator);
+      user_json.AddMember("session_count", entry.second, allocator);
+      users.PushBack(user_json, allocator);
+    }
+  }
+  sort(users.Begin(), users.End(), SessionCountComparer);
+  document->AddMember("users", users, allocator);
+}
+
 void ImpalaHttpHandler::FillClientHostsInfo(
     Document* document, const ThriftServer::ConnectionContextList& 
connection_contexts) {
   lock_guard<mutex> session_state_map_l(server_->session_state_map_lock_);
@@ -937,7 +961,7 @@ void ImpalaHttpHandler::CatalogHandler(const 
Webserver::WebRequest& req,
     database.AddMember("name", str, document->GetAllocator());
 
     TGetTablesResult get_table_results;
-    Status status = server_->exec_env_->frontend()->GetTableNames(
+    status = server_->exec_env_->frontend()->GetTableNames(
         db.db_name, NULL, NULL, &get_table_results);
     if (!status.ok()) {
       Value error(status.GetDetail().c_str(), document->GetAllocator());
diff --git a/be/src/service/impala-http-handler.h 
b/be/src/service/impala-http-handler.h
index 968495ed0..e8ed8dbb5 100644
--- a/be/src/service/impala-http-handler.h
+++ b/be/src/service/impala-http-handler.h
@@ -285,6 +285,9 @@ class ImpalaHttpHandler {
   /// Fill the sessions information into the document.
   void FillSessionsInfo(rapidjson::Document* document);
 
+  /// Fill the hs2 users information into the document.
+  void FillUsersInfo(rapidjson::Document* document);
+
   /// Fill the client hosts information into the document.
   void FillClientHostsInfo(rapidjson::Document* document,
       const ThriftServer::ConnectionContextList& connection_contexts);
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index efe0d6705..963960661 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -17,11 +17,9 @@
 
 #include "service/impala-server.h"
 
-#include <netdb.h>
 #include <unistd.h>
 #include <algorithm>
 #include <exception>
-#include <fstream>
 #include <sstream>
 #ifdef CALLONCEHACK
 #include <calloncehack.h>
@@ -32,12 +30,7 @@
 #include <boost/algorithm/string/replace.hpp>
 #include <boost/algorithm/string/trim.hpp>
 #include <boost/bind.hpp>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
-#include <boost/filesystem.hpp>
-#include <boost/lexical_cast.hpp>
 #include <boost/unordered_set.hpp>
-#include <gperftools/malloc_extension.h>
-#include <gutil/strings/numbers.h>
 #include <gutil/strings/split.h>
 #include <gutil/strings/substitute.h>
 #include <gutil/walltime.h>
@@ -45,9 +38,6 @@
 #include <openssl/evp.h>
 #include <rapidjson/rapidjson.h>
 #include <rapidjson/stringbuffer.h>
-#include <rapidjson/writer.h>
-#include <rapidjson/error/en.h>
-#include <sys/socket.h>
 #include <sys/types.h>
 
 #include "catalog/catalog-server.h"
@@ -56,7 +46,6 @@
 #include "common/logging.h"
 #include "common/object-pool.h"
 #include "common/thread-debug-info.h"
-#include "common/version.h"
 #include "exec/external-data-source-executor.h"
 #include "exprs/timezone_db.h"
 #include "gen-cpp/CatalogService_constants.h"
@@ -70,13 +59,10 @@
 #include "rpc/rpc-trace.h"
 #include "rpc/thrift-thread.h"
 #include "rpc/thrift-util.h"
-#include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/lib-cache.h"
 #include "runtime/query-driver.h"
-#include "runtime/timestamp-value.h"
-#include "runtime/timestamp-value.inline.h"
 #include "runtime/tmp-file-mgr.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "scheduling/admission-control-service.h"
@@ -87,7 +73,6 @@
 #include "service/impala-http-handler.h"
 #include "service/query-state-record.h"
 #include "util/auth-util.h"
-#include "util/bit-util.h"
 #include "util/coding-util.h"
 #include "util/common-metrics.h"
 #include "util/debug-util.h"
@@ -98,7 +83,6 @@
 #include "util/metrics.h"
 #include "util/network-util.h"
 #include "util/openssl-util.h"
-#include "util/parse-util.h"
 #include "util/pretty-printer.h"
 #include "util/redactor.h"
 #include "util/runtime-profile-counters.h"
@@ -112,9 +96,7 @@
 
 #include "gen-cpp/Types_types.h"
 #include "gen-cpp/ImpalaService.h"
-#include "gen-cpp/DataSinks_types.h"
 #include "gen-cpp/ImpalaService_types.h"
-#include "gen-cpp/LineageGraph_types.h"
 #include "gen-cpp/Frontend_types.h"
 
 #include "common/names.h"
@@ -279,6 +261,10 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, 
that a query may be i
 DEFINE_int32(disconnected_session_timeout, 15 * 60, "The time, in seconds, 
that a "
     "hiveserver2 session will be maintained after the last connection that it 
has been "
     "used over is disconnected.");
+DEFINE_int32(max_hs2_sessions_per_user, -1, "The maximum allowed number of 
HiveServer2 "
+    "sessions that can be opened by any single connected user on a 
coordinator. "
+    "If set to -1 or 0 then this check is not performed. If set to a positive 
value "
+    "then the per-user session count is viewable in the webui under 
/sessions.");
 DEFINE_int32(idle_client_poll_period_s, 30, "The poll period, in seconds, 
after "
     "no activity from an Impala client which an Impala service thread (beeswax 
and HS2) "
     "wakes up to check if the connection should be closed. If 
--idle_session_timeout is "
@@ -604,7 +590,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
       }
     }
     if (FLAGS_status_report_cancellation_padding <= 0) {
-      const string& err = "--status_report_cancellationn_padding should be > 
0.";
+      const string& err = "--status_report_cancellation_padding should be > 
0.";
       LOG(ERROR) << err;
       if (FLAGS_abort_on_config_error) {
         CLEAN_EXIT_WITH_ERROR(Substitute("Aborting Impala Server startup: $0", 
err));
@@ -1146,7 +1132,7 @@ void ImpalaServer::ArchiveQuery(const QueryHandle& 
query_handle) {
     }
   }
 
-  // 'fetch_rows_lock()' protects several fields in ClientReqestState that are 
read
+  // 'fetch_rows_lock()' protects several fields in ClientRequestState that 
are read
   // during QueryStateRecord creation. There should be no contention on this 
lock because
   // the query has already been closed (e.g. no more results can be fetched).
   shared_ptr<QueryStateRecord> record = nullptr;
@@ -1808,6 +1794,7 @@ Status ImpalaServer::CloseSessionInternal(const 
TUniqueId& session_id,
     ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS->Increment(-1L);
   } else {
     ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS->Increment(-1L);
+    DecrementSessionCount(session_state->connected_user);
   }
   unordered_set<TUniqueId> inflight_queries;
   {
@@ -2713,6 +2700,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t 
session_timeout) {
             UnregisterSessionTimeout(FLAGS_disconnected_session_timeout);
             query_cancel_status =
                 Status::Expected(TErrorCode::DISCONNECTED_SESSION_CLOSED);
+            DecrementSessionCount(session_state->connected_user);
           } else {
             // Check if the session should be expired.
             if (session_state->expired || session_state->session_timeout == 0) 
{
@@ -2913,7 +2901,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t 
session_timeout) {
     std::unique_ptr<AdmissionControlServiceProxy> proxy;
     Status get_proxy_status = AdmissionControlService::GetProxy(&proxy);
     if (!get_proxy_status.ok()) {
-      LOG(ERROR) << "Admission heartbeat thread was unabe to get an "
+      LOG(ERROR) << "Admission heartbeat thread was unable to get an "
                     "AdmissionControlService proxy:"
                  << get_proxy_status.GetDetail();
       continue;
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 9cebfe0dc..8024a6133 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -197,6 +197,7 @@ class TQueryExecRequest;
 /// * uuid_lock_
 /// * catalog_version_lock_
 /// * connection_to_sessions_map_lock_
+/// * per_user_session_count_lock_
 ///
 /// TODO: The same doesn't apply to the execution state of an individual plan
 /// fragment: the originating coordinator might die, but we can get notified of
@@ -698,6 +699,10 @@ class ImpalaServer : public ImpalaServiceIf,
     TQueryOptions QueryOptions();
   };
 
+  /// Decrements the count stored under 'key' in 'loads', erasing the entry rather than
+  /// leaving a zero count. A missing 'key' logs a warning and trips a DCHECK.
+  static void DecrementCount(std::map<std::string, int64>& loads, const 
std::string& key);
+
  private:
   struct ExpirationEvent;
   class SecretArg;
@@ -1422,6 +1427,21 @@ class ImpalaServer : public ImpalaServiceIf,
   /// acquisition order.
   std::mutex connection_to_sessions_map_lock_;
 
+  /// A map from user to a count of sessions created by the user.
+  typedef std::map<std::string, int64> SessionCounts;
+  SessionCounts per_user_session_count_map_;
+
+  /// Protects per_user_session_count_map_. See "Locking" in the class comment 
for lock
+  /// acquisition order.
+  std::mutex per_user_session_count_lock_;
+
+  /// Increment the count of HS2 sessions used by the user.
+  /// If max_hs2_sessions_per_user is greater than zero, and the count of HS2 
sessions
+  /// used by the user would be above that value, then an error status is 
returned.
+  Status IncrementAndCheckSessionCount(const string& user_name);
+  /// Decrement the count of HS2 sessions used by the user.
+  void DecrementSessionCount(const string& user_name);
+
   /// Map from a connection ID to the associated list of sessions so that all 
can be
   /// closed when the connection ends. HS2 allows for multiplexing several 
sessions across
   /// a single connection. If a session has already been closed (only possible 
via HS2) it
diff --git a/tests/custom_cluster/test_session_expiration.py 
b/tests/custom_cluster/test_session_expiration.py
index fccf35f34..e9850e486 100644
--- a/tests/custom_cluster/test_session_expiration.py
+++ b/tests/custom_cluster/test_session_expiration.py
@@ -20,11 +20,14 @@
 from __future__ import absolute_import, division, print_function
 import pytest
 import socket
+
+import re
 from time import sleep
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_cluster import DEFAULT_HS2_PORT
 
+
 class TestSessionExpiration(CustomClusterTestSuite):
   """Tests query expiration logic"""
 
@@ -72,7 +75,6 @@ class TestSessionExpiration(CustomClusterTestSuite):
     assert num_expired + 1 == impalad.service.get_metric_value(
       "impala-server.num-sessions-expired")
 
-
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--idle_session_timeout=5 "
        "--idle_client_poll_period_s=0")
@@ -166,3 +168,36 @@ class TestSessionExpiration(CustomClusterTestSuite):
     assert num_hs2_connections + 1 == impalad.service.get_metric_value(
         "impala.thrift-server.hiveserver2-frontend.connections-in-use")
     sock.close()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--max_hs2_sessions_per_user=2")
+  def test_max_hs2_sessions_per_user(self):
+    """Test that the --max_hs2_sessions_per_user flag restricts the total number of
+    sessions per user. Also checks that the per-user count of hs2 sessions can
+    be seen in the webui."""
+    impalad = self.cluster.get_first_impalad()
+    self.close_impala_clients()
+    # Create 2 sessions.
+    client1 = impalad.service.create_hs2_client()
+    client1.execute_async("select sleep(5000)")
+    client2 = impalad.service.create_hs2_client()
+    client2.execute_async("select sleep(5000)")
+    try:
+      # Trying to open a third session should fail.
+      impalad.service.create_hs2_client()
+    except Exception as e:
+      assert re.match(r"Number of sessions for user \S+ exceeds coordinator limit 2",
+                      str(e)), "Unexpected exception: " + str(e)
+    else:
+      # Fail in the 'else' clause, not inside the 'try': an AssertionError raised
+      # there would be caught by 'except Exception' and misreported as an
+      # unexpected exception.
+      assert False, "should have failed"
+
+    # Test webui for hs2 sessions.
+    res = impalad.service.get_debug_webpage_json("/sessions")
+    assert res['num_sessions'] == 2
+    assert res['users'][0]['user'] is not None
+    assert res['users'][0]['session_count'] == 2
+    # Let queries finish, session count should go to zero.
+    sleep(6)
+    client1.close()
+    client2.close()
+    res = impalad.service.get_debug_webpage_json("/sessions")
+    assert res['num_sessions'] == 0
diff --git a/www/sessions.tmpl b/www/sessions.tmpl
index 6216c25c5..2e0280d4d 100644
--- a/www/sessions.tmpl
+++ b/www/sessions.tmpl
@@ -136,6 +136,29 @@ which the session is removed from this list.</div>
   </tbody>
 </table>
 
+<h2>HiveServer2 Sessions by User</h2>
+
+<div class="alert alert-info" role="alert">
+This data is only available if --max_hs2_sessions_per_user is set to a 
positive value.
+</div>
+
+<table id="users-tbl" class='table table-bordered table-hover table-condensed'>
+  <thead>
+    <tr>
+      <th><small>Name of Connected User</small></th>
+      <th><small>Number of open HiveServer2 Sessions</small></th>
+    </tr>
+  </thead>
+  <tbody>
+    {{#users}}
+    <tr>
+      <td><small>{{user}}</small></td>
+      <td><small>{{session_count}}</small></td>
+    </tr>
+    {{/users}}
+  </tbody>
+</table>
+
 <script>
     $(document).ready(function() {
         $('#client-hosts-tbl').DataTable({
@@ -163,4 +186,13 @@ which the session is removed from this list.</div>
     });
 </script>
 
+<script>
+    $(document).ready(function() {
+        $('#users-tbl').DataTable({
+            "order": [[ 1, "desc" ]],
+            "pageLength": 100
+        });
+    });
+</script>
+
 {{> www/common-footer.tmpl }}

Reply via email to