github-actions[bot] commented on code in PR #64851:
URL: https://github.com/apache/doris/pull/64851#discussion_r3473478336


##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -428,6 +643,71 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
     }
 
     std::vector<TRuntimeFilterTargetParamsV2>& targets = cnt_val.targetv2_info;
+    int timeout_ms = get_execution_rpc_timeout_ms(execution_timeout);
+    int64_t max_send_bytes = query_options.__isset.runtime_filter_tree_publish_max_send_bytes
+                                     ? query_options.runtime_filter_tree_publish_max_send_bytes
+                                     : DEFAULT_RUNTIME_FILTER_TREE_PUBLISH_MAX_SEND_BYTES;
+    DORIS_CHECK(max_send_bytes >= 0);
+    apply_request.set_merge_time(merge_time);
+    *apply_request.mutable_query_id() = query_id;
+    const int64_t serialized_filter_size =
+            static_cast<int64_t>(apply_request.ByteSizeLong()) + std::max(0, len);
+    const bool has_tree_publish_targets = can_use_tree_publish_targets(targets);
+    const int fanout =
+            calculate_tree_publish_fanout(serialized_filter_size, targets.size(), max_send_bytes);
+    const bool use_tree_publish =

Review Comment:
   This enables tree publish based only on fanout, target fragment IDs, and 
target count, but the actual subtree is carried only in the new 
`PPublishFilterRequestV2.forward_targets` field and is forwarded only by the 
new `apply_filterv2` code. During a rolling BE upgrade, if one selected 
receiver is still an old BE, it will parse/apply the request for its own 
`fragment_ids` and ignore the unknown `forward_targets`; the parent sees an OK 
response, while every target behind that relay never receives the filter and 
waits until timeout or runs without it. Please gate tree publish on receiver BE 
capability/version, or fall back to direct publish until all selected relay 
receivers are known to support forwarding.
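Below is a minimal sketch of the suggested gate. It assumes a hypothetical per-BE capability set (`ForwardCapableSet`) holding the addresses of BEs known to honor `forward_targets`. How that set gets populated (BE version from heartbeats, a feature flag, etc.) is left open.

`PublishTargetInfo`, `choose_publish_mode`, and the `tree_publish_eligible` input are illustrative names, not Doris APIs. `tree_publish_eligible` stands in for the existing `has_tree_publish_targets` and fanout checks. The relay layout is decided after this choice, so the sketch conservatively requires every target to be capable.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical, simplified stand-in for one publish target (the PR itself uses
// TRuntimeFilterTargetParamsV2 / RuntimeFilterPublishTarget).
struct PublishTargetInfo {
    std::string host;
    int port;
    std::vector<int32_t> fragment_ids;
};

// Hypothetical capability set: "host:port" keys of BEs known to forward
// PPublishFilterRequestV2.forward_targets. How it is populated is out of scope.
using ForwardCapableSet = std::unordered_set<std::string>;

std::string addr_key(const PublishTargetInfo& target) {
    return target.host + ":" + std::to_string(target.port);
}

enum class PublishMode { kDirect, kTree };

// True only if every target could safely be picked as a relay.
bool all_targets_support_forwarding(const std::vector<PublishTargetInfo>& targets,
                                    const ForwardCapableSet& forward_capable) {
    return std::all_of(targets.begin(), targets.end(), [&](const PublishTargetInfo& target) {
        return forward_capable.count(addr_key(target)) > 0;
    });
}

// tree_publish_eligible stands in for the existing checks (fragment IDs present,
// fanout / target count); the capability gate is layered on top of them.
PublishMode choose_publish_mode(bool tree_publish_eligible,
                                const std::vector<PublishTargetInfo>& targets,
                                const ForwardCapableSet& forward_capable) {
    if (tree_publish_eligible && all_targets_support_forwarding(targets, forward_capable)) {
        return PublishMode::kTree;
    }
    // Mixed-version cluster (or not eligible): an old BE would apply the filter
    // locally and silently drop forward_targets, so publish directly instead.
    return PublishMode::kDirect;
}
```

An alternative is to check only the nodes actually chosen as relays. That would allow tree publish in more mixed-version clusters, but it ties the check to the tree layout. The all-targets check is simpler, and it can only err toward direct publish.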



##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -48,6 +51,219 @@
 
 namespace doris {
 
+namespace {
+
+constexpr int DEFAULT_RUNTIME_FILTER_TREE_PUBLISH_FANOUT = 4;
+constexpr int MAX_RUNTIME_FILTER_TREE_PUBLISH_FANOUT = 128;
+constexpr int64_t DEFAULT_RUNTIME_FILTER_TREE_PUBLISH_MAX_SEND_BYTES = 256L * 1024L * 1024L;
+
+TNetworkAddress to_thrift_network_address(const PNetworkAddress& address) {
+    TNetworkAddress result;
+    result.hostname = address.hostname();
+    result.port = address.port();
+    return result;
+}
+
+PNetworkAddress to_proto_network_address(const TNetworkAddress& address) {
+    PNetworkAddress result;
+    result.set_hostname(address.hostname);
+    result.set_port(address.port);
+    return result;
+}
+
+bool can_use_tree_publish_targets(const std::vector<TRuntimeFilterTargetParamsV2>& targets) {
+    return std::all_of(targets.begin(), targets.end(), [](const auto& target) {
+        return target.__isset.target_fragment_ids && !target.target_fragment_ids.empty();
+    });
+}
+
+std::vector<RuntimeFilterPublishTarget> build_runtime_filter_publish_targets(
+        const std::vector<TRuntimeFilterTargetParamsV2>& targets) {
+    std::vector<RuntimeFilterPublishTarget> publish_targets;
+    publish_targets.reserve(targets.size());
+    for (const auto& target : targets) {
+        DORIS_CHECK(target.__isset.target_fragment_ids);
+        DORIS_CHECK(!target.target_fragment_ids.empty());
+        RuntimeFilterPublishTarget publish_target;
+        publish_target.addr = to_proto_network_address(target.target_fragment_instance_addr);
+        publish_target.fragment_ids = target.target_fragment_ids;
+        publish_targets.emplace_back(std::move(publish_target));
+    }
+    return publish_targets;
+}
+
+class RuntimeFilterRelayRpcClosure final : public google::protobuf::Closure {
+public:
+    explicit RuntimeFilterRelayRpcClosure(std::shared_ptr<PPublishFilterRequestV2> request)
+            : _request(std::move(request)) {}
+
+    void Run() override {

Review Comment:
   In the tree path, failures from the relay's child RPCs no longer honor the 
normal runtime-filter error policy. The direct publish path uses 
`HandleErrorBrpcCallback` with the query context, so when 
`ignore_runtime_filter_error` is false an `apply_filterv2` RPC failure or 
non-OK status cancels the query. A relay node instead uses 
`RuntimeFilterRelayRpcClosure`, which only logs failures, and 
`forward_runtime_filter()` returns OK immediately after scheduling the 
children; the parent `apply_filterv2` response is still based only on the 
relay's local apply. In a slice like root -> A -> B, an A -> B failure now 
leaves B without the filter and does not cancel the query, whereas direct root 
-> B publish would have cancelled. Please keep the relay child RPCs on the same 
query-aware callback/error policy, or otherwise propagate relay publish failure 
consistently.
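Below is a minimal sketch of routing relay child-RPC results through the same policy as the direct path. It uses a simplified `SimpleStatus` in place of `doris::Status`. A hypothetical `cancel_query` callback stands in for whatever the query context uses to cancel.

`RelayChildErrorPolicy` is an illustrative name, not an existing class. In the real closure, `on_child_done` could be called from `Run()` with the RPC's transport result or the response status.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Simplified stand-in for doris::Status.
struct SimpleStatus {
    bool ok;
    std::string msg;
};

// Hypothetical relay-side policy: each child RPC result goes through the query's
// error policy instead of only being logged.
class RelayChildErrorPolicy {
public:
    RelayChildErrorPolicy(bool ignore_runtime_filter_error,
                          std::function<void(const std::string&)> cancel_query)
            : _ignore_runtime_filter_error(ignore_runtime_filter_error),
              _cancel_query(std::move(cancel_query)) {}

    // Called once per finished child RPC (transport failure or non-OK status).
    void on_child_done(const SimpleStatus& status) {
        if (status.ok) {
            return;
        }
        _failures.fetch_add(1);
        if (_ignore_runtime_filter_error) {
            return;  // errors are tolerated: count them but keep the query running
        }
        // Cancel at most once even if several children fail concurrently.
        bool expected = false;
        if (_cancelled.compare_exchange_strong(expected, true)) {
            _cancel_query("runtime filter relay publish failed: " + status.msg);
        }
    }

    int failures() const { return _failures.load(); }
    bool cancelled() const { return _cancelled.load(); }

private:
    bool _ignore_runtime_filter_error;
    std::function<void(const std::string&)> _cancel_query;
    std::atomic<int> _failures{0};
    std::atomic<bool> _cancelled{false};
};
```

The compare-and-swap ensures cancellation is requested only once when several children fail. The review also mentions another option: hold the parent `apply_filterv2` response until all children finish, then return their aggregated status. That would keep the decision at the root, but each relay RPC would then take as long as its whole subtree.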



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.