github-actions[bot] commented on code in PR #61130:
URL: https://github.com/apache/doris/pull/61130#discussion_r2902959715
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -280,13 +280,14 @@ Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists(
}
template <typename Key, typename Value, typename ValueType>
-void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
+bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
auto id = get_map_id(query_id, _internal_map.size());
{
std::unique_lock lock(*_internal_map[id].first);
auto& map = _internal_map[id].second;
- map.erase(query_id);
+ return map.erase(query_id) != 0;
}
+ return false;
Review Comment:
**[HIGH] Unreachable code.** The `return map.erase(query_id) != 0;` at line
288 always returns from within the braced block, so this `return false;` on
line 290 is dead code. The compiler may warn about this. Consider removing the
outer braces to make the control flow clearer:
```cpp
bool ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
    auto id = get_map_id(query_id, _internal_map.size());
    std::unique_lock lock(*_internal_map[id].first);
    auto& map = _internal_map[id].second;
    return map.erase(query_id) != 0;
}
```
##########
be/src/exec/pipeline/pipeline_fragment_context.cpp:
##########
@@ -179,9 +179,16 @@ void PipelineFragmentContext::cancel(const Status reason) {
{
std::lock_guard<std::mutex> l(_task_mutex);
if (_closed_tasks >= _total_tasks) {
+ if (_need_notify_close) {
+ // if fragment cancelled and waiting for notify to close, need to remove from fragment mgr
+ _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
Review Comment:
**[NOTE] Cancel path and `_wait_close_guard`.** When `cancel()` runs and all
tasks are already closed with `_need_notify_close` true, this code removes the
PFC from `_pipeline_map` and sets `_need_notify_close = false`. However, if
`listen_wait_close()` was previously called and stored a guard in
`_wait_close_guard`, this path does NOT clear `_wait_close_guard`. The guard
will only be released when the PFC is destroyed.
This means the brpc response for `wait_for_destroy` is deferred until PFC
destruction rather than being sent immediately on cancel. The caller
(`RecCTESourceOperatorX`) will block on `brpc::Join()` until then. Consider
explicitly setting `_wait_close_guard = nullptr` here to send the response
promptly.
##########
be/src/runtime/query_context.h:
##########
@@ -410,7 +415,28 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
QuerySource get_query_source() const { return this->_query_source; }
- const TQueryOptions get_query_options() const { return _query_options; }
+ TQueryOptions get_query_options() const { return _query_options; }
+
+ uint32_t get_stage(int filter_id) {
+ std::lock_guard<std::mutex> lock(_filter_id_to_stage_mtx);
+ return _filter_id_to_stage[filter_id];
Review Comment:
**[LOW] Map insertion on read.** `operator[]` on `_filter_id_to_stage`
inserts a default entry (0) for any filter_id that doesn't exist. For non-CTE
queries, every runtime filter stage check will insert an entry. Consider using
`find()` with a default return:
```cpp
uint32_t get_stage(int filter_id) {
    std::lock_guard<std::mutex> lock(_filter_id_to_stage_mtx);
    auto it = _filter_id_to_stage.find(filter_id);
    return it != _filter_id_to_stage.end() ? it->second : 0;
}
```
##########
be/src/exec/runtime_filter/runtime_filter_mgr.cpp:
##########
@@ -372,6 +388,12 @@ Status RuntimeFilterMergeControllerEntity::_send_rf_to_target(GlobalMergeContext
butil::IOBuf request_attachment;
PPublishFilterRequestV2 apply_request;
+ if (auto q_ctx = ctx.lock(); q_ctx) {
+ apply_request.set_stage(q_ctx->get_stage(cnt_val.runtime_filter_desc.filter_id));
Review Comment:
**[NOTE] Stage read outside `cnt_val.mtx`.** `_send_rf_to_target` is called
after releasing `cnt_val.mtx` (from `merge()`), but here it reads the stage via
`q_ctx->get_stage()`. This is safe because `get_stage()` has its own lock.
However, there's a subtle race: between releasing `cnt_val.mtx` in `merge()`
and reaching here, `GlobalMergeContext::reset()` could run (acquiring
`cnt_val.mtx`), incrementing `stage`. This would cause the publish to carry the
NEW stage number rather than the old one, potentially being rejected by
receivers.
In practice this race is likely prevented by the sequential nature of the recursive CTE protocol (`wait_for_destroy` completes before `reset_global_rf`), but that assumption deserves a code comment.
##########
be/src/exec/pipeline/pipeline_fragment_context.h:
##########
@@ -128,11 +130,24 @@ class PipelineFragmentContext : public TaskExecutionContext {
std::string get_load_error_url();
std::string get_first_error_msg();
- Status wait_close(bool close);
- Status rebuild(ThreadPool* thread_pool);
- Status set_to_rerun();
+ std::set<int> get_deregister_runtime_filter() const;
- bool need_notify_close() const { return _need_notify_close; }
+ Status listen_wait_close(const std::shared_ptr<brpc::ClosureGuard>& guard,
Review Comment:
**[MEDIUM] Thread safety concern.** `listen_wait_close()` reads/writes
`_need_notify_close` and `_wait_close_guard` without holding `_task_mutex`.
Meanwhile, `cancel()` modifies `_need_notify_close` under `_task_mutex`. This
creates a TOCTOU race:
1. `listen_wait_close()` checks `_need_notify_close` is true (line 140)
2. `cancel()` runs, sets `_need_notify_close = false` (line 191 of .cpp)
3. `listen_wait_close()` sets `_wait_close_guard = guard` (line 147)
4. `_close_fragment_instance()` sees `!_need_notify_close`, enters the
normal close path, never clears `_wait_close_guard`
The brpc response would be deferred until PFC destruction, which may work in
practice but is fragile. Consider acquiring `_task_mutex` in this method, or
document why the race is benign.
##########
gensrc/proto/internal_service.proto:
##########
@@ -75,11 +75,10 @@ message PTransmitRecCTEBlockResult {
message PRerunFragmentParams {
enum Opcode {
- wait = 1; // wait fragment execute done
- release = 2; // release current round's resource
- rebuild = 3; // rebuild next round pipeline tasks
- submit = 4; // submit tasks to execute
- close = 5; // close fragment
+ wait_for_destroy = 1; // deregister RF, destroy old PFC, async wait for tasks to close via brpc closure
+ rebuild = 2; // rebuild next round pipeline tasks
+ submit = 3; // submit tasks to execute
+ final_close = 4; // async wait for tasks to close, send report, and clean up (last round)
}
Review Comment:
**[MEDIUM] Rolling upgrade compatibility.** The enum values have been
renumbered: `rebuild` changed from 3→2, `submit` from 4→3. Since
`RecCTESourceOperatorX::_fragments_to_reset` can target remote BEs (the addresses come from the FE plan), during a rolling upgrade an old BE sending
`rebuild=3` would be interpreted as `submit=3` by a new BE, causing incorrect
behavior.
Consider keeping the original numeric values to maintain wire compatibility:
```protobuf
enum Opcode {
    wait_for_destroy = 1;
    rebuild = 3;     // keep original value
    submit = 4;      // keep original value
    final_close = 5; // keep original value
}
```
Protobuf enum values don't need to be contiguous.
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1460,35 +1504,96 @@ Status FragmentMgr::transmit_rec_cte_block(
}
}
-Status FragmentMgr::rerun_fragment(const TUniqueId& query_id, int fragment,
+Status FragmentMgr::rerun_fragment(const std::shared_ptr<brpc::ClosureGuard>& guard,
+ const TUniqueId& query_id, int fragment,
PRerunFragmentParams_Opcode stage) {
- if (auto q_ctx = get_query_ctx(query_id)) {
- SCOPED_ATTACH_TASK(q_ctx.get());
+ if (stage == PRerunFragmentParams::wait_for_destroy ||
+ stage == PRerunFragmentParams::final_close) {
auto fragment_ctx = _pipeline_map.find({query_id, fragment});
if (!fragment_ctx) {
return Status::NotFound("Fragment context (query-id: {}, fragment-id: {}) not found",
print_id(query_id), fragment);
}
- if (stage == PRerunFragmentParams::wait) {
- return fragment_ctx->wait_close(false);
- } else if (stage == PRerunFragmentParams::release) {
- return fragment_ctx->set_to_rerun();
- } else if (stage == PRerunFragmentParams::rebuild) {
- return fragment_ctx->rebuild(_thread_pool.get());
- } else if (stage == PRerunFragmentParams::submit) {
- return fragment_ctx->submit();
- } else if (stage == PRerunFragmentParams::close) {
- return fragment_ctx->wait_close(true);
- } else {
- return Status::InvalidArgument("Unknown rerun fragment opcode: {}", stage);
+ if (stage == PRerunFragmentParams::wait_for_destroy) {
+ std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+ auto it = _rerunnable_params_map.find({query_id, fragment});
+ if (it == _rerunnable_params_map.end()) {
+ auto st = fragment_ctx->listen_wait_close(guard, true);
+ if (!st.ok()) {
+ LOG(WARNING) << fmt::format(
+ "wait_for_destroy fragment context (query-id: {}, fragment-id: "
+ "{}) failed: {}",
+ print_id(query_id), fragment, st.to_string());
+ }
+ return Status::NotFound(
+ "Rerunnable params (query-id: {}, fragment-id: {}) not found",
+ print_id(query_id), fragment);
+ }
+
+ it->second.deregister_runtime_filter_ids.merge(
+ fragment_ctx->get_deregister_runtime_filter());
}
+
+ auto* query_ctx = fragment_ctx->get_query_ctx();
+ SCOPED_ATTACH_TASK(query_ctx);
+ RETURN_IF_ERROR(
+ fragment_ctx->listen_wait_close(guard, stage == PRerunFragmentParams::final_close));
+ remove_pipeline_context({query_id, fragment});
+ return Status::OK();
+ } else if (stage == PRerunFragmentParams::rebuild) {
+ auto q_ctx = get_query_ctx(query_id);
+ if (!q_ctx) {
+ return Status::NotFound(
+ "rerun_fragment: Query context (query-id: {}) not found, maybe finished",
+ print_id(query_id));
+ }
+ SCOPED_ATTACH_TASK(q_ctx.get());
+ RerunableFragmentInfo info;
+ {
+ std::lock_guard<std::mutex> lk(_rerunnable_params_lock);
+ auto it = _rerunnable_params_map.find({query_id, fragment});
+ if (it == _rerunnable_params_map.end()) {
+ return Status::NotFound("rebuild (query-id: {}, fragment-id: {}) not found",
+ print_id(query_id), fragment);
+ }
+ it->second.stage++;
+ RETURN_IF_ERROR(q_ctx->update_filters_stage(it->second.stage,
+ it->second.deregister_runtime_filter_ids));
+ info = it->second;
Review Comment:
**[LOW] Large struct copy under lock.** `info = it->second` copies the
entire `RerunableFragmentInfo`, which includes `TPipelineFragmentParams` (a
potentially large thrift struct with the full plan) and
`TPipelineFragmentParamsList`. This copy happens while holding
`_rerunnable_params_lock`.
Consider either:
1. Moving the heavy parts out of the struct copy (e.g., store `params` as `shared_ptr<const TPipelineFragmentParams>`), or
2. Extracting just the needed fields under the lock and releasing it before the PFC construction.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]