This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 7c7646a908f branch-4.1: [feat](job) add per-job routine load metrics 
#63576 (#63953)
7c7646a908f is described below

commit 7c7646a908f44a36a9941b1ea7303b371cfea034
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jun 2 14:02:01 2026 +0800

    branch-4.1: [feat](job) add per-job routine load metrics #63576 (#63953)
    
    Cherry-picked from #63576
    
    Co-authored-by: hui lai <[email protected]>
---
 .../java/org/apache/doris/metric/MetricRepo.java   | 74 ++++++++++++++++++++++
 1 file changed, 74 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java 
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 1340f81f180..a55214fb061 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -39,6 +39,7 @@ import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.load.loadv2.LoadManager;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.load.routineload.RoutineLoadManager;
+import org.apache.doris.load.routineload.RoutineLoadStatistic;
 import org.apache.doris.metric.Metric.MetricUnit;
 import org.apache.doris.monitor.jvm.JvmService;
 import org.apache.doris.monitor.jvm.JvmStats;
@@ -92,6 +93,16 @@ public final class MetricRepo {
     public static final String STREAMING_JOB_PER_JOB_SUCCEED_TASK_COUNT = 
"streaming_job_per_job_succeed_task_count";
     public static final String STREAMING_JOB_PER_JOB_FAILED_TASK_COUNT = 
"streaming_job_per_job_failed_task_count";
     public static final String STREAMING_JOB_PER_JOB_LAG = 
"streaming_job_per_job_lag";
+    public static final String ROUTINE_LOAD_PER_JOB_TOTAL_ROWS = 
"routine_load_per_job_total_rows"; // per-job gauge reading RoutineLoadStatistic.totalRows
+    public static final String ROUTINE_LOAD_PER_JOB_ERROR_ROWS = 
"routine_load_per_job_error_rows"; // per-job gauge reading RoutineLoadStatistic.errorRows
+    public static final String ROUTINE_LOAD_PER_JOB_RECEIVED_BYTES = 
"routine_load_per_job_received_bytes"; // per-job gauge reading RoutineLoadStatistic.receivedBytes
+    public static final String ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_TIME =
+            "routine_load_per_job_task_execute_time"; // per-job gauge reading totalTaskExcutionTimeMs
+    public static final String ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_COUNT =
+            "routine_load_per_job_task_execute_count"; // per-job gauge reading committedTaskNum
+    public static final String ROUTINE_LOAD_PER_JOB_PROGRESS = 
"routine_load_per_job_progress"; // per-job gauge reading RoutineLoadJob.totalProgress()
+    public static final String ROUTINE_LOAD_PER_JOB_LAG = 
"routine_load_per_job_lag"; // per-job gauge reading RoutineLoadJob.totalLag()
+    public static final String ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM = 
"routine_load_per_job_abort_task_num"; // per-job gauge reading RoutineLoadStatistic.abortedTaskNum
     public static final String CLOUD_TAG = "cloud";
 
     public static LongCounterMetric COUNTER_REQUEST_ALL;
@@ -1163,6 +1174,66 @@ public final class MetricRepo {
         DORIS_METRIC_REGISTER.addMetrics(gauge);
     }
 
+    /**
+     * Rebuilds the per-job routine load gauges from scratch.
+     *
+     * <p>Every gauge registered under one of the ROUTINE_LOAD_PER_JOB_* names is removed first,
+     * whether or not this FE is master. If this FE is master, a fresh set of gauges is then
+     * registered for each job returned by {@code getActiveRoutineLoadJobs()}. Any failure while
+     * registering is logged as a warning and not rethrown.
+     */
+    public static void updateRoutineLoadJobPerJobMetrics() {
+        final String[] perJobMetricNames = {
+            ROUTINE_LOAD_PER_JOB_TOTAL_ROWS,
+            ROUTINE_LOAD_PER_JOB_ERROR_ROWS,
+            ROUTINE_LOAD_PER_JOB_RECEIVED_BYTES,
+            ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_TIME,
+            ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_COUNT,
+            ROUTINE_LOAD_PER_JOB_PROGRESS,
+            ROUTINE_LOAD_PER_JOB_LAG,
+            ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM,
+        };
+        // Removal happens ahead of the mastership check on purpose: gauges registered while this
+        // FE was master must not keep being exported once it has lost mastership.
+        for (String perJobMetricName : perJobMetricNames) {
+            DORIS_METRIC_REGISTER.removeMetrics(perJobMetricName);
+        }
+
+        if (Env.getCurrentEnv().isMaster()) {
+            try {
+                for (RoutineLoadJob activeJob
+                        : Env.getCurrentEnv().getRoutineLoadManager().getActiveRoutineLoadJobs()) {
+                    registerRoutineLoadPerJobGauges(activeJob);
+                }
+            } catch (Throwable t) {
+                LOG.warn("failed to update routine load per-job metrics", t);
+            }
+        }
+    }
+
+    /**
+     * Registers the eight per-job gauges of a single routine load job, each labelled with the
+     * job's id and name. Values are not copied here; every gauge reads its value through a
+     * supplier each time it is queried.
+     */
+    private static void registerRoutineLoadPerJobGauges(RoutineLoadJob routineLoadJob) {
+        String idLabel = String.valueOf(routineLoadJob.getId());
+        String nameLabel = routineLoadJob.getName();
+        RoutineLoadStatistic statistic = routineLoadJob.getRoutineLoadStatistic();
+
+        addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_TOTAL_ROWS, MetricUnit.ROWS,
+                "per job total rows of routine load", idLabel, nameLabel,
+                () -> statistic.totalRows);
+        addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_ERROR_ROWS, MetricUnit.ROWS,
+                "per job error rows of routine load", idLabel, nameLabel,
+                () -> statistic.errorRows);
+        addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_RECEIVED_BYTES, MetricUnit.BYTES,
+                "per job received bytes of routine load", idLabel, nameLabel,
+                () -> statistic.receivedBytes);
+        addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_TIME, MetricUnit.MILLISECONDS,
+                "per job task execute time of routine load", idLabel, nameLabel,
+                () -> statistic.totalTaskExcutionTimeMs);
+        addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_TASK_EXECUTE_COUNT, MetricUnit.NOUNIT,
+                "per job task execute count of routine load", idLabel, nameLabel,
+                () -> statistic.committedTaskNum);
+        addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_PROGRESS, MetricUnit.NOUNIT,
+                "per job routine load progress", idLabel, nameLabel,
+                routineLoadJob::totalProgress);
+        addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_LAG, MetricUnit.NOUNIT,
+                "per job routine load lag", idLabel, nameLabel,
+                routineLoadJob::totalLag);
+        addRoutineLoadPerJobGaugeMetric(ROUTINE_LOAD_PER_JOB_ABORT_TASK_NUM, MetricUnit.NOUNIT,
+                "per job number of aborted tasks in routine load", idLabel, nameLabel,
+                () -> statistic.abortedTaskNum);
+    }
+
+    /**
+     * Creates one gauge for a routine load job and adds it to the metric registry.
+     *
+     * @param metricName    name the gauge is registered under
+     * @param unit          unit reported for the gauge
+     * @param description   human-readable description of the gauge
+     * @param jobId         value of the {@code job_id} label
+     * @param jobName       value of the {@code job_name} label
+     * @param valueSupplier invoked on every {@code getValue()} call to produce the gauge value
+     */
+    private static void addRoutineLoadPerJobGaugeMetric(String metricName, MetricUnit unit, String description,
+            String jobId, String jobName, Supplier<Long> valueSupplier) {
+        GaugeMetric<Long> perJobGauge = new GaugeMetric<Long>(metricName, unit, description) {
+            @Override
+            public Long getValue() {
+                // Delegate on every read so the gauge always reflects the supplier's latest value.
+                return valueSupplier.get();
+            }
+        };
+        MetricLabel idLabel = new MetricLabel("job_id", jobId);
+        MetricLabel nameLabel = new MetricLabel("job_name", jobName);
+        perJobGauge.addLabel(idLabel).addLabel(nameLabel);
+        DORIS_METRIC_REGISTER.addMetrics(perJobGauge);
+    }
+
     private static void initStreamingJobMetrics() {
         // streaming insert jobs
         for (JobStatus jobStatus : JobStatus.values()) {
@@ -1480,6 +1551,9 @@ public final class MetricRepo {
         // update load job metrics
         updateLoadJobMetrics();
 
+        // update per-job routine load metrics
+        updateRoutineLoadJobPerJobMetrics();
+
         // update per-job streaming job metrics
         updateStreamingJobPerJobMetrics();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to