This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 549bc3e288e [fix](pipelinex) fix fragment instance progress reports (#40325) (#40987)
549bc3e288e is described below
commit 549bc3e288e003856e9c233f0d38855b621e6e2e
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Sep 19 23:58:38 2024 +0800
[fix](pipelinex) fix fragment instance progress reports (#40325) (#40987)
backport #40325
---
be/src/runtime/fragment_mgr.cpp | 14 ++++++++++----
.../main/java/org/apache/doris/qe/Coordinator.java | 21 ++++++++++++++++-----
gensrc/thrift/FrontendService.thrift | 7 +++++++
3 files changed, 33 insertions(+), 9 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 829440f339e..e5cd7c7cb8d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -402,7 +402,6 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
int64_t num_rows_load_success = 0;
int64_t num_rows_load_filtered = 0;
int64_t num_rows_load_unselected = 0;
- int64_t num_finished_ranges = 0;
if (req.runtime_state->num_rows_load_total() > 0 ||
req.runtime_state->num_rows_load_filtered() > 0 ||
req.runtime_state->num_finished_range() > 0) {
@@ -411,7 +410,11 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
num_rows_load_success = req.runtime_state->num_rows_load_success();
num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
- num_finished_ranges = req.runtime_state->num_finished_range();
+ params.__isset.fragment_instance_reports = true;
+ TFragmentInstanceReport t;
+ t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
+ t.__set_num_finished_range(req.runtime_state->num_finished_range());
+ params.fragment_instance_reports.push_back(t);
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
@@ -420,11 +423,14 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
num_rows_load_success += rs->num_rows_load_success();
num_rows_load_filtered += rs->num_rows_load_filtered();
num_rows_load_unselected += rs->num_rows_load_unselected();
- num_finished_ranges += rs->num_finished_range();
+ params.__isset.fragment_instance_reports = true;
+ TFragmentInstanceReport t;
+ t.__set_fragment_instance_id(rs->fragment_instance_id());
+ t.__set_num_finished_range(rs->num_finished_range());
+ params.fragment_instance_reports.push_back(t);
}
}
}
- params.__set_finished_scan_ranges(num_finished_ranges);
params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index ef67084741b..cc1b36ac941 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -93,6 +93,7 @@ import org.apache.doris.thrift.TExecPlanFragmentParamsList;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
+import org.apache.doris.thrift.TFragmentInstanceReport;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TIcebergCommitData;
import org.apache.doris.thrift.TNetworkAddress;
@@ -2821,11 +2822,21 @@ public class Coordinator implements CoordInterface {
}
if (params.isSetLoadedRows() && jobId != -1) {
- Env.getCurrentEnv().getLoadManager().updateJobProgress(
- jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
- params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
- Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
- params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges());
+ if (params.isSetFragmentInstanceReports()) {
+ for (TFragmentInstanceReport report : params.getFragmentInstanceReports()) {
+ Env.getCurrentEnv().getLoadManager().updateJobProgress(
+ jobId, params.getBackendId(), params.getQueryId(), report.getFragmentInstanceId(),
+ params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
+ Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
+ params.getQueryId(), report.getFragmentInstanceId(), report.getNumFinishedRange());
+ }
+ } else {
+ Env.getCurrentEnv().getLoadManager().updateJobProgress(
+ jobId, params.getBackendId(), params.getQueryId(), params.getFragmentInstanceId(),
+ params.getLoadedRows(), params.getLoadedBytes(), params.isDone());
+ Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
+ params.getQueryId(), params.getFragmentInstanceId(), params.getFinishedScanRanges());
+ }
}
}
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 61dce73400b..5480a84cf69 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -417,6 +417,11 @@ struct TReportWorkloadRuntimeStatusParams {
2: optional map<string, TQueryStatistics> query_statistics_map
}
+struct TFragmentInstanceReport {
+ 1: optional Types.TUniqueId fragment_instance_id;
+ 2: optional i32 num_finished_range;
+}
+
// The results of an INSERT query, sent to the coordinator as part of
// TReportExecStatusParams
struct TReportExecStatusParams {
@@ -487,6 +492,8 @@ struct TReportExecStatusParams {
26: optional list<DataSinks.THivePartitionUpdate> hive_partition_updates
28: optional list<DataSinks.TIcebergCommitData> iceberg_commit_datas
+
+ 31: optional list<TFragmentInstanceReport> fragment_instance_reports;
}
struct TFeResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]