This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new dc4bc1bddce branch-4.0: [Improve](StreamingJob) add more metrics to 
observe the streaming job #60493 (#60571)
dc4bc1bddce is described below

commit dc4bc1bddcea3fff46bf1e44c49db7169ca8ab6b
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Feb 7 22:06:13 2026 +0800

    branch-4.0: [Improve](StreamingJob) add more metrics to observe the 
streaming job #60493 (#60571)
    
    Cherry-picked from #60493
    
    Co-authored-by: wudi <[email protected]>
---
 .../insert/streaming/StreamingInsertJob.java       |  27 +++-
 .../java/org/apache/doris/metric/MetricRepo.java   |  86 ++++++++++
 .../cdc/test_streaming_mysql_job_metrics.groovy    | 180 +++++++++++++++++++++
 .../test_routin_load_abnormal_job_monitor.groovy   |  15 +-
 4 files changed, 305 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 43fe652ee06..c820b8d532a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -54,6 +54,7 @@ import 
org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
 import org.apache.doris.job.util.StreamingJobUtils;
 import org.apache.doris.load.loadv2.LoadJob;
 import org.apache.doris.load.loadv2.LoadStatistic;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
@@ -516,6 +517,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     }
 
     protected void fetchMeta() throws JobException {
+        long start = System.currentTimeMillis();
         try {
             if (tvfType != null) {
                 if (originTvfProps == null) {
@@ -537,7 +539,13 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 // If fetching meta fails, the job is paused
                 // and auto resume will automatically wake it up.
                 this.updateJobStatus(JobStatus.PAUSED);
+
+                
MetricRepo.COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT.increase(1L);
             }
+        } finally {
+            long end = System.currentTimeMillis();
+            MetricRepo.COUNTER_STREAMING_JOB_GET_META_LANTENCY.increase(end - 
start);
+            MetricRepo.COUNTER_STREAMING_JOB_GET_META_COUNT.increase(1L);
         }
     }
 
@@ -584,6 +592,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
             failedTaskCount.incrementAndGet();
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
             this.failureReason = new FailureReason(task.getErrMsg());
+            MetricRepo.COUNTER_STREAMING_JOB_TASK_FAILED_COUNT.increase(1L);
         } finally {
             writeUnlock();
         }
@@ -594,6 +603,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         try {
             resetFailureInfo(null);
             succeedTaskCount.incrementAndGet();
+            //update metric
+            MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT.increase(1L);
+            
MetricRepo.COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME.increase(task.getFinishTimeMs()
 - task.getStartTimeMs());
+
             
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
             AbstractStreamingTask nextTask = createStreamingTask();
             this.runningStreamTask = nextTask;
@@ -613,6 +626,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         this.jobStatistic.setFileNumber(this.jobStatistic.getFileNumber() + 
attachment.getNumFiles());
         this.jobStatistic.setFileSize(this.jobStatistic.getFileSize() + 
attachment.getFileBytes());
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
+
+        //update metric
+        
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(attachment.getScannedRows());
+        
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(attachment.getLoadBytes());
     }
 
     private void 
updateCloudJobStatisticAndOffset(StreamingTaskTxnCommitAttachment attachment) {
@@ -624,6 +641,10 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         this.jobStatistic.setFileNumber(attachment.getNumFiles());
         this.jobStatistic.setFileSize(attachment.getFileBytes());
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(attachment.getOffset()));
+
+        //update metric
+        
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.update(attachment.getScannedRows());
+        
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.update(attachment.getLoadBytes());
     }
 
     private void updateJobStatisticAndOffset(CommitOffsetRequest 
offsetRequest) {
@@ -645,6 +666,11 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 .setFilteredRows(this.nonTxnJobStatistic.getFilteredRows() + 
offsetRequest.getFilteredRows());
         
this.nonTxnJobStatistic.setLoadBytes(this.nonTxnJobStatistic.getLoadBytes() + 
offsetRequest.getLoadBytes());
         
offsetProvider.updateOffset(offsetProvider.deserializeOffset(offsetRequest.getOffset()));
+
+        //update metric
+        
MetricRepo.COUNTER_STREAMING_JOB_TOTAL_ROWS.increase(offsetRequest.getScannedRows());
+        
MetricRepo.COUNTER_STREAMING_JOB_FILTER_ROWS.increase(offsetRequest.getFilteredRows());
+        
MetricRepo.COUNTER_STREAMING_JOB_LOAD_BYTES.increase(offsetRequest.getLoadBytes());
     }
 
     @Override
@@ -658,7 +684,6 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         super.onReplayCreate();
     }
 
-
     /**
      * Because the offset statistics of the streamingInsertJob are all stored 
in txn,
      * only some fields are replayed here.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java 
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index b7bbfea437b..bfebc386e77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -28,7 +28,9 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.util.NetUtils;
+import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.common.TaskStatus;
+import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.loadv2.JobState;
 import org.apache.doris.load.loadv2.LoadManager;
@@ -156,6 +158,17 @@ public final class MetricRepo {
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_TIME;
     public static LongCounterMetric COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT;
 
+    // Streaming job
+    public static LongCounterMetric COUNTER_STREAMING_JOB_GET_META_LANTENCY;
+    public static LongCounterMetric COUNTER_STREAMING_JOB_GET_META_COUNT;
+    public static LongCounterMetric COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT;
+    public static LongCounterMetric COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME;
+    public static LongCounterMetric COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT;
+    public static LongCounterMetric COUNTER_STREAMING_JOB_TASK_FAILED_COUNT;
+    public static LongCounterMetric COUNTER_STREAMING_JOB_TOTAL_ROWS;
+    public static LongCounterMetric COUNTER_STREAMING_JOB_FILTER_ROWS;
+    public static LongCounterMetric COUNTER_STREAMING_JOB_LOAD_BYTES;
+
     public static LongCounterMetric COUNTER_HIT_SQL_BLOCK_RULE;
 
     public static AutoMappedMetric<LongCounterMetric> THRIFT_COUNTER_RPC_ALL;
@@ -300,6 +313,7 @@ public final class MetricRepo {
         }
 
         initRoutineLoadJobMetrics();
+        initStreamingJobMetrics();
 
         // running alter job
         Alter alter = Env.getCurrentEnv().getAlterInstance();
@@ -637,6 +651,35 @@ public final class MetricRepo {
                 MetricUnit.NOUNIT, "task execute count of routine load");
         
DORIS_METRIC_REGISTER.addMetrics(COUNTER_ROUTINE_LOAD_TASK_EXECUTE_COUNT);
 
+        // streaming job metrics
+        COUNTER_STREAMING_JOB_GET_META_LANTENCY = new 
LongCounterMetric("streaming_job_get_meta_latency",
+                MetricUnit.MILLISECONDS, "get meta lantency of streaming job");
+        
DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_GET_META_LANTENCY);
+        COUNTER_STREAMING_JOB_GET_META_COUNT = new 
LongCounterMetric("streaming_job_get_meta_count",
+                MetricUnit.NOUNIT, "get meta count of streaming job");
+        DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_GET_META_COUNT);
+        COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT = new 
LongCounterMetric("streaming_job_get_meta_fail_count",
+                MetricUnit.NOUNIT, "get meta fail count of streaming job");
+        
DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_GET_META_FAIL_COUNT);
+        COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME = new 
LongCounterMetric("streaming_job_task_execute_time",
+                MetricUnit.MILLISECONDS, "task execute time of streaming job");
+        
DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_TASK_EXECUTE_TIME);
+        COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT = new 
LongCounterMetric("streaming_job_task_execute_count",
+                MetricUnit.NOUNIT, "task execute count of streaming job");
+        
DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_TASK_EXECUTE_COUNT);
+        COUNTER_STREAMING_JOB_TASK_FAILED_COUNT = new 
LongCounterMetric("streaming_job_task_failed_count",
+                MetricUnit.NOUNIT, "task failed count of streaming job");
+        
DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_TASK_FAILED_COUNT);
+        COUNTER_STREAMING_JOB_TOTAL_ROWS = new 
LongCounterMetric("streaming_job_total_rows", MetricUnit.ROWS,
+                "total rows of streaming job");
+        DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_TOTAL_ROWS);
+        COUNTER_STREAMING_JOB_FILTER_ROWS = new 
LongCounterMetric("streaming_job_filter_rows", MetricUnit.ROWS,
+                "filter rows of streaming job");
+        DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_FILTER_ROWS);
+        COUNTER_STREAMING_JOB_LOAD_BYTES = new 
LongCounterMetric("streaming_job_load_bytes", MetricUnit.BYTES,
+                "load bytes of streaming job");
+        DORIS_METRIC_REGISTER.addMetrics(COUNTER_STREAMING_JOB_LOAD_BYTES);
+
         COUNTER_HIT_SQL_BLOCK_RULE = new 
LongCounterMetric("counter_hit_sql_block_rule", MetricUnit.ROWS,
                 "total hit sql block rule query");
         DORIS_METRIC_REGISTER.addMetrics(COUNTER_HIT_SQL_BLOCK_RULE);
@@ -1059,6 +1102,49 @@ public final class MetricRepo {
         DORIS_METRIC_REGISTER.addMetrics(gauge);
     }
 
+    private static void initStreamingJobMetrics() {
+        // streaming insert jobs
+        for (JobStatus jobStatus : JobStatus.values()) {
+            if (jobStatus == JobStatus.PAUSED) {
+                addStreamingJobStateGaugeMetric(jobStatus, "USER_PAUSED",
+                        job -> job.getFailureReason() != null
+                                && job.getFailureReason().getCode() == 
InternalErrorCode.MANUAL_PAUSE_ERR);
+                addStreamingJobStateGaugeMetric(jobStatus, "ABNORMAL_PAUSED",
+                        job -> job.getFailureReason() != null
+                                && job.getFailureReason().getCode() != 
InternalErrorCode.MANUAL_PAUSE_ERR);
+            }
+            addStreamingJobStateGaugeMetric(jobStatus, jobStatus.name(), job 
-> true);
+        }
+    }
+
+    private static void addStreamingJobStateGaugeMetric(
+            JobStatus jobStatus, String stateLabel, 
Predicate<StreamingInsertJob> filter) {
+
+        GaugeMetric<Long> gauge = new GaugeMetric<Long>(
+                "job", MetricUnit.NOUNIT, "streaming job statistics") {
+            @Override
+            public Long getValue() {
+                if (!Env.getCurrentEnv().isMaster()) {
+                    return 0L;
+                }
+                List<org.apache.doris.job.base.AbstractJob> jobs =
+                        
Env.getCurrentEnv().getJobManager().queryJobs(org.apache.doris.job.common.JobType.INSERT);
+
+                return jobs.stream()
+                        .filter(job -> job instanceof StreamingInsertJob)
+                        .map(job -> (StreamingInsertJob) job)
+                        .filter(job -> job.getJobStatus() == jobStatus)
+                        .filter(filter)
+                        .count();
+            }
+        };
+
+        gauge.addLabel(new MetricLabel("job", "load"))
+                .addLabel(new MetricLabel("type", "STREAMING_JOB"))
+                .addLabel(new MetricLabel("state", stateLabel));
+        DORIS_METRIC_REGISTER.addMetrics(gauge);
+    }
+
     private static void initSystemMetrics() {
         // TCP retransSegs
         GaugeMetric<Long> tcpRetransSegs = (GaugeMetric<Long>) new 
GaugeMetric<Long>(
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
new file mode 100644
index 00000000000..644e63ce5f0
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_metrics.groovy
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+import groovy.json.JsonSlurper
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_mysql_job_metrics",
+      "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+
+    def jobName = "test_streaming_mysql_job_metrics"
+    def currentDb = (sql "select database()")[0][0]
+    def mysqlDb = "test_cdc_db"
+    def mysqlTable = "user_info_metrics"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlTable}"""
+            sql """CREATE TABLE ${mysqlDb}.${mysqlTable} (
+                      `name` varchar(200) NOT NULL,
+                      `age` int DEFAULT NULL,
+                      PRIMARY KEY (`name`)
+                   ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES 
('Alice', 10)"""
+            sql """INSERT INTO ${mysqlDb}.${mysqlTable} (name, age) VALUES 
('Bob', 20)"""
+        }
+
+        // create streaming job: FROM MYSQL ... TO DATABASE currentDb
+        sql """
+            CREATE JOB ${jobName}
+            ON STREAMING
+            FROM MYSQL (
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "root",
+                "password" = "123456",
+                "database" = "${mysqlDb}",
+                "include_tables" = "${mysqlTable}",
+                "offset" = "initial"
+            )
+            TO DATABASE ${currentDb} (
+                "table.create.properties.replication_num" = "1"
+            )
+        """
+
+        try {
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until({
+                        def jobInfo = sql """
+                            select SucceedTaskCount, Status
+                            from jobs("type"="insert")
+                            where Name = '${jobName}' and 
ExecuteType='STREAMING'
+                        """
+                        log.info("metrics job status: " + jobInfo)
+                        jobInfo.size() == 1 &&
+                                Integer.parseInt(jobInfo[0][0] as String) >= 1 
&&
+                                (jobInfo[0][1] as String) == "RUNNING"
+                    })
+        } catch (Exception ex) {
+            def showjob = sql """select * from jobs("type"="insert") where 
Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where 
JobName='${jobName}'"""
+            log.info("metrics show job: " + showjob)
+            log.info("metrics show task: " + showtask)
+            throw ex
+        }
+
+        int count = 0
+        int metricCount = 0
+        while (true) {
+            metricCount = 0
+            httpTest {
+                endpoint context.config.feHttpAddress
+                uri "/metrics?type=json"
+                op "get"
+                check { code, body ->
+                    logger.debug("code:${code} body:${body}")
+
+                    if 
(body.contains("doris_fe_streaming_job_get_meta_latency")) {
+                        log.info("contain 
doris_fe_streaming_job_get_meta_latency")
+                        metricCount++
+                    }
+                    if 
(body.contains("doris_fe_streaming_job_get_meta_count")) {
+                        log.info("contain 
doris_fe_streaming_job_get_meta_count")
+                        metricCount++
+                    }
+                    if 
(body.contains("doris_fe_streaming_job_get_meta_fail_count")) {
+                        log.info("contain 
doris_fe_streaming_job_get_meta_fail_count")
+                        metricCount++
+                    }
+                    if 
(body.contains("doris_fe_streaming_job_task_execute_time")) {
+                        log.info("contain 
doris_fe_streaming_job_task_execute_time")
+                        metricCount++
+                    }
+                    if 
(body.contains("doris_fe_streaming_job_task_execute_count")) {
+                        log.info("contain 
doris_fe_streaming_job_task_execute_count")
+                        metricCount++
+                    }
+                    if 
(body.contains("doris_fe_streaming_job_task_failed_count")) {
+                        log.info("contain 
doris_fe_streaming_job_task_failed_count")
+                        metricCount++
+                    }
+                    if (body.contains("doris_fe_streaming_job_total_rows")) {
+                        log.info("contain doris_fe_streaming_job_total_rows")
+                        metricCount++
+                    }
+                    if (body.contains("doris_fe_streaming_job_filter_rows")) {
+                        log.info("contain doris_fe_streaming_job_filter_rows")
+                        metricCount++
+                    }
+                    if (body.contains("doris_fe_streaming_job_load_bytes")) {
+                        log.info("contain doris_fe_streaming_job_load_bytes")
+                        metricCount++
+                    }
+
+                    // check doris_fe_job gauge: STREAMING_JOB in RUNNING 
state should be exactly 1
+                    def jsonSlurper = new JsonSlurper()
+                    def result = jsonSlurper.parseText(body)
+                    def entry = result.find {
+                        it.tags?.metric == "doris_fe_job" &&
+                        it.tags?.job == "load" &&
+                        it.tags?.type == "STREAMING_JOB" &&
+                        it.tags?.state == "RUNNING"
+                    }
+                    def value = entry ? entry.value : null
+                    log.info("streaming job RUNNING metric entry: 
${entry}".toString())
+                    log.info("streaming job RUNNING value: 
${value}".toString())
+                    if (value >= 1) {
+                        metricCount++
+                    }
+
+                }
+            }
+
+            // 9 streaming_job_* counters + 1 doris_fe_job RUNNING gauge
+            if (metricCount >= 10) {
+                break
+            }
+
+            count++
+            sleep(1000)
+            if (count > 60) {
+                // timeout, failed
+                assertEquals(1, 2)
+            }
+        }
+
+        
+
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        def jobCountRsp = sql """select count(1) from jobs("type"="insert")  
where Name ='${jobName}'"""
+        assert jobCountRsp.get(0).get(0) == 0
+    }
+}
+
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
index ea466010929..a6cb574d62d 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routin_load_abnormal_job_monitor.groovy
@@ -201,7 +201,12 @@ suite("test_routine_load_abnormal_job_monitor","p0") {
                         def jsonSlurper = new JsonSlurper()
                         def result = jsonSlurper.parseText(body)
 
-                        def entry = result.find { it.tags?.metric == 
"doris_fe_job" && it.tags?.state == "ABNORMAL_PAUSED"}
+                        def entry = result.find {
+                            it.tags?.metric == "doris_fe_job" &&
+                            it.tags?.job == "load" &&
+                            it.tags?.type == "ROUTINE_LOAD" &&
+                            it.tags?.state == "ABNORMAL_PAUSED"
+                        }
                         def value = entry ? entry.value : null
                         log.info("Contains ABNORMAL_PAUSE: ${entry != 
null}".toString())
                         log.info("Value of ABNORMAL_PAUSE: 
${value}".toString())
@@ -209,7 +214,13 @@ suite("test_routine_load_abnormal_job_monitor","p0") {
                             metricCount++
                         }
 
-                        entry = result.find { it.tags?.metric == 
"doris_fe_job" && it.tags?.state == "USER_PAUSED"}
+                        entry = result.find {
+                            it.tags?.metric == "doris_fe_job" &&
+                            it.tags?.job == "load" &&
+                            it.tags?.type == "ROUTINE_LOAD" &&
+                            it.tags?.state == "USER_PAUSED"
+                        }
+
                         value = entry ? entry.value : null
                         log.info("Contains USER_PAUSE: ${entry != 
null}".toString())
                         log.info("Value of USER_PAUSE: ${value}".toString())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to