This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5be6557aa56 [improve](streaming-job) add per-job lag metric to 
streaming insert jobs (#63194)
5be6557aa56 is described below

commit 5be6557aa5651d83a8163b4d83822fdc73fb3395
Author: wudi <[email protected]>
AuthorDate: Fri May 15 09:53:06 2026 +0800

    [improve](streaming-job) add per-job lag metric to streaming insert jobs 
(#63194)
    
    ### What problem does this PR solve?
    
    Related PRs: #62224 (per-job metrics), #62269 (Lag column)
    
    Problem Summary:
    
    #62224 introduced per-job metrics (`streaming_job_per_job_scanned_rows`,
    `_load_bytes`, `_filtered_rows`, `_succeed_task_count`,
    `_failed_task_count`) for streaming insert jobs, exposed via `/metrics`
    with `job_id`/`job_name` labels for Prometheus.
    
    #62269 later added a `Lag` column to the `SHOW JOBS` output and the
    `jobs()` TVF that reports end-to-end CDC delay in seconds, but the value
    was only exposed through SQL — there was no corresponding Prometheus
    metric, so lag could not be used in dashboards or alerting rules.
    
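    This PR adds `streaming_job_per_job_lag` (unit: `SECONDS`) to the
    existing per-job metric set. The gauge reports -1 when lag is not
    applicable (e.g. S3 sources, or during the snapshot phase) or cannot be
    parsed, so dashboards can exclude such jobs by filtering on `lag >= 0`.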
---
 .../insert/streaming/StreamingInsertJob.java           | 18 ++++++++++++++++++
 .../main/java/org/apache/doris/metric/MetricRepo.java  | 14 ++++++++++++++
 .../cdc/test_streaming_mysql_job_metrics.groovy        | 13 +++++++++++--
 3 files changed, 43 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index e86e7fcda64..7e6c2c3830b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -885,6 +885,24 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         super.onReplayCreate();
     }
 
+    public String getLag() {
+        return offsetProvider != null ? offsetProvider.getLag() : "";
+    }
+
+    // Numeric lag for metrics. Returns -1 when lag is not applicable (S3, 
snapshot phase)
+    // or unparseable, so dashboards can filter N/A jobs via lag >= 0.
+    public long getLagSeconds() {
+        String lagStr = getLag();
+        if (lagStr == null || lagStr.isEmpty()) {
+            return -1L;
+        }
+        try {
+            return Long.parseLong(lagStr);
+        } catch (NumberFormatException e) {
+            return -1L;
+        }
+    }
+
     /**
      * Because the offset statistics of the streamingInsertJob are all stored 
in txn,
      * only some fields are replayed here.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java 
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 83168087926..9b3e37d8f2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -91,6 +91,7 @@ public final class MetricRepo {
     public static final String STREAMING_JOB_PER_JOB_FILTERED_ROWS = 
"streaming_job_per_job_filtered_rows";
     public static final String STREAMING_JOB_PER_JOB_SUCCEED_TASK_COUNT = 
"streaming_job_per_job_succeed_task_count";
     public static final String STREAMING_JOB_PER_JOB_FAILED_TASK_COUNT = 
"streaming_job_per_job_failed_task_count";
+    public static final String STREAMING_JOB_PER_JOB_LAG = 
"streaming_job_per_job_lag";
     public static final String CLOUD_TAG = "cloud";
 
     public static LongCounterMetric COUNTER_REQUEST_ALL;
@@ -1246,6 +1247,7 @@ public final class MetricRepo {
         
DORIS_METRIC_REGISTER.removeMetrics(STREAMING_JOB_PER_JOB_FILTERED_ROWS);
         
DORIS_METRIC_REGISTER.removeMetrics(STREAMING_JOB_PER_JOB_SUCCEED_TASK_COUNT);
         
DORIS_METRIC_REGISTER.removeMetrics(STREAMING_JOB_PER_JOB_FAILED_TASK_COUNT);
+        DORIS_METRIC_REGISTER.removeMetrics(STREAMING_JOB_PER_JOB_LAG);
 
         try {
             List<org.apache.doris.job.base.AbstractJob> jobs =
@@ -1328,6 +1330,18 @@ public final class MetricRepo {
                 failedTaskCount.addLabel(new MetricLabel("job_id", jobId))
                         .addLabel(new MetricLabel("job_name", jobName));
                 DORIS_METRIC_REGISTER.addMetrics(failedTaskCount);
+
+                GaugeMetric<Long> lag = new GaugeMetric<Long>(
+                        STREAMING_JOB_PER_JOB_LAG, MetricUnit.SECONDS,
+                        "per job lag in seconds of streaming job, -1 means 
N/A") {
+                    @Override
+                    public Long getValue() {
+                        return sJob.getLagSeconds();
+                    }
+                };
+                lag.addLabel(new MetricLabel("job_id", jobId))
+                        .addLabel(new MetricLabel("job_name", jobName));
+                DORIS_METRIC_REGISTER.addMetrics(lag);
             }
         } catch (Throwable t) {
             LOG.warn("failed to update streaming job per-job metrics", t);
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
index e01e1d1908e..fc317a3963a 100644
--- 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
@@ -200,12 +200,21 @@ suite("test_streaming_mysql_job_metrics",
                         metricCount++
                     }
 
+                    def perJobLag = result.find {
+                        it.tags?.metric == 
"doris_fe_streaming_job_per_job_lag" &&
+                        it.tags?.job_name == "${jobName}"
+                    }
+                    if (perJobLag != null) {
+                        log.info("per-job lag: ${perJobLag}".toString())
+                        metricCount++
+                    }
+
 
                 }
             }
 
-            // 9 streaming_job_* counters + 1 doris_fe_job RUNNING gauge + 5 
per-job metrics
-            if (metricCount >= 15) {
+            // 9 streaming_job_* counters + 1 doris_fe_job RUNNING gauge + 6 
per-job metrics
+            if (metricCount >= 16) {
                 break
             }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to