This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 920c89c3f4a [enhancement](cloud) reconnect after the RPC request to the meta service fails (#45668) (#46358)
920c89c3f4a is described below
commit 920c89c3f4a0f79b71647b691d8e95a69553b037
Author: Luwei <[email protected]>
AuthorDate: Fri Jan 3 21:11:18 2025 +0800
[enhancement](cloud) reconnect after the RPC request to the meta service fails (#45668) (#46358)
pick #45668
---------
Co-authored-by: Gavin Chou <[email protected]>
---
be/src/cloud/cloud_meta_mgr.cpp | 191 ++++++++++++++-------
be/src/cloud/config.cpp | 4 +
be/src/cloud/config.h | 4 +
.../main/java/org/apache/doris/common/Config.java | 4 +
.../apache/doris/cloud/rpc/MetaServiceProxy.java | 52 +++++-
5 files changed, 191 insertions(+), 64 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 55ffe46286c..747baea780c 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -139,7 +139,71 @@ class MetaServiceProxy {
public:
static Status get_client(std::shared_ptr<MetaService_Stub>* stub) {
TEST_SYNC_POINT_RETURN_WITH_VALUE("MetaServiceProxy::get_client",
Status::OK(), stub);
- return get_pooled_client(stub);
+ return get_pooled_client(stub, nullptr);
+ }
+
+ static Status get_proxy(MetaServiceProxy** proxy) {
+ // The 'stub' is a useless parameter, added only to reuse the
`get_pooled_client` function.
+ std::shared_ptr<MetaService_Stub> stub;
+ return get_pooled_client(&stub, proxy);
+ }
+
+ void set_unhealthy() {
+ std::unique_lock lock(_mutex);
+ maybe_unhealthy = true;
+ }
+
+ bool need_reconn(long now) {
+ return maybe_unhealthy && ((now - last_reconn_time_ms.front()) >
+
config::meta_service_rpc_reconnect_interval_ms);
+ }
+
+ Status get(std::shared_ptr<MetaService_Stub>* stub) {
+ using namespace std::chrono;
+
+ auto now =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+ {
+ std::shared_lock lock(_mutex);
+ if (_deadline_ms >= now && !is_idle_timeout(now) &&
!need_reconn(now)) {
+ _last_access_at_ms.store(now, std::memory_order_relaxed);
+ *stub = _stub;
+ return Status::OK();
+ }
+ }
+
+ auto channel = std::make_unique<brpc::Channel>();
+ Status s = init_channel(channel.get());
+ if (!s.ok()) [[unlikely]] {
+ return s;
+ }
+
+ *stub = std::make_shared<MetaService_Stub>(channel.release(),
+
google::protobuf::Service::STUB_OWNS_CHANNEL);
+
+ long deadline = now;
+ // connection age only works without list endpoint.
+ if (!is_meta_service_endpoint_list() &&
+ config::meta_service_connection_age_base_seconds > 0) {
+ std::default_random_engine rng(static_cast<uint32_t>(now));
+ std::uniform_int_distribution<> uni(
+ config::meta_service_connection_age_base_seconds,
+ config::meta_service_connection_age_base_seconds * 2);
+ deadline = now +
duration_cast<milliseconds>(seconds(uni(rng))).count();
+ } else {
+ deadline = LONG_MAX;
+ }
+
+ // Last one WIN
+ std::unique_lock lock(_mutex);
+ _last_access_at_ms.store(now, std::memory_order_relaxed);
+ _deadline_ms = deadline;
+ _stub = *stub;
+
+ last_reconn_time_ms.push(now);
+ last_reconn_time_ms.pop();
+ maybe_unhealthy = false;
+
+ return Status::OK();
}
private:
@@ -147,7 +211,17 @@ private:
return config::meta_service_endpoint.find(',') != std::string::npos;
}
- static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub) {
+ /**
+ * This function initializes a pool of `MetaServiceProxy` objects and
selects one using
+ * round-robin. It returns a client stub via the selected proxy.
+ *
+ * @param stub A pointer to a shared pointer of `MetaService_Stub` to be
retrieved.
+ * @param proxy (Optional) A pointer to store the selected
`MetaServiceProxy`.
+ *
+ * @return Status Returns `Status::OK()` on success or an error status on
failure.
+ */
+ static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub,
+ MetaServiceProxy** proxy) {
static std::once_flag proxies_flag;
static size_t num_proxies = 1;
static std::atomic<size_t> index(0);
@@ -164,10 +238,16 @@ private:
for (size_t i = 0; i + 1 < num_proxies; ++i) {
size_t next_index = index.fetch_add(1, std::memory_order_relaxed)
% num_proxies;
Status s = proxies[next_index].get(stub);
+ if (proxy != nullptr) {
+ *proxy = &(proxies[next_index]);
+ }
if (s.ok()) return Status::OK();
}
size_t next_index = index.fetch_add(1, std::memory_order_relaxed) %
num_proxies;
+ if (proxy != nullptr) {
+ *proxy = &(proxies[next_index]);
+ }
return proxies[next_index].get(stub);
}
@@ -220,53 +300,13 @@ private:
_last_access_at_ms.load(std::memory_order_relaxed) +
idle_timeout_ms < now;
}
- Status get(std::shared_ptr<MetaService_Stub>* stub) {
- using namespace std::chrono;
-
- auto now =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
- {
- std::shared_lock lock(_mutex);
- if (_deadline_ms >= now && !is_idle_timeout(now)) {
- _last_access_at_ms.store(now, std::memory_order_relaxed);
- *stub = _stub;
- return Status::OK();
- }
- }
-
- auto channel = std::make_unique<brpc::Channel>();
- Status s = init_channel(channel.get());
- if (!s.ok()) [[unlikely]] {
- return s;
- }
-
- *stub = std::make_shared<MetaService_Stub>(channel.release(),
-
google::protobuf::Service::STUB_OWNS_CHANNEL);
-
- long deadline = now;
- // connection age only works without list endpoint.
- if (!is_meta_service_endpoint_list() &&
- config::meta_service_connection_age_base_seconds > 0) {
- std::default_random_engine rng(static_cast<uint32_t>(now));
- std::uniform_int_distribution<> uni(
- config::meta_service_connection_age_base_seconds,
- config::meta_service_connection_age_base_seconds * 2);
- deadline = now +
duration_cast<milliseconds>(seconds(uni(rng))).count();
- } else {
- deadline = LONG_MAX;
- }
-
- // Last one WIN
- std::unique_lock lock(_mutex);
- _last_access_at_ms.store(now, std::memory_order_relaxed);
- _deadline_ms = deadline;
- _stub = *stub;
- return Status::OK();
- }
-
std::shared_mutex _mutex;
std::atomic<long> _last_access_at_ms {0};
long _deadline_ms {0};
std::shared_ptr<MetaService_Stub> _stub;
+
+ std::queue<long> last_reconn_time_ms {std::deque<long> {0, 0, 0}};
+ bool maybe_unhealthy = false;
};
template <typename T, typename... Ts>
@@ -323,9 +363,11 @@ Status retry_rpc(std::string_view op_name, const Request&
req, Response* res,
std::default_random_engine rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(20, 200);
std::uniform_int_distribution<uint32_t> u2(500, 1000);
- std::shared_ptr<MetaService_Stub> stub;
- RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
+ MetaServiceProxy* proxy;
+ RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
while (true) {
+ std::shared_ptr<MetaService_Stub> stub;
+ RETURN_IF_ERROR(proxy->get(&stub));
brpc::Controller cntl;
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
cntl.set_max_retry(kBrpcRetryTimes);
@@ -333,6 +375,7 @@ Status retry_rpc(std::string_view op_name, const Request&
req, Response* res,
(stub.get()->*method)(&cntl, &req, res, nullptr);
if (cntl.Failed()) [[unlikely]] {
error_msg = cntl.ErrorText();
+ proxy->set_unhealthy();
} else if (res->status().code() == MetaServiceCode::OK) {
return Status::OK();
} else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
@@ -388,11 +431,12 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet*
tablet, bool warmup_delta_
TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets",
Status::OK(), tablet);
- std::shared_ptr<MetaService_Stub> stub;
- RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
-
+ MetaServiceProxy* proxy;
+ RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
int tried = 0;
while (true) {
+ std::shared_ptr<MetaService_Stub> stub;
+ RETURN_IF_ERROR(proxy->get(&stub));
brpc::Controller cntl;
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
GetRowsetRequest req;
@@ -430,6 +474,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet*
tablet, bool warmup_delta_
_get_rowset_latency << latency;
int retry_times = config::meta_service_rpc_retry_times;
if (cntl.Failed()) {
+ proxy->set_unhealthy();
if (tried++ < retry_times) {
auto rng = make_random_engine();
std::uniform_int_distribution<uint32_t> u(20, 200);
@@ -633,15 +678,10 @@ Status
CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
*delete_bitmap = *new_delete_bitmap;
}
- std::shared_ptr<MetaService_Stub> stub;
- RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
-
int64_t new_max_version = std::max(old_max_version,
rs_metas.rbegin()->end_version());
- brpc::Controller cntl;
// When there are many delete bitmaps that need to be synchronized, it
// may take a longer time, especially when loading the tablet for the
// first time, so set a relatively long timeout time.
- cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
GetDeleteBitmapRequest req;
GetDeleteBitmapResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
@@ -669,10 +709,43 @@ Status
CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
}
VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();
- stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
- if (cntl.Failed()) {
- return Status::RpcError("failed to get delete bitmap: {}",
cntl.ErrorText());
+
+ int retry_times = 0;
+ MetaServiceProxy* proxy;
+ RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
+ auto start = std::chrono::high_resolution_clock::now();
+ while (true) {
+ std::shared_ptr<MetaService_Stub> stub;
+ RETURN_IF_ERROR(proxy->get(&stub));
+ // When there are many delete bitmaps that need to be synchronized, it
+ // may take a longer time, especially when loading the tablet for the
+ // first time, so set a relatively long timeout time.
+ brpc::Controller cntl;
+ cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
+ cntl.set_max_retry(kBrpcRetryTimes);
+ res.Clear();
+ stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
+ if (cntl.Failed()) [[unlikely]] {
+ LOG_INFO("failed to get delete bitmap")
+ .tag("reason", cntl.ErrorText())
+ .tag("tablet_id", tablet->tablet_id())
+ .tag("partition_id", tablet->partition_id())
+ .tag("tried", retry_times);
+ proxy->set_unhealthy();
+ } else {
+ break;
+ }
+
+ if (++retry_times > config::delete_bitmap_rpc_retry_times) {
+ if (cntl.Failed()) {
+ return Status::RpcError("failed to get delete bitmap,
tablet={} err={}",
+ tablet->tablet_id(), cntl.ErrorText());
+ }
+ break;
+ }
}
+ auto end = std::chrono::high_resolution_clock::now();
+
if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
return Status::NotFound("failed to get delete bitmap: {}",
res.status().msg());
}
@@ -722,7 +795,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet*
tablet, int64_t old_
{rst_id, segment_ids[i], vers[i]},
roaring::Roaring::readSafe(delete_bitmaps[i].data(),
delete_bitmaps[i].length()));
}
- int64_t latency = cntl.latency_us();
+ int64_t latency =
std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
if (latency > 100 * 1000) { // 100ms
LOG(INFO) << "finish get_delete_bitmap rpc. rowset_ids.size()=" <<
rowset_ids.size()
<< ", delete_bitmaps.size()=" << delete_bitmaps.size() << ",
latency=" << latency
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index feb81d0a074..1b568741442 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -77,5 +77,9 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");
DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");
DEFINE_mBool(enable_cloud_tablet_report, "false");
+
+DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25");
+
+DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "5000");
#include "common/compile_check_end.h"
} // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index a8a7c0c48ec..50f058bf8b0 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -111,5 +111,9 @@ DECLARE_mBool(enable_use_cloud_unique_id_from_fe);
DECLARE_Bool(enable_cloud_tablet_report);
+DECLARE_mInt32(delete_bitmap_rpc_retry_times);
+
+DECLARE_mInt64(meta_service_rpc_reconnect_interval_ms);
+
#include "common/compile_check_end.h"
} // namespace doris::config
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2f61f8242eb..a9a86601002 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3248,4 +3248,8 @@ public class Config extends ConfigBase {
"For disabling certain SQL queries, the configuration item is a
list of simple class names of AST"
+ "(for example CreateRepositoryStmt,
CreatePolicyCommand), separated by commas."})
public static String block_sql_ast_names = "";
+
+ public static long meta_service_rpc_reconnect_interval_ms = 5000;
+
+ public static long meta_service_rpc_retry_cnt = 10;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index d7f718e3ca4..2a69132e44b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -27,7 +27,10 @@ import io.grpc.StatusRuntimeException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -41,6 +44,7 @@ public class MetaServiceProxy {
// use concurrent map to allow access serviceMap in multi thread.
private ReentrantLock lock = new ReentrantLock();
private final Map<String, MetaServiceClient> serviceMap;
+ private Queue<Long> lastConnTimeMs = new LinkedList<>();
static {
if (Config.isCloudMode() && (Config.meta_service_endpoint == null ||
Config.meta_service_endpoint.isEmpty())) {
@@ -50,6 +54,9 @@ public class MetaServiceProxy {
public MetaServiceProxy() {
this.serviceMap = Maps.newConcurrentMap();
+ for (int i = 0; i < 3; ++i) {
+ lastConnTimeMs.add(0L);
+ }
}
private static class SingletonHolder {
@@ -77,6 +84,16 @@ public class MetaServiceProxy {
return MetaServiceProxy.SingletonHolder.get();
}
+ public boolean needReconn() {
+ lock.lock();
+ try {
+ long now = System.currentTimeMillis();
+ return (now - lastConnTimeMs.element() >
Config.meta_service_rpc_reconnect_interval_ms);
+ } finally {
+ lock.unlock();
+ }
+ }
+
public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest
request)
throws RpcException {
try {
@@ -138,6 +155,8 @@ public class MetaServiceProxy {
if (service == null) {
service = new MetaServiceClient(address);
serviceMap.put(address, service);
+ lastConnTimeMs.add(System.currentTimeMillis());
+ lastConnTimeMs.remove();
}
return service;
} finally {
@@ -150,6 +169,7 @@ public class MetaServiceProxy {
public static class MetaServiceClientWrapper {
private final MetaServiceProxy proxy;
+ private Random random = new Random();
public MetaServiceClientWrapper(MetaServiceProxy proxy) {
this.proxy = proxy;
@@ -157,18 +177,40 @@ public class MetaServiceProxy {
public <Response> Response executeRequest(Function<MetaServiceClient,
Response> function) throws RpcException {
int tried = 0;
- while (tried++ < 3) {
+ while (tried++ < Config.meta_service_rpc_retry_cnt) {
+ MetaServiceClient client = null;
try {
- MetaServiceClient client = proxy.getProxy();
+ client = proxy.getProxy();
return function.apply(client);
} catch (StatusRuntimeException sre) {
- if (sre.getStatus().getCode() == Status.Code.UNAVAILABLE
|| tried == 3) {
+ LOG.info("failed to request meta servive code {}, msg {},
trycnt {}", sre.getStatus().getCode(),
+ sre.getMessage(), tried);
+ if (tried >= Config.meta_service_rpc_retry_cnt
+ || (sre.getStatus().getCode() !=
Status.Code.UNAVAILABLE
+ && sre.getStatus().getCode() !=
Status.Code.UNKNOWN)) {
throw new RpcException("", sre.getMessage(), sre);
}
} catch (Exception e) {
- throw new RpcException("", e.getMessage(), e);
+ LOG.info("failed to request meta servive trycnt {}",
tried, e);
+ if (tried >= Config.meta_service_rpc_retry_cnt) {
+ throw new RpcException("", e.getMessage(), e);
+ }
} catch (Throwable t) {
- throw new RpcException("", t.getMessage());
+ LOG.info("failed to request meta servive trycnt {}",
tried, t);
+ if (tried >= Config.meta_service_rpc_retry_cnt) {
+ throw new RpcException("", t.getMessage());
+ }
+ }
+
+ if (proxy.needReconn() && client != null) {
+ client.shutdown(true);
+ }
+
+ int delay = 20 + random.nextInt(200 - 20 + 1);
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException interruptedException) {
+ // ignore
}
}
return null; // impossible and unreachable, just make the compiler
happy
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]