This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 9dcd136df IMPALA-12699: Set timeout for catalog RPCs
9dcd136df is described below
commit 9dcd136df13df632d90636abb97e9e168f1d8a89
Author: stiga-huang <[email protected]>
AuthorDate: Wed Mar 13 18:50:29 2024 +0800
IMPALA-12699: Set timeout for catalog RPCs
We have seen trivial GetPartialCatalogObject RPCs hang on the
coordinator side, e.g. IMPALA-11409. Due to the piggyback mechanism for
fetching metadata in local-catalog mode (see comments in
CatalogdMetaProvider#loadWithCaching()), a hanging RPC on shared
metadata (e.g. the db/table list) can block other queries on the same
coordinator.
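As an illustration only (the real piggyback logic lives in the Java
CatalogdMetaProvider, not in this patch), a minimal C++ sketch of such
a coalescing cache, where concurrent callers for the same key share one
in-flight load and therefore all stall if that load's RPC hangs:

    #include <functional>
    #include <future>
    #include <map>
    #include <mutex>
    #include <string>

    // Hypothetical coalescing cache: the first caller for a key starts the
    // load; later callers piggyback on the same shared_future. A hanging
    // loader RPC therefore blocks every piggybacked caller in Get().
    // (Simplified: finished entries are never evicted.)
    class PiggybackCache {
     public:
      std::string Get(const std::string& key,
                      std::function<std::string()> loader) {
        std::shared_future<std::string> fut;
        {
          std::lock_guard<std::mutex> lock(mu_);
          auto it = inflight_.find(key);
          if (it == inflight_.end()) {
            fut = std::async(std::launch::async, std::move(loader)).share();
            inflight_.emplace(key, fut);
          } else {
            fut = it->second;
          }
        }
        return fut.get();  // all callers for 'key' wait here together
      }

     private:
      std::mutex mu_;
      std::map<std::string, std::shared_future<std::string>> inflight_;
    };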
Such lightweight requests don't need to acquire table locks or trigger
table loading in catalogd. The hangs are usually caused by network
issues, e.g. a TCP connection becoming half-open after TCP
retransmissions time out. Retrying the RPC helps recover from such
failures. However, the timeout for catalog RPCs currently defaults to
0, which prevents any retry and lets the client wait indefinitely.
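To see why a zero timeout defeats the retry logic, consider this
hedged sketch in the spirit of DoRpcWithRetry() (Status and
CallRpcOnce() are illustrative stand-ins, not Impala's real API): a
retry can only fire after the previous attempt returns, and with an
infinite recv timeout an attempt stuck on a half-open connection never
returns.

    // Stand-in for impala::Status, for illustration only.
    struct Status {
      bool ok;
    };

    // Placeholder for one Thrift RPC attempt. With timeout_ms == 0 the
    // underlying TSocket recv waits forever, so on a half-open connection
    // this call would simply never return.
    Status CallRpcOnce(int timeout_ms) {
      return Status{timeout_ms > 0};  // pretend finite timeouts eventually succeed
    }

    // Retries happen only between attempts: if one attempt blocks forever,
    // the loop never advances and the caller waits indefinitely.
    Status DoRpcWithRetrySketch(int num_retries, int timeout_ms) {
      Status status{false};
      for (int i = 0; i < num_retries && !status.ok; ++i) {
        status = CallRpcOnce(timeout_ms);  // may never return if timeout_ms == 0
      }
      return status;
    }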
This patch distinguishes the lightweight catalog RPCs and uses a
dedicated catalogd client cache for them with a timeout of 30 minutes,
which is long enough to tolerate TCP retransmission timeouts. It also
sets a timeout of 10 hours for other catalog RPCs; operations taking
longer than that are usually abnormal and hanging.
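For reference, the new flags in the diff below
(--catalog_client_rpc_timeout_ms and
--catalog_lightweight_rpc_timeout_ms) encode these durations in
milliseconds; a compile-time check just to make the arithmetic
explicit:

    static_assert(36000000 == 10 * 60 * 60 * 1000, "10 hours in ms");
    static_assert(1800000 == 30 * 60 * 1000, "30 minutes in ms");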
Tests
- Add an e2e test to verify that the lightweight RPC client cache is
  used.
- Adjust TestRestart.test_catalog_connection_retries to use local
  catalog mode, since in legacy catalog mode the coordinator only
  sends PrioritizeLoad requests, which are lightweight RPCs.
This is a continuation of a patch by Wenzhe Zhou <[email protected]>
Change-Id: Iad39a79d0c89f2b04380f610a7e60558429e9c6e
Reviewed-on: http://gerrit.cloudera.org:8080/21146
Reviewed-by: Wenzhe Zhou <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/exec/catalog-op-executor.cc | 21 ++++++++++++++++-----
be/src/runtime/client-cache.cc | 9 ++++++++-
be/src/runtime/client-cache.h | 8 +++++---
be/src/runtime/exec-env.cc | 15 +++++++++++++--
be/src/runtime/exec-env.h | 4 ++++
common/thrift/metrics.json | 20 ++++++++++++++++++++
tests/custom_cluster/test_local_catalog.py | 14 ++++++++++++++
tests/custom_cluster/test_restart_services.py | 3 +++
8 files changed, 83 insertions(+), 11 deletions(-)
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 985928a9c..0691d30e1 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -375,9 +375,13 @@ Status CatalogOpExecutor::GetPartialCatalogObject(
if (FLAGS_inject_latency_before_catalog_fetch_ms > 0) {
SleepForMs(FLAGS_inject_latency_before_catalog_fetch_ms);
}
+ // Non-table requests are lightweight requests that won't be blocked by table loading
+ // or table locks. Note that when loading the table list of a db, the type is DB.
+ auto client_cache_ptr = (req.object_desc.type == TCatalogObjectType::TABLE) ?
+ env_->catalogd_client_cache() : env_->catalogd_lightweight_req_client_cache();
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
- CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+ CatalogServiceConnection::DoRpcWithRetry(client_cache_ptr,
*ExecEnv::GetInstance()->GetCatalogdAddress().get(),
&CatalogServiceClientWrapper::GetPartialCatalogObject, req,
FLAGS_catalog_client_connection_num_retries,
@@ -399,7 +403,8 @@ Status CatalogOpExecutor::PrioritizeLoad(const TPrioritizeLoadRequest& req,
TPrioritizeLoadResponse* result) {
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
- CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+ CatalogServiceConnection::DoRpcWithRetry(
+ env_->catalogd_lightweight_req_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress().get(),
&CatalogServiceClientWrapper::PrioritizeLoad, req,
FLAGS_catalog_client_connection_num_retries,
@@ -427,7 +432,11 @@ Status CatalogOpExecutor::UpdateTableUsage(const TUpdateTableUsageRequest& req,
TUpdateTableUsageResponse* resp) {
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
- CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+ CatalogServiceConnection::DoRpcWithRetry(
+ // The operation doesn't require a table lock in catalogd. It doesn't require
+ // the table to be loaded either. So we can use clients for lightweight
+ // requests.
+ env_->catalogd_lightweight_req_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress().get(),
&CatalogServiceClientWrapper::UpdateTableUsage, req,
FLAGS_catalog_client_connection_num_retries,
@@ -441,7 +450,8 @@ Status CatalogOpExecutor::GetNullPartitionName(
const TGetNullPartitionNameRequest& req, TGetNullPartitionNameResponse* result) {
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
- CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+ CatalogServiceConnection::DoRpcWithRetry(
+ env_->catalogd_lightweight_req_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress().get(),
&CatalogServiceClientWrapper::GetNullPartitionName, req,
FLAGS_catalog_client_connection_num_retries,
@@ -455,7 +465,8 @@ Status CatalogOpExecutor::GetLatestCompactions(
const TGetLatestCompactionsRequest& req, TGetLatestCompactionsResponse* result) {
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
- CatalogServiceConnection::DoRpcWithRetry(env_->catalogd_client_cache(),
+ CatalogServiceConnection::DoRpcWithRetry(
+ env_->catalogd_lightweight_req_client_cache(),
*ExecEnv::GetInstance()->GetCatalogdAddress().get(),
&CatalogServiceClientWrapper::GetLatestCompactions, req,
FLAGS_catalog_client_connection_num_retries,
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index fdf44c16b..c1936a27d 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -233,17 +233,24 @@ void ClientCacheHelper::TestShutdown() {
}
}
-void ClientCacheHelper::InitMetrics(MetricGroup* metrics, const string& key_prefix) {
+void ClientCacheHelper::InitMetrics(MetricGroup* metrics, const string& key_prefix,
+ const string& key_appendix) {
DCHECK(metrics != NULL);
// Not strictly needed if InitMetrics is called before any cache usage, but ensures that
// metrics_enabled_ is published.
lock_guard<mutex> lock(cache_lock_);
stringstream count_ss;
count_ss << key_prefix << ".client-cache.clients-in-use";
+ if (!key_appendix.empty()) {
+ count_ss << "-" << key_appendix;
+ }
clients_in_use_metric_ = metrics->AddGauge(count_ss.str(), 0);
stringstream max_ss;
max_ss << key_prefix << ".client-cache.total-clients";
+ if (!key_appendix.empty()) {
+ max_ss << "-" << key_appendix;
+ }
total_clients_metric_ = metrics->AddGauge(max_ss.str(), 0);
metrics_enabled_ = true;
}
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index 018c366c6..cb80e0b2a 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -123,7 +123,8 @@ class ClientCacheHelper {
/// Creates two metrics for this cache measuring the number of clients currently used,
/// and the total number in the cache.
- void InitMetrics(MetricGroup* metrics, const std::string& key_prefix);
+ void InitMetrics(MetricGroup* metrics, const std::string& key_prefix,
+ const std::string& key_appendix);
private:
template <class T> friend class ClientCache;
@@ -447,8 +448,9 @@ class ClientCache {
/// metrics have keys that are prefixed by the key_prefix argument
/// (which should not end in a period).
/// Must be called before the cache is used, otherwise the metrics might be wrong
- void InitMetrics(MetricGroup* metrics, const std::string& key_prefix) {
- client_cache_helper_.InitMetrics(metrics, key_prefix);
+ void InitMetrics(MetricGroup* metrics, const std::string& key_prefix,
+ const std::string& key_appendix = "") {
+ client_cache_helper_.InitMetrics(metrics, key_prefix, key_appendix);
}
private:
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index d0430dd4c..fb1ed7381 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -161,8 +161,13 @@ DEFINE_int32(backend_client_rpc_timeout_ms, 300000, "(Advanced) The underlying "
DEFINE_int32(catalog_client_connection_num_retries, 10, "The number of times connections "
"or RPCs to the catalog should be retried.");
-DEFINE_int32(catalog_client_rpc_timeout_ms, 0, "(Advanced) The underlying TSocket "
- "send/recv timeout in milliseconds for a catalog client RPC.");
+DEFINE_int32(catalog_client_rpc_timeout_ms, 36000000, "(Advanced) The underlying TSocket "
+ "send/recv timeout in milliseconds for a catalog client RPC. The default is 10 hours."
+ " Operations taking longer than this are usually abnormal and hanging.");
+DEFINE_int32(catalog_lightweight_rpc_timeout_ms, 1800000, "(Advanced) The underlying "
+ "TSocket send/recv timeout in milliseconds for a lightweight catalog RPC which "
+ "shouldn't take long in catalogd, e.g. fetching db/table list. The default is 30 "
+ "minutes which is long enough to tolerate TCP timeout due to retransmission.");
DEFINE_int32(catalog_client_rpc_retry_interval_ms, 3000, "(Advanced) The time to wait "
"before retrying when the catalog RPC client fails to connect to catalogd or when "
"RPCs to the catalogd fail.");
@@ -225,6 +230,10 @@ ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int webserver_port,
catalogd_client_cache_(new CatalogServiceClientCache(1, 0,
FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "",
!FLAGS_ssl_client_ca_certificate.empty())),
+ catalogd_lightweight_req_client_cache_(new CatalogServiceClientCache(1, 0,
+ FLAGS_catalog_lightweight_rpc_timeout_ms,
+ FLAGS_catalog_lightweight_rpc_timeout_ms, "",
+ !FLAGS_ssl_client_ca_certificate.empty())),
htable_factory_(new HBaseTableFactory()),
disk_io_mgr_(new io::DiskIoMgr()),
webserver_(new Webserver(FLAGS_webserver_interface, webserver_port, metrics_.get())),
@@ -429,6 +438,8 @@ Status ExecEnv::Init() {
RETURN_IF_ERROR(metrics_webserver_->Start());
}
catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
+ catalogd_lightweight_req_client_cache_->InitMetrics(
+ metrics_.get(), "catalog.server", "for-lightweight-rpc");
RETURN_IF_ERROR(RegisterMemoryMetrics(
metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
// Initialize impalad metrics
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 53f8f68bc..3a87df656 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -127,6 +127,9 @@ class ExecEnv {
CatalogServiceClientCache* catalogd_client_cache() {
return catalogd_client_cache_.get();
}
+ CatalogServiceClientCache* catalogd_lightweight_req_client_cache() {
+ return catalogd_lightweight_req_client_cache_.get();
+ }
HBaseTableFactory* htable_factory() { return htable_factory_.get(); }
io::DiskIoMgr* disk_io_mgr() { return disk_io_mgr_.get(); }
Webserver* webserver() { return webserver_.get(); }
@@ -232,6 +235,7 @@ class ExecEnv {
boost::scoped_ptr<AdmissionController> admission_controller_;
boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;
boost::scoped_ptr<CatalogServiceClientCache> catalogd_client_cache_;
+ boost::scoped_ptr<CatalogServiceClientCache> catalogd_lightweight_req_client_cache_;
boost::scoped_ptr<HBaseTableFactory> htable_factory_;
boost::scoped_ptr<io::DiskIoMgr> disk_io_mgr_;
boost::scoped_ptr<Webserver> webserver_;
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 7cfc5d19b..5cbd4786a 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -521,6 +521,26 @@
"kind": "GAUGE",
"key": "catalog.server.client-cache.total-clients"
},
+ {
+ "description": "The number of clients currently in use by the Catalog
Server client cache for lightweight RPC.",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Catalog Server Client Cache Clients In Use for lightweight RPC",
+ "units": "NONE",
+ "kind": "GAUGE",
+ "key": "catalog.server.client-cache.clients-in-use-for-lightweight-rpc"
+ },
+ {
+ "description": "The total number of clients in the Catalog Server client
cache for lightweight RPC.",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Catalog Server Client Cache Total Clients for lightweight RPC",
+ "units": "NONE",
+ "kind": "GAUGE",
+ "key": "catalog.server.client-cache.total-clients-for-lightweight-rpc"
+ },
{
"description": "The full version string of the Catalog Server.",
"contexts": [
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index d958cd765..93a587a9d 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -560,6 +560,20 @@ class TestLocalCatalogObservability(CustomClusterTestSuite):
finally:
client.close()
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--use_local_catalog=true",
+ catalogd_args="--catalog_topic_mode=minimal")
+ def test_lightweight_rpc_metrics(self):
+ """Verify catalogd client cache for lightweight RPCs is used correctly"""
+ # Fetching the db and table list should be lightweight requests
+ self.execute_query("describe database functional")
+ self.execute_query("show tables in functional")
+ impalad = self.cluster.impalads[0].service
+ assert 0 == impalad.get_metric_value("catalog.server.client-cache.total-clients")
+ assert 1 == impalad.get_metric_value(
+ "catalog.server.client-cache.total-clients-for-lightweight-rpc")
+
class TestFullAcid(CustomClusterTestSuite):
@classmethod
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index cd0f76522..bc9e803d0 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -118,6 +118,9 @@ class TestRestart(CustomClusterTestSuite):
self.wait_for_state(handle, QueryState.EXCEPTION, 20, client=client)
@pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ catalogd_args="--catalog_topic_mode=minimal",
+ impalad_args="--use_local_catalog=true")
def test_catalog_connection_retries(self):
"""Test that connections to the catalogd are retried, both new connections
and cached
connections."""