This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7fc78e3f87e [opt](brpc) check and remove unavailable brpc stubs (#43212) (#43859)
7fc78e3f87e is described below
commit 7fc78e3f87e0c3fd85dd1de9fa55e1d3f27a6bf4
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Nov 14 19:52:06 2024 +0800
[opt](brpc) check and remove unavailable brpc stubs (#43212) (#43859)
---
be/src/common/config.cpp | 5 +++
be/src/common/config.h | 4 ++
be/src/runtime/fragment_mgr.cpp | 68 +++++++++++++++++++++++++++++++-
be/src/runtime/fragment_mgr.h | 9 +++++
be/src/runtime/query_context.h | 23 +++++++++++
be/src/vec/sink/vdata_stream_sender.cpp | 9 +++++
regression-test/pipeline/p0/conf/be.conf | 1 +
regression-test/pipeline/p1/conf/be.conf | 2 +
8 files changed, 120 insertions(+), 1 deletion(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0878d27e833..c239dc3e72d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -529,6 +529,9 @@ DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
//https://brpc.apache.org/docs/server/basics/#disable-built-in-services-completely
DEFINE_Bool(enable_brpc_builtin_services, "true");
+// Enable brpc connection check
+DEFINE_Bool(enable_brpc_connection_check, "false");
+
// The maximum amount of data that can be processed by a stream load
DEFINE_mInt64(streaming_load_max_mb, "102400");
// Some data formats, such as JSON, cannot be streamed.
@@ -964,6 +967,8 @@ DEFINE_mInt64(brpc_streaming_client_batch_bytes, "262144");
// so as to avoid occupying the execution thread for a long time.
DEFINE_mInt32(max_fragment_start_wait_time_seconds, "30");
+DEFINE_mInt32(fragment_mgr_cancel_worker_interval_seconds, "1");
+
// Node role tag for backend. Mix role is the default role, and computation
role have no
// any tablet.
DEFINE_String(be_node_role, "mix");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a282abb37ee..289c56464f3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1014,6 +1014,8 @@ DECLARE_mInt64(brpc_streaming_client_batch_bytes);
DECLARE_Bool(enable_brpc_builtin_services);
+DECLARE_Bool(enable_brpc_connection_check);
+
// Max waiting time to wait the "plan fragment start" rpc.
// If timeout, the fragment will be cancelled.
// This parameter is usually only used when the FE loses connection,
@@ -1021,6 +1023,8 @@ DECLARE_Bool(enable_brpc_builtin_services);
// so as to avoid occupying the execution thread for a long time.
DECLARE_mInt32(max_fragment_start_wait_time_seconds);
+DECLARE_Int32(fragment_mgr_cancel_worker_interval_seconds);
+
// Node role tag for backend. Mix role is the default role, and computation
role have no
// any tablet.
DECLARE_String(be_node_role);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 3a4c752ccae..bcb48559178 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -17,6 +17,7 @@
#include "runtime/fragment_mgr.h"
+#include <brpc/controller.h>
#include <bvar/latency_recorder.h>
#include <exprs/runtime_filter.h>
#include <fmt/format.h>
@@ -84,6 +85,7 @@
#include "runtime/workload_group/workload_group_manager.h"
#include "runtime/workload_management/workload_query_info.h"
#include "service/backend_options.h"
+#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
@@ -1284,6 +1286,7 @@ void FragmentMgr::cancel_worker() {
}
VecDateTimeValue now = VecDateTimeValue::local_time();
+ std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem>
brpc_stub_with_queries;
{
std::lock_guard<std::mutex> lock(_lock);
for (auto& fragment_instance_itr : _fragment_instance_map) {
@@ -1291,6 +1294,7 @@ void FragmentMgr::cancel_worker() {
queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id());
}
}
+
for (auto& pipeline_itr : _pipeline_map) {
if (pipeline_itr.second->is_timeout(now)) {
std::vector<TUniqueId> ins_ids;
@@ -1308,6 +1312,18 @@ void FragmentMgr::cancel_worker() {
LOG_WARNING("Query {} is timeout", print_id(it->first));
it = _query_ctx_map.erase(it);
} else {
+ if (config::enable_brpc_connection_check) {
+ auto brpc_stubs = it->second->get_using_brpc_stubs();
+ for (auto& item : brpc_stubs) {
+ if (!brpc_stub_with_queries.contains(item.second))
{
+ brpc_stub_with_queries.emplace(item.second,
+ BrpcItem
{item.first, {it->second}});
+ } else {
+
brpc_stub_with_queries[item.second].queries.emplace_back(
+ it->second);
+ }
+ }
+ }
++it;
}
}
@@ -1431,7 +1447,11 @@ void FragmentMgr::cancel_worker() {
std::string("Coordinator dead."));
}
- } while
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
+ for (auto it : brpc_stub_with_queries) {
+ _check_brpc_available(it.first, it.second);
+ }
+ } while (!_stop_background_threads_latch.wait_for(
+
std::chrono::seconds(config::fragment_mgr_cancel_worker_interval_seconds)));
LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
}
@@ -1448,6 +1468,52 @@ void FragmentMgr::debug(std::stringstream& ss) {
}
}
+void FragmentMgr::_check_brpc_available(const
std::shared_ptr<PBackendService_Stub>& brpc_stub,
+ const BrpcItem& brpc_item) {
+ const std::string message = "hello doris!";
+ std::string error_message;
+ int32_t failed_count = 0;
+ while (true) {
+ PHandShakeRequest request;
+ request.set_hello(message);
+ PHandShakeResponse response;
+ brpc::Controller cntl;
+ cntl.set_timeout_ms(500 * (failed_count + 1));
+ cntl.set_max_retry(10);
+ brpc_stub->hand_shake(&cntl, &request, &response, nullptr);
+
+ if (cntl.Failed()) {
+ error_message = cntl.ErrorText();
+ LOG(WARNING) << "brpc stub: " <<
brpc_item.network_address.hostname << ":"
+ << brpc_item.network_address.port << " check failed:
" << error_message;
+ } else if (response.has_status() && response.status().status_code() ==
0) {
+ break;
+ } else {
+ error_message = response.DebugString();
+ LOG(WARNING) << "brpc stub: " <<
brpc_item.network_address.hostname << ":"
+ << brpc_item.network_address.port << " check failed:
" << error_message;
+ }
+ failed_count++;
+ if (failed_count == 2) {
+ for (const auto& query_wptr : brpc_item.queries) {
+ auto query = query_wptr.lock();
+ if (query && !query->is_cancelled()) {
+ cancel_query(query->query_id(),
PPlanFragmentCancelReason::INTERNAL_ERROR,
+ fmt::format("brpc(dest: {}:{}) check failed:
{}",
+
brpc_item.network_address.hostname,
+ brpc_item.network_address.port,
error_message));
+ }
+ }
+
+ LOG(WARNING) << "remove brpc stub from cache: " <<
brpc_item.network_address.hostname
+ << ":" << brpc_item.network_address.port << ", error:
" << error_message;
+ ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+ brpc_item.network_address.hostname,
brpc_item.network_address.port);
+ break;
+ }
+ }
+}
+
/*
* 1. resolve opaqued_query_plan to thrift structure
* 2. build TExecPlanFragmentParams
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 16ad368ae61..0c1bb3033d9 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -159,6 +159,12 @@ private:
void _exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor,
const FinishCallback& cb);
+ struct BrpcItem {
+ TNetworkAddress network_address;
+ std::vector<std::weak_ptr<QueryContext>> queries;
+ };
+
+ std::shared_ptr<QueryContext> _get_or_erase_query_ctx(const TUniqueId&
query_id);
template <typename Param>
void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
@@ -177,6 +183,9 @@ private:
Status _get_query_ctx(const Params& params, TUniqueId query_id, bool
pipeline,
QuerySource query_type,
std::shared_ptr<QueryContext>& query_ctx);
+ void _check_brpc_available(const std::shared_ptr<PBackendService_Stub>&
brpc_stub,
+ const BrpcItem& brpc_item);
+
// This is input params
ExecEnv* _exec_env = nullptr;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 862da39bfae..e781ae61cab 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -19,6 +19,7 @@
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
#include <atomic>
#include <memory>
@@ -305,6 +306,25 @@ public:
}
}
+ void add_using_brpc_stub(const TNetworkAddress& network_address,
+ std::shared_ptr<PBackendService_Stub> brpc_stub) {
+ if (network_address.port == 0) {
+ return;
+ }
+ std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
+ if (!_using_brpc_stubs.contains(network_address)) {
+ _using_brpc_stubs.emplace(network_address, brpc_stub);
+ }
+
+ DCHECK_EQ(_using_brpc_stubs[network_address].get(), brpc_stub.get());
+ }
+
+ std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>>
+ get_using_brpc_stubs() {
+ std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
+ return _using_brpc_stubs;
+ }
+
private:
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;
@@ -363,6 +383,9 @@ private:
// help us manage the query.
QuerySource _query_source;
+ std::mutex _brpc_stubs_mutex;
+ std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>>
_using_brpc_stubs;
+
public:
timespec get_query_arrival_timestamp() const { return
this->_query_arrival_timestamp; }
QuerySource get_query_source() const { return this->_query_source; }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 394005f6adf..0733c39621e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -21,6 +21,7 @@
#include <fmt/ranges.h> // IWYU pragma: keep
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/Metrics_types.h>
+#include <gen_cpp/Types_types.h>
#include <gen_cpp/data.pb.h>
#include <gen_cpp/internal_service.pb.h>
#include <glog/logging.h>
@@ -131,8 +132,16 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
_brpc_stub =
state->exec_env()->brpc_internal_client_cache()->get_client(
"127.0.0.1", _brpc_dest_addr.port);
+ if (config::enable_brpc_connection_check) {
+ auto network_address = _brpc_dest_addr;
+ network_address.hostname = "127.0.0.1";
+ state->get_query_ctx()->add_using_brpc_stub(network_address,
_brpc_stub);
+ }
} else {
_brpc_stub =
state->exec_env()->brpc_internal_client_cache()->get_client(_brpc_dest_addr);
+ if (config::enable_brpc_connection_check) {
+ state->get_query_ctx()->add_using_brpc_stub(_brpc_dest_addr,
_brpc_stub);
+ }
}
if (!_brpc_stub) {
diff --git a/regression-test/pipeline/p0/conf/be.conf
b/regression-test/pipeline/p0/conf/be.conf
index 3aaf8777b37..b7565d721c6 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -90,3 +90,4 @@ enable_missing_rows_correctness_check=true
#enable_jvm_monitor = true
crash_in_memory_tracker_inaccurate = true
+enable_brpc_connection_check=true
diff --git a/regression-test/pipeline/p1/conf/be.conf
b/regression-test/pipeline/p1/conf/be.conf
index e9bf1dbdd88..356e045e31b 100644
--- a/regression-test/pipeline/p1/conf/be.conf
+++ b/regression-test/pipeline/p1/conf/be.conf
@@ -82,3 +82,5 @@ enable_missing_rows_correctness_check=true
enable_jvm_monitor = true
crash_in_memory_tracker_inaccurate = true
+enable_table_size_correctness_check=true
+enable_brpc_connection_check=true
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]