github-actions[bot] commented on code in PR #23142:
URL: https://github.com/apache/doris/pull/23142#discussion_r1335785712
##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -344,54 +363,56 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
return Status::OK();
}
-void PipelineFragmentContext::_stop_report_thread() {
- if (!_report_thread_active) {
- return;
+void PipelineFragmentContext::_init_next_report_time() {
+ auto interval_s = config::pipeline_status_report_interval;
+ if (_is_report_success && interval_s > 0 && _query_ctx->timeout_second >
interval_s) {
+ std::vector<string> ins_ids;
Review Comment:
warning: variable 'ins_ids' is not initialized
[cppcoreguidelines-init-variables]
be/src/pipeline/pipeline_fragment_context.cpp:369:
```diff
- ;
+ = 0;
```
##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -344,54 +363,56 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
return Status::OK();
}
-void PipelineFragmentContext::_stop_report_thread() {
- if (!_report_thread_active) {
- return;
+void PipelineFragmentContext::_init_next_report_time() {
+ auto interval_s = config::pipeline_status_report_interval;
+ if (_is_report_success && interval_s > 0 && _query_ctx->timeout_second >
interval_s) {
+ std::vector<string> ins_ids;
+ instance_ids(ins_ids);
+ VLOG_FILE << "enable period report: instance_id="
+ << fmt::format("{}", fmt::join(ins_ids, ", "));
+ uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) *
NANOS_PER_SEC;
+ // We don't want to wait longer than it takes to run the entire
fragment.
+ _previous_report_time =
+ MonotonicNanos() + report_fragment_offset -
(uint64_t)(interval_s)*NANOS_PER_SEC;
+ _disable_period_report = false;
}
+}
- _report_thread_active = false;
-
- _stop_report_thread_cv.notify_one();
- // Wait infinitly to ensure that the report task is finished and the this
variable
- // is not used in report thread.
- _report_thread_future.wait();
+void PipelineFragmentContext::refresh_next_report_time() {
+ auto disable = _disable_period_report.load(std::memory_order_acq_rel);
+ DCHECK(disable == true);
+ _previous_report_time = MonotonicNanos();
+ _disable_period_report.compare_exchange_strong(disable, false);
}
-void PipelineFragmentContext::report_profile() {
- SCOPED_ATTACH_TASK(_runtime_state.get());
- VLOG_FILE << "report_profile(): instance_id=" <<
_runtime_state->fragment_instance_id();
-
- _report_thread_active = true;
-
- std::unique_lock<std::mutex> l(_report_thread_lock);
- // tell Open() that we started
- _report_thread_started_cv.notify_one();
-
- // Jitter the reporting time of remote fragments by a random amount between
- // 0 and the report_interval. This way, the coordinator doesn't get all
the
- // updates at once so its better for contention as well as smoother
progress
- // reporting.
- int report_fragment_offset = rand() % config::status_report_interval;
- // We don't want to wait longer than it takes to run the entire fragment.
- _stop_report_thread_cv.wait_for(l,
std::chrono::seconds(report_fragment_offset));
- while (_report_thread_active) {
- if (config::status_report_interval > 0) {
- // wait_for can return because the timeout occurred or the
condition variable
- // was signaled. We can't rely on its return value to distinguish
between the
- // two cases (e.g. there is a race here where the wait timed out
but before grabbing
- // the lock, the condition variable was signaled). Instead, we
will use an external
- // flag, _report_thread_active, to coordinate this.
- _stop_report_thread_cv.wait_for(l,
-
std::chrono::seconds(config::status_report_interval));
- } else {
- LOG(WARNING) << "config::status_report_interval is equal to or
less than zero, exiting "
- "reporting thread.";
- break;
+void PipelineFragmentContext::trigger_report_if_necessary() {
+ if (!_is_report_success) {
+ return;
+ }
+ auto disable = _disable_period_report.load(std::memory_order_acq_rel);
+ if (disable) {
+ return;
+ }
+ int32_t interval_s = config::pipeline_status_report_interval;
+ if (interval_s <= 0) {
+ LOG(WARNING)
+ << "config::status_report_interval is equal to or less than
zero, do not trigger "
+ "report.";
+ }
+ uint64_t next_report_time =
_previous_report_time.load(std::memory_order_acq_rel) +
Review Comment:
warning: variable 'next_report_time' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
} = 0
```
##########
be/src/pipeline/pipeline_fragment_context.cpp:
##########
@@ -344,54 +363,56 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
return Status::OK();
}
-void PipelineFragmentContext::_stop_report_thread() {
- if (!_report_thread_active) {
- return;
+void PipelineFragmentContext::_init_next_report_time() {
+ auto interval_s = config::pipeline_status_report_interval;
+ if (_is_report_success && interval_s > 0 && _query_ctx->timeout_second >
interval_s) {
+ std::vector<string> ins_ids;
+ instance_ids(ins_ids);
+ VLOG_FILE << "enable period report: instance_id="
+ << fmt::format("{}", fmt::join(ins_ids, ", "));
+ uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) *
NANOS_PER_SEC;
+ // We don't want to wait longer than it takes to run the entire
fragment.
+ _previous_report_time =
+ MonotonicNanos() + report_fragment_offset -
(uint64_t)(interval_s)*NANOS_PER_SEC;
+ _disable_period_report = false;
}
+}
- _report_thread_active = false;
-
- _stop_report_thread_cv.notify_one();
- // Wait infinitly to ensure that the report task is finished and the this
variable
- // is not used in report thread.
- _report_thread_future.wait();
+void PipelineFragmentContext::refresh_next_report_time() {
+ auto disable = _disable_period_report.load(std::memory_order_acq_rel);
+ DCHECK(disable == true);
+ _previous_report_time = MonotonicNanos();
+ _disable_period_report.compare_exchange_strong(disable, false);
}
-void PipelineFragmentContext::report_profile() {
- SCOPED_ATTACH_TASK(_runtime_state.get());
- VLOG_FILE << "report_profile(): instance_id=" <<
_runtime_state->fragment_instance_id();
-
- _report_thread_active = true;
-
- std::unique_lock<std::mutex> l(_report_thread_lock);
- // tell Open() that we started
- _report_thread_started_cv.notify_one();
-
- // Jitter the reporting time of remote fragments by a random amount between
- // 0 and the report_interval. This way, the coordinator doesn't get all
the
- // updates at once so its better for contention as well as smoother
progress
- // reporting.
- int report_fragment_offset = rand() % config::status_report_interval;
- // We don't want to wait longer than it takes to run the entire fragment.
- _stop_report_thread_cv.wait_for(l,
std::chrono::seconds(report_fragment_offset));
- while (_report_thread_active) {
- if (config::status_report_interval > 0) {
- // wait_for can return because the timeout occurred or the
condition variable
- // was signaled. We can't rely on its return value to distinguish
between the
- // two cases (e.g. there is a race here where the wait timed out
but before grabbing
- // the lock, the condition variable was signaled). Instead, we
will use an external
- // flag, _report_thread_active, to coordinate this.
- _stop_report_thread_cv.wait_for(l,
-
std::chrono::seconds(config::status_report_interval));
- } else {
- LOG(WARNING) << "config::status_report_interval is equal to or
less than zero, exiting "
- "reporting thread.";
- break;
+void PipelineFragmentContext::trigger_report_if_necessary() {
+ if (!_is_report_success) {
+ return;
+ }
+ auto disable = _disable_period_report.load(std::memory_order_acq_rel);
+ if (disable) {
+ return;
+ }
+ int32_t interval_s = config::pipeline_status_report_interval;
+ if (interval_s <= 0) {
+ LOG(WARNING)
+ << "config::status_report_interval is equal to or less than
zero, do not trigger "
+ "report.";
+ }
+ uint64_t next_report_time =
_previous_report_time.load(std::memory_order_acq_rel) +
+ (uint64_t)(interval_s)*NANOS_PER_SEC;
+ if (MonotonicNanos() > next_report_time) {
+ if (!_disable_period_report.compare_exchange_strong(disable, true,
+
std::memory_order_acq_rel)) {
+ return;
}
-
if (VLOG_FILE_IS_ON) {
- VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " :
" ")
- << "profile for instance " <<
_runtime_state->fragment_instance_id();
+ std::vector<string> ins_ids;
Review Comment:
warning: variable 'ins_ids' is not initialized
[cppcoreguidelines-init-variables]
be/src/pipeline/pipeline_fragment_context.cpp:410:
```diff
- ;
+ = 0;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]