This is an automated email from the ASF dual-hosted git repository. wzhou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 4c21084e20b0a10ba5c552f22a1a34c74e20ede2 Author: wzhou-code <[email protected]> AuthorDate: Fri Oct 4 15:30:57 2024 -0700 IMPALA-13427: Make connect timeout of statestore HA RPC tunable In some deployment environments, unsuccessful socket connection attempts take a long time to return an error if the connect timeout of the socket is set to 0. This causes statestored to be tied up in unsuccessful connection attempts during initialization if its peer is not ready when statestore HA is enabled. Currently the socket connect timeout of Thrift RPC is always set to 0. This patch makes the socket connect timeout of Thrift RPC tunable, and adds a flag variable statestore_ha_client_rpc_conn_timeout_ms for the underlying socket connect timeout for Statestore HA RPC. Its default value is 0, which equals the default value of TSocket.connTimeout_. Testing: - Added new test cases for statestore HA with a non-zero value for statestore_ha_client_rpc_conn_timeout_ms. - Passed core tests. Change-Id: Ie5840a76b4a34b4714c47b86f6366328b5ceed3a Reviewed-on: http://gerrit.cloudera.org:8080/21900 Reviewed-by: Norbert Luksa <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/rpc/thrift-client.h | 3 ++ be/src/runtime/client-cache.cc | 3 +- be/src/runtime/client-cache.h | 17 +++++--- be/src/statestore/statestore.cc | 4 +- tests/custom_cluster/test_statestored_ha.py | 61 +++++++++++++++++++++++------ 5 files changed, 68 insertions(+), 20 deletions(-) diff --git a/be/src/rpc/thrift-client.h b/be/src/rpc/thrift-client.h index 9e3c8eac3..a88fe300a 100644 --- a/be/src/rpc/thrift-client.h +++ b/be/src/rpc/thrift-client.h @@ -62,6 +62,9 @@ class ThriftClientImpl { /// Set send timeout on the underlying TSocket. void setSendTimeout(int32_t ms) { socket_->setSendTimeout(ms); } + /// Set connect timeout on the underlying TSocket. 
+ void setConnTimeout(int32_t ms) { socket_->setConnTimeout(ms); } + Status init_status() { return init_status_; } protected: diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc index c1936a27d..af902da56 100644 --- a/be/src/runtime/client-cache.cc +++ b/be/src/runtime/client-cache.cc @@ -119,9 +119,10 @@ Status ClientCacheHelper::CreateClient(const TNetworkAddress& address, return client_impl->init_status(); } - // Set the TSocket's send and receive timeouts. + // Set the TSocket's send, receive and connect timeouts. client_impl->setRecvTimeout(recv_timeout_ms_); client_impl->setSendTimeout(send_timeout_ms_); + client_impl->setConnTimeout(conn_timeout_ms_); Status status = client_impl->OpenWithRetry(num_tries_, wait_ms_); if (!status.ok()) { diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h index cb80e0b2a..78ada4cfa 100644 --- a/be/src/runtime/client-cache.h +++ b/be/src/runtime/client-cache.h @@ -130,11 +130,12 @@ class ClientCacheHelper { template <class T> friend class ClientCache; /// Private constructor so that only ClientCache can instantiate this class. ClientCacheHelper(uint32_t num_tries, uint64_t wait_ms, int32_t send_timeout_ms, - int32_t recv_timeout_ms) + int32_t recv_timeout_ms, int32_t conn_timeout_ms) : num_tries_(num_tries), wait_ms_(wait_ms), send_timeout_ms_(send_timeout_ms), recv_timeout_ms_(recv_timeout_ms), + conn_timeout_ms_(conn_timeout_ms), metrics_enabled_(false) { } /// There are three lock categories - the cache-wide lock (cache_lock_), the locks for a @@ -193,6 +194,10 @@ class ClientCacheHelper { /// Time to wait for the underlying socket to receive data, e.g., for an RPC response. const int32_t recv_timeout_ms_; + /// Time to wait for setting up underlying TSocket connection. The default value + /// equals 0, which is same as the default value of TSocket.connTimeout_. + const int32_t conn_timeout_ms_; + /// True if metrics have been registered (i.e. 
InitMetrics() was called)), and *_metric_ /// are valid pointers. bool metrics_enabled_; @@ -410,7 +415,7 @@ class ClientCache { typedef ThriftClient<T> Client; ClientCache(const std::string& service_name = "", bool enable_ssl = false) - : client_cache_helper_(1, 0, 0, 0) { + : client_cache_helper_(1, 0, 0, 0, 0) { client_factory_ = boost::bind<ThriftClientImpl*>( boost::mem_fn(&ClientCache::MakeClient), this, _1, _2, service_name, enable_ssl); } @@ -418,11 +423,13 @@ class ClientCache { /// Create a ClientCache where connections are tried num_tries times, with a pause of /// wait_ms between attempts. The underlying TSocket's send and receive timeouts of /// each connection can also be set. If num_tries == 0, retry connections indefinitely. - /// A send/receive timeout of 0 means there is no timeout. + /// A send/receive/connect timeout of 0 means there is no timeout. ClientCache(uint32_t num_tries, uint64_t wait_ms, int32_t send_timeout_ms = 0, int32_t recv_timeout_ms = 0, const std::string& service_name = "", - bool enable_ssl = false) - : client_cache_helper_(num_tries, wait_ms, send_timeout_ms, recv_timeout_ms) { + bool enable_ssl = false, int32_t conn_timeout_ms = 0) + : client_cache_helper_( + num_tries, wait_ms, send_timeout_ms, recv_timeout_ms, conn_timeout_ms) { + DCHECK_GE(conn_timeout_ms, 0); client_factory_ = boost::bind<ThriftClientImpl*>( boost::mem_fn(&ClientCache::MakeClient), this, _1, _2, service_name, enable_ssl); } diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc index 5b5ec184f..98ab753cc 100644 --- a/be/src/statestore/statestore.cc +++ b/be/src/statestore/statestore.cc @@ -155,6 +155,8 @@ DEFINE_int32(statestore_peer_cnxn_retry_interval_ms, 1000, "The interval, in ms, DEFINE_int32(statestore_ha_client_rpc_timeout_ms, 300000, "(Advanced) The underlying " "TSocket send/recv timeout in milliseconds for a client RPC of Statestore HA " "service."); +DEFINE_int32(statestore_ha_client_rpc_conn_timeout_ms, 0, "(Advanced) 
The underlying " + "TSocket conn timeout in milliseconds for a client RPC of Statestore HA service."); DEFINE_int64(update_statestore_rpc_resend_interval_ms, 100, "(Advanced) Interval " "(in ms) with which the statestore resends the RPCs of updating statestored's role " "to subscribers if the statestore has failed to send the RPCs to the subscribers."); @@ -753,7 +755,7 @@ Statestore::Statestore(MetricGroup* metrics) ha_client_cache_.reset(new StatestoreHaClientCache(1, 0, FLAGS_statestore_ha_client_rpc_timeout_ms, FLAGS_statestore_ha_client_rpc_timeout_ms, "", - IsInternalTlsConfigured())); + IsInternalTlsConfigured(), FLAGS_statestore_ha_client_rpc_conn_timeout_ms)); ha_client_cache_->InitMetrics(metrics, "statestored-ha"); ha_standby_ss_failure_detector_.reset(new MissedHeartbeatFailureDetector( FLAGS_statestore_max_missed_heartbeats, diff --git a/tests/custom_cluster/test_statestored_ha.py b/tests/custom_cluster/test_statestored_ha.py index 35dcca7f7..6cea78644 100644 --- a/tests/custom_cluster/test_statestored_ha.py +++ b/tests/custom_cluster/test_statestored_ha.py @@ -171,12 +171,9 @@ class TestStatestoredHA(CustomClusterTestSuite): pytest.skip("Skip this tests for UBSAN builds since " + assert_string) assert False, assert_string - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - statestored_args="--use_network_address_as_statestore_priority=true", - start_args="--enable_statestored_ha") - def test_statestored_ha_with_two_statestored(self): - """The test case for cluster started with statestored HA enabled.""" + def __test_statestored_ha_with_two_statestored(self): + """Basic test case for cluster started with two statestored when statestored HA is + enabled.""" # Verify two statestored instances are created with one as active. 
statestoreds = self.cluster.statestoreds() assert(len(statestoreds) == 2) @@ -210,13 +207,26 @@ class TestStatestoredHA(CustomClusterTestSuite): @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( - statestored_args="--enable_statestored_ha=true " - "--use_network_address_as_statestore_priority=true " - "--statestore_ha_preemption_wait_period_ms=200", - catalogd_args="--enable_statestored_ha=true", - impalad_args="--enable_statestored_ha=true") - def test_statestored_ha_with_one_statestored(self): - """The test case for cluster with only one statestored when statestored HA is + statestored_args="--use_network_address_as_statestore_priority=true", + start_args="--enable_statestored_ha") + def test_statestored_ha_with_two_statestored(self): + """The test case for cluster with two statestored when statestored HA is enabled. + """ + self.__test_statestored_ha_with_two_statestored() + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + statestored_args="--use_network_address_as_statestore_priority=true " + "--statestore_ha_preemption_wait_period_ms=200 " + "--statestore_ha_client_rpc_conn_timeout_ms=100", + start_args="--enable_statestored_ha") + def test_statestored_ha_with_two_statestored_and_conn_timeout(self): + """The test case for cluster with two statestored when statestored HA is enabled. + statestore_ha_client_rpc_conn_timeout_ms is set as 100 ms.""" + self.__test_statestored_ha_with_two_statestored() + + def __test_statestored_ha_with_one_statestored(self): + """Basic test case for cluster with only one statestored when statestored HA is enabled.""" # Verify the statestored instances is created as active. statestoreds = self.cluster.statestoreds() @@ -233,6 +243,31 @@ class TestStatestoredHA(CustomClusterTestSuite): # Verify simple queries are ran successfully. 
self.__run_simple_queries() + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + statestored_args="--enable_statestored_ha=true " + "--use_network_address_as_statestore_priority=true " + "--statestore_ha_preemption_wait_period_ms=200", + catalogd_args="--enable_statestored_ha=true", + impalad_args="--enable_statestored_ha=true") + def test_statestored_ha_with_one_statestored(self): + """The test case for cluster with only one statestored when statestored HA is + enabled.""" + self.__test_statestored_ha_with_one_statestored() + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + statestored_args="--enable_statestored_ha=true " + "--use_network_address_as_statestore_priority=true " + "--statestore_ha_preemption_wait_period_ms=200 " + "--statestore_ha_client_rpc_conn_timeout_ms=100", + catalogd_args="--enable_statestored_ha=true", + impalad_args="--enable_statestored_ha=true") + def test_statestored_ha_with_one_statestored_and_conn_timeout(self): + """The test case for cluster with only one statestored when statestored HA is + enabled. statestore_ha_client_rpc_conn_timeout_ms is set as 100 ms.""" + self.__test_statestored_ha_with_one_statestored() + @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( statestored_args="--use_network_address_as_statestore_priority=true",
