This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new b620c72f01b branch-4.1: [improve](streaming-job) add per-job lag metric to streaming insert jobs #63194 (#63271)
b620c72f01b is described below

commit b620c72f01b3304f1751c2c9600632c98c9a6a9d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 20 16:32:49 2026 +0800

    branch-4.1: [improve](streaming-job) add per-job lag metric to streaming insert jobs #63194 (#63271)
    
    Cherry-picked from #63194
    
    Co-authored-by: wudi <[email protected]>
---
 .../insert/streaming/StreamingInsertJob.java           | 18 ++++++++++++++++++
 .../main/java/org/apache/doris/metric/MetricRepo.java  | 14 ++++++++++++++
 .../cdc/test_streaming_mysql_job_metrics.groovy        | 13 +++++++++++--
 3 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index f0fd31623fa..5fe72415c0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -871,6 +871,24 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         super.onReplayCreate();
     }
 
+    public String getLag() {
+        return offsetProvider != null ? offsetProvider.getLag() : "";
+    }
+
+    // Numeric lag for metrics. Returns -1 when lag is not applicable (S3, snapshot phase)
+    // or unparseable, so dashboards can filter N/A jobs via lag >= 0.
+    public long getLagSeconds() {
+        String lagStr = getLag();
+        if (lagStr == null || lagStr.isEmpty()) {
+            return -1L;
+        }
+        try {
+            return Long.parseLong(lagStr);
+        } catch (NumberFormatException e) {
+            return -1L;
+        }
+    }
+
     /**
      * Because the offset statistics of the streamingInsertJob are all stored in txn,
      * only some fields are replayed here.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 0d4b7558077..1340f81f180 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -91,6 +91,7 @@ public final class MetricRepo {
     public static final String STREAMING_JOB_PER_JOB_FILTERED_ROWS = "streaming_job_per_job_filtered_rows";
     public static final String STREAMING_JOB_PER_JOB_SUCCEED_TASK_COUNT = "streaming_job_per_job_succeed_task_count";
     public static final String STREAMING_JOB_PER_JOB_FAILED_TASK_COUNT = "streaming_job_per_job_failed_task_count";
+    public static final String STREAMING_JOB_PER_JOB_LAG = "streaming_job_per_job_lag";
     public static final String CLOUD_TAG = "cloud";
 
     public static LongCounterMetric COUNTER_REQUEST_ALL;
@@ -1218,6 +1219,7 @@ public final class MetricRepo {
         DORIS_METRIC_REGISTER.removeMetrics(STREAMING_JOB_PER_JOB_FILTERED_ROWS);
         DORIS_METRIC_REGISTER.removeMetrics(STREAMING_JOB_PER_JOB_SUCCEED_TASK_COUNT);
         DORIS_METRIC_REGISTER.removeMetrics(STREAMING_JOB_PER_JOB_FAILED_TASK_COUNT);
+        DORIS_METRIC_REGISTER.removeMetrics(STREAMING_JOB_PER_JOB_LAG);
 
         try {
             List<org.apache.doris.job.base.AbstractJob> jobs =
@@ -1300,6 +1302,18 @@ public final class MetricRepo {
                 failedTaskCount.addLabel(new MetricLabel("job_id", jobId))
                         .addLabel(new MetricLabel("job_name", jobName));
                 DORIS_METRIC_REGISTER.addMetrics(failedTaskCount);
+
+                GaugeMetric<Long> lag = new GaugeMetric<Long>(
+                        STREAMING_JOB_PER_JOB_LAG, MetricUnit.SECONDS,
+                        "per job lag in seconds of streaming job, -1 means N/A") {
+                    @Override
+                    public Long getValue() {
+                        return sJob.getLagSeconds();
+                    }
+                };
+                lag.addLabel(new MetricLabel("job_id", jobId))
+                        .addLabel(new MetricLabel("job_name", jobName));
+                DORIS_METRIC_REGISTER.addMetrics(lag);
             }
         } catch (Throwable t) {
             LOG.warn("failed to update streaming job per-job metrics", t);
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
index e01e1d1908e..fc317a3963a 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
@@ -200,12 +200,21 @@ suite("test_streaming_mysql_job_metrics",
                         metricCount++
                     }
 
+                    def perJobLag = result.find {
+                        it.tags?.metric == "doris_fe_streaming_job_per_job_lag" &&
+                        it.tags?.job_name == "${jobName}"
+                    }
+                    if (perJobLag != null) {
+                        log.info("per-job lag: ${perJobLag}".toString())
+                        metricCount++
+                    }
+
 
                 }
             }
 
-            // 9 streaming_job_* counters + 1 doris_fe_job RUNNING gauge + 5 per-job metrics
-            if (metricCount >= 15) {
+            // 9 streaming_job_* counters + 1 doris_fe_job RUNNING gauge + 6 per-job metrics
+            if (metricCount >= 16) {
                 break
             }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to