This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 170baba9e4f branch-4.0: [enhancement](cloud) improve FE RPC retry and MetaService connection handling #59698 (#59849)
170baba9e4f is described below

commit 170baba9e4f2985ee45a6159d3337e402dcfa21b
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jan 14 11:40:32 2026 +0800

    branch-4.0: [enhancement](cloud) improve FE RPC retry and MetaService connection handling #59698 (#59849)
    
    Cherry-picked from #59698
    
    Co-authored-by: Luwei <[email protected]>
---
 be/src/cloud/config.cpp                            |   2 +-
 .../runtime/stream_load/stream_load_executor.cpp   |  23 ++--
 be/src/util/thrift_rpc_helper.cpp                  |  52 +++++++--
 be/src/util/thrift_rpc_helper.h                    |  11 ++
 .../main/java/org/apache/doris/common/Config.java  |   2 +-
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |  35 +++++-
 .../doris/cloud/rpc/MetaServiceProxyTest.java      | 127 +++++++++++++++++++++
 7 files changed, 227 insertions(+), 25 deletions(-)

diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index f80ef83d8a0..db6de7b718e 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -109,7 +109,7 @@ DEFINE_mBool(enable_cloud_tablet_report, "true");
 
 DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25");
 
-DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "5000");
+DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "100");
 
 DEFINE_mInt32(meta_service_conflict_error_retry_times, "10");
 
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 5ac18954e1d..907fbdf7410 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -176,15 +176,15 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* 
ctx) {
     TLoadTxnBeginResult result;
     Status status;
     int64_t duration_ns = 0;
-    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
+    auto master_addr_provider = [this]() { return 
_exec_env->cluster_info()->master_fe_addr; };
+    TNetworkAddress master_addr = master_addr_provider();
     if (master_addr.hostname.empty() || master_addr.port == 0) {
         status = Status::Error<SERVICE_UNAVAILABLE>("Have not get FE Master 
heartbeat yet");
     } else {
         SCOPED_RAW_TIMER(&duration_ns);
 #ifndef BE_TEST
         RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
-                master_addr.hostname, master_addr.port,
-                [&request, &result](FrontendServiceConnection& client) {
+                master_addr_provider, [&request, 
&result](FrontendServiceConnection& client) {
                     client->loadTxnBegin(result, request);
                 }));
 #else
@@ -213,14 +213,14 @@ Status 
StreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
     TLoadTxnCommitRequest request;
     get_commit_request(ctx, request);
 
-    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     TLoadTxnCommitResult result;
     int64_t duration_ns = 0;
     {
         SCOPED_RAW_TIMER(&duration_ns);
 #ifndef BE_TEST
+        auto master_addr_provider = [this]() { return 
_exec_env->cluster_info()->master_fe_addr; };
         RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
-                master_addr.hostname, master_addr.port,
+                master_addr_provider,
                 [&request, &result](FrontendServiceConnection& client) {
                     client->loadTxnPreCommit(result, request);
                 },
@@ -258,13 +258,13 @@ Status 
StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
         request.__set_txnId(ctx->txn_id);
     }
 
-    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     TLoadTxn2PCResult result;
     int64_t duration_ns = 0;
     {
         SCOPED_RAW_TIMER(&duration_ns);
+        auto master_addr_provider = [this]() { return 
_exec_env->cluster_info()->master_fe_addr; };
         RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
-                master_addr.hostname, master_addr.port,
+                master_addr_provider,
                 [&request, &result](FrontendServiceConnection& client) {
                     client->loadTxn2PC(result, request);
                 },
@@ -310,11 +310,11 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* 
ctx) {
     TLoadTxnCommitRequest request;
     get_commit_request(ctx, request);
 
-    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     TLoadTxnCommitResult result;
 #ifndef BE_TEST
+    auto master_addr_provider = [this]() { return 
_exec_env->cluster_info()->master_fe_addr; };
     RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
-            master_addr.hostname, master_addr.port,
+            master_addr_provider,
             [&request, &result](FrontendServiceConnection& client) {
                 client->loadTxnCommit(result, request);
             },
@@ -342,7 +342,6 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext* 
ctx) {
 void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
     
DorisMetrics::instance()->stream_load_txn_rollback_request_total->increment(1);
 
-    TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
     TLoadTxnRollbackRequest request;
     set_request_auth(&request, ctx->auth);
     request.__set_db(ctx->db);
@@ -363,9 +362,9 @@ void StreamLoadExecutor::rollback_txn(StreamLoadContext* 
ctx) {
 
     TLoadTxnRollbackResult result;
 #ifndef BE_TEST
+    auto master_addr_provider = [this]() { return 
_exec_env->cluster_info()->master_fe_addr; };
     auto rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
-            master_addr.hostname, master_addr.port,
-            [&request, &result](FrontendServiceConnection& client) {
+            master_addr_provider, [&request, 
&result](FrontendServiceConnection& client) {
                 client->loadTxnRollback(result, request);
             });
     if (!rpc_st.ok()) {
diff --git a/be/src/util/thrift_rpc_helper.cpp 
b/be/src/util/thrift_rpc_helper.cpp
index de5091cf7f1..bf26230c909 100644
--- a/be/src/util/thrift_rpc_helper.cpp
+++ b/be/src/util/thrift_rpc_helper.cpp
@@ -63,7 +63,16 @@ void ThriftRpcHelper::setup(ExecEnv* exec_env) {
 template <typename T>
 Status ThriftRpcHelper::rpc(const std::string& ip, const int32_t port,
                             std::function<void(ClientConnection<T>&)> 
callback, int timeout_ms) {
-    TNetworkAddress address = make_network_address(ip, port);
+    return rpc<T>([ip, port]() { return make_network_address(ip, port); }, 
callback, timeout_ms);
+}
+
+template <typename T>
+Status ThriftRpcHelper::rpc(std::function<TNetworkAddress()> address_provider,
+                            std::function<void(ClientConnection<T>&)> 
callback, int timeout_ms) {
+    TNetworkAddress address = address_provider();
+    if (address.hostname.empty() || address.port == 0) {
+        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE address is 
not available");
+    }
     Status status;
     DBUG_EXECUTE_IF("thriftRpcHelper.rpc.error", { timeout_ms = 30000; });
     ClientConnection<T> client(_s_exec_env->get_client_cache<T>(), address, 
timeout_ms, &status);
@@ -85,15 +94,36 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const 
int32_t port,
 #endif
             std::this_thread::sleep_for(
                     
std::chrono::milliseconds(config::thrift_client_retry_interval_ms));
-            status = client.reopen(timeout_ms);
-            if (!status.ok()) {
+            TNetworkAddress retry_address = address_provider();
+            if (retry_address.hostname.empty() || retry_address.port == 0) {
+                return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>("FE 
address is not available");
+            }
+            if (retry_address.hostname != address.hostname || 
retry_address.port != address.port) {
+#ifndef ADDRESS_SANITIZER
+                LOG(INFO) << "retrying call frontend service with new 
address=" << retry_address;
+#endif
+                Status retry_status;
+                ClientConnection<T> 
retry_client(_s_exec_env->get_client_cache<T>(), retry_address,
+                                                 timeout_ms, &retry_status);
+                if (!retry_status.ok()) {
+#ifndef ADDRESS_SANITIZER
+                    LOG(WARNING) << "Connect frontend failed, address=" << 
retry_address
+                                 << ", status=" << retry_status;
+#endif
+                    return retry_status;
+                }
+                callback(retry_client);
+            } else {
+                status = client.reopen(timeout_ms);
+                if (!status.ok()) {
 #ifndef ADDRESS_SANITIZER
-                LOG(WARNING) << "client reopen failed. address=" << address
-                             << ", status=" << status;
+                    LOG(WARNING) << "client reopen failed. address=" << address
+                                 << ", status=" << status;
 #endif
-                return status;
+                    return status;
+                }
+                callback(client);
             }
-            callback(client);
         }
     } catch (apache::thrift::TException& e) {
 #ifndef ADDRESS_SANITIZER
@@ -104,8 +134,8 @@ Status ThriftRpcHelper::rpc(const std::string& ip, const 
int32_t port,
                 
std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2));
         // just reopen to disable this connection
         static_cast<void>(client.reopen(timeout_ms));
-        return Status::RpcError("failed to call frontend service, FE 
address={}:{}, reason: {}", ip,
-                                port, e.what());
+        return Status::RpcError("failed to call frontend service, FE 
address={}:{}, reason: {}",
+                                address.hostname, address.port, e.what());
     }
     return Status::OK();
 }
@@ -114,6 +144,10 @@ template Status 
ThriftRpcHelper::rpc<FrontendServiceClient>(
         const std::string& ip, const int32_t port,
         std::function<void(ClientConnection<FrontendServiceClient>&)> 
callback, int timeout_ms);
 
+template Status ThriftRpcHelper::rpc<FrontendServiceClient>(
+        std::function<TNetworkAddress()> address_provider,
+        std::function<void(ClientConnection<FrontendServiceClient>&)> 
callback, int timeout_ms);
+
 template Status ThriftRpcHelper::rpc<BackendServiceClient>(
         const std::string& ip, const int32_t port,
         std::function<void(ClientConnection<BackendServiceClient>&)> callback, 
int timeout_ms);
diff --git a/be/src/util/thrift_rpc_helper.h b/be/src/util/thrift_rpc_helper.h
index cf876990e6e..3617967b815 100644
--- a/be/src/util/thrift_rpc_helper.h
+++ b/be/src/util/thrift_rpc_helper.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <gen_cpp/Types_types.h>
 #include <stdint.h>
 
 #include <functional>
@@ -43,10 +44,20 @@ public:
         return rpc(ip, port, callback, config::thrift_rpc_timeout_ms);
     }
 
+    template <typename T>
+    static Status rpc(std::function<TNetworkAddress()> address_provider,
+                      std::function<void(ClientConnection<T>&)> callback) {
+        return rpc(address_provider, callback, config::thrift_rpc_timeout_ms);
+    }
+
     template <typename T>
     static Status rpc(const std::string& ip, const int32_t port,
                       std::function<void(ClientConnection<T>&)> callback, int 
timeout_ms);
 
+    template <typename T>
+    static Status rpc(std::function<TNetworkAddress()> address_provider,
+                      std::function<void(ClientConnection<T>&)> callback, int 
timeout_ms);
+
     static ExecEnv* get_exec_env() { return _s_exec_env; }
 
 private:
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 938e7cbe49e..9f8b9da0d3b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3649,7 +3649,7 @@ public class Config extends ConfigBase {
                     + "(for example CreateRepositoryStmt, 
CreatePolicyCommand), separated by commas."})
     public static String block_sql_ast_names = "";
 
-    public static long meta_service_rpc_reconnect_interval_ms = 5000;
+    public static long meta_service_rpc_reconnect_interval_ms = 100;
 
     public static long meta_service_rpc_retry_cnt = 10;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 049344bafe6..567d99c1939 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -178,10 +178,12 @@ public class MetaServiceProxy {
             long maxRetries = Config.meta_service_rpc_retry_cnt;
             for (long tried = 1; tried <= maxRetries; tried++) {
                 MetaServiceClient client = null;
+                boolean requestFailed = false;
                 try {
                     client = proxy.getProxy();
                     return function.apply(client);
                 } catch (StatusRuntimeException sre) {
+                    requestFailed = true;
                     LOG.warn("failed to request meta service code {}, msg {}, 
trycnt {}", sre.getStatus().getCode(),
                             sre.getMessage(), tried);
                     boolean shouldRetry = false;
@@ -200,12 +202,13 @@ public class MetaServiceProxy {
                         throw new RpcException("", sre.getMessage(), sre);
                     }
                 } catch (Exception e) {
+                    requestFailed = true;
                     LOG.warn("failed to request meta servive trycnt {}", 
tried, e);
                     if (tried >= maxRetries) {
                         throw new RpcException("", e.getMessage(), e);
                     }
                 } finally {
-                    if (proxy.needReconn() && client != null) {
+                    if (requestFailed && proxy.needReconn() && client != null) 
{
                         client.shutdown(true);
                     }
                 }
@@ -227,7 +230,35 @@ public class MetaServiceProxy {
 
     public Future<Cloud.GetVersionResponse> 
getVisibleVersionAsync(Cloud.GetVersionRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> 
client.getVisibleVersionAsync(request));
+        MetaServiceClient client = null;
+        try {
+            client = getProxy();
+            Future<Cloud.GetVersionResponse> future = 
client.getVisibleVersionAsync(request);
+            if (future instanceof 
com.google.common.util.concurrent.ListenableFuture) {
+                
com.google.common.util.concurrent.ListenableFuture<Cloud.GetVersionResponse> 
listenableFuture =
+                        
(com.google.common.util.concurrent.ListenableFuture<Cloud.GetVersionResponse>) 
future;
+                MetaServiceClient finalClient = client;
+                
com.google.common.util.concurrent.Futures.addCallback(listenableFuture,
+                        new 
com.google.common.util.concurrent.FutureCallback<Cloud.GetVersionResponse>() {
+                            @Override
+                            public void onSuccess(Cloud.GetVersionResponse 
result) {
+                            }
+
+                            @Override
+                            public void onFailure(Throwable t) {
+                                if (finalClient != null) {
+                                    finalClient.shutdown(true);
+                                }
+                            }
+                        }, 
com.google.common.util.concurrent.MoreExecutors.directExecutor());
+            }
+            return future;
+        } catch (Exception e) {
+            if (client != null) {
+                client.shutdown(true);
+            }
+            throw new RpcException("", e.getMessage(), e);
+        }
     }
 
     public Cloud.GetVersionResponse getVersion(Cloud.GetVersionRequest 
request) throws RpcException {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java
new file mode 100644
index 00000000000..2d5a4353265
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.rpc;
+
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.rpc.RpcException;
+
+import com.google.common.util.concurrent.SettableFuture;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Map;
+import java.util.Queue;
+
+public class MetaServiceProxyTest {
+    private String originEndpoint;
+    private long originReconnectIntervalMs;
+    private long originRetryCnt;
+
+    @Before
+    public void setUp() {
+        originEndpoint = Config.meta_service_endpoint;
+        originReconnectIntervalMs = 
Config.meta_service_rpc_reconnect_interval_ms;
+        originRetryCnt = Config.meta_service_rpc_retry_cnt;
+
+        Config.meta_service_endpoint = "127.0.0.1:12345";
+        Config.meta_service_rpc_reconnect_interval_ms = 0;
+        Config.meta_service_rpc_retry_cnt = 1;
+    }
+
+    @After
+    public void tearDown() {
+        Config.meta_service_endpoint = originEndpoint;
+        Config.meta_service_rpc_reconnect_interval_ms = 
originReconnectIntervalMs;
+        Config.meta_service_rpc_retry_cnt = originRetryCnt;
+    }
+
+    @Test
+    public void testExecuteRequestNoShutdownOnSuccess() throws RpcException {
+        MetaServiceProxy proxy = new MetaServiceProxy();
+        MetaServiceClient client = Mockito.mock(MetaServiceClient.class);
+        Mockito.when(client.isNormalState()).thenReturn(true);
+        Mockito.when(client.isConnectionAgeExpired()).thenReturn(false);
+
+        Map<String, MetaServiceClient> serviceMap = 
Deencapsulation.getField(proxy, "serviceMap");
+        serviceMap.put(Config.meta_service_endpoint, client);
+        Queue<Long> lastConnTimeMs = Deencapsulation.getField(proxy, 
"lastConnTimeMs");
+        lastConnTimeMs.clear();
+        lastConnTimeMs.add(0L);
+        lastConnTimeMs.add(0L);
+        lastConnTimeMs.add(0L);
+
+        MetaServiceProxy.MetaServiceClientWrapper wrapper = 
Deencapsulation.getField(proxy, "w");
+        String response = wrapper.executeRequest((ignored) -> "ok");
+        Assert.assertEquals("ok", response);
+        Mockito.verify(client, Mockito.never()).shutdown(Mockito.anyBoolean());
+    }
+
+    @Test
+    public void testExecuteRequestShutdownOnFailure() {
+        MetaServiceProxy proxy = new MetaServiceProxy();
+        MetaServiceClient client = Mockito.mock(MetaServiceClient.class);
+        Mockito.when(client.isNormalState()).thenReturn(true);
+        Mockito.when(client.isConnectionAgeExpired()).thenReturn(false);
+
+        Map<String, MetaServiceClient> serviceMap = 
Deencapsulation.getField(proxy, "serviceMap");
+        serviceMap.put(Config.meta_service_endpoint, client);
+        Queue<Long> lastConnTimeMs = Deencapsulation.getField(proxy, 
"lastConnTimeMs");
+        lastConnTimeMs.clear();
+        lastConnTimeMs.add(0L);
+        lastConnTimeMs.add(0L);
+        lastConnTimeMs.add(0L);
+
+        MetaServiceProxy.MetaServiceClientWrapper wrapper = 
Deencapsulation.getField(proxy, "w");
+        try {
+            wrapper.executeRequest((ignored) -> {
+                throw new RuntimeException("rpc failed");
+            });
+            Assert.fail("should throw RpcException");
+        } catch (RpcException ignored) {
+            // expected
+        }
+
+        Mockito.verify(client).shutdown(true);
+    }
+
+    @Test
+    public void testGetVisibleVersionAsyncShutdownOnFailure() throws 
RpcException {
+        MetaServiceProxy proxy = new MetaServiceProxy();
+        MetaServiceClient client = Mockito.mock(MetaServiceClient.class);
+        Mockito.when(client.isNormalState()).thenReturn(true);
+        Mockito.when(client.isConnectionAgeExpired()).thenReturn(false);
+
+        SettableFuture<Cloud.GetVersionResponse> future = 
SettableFuture.create();
+        
Mockito.when(client.getVisibleVersionAsync(Mockito.any())).thenReturn(future);
+
+        Map<String, MetaServiceClient> serviceMap = 
Deencapsulation.getField(proxy, "serviceMap");
+        serviceMap.put(Config.meta_service_endpoint, client);
+
+        Cloud.GetVersionRequest request = 
Cloud.GetVersionRequest.newBuilder().build();
+        proxy.getVisibleVersionAsync(request);
+
+        future.setException(new RuntimeException("async failed"));
+
+        Mockito.verify(client).shutdown(true);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to