Copilot commented on code in PR #60734:
URL: https://github.com/apache/doris/pull/60734#discussion_r2802228744
##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -263,16 +263,16 @@ void transmit_blockv2(PBackendService_Stub* stub,
ExchangeSendCallback<PTransmitDataResult>>>
closure);
#endif
-class ExchangeSinkBuffer : public HasTaskExecutionCtx {
+class ExchangeSinkBuffer : std::enable_shared_from_this<ExchangeSinkBuffer> {
Review Comment:
`std::enable_shared_from_this` must be inherited publicly. With the
current declaration `class ExchangeSinkBuffer :
std::enable_shared_from_this<...>`, the inheritance is private (the default
for `class`), so `std::shared_ptr` cannot access the base and never
initializes the internal weak pointer. As a result `weak_from_this()` always
returns an empty `weak_ptr`. Since this PR relies on `weak_from_this()` to
guard async callbacks, please change it to `public
std::enable_shared_from_this<ExchangeSinkBuffer>` so the lifetime check works
as intended (and so that `shared_from_this()` does not throw
`std::bad_weak_ptr` if it is used later).
```suggestion
class ExchangeSinkBuffer : public std::enable_shared_from_this<ExchangeSinkBuffer> {
```
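To illustrate the point about private versus public inheritance, here is a minimal sketch. `PrivatelyDerived`, `PubliclyDerived`, and `owner_visible_after_make_shared` are hypothetical names made up for this example and are not Doris code. It assumes C++17, since `weak_from_this()` was added in that standard.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in types for illustration; these are not Doris classes.

// `class` defaults to private inheritance, so std::shared_ptr cannot see the
// enable_shared_from_this base and leaves its internal weak pointer empty.
class PrivatelyDerived : std::enable_shared_from_this<PrivatelyDerived> {
public:
    bool owner_visible() { return !weak_from_this().expired(); }
};

// With public inheritance, std::shared_ptr wires up the internal weak pointer.
class PubliclyDerived : public std::enable_shared_from_this<PubliclyDerived> {
public:
    bool owner_visible() { return !weak_from_this().expired(); }
};

// Creates an object owned by a shared_ptr and reports whether the object can
// observe that owner through weak_from_this().
template <typename T>
bool owner_visible_after_make_shared() {
    auto ptr = std::make_shared<T>();
    return ptr->owner_visible();
}

// Small self-check that can be called from main().
inline void run_inheritance_demo() {
    assert(!owner_visible_after_make_shared<PrivatelyDerived>());
    assert(owner_visible_after_make_shared<PubliclyDerived>());
}
```

Both classes compile, which is why the problem is easy to miss: the private variant only fails at runtime, when the lifetime guard silently sees an expired pointer.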
##########
be/src/pipeline/exec/exchange_sink_buffer.cpp:
##########
@@ -584,11 +584,8 @@ void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins,
return;
}
ins.rpc_channel_is_turn_off = true;
- auto weak_task_ctx = weak_task_exec_ctx();
- if (auto pip_ctx = weak_task_ctx.lock()) {
- for (auto& parent : _parents) {
- parent->on_channel_finished(ins.id);
- }
+ for (auto& parent : _parents) {
+ parent->on_channel_finished(ins.id);
Review Comment:
`_parents` stores raw `ExchangeSinkLocalState*`, and this method can be
reached from async RPC callbacks. Removing the previous liveness guard and
unconditionally calling `parent->on_channel_finished(...)` can lead to
use-after-free if the local state has already been destroyed (even if the sink
buffer is still alive). Please reintroduce a safe lifetime check (e.g., lock a
`TaskExecutionContext`/other owning context before iterating), change
`_parents` to a lifetime-safe reference, or clear/unregister parents under
lock during close before calling into them. Note that the null check suggested
below only guards against null entries: a dangling pointer is still non-null,
so one of the fixes above is needed in addition.
```suggestion
if (parent != nullptr) {
parent->on_channel_finished(ins.id);
}
```
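One way to read the "lifetime-safe reference" option is to hold `std::weak_ptr` entries and lock each one before calling into it. The sketch below assumes the parents are owned by `std::shared_ptr`, which may not be true for `ExchangeSinkLocalState` in Doris. `CountingParent` and `ParentList` are hypothetical names for this example only, and thread synchronization of the list itself is deliberately left out.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for a parent that wants channel-finished callbacks.
struct CountingParent {
    int finished_calls = 0;
    void on_channel_finished(int /*instance_id*/) { ++finished_calls; }
};

// Hypothetical holder that keeps non-owning, lifetime-checked references.
class ParentList {
public:
    void add(const std::shared_ptr<CountingParent>& parent) {
        _parents.emplace_back(parent);  // stored as weak_ptr, does not extend lifetime
    }

    // Notifies every parent that is still alive and returns how many were called.
    int notify_channel_finished(int instance_id) {
        int notified = 0;
        for (auto& weak_parent : _parents) {
            // lock() yields an empty shared_ptr once the parent is destroyed,
            // so a dead parent is skipped instead of dereferenced.
            if (auto parent = weak_parent.lock()) {
                parent->on_channel_finished(instance_id);
                ++notified;
            }
        }
        return notified;
    }

private:
    std::vector<std::weak_ptr<CountingParent>> _parents;
};

// Small self-check that can be called from main().
inline void run_parent_list_demo() {
    auto a = std::make_shared<CountingParent>();
    auto b = std::make_shared<CountingParent>();
    ParentList list;
    list.add(a);
    list.add(b);
    assert(list.notify_channel_finished(1) == 2);
    b.reset();  // destroy one parent; its entry now fails to lock
    assert(list.notify_channel_finished(2) == 1);
    assert(a->finished_calls == 2);
}
```

Unlike a null check, `lock()` distinguishes a destroyed parent from a live one, and the temporary `shared_ptr` keeps the parent alive for the duration of the callback.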
##########
be/src/pipeline/exec/exchange_sink_buffer.h:
##########
@@ -263,16 +263,16 @@ void transmit_blockv2(PBackendService_Stub* stub,
ExchangeSendCallback<PTransmitDataResult>>>
closure);
#endif
-class ExchangeSinkBuffer : public HasTaskExecutionCtx {
+class ExchangeSinkBuffer : std::enable_shared_from_this<ExchangeSinkBuffer> {
public:
ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, PlanNodeId node_id,
RuntimeState* state, const std::vector<InstanceLoId>& sender_ins_ids);
#ifdef BE_TEST
ExchangeSinkBuffer(RuntimeState* state, int64_t sinknum)
- : HasTaskExecutionCtx(state), _state(state), _exchange_sink_num(sinknum) {};
+ : _state(state), _exchange_sink_num(sinknum) {};
Review Comment:
This header removes `HasTaskExecutionCtx` from `ExchangeSinkBuffer`, but the
`.cpp` implementation still constructs it (`: HasTaskExecutionCtx(state),
...`). That will not compile after this change. Either restore
`HasTaskExecutionCtx` as a base/member, or remove the base-class initializer
and any remaining dependencies on it in the `.cpp` constructor(s).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]