This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f89bd8cbda7 branch-4.0: [feat](job) add per-job routine load metrics (#64274)
f89bd8cbda7 is described below

commit f89bd8cbda7c37dc2efe4fea3c5090411ab04006
Author: hui lai <[email protected]>
AuthorDate: Thu Jun 11 11:16:18 2026 +0800

    branch-4.0: [feat](job) add per-job routine load metrics (#64274)
    
    pick https://github.com/apache/doris/pull/63576
---
 .../java/org/apache/doris/metric/MetricRepo.java   | 74 ++++++++++++++++++++++
 1 file changed, 74 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index b507ceb2152..a60e1ec4bf2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -38,6 +38,7 @@ import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.load.loadv2.LoadManager;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.load.routineload.RoutineLoadManager;
+import org.apache.doris.load.routineload.RoutineLoadStatistic;
 import org.apache.doris.metric.Metric.MetricUnit;
 import org.apache.doris.monitor.jvm.JvmService;
 import org.apache.doris.monitor.jvm.JvmStats;
@@ -84,6 +85,16 @@ public final class MetricRepo {
     public static final String TABLET_MAX_COMPACTION_SCORE = 
"tablet_max_compaction_score";
     public static final String TABLET_ACCESS_RECENT = "tablet_access_recent";
     public static final String TABLET_ACCESS_TOTAL = "tablet_access_total";
+    // Names of the per-job routine load gauges. updateRoutineLoadJobPerJobMetrics() removes every
+    // metric registered under these names and re-registers one gauge per active job for each name.
+    public static final String ROUTINE_LOAD_PER_JOB_TOTAL_ROWS = "routine_load_per_job_total_rows";
+    public static final String ROUTINE_LOAD_PER_JOB_ERROR_ROWS = "routine_load_per_job_error_rows";
+    public static final String ROUTINE_LOAD_PER_JOB_RECEIVED_BYTES = "routine_load_per_job_received_bytes";
+    public static final String ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_TIME =
+            "routine_load_per_job_task_execute_time";
+    public static final String ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_COUNT =
+            "routine_load_per_job_task_execute_count";
+    public static final String ROUTINE_LOAD_PER_JOB_PROGRESS = "routine_load_per_job_progress";
+    public static final String ROUTINE_LOAD_PER_JOB_LAG = "routine_load_per_job_lag";
+    public static final String ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM = "routine_load_per_job_abort_task_num";
     public static final String CLOUD_TAG = "cloud";
 
     public static LongCounterMetric COUNTER_REQUEST_ALL;
@@ -1156,6 +1167,66 @@ public final class MetricRepo {
         DORIS_METRIC_REGISTER.addMetrics(gauge);
     }
 
+    /**
+     * Rebuilds the per-job routine load gauges from the current set of active routine load jobs.
+     *
+     * <p>Every metric registered under the eight {@code ROUTINE_LOAD_PER_JOB_*} names is removed first.
+     * Then, only when this FE is master, one gauge per name is registered for each job returned by
+     * {@link RoutineLoadManager#getActiveRoutineLoadJobs()}, labelled with the job id and job name.
+     * The gauges read their values through suppliers, so an exported number reflects the job and
+     * statistic state at the moment the gauge is read, not at registration time.
+     *
+     * <p>Any failure while registering is logged and swallowed. Because the try block encloses the
+     * whole loop, gauges already registered stay registered and the remaining jobs get no gauges
+     * until the next call.
+     */
+    public static void updateRoutineLoadJobPerJobMetrics() {
+        // Clear previous per-job gauges before checking mastership. If this FE loses mastership,
+        // the old gauges registered during the previous master epoch must not be exported.
+        // NOTE(review): between these removals and the re-registration below, a reader of
+        // DORIS_METRIC_REGISTER may observe a partial or empty set of per-job gauges; confirm that a
+        // momentary gap is acceptable for concurrent scrapes.
+        DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_TOTAL_ROWS);
+        DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_ERROR_ROWS);
+        DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_RECEIVED_BYTES);
+        DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_TIME);
+        DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_COUNT);
+        DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_PROGRESS);
+        DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_LAG);
+        DORIS_METRIC_REGISTER.removeMetrics(ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM);
+
+        if (!Env.getCurrentEnv().isMaster()) {
+            return;
+        }
+
+        try {
+            RoutineLoadManager routineLoadManager = Env.getCurrentEnv().getRoutineLoadManager();
+            for (RoutineLoadJob job : routineLoadManager.getActiveRoutineLoadJobs()) {
+                String jobId = String.valueOf(job.getId());
+                String jobName = job.getName();
+                // The suppliers below capture this statistic object and read its fields each time
+                // a gauge is read.
+                RoutineLoadStatistic stat = job.getRoutineLoadStatistic();
+
+                addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_TOTAL_ROWS, MetricUnit.ROWS,
+                        "per job total rows of routine load", jobId, jobName, () -> stat.totalRows);
+                addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_ERROR_ROWS, MetricUnit.ROWS,
+                        "per job error rows of routine load", jobId, jobName, () -> stat.errorRows);
+                addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_RECEIVED_BYTES, MetricUnit.BYTES,
+                        "per job received bytes of routine load", jobId, jobName, () -> stat.receivedBytes);
+                addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_TIME, MetricUnit.MILLISECONDS,
+                        "per job task execute time of routine load", jobId, jobName,
+                        () -> stat.totalTaskExcutionTimeMs);
+                // "Execute count" is backed by the committed-task counter only; aborted tasks are
+                // exported separately under ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM.
+                addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_COUNT, MetricUnit.NOUNIT,
+                        "per job task execute count of routine load", jobId, jobName, () -> stat.committedTaskNum);
+                // Progress and lag are computed by the job itself on every read.
+                addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_PROGRESS, MetricUnit.NOUNIT,
+                        "per job routine load progress", jobId, jobName, job::totalProgress);
+                addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_LAG, MetricUnit.NOUNIT,
+                        "per job routine load lag", jobId, jobName, job::totalLag);
+                addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM, MetricUnit.NOUNIT,
+                        "per job number of aborted tasks in routine load", jobId, jobName,
+                        () -> stat.abortedTaskNum);
+            }
+        } catch (Throwable t) {
+            // Best effort: a failure here must not break the caller's other metric updates.
+            LOG.warn("failed to update routine load per-job metrics", t);
+        }
+    }
+
+    /**
+     * Registers one {@code Long} gauge in {@code DORIS_METRIC_REGISTER} under {@code metricName},
+     * labelled with {@code job_id} and {@code job_name}.
+     *
+     * @param metricName    name of the gauge
+     * @param unit          unit attached to the gauge
+     * @param description   human-readable description of the gauge
+     * @param jobId         value of the {@code job_id} label
+     * @param jobName       value of the {@code job_name} label
+     * @param valueSupplier invoked on every {@code getValue()} call of the gauge. It is not called
+     *                      during registration, and any exception it throws propagates to whoever
+     *                      reads the gauge.
+     */
+    private static void addRoutineLoadPerJobGaugeMetric(String metricName, MetricUnit unit, String description,
+            String jobId, String jobName, Supplier<Long> valueSupplier) {
+        GaugeMetric<Long> gauge = new GaugeMetric<Long>(metricName, unit, description) {
+            @Override
+            public Long getValue() {
+                // Delegate on every read so the exported value tracks live job state.
+                return valueSupplier.get();
+            }
+        };
+        gauge.addLabel(new MetricLabel("job_id", jobId))
+                .addLabel(new MetricLabel("job_name", jobName));
+        DORIS_METRIC_REGISTER.addMetrics(gauge);
+    }
+
     private static void initStreamingJobMetrics() {
         // streaming insert jobs
         for (JobStatus jobStatus : JobStatus.values()) {
@@ -1359,6 +1430,9 @@ public final class MetricRepo {
         // update load job metrics
         updateLoadJobMetrics();
 
+        // update per-job routine load metrics
+        updateRoutineLoadJobPerJobMetrics();
+
         // jvm
         JvmService jvmService = new JvmService();
         JvmStats jvmStats = jvmService.stats();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to