This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 1f9db9e05b2766d35c186e8e8b4e25b89f7d5793 Author: Andrew Sherman <[email protected]> AuthorDate: Fri Mar 8 13:47:03 2024 -0800 IMPALA-12264: Add limit on number of HS2 sessions per user. Add a new flag --max_hs2_sessions_per_user which sets a limit on the number of HiveServer2 sessions that can be concurrently opened by a user on a coordinator. By default this value is -1, which disables the new feature; any value less than or equal to 0 disables the limit. An attempt to open more sessions than the new flag value results in an error. This is implemented by maintaining a map of users to a count of HS2 sessions. If the per-user HS2 session counts are being limited in this way (i.e. the flag is set to a positive value), then the per-user counts are visible on the /sessions page of the webui. Add a new test case in test_session_expiration.py. Change-Id: Idd28edc352102d89774f6ece5376e7c79ae41aa8 Reviewed-on: http://gerrit.cloudera.org:8080/21128 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/impala-hs2-server.cc | 62 +++++++++++++++++++++++-- be/src/service/impala-http-handler.cc | 30 ++++++++++-- be/src/service/impala-http-handler.h | 3 ++ be/src/service/impala-server.cc | 30 ++++-------- be/src/service/impala-server.h | 20 ++++++++ tests/custom_cluster/test_session_expiration.py | 37 ++++++++++++++- www/sessions.tmpl | 32 +++++++++++++ 7 files changed, 184 insertions(+), 30 deletions(-) diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index 00a93d113..9392e1a02 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -20,23 +20,18 @@ #include "service/impala-server.inline.h" #include <algorithm> -#include <type_traits> #include <boost/algorithm/string.hpp> #include <boost/algorithm/string/join.hpp> #include <boost/bind.hpp> -#include <boost/date_time/posix_time/posix_time_types.hpp> #include <boost/unordered_set.hpp> -#include <jni.h> #include <gperftools/heap-profiler.h> #include
<gperftools/malloc_extension.h> #include <gtest/gtest.h> #include <gutil/strings/substitute.h> -#include <thrift/protocol/TDebugProtocol.h> #include "common/logging.h" #include "common/version.h" -#include "rpc/thrift-util.h" #include "runtime/coordinator.h" #include "runtime/exec-env.h" #include "runtime/raw-value.h" @@ -96,6 +91,7 @@ DECLARE_string(hostname); DECLARE_int32(webserver_port); DECLARE_int32(idle_session_timeout); DECLARE_int32(disconnected_session_timeout); +DECLARE_int32(max_hs2_sessions_per_user); DECLARE_bool(ping_expose_webserver_url); DECLARE_string(anonymous_user_name); @@ -422,6 +418,11 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val, } } + Status status = IncrementAndCheckSessionCount(state->connected_user); + HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR); + + // At this point all checks are complete, store session status information. + RegisterSessionTimeout(state->session_timeout); TQueryOptionsToMap(state->QueryOptions(), &return_val.configuration); @@ -454,6 +455,57 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val, << "<" << TNetworkAddressToString(state->network_address) << ">."; } +void ImpalaServer::DecrementCount( + std::map<std::string, int64>& loads, const std::string& key) { + // Check if key is present as dereferencing the map will insert it. + // FIXME C++20: use contains(). + if (!loads.count(key)) { + string msg = Substitute("Missing key $0 when decrementing count", key); + LOG(WARNING) << msg; + DCHECK(false) << msg; + return; + } + int64& current_value = loads[key]; + if (current_value == 1) { + // Remove the entry from the map if the current_value will go to zero. + loads.erase(key); + return; + } + if (current_value < 1) { + // Don't allow decrement below zero. 
+ string msg = Substitute("Attempt to decrement below zero with key $0 ", key); + LOG(WARNING) << msg; + return; + } + loads[key]--; +} + +void ImpalaServer::DecrementSessionCount(const string& user_name) { + if (FLAGS_max_hs2_sessions_per_user > 0) { + lock_guard<mutex> l(per_user_session_count_lock_); + DecrementCount(per_user_session_count_map_, user_name); + } +} + +Status ImpalaServer::IncrementAndCheckSessionCount(const string& user_name) { + if (FLAGS_max_hs2_sessions_per_user > 0) { + lock_guard<mutex> l(per_user_session_count_lock_); + // Only check user limit if there is already a session for the user. + if (per_user_session_count_map_.count(user_name)) { + int64 load = per_user_session_count_map_[user_name]; + if (load >= FLAGS_max_hs2_sessions_per_user) { + const string& err_msg = + Substitute("Number of sessions for user $0 exceeds coordinator limit $1", + user_name, FLAGS_max_hs2_sessions_per_user); + VLOG(1) << err_msg; + return Status::Expected(err_msg); + } + } + per_user_session_count_map_[user_name]++; + } + return Status::OK(); +} + void ImpalaServer::CloseSession(TCloseSessionResp& return_val, const TCloseSessionReq& request) { VLOG_QUERY << "CloseSession(): request=" << RedactedDebugString(request); diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc index 27aba2618..c2def1203 100644 --- a/be/src/service/impala-http-handler.cc +++ b/be/src/service/impala-http-handler.cc @@ -20,7 +20,6 @@ #include <algorithm> #include <mutex> #include <sstream> -#include <boost/lexical_cast.hpp> #include <boost/unordered_set.hpp> #include <rapidjson/prettywriter.h> #include <rapidjson/stringbuffer.h> @@ -45,7 +44,6 @@ #include "service/impala-server.h" #include "service/query-state-record.h" #include "thrift/protocol/TDebugProtocol.h" -#include "util/coding-util.h" #include "util/debug-util.h" #include "util/logging-support.h" #include "util/pretty-printer.h" @@ -711,6 +709,9 @@ void 
ImpalaHttpHandler::SessionsHandler(const Webserver::WebRequest& req, VLOG(1) << "Step3: Fill the connections information into the document."; FillConnectionsInfo(document, connection_contexts); + + VLOG(1) << "Step4: Fill the hs2 users information into the document."; + FillUsersInfo(document); } void ImpalaHttpHandler::FillSessionsInfo(Document* document) { @@ -776,6 +777,29 @@ void ImpalaHttpHandler::FillSessionsInfo(Document* document) { document->GetAllocator()); } +// Comparer that will sort the users array by the session_count field. +bool SessionCountComparer(const Value& a, const Value& b) { + return a["session_count"].GetInt64() < b["session_count"].GetInt64(); +} + +void ImpalaHttpHandler::FillUsersInfo(Document* document) { + Value users(kArrayType); + { + lock_guard<mutex> l(server_->per_user_session_count_lock_); + for (auto const& it : server_->per_user_session_count_map_) { + const string& name = it.first; + const int64& session_count = it.second; + Value users_json(kObjectType); + Value user_name(name.c_str(), document->GetAllocator()); + users_json.AddMember("user", user_name, document->GetAllocator()); + users_json.AddMember("session_count", session_count, document->GetAllocator()); + users.PushBack(users_json, document->GetAllocator()); + } + } + sort(users.Begin(), users.End(), SessionCountComparer); + document->AddMember("users", users, document->GetAllocator()); +} + void ImpalaHttpHandler::FillClientHostsInfo( Document* document, const ThriftServer::ConnectionContextList& connection_contexts) { lock_guard<mutex> session_state_map_l(server_->session_state_map_lock_); @@ -937,7 +961,7 @@ void ImpalaHttpHandler::CatalogHandler(const Webserver::WebRequest& req, database.AddMember("name", str, document->GetAllocator()); TGetTablesResult get_table_results; - Status status = server_->exec_env_->frontend()->GetTableNames( + status = server_->exec_env_->frontend()->GetTableNames( db.db_name, NULL, NULL, &get_table_results); if (!status.ok()) { Value 
error(status.GetDetail().c_str(), document->GetAllocator()); diff --git a/be/src/service/impala-http-handler.h b/be/src/service/impala-http-handler.h index 968495ed0..e8ed8dbb5 100644 --- a/be/src/service/impala-http-handler.h +++ b/be/src/service/impala-http-handler.h @@ -285,6 +285,9 @@ class ImpalaHttpHandler { /// Fill the sessions information into the document. void FillSessionsInfo(rapidjson::Document* document); + /// Fill the hs2 users information into the document. + void FillUsersInfo(rapidjson::Document* document); + /// Fill the client hosts information into the document. void FillClientHostsInfo(rapidjson::Document* document, const ThriftServer::ConnectionContextList& connection_contexts); diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index efe0d6705..963960661 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -17,11 +17,9 @@ #include "service/impala-server.h" -#include <netdb.h> #include <unistd.h> #include <algorithm> #include <exception> -#include <fstream> #include <sstream> #ifdef CALLONCEHACK #include <calloncehack.h> @@ -32,12 +30,7 @@ #include <boost/algorithm/string/replace.hpp> #include <boost/algorithm/string/trim.hpp> #include <boost/bind.hpp> -#include <boost/date_time/posix_time/posix_time_types.hpp> -#include <boost/filesystem.hpp> -#include <boost/lexical_cast.hpp> #include <boost/unordered_set.hpp> -#include <gperftools/malloc_extension.h> -#include <gutil/strings/numbers.h> #include <gutil/strings/split.h> #include <gutil/strings/substitute.h> #include <gutil/walltime.h> @@ -45,9 +38,6 @@ #include <openssl/evp.h> #include <rapidjson/rapidjson.h> #include <rapidjson/stringbuffer.h> -#include <rapidjson/writer.h> -#include <rapidjson/error/en.h> -#include <sys/socket.h> #include <sys/types.h> #include "catalog/catalog-server.h" @@ -56,7 +46,6 @@ #include "common/logging.h" #include "common/object-pool.h" #include "common/thread-debug-info.h" -#include 
"common/version.h" #include "exec/external-data-source-executor.h" #include "exprs/timezone_db.h" #include "gen-cpp/CatalogService_constants.h" @@ -70,13 +59,10 @@ #include "rpc/rpc-trace.h" #include "rpc/thrift-thread.h" #include "rpc/thrift-util.h" -#include "runtime/client-cache.h" #include "runtime/coordinator.h" #include "runtime/exec-env.h" #include "runtime/lib-cache.h" #include "runtime/query-driver.h" -#include "runtime/timestamp-value.h" -#include "runtime/timestamp-value.inline.h" #include "runtime/tmp-file-mgr.h" #include "runtime/io/disk-io-mgr.h" #include "scheduling/admission-control-service.h" @@ -87,7 +73,6 @@ #include "service/impala-http-handler.h" #include "service/query-state-record.h" #include "util/auth-util.h" -#include "util/bit-util.h" #include "util/coding-util.h" #include "util/common-metrics.h" #include "util/debug-util.h" @@ -98,7 +83,6 @@ #include "util/metrics.h" #include "util/network-util.h" #include "util/openssl-util.h" -#include "util/parse-util.h" #include "util/pretty-printer.h" #include "util/redactor.h" #include "util/runtime-profile-counters.h" @@ -112,9 +96,7 @@ #include "gen-cpp/Types_types.h" #include "gen-cpp/ImpalaService.h" -#include "gen-cpp/DataSinks_types.h" #include "gen-cpp/ImpalaService_types.h" -#include "gen-cpp/LineageGraph_types.h" #include "gen-cpp/Frontend_types.h" #include "common/names.h" @@ -279,6 +261,10 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i DEFINE_int32(disconnected_session_timeout, 15 * 60, "The time, in seconds, that a " "hiveserver2 session will be maintained after the last connection that it has been " "used over is disconnected."); +DEFINE_int32(max_hs2_sessions_per_user, -1, "The maximum allowed number of HiveServer2 " + "sessions that can be opened by any single connected user on a coordinator. " + "If set to -1 or 0 then this check is not performed. 
If set to a positive value " + "then the per-user session count is viewable in the webui under /sessions."); DEFINE_int32(idle_client_poll_period_s, 30, "The poll period, in seconds, after " "no activity from an Impala client which an Impala service thread (beeswax and HS2) " "wakes up to check if the connection should be closed. If --idle_session_timeout is " @@ -604,7 +590,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env) } } if (FLAGS_status_report_cancellation_padding <= 0) { - const string& err = "--status_report_cancellationn_padding should be > 0."; + const string& err = "--status_report_cancellation_padding should be > 0."; LOG(ERROR) << err; if (FLAGS_abort_on_config_error) { CLEAN_EXIT_WITH_ERROR(Substitute("Aborting Impala Server startup: $0", err)); @@ -1146,7 +1132,7 @@ void ImpalaServer::ArchiveQuery(const QueryHandle& query_handle) { } } - // 'fetch_rows_lock()' protects several fields in ClientReqestState that are read + // 'fetch_rows_lock()' protects several fields in ClientRequestState that are read // during QueryStateRecord creation. There should be no contention on this lock because // the query has already been closed (e.g. no more results can be fetched). shared_ptr<QueryStateRecord> record = nullptr; @@ -1808,6 +1794,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id, ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS->Increment(-1L); } else { ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_HS2_SESSIONS->Increment(-1L); + DecrementSessionCount(session_state->connected_user); } unordered_set<TUniqueId> inflight_queries; { @@ -2713,6 +2700,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) { UnregisterSessionTimeout(FLAGS_disconnected_session_timeout); query_cancel_status = Status::Expected(TErrorCode::DISCONNECTED_SESSION_CLOSED); + DecrementSessionCount(session_state->connected_user); } else { // Check if the session should be expired. 
if (session_state->expired || session_state->session_timeout == 0) { @@ -2913,7 +2901,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) { std::unique_ptr<AdmissionControlServiceProxy> proxy; Status get_proxy_status = AdmissionControlService::GetProxy(&proxy); if (!get_proxy_status.ok()) { - LOG(ERROR) << "Admission heartbeat thread was unabe to get an " + LOG(ERROR) << "Admission heartbeat thread was unable to get an " "AdmissionControlService proxy:" << get_proxy_status.GetDetail(); continue; diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 9cebfe0dc..8024a6133 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -197,6 +197,7 @@ class TQueryExecRequest; /// * uuid_lock_ /// * catalog_version_lock_ /// * connection_to_sessions_map_lock_ +/// * per_user_session_count_lock_ /// /// TODO: The same doesn't apply to the execution state of an individual plan /// fragment: the originating coordinator might die, but we can get notified of @@ -698,6 +699,10 @@ class ImpalaServer : public ImpalaServiceIf, TQueryOptions QueryOptions(); }; + /// Helper function that decrements the value associated with the given key. + /// Removes the entry from the map if the value becomes zero. + static void DecrementCount(std::map<std::string, int64>& loads, const std::string& key); + private: struct ExpirationEvent; class SecretArg; @@ -1422,6 +1427,21 @@ class ImpalaServer : public ImpalaServiceIf, /// acquisition order. std::mutex connection_to_sessions_map_lock_; + /// A map from user to a count of sessions created by the user. + typedef std::map<std::string, int64> SessionCounts; + SessionCounts per_user_session_count_map_; + + /// Protects per_user_session_count_map_. See "Locking" in the class comment for lock + /// acquisition order. + std::mutex per_user_session_count_lock_; + + /// Increment the count of HS2 sessions used by the user. 
+ /// If max_hs2_sessions_per_user is greater than zero, and the count of HS2 sessions + /// used by the user would be above that value, then an error status is returned. + Status IncrementAndCheckSessionCount(const string& user_name); + /// Decrement the count of HS2 sessions used by the user. + void DecrementSessionCount(const string& user_name); + /// Map from a connection ID to the associated list of sessions so that all can be /// closed when the connection ends. HS2 allows for multiplexing several sessions across /// a single connection. If a session has already been closed (only possible via HS2) it diff --git a/tests/custom_cluster/test_session_expiration.py b/tests/custom_cluster/test_session_expiration.py index fccf35f34..e9850e486 100644 --- a/tests/custom_cluster/test_session_expiration.py +++ b/tests/custom_cluster/test_session_expiration.py @@ -20,11 +20,14 @@ from __future__ import absolute_import, division, print_function import pytest import socket + +import re from time import sleep from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_cluster import DEFAULT_HS2_PORT + class TestSessionExpiration(CustomClusterTestSuite): """Tests query expiration logic""" @@ -72,7 +75,6 @@ class TestSessionExpiration(CustomClusterTestSuite): assert num_expired + 1 == impalad.service.get_metric_value( "impala-server.num-sessions-expired") - @pytest.mark.execute_serially @CustomClusterTestSuite.with_args("--idle_session_timeout=5 " "--idle_client_poll_period_s=0") @@ -166,3 +168,36 @@ class TestSessionExpiration(CustomClusterTestSuite): assert num_hs2_connections + 1 == impalad.service.get_metric_value( "impala.thrift-server.hiveserver2-frontend.connections-in-use") sock.close() + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args("--max_hs2_sessions_per_user=2") + def test_max_hs2_sessions_per_user(self): + """Test that the --max_hs2_sessions_per_user flag restricts the total number of + sessions per 
user. Also checks that the per-user count of hs2 sessions can + be seen in the webui.""" + impalad = self.cluster.get_first_impalad() + self.close_impala_clients() + # Create 2 sessions. + client1 = impalad.service.create_hs2_client() + client1.execute_async("select sleep(5000)") + client2 = impalad.service.create_hs2_client() + client2.execute_async("select sleep(5000)") + try: + # Trying to open a third session should fail. + impalad.service.create_hs2_client() + assert False, "should have failed" + except Exception as e: + assert re.match(r"Number of sessions for user \S+ exceeds coordinator limit 2", + str(e)), "Unexpected exception: " + str(e) + + # Test webui for hs2 sessions. + res = impalad.service.get_debug_webpage_json("/sessions") + assert res['num_sessions'] == 2 + assert res['users'][0]['user'] is not None + assert res['users'][0]['session_count'] == 2 + # Let queries finish, session count should go to zero. + sleep(6) + client1.close() + client2.close() + res = impalad.service.get_debug_webpage_json("/sessions") + assert res['num_sessions'] == 0 diff --git a/www/sessions.tmpl b/www/sessions.tmpl index 6216c25c5..2e0280d4d 100644 --- a/www/sessions.tmpl +++ b/www/sessions.tmpl @@ -136,6 +136,29 @@ which the session is removed from this list.</div> </tbody> </table> +<h2>HiveServer2 Sessions by User</h2> + +<div class="alert alert-info" role="alert"> +This data is only available if --max_hs2_sessions_per_user is set to a positive value. 
+</div> + +<table id="users-tbl" class='table table-bordered table-hover table-condensed'> + <thead> + <tr> + <th><small>Name of Connected User</small></th> + <th><small>Number of open HiveServer2 Sessions</small></th> + </tr> + </thead> + <tbody> + {{#users}} + <tr> + <td><small>{{user}}</small></td> + <td><small>{{session_count}}</small></td> + </tr> + {{/users}} + </tbody> +</table> + <script> $(document).ready(function() { $('#client-hosts-tbl').DataTable({ @@ -163,4 +186,13 @@ which the session is removed from this list.</div> }); </script> +<script> + $(document).ready(function() { + $('#users-tbl').DataTable({ + "order": [[ 1, "desc" ]], + "pageLength": 100 + }); + }); +</script> + {{> www/common-footer.tmpl }}
