luwei16 commented on code in PR #52812:
URL: https://github.com/apache/doris/pull/52812#discussion_r2195339683


##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -609,6 +611,52 @@ Status PriorTaskWorkerPool::submit_task(const 
TAgentTaskRequest& task) {
     });
 }
 
+Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(const 
TAgentTaskRequest& task) {
+    const TTaskType::type task_type = task.task_type;
+    int64_t signature = task.signature;
+    std::string type_str;
+    EnumToString(TTaskType, task_type, type_str);
+    auto req = std::make_unique<TAgentTaskRequest>(task);
+
+    DCHECK(req->__isset.priority && req->priority == TPriority::HIGH);
+    do {
+        std::lock_guard lock(s_task_signatures_mtx);
+        auto& set = s_task_signatures[task_type];
+        if (!set.contains(signature)) {
+            // If it doesn't exist, put it directly into the priority queue
+            add_task_count(*req, 1);
+            set.insert(signature);
+            std::lock_guard lock(_mtx);
+            _high_prior_queue.push_back(std::move(req));
+            _high_prior_condv.notify_one();
+            _normal_condv.notify_one();
+            break;
+        } else {
+            std::lock_guard lock(_mtx);
+            for (auto it = _normal_queue.begin(); it != _normal_queue.end();) {
+                // If it exists in the normal queue, cancel the task in the 
normal queue
+                if ((*it)->signature == signature) {
+                    _normal_queue.erase(it);                     // cancel the 
original task
+                    _high_prior_queue.push_back(std::move(req)); // add the 
new task to the queue
+                    _high_prior_condv.notify_one();
+                    _normal_condv.notify_one();
+                    break;
+                } else {
+                    ++it; // doesn't meet the condition, continue to the next 
one
+                }
+            }
+            // If it exists in the high priority queue, no operation is needed
+            LOG_INFO("task has already existed in high prior 
queue.").tag("signature", signature);
+        }
+    } while (true);

Review Comment:
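   This should be `while (false)`. The `break` in the cancel path only exits the
   inner `for` loop, so with `while (true)` the `else` branch never leaves the
   `do` loop and keeps spinning (and logging) once the signature already exists.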



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to