BiteTheDDDDt commented on code in PR #64851:
URL: https://github.com/apache/doris/pull/64851#discussion_r3473424943


##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -48,6 +51,219 @@
 
 namespace doris {
 
+namespace {
+
+constexpr int DEFAULT_RUNTIME_FILTER_TREE_PUBLISH_FANOUT = 4;
+constexpr int MAX_RUNTIME_FILTER_TREE_PUBLISH_FANOUT = 128;
+constexpr int64_t DEFAULT_RUNTIME_FILTER_TREE_PUBLISH_MAX_SEND_BYTES = 256L * 1024L * 1024L;
+
+TNetworkAddress to_thrift_network_address(const PNetworkAddress& address) {
+    TNetworkAddress result;
+    result.hostname = address.hostname();
+    result.port = address.port();
+    return result;
+}
+
+PNetworkAddress to_proto_network_address(const TNetworkAddress& address) {
+    PNetworkAddress result;
+    result.set_hostname(address.hostname);
+    result.set_port(address.port);
+    return result;
+}
+
+bool can_use_tree_publish_targets(const std::vector<TRuntimeFilterTargetParamsV2>& targets) {
+    return std::all_of(targets.begin(), targets.end(), [](const auto& target) {
+        return target.__isset.target_fragment_ids && !target.target_fragment_ids.empty();
+    });
+}
+
+std::vector<RuntimeFilterPublishTarget> build_runtime_filter_publish_targets(
+        const std::vector<TRuntimeFilterTargetParamsV2>& targets) {
+    std::vector<RuntimeFilterPublishTarget> publish_targets;
+    publish_targets.reserve(targets.size());
+    for (const auto& target : targets) {
+        DORIS_CHECK(target.__isset.target_fragment_ids);
+        DORIS_CHECK(!target.target_fragment_ids.empty());
+        RuntimeFilterPublishTarget publish_target;
+        publish_target.addr = to_proto_network_address(target.target_fragment_instance_addr);
+        publish_target.fragment_ids = target.target_fragment_ids;
+        publish_targets.emplace_back(std::move(publish_target));
+    }
+    return publish_targets;
+}
+
+class RuntimeFilterRelayRpcClosure final : public google::protobuf::Closure {
+public:
+    explicit RuntimeFilterRelayRpcClosure(std::shared_ptr<PPublishFilterRequestV2> request)
+            : _request(std::move(request)) {}
+
+    void Run() override {
+        std::unique_ptr<RuntimeFilterRelayRpcClosure> self(this);
+        if (_cntl->Failed()) {
+            LOG(WARNING) << "Runtime filter relay rpc failed: " << _cntl->ErrorText();
+            return;
+        }
+        Status st = Status::create(_response->status());
+        if (!st.ok() && !st.is<ErrorCode::END_OF_FILE>()) {
+            LOG(WARNING) << "Runtime filter relay rpc returns error status: " << st;
+        }
+    }
+
+    brpc::Controller* cntl() { return _cntl.get(); }
+    PPublishFilterRequestV2* request() { return _request.get(); }
+    PPublishFilterResponse* response() { return _response.get(); }
+
+private:
+    std::unique_ptr<brpc::Controller> _cntl = std::make_unique<brpc::Controller>();
+    std::shared_ptr<PPublishFilterRequestV2> _request;
+    std::unique_ptr<PPublishFilterResponse> _response = std::make_unique<PPublishFilterResponse>();
+};
+
+void send_runtime_filter_relay_rpc(const RuntimeFilterPublishTask& task,
+                                   const butil::IOBuf& request_attachment, int timeout_ms) {
+    TNetworkAddress address = to_thrift_network_address(task.receiver.addr);
+    std::shared_ptr<PBackendService_Stub> stub(
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(address));
+    if (stub == nullptr) {
+        LOG(WARNING) << "Failed to init runtime filter relay rpc to " << address.hostname << ":"
+                     << address.port;
+        return;
+    }
+
+    auto* closure = new RuntimeFilterRelayRpcClosure(
+            std::make_shared<PPublishFilterRequestV2>(task.request));
+    if (request_attachment.size() > 0) {
+        closure->cntl()->request_attachment().append(request_attachment);
+    }
+    closure->cntl()->set_timeout_ms(timeout_ms);
+    if (config::execution_ignore_eovercrowded) {
+        closure->cntl()->ignore_eovercrowded();
+    }
+    stub->apply_filterv2(closure->cntl(), closure->request(), closure->response(), closure);
+}
+
+void set_request_legacy_target(const TRuntimeFilterTargetParamsV2& target,
+                               PPublishFilterRequestV2* request) {
+    if (target.__isset.target_fragment_ids) {
+        for (const auto& target_fragment_id : target.target_fragment_ids) {
+            request->add_fragment_ids(target_fragment_id);
+        }
+    } else {
+        // FE not upgraded yet.

Review Comment:
   我不清楚“没有设置 target_fragment_ids”和“FE 有没有升级”之间有什么关系。
   既然已经走到 set_request_legacy_target 了,那要么是 FE 没有开启树形发布,要么是 FE 还没有升级,为什么函数内部还需要再做这样的判断?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to