This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit bda0206639c03fb124959e48e346894adb4881f2 Author: Liang.Hua <36814772+jacob...@users.noreply.github.com> AuthorDate: Fri Nov 4 11:47:07 2022 +0800 KYLIN-5346 add monitor metrics for long running jobs --- .../apache/kylin/rest/config/MetricsConfig.java | 1 + .../rest/config/initialize/MetricsRegistry.java | 63 ++++++++++++++++++++- .../config/initialize/MetricsRegistryTest.java | 65 ++++++++++++++++------ .../apache/kylin/common/metrics/MetricsTag.java | 2 + .../metrics/prometheus/PrometheusMetrics.java | 1 + 5 files changed, 113 insertions(+), 19 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/MetricsConfig.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/MetricsConfig.java index f5920789d0..a0126eee3e 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/config/MetricsConfig.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/MetricsConfig.java @@ -79,6 +79,7 @@ public class MetricsConfig { Set<String> allProjects = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).listAllProjects() .stream().map(ProjectInstance::getName).collect(Collectors.toSet()); + MetricsRegistry.refreshProjectLongRunningJobs(KylinConfig.getInstanceFromEnv(), allProjects); Sets.SetView<String> newProjects = Sets.difference(allProjects, allControlledProjects); for (String newProject : newProjects) { log.info("Register project metrics for {}", newProject); diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/MetricsRegistry.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/MetricsRegistry.java index f8d653dcba..987df10e52 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/MetricsRegistry.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/MetricsRegistry.java @@ -43,6 +43,7 @@ import org.apache.kylin.common.metrics.prometheus.PrometheusMetrics; import org.apache.kylin.common.persistence.metadata.JdbcDataSource; import org.apache.kylin.common.scheduler.EventBusFactory; import org.apache.kylin.job.dao.ExecutablePO; +import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.JobTypeEnum; import org.apache.kylin.job.execution.NExecutableManager; @@ -57,8 +58,6 @@ import org.apache.kylin.metadata.model.NTableMetadataManager; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.metadata.user.ManagedUser; -import org.apache.kylin.metadata.user.NKylinUserManager; import org.apache.kylin.query.util.LoadCounter; import org.apache.kylin.query.util.LoadDesc; import org.apache.kylin.rest.service.ProjectService; @@ -72,6 +71,8 @@ import com.codahale.metrics.jvm.GarbageCollectorMetricSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.kylin.metadata.user.ManagedUser; +import org.apache.kylin.metadata.user.NKylinUserManager; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.MeterRegistry; @@ -88,6 +89,10 @@ public class MetricsRegistry { private static final String GLOBAL = "global"; private static final Map<String, Long> totalStorageSizeMap = Maps.newHashMap(); + private static volatile Map<String, Map<Integer, Long>> projectPendingJobMap = Maps.newHashMap(); + private static volatile Map<String, Map<Double, Long>> projectRunningJobMap = Maps.newHashMap(); + private static final int[] PENDING_JOB_TIMEOUT_MINUTE = new int[] { 5, 10, 15, 30 }; + private static final double[] RUNNING_JOB_TIMEOUT_HOUR = new double[] { 0.5d, 1d, 1.5d, 2d, 3d }; private static final Logger logger = LoggerFactory.getLogger(MetricsRegistry.class); @@ -99,6 +104,42 @@ public class MetricsRegistry { }); } + public static void refreshProjectLongRunningJobs(KylinConfig kylinConfig, Set<String> projects) { + Map<String, Map<Integer, Long>> tempProjectPendingJobMap = Maps.newHashMap(); + Map<String, Map<Double, Long>> tempProjectRunningJobMap = Maps.newHashMap(); + for (String project : projects) { + final NExecutableManager executableManager = NExecutableManager.getInstance(kylinConfig, project); + tempProjectPendingJobMap.put(project, collectTimeoutToPendingJobsMap(executableManager)); + tempProjectRunningJobMap.put(project, collectTimeoutToRunningJobsMap(executableManager)); + } + projectPendingJobMap = tempProjectPendingJobMap; + projectRunningJobMap = tempProjectRunningJobMap; + } + + private static Map<Integer, Long> collectTimeoutToPendingJobsMap(NExecutableManager executableManager) { + Map<Integer, Long> timeoutToPendingJobsMap = Maps.newHashMap(); + List<AbstractExecutable> pendingJobs = executableManager.getAllJobs().stream() + .filter(e -> ExecutableState.READY.name().equals(e.getOutput().getStatus())) + .map(executableManager::fromPO).collect(Collectors.toList()); + for (int pendingJobMin : PENDING_JOB_TIMEOUT_MINUTE) { + timeoutToPendingJobsMap.put(pendingJobMin, + pendingJobs.stream().filter(e -> e.getWaitTime() / 1000.0 > pendingJobMin * 60).count()); + } + return timeoutToPendingJobsMap; + } + + private static Map<Double, Long> collectTimeoutToRunningJobsMap(NExecutableManager executableManager) { + Map<Double, Long> timeoutToRunningJobsMap = Maps.newHashMap(); + List<AbstractExecutable> runningJobs = executableManager.getAllJobs().stream() + .filter(e -> ExecutableState.RUNNING.name().equals(e.getOutput().getStatus())) + .map(executableManager::fromPO).collect(Collectors.toList()); + for (double runningJobHour : RUNNING_JOB_TIMEOUT_HOUR) { + timeoutToRunningJobsMap.put(runningJobHour, + runningJobs.stream().filter(e -> e.getDuration() / 1000.0 > runningJobHour * 3600).count()); + } + return timeoutToRunningJobsMap; + } + public static void removeProjectFromStorageSizeMap(String project) { totalStorageSizeMap.remove(project); } @@ -189,6 +230,24 @@ public class MetricsRegistry { : scheduler.getContext().getRunningJobs().values().stream() .filter(job -> ExecutableState.RUNNING.equals(job.getOutput().getState())).count()) .tags(projectTag).tags(MetricsTag.STATE.getVal(), MetricsTag.RUNNING.getVal()).register(meterRegistry); + + for (double runningTimeoutHour : RUNNING_JOB_TIMEOUT_HOUR) { + Gauge.builder(PrometheusMetrics.JOB_LONG_RUNNING.getValue(), + () -> MetricsRegistry.projectRunningJobMap.getOrDefault(project, Maps.newHashMap()) + .getOrDefault(runningTimeoutHour, 0L)) + .tags(projectTag).tags(MetricsTag.STATE.getVal(), MetricsTag.RUNNING.getVal(), + MetricsTag.TIMEOUT.getVal(), runningTimeoutHour + "h") + .register(meterRegistry); + } + + for (int waitTimeoutMin : PENDING_JOB_TIMEOUT_MINUTE) { + Gauge.builder(PrometheusMetrics.JOB_LONG_RUNNING.getValue(), + () -> MetricsRegistry.projectPendingJobMap.getOrDefault(project, Maps.newHashMap()) + .getOrDefault(waitTimeoutMin, 0L)) + .tags(projectTag).tags(MetricsTag.STATE.getVal(), MetricsTag.WAITING.getVal(), + MetricsTag.TIMEOUT.getVal(), waitTimeoutMin + "m") + .register(meterRegistry); + } } public static void registerHostMetrics(String host) { diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java index d414abf2e5..a3e02ca687 100644 --- a/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java +++ b/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java @@ -22,23 +22,15 @@ import static org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.datasou import java.io.IOException; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.dbcp2.BasicDataSourceFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.job.execution.BaseTestExecutable; -import org.apache.kylin.job.execution.DefaultOutput; -import org.apache.kylin.job.execution.Executable; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.execution.JobTypeEnum; -import org.apache.kylin.job.execution.NExecutableManager; -import org.apache.kylin.job.execution.SucceedTestExecutable; -import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; -import org.apache.kylin.rest.util.SpringContext; import org.apache.kylin.common.metrics.MetricsController; import org.apache.kylin.common.metrics.MetricsGroup; import org.apache.kylin.common.metrics.MetricsName; @@ -46,9 +38,19 @@ import org.apache.kylin.common.metrics.MetricsTag; import org.apache.kylin.common.metrics.prometheus.PrometheusMetrics; import org.apache.kylin.common.persistence.metadata.JdbcDataSource; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.job.dao.ExecutableOutputPO; +import org.apache.kylin.job.dao.ExecutablePO; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultOutput; +import org.apache.kylin.job.execution.Executable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.NExecutableManager; +import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; import org.apache.kylin.query.util.LoadCounter; import org.apache.kylin.rest.response.StorageVolumeInfoResponse; import org.apache.kylin.rest.service.ProjectService; +import org.apache.kylin.rest.util.SpringContext; import org.apache.spark.sql.SparderEnv; import org.junit.Assert; import org.junit.Before; @@ -77,7 +79,7 @@ import lombok.var; @RunWith(PowerMockRunner.class) @PrepareForTest({ SpringContext.class, MetricsGroup.class, UserGroupInformation.class, JdbcDataSource.class, - SparderEnv.class, NDefaultScheduler.class, LoadCounter.class}) + SparderEnv.class, NDefaultScheduler.class, NExecutableManager.class, LoadCounter.class }) public class MetricsRegistryTest extends NLocalFileMetadataTestCase { private MeterRegistry meterRegistry; @@ -107,6 +109,7 @@ public class MetricsRegistryTest extends NLocalFileMetadataTestCase { PowerMockito.mockStatic(SpringContext.class); PowerMockito.mockStatic(SparderEnv.class); PowerMockito.mockStatic(NDefaultScheduler.class); + PowerMockito.mockStatic(NExecutableManager.class); PowerMockito.mockStatic(LoadCounter.class); } @@ -209,6 +212,33 @@ public class MetricsRegistryTest extends NLocalFileMetadataTestCase { MetricsRegistry.registerProjectPrometheusMetrics(kylinConfig, project); Collection<Gauge> gauges6 = meterRegistry.find(PrometheusMetrics.JOB_COUNTS.getValue()).gauges(); gauges6.forEach(e -> Assert.assertEquals(1, e.value(), 0)); + Collection<Meter> meters4 = meterRegistry.find(PrometheusMetrics.JOB_LONG_RUNNING.getValue()).meters(); + meters4.forEach(meter -> meterRegistry.remove(meter)); + MetricsRegistry.registerProjectPrometheusMetrics(kylinConfig, project); + Collection<Gauge> gauges7 = meterRegistry.find(PrometheusMetrics.JOB_LONG_RUNNING.getValue()).gauges(); + Assert.assertEquals(0, gauges7.stream().filter(e -> e.value() == 1).count()); + NExecutableManager executableManager = PowerMockito.mock(NExecutableManager.class); + PowerMockito.when(NExecutableManager.getInstance(kylinConfig, "default")).thenReturn(executableManager); + ExecutablePO mockExecutablePO = Mockito.mock(ExecutablePO.class); + ExecutablePO mockExecutablePO1 = Mockito.mock(ExecutablePO.class); + AbstractExecutable mockAbstractExecutable = Mockito.mock(AbstractExecutable.class); + AbstractExecutable mockAbstractExecutable1 = Mockito.mock(AbstractExecutable.class); + ExecutableOutputPO mockExecutableOutputPO = Mockito.mock(ExecutableOutputPO.class); + ExecutableOutputPO mockExecutableOutputPO1 = Mockito.mock(ExecutableOutputPO.class); + Mockito.when(mockExecutablePO.getOutput()).thenReturn(mockExecutableOutputPO); + Mockito.when(mockExecutablePO1.getOutput()).thenReturn(mockExecutableOutputPO1); + Mockito.when(mockExecutableOutputPO.getStatus()).thenReturn(ExecutableState.READY.name()); + Mockito.when(mockExecutableOutputPO1.getStatus()).thenReturn(ExecutableState.RUNNING.name()); + Mockito.when(mockAbstractExecutable.getWaitTime()).thenReturn(8 * 60 * 1000L); + Mockito.when(mockAbstractExecutable1.getDuration()).thenReturn(3 * 60 * 60 * 1000L); + Mockito.when(executableManager.fromPO(mockExecutablePO)).thenReturn(mockAbstractExecutable); + Mockito.when(executableManager.fromPO(mockExecutablePO1)).thenReturn(mockAbstractExecutable1); + Mockito.when(executableManager.getAllJobs()) + .thenReturn(Lists.newArrayList(mockExecutablePO, mockExecutablePO1)); + Set<String> projectSet = new HashSet<>(); + projectSet.add(project); + MetricsRegistry.refreshProjectLongRunningJobs(kylinConfig, projectSet); + Assert.assertEquals(5, gauges7.stream().filter(e -> e.value() == 1).count()); } @@ -220,17 +250,18 @@ public class MetricsRegistryTest extends NLocalFileMetadataTestCase { Mockito.when(projectService.getStorageVolumeInfoResponse(project)).thenReturn(response); PowerMockito.when(SpringContext.getBean(ProjectService.class)).thenReturn(projectService); + val manager = PowerMockito.mock(NExecutableManager.class); + PowerMockito.when(NExecutableManager.getInstance(getTestConfig(), project)).thenReturn(manager); MetricsRegistry.registerProjectMetrics(getTestConfig(), project, "localhost"); MetricsRegistry.registerHostMetrics("localhost"); List<Meter> meters = meterRegistry.getMeters(); Assert.assertEquals(0, meters.size()); - val manager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project); - BaseTestExecutable executable = new SucceedTestExecutable(); - executable.setParam("test1", "test1"); - executable.setProject(project); - executable.setJobType(JobTypeEnum.INDEX_BUILD); - manager.addJob(executable); + ExecutablePO mockExecutablePO = Mockito.mock(ExecutablePO.class); + ExecutableOutputPO mockExecutableOutputPO = Mockito.mock(ExecutableOutputPO.class); + Mockito.when(mockExecutablePO.getOutput()).thenReturn(mockExecutableOutputPO); + Mockito.when(mockExecutableOutputPO.getStatus()).thenReturn(ExecutableState.READY.name()); + Mockito.when(manager.getAllJobs()).thenReturn(Lists.newArrayList(mockExecutablePO)); var result = MetricsController.getDefaultMetricRegistry() .getGauges(MetricFilter.contains(MetricsName.JOB_RUNNING_GAUGE.getVal())); diff --git a/src/core-common/src/main/java/org/apache/kylin/common/metrics/MetricsTag.java b/src/core-common/src/main/java/org/apache/kylin/common/metrics/MetricsTag.java index de7f235048..926070d66a 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/metrics/MetricsTag.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/metrics/MetricsTag.java @@ -36,6 +36,8 @@ public enum MetricsTag { RUNNING("running"), // JOB_CATEGORY("category"), // HOST("host"), // + TIMEOUT("timeout"), // + WAITING("waiting"), // HIT_SECOND_STORAGE("hit_second_storage"); private final String value; diff --git a/src/core-common/src/main/java/org/apache/kylin/common/metrics/prometheus/PrometheusMetrics.java b/src/core-common/src/main/java/org/apache/kylin/common/metrics/prometheus/PrometheusMetrics.java index 550edbe5ed..52c1e3d892 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/metrics/prometheus/PrometheusMetrics.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/metrics/prometheus/PrometheusMetrics.java @@ -42,6 +42,7 @@ public enum PrometheusMetrics { JOB_COUNTS("ke_job_counts", Type.PROJECT_METRIC), // JOB_MINUTES("ke_job_minutes", Type.PROJECT_METRIC), // + JOB_LONG_RUNNING("ke_long_running_jobs", Type.PROJECT_METRIC), // MODEL_BUILD_DURATION("ke_model_build_minutes", Type.PROJECT_METRIC | Type.MODEL_METRIC);