Copilot commented on code in PR #61782:
URL: https://github.com/apache/doris/pull/61782#discussion_r2996028911


##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -256,7 +254,7 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(std::shared_ptr<Quer
             auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
                                               
DummyBrpcCallback<PSyncFilterSizeResponse>>::
                     create_unique(std::make_shared<PSyncFilterSizeRequest>(),
-                                  
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
+                                  
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared());

Review Comment:
   This call site previously passed a `QueryContext` weak_ptr (conditionally 
based on `ignore_runtime_filter_error`) into `AutoReleaseClosure` so failures 
could be escalated/canceled when the option is false. With the ctx parameter 
removed and the callback being `DummyBrpcCallback`, RPC failure/non-OK 
`status()` handling appears to be reduced to logging only, which can change 
query correctness/termination behavior. Recommendation (mandatory): either (1) 
restore ctx-based cancellation semantics in `AutoReleaseClosure` (capturing 
status/failure before invoking `call()`), or (2) replace `DummyBrpcCallback` 
with a runtime-filter-specific callback that performs the required 
cancel/disable/sub behavior under the same option gating.



##########
be/src/exec/runtime_filter/runtime_filter_producer.cpp:
##########
@@ -94,51 +94,43 @@ Status RuntimeFilterProducer::publish(RuntimeState* state, 
bool build_hash_table
     return Status::OK();
 }
 
-class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
-                                                  
DummyBrpcCallback<PSendFilterSizeResponse>> {
-    std::shared_ptr<Dependency> _dependency;
-    // Should use weak ptr here, because when query context deconstructs, 
should also delete runtime filter
-    // context, it not the memory is not released. And rpc is in another 
thread, it will hold rf context
-    // after query context because the rpc is not returned.
-    std::weak_ptr<RuntimeFilterWrapper> _wrapper;
-    using Base =
-            AutoReleaseClosure<PSendFilterSizeRequest, 
DummyBrpcCallback<PSendFilterSizeResponse>>;
-    friend class RuntimeFilterProducer;
-    ENABLE_FACTORY_CREATOR(SyncSizeClosure);
-
-    void _process_if_rpc_failed() override {
-        Defer defer {[&]() {
-            Base::_process_if_rpc_failed();
+// Callback for sync-size RPCs. Handles errors (disable wrapper + sub 
dependency) in call().
+class SyncSizeCallback : public DummyBrpcCallback<PSendFilterSizeResponse> {
+    ENABLE_FACTORY_CREATOR(SyncSizeCallback);
+
+public:
+    SyncSizeCallback(std::shared_ptr<Dependency> dependency,
+                     std::shared_ptr<RuntimeFilterWrapper> wrapper)
+            : _dependency(std::move(dependency)), _wrapper(wrapper) {}
+
+    void call() override {
+        // On error: disable the wrapper and sub the dependency here, because 
set_synced_size()
+        // will never be called (the merge node won't respond with a sync).
+        // On success: do NOT sub here. The merge node will respond with 
sync_filter_size,
+        // which calls set_synced_size() -> _dependency->sub().
+        if (cntl_->Failed()) {
+            if (auto w = _wrapper.lock()) {
+                w->set_state(RuntimeFilterWrapper::State::DISABLED, 
cntl_->ErrorText());
+            }
             ((CountedFinishDependency*)_dependency.get())->sub();
-        }};
-        auto wrapper = _wrapper.lock();
-        if (!wrapper) {
             return;
         }
 
-        wrapper->set_state(RuntimeFilterWrapper::State::DISABLED, 
cntl_->ErrorText());
-    }
-
-    void _process_if_meet_error_status(const Status& status) override {
-        Defer defer {[&]() {
-            Base::_process_if_meet_error_status(status);
+        Status status = Status::create(response_->status());
+        if (!status.ok()) {
+            if (auto w = _wrapper.lock()) {
+                w->set_state(RuntimeFilterWrapper::State::DISABLED, 
status.to_string());
+            }
             ((CountedFinishDependency*)_dependency.get())->sub();
-        }};
-        auto wrapper = _wrapper.lock();
-        if (!wrapper) {
-            return;
         }

Review Comment:
   The C-style cast to `CountedFinishDependency*` can silently turn into UB if 
`_dependency` is not actually that type. Recommendation (mandatory): store 
`_dependency` as `std::shared_ptr<CountedFinishDependency>` in 
`SyncSizeCallback` (and pass that type in), or use a `static_cast` with a debug 
assertion documenting the invariant; avoid C-style casts here.



##########
be/src/exec/runtime_filter/runtime_filter_producer.cpp:
##########
@@ -199,13 +191,15 @@ Status RuntimeFilterProducer::send_size(RuntimeState* 
state, uint64_t local_filt
     }
 
     auto request = std::make_shared<PSendFilterSizeRequest>();
-    auto callback = 
DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
+    auto callback = SyncSizeCallback::create_shared(_dependency, _wrapper);
+    // Store callback in the producer to keep it alive until the RPC completes.
+    // AutoReleaseClosure holds callbacks via weak_ptr, so without this the 
callback
+    // would be destroyed when this function returns and error-path sub() 
would never fire.
+    _sync_size_callback = callback;
     // RuntimeFilter maybe deconstructed before the rpc finished, so that 
could not use
     // a raw pointer in closure. Has to use the context's shared ptr.
-    auto closure = SyncSizeClosure::create_unique(request, callback, 
_dependency, _wrapper,
-                                                  
state->query_options().ignore_runtime_filter_error
-                                                          ? 
std::weak_ptr<QueryContext> {}
-                                                          : 
state->get_query_ctx_weak());
+    auto closure = AutoReleaseClosure<PSendFilterSizeRequest, 
SyncSizeCallback>::create_unique(
+            request, callback);

Review Comment:
   `_sync_size_callback` is a single slot, but `send_size()` can be invoked 
more than once during the producer lifecycle in some implementations; if a 
second call happens before the first RPC completes, this assignment overwrites 
the stored callback and can still allow the earlier callback to be destroyed 
early (reintroducing the 'weak_ptr callback dies before completion' failure). 
Also, the stored callback is never cleared on completion, which can 
unnecessarily retain `_dependency` until producer teardown. Recommendation 
(mandatory): make the storage support multiple in-flight RPCs (e.g., 
vector/list keyed by RPC generation or request id), and clear the stored entry 
when the RPC completes (both success and error paths), e.g., by having the 
callback clear a weak back-reference to the producer or by clearing in the 
completion handler that observes the response.



##########
be/src/exec/exchange/vdata_stream_sender.h:
##########
@@ -164,6 +164,7 @@ class Channel {
 
     std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> 
get_send_callback(RpcInstance* ins,
                                                                                
  bool eos) {
+        // here we reuse the callback because it's re-construction may be 
expensive due to many parameters' capture

Review Comment:
   Fix grammar/spelling: change \"it's re-construction\" to \"its 
reconstruction\" (and consider rephrasing to avoid the apostrophe possessive).
   ```suggestion
           // here we reuse the callback because reconstructing it may be 
expensive due to capturing many parameters
   ```



##########
be/test/exec/exchange/exchange_sink_test.cpp:
##########
@@ -234,4 +234,138 @@ TEST_F(ExchangeSinkTest, test_queue_size) {
     }
 }
 
+// Callback that records the state of response_ and cntl_ at the moment call() 
is invoked,
+// then mutates them (simulating callback reuse triggering a new RPC). This 
lets us verify:
+// 1. call() was invoked
+// 2. The callback saw the correct original state (before any mutation)
+// 3. After call(), the shared objects are mutated (so any code reading them 
after call()
+//    would see wrong values — this is the bug the fix prevents)
+template <typename Response>
+class StateCapturingCallback : public DummyBrpcCallback<Response> {
+    ENABLE_FACTORY_CREATOR(StateCapturingCallback);
+
+public:
+    StateCapturingCallback() = default;
+
+    enum class MutateAction {
+        WRITE_ERROR,  // Write an error status into response
+        CLEAR_STATUS, // Clear the status field
+        RESET_CNTL,   // Call cntl_->Reset()
+    };
+
+    void set_mutate_action(MutateAction action) { _action = action; }
+
+    void call() override {
+        call_invoked = true;
+        // Capture state BEFORE mutation — this is what call() sees.
+        cntl_failed_at_call_time = this->cntl_->Failed();
+        if (this->cntl_->Failed()) {
+            cntl_error_at_call_time = this->cntl_->ErrorText();
+        }
+        response_status_at_call_time = 
Status::create(this->response_->status());
+
+        // Now mutate (simulating callback reuse / new RPC)
+        switch (_action) {
+        case MutateAction::WRITE_ERROR: {
+            Status err = Status::InternalError("injected by callback reuse");
+            err.to_protobuf(this->response_->mutable_status());
+            break;
+        }
+        case MutateAction::CLEAR_STATUS: {
+            this->response_->mutable_status()->set_status_code(0);
+            this->response_->mutable_status()->clear_error_msgs();
+            break;
+        }
+        case MutateAction::RESET_CNTL: {
+            this->cntl_->Reset();
+            break;
+        }
+        }
+    }
+
+    // Observable state
+    bool call_invoked = false;
+    bool cntl_failed_at_call_time = false;
+    std::string cntl_error_at_call_time;
+    Status response_status_at_call_time;
+
+private:
+    MutateAction _action = MutateAction::WRITE_ERROR;
+};
+
+using TestCallback = StateCapturingCallback<PTransmitDataResult>;
+
+// Test: Response starts OK. call() writes an error into it.
+// With correct ordering (log-before-call): the closure's logging sees OK (no 
warning),
+// then call() runs and the callback captures the OK status at call time.
+// With WRONG ordering (call-before-log): call() writes error first, then the 
closure
+// would log the error — a false positive. We verify call() saw OK at 
invocation time,
+// proving it ran after (or at least not before) the status was checked by the 
closure.
+TEST_F(ExchangeSinkTest, test_closure_call_sees_original_ok_response) {
+    auto callback = TestCallback::create_shared();
+    // Response starts OK (default).
+    callback->set_mutate_action(TestCallback::MutateAction::WRITE_ERROR);

Review Comment:
   The PR description lists failing/passing test names 
`test_closure_call_must_not_corrupt_status_check`, 
`test_closure_call_must_not_hide_error_status`, and 
`test_closure_call_must_not_hide_rpc_failure`, but the added tests in the diff 
use different names (e.g., `test_closure_call_sees_original_ok_response`). 
Recommendation (optional): update the PR description (or rename tests) so the 
referenced test names match what's actually added, to make it easier to 
validate the claim.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to