This is an automated email from the ASF dual-hosted git repository.

adonisling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 26ccb6ba5a [feature-wip](MTMV) Add some metrics for MTMV (#16913)
26ccb6ba5a is described below

commit 26ccb6ba5af1c3817d59f109009f9bc461ba3489
Author: huangzhaowei <[email protected]>
AuthorDate: Mon Feb 27 11:27:23 2023 +0800

    [feature-wip](MTMV) Add some metrics for MTMV (#16913)
    
    Demo:
    
    ```
    # HELP doris_fe_mtmv_job Total job number of mtmv.
    # TYPE doris_fe_mtmv_job gauge
    doris_fe_mtmv_job{type="TOTAL-JOB"} 1
    doris_fe_mtmv_job{type="ACTIVE-JOB"} 1
    # HELP doris_fe_mtmv_task Running task number of mtmv.
    # TYPE doris_fe_mtmv_task gauge
    doris_fe_mtmv_task{type="RUNNING-TASK"} 0
    doris_fe_mtmv_task{type="PENDING-TASK"} 0
    doris_fe_mtmv_task{type="FAILED-TASK"} 0
    doris_fe_mtmv_task{type="TOTAL-TASK"} 1
    ```
---
 .../apache/doris/metric/DorisMetricRegistry.java   |  6 +-
 .../java/org/apache/doris/mtmv/MTMVJobManager.java | 81 ++++++++++++++++++++++
 .../org/apache/doris/mtmv/MTMVTaskManager.java     | 14 +++-
 .../org/apache/doris/mtmv/MTMVJobManagerTest.java  | 25 +++++++
 4 files changed, 120 insertions(+), 6 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java 
b/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java
index 24b96ad86c..73b49c12eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java
@@ -19,8 +19,6 @@ package org.apache.doris.metric;
 
 import org.apache.doris.catalog.Env;
 
-import com.google.common.collect.Lists;
-
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
@@ -52,11 +50,11 @@ public class DorisMetricRegistry {
     }
 
     public synchronized List<Metric> getMetrics() {
-        return Lists.newArrayList(metrics);
+        return 
metrics.stream().sorted(Comparator.comparing(Metric::getName)).collect(Collectors.toList());
     }
 
     public synchronized List<Metric> getSystemMetrics() {
-        return Lists.newArrayList(systemMetrics);
+        return 
systemMetrics.stream().sorted(Comparator.comparing(Metric::getName)).collect(Collectors.toList());
     }
 
     // the metrics by metric name
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index 642a2452ad..a6f10128a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -21,6 +21,10 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.metric.GaugeMetric;
+import org.apache.doris.metric.Metric;
+import org.apache.doris.metric.MetricLabel;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mtmv.MTMVUtils.JobState;
 import org.apache.doris.mtmv.MTMVUtils.TaskRetryPolicy;
 import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
@@ -58,6 +62,9 @@ import java.util.stream.Collectors;
 public class MTMVJobManager {
     private static final Logger LOG = 
LogManager.getLogger(MTMVJobManager.class);
 
+    // make sure that metrics were registered only once.
+    private static volatile boolean metricsRegistered = false;
+
     private final Map<Long, MTMVJob> idToJobMap;
     private final Map<String, MTMVJob> nameToJobMap;
     private final Map<Long, ScheduledFuture<?>> periodFutureMap;
@@ -113,7 +120,81 @@ public class MTMVJobManager {
             }, 0, 1, TimeUnit.MINUTES);
 
             taskManager.startTaskScheduler();
+            initMetrics();
+        }
+    }
+
+    private void initMetrics() {
+        if (metricsRegistered) {
+            return;
         }
+        metricsRegistered = true;
+
+        // total jobs
+        GaugeMetric<Integer> totalJob = new GaugeMetric<Integer>("mtmv_job",
+                Metric.MetricUnit.NOUNIT, "Total job number of mtmv.") {
+            @Override
+            public Integer getValue() {
+                return nameToJobMap.size();
+            }
+        };
+        totalJob.addLabel(new MetricLabel("type", "TOTAL-JOB"));
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(totalJob);
+
+        // active jobs
+        GaugeMetric<Integer> activeJob = new GaugeMetric<Integer>("mtmv_job",
+                Metric.MetricUnit.NOUNIT, "Active job number of mtmv.") {
+            @Override
+            public Integer getValue() {
+                return periodFutureMap.size();
+            }
+        };
+        activeJob.addLabel(new MetricLabel("type", "ACTIVE-JOB"));
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(activeJob);
+
+        // total tasks
+        GaugeMetric<Integer> totalTask = new GaugeMetric<Integer>("mtmv_task",
+                Metric.MetricUnit.NOUNIT, "Total task number of mtmv.") {
+            @Override
+            public Integer getValue() {
+                return getTaskManager().getAllHistory().size();
+            }
+        };
+        totalTask.addLabel(new MetricLabel("type", "TOTAL-TASK"));
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(totalTask);
+
+        // running tasks
+        GaugeMetric<Integer> runningTask = new 
GaugeMetric<Integer>("mtmv_task",
+                Metric.MetricUnit.NOUNIT, "Running task number of mtmv.") {
+            @Override
+            public Integer getValue() {
+                return getTaskManager().getRunningTaskMap().size();
+            }
+        };
+        runningTask.addLabel(new MetricLabel("type", "RUNNING-TASK"));
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(runningTask);
+
+        // pending tasks
+        GaugeMetric<Integer> pendingTask = new 
GaugeMetric<Integer>("mtmv_task",
+                Metric.MetricUnit.NOUNIT, "Pending task number of mtmv.") {
+            @Override
+            public Integer getValue() {
+                return getTaskManager().getPendingTaskMap().size();
+            }
+        };
+        pendingTask.addLabel(new MetricLabel("type", "PENDING-TASK"));
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(pendingTask);
+
+        // failed tasks
+        GaugeMetric<Integer> failedTask = new GaugeMetric<Integer>("mtmv_task",
+                Metric.MetricUnit.NOUNIT, "Failed task number of mtmv.") {
+            @Override
+            public Integer getValue() {
+                return getTaskManager().getFailedTaskCount();
+            }
+        };
+        failedTask.addLabel(new MetricLabel("type", "FAILED-TASK"));
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(failedTask);
     }
 
     public void stop() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index 615fb2f37c..2de94e5201 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -50,6 +50,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
@@ -74,6 +75,8 @@ public class MTMVTaskManager {
 
     private final MTMVJobManager mtmvJobManager;
 
+    private final AtomicInteger failedTaskCount = new AtomicInteger(0);
+
     public MTMVTaskManager(MTMVJobManager mtmvJobManager) {
         this.mtmvJobManager = mtmvJobManager;
     }
@@ -194,8 +197,11 @@ public class MTMVTaskManager {
             if (future.isDone()) {
                 runningIterator.remove();
                 addHistory(taskExecutor.getTask());
-                changeAndLogTaskStatus(taskExecutor.getJobId(), 
taskExecutor.getTask(), TaskState.RUNNING,
-                        taskExecutor.getTask().getState());
+                MTMVUtils.TaskState finalState = 
taskExecutor.getTask().getState();
+                if (finalState == TaskState.FAILURE) {
+                    failedTaskCount.incrementAndGet();
+                }
+                changeAndLogTaskStatus(taskExecutor.getJobId(), 
taskExecutor.getTask(), TaskState.RUNNING, finalState);
 
                 TriggerMode triggerMode = 
taskExecutor.getJob().getTriggerMode();
                 if (triggerMode == TriggerMode.ONCE) {
@@ -211,6 +217,10 @@ public class MTMVTaskManager {
         }
     }
 
+    public int getFailedTaskCount() {
+        return failedTaskCount.get();
+    }
+
     private void scheduledPendingTask() {
         int currentRunning = runningTaskMap.size();
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
index 08f5e653a8..55cf3dceb0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
@@ -18,6 +18,8 @@
 package org.apache.doris.mtmv;
 
 import org.apache.doris.common.DdlException;
+import org.apache.doris.metric.Metric;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mtmv.MTMVUtils.JobState;
 import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
 import org.apache.doris.mtmv.metadata.MTMVJob;
@@ -135,4 +137,27 @@ public class MTMVJobManagerTest extends TestWithFeService {
         // index 10: ErrorCode
         //Assertions.assertEquals("0", taskRow.get(10));
     }
+
+    @Test
+    public void testMetrics() {
+        MTMVJobManager jobManager = new MTMVJobManager();
+        jobManager.start();
+
+        int jobMetricCount = 0;
+        int taskMetricCount = 0;
+        List<Metric> metrics = MetricRepo.DORIS_METRIC_REGISTER.getMetrics();
+        for (Metric metric : metrics) {
+            if (metric.getName().startsWith("mtmv")) {
+                if (metric.getName().equals("mtmv_job")) {
+                    jobMetricCount++;
+                } else if (metric.getName().equals("mtmv_task")) {
+                    taskMetricCount++;
+                } else {
+                    Assertions.fail();
+                }
+            }
+        }
+        Assertions.assertEquals(2, jobMetricCount);
+        Assertions.assertEquals(4, taskMetricCount);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to