This is an automated email from the ASF dual-hosted git repository.
liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 67260ed932a [feat](job) add per-job routine load metrics (#63576)
67260ed932a is described below
commit 67260ed932a096440e33694bd0cc4f51767bece6
Author: hui lai <[email protected]>
AuthorDate: Thu May 28 10:23:05 2026 +0800
[feat](job) add per-job routine load metrics (#63576)
### What problem does this PR solve?
Problem Summary:
Before this change, FE only exposed aggregate routine load metrics, such
as total loaded rows, error rows, received bytes, task execution time,
progress, lag, and aborted task count across all routine load jobs.
These metrics were useful for observing the whole FE, but they could not
identify which routine load job contributed to a spike, lag, or abnormal
error/task count.
This change adds per-job routine load metrics. Each metric is exported
with `job_id` and `job_name` labels, so users can inspect the status of
a single routine load job from the FE metrics endpoint.
The new metrics are:
- `doris_fe_routine_load_per_job_total_rows`
- `doris_fe_routine_load_per_job_error_rows`
- `doris_fe_routine_load_per_job_received_bytes`
- `doris_fe_routine_load_per_job_task_execute_time`
- `doris_fe_routine_load_per_job_task_execute_count`
- `doris_fe_routine_load_per_job_progress`
- `doris_fe_routine_load_per_job_lag`
- `doris_fe_routine_load_per_job_abort_task_num`
Example query against the FE metrics endpoint:
```shell
curl http://<fe_host>:<http_port>/metrics | grep routine_load_per_job
```
Example Prometheus query:
```
doris_fe_routine_load_per_job_lag
```
<img width="3116" height="1558" alt="image"
src="https://github.com/user-attachments/assets/0fb22c81-3556-44fe-9520-18beabb62859"
/>
---
.../java/org/apache/doris/metric/MetricRepo.java | 74 ++++++++++++++++++++++
1 file changed, 74 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index c1ab64a9590..723cb3f6eb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -39,6 +39,7 @@ import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadManager;
+import org.apache.doris.load.routineload.RoutineLoadStatistic;
import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.monitor.jvm.JvmService;
import org.apache.doris.monitor.jvm.JvmStats;
@@ -92,6 +93,16 @@ public final class MetricRepo {
public static final String STREAMING_JOB_PER_JOB_SUCCEED_TASK_COUNT =
"streaming_job_per_job_succeed_task_count";
public static final String STREAMING_JOB_PER_JOB_FAILED_TASK_COUNT =
"streaming_job_per_job_failed_task_count";
public static final String STREAMING_JOB_PER_JOB_LAG =
"streaming_job_per_job_lag";
// Annotation (not part of the patch): the eight constants below are the metric-name keys
// for the per-job routine load gauges. The same constants are passed both to
// DORIS_METRIC_REGISTER.removeMetrics(...) and to addRoutineLoadPerJobGaugeMetric(...)
// in updateRoutineLoadJobPerJobMetrics().
// NOTE(review): the commit message lists the exported names with a "doris_fe_" prefix;
// presumably the exporter prepends it -- that code is not visible in this diff.
+ public static final String ROUTINE_LOAD_PER_JOB_TOTAL_ROWS =
"routine_load_per_job_total_rows";
+ public static final String ROUTINE_LOAD_PER_JOB_ERROR_ROWS =
"routine_load_per_job_error_rows";
+ public static final String ROUTINE_LOAD_PER_JOB_RECEIVED_BYTES =
"routine_load_per_job_received_bytes";
+ public static final String ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_TIME =
+ "routine_load_per_job_task_execute_time";
+ public static final String ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_COUNT =
+ "routine_load_per_job_task_execute_count";
+ public static final String ROUTINE_LOAD_PER_JOB_PROGRESS =
"routine_load_per_job_progress";
+ public static final String ROUTINE_LOAD_PER_JOB_LAG =
"routine_load_per_job_lag";
+ public static final String ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM =
"routine_load_per_job_abort_task_num";
public static final String CLOUD_TAG = "cloud";
public static LongCounterMetric COUNTER_REQUEST_ALL;
@@ -1192,6 +1203,66 @@ public final class MetricRepo {
DORIS_METRIC_REGISTER.addMetrics(gauge);
}
// Annotation (not part of the patch): updateRoutineLoadJobPerJobMetrics() rebuilds the
// per-job routine load gauges from scratch on every call:
//   1. removes all eight ROUTINE_LOAD_PER_JOB_* metric names from DORIS_METRIC_REGISTER;
//   2. returns without registering anything when this FE is not the master;
//   3. otherwise registers eight gauges, labelled with job_id and job_name, for every job
//      returned by RoutineLoadManager.getActiveRoutineLoadJobs().
// It takes no arguments and returns nothing; failures in step 3 are logged, not thrown.
// Several statements below are split across lines by the mail export (a lone "+" followed
// by the statement on the next line); the code tokens themselves are unchanged.
+ public static void updateRoutineLoadJobPerJobMetrics() {
+ // Clear previous per-job gauges before checking mastership. If this
FE loses mastership,
+ // the old gauges registered during the previous master epoch must not
be exported.
+ DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_TOTAL_ROWS);
+ DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_ERROR_ROWS);
+
DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_RECEIVED_BYTES);
+
DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_TIME);
+
DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_COUNT);
+ DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_PROGRESS);
+ DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_LAG);
+
DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM);
+
// Non-master FEs stop here, so after the removals above they export no per-job gauges.
+ if (!Env.getCurrentEnv().isMaster()) {
+ return;
+ }
+
// The try block covers the whole registration loop: an exception raised while handling one
// job ends the loop, leaving any gauges registered for earlier jobs in place.
+ try {
+ RoutineLoadManager routineLoadManager =
Env.getCurrentEnv().getRoutineLoadManager();
+ for (RoutineLoadJob job :
routineLoadManager.getActiveRoutineLoadJobs()) {
+ String jobId = String.valueOf(job.getId());
+ String jobName = job.getName();
+ RoutineLoadStatistic stat = job.getRoutineLoadStatistic();
+
+
addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_TOTAL_ROWS,
MetricUnit.ROWS,
+ "per job total rows of routine load", jobId, jobName,
() -> stat.totalRows);
+
addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_ERROR_ROWS,
MetricUnit.ROWS,
+ "per job error rows of routine load", jobId, jobName,
() -> stat.errorRows);
+
addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_RECEIVED_BYTES,
MetricUnit.BYTES,
+ "per job received bytes of routine load", jobId,
jobName, () -> stat.receivedBytes);
+
addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_TIME,
MetricUnit.MILLISECONDS,
+ "per job task execute time of routine load", jobId,
jobName,
+ () -> stat.totalTaskExcutionTimeMs);
+
addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_COUNT,
MetricUnit.NOUNIT,
+ "per job task execute count of routine load", jobId,
jobName, () -> stat.committedTaskNum);
// Progress and lag are read through the job itself (job::totalProgress, job::totalLag),
// unlike the other six gauges, which read fields of the captured `stat` object.
+ addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_PROGRESS,
MetricUnit.NOUNIT,
+ "per job routine load progress", jobId, jobName,
job::totalProgress);
+ addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_LAG,
MetricUnit.NOUNIT,
+ "per job routine load lag", jobId, jobName,
job::totalLag);
+
addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM,
MetricUnit.NOUNIT,
+ "per job number of aborted tasks in routine load",
jobId, jobName,
+ () -> stat.abortedTaskNum);
+ }
// Best-effort: every Throwable is logged at WARN and not rethrown, so a failure here does
// not propagate to the caller of this method.
+ } catch (Throwable t) {
+ LOG.warn("failed to update routine load per-job metrics", t);
+ }
+ }
+
// Annotation (not part of the patch): addRoutineLoadPerJobGaugeMetric registers one per-job
// gauge with DORIS_METRIC_REGISTER.
//   metricName    - metric name passed to the GaugeMetric constructor
//   unit          - MetricUnit passed to the GaugeMetric constructor
//   description   - description passed to the GaugeMetric constructor
//   jobId         - value of the "job_id" label
//   jobName       - value of the "job_name" label
//   valueSupplier - invoked on every getValue() call, so the gauge reports whatever the
//                   supplier returns at read time rather than a value captured here
+ private static void addRoutineLoadPerJobGaugeMetric(String metricName,
MetricUnit unit, String description,
+ String jobId, String jobName, Supplier<Long> valueSupplier) {
+ GaugeMetric<Long> gauge = new GaugeMetric<Long>(metricName, unit,
description) {
+ @Override
+ public Long getValue() {
+ return valueSupplier.get();
+ }
+ };
// Both labels are attached before registration.
+ gauge.addLabel(new MetricLabel("job_id", jobId))
+ .addLabel(new MetricLabel("job_name", jobName));
+ DORIS_METRIC_REGISTER.addMetrics(gauge);
+ }
+
private static void initStreamingJobMetrics() {
// streaming insert jobs
for (JobStatus jobStatus : JobStatus.values()) {
@@ -1509,6 +1580,9 @@ public final class MetricRepo {
// update load job metrics
updateLoadJobMetrics();
+ // update per-job routine load metrics
+ updateRoutineLoadJobPerJobMetrics();
+
// update per-job streaming job metrics
updateStreamingJobPerJobMetrics();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]