This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new b6e7d03  STORM-3774 add V2 Cgroup metrics (#3398)
b6e7d03 is described below

commit b6e7d0355e0397b8acc566961ed31279338998e1
Author: agresch <[email protected]>
AuthorDate: Mon Jun 14 11:34:17 2021 -0500

    STORM-3774 add V2 Cgroup metrics (#3398)
---
 conf/defaults.yaml                                 | 11 +--
 docs/cgroups_in_storm.md                           | 43 +++++++++---
 .../storm/container/cgroup/core/CpuCore.java       | 18 ++---
 .../jvm/org/apache/storm/metric/SystemBolt.java    | 10 ++-
 .../org/apache/storm/metric/cgroup/CGroupCpu.java  |  1 +
 .../storm/metric/cgroup/CGroupCpuGuarantee.java    |  1 +
 .../cgroup/CGroupCpuGuaranteeByCfsQuota.java       |  1 +
 .../storm/metric/cgroup/CGroupMemoryLimit.java     |  1 +
 .../storm/metric/cgroup/CGroupMemoryUsage.java     |  1 +
 .../storm/metric/cgroup/CGroupMetricsBase.java     |  1 +
 .../WorkerMetricRegistrant.java}                   | 22 ++----
 .../apache/storm/metrics2/cgroup/CGroupCpu.java    | 80 +++++++++++++++++++++
 .../cgroup/CGroupCpuGuarantee.java                 | 42 ++++++-----
 .../cgroup/CGroupCpuGuaranteeByCfsQuota.java       | 62 ++++++++++++++++
 .../storm/metrics2/cgroup/CGroupCpuStat.java       | 82 ++++++++++++++++++++++
 .../cgroup/CGroupMemoryLimit.java                  | 29 ++++++--
 .../cgroup/CGroupMemoryUsage.java                  | 24 +++++--
 .../cgroup/CGroupMetricsBase.java                  | 35 ++-------
 18 files changed, 363 insertions(+), 101 deletions(-)

diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e6d0ada..3e2a8f8 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -407,11 +407,12 @@ storm.worker.min.cpu.pcore.percent: 0.0
 
 storm.topology.classpath.beginning.enabled: false
 worker.metrics:
-    "CGroupMemory": "org.apache.storm.metric.cgroup.CGroupMemoryUsage"
-    "CGroupMemoryLimit": "org.apache.storm.metric.cgroup.CGroupMemoryLimit"
-    "CGroupCpu": "org.apache.storm.metric.cgroup.CGroupCpu"
-    "CGroupCpuGuarantee": "org.apache.storm.metric.cgroup.CGroupCpuGuarantee"
-    "CGroupCpuGuaranteeByCfsQuota": 
"org.apache.storm.metric.cgroup.CGroupCpuGuaranteeByCfsQuota"
+    "CGroupMemory": "org.apache.storm.metrics2.cgroup.CGroupMemoryUsage"
+    "CGroupMemoryLimit": "org.apache.storm.metrics2.cgroup.CGroupMemoryLimit"
+    "CGroupCpu": "org.apache.storm.metrics2.cgroup.CGroupCpu"
+    "CGroupCpuGuarantee": "org.apache.storm.metrics2.cgroup.CGroupCpuGuarantee"
+    "CGroupCpuGuaranteeByCfsQuota": 
"org.apache.storm.metrics2.cgroup.CGroupCpuGuaranteeByCfsQuota"
+    "CGroupCpuStat": "org.apache.storm.metrics2.cgroup.CGroupCpuStat"
 
 # The number of buckets for running statistics
 num.stat.buckets: 20
diff --git a/docs/cgroups_in_storm.md b/docs/cgroups_in_storm.md
index 146ba6b..621c962 100644
--- a/docs/cgroups_in_storm.md
+++ b/docs/cgroups_in_storm.md
@@ -8,7 +8,7 @@ documentation: true
 
 CGroups are used by Storm to limit the resource usage of workers to guarantee 
fairness and QOS.  
 
-**Please note: CGroups is currently supported only on Linux platforms (kernel 
version 2.6.24 and above)** 
+**Please note: CGroups are currently supported only on Linux platforms (kernel 
version 2.6.24 and above)** 
 
 ## Setup
 
@@ -86,30 +86,53 @@ CGroups not only can limit the amount of resources a worker 
has access to, but i
 
 ## CGroupCPU
 
-org.apache.storm.metric.cgroup.CGroupCPU reports back metrics similar to 
org.apache.storm.metrics.sigar.CPUMetric, except for everything within the 
CGroup.  It reports both user and system CPU usage in ms as a map
+org.apache.storm.metrics2.cgroup.CGroupCPU reports metrics similar to 
org.apache.storm.metrics.sigar.CPUMetric, but for everything within the CGroup. 
 It reports both user and system CPU usage in ms. 
 
 ```
-{
-   "user-ms": number
-   "sys-ms": number
-}
+   "CGroupCPU.user-ms": number
+   "CGroupCPU.sys-ms": number
 ```
 
 CGroup reports these as CLK_TCK counts, and not milliseconds so the accuracy 
is determined by what CLK_TCK is set to.  On most systems it is 100 times a 
second so at most the accuracy is 10 ms.
 
-To make this metric work cpuacct must be mounted.
+To make these metrics work cpuacct must be mounted.
 
 ## CGroupCpuGuarantee
 
-org.apache.storm.metric.cgroup.CGroupCpuGuarantee reports back an approximate 
number of ms of CPU time that this worker is guaranteed to get.  This is 
calculated from the resources requested by the tasks in that given worker.
+org.apache.storm.metrics2.cgroup.CGroupCpuGuarantee reports back an 
approximate number of ms of CPU time that this worker is guaranteed to get.  
This is calculated from the resources requested by the tasks in that given 
worker.
+
+## CGroupCpuGuaranteeByCfsQuota
+
+org.apache.storm.metrics2.cgroup.CGroupCpuGuaranteeByCfsQuota reports the 
percentage of the cpu guaranteed for the worker from cpu.cfs_period_us and 
cpu.cfs_quota_us.
+
+## CGroupCpuStat
+
+org.apache.storm.metrics2.cgroup.CGroupCpuStat reports the bandwidth 
statistics of the CGroup. It includes
+```
+   "CGroupCpuStat.nr.period-count": number
+   "CGroupCpuStat.nr.throttled-count": number
+   "CGroupCpuStat.nr.throttled-percentage": number
+   "CGroupCpuStat.throttled.time-ms": number
+```
+
+It is based on the following `cpu.stat`:
+  - `nr_periods`: Number of enforcement intervals that have elapsed.
+  - `nr_throttled`: Number of times the group has been throttled/limited.
+  - `throttled_time`: The total time duration (in nanoseconds) for which 
entities of the group have been throttled.
+
+And the reported metrics are
+  - `nr.period-count`: the difference of `nr_periods` between two consecutive 
reporting cycles
+  - `nr.throttled-count`: the difference of `nr_throttled` between two 
consecutive reporting cycles
+  - `nr.throttled-percentage`: (`nr.throttled-count` / `nr.period-count`)
+  - `throttled.time-ms`: the difference of `throttled_time` in milliseconds 
between two consecutive reporting cycles
 
 ## CGroupMemory
 
-org.apache.storm.metric.cgroup.CGroupMemoryUsage reports the current memory 
usage of all processes in the cgroup in bytes
+org.apache.storm.metrics2.cgroup.CGroupMemoryUsage reports the current memory 
usage of all processes in the cgroup in bytes
 
 ## CGroupMemoryLimit
 
-org.apache.storm.metric.cgroup.CGroupMemoryLimit report the current limit in 
bytes for all of the processes in the cgroup.  If running with CGroups enabled 
in storm this is the on-heap request + the off-heap request for all tasks 
within the worker + any extra slop space given to workers.
+org.apache.storm.metrics2.cgroup.CGroupMemoryLimit report the current limit in 
bytes for all of the processes in the cgroup.  If running with CGroups enabled 
in storm this is the on-heap request + the off-heap request for all tasks 
within the worker + any extra slop space given to workers.
 
 ## Usage/Debugging CGroups in your topology
 
diff --git 
a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java 
b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
index 0cafeee..0367c9d 100755
--- a/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
+++ b/storm-client/src/jvm/org/apache/storm/container/cgroup/core/CpuCore.java
@@ -82,23 +82,23 @@ public class CpuCore implements CgroupCore {
     }
 
     public static class Stat {
-        public final int nrPeriods;
-        public final int nrThrottled;
-        public final int throttledTime;
+        public final long nrPeriods;
+        public final long nrThrottled;
+        public final long throttledTime;
 
         public Stat(List<String> statStr) {
-            this.nrPeriods = Integer.parseInt(statStr.get(0).split(" ")[1]);
-            this.nrThrottled = Integer.parseInt(statStr.get(1).split(" ")[1]);
-            this.throttledTime = Integer.parseInt(statStr.get(2).split(" 
")[1]);
+            this.nrPeriods = Long.parseLong(statStr.get(0).split(" ")[1]);
+            this.nrThrottled = Long.parseLong(statStr.get(1).split(" ")[1]);
+            this.throttledTime = Long.parseLong(statStr.get(2).split(" ")[1]);
         }
 
         @Override
         public int hashCode() {
             final int prime = 31;
             int result = 1;
-            result = prime * result + nrPeriods;
-            result = prime * result + nrThrottled;
-            result = prime * result + throttledTime;
+            result = prime * result + Long.hashCode(nrPeriods);
+            result = prime * result + Long.hashCode(nrThrottled);
+            result = prime * result + Long.hashCode(throttledTime);
             return result;
         }
 
diff --git a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java 
b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
index 70e4696..e9483e7 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/SystemBolt.java
@@ -21,6 +21,7 @@ import java.lang.management.RuntimeMXBean;
 import java.util.Map;
 import org.apache.storm.Config;
 import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metrics2.WorkerMetricRegistrant;
 import org.apache.storm.task.IBolt;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -95,7 +96,14 @@ public class SystemBolt implements IBolt {
         }
         for (Map.Entry<String, String> metric : metrics.entrySet()) {
             try {
-                context.registerMetric(metric.getKey(), (IMetric) 
ReflectionUtils.newInstance(metric.getValue(), conf), bucketSize);
+                Object workerMetric = 
ReflectionUtils.newInstance(metric.getValue(), conf);
+                if (workerMetric instanceof IMetric) {
+                    context.registerMetric(metric.getKey(), (IMetric) 
workerMetric, bucketSize);
+                } else if (workerMetric instanceof WorkerMetricRegistrant) {
+                    ((WorkerMetricRegistrant) 
workerMetric).registerMetrics(context);
+                } else {
+                    throw new RuntimeException("Invalid worker metric " + 
workerMetric);
+                }
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
diff --git a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java 
b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java
index eef6546..9789a6e 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpu.java
@@ -25,6 +25,7 @@ import 
org.apache.storm.container.cgroup.core.CpuacctCore.StatType;
 /**
  * Report CPU used in the cgroup.
  */
+@Deprecated
 public class CGroupCpu extends CGroupMetricsBase<Map<String, Long>> {
     long previousSystem = 0;
     long previousUser = 0;
diff --git 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java 
b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
index d83048b..5fa1e02 100644
--- 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
+++ 
b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
@@ -23,6 +23,7 @@ import org.apache.storm.container.cgroup.core.CpuCore;
  * It gets the result from cpu.shares.
  * Use this when org.apache.storm.container.cgroup.CgroupManager is used as 
the storm.resource.isolation.plugin.
  */
+@Deprecated
 public class CGroupCpuGuarantee extends CGroupMetricsBase<Long> {
     long previousTime = -1;
 
diff --git 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuaranteeByCfsQuota.java
 
b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuaranteeByCfsQuota.java
index 46f97b9..e986ce0 100644
--- 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuaranteeByCfsQuota.java
+++ 
b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuaranteeByCfsQuota.java
@@ -23,6 +23,7 @@ import org.apache.storm.container.cgroup.core.CpuCore;
  * It gets the result from cpu.cfs_period_us and cpu.cfs_quota_us.
  * Use this when org.apache.storm.container.docker.DockerManager is used as 
the storm.resource.isolation.plugin.
  */
+@Deprecated
 public class CGroupCpuGuaranteeByCfsQuota extends CGroupMetricsBase<Long> {
     long previousTime = 0;
 
diff --git 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java 
b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java
index d6d5750..e306a18 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java
@@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Reports the current memory limit of the cgroup for this worker.
  */
+@Deprecated
 public class CGroupMemoryLimit extends CGroupMetricsBase<Long> {
     private static final Logger LOG = 
LoggerFactory.getLogger(CGroupMemoryLimit.class);
     private static final long BYTES_PER_MB = 1024 * 1024;
diff --git 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java 
b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
index e30feae..99edccb 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
@@ -20,6 +20,7 @@ import org.apache.storm.container.cgroup.core.MemoryCore;
 /**
  * Reports the current memory usage of the cgroup for this worker.
  */
+@Deprecated
 public class CGroupMemoryUsage extends CGroupMetricsBase<Long> {
 
     public CGroupMemoryUsage(Map<String, Object> conf) {
diff --git 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java 
b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
index ad4cc16..97090b7 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Base class for checking if CGroups are enabled, etc.
  */
+@Deprecated
 public abstract class CGroupMetricsBase<T> implements IMetric {
     private static final Logger LOG = 
LoggerFactory.getLogger(CGroupMetricsBase.class);
     private boolean enabled;
diff --git 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java 
b/storm-client/src/jvm/org/apache/storm/metrics2/WorkerMetricRegistrant.java
similarity index 55%
copy from 
storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
copy to 
storm-client/src/jvm/org/apache/storm/metrics2/WorkerMetricRegistrant.java
index e30feae..ee0f3c5 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/WorkerMetricRegistrant.java
@@ -10,24 +10,10 @@
  * and limitations under the License.
  */
 
-package org.apache.storm.metric.cgroup;
+package org.apache.storm.metrics2;
 
-import java.util.Map;
-import org.apache.storm.container.cgroup.SubSystemType;
-import org.apache.storm.container.cgroup.core.CgroupCore;
-import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.apache.storm.task.TopologyContext;
 
-/**
- * Reports the current memory usage of the cgroup for this worker.
- */
-public class CGroupMemoryUsage extends CGroupMetricsBase<Long> {
-
-    public CGroupMemoryUsage(Map<String, Object> conf) {
-        super(conf, SubSystemType.memory);
-    }
-
-    @Override
-    public Long getDataFrom(CgroupCore core) throws Exception {
-        return ((MemoryCore) core).getPhysicalUsage();
-    }
+public interface WorkerMetricRegistrant {
+    void registerMetrics(TopologyContext topologyContext);
 }
diff --git 
a/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupCpu.java 
b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupCpu.java
new file mode 100644
index 0000000..f4c6958
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupCpu.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2.cgroup;
+
+import com.codahale.metrics.Gauge;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Map;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.core.CpuacctCore;
+import org.apache.storm.metrics2.WorkerMetricRegistrant;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Report CPU used in the cgroup.
+ */
+public class CGroupCpu extends CGroupMetricsBase implements 
WorkerMetricRegistrant {
+    private int userHz = -1;
+
+    public CGroupCpu(Map<String, Object> conf) {
+        super(conf, SubSystemType.cpuacct);
+    }
+
+    @Override
+    public void registerMetrics(TopologyContext topologyContext) {
+        if (enabled) {
+            topologyContext.registerGauge("CGroupCpu.user-ms", new 
Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    CpuacctCore cpu = (CpuacctCore) core;
+                    try {
+                        Map<CpuacctCore.StatType, Long> stat = 
cpu.getCpuStat();
+                        long userHz = stat.get(CpuacctCore.StatType.user);
+                        long hz = getUserHz();
+                        return userHz * 1000 / hz;
+                    } catch (IOException e) {
+                        throw new RuntimeException("Failed to get metric 
value", e);
+                    }
+                }
+            });
+
+            topologyContext.registerGauge("CGroupCpu.sys-ms", new 
Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    CpuacctCore cpu = (CpuacctCore) core;
+                    try {
+                        Map<CpuacctCore.StatType, Long> stat = 
cpu.getCpuStat();
+                        long systemHz = stat.get(CpuacctCore.StatType.system);
+                        long hz = getUserHz();
+                        return systemHz * 1000 / hz;
+                    } catch (IOException e) {
+                        throw new RuntimeException("Failed to get metric 
value", e);
+                    }
+                }
+            });
+        }
+    }
+
+    private synchronized int getUserHz() throws IOException {
+        if (userHz < 0) {
+            ProcessBuilder pb = new ProcessBuilder("getconf", "CLK_TCK");
+            Process p = pb.start();
+            BufferedReader in = new BufferedReader(new 
InputStreamReader(p.getInputStream()));
+            String line = in.readLine().trim();
+            userHz = Integer.valueOf(line);
+        }
+        return userHz;
+    }
+}
\ No newline at end of file
diff --git 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java 
b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupCpuGuarantee.java
similarity index 54%
copy from 
storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
copy to 
storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupCpuGuarantee.java
index d83048b..d6a2226 100644
--- 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
+++ 
b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupCpuGuarantee.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
  * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
@@ -10,39 +10,45 @@
  * and limitations under the License.
  */
 
-package org.apache.storm.metric.cgroup;
+package org.apache.storm.metrics2.cgroup;
 
+import com.codahale.metrics.Gauge;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.storm.container.cgroup.SubSystemType;
-import org.apache.storm.container.cgroup.core.CgroupCore;
 import org.apache.storm.container.cgroup.core.CpuCore;
+import org.apache.storm.metrics2.WorkerMetricRegistrant;
+import org.apache.storm.task.TopologyContext;
 
 /**
- * Report the guaranteed number of ms this worker has requested.
+ * Report the guaranteed number of cpu percentage this worker has requested.
  * It gets the result from cpu.shares.
  * Use this when org.apache.storm.container.cgroup.CgroupManager is used as 
the storm.resource.isolation.plugin.
  */
-public class CGroupCpuGuarantee extends CGroupMetricsBase<Long> {
-    long previousTime = -1;
+public class CGroupCpuGuarantee extends CGroupMetricsBase implements 
WorkerMetricRegistrant {
+    private long shares = -1L;
 
     public CGroupCpuGuarantee(Map<String, Object> conf) {
         super(conf, SubSystemType.cpu);
     }
 
     @Override
-    public Long getDataFrom(CgroupCore core) throws IOException {
-        CpuCore cpu = (CpuCore) core;
-        Long msGuarantee = null;
-        long now = System.currentTimeMillis();
-        if (previousTime > 0) {
-            long shares = cpu.getCpuShares();
-            //By convention each share corresponds to 1% of a CPU core
-            // or 100 = 1 core full time. So the guaranteed number of ms
-            // (approximately) should be ...
-            msGuarantee = (shares * (now - previousTime)) / 100;
+    public void registerMetrics(TopologyContext topologyContext) {
+        if (enabled) {
+            topologyContext.registerGauge("CGroupCpuGuarantee", new 
Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    if (shares < 0) {
+                        CpuCore cpu = (CpuCore) core;
+                        try {
+                            shares = cpu.getCpuShares();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return shares;
+                }
+            });
         }
-        previousTime = now;
-        return msGuarantee;
     }
 }
diff --git 
a/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupCpuGuaranteeByCfsQuota.java
 
b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupCpuGuaranteeByCfsQuota.java
new file mode 100644
index 0000000..d68bac4
--- /dev/null
+++ 
b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupCpuGuaranteeByCfsQuota.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2.cgroup;
+
+import com.codahale.metrics.Gauge;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.core.CpuCore;
+import org.apache.storm.metrics2.WorkerMetricRegistrant;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Report the percentage of the cpu guaranteed for the worker.
+ * It gets the result from cpu.cfs_period_us and cpu.cfs_quota_us.
+ * Use this when org.apache.storm.container.docker.DockerManager or 
org.apache.storm.container.oci.RuncLibContainerManager
+ * is used as the storm.resource.isolation.plugin.
+ */
+public class CGroupCpuGuaranteeByCfsQuota extends CGroupMetricsBase implements 
WorkerMetricRegistrant {
+    long guarantee = -1;
+
+    public CGroupCpuGuaranteeByCfsQuota(Map<String, Object> conf) {
+        super(conf, SubSystemType.cpu);
+    }
+
+    @Override
+    public void registerMetrics(TopologyContext topologyContext) {
+        if (enabled) {
+            topologyContext.registerGauge("CGroupCpuGuaranteeByCfsQuota", new 
Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    if (guarantee < 0) {
+                        CpuCore cpu = (CpuCore) core;
+                        try {
+                            long cpuCfsQuotaUs = cpu.getCpuCfsQuotaUs();
+                            if (cpuCfsQuotaUs == -1) {
+                                //cpu.cfs_quota_us = -1 indicates that the 
cgroup does not adhere to any CPU time restrictions.
+                                guarantee = -1L;
+                            } else {
+                                long cpuCfsPeriodUs = cpu.getCpuCfsPeriodUs();
+                                guarantee = cpuCfsQuotaUs * 100 / 
cpuCfsPeriodUs;
+                            }
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                    return guarantee;
+                }
+            });
+        }
+    }
+}
diff --git 
a/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupCpuStat.java 
b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupCpuStat.java
new file mode 100644
index 0000000..a26e3e0
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupCpuStat.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2.cgroup;
+
+import com.codahale.metrics.Gauge;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.core.CpuCore;
+import org.apache.storm.metrics2.WorkerMetricRegistrant;
+import org.apache.storm.task.TopologyContext;
+
+public class CGroupCpuStat extends CGroupMetricsBase implements 
WorkerMetricRegistrant {
+
+    public CGroupCpuStat(Map<String, Object> conf) {
+        super(conf, SubSystemType.cpu);
+    }
+
+    @Override
+    public void registerMetrics(TopologyContext topologyContext) {
+        if (enabled) {
+            topologyContext.registerGauge("CGroupCpuStat.nr.period-count", new 
Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    try {
+                        CpuCore.Stat stat = ((CpuCore) core).getCpuStat();
+                        return stat.nrPeriods;
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+
+            topologyContext.registerGauge("CGroupCpuStat.nr.throttled-count", 
new Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    try {
+                        CpuCore.Stat stat = ((CpuCore) core).getCpuStat();
+                        return stat.nrThrottled;
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+
+            
topologyContext.registerGauge("CGroupCpuStat.nr.throttled-percentage", new 
Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    try {
+                        CpuCore.Stat stat = ((CpuCore) core).getCpuStat();
+                        return (long) (stat.nrThrottled * 100.0 / 
stat.nrPeriods);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+
+            topologyContext.registerGauge("CGroupCpuStat.throttled.time-ms", 
new Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    try {
+                        CpuCore.Stat stat = ((CpuCore) core).getCpuStat();
+                        return Math.round(stat.throttledTime / 1000_000.0);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+        }
+    }
+}
diff --git 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java 
b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupMemoryLimit.java
similarity index 65%
copy from 
storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java
copy to 
storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupMemoryLimit.java
index d6d5750..fa6d203 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryLimit.java
+++ 
b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupMemoryLimit.java
@@ -10,19 +10,22 @@
  * and limitations under the License.
  */
 
-package org.apache.storm.metric.cgroup;
+package org.apache.storm.metrics2.cgroup;
 
+import com.codahale.metrics.Gauge;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.storm.container.cgroup.SubSystemType;
-import org.apache.storm.container.cgroup.core.CgroupCore;
 import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.apache.storm.metrics2.WorkerMetricRegistrant;
+import org.apache.storm.task.TopologyContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Reports the current memory limit of the cgroup for this worker.
  */
-public class CGroupMemoryLimit extends CGroupMetricsBase<Long> {
+public class CGroupMemoryLimit extends CGroupMetricsBase implements 
WorkerMetricRegistrant {
     private static final Logger LOG = 
LoggerFactory.getLogger(CGroupMemoryLimit.class);
     private static final long BYTES_PER_MB = 1024 * 1024;
     private final long workerLimitBytes;
@@ -40,10 +43,22 @@ public class CGroupMemoryLimit extends 
CGroupMetricsBase<Long> {
     }
 
     @Override
-    public Long getDataFrom(CgroupCore core) throws Exception {
-        if (workerLimitBytes > 0) {
-            return workerLimitBytes;
+    public void registerMetrics(TopologyContext topologyContext) {
+        if (enabled) {
+            topologyContext.registerGauge("CGroupMemoryLimit", new 
Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    if (workerLimitBytes > 0) {
+                        return workerLimitBytes;
+                    }
+                    try {
+                        return ((MemoryCore) core).getPhysicalUsageLimit();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
         }
-        return ((MemoryCore) core).getPhysicalUsageLimit();
     }
+
 }
diff --git 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java 
b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupMemoryUsage.java
similarity index 58%
copy from 
storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
copy to 
storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupMemoryUsage.java
index e30feae..32dade1 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMemoryUsage.java
+++ 
b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupMemoryUsage.java
@@ -10,24 +10,38 @@
  * and limitations under the License.
  */
 
-package org.apache.storm.metric.cgroup;
+package org.apache.storm.metrics2.cgroup;
 
+import com.codahale.metrics.Gauge;
+import java.io.IOException;
 import java.util.Map;
 import org.apache.storm.container.cgroup.SubSystemType;
-import org.apache.storm.container.cgroup.core.CgroupCore;
 import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.apache.storm.metrics2.WorkerMetricRegistrant;
+import org.apache.storm.task.TopologyContext;
 
 /**
  * Reports the current memory usage of the cgroup for this worker.
  */
-public class CGroupMemoryUsage extends CGroupMetricsBase<Long> {
+public class CGroupMemoryUsage extends CGroupMetricsBase implements 
WorkerMetricRegistrant {
 
     public CGroupMemoryUsage(Map<String, Object> conf) {
         super(conf, SubSystemType.memory);
     }
 
     @Override
-    public Long getDataFrom(CgroupCore core) throws Exception {
-        return ((MemoryCore) core).getPhysicalUsage();
+    public void registerMetrics(TopologyContext topologyContext) {
+        if (enabled) {
+            topologyContext.registerGauge("CGroupMemoryUsage", new 
Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    try {
+                        return ((MemoryCore) core).getPhysicalUsage();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+        }
     }
 }
diff --git 
a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java 
b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupMetricsBase.java
similarity index 83%
copy from 
storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
copy to 
storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupMetricsBase.java
index ad4cc16..50f6a7b 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
+++ 
b/storm-client/src/jvm/org/apache/storm/metrics2/cgroup/CGroupMetricsBase.java
@@ -10,11 +10,10 @@
  * and limitations under the License.
  */
 
-package org.apache.storm.metric.cgroup;
+package org.apache.storm.metrics2.cgroup;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.util.Map;
 import org.apache.storm.Config;
@@ -22,7 +21,6 @@ import org.apache.storm.container.cgroup.CgroupCenter;
 import org.apache.storm.container.cgroup.CgroupCoreFactory;
 import org.apache.storm.container.cgroup.SubSystemType;
 import org.apache.storm.container.cgroup.core.CgroupCore;
-import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,10 +28,10 @@ import org.slf4j.LoggerFactory;
 /**
  * Base class for checking if CGroups are enabled, etc.
  */
-public abstract class CGroupMetricsBase<T> implements IMetric {
+public abstract class CGroupMetricsBase {
     private static final Logger LOG = 
LoggerFactory.getLogger(CGroupMetricsBase.class);
-    private boolean enabled;
-    private CgroupCore core = null;
+    protected boolean enabled;
+    protected CgroupCore core = null;
 
     public CGroupMetricsBase(Map<String, Object> conf, SubSystemType type) {
         final String simpleName = getClass().getSimpleName();
@@ -84,7 +82,7 @@ public abstract class CGroupMetricsBase<T> implements IMetric 
{
         String hierarchyDir = (String) 
conf.get(Config.STORM_CGROUP_HIERARCHY_DIR);
         if (StringUtils.isEmpty(hierarchyDir) || !new File(hierarchyDir, 
cgroupPath).exists()) {
             LOG.info("{} is not set or does not exist. checking {}", 
Config.STORM_CGROUP_HIERARCHY_DIR,
-                Config.STORM_OCI_CGROUP_ROOT);
+                    Config.STORM_OCI_CGROUP_ROOT);
 
             String ociCgroupRoot = (String) 
conf.get(Config.STORM_OCI_CGROUP_ROOT);
             hierarchyDir = ociCgroupRoot + File.separator + type;
@@ -96,26 +94,7 @@ public abstract class CGroupMetricsBase<T> implements 
IMetric {
         }
 
         core = CgroupCoreFactory.getInstance(type, new File(hierarchyDir, 
cgroupPath).getAbsolutePath());
-
         enabled = true;
-        LOG.info("{} is ENABLED {} exists...", simpleName, hierarchyDir);
+        LOG.info("Metric {} is ENABLED and directory {} exists...", 
simpleName, hierarchyDir);
     }
-
-    @Override
-    public Object getValueAndReset() {
-        if (!enabled) {
-            return null;
-        }
-        try {
-            return getDataFrom(core);
-        } catch (FileNotFoundException e) {
-            LOG.warn("Exception trying to read a file {}", e);
-            //Something happened and we couldn't find the file, so ignore it 
for now.
-            return null;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public abstract T getDataFrom(CgroupCore core) throws Exception;
-}
\ No newline at end of file
+}

Reply via email to