BiteTheDDDDt commented on code in PR #64851:
URL: https://github.com/apache/doris/pull/64851#discussion_r3504117213
##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -48,6 +51,219 @@
namespace doris {
+namespace {
+
+constexpr int DEFAULT_RUNTIME_FILTER_TREE_PUBLISH_FANOUT = 4;
Review Comment:
Resolved by the current version. The default/fallback handling has been
removed; missing or empty `target_fragment_ids` is treated as an invariant
violation with `DORIS_CHECK`, and tree publish only builds from explicit target
fragment ids.
##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -48,6 +51,219 @@
namespace doris {
+namespace {
+
+constexpr int DEFAULT_RUNTIME_FILTER_TREE_PUBLISH_FANOUT = 4;
+constexpr int MAX_RUNTIME_FILTER_TREE_PUBLISH_FANOUT = 128;
+constexpr int64_t DEFAULT_RUNTIME_FILTER_TREE_PUBLISH_MAX_SEND_BYTES = 256L *
1024L * 1024L;
+
+TNetworkAddress to_thrift_network_address(const PNetworkAddress& address) {
+ TNetworkAddress result;
+ result.hostname = address.hostname();
+ result.port = address.port();
+ return result;
+}
+
+PNetworkAddress to_proto_network_address(const TNetworkAddress& address) {
+ PNetworkAddress result;
+ result.set_hostname(address.hostname);
+ result.set_port(address.port);
+ return result;
+}
+
+bool can_use_tree_publish_targets(const
std::vector<TRuntimeFilterTargetParamsV2>& targets) {
+ return std::all_of(targets.begin(), targets.end(), [](const auto& target) {
+ return target.__isset.target_fragment_ids &&
!target.target_fragment_ids.empty();
+ });
+}
+
+std::vector<RuntimeFilterPublishTarget> build_runtime_filter_publish_targets(
+ const std::vector<TRuntimeFilterTargetParamsV2>& targets) {
+ std::vector<RuntimeFilterPublishTarget> publish_targets;
+ publish_targets.reserve(targets.size());
+ for (const auto& target : targets) {
+ DORIS_CHECK(target.__isset.target_fragment_ids);
+ DORIS_CHECK(!target.target_fragment_ids.empty());
+ RuntimeFilterPublishTarget publish_target;
+ publish_target.addr =
to_proto_network_address(target.target_fragment_instance_addr);
+ publish_target.fragment_ids = target.target_fragment_ids;
+ publish_targets.emplace_back(std::move(publish_target));
+ }
+ return publish_targets;
+}
+
+class RuntimeFilterRelayRpcClosure final : public google::protobuf::Closure {
+public:
+ explicit
RuntimeFilterRelayRpcClosure(std::shared_ptr<PPublishFilterRequestV2> request)
+ : _request(std::move(request)) {}
+
+ void Run() override {
+ std::unique_ptr<RuntimeFilterRelayRpcClosure> self(this);
+ if (_cntl->Failed()) {
+ LOG(WARNING) << "Runtime filter relay rpc failed: " <<
_cntl->ErrorText();
+ return;
+ }
+ Status st = Status::create(_response->status());
+ if (!st.ok() && !st.is<ErrorCode::END_OF_FILE>()) {
+ LOG(WARNING) << "Runtime filter relay rpc returns error status: "
<< st;
+ }
+ }
+
+ brpc::Controller* cntl() { return _cntl.get(); }
+ PPublishFilterRequestV2* request() { return _request.get(); }
+ PPublishFilterResponse* response() { return _response.get(); }
+
+private:
+ std::unique_ptr<brpc::Controller> _cntl =
std::make_unique<brpc::Controller>();
+ std::shared_ptr<PPublishFilterRequestV2> _request;
+ std::unique_ptr<PPublishFilterResponse> _response =
std::make_unique<PPublishFilterResponse>();
+};
+
+void send_runtime_filter_relay_rpc(const RuntimeFilterPublishTask& task,
+ const butil::IOBuf& request_attachment, int
timeout_ms) {
+ TNetworkAddress address = to_thrift_network_address(task.receiver.addr);
+ std::shared_ptr<PBackendService_Stub> stub(
+
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(address));
+ if (stub == nullptr) {
+ LOG(WARNING) << "Failed to init runtime filter relay rpc to " <<
address.hostname << ":"
+ << address.port;
+ return;
+ }
+
+ auto* closure = new RuntimeFilterRelayRpcClosure(
+ std::make_shared<PPublishFilterRequestV2>(task.request));
+ if (request_attachment.size() > 0) {
+ closure->cntl()->request_attachment().append(request_attachment);
+ }
+ closure->cntl()->set_timeout_ms(timeout_ms);
+ if (config::execution_ignore_eovercrowded) {
+ closure->cntl()->ignore_eovercrowded();
+ }
+ stub->apply_filterv2(closure->cntl(), closure->request(),
closure->response(), closure);
+}
+
+void set_request_legacy_target(const TRuntimeFilterTargetParamsV2& target,
+ PPublishFilterRequestV2* request) {
+ if (target.__isset.target_fragment_ids) {
+ for (const auto& target_fragment_id : target.target_fragment_ids) {
+ request->add_fragment_ids(target_fragment_id);
+ }
+ } else {
+ // FE not upgraded yet.
+ for (const auto& target_fragment_instance_id :
target.target_fragment_instance_ids) {
+ PUniqueId* cur_id = request->add_fragment_instance_ids();
+ cur_id->set_hi(target_fragment_instance_id.hi);
+ cur_id->set_lo(target_fragment_instance_id.lo);
+ }
+ }
+}
+
+} // namespace
+
+std::vector<std::vector<RuntimeFilterPublishTarget>>
split_runtime_filter_publish_targets(
+ const std::vector<RuntimeFilterPublishTarget>& targets, int fanout) {
+ if (targets.empty()) {
+ return {};
+ }
+
+ DORIS_CHECK(fanout > 0);
+ size_t slice_count = std::min(targets.size(), static_cast<size_t>(fanout));
+ std::vector<std::vector<RuntimeFilterPublishTarget>> slices;
+ slices.reserve(slice_count);
+ for (size_t offset = 0; offset < targets.size();) {
+ size_t remaining_targets = targets.size() - offset;
+ size_t remaining_slices = slice_count - slices.size();
+ size_t slice_size = (remaining_targets + remaining_slices - 1) /
remaining_slices;
+ slices.emplace_back(targets.begin() + offset, targets.begin() + offset
+ slice_size);
+ offset += slice_size;
+ }
+ return slices;
+}
+
+std::vector<RuntimeFilterPublishTask> build_runtime_filter_publish_tasks(
+ const PPublishFilterRequestV2& base_request,
+ const std::vector<RuntimeFilterPublishTarget>& targets, int fanout) {
+ std::vector<RuntimeFilterPublishTask> tasks;
+ auto slices = split_runtime_filter_publish_targets(targets, fanout);
+ tasks.reserve(slices.size());
+ for (const auto& slice : slices) {
+ DORIS_CHECK(!slice.empty());
+ RuntimeFilterPublishTask task;
+ task.receiver = slice.front();
+ task.request = base_request;
+ task.request.clear_fragment_ids();
+ task.request.clear_fragment_instance_ids();
Review Comment:
Keeping this clear is intentional. `build_runtime_filter_publish_tasks()` is
used by both the root publisher and relay forwarding; a relay request already
contains local `fragment_ids` and existing `forward_targets`, so the child
request template needs to clear routing fields once and rebuild the per-child
local/forward targets.
##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -48,6 +51,189 @@
namespace doris {
+namespace {
+
+std::vector<RuntimeFilterPublishTarget> build_runtime_filter_publish_targets(
+ const std::vector<TRuntimeFilterTargetParamsV2>& targets) {
+ std::vector<RuntimeFilterPublishTarget> publish_targets;
+ publish_targets.reserve(targets.size());
+ for (const auto& target : targets) {
+ DORIS_CHECK(target.__isset.target_fragment_ids);
+ DORIS_CHECK(!target.target_fragment_ids.empty());
+ RuntimeFilterPublishTarget publish_target;
+
publish_target.addr.set_hostname(target.target_fragment_instance_addr.hostname);
+
publish_target.addr.set_port(target.target_fragment_instance_addr.port);
+ publish_target.fragment_ids = target.target_fragment_ids;
+ publish_targets.emplace_back(std::move(publish_target));
+ }
+ return publish_targets;
+}
+
+class RuntimeFilterRelayRpcClosure final : public google::protobuf::Closure {
+public:
+ RuntimeFilterRelayRpcClosure(std::shared_ptr<PPublishFilterRequestV2>
request,
+ std::weak_ptr<QueryContext> query_ctx)
+ : _request(std::move(request)),
+
_callback(HandleErrorBrpcCallback<PPublishFilterResponse>::create_shared(
+ std::move(query_ctx))) {}
+
+ void Run() override {
+ std::unique_ptr<RuntimeFilterRelayRpcClosure> self(this);
+ _callback->call();
+ }
+
+ brpc::Controller* cntl() { return _callback->cntl_.get(); }
+ PPublishFilterRequestV2* request() { return _request.get(); }
+ PPublishFilterResponse* response() { return _callback->response_.get(); }
+
+private:
+ std::shared_ptr<PPublishFilterRequestV2> _request;
+ std::shared_ptr<HandleErrorBrpcCallback<PPublishFilterResponse>> _callback;
+};
+
+Status send_runtime_filter_relay_rpc(const RuntimeFilterPublishTask& task,
+ const butil::IOBuf& request_attachment,
int timeout_ms,
+ std::weak_ptr<QueryContext> query_ctx) {
+ std::shared_ptr<PBackendService_Stub> stub(
+
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(task.receiver.addr));
+ if (stub == nullptr) {
+ LOG(WARNING) << "Failed to init runtime filter relay rpc to "
+ << task.receiver.addr.hostname() << ":" <<
task.receiver.addr.port();
+ return Status::InternalError("Failed to init runtime filter relay rpc
to {}:{}",
+ task.receiver.addr.hostname(),
task.receiver.addr.port());
+ }
+
+ // brpc calls Run() exactly once; RuntimeFilterRelayRpcClosure deletes
itself there.
+ auto* closure = new RuntimeFilterRelayRpcClosure(
+ std::make_shared<PPublishFilterRequestV2>(task.request),
std::move(query_ctx));
+ if (request_attachment.size() > 0) {
+ closure->cntl()->request_attachment().append(request_attachment);
+ }
+ closure->cntl()->set_timeout_ms(timeout_ms);
+ if (config::execution_ignore_eovercrowded) {
+ closure->cntl()->ignore_eovercrowded();
+ }
+ stub->apply_filterv2(closure->cntl(), closure->request(),
closure->response(), closure);
+ return Status::OK();
+}
+
+void set_request_direct_publish_target(const TRuntimeFilterTargetParamsV2&
target,
+ PPublishFilterRequestV2* request) {
+ DORIS_CHECK(target.__isset.target_fragment_ids);
+ DORIS_CHECK(!target.target_fragment_ids.empty());
+ for (const auto& target_fragment_id : target.target_fragment_ids) {
+ request->add_fragment_ids(target_fragment_id);
+ }
+}
+
+} // namespace
+
+std::vector<std::vector<RuntimeFilterPublishTarget>>
split_runtime_filter_publish_targets(
+ const std::vector<RuntimeFilterPublishTarget>& targets, int fanout) {
+ DORIS_CHECK(!targets.empty());
+ DORIS_CHECK(fanout > 0);
+ size_t slice_count = std::min(targets.size(), static_cast<size_t>(fanout));
+ std::vector<std::vector<RuntimeFilterPublishTarget>> slices;
+ slices.reserve(slice_count);
+ for (size_t offset = 0; offset < targets.size();) {
+ size_t remaining_targets = targets.size() - offset;
+ size_t remaining_slices = slice_count - slices.size();
+ size_t slice_size = (remaining_targets + remaining_slices - 1) /
remaining_slices;
+ slices.emplace_back(targets.begin() + offset, targets.begin() + offset
+ slice_size);
+ offset += slice_size;
+ }
+ return slices;
+}
+
+std::vector<RuntimeFilterPublishTask> build_runtime_filter_publish_tasks(
+ const PPublishFilterRequestV2& base_request,
+ const std::vector<RuntimeFilterPublishTarget>& targets, int fanout) {
+ std::vector<RuntimeFilterPublishTask> tasks;
+ auto slices = split_runtime_filter_publish_targets(targets, fanout);
+ PPublishFilterRequestV2 request_template = base_request;
+ request_template.clear_fragment_ids();
+ request_template.clear_fragment_instance_ids();
+ request_template.clear_forward_targets();
+ tasks.reserve(slices.size());
+ for (const auto& slice : slices) {
+ DORIS_CHECK(!slice.empty());
+ RuntimeFilterPublishTask task;
+ task.receiver = slice.front();
+ task.request = request_template;
+
+ for (int32_t fragment_id : task.receiver.fragment_ids) {
+ task.request.add_fragment_ids(fragment_id);
+ }
+ for (size_t i = 1; i < slice.size(); ++i) {
+ DORIS_CHECK(!slice[i].fragment_ids.empty());
+ PPublishFilterForwardTarget* forward_target =
task.request.add_forward_targets();
+ forward_target->mutable_target_addr()->CopyFrom(slice[i].addr);
+ for (int32_t fragment_id : slice[i].fragment_ids) {
+ forward_target->add_fragment_ids(fragment_id);
Review Comment:
I think we should not reuse the receiver fragment ids here.
`task.receiver.fragment_ids` is for the current relay receiver to apply
locally, while each `forward_target` must carry that downstream target's
address together with that target's own fragment ids.
##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -48,6 +51,189 @@
namespace doris {
+namespace {
+
+std::vector<RuntimeFilterPublishTarget> build_runtime_filter_publish_targets(
+ const std::vector<TRuntimeFilterTargetParamsV2>& targets) {
+ std::vector<RuntimeFilterPublishTarget> publish_targets;
+ publish_targets.reserve(targets.size());
+ for (const auto& target : targets) {
+ DORIS_CHECK(target.__isset.target_fragment_ids);
+ DORIS_CHECK(!target.target_fragment_ids.empty());
+ RuntimeFilterPublishTarget publish_target;
+
publish_target.addr.set_hostname(target.target_fragment_instance_addr.hostname);
+
publish_target.addr.set_port(target.target_fragment_instance_addr.port);
+ publish_target.fragment_ids = target.target_fragment_ids;
+ publish_targets.emplace_back(std::move(publish_target));
+ }
+ return publish_targets;
+}
+
+class RuntimeFilterRelayRpcClosure final : public google::protobuf::Closure {
+public:
+ RuntimeFilterRelayRpcClosure(std::shared_ptr<PPublishFilterRequestV2>
request,
+ std::weak_ptr<QueryContext> query_ctx)
+ : _request(std::move(request)),
+
_callback(HandleErrorBrpcCallback<PPublishFilterResponse>::create_shared(
+ std::move(query_ctx))) {}
+
+ void Run() override {
+ std::unique_ptr<RuntimeFilterRelayRpcClosure> self(this);
+ _callback->call();
+ }
+
+ brpc::Controller* cntl() { return _callback->cntl_.get(); }
+ PPublishFilterRequestV2* request() { return _request.get(); }
+ PPublishFilterResponse* response() { return _callback->response_.get(); }
+
+private:
+ std::shared_ptr<PPublishFilterRequestV2> _request;
+ std::shared_ptr<HandleErrorBrpcCallback<PPublishFilterResponse>> _callback;
+};
+
+Status send_runtime_filter_relay_rpc(const RuntimeFilterPublishTask& task,
+ const butil::IOBuf& request_attachment,
int timeout_ms,
+ std::weak_ptr<QueryContext> query_ctx) {
+ std::shared_ptr<PBackendService_Stub> stub(
+
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(task.receiver.addr));
+ if (stub == nullptr) {
+ LOG(WARNING) << "Failed to init runtime filter relay rpc to "
+ << task.receiver.addr.hostname() << ":" <<
task.receiver.addr.port();
+ return Status::InternalError("Failed to init runtime filter relay rpc
to {}:{}",
+ task.receiver.addr.hostname(),
task.receiver.addr.port());
+ }
+
+ // brpc calls Run() exactly once; RuntimeFilterRelayRpcClosure deletes
itself there.
+ auto* closure = new RuntimeFilterRelayRpcClosure(
+ std::make_shared<PPublishFilterRequestV2>(task.request),
std::move(query_ctx));
+ if (request_attachment.size() > 0) {
+ closure->cntl()->request_attachment().append(request_attachment);
+ }
+ closure->cntl()->set_timeout_ms(timeout_ms);
+ if (config::execution_ignore_eovercrowded) {
+ closure->cntl()->ignore_eovercrowded();
+ }
+ stub->apply_filterv2(closure->cntl(), closure->request(),
closure->response(), closure);
+ return Status::OK();
+}
+
+void set_request_direct_publish_target(const TRuntimeFilterTargetParamsV2&
target,
+ PPublishFilterRequestV2* request) {
+ DORIS_CHECK(target.__isset.target_fragment_ids);
+ DORIS_CHECK(!target.target_fragment_ids.empty());
+ for (const auto& target_fragment_id : target.target_fragment_ids) {
+ request->add_fragment_ids(target_fragment_id);
+ }
+}
+
+} // namespace
+
+std::vector<std::vector<RuntimeFilterPublishTarget>>
split_runtime_filter_publish_targets(
+ const std::vector<RuntimeFilterPublishTarget>& targets, int fanout) {
+ DORIS_CHECK(!targets.empty());
+ DORIS_CHECK(fanout > 0);
+ size_t slice_count = std::min(targets.size(), static_cast<size_t>(fanout));
+ std::vector<std::vector<RuntimeFilterPublishTarget>> slices;
+ slices.reserve(slice_count);
+ for (size_t offset = 0; offset < targets.size();) {
+ size_t remaining_targets = targets.size() - offset;
+ size_t remaining_slices = slice_count - slices.size();
+ size_t slice_size = (remaining_targets + remaining_slices - 1) /
remaining_slices;
+ slices.emplace_back(targets.begin() + offset, targets.begin() + offset
+ slice_size);
+ offset += slice_size;
+ }
+ return slices;
+}
+
+std::vector<RuntimeFilterPublishTask> build_runtime_filter_publish_tasks(
+ const PPublishFilterRequestV2& base_request,
+ const std::vector<RuntimeFilterPublishTarget>& targets, int fanout) {
+ std::vector<RuntimeFilterPublishTask> tasks;
+ auto slices = split_runtime_filter_publish_targets(targets, fanout);
+ PPublishFilterRequestV2 request_template = base_request;
+ request_template.clear_fragment_ids();
+ request_template.clear_fragment_instance_ids();
+ request_template.clear_forward_targets();
+ tasks.reserve(slices.size());
+ for (const auto& slice : slices) {
+ DORIS_CHECK(!slice.empty());
+ RuntimeFilterPublishTask task;
+ task.receiver = slice.front();
+ task.request = request_template;
+
+ for (int32_t fragment_id : task.receiver.fragment_ids) {
+ task.request.add_fragment_ids(fragment_id);
+ }
+ for (size_t i = 1; i < slice.size(); ++i) {
+ DORIS_CHECK(!slice[i].fragment_ids.empty());
+ PPublishFilterForwardTarget* forward_target =
task.request.add_forward_targets();
+ forward_target->mutable_target_addr()->CopyFrom(slice[i].addr);
+ for (int32_t fragment_id : slice[i].fragment_ids) {
+ forward_target->add_fragment_ids(fragment_id);
+ }
+ }
+ tasks.emplace_back(std::move(task));
+ }
+ return tasks;
+}
+
+int calculate_tree_publish_fanout(int64_t serialized_filter_size, size_t
target_count,
+ int64_t max_send_bytes) {
+ DORIS_CHECK(serialized_filter_size > 0);
+ DORIS_CHECK(max_send_bytes >= 0);
+ if (max_send_bytes == 0 || target_count <= 1) {
+ return 0;
+ }
+
+ const int64_t direct_target_limit = max_send_bytes /
serialized_filter_size;
+ if (target_count <= static_cast<size_t>(direct_target_limit)) {
+ return 0;
+ }
+
+ int64_t fanout = std::max<int64_t>(1, direct_target_limit);
+ fanout = std::min<int64_t>(fanout, static_cast<int64_t>(target_count - 1));
+ return static_cast<int>(fanout);
+}
+
+Status forward_runtime_filter(const PPublishFilterRequestV2& request,
+ const butil::IOBuf& request_attachment,
+ std::weak_ptr<QueryContext> query_ctx) {
+ if (request.forward_targets().empty()) {
+ return Status::OK();
+ }
+
+ std::vector<RuntimeFilterPublishTarget> targets;
+ targets.reserve(request.forward_targets_size());
+ for (const auto& forward_target : request.forward_targets()) {
+ DORIS_CHECK(forward_target.has_target_addr());
+ DORIS_CHECK(forward_target.fragment_ids_size() > 0);
+ RuntimeFilterPublishTarget target;
+ target.addr.CopyFrom(forward_target.target_addr());
+ target.fragment_ids.assign(forward_target.fragment_ids().begin(),
+ forward_target.fragment_ids().end());
+ targets.emplace_back(std::move(target));
+ }
+
+ DORIS_CHECK(request.has_tree_publish_fanout());
+ int fanout = request.tree_publish_fanout();
+ DORIS_CHECK(fanout > 0);
+ DORIS_CHECK(request.has_publish_rpc_timeout_ms());
+ int timeout_ms = request.publish_rpc_timeout_ms();
+ auto tasks = build_runtime_filter_publish_tasks(request, targets, fanout);
+ VLOG_NOTICE << "Runtime filter relay publish filter_id=" <<
request.filter_id()
+ << ", forward_targets=" << targets.size() << ",
child_rpc_count=" << tasks.size()
+ << ", fanout=" << fanout;
+ auto st = Status::OK();
+ for (const auto& task : tasks) {
+ Status rpc_st =
+ send_runtime_filter_relay_rpc(task, request_attachment,
timeout_ms, query_ctx);
+ if (!rpc_st.ok()) {
Review Comment:
Done in `75171cb7c77`. Now each failed relay child RPC logs a warning with
`query_id`, `filter_id`, target address, and the failure status, so multiple
failed forward targets can be identified independently.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]