This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3d4c22d1194 [fix](pipelinex) fix fragment instance progress reports
(part 2) (#40694)
3d4c22d1194 is described below
commit 3d4c22d1194366ff947967063a47366b9cb98ff7
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Sep 25 16:37:46 2024 +0800
[fix](pipelinex) fix fragment instance progress reports (part 2) (#40694)
## Proposed changes
This PR is a followup of #40325. Because PipelineX has deprecated the
old report.
This PR fixed the `ScannedRows` and `LoadBytes` in the progress of the
`SHOW LOAD` result.
Currently the progress will only be updated when a fragment instance is
done.
Timely progress updates will be supported in a later PR.
---
be/src/runtime/fragment_mgr.cpp | 4 ++++
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 2 +-
gensrc/thrift/FrontendService.thrift | 2 ++
3 files changed, 7 insertions(+), 1 deletion(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7a4687b50d1..3f8e762408c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -361,6 +361,8 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
TFragmentInstanceReport t;
t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
t.__set_num_finished_range(req.runtime_state->num_finished_range());
+ t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
+ t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
params.fragment_instance_reports.push_back(t);
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
@@ -374,6 +376,8 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
TFragmentInstanceReport t;
t.__set_fragment_instance_id(rs->fragment_instance_id());
t.__set_num_finished_range(rs->num_finished_range());
+ t.__set_loaded_rows(rs->num_rows_load_total());
+ t.__set_loaded_bytes(rs->num_bytes_load_total());
params.fragment_instance_reports.push_back(t);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8d8d5c81929..4ac83361e25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2451,7 +2451,7 @@ public class Coordinator implements CoordInterface {
for (TFragmentInstanceReport report :
params.getFragmentInstanceReports()) {
Env.getCurrentEnv().getLoadManager().updateJobProgress(
jobId, params.getBackendId(), params.getQueryId(),
report.getFragmentInstanceId(),
- params.getLoadedRows(), params.getLoadedBytes(),
params.isDone());
+ report.getLoadedRows(), report.getLoadedBytes(),
params.isDone());
Env.getCurrentEnv().getProgressManager().updateProgress(String.valueOf(jobId),
params.getQueryId(),
report.getFragmentInstanceId(), report.getNumFinishedRange());
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 32d69ce081c..c6766034158 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -437,6 +437,8 @@ struct TQueryProfile {
struct TFragmentInstanceReport {
1: optional Types.TUniqueId fragment_instance_id;
2: optional i32 num_finished_range;
+ 3: optional i64 loaded_rows
+ 4: optional i64 loaded_bytes
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]