This is an automated email from the ASF dual-hosted git repository.
adonisling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 26ccb6ba5a [feature-wip](MTMV) Add some metrics for MTMV (#16913)
26ccb6ba5a is described below
commit 26ccb6ba5af1c3817d59f109009f9bc461ba3489
Author: huangzhaowei <[email protected]>
AuthorDate: Mon Feb 27 11:27:23 2023 +0800
[feature-wip](MTMV) Add some metrics for MTMV (#16913)
Demo:
```
# HELP doris_fe_mtmv_job Total job number of mtmv.
# TYPE doris_fe_mtmv_job gauge
doris_fe_mtmv_job{type="TOTAL-JOB"} 1
doris_fe_mtmv_job{type="ACTIVE-JOB"} 1
# HELP doris_fe_mtmv_task Running task number of mtmv.
# TYPE doris_fe_mtmv_task gauge
doris_fe_mtmv_task{type="RUNNING-TASK"} 0
doris_fe_mtmv_task{type="PENDING-TASK"} 0
doris_fe_mtmv_task{type="FAILED-TASK"} 0
doris_fe_mtmv_task{type="TOTAL-TASK"} 1
```
---
.../apache/doris/metric/DorisMetricRegistry.java | 6 +-
.../java/org/apache/doris/mtmv/MTMVJobManager.java | 81 ++++++++++++++++++++++
.../org/apache/doris/mtmv/MTMVTaskManager.java | 14 +++-
.../org/apache/doris/mtmv/MTMVJobManagerTest.java | 25 +++++++
4 files changed, 120 insertions(+), 6 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java b/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java
index 24b96ad86c..73b49c12eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/DorisMetricRegistry.java
@@ -19,8 +19,6 @@ package org.apache.doris.metric;
import org.apache.doris.catalog.Env;
-import com.google.common.collect.Lists;
-
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -52,11 +50,11 @@ public class DorisMetricRegistry {
}
public synchronized List<Metric> getMetrics() {
- return Lists.newArrayList(metrics);
+ return metrics.stream().sorted(Comparator.comparing(Metric::getName)).collect(Collectors.toList());
}
public synchronized List<Metric> getSystemMetrics() {
- return Lists.newArrayList(systemMetrics);
+ return systemMetrics.stream().sorted(Comparator.comparing(Metric::getName)).collect(Collectors.toList());
}
// the metrics by metric name
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
index 642a2452ad..a6f10128a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVJobManager.java
@@ -21,6 +21,10 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
+import org.apache.doris.metric.GaugeMetric;
+import org.apache.doris.metric.Metric;
+import org.apache.doris.metric.MetricLabel;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.MTMVUtils.JobState;
import org.apache.doris.mtmv.MTMVUtils.TaskRetryPolicy;
import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
@@ -58,6 +62,9 @@ import java.util.stream.Collectors;
public class MTMVJobManager {
private static final Logger LOG = LogManager.getLogger(MTMVJobManager.class);
+ // make sure that metrics were registered only once.
+ private static volatile boolean metricsRegistered = false;
+
private final Map<Long, MTMVJob> idToJobMap;
private final Map<String, MTMVJob> nameToJobMap;
private final Map<Long, ScheduledFuture<?>> periodFutureMap;
@@ -113,7 +120,81 @@ public class MTMVJobManager {
}, 0, 1, TimeUnit.MINUTES);
taskManager.startTaskScheduler();
+ initMetrics();
+ }
+ }
+
+ private void initMetrics() {
+ if (metricsRegistered) {
+ return;
}
+ metricsRegistered = true;
+
+ // total jobs
+ GaugeMetric<Integer> totalJob = new GaugeMetric<Integer>("mtmv_job",
+ Metric.MetricUnit.NOUNIT, "Total job number of mtmv.") {
+ @Override
+ public Integer getValue() {
+ return nameToJobMap.size();
+ }
+ };
+ totalJob.addLabel(new MetricLabel("type", "TOTAL-JOB"));
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(totalJob);
+
+ // active jobs
+ GaugeMetric<Integer> activeJob = new GaugeMetric<Integer>("mtmv_job",
+ Metric.MetricUnit.NOUNIT, "Active job number of mtmv.") {
+ @Override
+ public Integer getValue() {
+ return periodFutureMap.size();
+ }
+ };
+ activeJob.addLabel(new MetricLabel("type", "ACTIVE-JOB"));
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(activeJob);
+
+ // total tasks
+ GaugeMetric<Integer> totalTask = new GaugeMetric<Integer>("mtmv_task",
+ Metric.MetricUnit.NOUNIT, "Total task number of mtmv.") {
+ @Override
+ public Integer getValue() {
+ return getTaskManager().getAllHistory().size();
+ }
+ };
+ totalTask.addLabel(new MetricLabel("type", "TOTAL-TASK"));
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(totalTask);
+
+ // running tasks
+ GaugeMetric<Integer> runningTask = new GaugeMetric<Integer>("mtmv_task",
+ Metric.MetricUnit.NOUNIT, "Running task number of mtmv.") {
+ @Override
+ public Integer getValue() {
+ return getTaskManager().getRunningTaskMap().size();
+ }
+ };
+ runningTask.addLabel(new MetricLabel("type", "RUNNING-TASK"));
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(runningTask);
+
+ // pending tasks
+ GaugeMetric<Integer> pendingTask = new GaugeMetric<Integer>("mtmv_task",
+ Metric.MetricUnit.NOUNIT, "Pending task number of mtmv.") {
+ @Override
+ public Integer getValue() {
+ return getTaskManager().getPendingTaskMap().size();
+ }
+ };
+ pendingTask.addLabel(new MetricLabel("type", "PENDING-TASK"));
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(pendingTask);
+
+ // failed tasks
+ GaugeMetric<Integer> failedTask = new GaugeMetric<Integer>("mtmv_task",
+ Metric.MetricUnit.NOUNIT, "Failed task number of mtmv.") {
+ @Override
+ public Integer getValue() {
+ return getTaskManager().getFailedTaskCount();
+ }
+ };
+ failedTask.addLabel(new MetricLabel("type", "FAILED-TASK"));
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(failedTask);
}
public void stop() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
index 615fb2f37c..2de94e5201 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVTaskManager.java
@@ -50,6 +50,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -74,6 +75,8 @@ public class MTMVTaskManager {
private final MTMVJobManager mtmvJobManager;
+ private final AtomicInteger failedTaskCount = new AtomicInteger(0);
+
public MTMVTaskManager(MTMVJobManager mtmvJobManager) {
this.mtmvJobManager = mtmvJobManager;
}
@@ -194,8 +197,11 @@ public class MTMVTaskManager {
if (future.isDone()) {
runningIterator.remove();
addHistory(taskExecutor.getTask());
- changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.RUNNING,
- taskExecutor.getTask().getState());
+ MTMVUtils.TaskState finalState = taskExecutor.getTask().getState();
+ if (finalState == TaskState.FAILURE) {
+ failedTaskCount.incrementAndGet();
+ }
+ changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.RUNNING, finalState);
TriggerMode triggerMode = taskExecutor.getJob().getTriggerMode();
if (triggerMode == TriggerMode.ONCE) {
@@ -211,6 +217,10 @@ public class MTMVTaskManager {
}
}
+ public int getFailedTaskCount() {
+ return failedTaskCount.get();
+ }
+
private void scheduledPendingTask() {
int currentRunning = runningTaskMap.size();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
index 08f5e653a8..55cf3dceb0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVJobManagerTest.java
@@ -18,6 +18,8 @@
package org.apache.doris.mtmv;
import org.apache.doris.common.DdlException;
+import org.apache.doris.metric.Metric;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.MTMVUtils.JobState;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
import org.apache.doris.mtmv.metadata.MTMVJob;
@@ -135,4 +137,27 @@ public class MTMVJobManagerTest extends TestWithFeService {
// index 10: ErrorCode
//Assertions.assertEquals("0", taskRow.get(10));
}
+
+ @Test
+ public void testMetrics() {
+ MTMVJobManager jobManager = new MTMVJobManager();
+ jobManager.start();
+
+ int jobMetricCount = 0;
+ int taskMetricCount = 0;
+ List<Metric> metrics = MetricRepo.DORIS_METRIC_REGISTER.getMetrics();
+ for (Metric metric : metrics) {
+ if (metric.getName().startsWith("mtmv")) {
+ if (metric.getName().equals("mtmv_job")) {
+ jobMetricCount++;
+ } else if (metric.getName().equals("mtmv_task")) {
+ taskMetricCount++;
+ } else {
+ Assertions.fail();
+ }
+ }
+ }
+ Assertions.assertEquals(2, jobMetricCount);
+ Assertions.assertEquals(4, taskMetricCount);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]