zhannngchen commented on code in PR #10007:
URL: https://github.com/apache/incubator-doris/pull/10007#discussion_r891915838
##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -673,38 +673,50 @@ void
TaskWorkerPool::_publish_version_worker_thread_callback() {
while (_is_work) {
TAgentTaskRequest agent_task_req;
TPublishVersionRequest publish_version_req;
- {
- std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
- while (_is_work && _tasks.empty()) {
- _worker_thread_condition_variable.wait(worker_thread_lock);
- }
- if (!_is_work) {
- return;
- }
+ bool current_finished = false;
+ std::vector<TTabletId> error_tablet_ids;
+ Status res = Status::OK();
+ while (!current_finished) {
+ {
+ std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
+ while (_is_work && _tasks.empty()) {
+ _worker_thread_condition_variable.wait(worker_thread_lock);
+ }
+ if (!_is_work) {
+ return;
+ }
- agent_task_req = _tasks.front();
- publish_version_req = agent_task_req.publish_version_req;
- _tasks.pop_front();
- }
+ agent_task_req = _tasks.front();
+ publish_version_req = agent_task_req.publish_version_req;
+ _tasks.pop_front();
+ }
- DorisMetrics::instance()->publish_task_request_total->increment(1);
- VLOG_NOTICE << "get publish version task, signature:" <<
agent_task_req.signature;
+ DorisMetrics::instance()->publish_task_request_total->increment(1);
+ VLOG_NOTICE << "get publish version task, signature:" <<
agent_task_req.signature;
- std::vector<TTabletId> error_tablet_ids;
- uint32_t retry_time = 0;
- Status res = Status::OK();
- while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
- error_tablet_ids.clear();
- EnginePublishVersionTask engine_task(publish_version_req,
&error_tablet_ids);
- res = _env->storage_engine()->execute_task(&engine_task);
- if (res.ok()) {
- break;
- } else {
- LOG(WARNING) << "publish version error, retry.
[transaction_id="
- << publish_version_req.transaction_id
- << ", error_tablets_size=" <<
error_tablet_ids.size() << "]";
- ++retry_time;
- std::this_thread::sleep_for(std::chrono::seconds(1));
+ uint32_t retry_time = 0;
+ while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
+ error_tablet_ids.clear();
+ EnginePublishVersionTask engine_task(publish_version_req,
&error_tablet_ids);
+ res = _env->storage_engine()->execute_task(&engine_task);
+ if (res.ok()) {
+ current_finished = true;
+ break;
+ } else if (res.precise_code() ==
OLAP_ERR_ROWSET_VERSION_NOT_CONTINUOUS) {
+ std::unique_lock<std::mutex>
worker_thread_lock(_worker_thread_lock);
Review Comment:
Add comments here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]