Copilot commented on code in PR #60734:
URL: https://github.com/apache/doris/pull/60734#discussion_r2802228744


##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -263,16 +263,16 @@ void transmit_blockv2(PBackendService_Stub* stub,
                                                          
ExchangeSendCallback<PTransmitDataResult>>>
                               closure);
 #endif
-class ExchangeSinkBuffer : public HasTaskExecutionCtx {
+class ExchangeSinkBuffer : std::enable_shared_from_this<ExchangeSinkBuffer> {

Review Comment:
   `std::enable_shared_from_this` should be inherited publicly. With the 
current declaration `class ExchangeSinkBuffer : 
std::enable_shared_from_this<...>`, the inheritance is private, which prevents 
`std::shared_ptr` from properly wiring up the internal weak pointer and can 
make `weak_from_this()` always return empty. Since this PR relies on 
`weak_from_this()` to guard async callbacks, please change it to `public 
std::enable_shared_from_this<ExchangeSinkBuffer>` so the lifetime check works 
as intended (and to avoid future `bad_weak_ptr` issues if `shared_from_this()` 
is used).
   ```suggestion
   class ExchangeSinkBuffer : public 
std::enable_shared_from_this<ExchangeSinkBuffer> {
   ```



##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -584,11 +584,8 @@ void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& 
ins,
         return;
     }
     ins.rpc_channel_is_turn_off = true;
-    auto weak_task_ctx = weak_task_exec_ctx();
-    if (auto pip_ctx = weak_task_ctx.lock()) {
-        for (auto& parent : _parents) {
-            parent->on_channel_finished(ins.id);
-        }
+    for (auto& parent : _parents) {
+        parent->on_channel_finished(ins.id);

Review Comment:
   `_parents` stores raw `ExchangeSinkLocalState*`, and this method can be 
reached from async RPC callbacks. Removing the previous liveness guard and 
unconditionally calling `parent->on_channel_finished(...)` can lead to 
use-after-free if the local state has already been destroyed (even if the sink 
buffer is still alive). Please reintroduce a safe lifetime check (e.g., lock a 
`TaskExecutionContext`/other owning context before iterating) or change 
`_parents` to a lifetime-safe reference (or clear/unregister parents under lock 
during close) before calling into them.
   ```suggestion
           if (parent != nullptr) {
               parent->on_channel_finished(ins.id);
           }
   ```



##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -263,16 +263,16 @@ void transmit_blockv2(PBackendService_Stub* stub,
                                                          
ExchangeSendCallback<PTransmitDataResult>>>
                               closure);
 #endif
-class ExchangeSinkBuffer : public HasTaskExecutionCtx {
+class ExchangeSinkBuffer : std::enable_shared_from_this<ExchangeSinkBuffer> {
 public:
     ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, PlanNodeId 
node_id,
                        RuntimeState* state, const std::vector<InstanceLoId>& 
sender_ins_ids);
 #ifdef BE_TEST
     ExchangeSinkBuffer(RuntimeState* state, int64_t sinknum)
-            : HasTaskExecutionCtx(state), _state(state), 
_exchange_sink_num(sinknum) {};
+            : _state(state), _exchange_sink_num(sinknum) {};

Review Comment:
   This header removes `HasTaskExecutionCtx` from `ExchangeSinkBuffer`, but the 
`.cpp` implementation still constructs it (`: HasTaskExecutionCtx(state), 
...`). That will not compile after this change. Either restore 
`HasTaskExecutionCtx` as a base/member, or remove the base-class initializer 
and any remaining dependencies on it in the `.cpp` constructor(s).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to