This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 170baba9e4f branch-4.0: [enhancement](cloud) improve FE RPC retry and
MetaService connection handling #59698 (#59849)
170baba9e4f is described below
commit 170baba9e4f2985ee45a6159d3337e402dcfa21b
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jan 14 11:40:32 2026 +0800
branch-4.0: [enhancement](cloud) improve FE RPC retry and MetaService
connection handling #59698 (#59849)
Cherry-picked from #59698
Co-authored-by: Luwei <[email protected]>
---
be/src/cloud/config.cpp | 2 +-
.../runtime/stream_load/stream_load_executor.cpp | 23 ++--
be/src/util/thrift_rpc_helper.cpp | 52 +++++++--
be/src/util/thrift_rpc_helper.h | 11 ++
.../main/java/org/apache/doris/common/Config.java | 2 +-
.../apache/doris/cloud/rpc/MetaServiceProxy.java | 35 +++++-
.../doris/cloud/rpc/MetaServiceProxyTest.java | 127 +++++++++++++++++++++
7 files changed, 227 insertions(+), 25 deletions(-)
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index f80ef83d8a0..db6de7b718e 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -109,7 +109,7 @@ DEFINE_mBool(enable_cloud_tablet_report, "true");
DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25");
-DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "5000");
+DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "100");
DEFINE_mInt32(meta_service_conflict_error_retry_times, "10");
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 5ac18954e1d..907fbdf7410 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -176,15 +176,15 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext*
ctx) {
TLoadTxnBeginResult result;
Status status;
int64_t duration_ns = 0;
- TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
+ auto master_addr_provider = [this]() { return
_exec_env->cluster_info()->master_fe_addr; };
+ TNetworkAddress master_addr = master_addr_provider();
if (master_addr.hostname.empty() || master_addr.port == 0) {
status = Status::Error<SERVICE_UNAVAILABLE>("Have not get FE Master
heartbeat yet");
} else {
SCOPED_RAW_TIMER(&duration_ns);
#ifndef BE_TEST
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
- [&request, &result](FrontendServiceConnection& client) {
+ master_addr_provider, [&request,
&result](FrontendServiceConnection& client) {
client->loadTxnBegin(result, request);
}));
#else
@@ -213,14 +213,14 @@ Status
StreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
TLoadTxnCommitRequest request;
get_commit_request(ctx, request);
- TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxnCommitResult result;
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
#ifndef BE_TEST
+ auto master_addr_provider = [this]() { return
_exec_env->cluster_info()->master_fe_addr; };
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
+ master_addr_provider,
[&request, &result](FrontendServiceConnection& client) {
client->loadTxnPreCommit(result, request);
},
@@ -258,13 +258,13 @@ Status
StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
request.__set_txnId(ctx->txn_id);
}
- TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxn2PCResult result;
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
+ auto master_addr_provider = [this]() { return
_exec_env->cluster_info()->master_fe_addr; };
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
+ master_addr_provider,
[&request, &result](FrontendServiceConnection& client) {
client->loadTxn2PC(result, request);
},
@@ -310,11 +310,11 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext*
ctx) {
TLoadTxnCommitRequest request;
get_commit_request(ctx, request);
- TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxnCommitResult result;
#ifndef BE_TEST
+ auto master_addr_provider = [this]() { return
_exec_env->cluster_info()->master_fe_addr; };
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
+ master_addr_provider,
[&request, &result](FrontendServiceConnection& client) {
client->loadTxnCommit(result, request);
},
@@ -342,7 +342,6 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext*
ctx) {
void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
DorisMetrics::instance()->stream_load_txn_rollback_request_total->increment(1);
- TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxnRollbackRequest request;
set_request_auth(&request, ctx->auth);
request.__set_db(ctx->db);
@@ -363,9 +362,9 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext*
ctx) {
TLoadTxnRollbackResult result;
#ifndef BE_TEST
+ auto master_addr_provider = [this]() { return
_exec_env->cluster_info()->master_fe_addr; };
auto rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
- master_addr.hostname, master_addr.port,
- [&request, &result](FrontendServiceConnection& client) {
+ master_addr_provider, [&request,
&result](FrontendServiceConnection& client) {
client->loadTxnRollback(result, request);
});
if (!rpc_st.ok()) {
diff --git a/be/src/util/thrift_rpc_helper.cpp
b/be/src/util/thrift_rpc_helper.cpp
index de5091cf7f1..bf26230c909 100644
--- a/be/src/util/thrift_rpc_helper.cpp
+++ b/be/src/util/thrift_rpc_helper.cpp
@@ -63,7 +63,16 @@ void ThriftRpcHelper::setup(ExecEnv* exec_env) {
template <typename T>
Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port,
std::function<void(ClientConnection<T>&)>
callback, int timeout_ms) {
- TNetworkAddress address = make_network_address(ip, port);
+ return rpc<T>([ip, port]() { return make_network_address(ip, port); },
callback, timeout_ms);
+}
+
+template <typename T>
+Status ThriftRpcHelper::rpc(std::function<TNetworkAddress()> address_provider,
+ std::function<void(ClientConnection<T>&)>
callback, int timeout_ms) {
+ TNetworkAddress address = address_provider();
+ if (address.hostname.empty() || address.port == 0) {
+ return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE address is
not available");
+ }
Status status;
DBUG_EXECUTE_IF("thriftRpcHelper.rpc.error", { timeout_ms = 30000; });
ClientConnection<T> client(_s_exec_env->get_client_cache<T>(), address,
timeout_ms, &status);
@@ -85,15 +94,36 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const
int32_t port,
#endif
std::this_thread::sleep_for(
std::chrono::milliseconds(config::thrift_client_retry_interval_ms));
- status = client.reopen(timeout_ms);
- if (!status.ok()) {
+ TNetworkAddress retry_address = address_provider();
+ if (retry_address.hostname.empty() || retry_address.port == 0) {
+ return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE
address is not available");
+ }
+ if (retry_address.hostname != address.hostname ||
retry_address.port != address.port) {
+#ifndef ADDRESS_SANITIZER
+ LOG(INFO) << "retrying call frontend service with new
address=" << retry_address;
+#endif
+ Status retry_status;
+ ClientConnection<T>
retry_client(_s_exec_env->get_client_cache<T>(), retry_address,
+ timeout_ms, &retry_status);
+ if (!retry_status.ok()) {
+#ifndef ADDRESS_SANITIZER
+ LOG(WARNING) << "Connect frontend failed, address=" <<
retry_address
+ << ", status=" << retry_status;
+#endif
+ return retry_status;
+ }
+ callback(retry_client);
+ } else {
+ status = client.reopen(timeout_ms);
+ if (!status.ok()) {
#ifndef ADDRESS_SANITIZER
- LOG(WARNING) << "client reopen failed. address=" << address
- << ", status=" << status;
+ LOG(WARNING) << "client reopen failed. address=" << address
+ << ", status=" << status;
#endif
- return status;
+ return status;
+ }
+ callback(client);
}
- callback(client);
}
} catch (apache::thrift::TException& e) {
#ifndef ADDRESS_SANITIZER
@@ -104,8 +134,8 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const
int32_t port,
std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2));
// just reopen to disable this connection
static_cast<void>(client.reopen(timeout_ms));
- return Status::RpcError("failed to call frontend service, FE
address={}:{}, reason: {}", ip,
- port, e.what());
+ return Status::RpcError("failed to call frontend service, FE
address={}:{}, reason: {}",
+ address.hostname, address.port, e.what());
}
return Status::OK();
}
@@ -114,6 +144,10 @@ template Status
ThriftRpcHelper::rpc<FrontendServiceClient>(
const std::string& ip, const int32_t port,
std::function<void(ClientConnection<FrontendServiceClient>&)>
callback, int timeout_ms);
+template Status ThriftRpcHelper::rpc<FrontendServiceClient>(
+ std::function<TNetworkAddress()> address_provider,
+ std::function<void(ClientConnection<FrontendServiceClient>&)>
callback, int timeout_ms);
+
template Status ThriftRpcHelper::rpc<BackendServiceClient>(
const std::string& ip, const int32_t port,
std::function<void(ClientConnection<BackendServiceClient>&)> callback,
int timeout_ms);
diff --git a/be/src/util/thrift_rpc_helper.h b/be/src/util/thrift_rpc_helper.h
index cf876990e6e..3617967b815 100644
--- a/be/src/util/thrift_rpc_helper.h
+++ b/be/src/util/thrift_rpc_helper.h
@@ -17,6 +17,7 @@
#pragma once
+#include <gen_cpp/Types_types.h>
#include <stdint.h>
#include <functional>
@@ -43,10 +44,20 @@ public:
return rpc(ip, port, callback, config::thrift_rpc_timeout_ms);
}
+ template <typename T>
+ static Status rpc(std::function<TNetworkAddress()> address_provider,
+ std::function<void(ClientConnection<T>&)> callback) {
+ return rpc(address_provider, callback, config::thrift_rpc_timeout_ms);
+ }
+
template <typename T>
static Status rpc(const std::string& ip, const int32_t port,
std::function<void(ClientConnection<T>&)> callback, int
timeout_ms);
+ template <typename T>
+ static Status rpc(std::function<TNetworkAddress()> address_provider,
+ std::function<void(ClientConnection<T>&)> callback, int
timeout_ms);
+
static ExecEnv* get_exec_env() { return _s_exec_env; }
private:
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 938e7cbe49e..9f8b9da0d3b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3649,7 +3649,7 @@ public class Config extends ConfigBase {
+ "(for example CreateRepositoryStmt,
CreatePolicyCommand), separated by commas."})
public static String block_sql_ast_names = "";
- public static long meta_service_rpc_reconnect_interval_ms = 5000;
+ public static long meta_service_rpc_reconnect_interval_ms = 100;
public static long meta_service_rpc_retry_cnt = 10;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 049344bafe6..567d99c1939 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -178,10 +178,12 @@ public class MetaServiceProxy {
long maxRetries = Config.meta_service_rpc_retry_cnt;
for (long tried = 1; tried <= maxRetries; tried++) {
MetaServiceClient client = null;
+ boolean requestFailed = false;
try {
client = proxy.getProxy();
return function.apply(client);
} catch (StatusRuntimeException sre) {
+ requestFailed = true;
LOG.warn("failed to request meta service code {}, msg {},
trycnt {}", sre.getStatus().getCode(),
sre.getMessage(), tried);
boolean shouldRetry = false;
@@ -200,12 +202,13 @@ public class MetaServiceProxy {
throw new RpcException("", sre.getMessage(), sre);
}
} catch (Exception e) {
+ requestFailed = true;
LOG.warn("failed to request meta servive trycnt {}",
tried, e);
if (tried >= maxRetries) {
throw new RpcException("", e.getMessage(), e);
}
} finally {
- if (proxy.needReconn() && client != null) {
+ if (requestFailed && proxy.needReconn() && client != null)
{
client.shutdown(true);
}
}
@@ -227,7 +230,35 @@ public class MetaServiceProxy {
public Future<Cloud.GetVersionResponse>
getVisibleVersionAsync(Cloud.GetVersionRequest request)
throws RpcException {
- return w.executeRequest((client) ->
client.getVisibleVersionAsync(request));
+ MetaServiceClient client = null;
+ try {
+ client = getProxy();
+ Future<Cloud.GetVersionResponse> future =
client.getVisibleVersionAsync(request);
+ if (future instanceof
com.google.common.util.concurrent.ListenableFuture) {
+
com.google.common.util.concurrent.ListenableFuture<Cloud.GetVersionResponse>
listenableFuture =
+
(com.google.common.util.concurrent.ListenableFuture<Cloud.GetVersionResponse>)
future;
+ MetaServiceClient finalClient = client;
+
com.google.common.util.concurrent.Futures.addCallback(listenableFuture,
+ new
com.google.common.util.concurrent.FutureCallback<Cloud.GetVersionResponse>() {
+ @Override
+ public void onSuccess(Cloud.GetVersionResponse
result) {
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (finalClient != null) {
+ finalClient.shutdown(true);
+ }
+ }
+ },
com.google.common.util.concurrent.MoreExecutors.directExecutor());
+ }
+ return future;
+ } catch (Exception e) {
+ if (client != null) {
+ client.shutdown(true);
+ }
+ throw new RpcException("", e.getMessage(), e);
+ }
}
public Cloud.GetVersionResponse getVersion(Cloud.GetVersionRequest
request) throws RpcException {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java
new file mode 100644
index 00000000000..2d5a4353265
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.rpc;
+
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.rpc.RpcException;
+
+import com.google.common.util.concurrent.SettableFuture;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Map;
+import java.util.Queue;
+
+public class MetaServiceProxyTest {
+ private String originEndpoint;
+ private long originReconnectIntervalMs;
+ private long originRetryCnt;
+
+ @Before
+ public void setUp() {
+ originEndpoint = Config.meta_service_endpoint;
+ originReconnectIntervalMs =
Config.meta_service_rpc_reconnect_interval_ms;
+ originRetryCnt = Config.meta_service_rpc_retry_cnt;
+
+ Config.meta_service_endpoint = "127.0.0.1:12345";
+ Config.meta_service_rpc_reconnect_interval_ms = 0;
+ Config.meta_service_rpc_retry_cnt = 1;
+ }
+
+ @After
+ public void tearDown() {
+ Config.meta_service_endpoint = originEndpoint;
+ Config.meta_service_rpc_reconnect_interval_ms =
originReconnectIntervalMs;
+ Config.meta_service_rpc_retry_cnt = originRetryCnt;
+ }
+
+ @Test
+ public void testExecuteRequestNoShutdownOnSuccess() throws RpcException {
+ MetaServiceProxy proxy = new MetaServiceProxy();
+ MetaServiceClient client = Mockito.mock(MetaServiceClient.class);
+ Mockito.when(client.isNormalState()).thenReturn(true);
+ Mockito.when(client.isConnectionAgeExpired()).thenReturn(false);
+
+ Map<String, MetaServiceClient> serviceMap =
Deencapsulation.getField(proxy, "serviceMap");
+ serviceMap.put(Config.meta_service_endpoint, client);
+ Queue<Long> lastConnTimeMs = Deencapsulation.getField(proxy,
"lastConnTimeMs");
+ lastConnTimeMs.clear();
+ lastConnTimeMs.add(0L);
+ lastConnTimeMs.add(0L);
+ lastConnTimeMs.add(0L);
+
+ MetaServiceProxy.MetaServiceClientWrapper wrapper =
Deencapsulation.getField(proxy, "w");
+ String response = wrapper.executeRequest((ignored) -> "ok");
+ Assert.assertEquals("ok", response);
+ Mockito.verify(client, Mockito.never()).shutdown(Mockito.anyBoolean());
+ }
+
+ @Test
+ public void testExecuteRequestShutdownOnFailure() {
+ MetaServiceProxy proxy = new MetaServiceProxy();
+ MetaServiceClient client = Mockito.mock(MetaServiceClient.class);
+ Mockito.when(client.isNormalState()).thenReturn(true);
+ Mockito.when(client.isConnectionAgeExpired()).thenReturn(false);
+
+ Map<String, MetaServiceClient> serviceMap =
Deencapsulation.getField(proxy, "serviceMap");
+ serviceMap.put(Config.meta_service_endpoint, client);
+ Queue<Long> lastConnTimeMs = Deencapsulation.getField(proxy,
"lastConnTimeMs");
+ lastConnTimeMs.clear();
+ lastConnTimeMs.add(0L);
+ lastConnTimeMs.add(0L);
+ lastConnTimeMs.add(0L);
+
+ MetaServiceProxy.MetaServiceClientWrapper wrapper =
Deencapsulation.getField(proxy, "w");
+ try {
+ wrapper.executeRequest((ignored) -> {
+ throw new RuntimeException("rpc failed");
+ });
+ Assert.fail("should throw RpcException");
+ } catch (RpcException ignored) {
+ // expected
+ }
+
+ Mockito.verify(client).shutdown(true);
+ }
+
+ @Test
+ public void testGetVisibleVersionAsyncShutdownOnFailure() throws
RpcException {
+ MetaServiceProxy proxy = new MetaServiceProxy();
+ MetaServiceClient client = Mockito.mock(MetaServiceClient.class);
+ Mockito.when(client.isNormalState()).thenReturn(true);
+ Mockito.when(client.isConnectionAgeExpired()).thenReturn(false);
+
+ SettableFuture<Cloud.GetVersionResponse> future =
SettableFuture.create();
+
Mockito.when(client.getVisibleVersionAsync(Mockito.any())).thenReturn(future);
+
+ Map<String, MetaServiceClient> serviceMap =
Deencapsulation.getField(proxy, "serviceMap");
+ serviceMap.put(Config.meta_service_endpoint, client);
+
+ Cloud.GetVersionRequest request =
Cloud.GetVersionRequest.newBuilder().build();
+ proxy.getVisibleVersionAsync(request);
+
+ future.setException(new RuntimeException("async failed"));
+
+ Mockito.verify(client).shutdown(true);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]