github-actions[bot] commented on code in PR #23142:
URL: https://github.com/apache/doris/pull/23142#discussion_r1335785712


##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -344,54 +363,56 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
     return Status::OK();
 }
 
-void PipelineFragmentContext::_stop_report_thread() {
-    if (!_report_thread_active) {
-        return;
+void PipelineFragmentContext::_init_next_report_time() {
+    auto interval_s = config::pipeline_status_report_interval;
+    if (_is_report_success && interval_s > 0 && _query_ctx->timeout_second > 
interval_s) {
+        std::vector<string> ins_ids;

Review Comment:
   warning: variable 'ins_ids' is not initialized [cppcoreguidelines-init-variables]
   
   be/src/pipeline/pipeline_fragment_context.cpp:369:
   ```diff
   - ;
   +  = 0;
   ```
   



##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -344,54 +363,56 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
     return Status::OK();
 }
 
-void PipelineFragmentContext::_stop_report_thread() {
-    if (!_report_thread_active) {
-        return;
+void PipelineFragmentContext::_init_next_report_time() {
+    auto interval_s = config::pipeline_status_report_interval;
+    if (_is_report_success && interval_s > 0 && _query_ctx->timeout_second > 
interval_s) {
+        std::vector<string> ins_ids;
+        instance_ids(ins_ids);
+        VLOG_FILE << "enable period report: instance_id="
+                  << fmt::format("{}", fmt::join(ins_ids, ", "));
+        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * 
NANOS_PER_SEC;
+        // We don't want to wait longer than it takes to run the entire 
fragment.
+        _previous_report_time =
+                MonotonicNanos() + report_fragment_offset - 
(uint64_t)(interval_s)*NANOS_PER_SEC;
+        _disable_period_report = false;
     }
+}
 
-    _report_thread_active = false;
-
-    _stop_report_thread_cv.notify_one();
-    // Wait infinitly to ensure that the report task is finished and the this 
variable
-    // is not used in report thread.
-    _report_thread_future.wait();
+void PipelineFragmentContext::refresh_next_report_time() {
+    auto disable = _disable_period_report.load(std::memory_order_acq_rel);
+    DCHECK(disable == true);
+    _previous_report_time = MonotonicNanos();
+    _disable_period_report.compare_exchange_strong(disable, false);
 }
 
-void PipelineFragmentContext::report_profile() {
-    SCOPED_ATTACH_TASK(_runtime_state.get());
-    VLOG_FILE << "report_profile(): instance_id=" << 
_runtime_state->fragment_instance_id();
-
-    _report_thread_active = true;
-
-    std::unique_lock<std::mutex> l(_report_thread_lock);
-    // tell Open() that we started
-    _report_thread_started_cv.notify_one();
-
-    // Jitter the reporting time of remote fragments by a random amount between
-    // 0 and the report_interval.  This way, the coordinator doesn't get all 
the
-    // updates at once so its better for contention as well as smoother 
progress
-    // reporting.
-    int report_fragment_offset = rand() % config::status_report_interval;
-    // We don't want to wait longer than it takes to run the entire fragment.
-    _stop_report_thread_cv.wait_for(l, 
std::chrono::seconds(report_fragment_offset));
-    while (_report_thread_active) {
-        if (config::status_report_interval > 0) {
-            // wait_for can return because the timeout occurred or the 
condition variable
-            // was signaled.  We can't rely on its return value to distinguish 
between the
-            // two cases (e.g. there is a race here where the wait timed out 
but before grabbing
-            // the lock, the condition variable was signaled).  Instead, we 
will use an external
-            // flag, _report_thread_active, to coordinate this.
-            _stop_report_thread_cv.wait_for(l,
-                                            
std::chrono::seconds(config::status_report_interval));
-        } else {
-            LOG(WARNING) << "config::status_report_interval is equal to or 
less than zero, exiting "
-                            "reporting thread.";
-            break;
+void PipelineFragmentContext::trigger_report_if_necessary() {
+    if (!_is_report_success) {
+        return;
+    }
+    auto disable = _disable_period_report.load(std::memory_order_acq_rel);
+    if (disable) {
+        return;
+    }
+    int32_t interval_s = config::pipeline_status_report_interval;
+    if (interval_s <= 0) {
+        LOG(WARNING)
+                << "config::status_report_interval is equal to or less than 
zero, do not trigger "
+                   "report.";
+    }
+    uint64_t next_report_time = 
_previous_report_time.load(std::memory_order_acq_rel) +

Review Comment:
   warning: variable 'next_report_time' is not initialized [cppcoreguidelines-init-variables]
   
   ```suggestion
   } = 0
   ```
   



##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -344,54 +363,56 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
     return Status::OK();
 }
 
-void PipelineFragmentContext::_stop_report_thread() {
-    if (!_report_thread_active) {
-        return;
+void PipelineFragmentContext::_init_next_report_time() {
+    auto interval_s = config::pipeline_status_report_interval;
+    if (_is_report_success && interval_s > 0 && _query_ctx->timeout_second > 
interval_s) {
+        std::vector<string> ins_ids;
+        instance_ids(ins_ids);
+        VLOG_FILE << "enable period report: instance_id="
+                  << fmt::format("{}", fmt::join(ins_ids, ", "));
+        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * 
NANOS_PER_SEC;
+        // We don't want to wait longer than it takes to run the entire 
fragment.
+        _previous_report_time =
+                MonotonicNanos() + report_fragment_offset - 
(uint64_t)(interval_s)*NANOS_PER_SEC;
+        _disable_period_report = false;
     }
+}
 
-    _report_thread_active = false;
-
-    _stop_report_thread_cv.notify_one();
-    // Wait infinitly to ensure that the report task is finished and the this 
variable
-    // is not used in report thread.
-    _report_thread_future.wait();
+void PipelineFragmentContext::refresh_next_report_time() {
+    auto disable = _disable_period_report.load(std::memory_order_acq_rel);
+    DCHECK(disable == true);
+    _previous_report_time = MonotonicNanos();
+    _disable_period_report.compare_exchange_strong(disable, false);
 }
 
-void PipelineFragmentContext::report_profile() {
-    SCOPED_ATTACH_TASK(_runtime_state.get());
-    VLOG_FILE << "report_profile(): instance_id=" << 
_runtime_state->fragment_instance_id();
-
-    _report_thread_active = true;
-
-    std::unique_lock<std::mutex> l(_report_thread_lock);
-    // tell Open() that we started
-    _report_thread_started_cv.notify_one();
-
-    // Jitter the reporting time of remote fragments by a random amount between
-    // 0 and the report_interval.  This way, the coordinator doesn't get all 
the
-    // updates at once so its better for contention as well as smoother 
progress
-    // reporting.
-    int report_fragment_offset = rand() % config::status_report_interval;
-    // We don't want to wait longer than it takes to run the entire fragment.
-    _stop_report_thread_cv.wait_for(l, 
std::chrono::seconds(report_fragment_offset));
-    while (_report_thread_active) {
-        if (config::status_report_interval > 0) {
-            // wait_for can return because the timeout occurred or the 
condition variable
-            // was signaled.  We can't rely on its return value to distinguish 
between the
-            // two cases (e.g. there is a race here where the wait timed out 
but before grabbing
-            // the lock, the condition variable was signaled).  Instead, we 
will use an external
-            // flag, _report_thread_active, to coordinate this.
-            _stop_report_thread_cv.wait_for(l,
-                                            
std::chrono::seconds(config::status_report_interval));
-        } else {
-            LOG(WARNING) << "config::status_report_interval is equal to or 
less than zero, exiting "
-                            "reporting thread.";
-            break;
+void PipelineFragmentContext::trigger_report_if_necessary() {
+    if (!_is_report_success) {
+        return;
+    }
+    auto disable = _disable_period_report.load(std::memory_order_acq_rel);
+    if (disable) {
+        return;
+    }
+    int32_t interval_s = config::pipeline_status_report_interval;
+    if (interval_s <= 0) {
+        LOG(WARNING)
+                << "config::status_report_interval is equal to or less than 
zero, do not trigger "
+                   "report.";
+    }
+    uint64_t next_report_time = 
_previous_report_time.load(std::memory_order_acq_rel) +
+                                (uint64_t)(interval_s)*NANOS_PER_SEC;
+    if (MonotonicNanos() > next_report_time) {
+        if (!_disable_period_report.compare_exchange_strong(disable, true,
+                                                            
std::memory_order_acq_rel)) {
+            return;
         }
-
         if (VLOG_FILE_IS_ON) {
-            VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " : 
" ")
-                      << "profile for instance " << 
_runtime_state->fragment_instance_id();
+            std::vector<string> ins_ids;

Review Comment:
   warning: variable 'ins_ids' is not initialized [cppcoreguidelines-init-variables]
   
   be/src/pipeline/pipeline_fragment_context.cpp:410:
   ```diff
   - ;
   +  = 0;
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to