This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git

The following commit(s) were added to refs/heads/master by this push:
     new f001e58  [GOBBLIN-1018] Report GC counts and durations from Gobblin containers …
f001e58 is described below

commit f001e58fdaa6b9d2ca28317f6f16c11a847c348d
Author: sv2000 <sudarsh...@gmail.com>
AuthorDate: Tue Jan 14 14:37:00 2020 -0800

    [GOBBLIN-1018] Report GC counts and durations from Gobblin containers …

    Closes #2864 from sv2000/gcStats
---
 .../gobblin/cluster/ContainerHealthMetrics.java    |   7 +-
 .../cluster/ContainerHealthMetricsService.java     | 136 ++++++++++++++++-----
 .../cluster/ContainerHealthMetricsServiceTest.java |  16 ++-
 3 files changed, 124 insertions(+), 35 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
index 2bf187c..137e1ae 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
@@ -30,5 +30,10 @@ public class ContainerHealthMetrics {
   public static final String NUM_AVAILABLE_PROCESSORS = CONTAINER_METRICS_PREFIX + "numAvailableProcessors";
   public static final String TOTAL_PHYSICAL_MEM_SIZE = CONTAINER_METRICS_PREFIX + "totalPhysicalMemSize";
   public static final String FREE_PHYSICAL_MEM_SIZE = CONTAINER_METRICS_PREFIX + "freePhysicalMemSize";
-
+  public static final String MINOR_GC_COUNT = CONTAINER_METRICS_PREFIX + "minorGcCount";
+  public static final String MINOR_GC_DURATION = CONTAINER_METRICS_PREFIX + "minorGcDuration";
+  public static final String MAJOR_GC_COUNT = CONTAINER_METRICS_PREFIX + "majorGcCount";
+  public static final String MAJOR_GC_DURATION = CONTAINER_METRICS_PREFIX + "majorGcDuration";
+  public static final String UNKNOWN_GC_COUNT = CONTAINER_METRICS_PREFIX + "unknownGcCount";
+  public static final String UNKNOWN_GC_DURATION = CONTAINER_METRICS_PREFIX + "unknownGcDuration";
 }
diff
--git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java index b043857..d603bae 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java @@ -17,18 +17,22 @@ package org.apache.gobblin.cluster; +import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; import java.lang.management.MemoryMXBean; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import com.google.common.util.concurrent.AbstractScheduledService; import com.google.common.util.concurrent.AtomicDouble; import com.sun.management.OperatingSystemMXBean; import com.typesafe.config.Config; +import lombok.Data; + import org.apache.gobblin.metrics.ContextAwareGauge; import org.apache.gobblin.metrics.RootMetricContext; import org.apache.gobblin.util.ConfigUtils; @@ -43,39 +47,73 @@ import org.apache.gobblin.util.ConfigUtils; * {@link com.google.common.util.concurrent.ServiceManager} that manages the lifecycle of * a {@link ContainerHealthMetricsService}. * </p> - * TODO: Add Garbage Collection metrics. 
*/ public class ContainerHealthMetricsService extends AbstractScheduledService { //Container metrics service configurations private static final String CONTAINER_METRICS_SERVICE_REPORTING_INTERVAL_SECONDS = "container.health.metrics.service.reportingIntervalSeconds"; private static final Long DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL = 30L; + private static final Set<String> YOUNG_GC_TYPES = new HashSet<>(3); + private static final Set<String> OLD_GC_TYPES = new HashSet<String>(3); + + static { + // young generation GC names + YOUNG_GC_TYPES.add("PS Scavenge"); + YOUNG_GC_TYPES.add("ParNew"); + YOUNG_GC_TYPES.add("G1 Young Generation"); + + // old generation GC names + OLD_GC_TYPES.add("PS MarkSweep"); + OLD_GC_TYPES.add("ConcurrentMarkSweep"); + OLD_GC_TYPES.add("G1 Old Generation"); + } private final long metricReportingInterval; private final OperatingSystemMXBean operatingSystemMXBean; private final MemoryMXBean memoryMXBean; + private final List<GarbageCollectorMXBean> garbageCollectorMXBeans; + //Heap stats AtomicDouble processCpuLoad = new AtomicDouble(0); AtomicDouble systemCpuLoad = new AtomicDouble(0); AtomicDouble systemLoadAvg = new AtomicDouble(0); - AtomicLong committedVmemSize = new AtomicLong(0); - AtomicLong processCpuTime = new AtomicLong(0); - AtomicLong freeSwapSpaceSize = new AtomicLong(0); - AtomicLong numAvailableProcessors = new AtomicLong(0); - AtomicLong totalPhysicalMemSize = new AtomicLong(0); - AtomicLong totalSwapSpaceSize = new AtomicLong(0); - AtomicLong freePhysicalMemSize = new AtomicLong(0); - AtomicLong processHeapUsedSize = new AtomicLong(0); + AtomicDouble committedVmemSize = new AtomicDouble(0); + AtomicDouble processCpuTime = new AtomicDouble(0); + AtomicDouble freeSwapSpaceSize = new AtomicDouble(0); + AtomicDouble numAvailableProcessors = new AtomicDouble(0); + AtomicDouble totalPhysicalMemSize = new AtomicDouble(0); + AtomicDouble totalSwapSpaceSize = new AtomicDouble(0); + AtomicDouble freePhysicalMemSize = new 
AtomicDouble(0); + AtomicDouble processHeapUsedSize = new AtomicDouble(0); + + //GC stats and counters + AtomicDouble minorGcCount = new AtomicDouble(0); + AtomicDouble majorGcCount = new AtomicDouble(0); + AtomicDouble unknownGcCount = new AtomicDouble(0); + AtomicDouble minorGcDuration = new AtomicDouble(0); + AtomicDouble majorGcDuration = new AtomicDouble(0); + AtomicDouble unknownGcDuration = new AtomicDouble(0); public ContainerHealthMetricsService(Config config) { this.metricReportingInterval = ConfigUtils.getLong(config, CONTAINER_METRICS_SERVICE_REPORTING_INTERVAL_SECONDS, DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL); this.operatingSystemMXBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class); this.memoryMXBean = ManagementFactory.getMemoryMXBean(); + this.garbageCollectorMXBeans = ManagementFactory.getGarbageCollectorMXBeans(); //Build all the gauges and register them with the metrics registry. List<ContextAwareGauge<Double>> systemMetrics = buildGaugeList(); systemMetrics.forEach(metric -> RootMetricContext.get().register(metric)); } + @Data + public static class GcStats { + long minorCount; + double minorDuration; + long majorCount; + double majorDuration; + long unknownCount; + double unknownDuration; + } + /** * Run one iteration of the scheduled task. 
If any invocation of this method throws an exception, * the service will transition to the {@link com.google.common.util.concurrent.Service.State#FAILED} state and this method will no @@ -94,35 +132,69 @@ public class ContainerHealthMetricsService extends AbstractScheduledService { this.totalSwapSpaceSize.set(this.operatingSystemMXBean.getTotalSwapSpaceSize()); this.freePhysicalMemSize.set(this.operatingSystemMXBean.getFreePhysicalMemorySize()); this.processHeapUsedSize.set(this.memoryMXBean.getHeapMemoryUsage().getUsed()); + + GcStats gcStats = collectGcStats(); + //Since GC Beans report accumulated counts/durations, we need to subtract the previous values to obtain the counts/durations + // since the last measurement time. + this.minorGcCount.set(gcStats.getMinorCount() - this.minorGcCount.get()); + this.minorGcDuration.set(gcStats.getMinorDuration() - this.minorGcDuration.get()); + this.majorGcCount.set(gcStats.getMajorCount() - this.majorGcCount.get()); + this.majorGcDuration.set(gcStats.getMajorDuration() - this.majorGcDuration.get()); + this.unknownGcCount.set(gcStats.getUnknownCount() - this.unknownGcCount.get()); + this.unknownGcDuration.set(gcStats.getUnknownDuration() - this.unknownGcDuration.get()); } protected List<ContextAwareGauge<Double>> buildGaugeList() { List<ContextAwareGauge<Double>> gaugeList = new ArrayList<>(); - gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.PROCESS_CPU_LOAD, - () -> this.processCpuLoad.get())); - gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.SYSTEM_CPU_LOAD, - () -> this.systemCpuLoad.get())); - gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.SYSTEM_LOAD_AVG, - () -> this.systemLoadAvg.get())); - gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.COMMITTED_VMEM_SIZE, - () -> Long.valueOf(this.committedVmemSize.get()).doubleValue())); - 
gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.PROCESS_CPU_TIME, - () -> Long.valueOf(this.processCpuTime.get()).doubleValue())); - gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.FREE_SWAP_SPACE_SIZE, - () -> Long.valueOf(this.freeSwapSpaceSize.get()).doubleValue())); - gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.NUM_AVAILABLE_PROCESSORS, - () -> Long.valueOf(this.numAvailableProcessors.get()).doubleValue())); - gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.TOTAL_PHYSICAL_MEM_SIZE, - () -> Long.valueOf(this.totalPhysicalMemSize.get()).doubleValue())); - gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.TOTAL_SWAP_SPACE_SIZE, - () -> Long.valueOf(this.totalSwapSpaceSize.get()).doubleValue())); - gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.FREE_PHYSICAL_MEM_SIZE, - () -> Long.valueOf(this.freePhysicalMemSize.get()).doubleValue())); - gaugeList.add(RootMetricContext.get().newContextAwareGauge(ContainerHealthMetrics.PROCESS_HEAP_USED_SIZE, - () -> Long.valueOf(this.processHeapUsedSize.get()).doubleValue())); + + gaugeList.add(createGauge(ContainerHealthMetrics.PROCESS_CPU_LOAD, this.processCpuLoad)); + gaugeList.add(createGauge(ContainerHealthMetrics.SYSTEM_CPU_LOAD, this.systemCpuLoad)); + gaugeList.add(createGauge(ContainerHealthMetrics.SYSTEM_LOAD_AVG, this.systemLoadAvg)); + gaugeList.add(createGauge(ContainerHealthMetrics.COMMITTED_VMEM_SIZE, this.committedVmemSize)); + gaugeList.add(createGauge(ContainerHealthMetrics.PROCESS_CPU_TIME, this.processCpuTime)); + gaugeList.add(createGauge(ContainerHealthMetrics.FREE_SWAP_SPACE_SIZE, this.freeSwapSpaceSize)); + gaugeList.add(createGauge(ContainerHealthMetrics.NUM_AVAILABLE_PROCESSORS, this.numAvailableProcessors)); + gaugeList.add(createGauge(ContainerHealthMetrics.TOTAL_PHYSICAL_MEM_SIZE, 
this.totalPhysicalMemSize)); + gaugeList.add(createGauge(ContainerHealthMetrics.TOTAL_SWAP_SPACE_SIZE, this.totalSwapSpaceSize)); + gaugeList.add(createGauge(ContainerHealthMetrics.FREE_PHYSICAL_MEM_SIZE, this.freePhysicalMemSize)); + gaugeList.add(createGauge(ContainerHealthMetrics.PROCESS_HEAP_USED_SIZE, this.processHeapUsedSize)); + gaugeList.add(createGauge(ContainerHealthMetrics.MINOR_GC_COUNT, this.minorGcCount)); + gaugeList.add(createGauge(ContainerHealthMetrics.MINOR_GC_DURATION, this.minorGcDuration)); + gaugeList.add(createGauge(ContainerHealthMetrics.MAJOR_GC_COUNT, this.majorGcCount)); + gaugeList.add(createGauge(ContainerHealthMetrics.MAJOR_GC_DURATION, this.majorGcDuration)); + gaugeList.add(createGauge(ContainerHealthMetrics.UNKNOWN_GC_COUNT, this.unknownGcCount)); + gaugeList.add(createGauge(ContainerHealthMetrics.UNKNOWN_GC_DURATION, this.unknownGcDuration)); return gaugeList; } + private ContextAwareGauge<Double> createGauge(String name, AtomicDouble metric) { + return RootMetricContext.get().newContextAwareGauge(name, () -> metric.get()); + } + + private GcStats collectGcStats() { + //Collect GC stats by iterating over all GC beans. 
+ GcStats gcStats = new GcStats(); + + for (GarbageCollectorMXBean garbageCollectorMXBean: this.garbageCollectorMXBeans) { + long count = garbageCollectorMXBean.getCollectionCount(); + double duration = (double) garbageCollectorMXBean.getCollectionTime(); + if (count >= 0) { + if (YOUNG_GC_TYPES.contains(garbageCollectorMXBean.getName())) { + gcStats.setMinorCount(gcStats.getMinorCount() + count); + gcStats.setMinorDuration(gcStats.getMinorDuration() + duration); + } + else if (OLD_GC_TYPES.contains(garbageCollectorMXBean.getName())) { + gcStats.setMajorCount(gcStats.getMajorCount() + count); + gcStats.setMajorDuration(gcStats.getMajorDuration() + duration); + } else { + gcStats.setUnknownCount(gcStats.getUnknownCount() + count); + gcStats.setUnknownDuration(gcStats.getUnknownDuration() + duration); + } + } + } + return gcStats; + } + /** * Returns the {@link Scheduler} object used to configure this service. This method will only be * called once. diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java index ac67c01..66efe74 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java @@ -30,10 +30,22 @@ public class ContainerHealthMetricsServiceTest { Config config = ConfigFactory.empty(); ContainerHealthMetricsService service = new ContainerHealthMetricsService(config); service.runOneIteration(); - long processCpuTime1 = service.processCpuTime.get(); + Assert.assertTrue( service.minorGcCount.get() >= 0); + Assert.assertTrue( service.minorGcDuration.get() >= 0); + Assert.assertTrue( service.majorGcCount.get() >= 0); + Assert.assertTrue( service.minorGcDuration.get() >= 0); + Assert.assertTrue( service.unknownGcCount.get() >= 0); + Assert.assertTrue( 
service.unknownGcDuration.get() >= 0); + double processCpuTime1 = service.processCpuTime.get(); Thread.sleep(10); service.runOneIteration(); - long processCpuTime2 = service.processCpuTime.get(); + double processCpuTime2 = service.processCpuTime.get(); Assert.assertTrue( processCpuTime1 <= processCpuTime2); + Assert.assertTrue( service.minorGcCount.get() >= 0); + Assert.assertTrue( service.minorGcDuration.get() >= 0); + Assert.assertTrue( service.majorGcCount.get() >= 0); + Assert.assertTrue( service.minorGcDuration.get() >= 0); + Assert.assertTrue( service.unknownGcCount.get() >= 0); + Assert.assertTrue( service.unknownGcDuration.get() >= 0); } }