github-actions[bot] commented on code in PR #60812:
URL: https://github.com/apache/doris/pull/60812#discussion_r2884487679


##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1370,6 +1405,10 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
     query_id.__set_hi(queryid.hi);
     query_id.__set_lo(queryid.lo);
     if (auto q_ctx = get_query_ctx(query_id)) {
+        // just discard low stage request

Review Comment:
   **[Minor] Unused variable**: `rf_ids` is assigned but never used. The next 
line passes `it->second.deregister_runtime_filter_ids` directly. This looks 
like a leftover from an earlier iteration. It should be removed to avoid 
compiler warnings (`-Wunused-variable`).



##########
be/src/runtime/query_context.h:
##########
@@ -410,7 +413,30 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
     timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
     QuerySource get_query_source() const { return this->_query_source; }
 
-    const TQueryOptions get_query_options() const { return _query_options; }
+    TQueryOptions get_query_options() const { return _query_options; }

Review Comment:
   **[Minor] Unnecessary copy**: `filter_ids` is taken by value 
(`std::set<int32_t> filter_ids`) but is only iterated, never modified. Consider 
taking it by `const std::set<int32_t>&` to avoid the copy.
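   A minimal sketch of the suggested signature change. The function name `bump_filter_stages`, its map parameter, and its body are hypothetical (the diff shown here does not include the method that takes `filter_ids`); only the parameter type `std::set<int32_t>` comes from the comment. Because the set is only read, `const&` avoids allocating a copy of every node on each call.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <unordered_map>

// Hypothetical free function standing in for the method under review;
// the real name and owning class in the PR may differ.
// `filter_ids` is only iterated, so it is taken by const reference
// instead of by value, which would copy the whole std::set per call.
void bump_filter_stages(const std::set<int32_t>& filter_ids,
                        std::unordered_map<int, uint32_t>& filter_id_to_stage) {
    for (int32_t id : filter_ids) {
        // operator[] value-initializes a missing entry to 0 before incrementing.
        ++filter_id_to_stage[id];
    }
}
```

Callers need no changes: an lvalue `std::set<int32_t>` binds to the const reference directly, and a temporary still works.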



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1370,6 +1405,10 @@ Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
     query_id.__set_hi(queryid.hi);
     query_id.__set_lo(queryid.lo);
     if (auto q_ctx = get_query_ctx(query_id)) {
+        // just discard low stage request
+        if (q_ctx->get_stage(request->filter_id()) != request->stage()) {
+            return Status::OK();
+        }

Review Comment:
   **[High] Dangling Pointer Risk**: `info` is a raw pointer into 
`_rerunnable_params_map`, but the `_rerunnable_params_lock` has been released 
at this point (line 1566). If a concurrent `remove_query_context()` or `stop()` 
erases this entry from the map, `info` becomes a dangling pointer, and 
subsequent accesses (`info->params`, `info->finish_callback`, etc.) are 
use-after-free.
   
   This can happen if the query is cancelled while `recreate_and_submit` is in 
progress. The `q_ctx` shared_ptr keeps the QueryContext alive, but does NOT 
prevent the `RerunableFragmentInfo` struct from being erased from 
`_rerunnable_params_map`.
   
   Fix: Copy out the needed fields (`params`, `finish_callback`) while still 
holding the lock, rather than escaping a raw pointer.
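   A minimal sketch of the suggested fix under a simplified model. `FragmentRegistry`, the `int` key, and the field types (`std::string` for `params`, `std::function<void()>` for `finish_callback`) are hypothetical stand-ins; only the names `_rerunnable_params_map`, `_rerunnable_params_lock`, `RerunableFragmentInfo`, `params`, and `finish_callback` come from the comment. The lookup copies the entry while the lock is held and returns it by value, so a concurrent erase cannot invalidate what the caller holds.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for RerunableFragmentInfo; field types are assumptions.
struct RerunableFragmentInfo {
    std::string params;  // placeholder for the real params type
    std::function<void()> finish_callback;
};

// Hypothetical class modelling only the map and its lock.
class FragmentRegistry {
public:
    void add(int fragment_id, RerunableFragmentInfo info) {
        std::lock_guard<std::mutex> lock(_rerunnable_params_lock);
        _rerunnable_params_map[fragment_id] = std::move(info);
    }

    // Models remove_query_context()/stop() erasing the entry.
    void remove(int fragment_id) {
        std::lock_guard<std::mutex> lock(_rerunnable_params_lock);
        _rerunnable_params_map.erase(fragment_id);
    }

    // Copies the entry while the lock is held instead of escaping a raw
    // pointer into the map, so a later erase cannot leave it dangling.
    std::optional<RerunableFragmentInfo> snapshot(int fragment_id) const {
        std::lock_guard<std::mutex> lock(_rerunnable_params_lock);
        auto it = _rerunnable_params_map.find(fragment_id);
        if (it == _rerunnable_params_map.end()) {
            return std::nullopt;  // already erased, e.g. the query was cancelled
        }
        return it->second;  // a copy, owned by the caller
    }

private:
    mutable std::mutex _rerunnable_params_lock;
    std::map<int, RerunableFragmentInfo> _rerunnable_params_map;
};
```

If copying `params` is expensive, storing `std::shared_ptr<RerunableFragmentInfo>` in the map and copying the `shared_ptr` under the lock should give the same lifetime guarantee. In either variant it is probably safest to invoke `finish_callback` only after the lock has been released.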



##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -179,9 +179,16 @@ void PipelineFragmentContext::cancel(const Status reason) {
     {
         std::lock_guard<std::mutex> l(_task_mutex);
         if (_closed_tasks >= _total_tasks) {
+            if (_need_notify_close) {
+                // if fragment cancelled and waiting for notify to close, need to remove from fragment mgr
+                _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
+                _need_notify_close = false;
+            }
             // All tasks in this PipelineXFragmentContext already closed.
             return;
         }
+         // make fragment release by self after cancel

Review Comment:
   **[Nit] Extra indentation space**: This line has 9 leading spaces 
(`+         // make...`), but the surrounding code uses 8 spaces (two levels 
of 4). The next line `_need_notify_close = false;` also has extra indentation. 
Please fix both to match the surrounding style.



##########
be/src/runtime/query_context.h:
##########
@@ -394,6 +394,9 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
     std::map<std::pair<TUniqueId, int>, pipeline::RecCTEScanLocalState*> _cte_scan;
     std::mutex _cte_scan_lock;
 
+    // for represent the rf's fragment execution round number of recursive cte
+    std::unordered_map<int, uint32_t> _filter_id_to_stage; // filter id -> stage number

Review Comment:
   **[Critical] Data Race on `_filter_id_to_stage`**: This `unordered_map` is 
read by `get_stage()` from multiple threads (brpc workers in `apply_filterv2`, 
`send_filter_size`, `sync_filter_size`, `merge_filter`, and pipeline execution 
threads in `RuntimeFilterProducer::send_size`, `_send_rf_to_target`) and 
written by `update_filters_stage()` from the rerun handler thread. There is 
**no synchronization** protecting this map.
   
   `std::unordered_map::operator[]` in the writer can trigger a rehash, 
invalidating iterators used by concurrent `find()` calls in readers. This is 
undefined behavior and can cause crashes or corrupted reads.
   
   Compare with the nearby `_cte_scan` map (line 394), which has its own 
`_cte_scan_lock`. A similar `std::mutex` or `std::shared_mutex` is needed here; 
alternatively, use a lock-free approach (e.g., a `std::atomic<uint32_t>` array 
if filter IDs are bounded).
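   A minimal sketch of the `std::shared_mutex` option. It assumes `get_stage(int)` returns `uint32_t` and that an unknown filter ID reads as stage 0; both are assumptions, and the class name `FilterStageTracker` and the `update_filters_stage` parameter are hypothetical. Readers take a shared lock and the writer takes an exclusive lock, so `find()` never runs concurrently with an insert or rehash.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <unordered_map>

// Hypothetical model of the stage bookkeeping in QueryContext; method names
// mirror the review comment, but the real signatures may differ.
class FilterStageTracker {
public:
    // Reader path (brpc handlers, pipeline threads): a shared lock lets
    // many readers proceed concurrently.
    uint32_t get_stage(int filter_id) const {
        std::shared_lock<std::shared_mutex> lock(_filter_id_to_stage_lock);
        auto it = _filter_id_to_stage.find(filter_id);
        return it == _filter_id_to_stage.end() ? 0u : it->second;
    }

    // Writer path (rerun handler): an exclusive lock ensures no reader
    // observes the map mid-insert or mid-rehash.
    void update_filters_stage(const std::set<int32_t>& filter_ids) {
        std::unique_lock<std::shared_mutex> lock(_filter_id_to_stage_lock);
        for (int32_t id : filter_ids) {
            ++_filter_id_to_stage[id];
        }
    }

private:
    mutable std::shared_mutex _filter_id_to_stage_lock;
    std::unordered_map<int, uint32_t> _filter_id_to_stage; // filter id -> stage number
};
```

A `std::shared_mutex` suits the read-heavy pattern described above; if contention turns out to be low, a plain `std::mutex` is simpler and may perform just as well.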



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
