This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit bda0206639c03fb124959e48e346894adb4881f2
Author: Liang.Hua <36814772+jacob...@users.noreply.github.com>
AuthorDate: Fri Nov 4 11:47:07 2022 +0800

    KYLIN-5346 add monitor metrics for long running jobs
---
 .../apache/kylin/rest/config/MetricsConfig.java    |  1 +
 .../rest/config/initialize/MetricsRegistry.java    | 63 ++++++++++++++++++++-
 .../config/initialize/MetricsRegistryTest.java     | 65 ++++++++++++++++------
 .../apache/kylin/common/metrics/MetricsTag.java    |  2 +
 .../metrics/prometheus/PrometheusMetrics.java      |  1 +
 5 files changed, 113 insertions(+), 19 deletions(-)

diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/MetricsConfig.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/MetricsConfig.java
index f5920789d0..a0126eee3e 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/config/MetricsConfig.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/MetricsConfig.java
@@ -79,6 +79,7 @@ public class MetricsConfig {
             Set<String> allProjects = 
NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).listAllProjects()
                     
.stream().map(ProjectInstance::getName).collect(Collectors.toSet());
 
+            
MetricsRegistry.refreshProjectLongRunningJobs(KylinConfig.getInstanceFromEnv(), 
allProjects);
             Sets.SetView<String> newProjects = Sets.difference(allProjects, 
allControlledProjects);
             for (String newProject : newProjects) {
                 log.info("Register project metrics for {}", newProject);
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/MetricsRegistry.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/MetricsRegistry.java
index f8d653dcba..987df10e52 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/MetricsRegistry.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/MetricsRegistry.java
@@ -43,6 +43,7 @@ import 
org.apache.kylin.common.metrics.prometheus.PrometheusMetrics;
 import org.apache.kylin.common.persistence.metadata.JdbcDataSource;
 import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.JobTypeEnum;
 import org.apache.kylin.job.execution.NExecutableManager;
@@ -57,8 +58,6 @@ import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.user.ManagedUser;
-import org.apache.kylin.metadata.user.NKylinUserManager;
 import org.apache.kylin.query.util.LoadCounter;
 import org.apache.kylin.query.util.LoadDesc;
 import org.apache.kylin.rest.service.ProjectService;
@@ -72,6 +71,8 @@ import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.kylin.metadata.user.ManagedUser;
+import org.apache.kylin.metadata.user.NKylinUserManager;
 import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.Meter;
 import io.micrometer.core.instrument.MeterRegistry;
@@ -88,6 +89,10 @@ public class MetricsRegistry {
     private static final String GLOBAL = "global";
 
     private static final Map<String, Long> totalStorageSizeMap = 
Maps.newHashMap();
+    private static volatile Map<String, Map<Integer, Long>> 
projectPendingJobMap = Maps.newHashMap();
+    private static volatile Map<String, Map<Double, Long>> 
projectRunningJobMap = Maps.newHashMap();
+    private static final int[] PENDING_JOB_TIMEOUT_MINUTE = new int[] { 5, 10, 
15, 30 };
+    private static final double[] RUNNING_JOB_TIMEOUT_HOUR = new double[] { 
0.5d, 1d, 1.5d, 2d, 3d };
 
     private static final Logger logger = 
LoggerFactory.getLogger(MetricsRegistry.class);
 
@@ -99,6 +104,42 @@ public class MetricsRegistry {
         });
     }
 
+    public static void refreshProjectLongRunningJobs(KylinConfig kylinConfig, 
Set<String> projects) {
+        Map<String, Map<Integer, Long>> tempProjectPendingJobMap = 
Maps.newHashMap();
+        Map<String, Map<Double, Long>> tempProjectRunningJobMap = 
Maps.newHashMap();
+        for (String project : projects) {
+            final NExecutableManager executableManager = 
NExecutableManager.getInstance(kylinConfig, project);
+            tempProjectPendingJobMap.put(project, 
collectTimeoutToPendingJobsMap(executableManager));
+            tempProjectRunningJobMap.put(project, 
collectTimeoutToRunningJobsMap(executableManager));
+        }
+        projectPendingJobMap = tempProjectPendingJobMap;
+        projectRunningJobMap = tempProjectRunningJobMap;
+    }
+    
+    private static Map<Integer, Long> 
collectTimeoutToPendingJobsMap(NExecutableManager executableManager) {
+        Map<Integer, Long> timeoutToPendingJobsMap = Maps.newHashMap();
+        List<AbstractExecutable> pendingJobs = 
executableManager.getAllJobs().stream()
+                .filter(e -> 
ExecutableState.READY.name().equals(e.getOutput().getStatus()))
+                .map(executableManager::fromPO).collect(Collectors.toList());
+        for (int pendingJobMin : PENDING_JOB_TIMEOUT_MINUTE) {
+            timeoutToPendingJobsMap.put(pendingJobMin,
+                    pendingJobs.stream().filter(e -> e.getWaitTime() / 1000.0 
> pendingJobMin * 60).count());
+        }
+        return timeoutToPendingJobsMap;
+    }
+
+    private static Map<Double, Long> 
collectTimeoutToRunningJobsMap(NExecutableManager executableManager) {
+        Map<Double, Long> timeoutToRunningJobsMap = Maps.newHashMap();
+        List<AbstractExecutable> runningJobs = 
executableManager.getAllJobs().stream()
+                .filter(e -> 
ExecutableState.RUNNING.name().equals(e.getOutput().getStatus()))
+                .map(executableManager::fromPO).collect(Collectors.toList());
+        for (double runningJobHour : RUNNING_JOB_TIMEOUT_HOUR) {
+            timeoutToRunningJobsMap.put(runningJobHour,
+                    runningJobs.stream().filter(e -> e.getDuration() / 1000.0 
> runningJobHour * 3600).count());
+        }
+        return timeoutToRunningJobsMap;
+    }
+
     public static void removeProjectFromStorageSizeMap(String project) {
         totalStorageSizeMap.remove(project);
     }
@@ -189,6 +230,24 @@ public class MetricsRegistry {
                         : 
scheduler.getContext().getRunningJobs().values().stream()
                                 .filter(job -> 
ExecutableState.RUNNING.equals(job.getOutput().getState())).count())
                 .tags(projectTag).tags(MetricsTag.STATE.getVal(), 
MetricsTag.RUNNING.getVal()).register(meterRegistry);
+
+        for (double runningTimeoutHour : RUNNING_JOB_TIMEOUT_HOUR) {
+            Gauge.builder(PrometheusMetrics.JOB_LONG_RUNNING.getValue(),
+                    () -> 
MetricsRegistry.projectRunningJobMap.getOrDefault(project, Maps.newHashMap())
+                            .getOrDefault(runningTimeoutHour, 0L))
+                    .tags(projectTag).tags(MetricsTag.STATE.getVal(), 
MetricsTag.RUNNING.getVal(),
+                            MetricsTag.TIMEOUT.getVal(), runningTimeoutHour + 
"h")
+                    .register(meterRegistry);
+        }
+
+        for (int waitTimeoutMin : PENDING_JOB_TIMEOUT_MINUTE) {
+            Gauge.builder(PrometheusMetrics.JOB_LONG_RUNNING.getValue(),
+                    () -> 
MetricsRegistry.projectPendingJobMap.getOrDefault(project, Maps.newHashMap())
+                            .getOrDefault(waitTimeoutMin, 0L))
+                    .tags(projectTag).tags(MetricsTag.STATE.getVal(), 
MetricsTag.WAITING.getVal(),
+                            MetricsTag.TIMEOUT.getVal(), waitTimeoutMin + "m")
+                    .register(meterRegistry);
+        }
     }
 
     public static void registerHostMetrics(String host) {
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java
index d414abf2e5..a3e02ca687 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java
@@ -22,23 +22,15 @@ import static 
org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.datasou
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.commons.dbcp2.BasicDataSourceFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.execution.BaseTestExecutable;
-import org.apache.kylin.job.execution.DefaultOutput;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.JobTypeEnum;
-import org.apache.kylin.job.execution.NExecutableManager;
-import org.apache.kylin.job.execution.SucceedTestExecutable;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.rest.util.SpringContext;
 import org.apache.kylin.common.metrics.MetricsController;
 import org.apache.kylin.common.metrics.MetricsGroup;
 import org.apache.kylin.common.metrics.MetricsName;
@@ -46,9 +38,19 @@ import org.apache.kylin.common.metrics.MetricsTag;
 import org.apache.kylin.common.metrics.prometheus.PrometheusMetrics;
 import org.apache.kylin.common.persistence.metadata.JdbcDataSource;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.job.dao.ExecutableOutputPO;
+import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultOutput;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
 import org.apache.kylin.query.util.LoadCounter;
 import org.apache.kylin.rest.response.StorageVolumeInfoResponse;
 import org.apache.kylin.rest.service.ProjectService;
+import org.apache.kylin.rest.util.SpringContext;
 import org.apache.spark.sql.SparderEnv;
 import org.junit.Assert;
 import org.junit.Before;
@@ -77,7 +79,7 @@ import lombok.var;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ SpringContext.class, MetricsGroup.class, 
UserGroupInformation.class, JdbcDataSource.class,
-        SparderEnv.class, NDefaultScheduler.class, LoadCounter.class})
+        SparderEnv.class, NDefaultScheduler.class, NExecutableManager.class, 
LoadCounter.class })
 public class MetricsRegistryTest extends NLocalFileMetadataTestCase {
 
     private MeterRegistry meterRegistry;
@@ -107,6 +109,7 @@ public class MetricsRegistryTest extends 
NLocalFileMetadataTestCase {
         PowerMockito.mockStatic(SpringContext.class);
         PowerMockito.mockStatic(SparderEnv.class);
         PowerMockito.mockStatic(NDefaultScheduler.class);
+        PowerMockito.mockStatic(NExecutableManager.class);
         PowerMockito.mockStatic(LoadCounter.class);
     }
 
@@ -209,6 +212,33 @@ public class MetricsRegistryTest extends 
NLocalFileMetadataTestCase {
         MetricsRegistry.registerProjectPrometheusMetrics(kylinConfig, project);
         Collection<Gauge> gauges6 = 
meterRegistry.find(PrometheusMetrics.JOB_COUNTS.getValue()).gauges();
         gauges6.forEach(e -> Assert.assertEquals(1, e.value(), 0));
+        Collection<Meter> meters4 = 
meterRegistry.find(PrometheusMetrics.JOB_LONG_RUNNING.getValue()).meters();
+        meters4.forEach(meter -> meterRegistry.remove(meter));
+        MetricsRegistry.registerProjectPrometheusMetrics(kylinConfig, project);
+        Collection<Gauge> gauges7 = 
meterRegistry.find(PrometheusMetrics.JOB_LONG_RUNNING.getValue()).gauges();
+        Assert.assertEquals(0, gauges7.stream().filter(e -> e.value() == 
1).count());
+        NExecutableManager executableManager = 
PowerMockito.mock(NExecutableManager.class);
+        PowerMockito.when(NExecutableManager.getInstance(kylinConfig, 
"default")).thenReturn(executableManager);
+        ExecutablePO mockExecutablePO = Mockito.mock(ExecutablePO.class);
+        ExecutablePO mockExecutablePO1 = Mockito.mock(ExecutablePO.class);
+        AbstractExecutable mockAbstractExecutable = 
Mockito.mock(AbstractExecutable.class);
+        AbstractExecutable mockAbstractExecutable1 = 
Mockito.mock(AbstractExecutable.class);
+        ExecutableOutputPO mockExecutableOutputPO = 
Mockito.mock(ExecutableOutputPO.class);
+        ExecutableOutputPO mockExecutableOutputPO1 = 
Mockito.mock(ExecutableOutputPO.class);
+        
Mockito.when(mockExecutablePO.getOutput()).thenReturn(mockExecutableOutputPO);
+        
Mockito.when(mockExecutablePO1.getOutput()).thenReturn(mockExecutableOutputPO1);
+        
Mockito.when(mockExecutableOutputPO.getStatus()).thenReturn(ExecutableState.READY.name());
+        
Mockito.when(mockExecutableOutputPO1.getStatus()).thenReturn(ExecutableState.RUNNING.name());
+        Mockito.when(mockAbstractExecutable.getWaitTime()).thenReturn(8 * 60 * 
1000L);
+        Mockito.when(mockAbstractExecutable1.getDuration()).thenReturn(3 * 60 
* 60 * 1000L);
+        
Mockito.when(executableManager.fromPO(mockExecutablePO)).thenReturn(mockAbstractExecutable);
+        
Mockito.when(executableManager.fromPO(mockExecutablePO1)).thenReturn(mockAbstractExecutable1);
+        Mockito.when(executableManager.getAllJobs())
+                .thenReturn(Lists.newArrayList(mockExecutablePO, 
mockExecutablePO1));
+        Set<String> projectSet = new HashSet<>();
+        projectSet.add(project);
+        MetricsRegistry.refreshProjectLongRunningJobs(kylinConfig, projectSet);
+        Assert.assertEquals(5, gauges7.stream().filter(e -> e.value() == 
1).count());
     }
 
 
@@ -220,17 +250,18 @@ public class MetricsRegistryTest extends 
NLocalFileMetadataTestCase {
         
Mockito.when(projectService.getStorageVolumeInfoResponse(project)).thenReturn(response);
         
PowerMockito.when(SpringContext.getBean(ProjectService.class)).thenReturn(projectService);
 
+        val manager = PowerMockito.mock(NExecutableManager.class);
+        PowerMockito.when(NExecutableManager.getInstance(getTestConfig(), 
project)).thenReturn(manager);
         MetricsRegistry.registerProjectMetrics(getTestConfig(), project, 
"localhost");
         MetricsRegistry.registerHostMetrics("localhost");
         List<Meter> meters = meterRegistry.getMeters();
         Assert.assertEquals(0, meters.size());
 
-        val manager = 
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        BaseTestExecutable executable = new SucceedTestExecutable();
-        executable.setParam("test1", "test1");
-        executable.setProject(project);
-        executable.setJobType(JobTypeEnum.INDEX_BUILD);
-        manager.addJob(executable);
+        ExecutablePO mockExecutablePO = Mockito.mock(ExecutablePO.class);
+        ExecutableOutputPO mockExecutableOutputPO = 
Mockito.mock(ExecutableOutputPO.class);
+        
Mockito.when(mockExecutablePO.getOutput()).thenReturn(mockExecutableOutputPO);
+        
Mockito.when(mockExecutableOutputPO.getStatus()).thenReturn(ExecutableState.READY.name());
+        
Mockito.when(manager.getAllJobs()).thenReturn(Lists.newArrayList(mockExecutablePO));
 
         var result = MetricsController.getDefaultMetricRegistry()
                 
.getGauges(MetricFilter.contains(MetricsName.JOB_RUNNING_GAUGE.getVal()));
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/metrics/MetricsTag.java b/src/core-common/src/main/java/org/apache/kylin/common/metrics/MetricsTag.java
index de7f235048..926070d66a 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/metrics/MetricsTag.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/metrics/MetricsTag.java
@@ -36,6 +36,8 @@ public enum MetricsTag {
     RUNNING("running"), //
     JOB_CATEGORY("category"), //
     HOST("host"), //
+    TIMEOUT("timeout"), //
+    WAITING("waiting"), //
     HIT_SECOND_STORAGE("hit_second_storage");
 
     private final String value;
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/metrics/prometheus/PrometheusMetrics.java b/src/core-common/src/main/java/org/apache/kylin/common/metrics/prometheus/PrometheusMetrics.java
index 550edbe5ed..52c1e3d892 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/metrics/prometheus/PrometheusMetrics.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/metrics/prometheus/PrometheusMetrics.java
@@ -42,6 +42,7 @@ public enum PrometheusMetrics {
 
     JOB_COUNTS("ke_job_counts", Type.PROJECT_METRIC), //
     JOB_MINUTES("ke_job_minutes", Type.PROJECT_METRIC), //
+    JOB_LONG_RUNNING("ke_long_running_jobs", Type.PROJECT_METRIC), //
 
     MODEL_BUILD_DURATION("ke_model_build_minutes", Type.PROJECT_METRIC | 
Type.MODEL_METRIC);
 

Reply via email to