This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fcf5d29c201 [improve][broker] Support cgroup v2 by using `jdk.internal.platform.Metrics` in Pulsar Loadbalancer (#16832)
fcf5d29c201 is described below

commit fcf5d29c2010570a7e12ee233d58dfbcfc6fea93
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Apr 29 01:52:52 2023 +0800

    [improve][broker] Support cgroup v2 by using `jdk.internal.platform.Metrics` in Pulsar Loadbalancer (#16832)
    
    (cherry picked from commit b8543ad979798d89da9f35a9375de7e16b7e5e25)
---
 bin/pulsar                                         |  2 +
 buildtools/pom.xml                                 |  1 +
 pom.xml                                            |  1 +
 .../pulsar/broker/loadbalance/LinuxInfoUtils.java  | 66 +++++++++++++++++++---
 .../loadbalance/impl/LinuxBrokerHostUsageImpl.java |  2 +-
 .../broker/loadbalance/SimpleBrokerStartTest.java  | 26 +++++++++
 .../impl/LinuxBrokerHostUsageImplTest.java         | 37 +++++++++++-
 7 files changed, 122 insertions(+), 13 deletions(-)

diff --git a/bin/pulsar b/bin/pulsar
index 9d924bb296c..20ed1f7f22b 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -307,6 +307,8 @@ if [[ -z "$IS_JAVA_8" ]]; then
   OPTS="$OPTS --add-opens java.management/sun.management=ALL-UNNAMED"
   # MBeanStatsGenerator
   OPTS="$OPTS --add-opens 
jdk.management/com.sun.management.internal=ALL-UNNAMED"
+  # LinuxInfoUtils
+  OPTS="$OPTS --add-opens java.base/jdk.internal.platform=ALL-UNNAMED"
 fi
 
 OPTS="-cp $PULSAR_CLASSPATH $OPTS"
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index cbc4dfee05e..1a02938fc53 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -57,6 +57,7 @@
     <test.additional.args>
       --add-opens java.base/jdk.internal.loader=ALL-UNNAMED
       --add-opens java.base/java.lang=ALL-UNNAMED <!--Mockito-->
+      --add-opens java.base/jdk.internal.platform=ALL-UNNAMED 
<!--LinuxInfoUtils-->
     </test.additional.args>
   </properties>
 
diff --git a/pom.xml b/pom.xml
index c1e6454eb66..6806efce101 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,7 @@ flexible messaging model and an intuitive client 
API.</description>
       --add-opens java.base/sun.net=ALL-UNNAMED <!--netty.DnsResolverUtil-->
       --add-opens java.management/sun.management=ALL-UNNAMED 
<!--JvmDefaultGCMetricsLogger & MBeanStatsGenerator-->
       --add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED 
<!--MBeanStatsGenerator-->
+      --add-opens java.base/jdk.internal.platform=ALL-UNNAMED 
<!--LinuxInfoUtils-->
     </test.additional.args>
     <testReuseFork>true</testReuseFork>
     <testForkCount>4</testForkCount>
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
index c9fce46a305..17aa7170fc6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -45,6 +47,7 @@ public class LinuxInfoUtils {
     private static final String CGROUPS_CPU_USAGE_PATH = 
"/sys/fs/cgroup/cpu/cpuacct.usage";
     private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = 
"/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
     private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = 
"/sys/fs/cgroup/cpu/cpu.cfs_period_us";
+
     // proc states
     private static final String PROC_STAT_PATH = "/proc/stat";
     private static final String NIC_PATH = "/sys/class/net/";
@@ -52,6 +55,30 @@ public class LinuxInfoUtils {
     private static final int ARPHRD_ETHER = 1;
     private static final String NIC_SPEED_TEMPLATE = "/sys/class/net/%s/speed";
 
+    private static Object /*jdk.internal.platform.Metrics*/ metrics;
+    private static Method getMetricsProviderMethod;
+    private static Method getCpuQuotaMethod;
+    private static Method getCpuPeriodMethod;
+    private static Method getCpuUsageMethod;
+
+    static {
+        try {
+            metrics = 
Class.forName("jdk.internal.platform.Container").getMethod("metrics")
+                    .invoke(null);
+            if (metrics != null) {
+                getMetricsProviderMethod = 
metrics.getClass().getMethod("getProvider");
+                getMetricsProviderMethod.setAccessible(true);
+                getCpuQuotaMethod = 
metrics.getClass().getMethod("getCpuQuota");
+                getCpuQuotaMethod.setAccessible(true);
+                getCpuPeriodMethod = 
metrics.getClass().getMethod("getCpuPeriod");
+                getCpuPeriodMethod.setAccessible(true);
+                getCpuUsageMethod = 
metrics.getClass().getMethod("getCpuUsage");
+                getCpuUsageMethod.setAccessible(true);
+            }
+        } catch (Throwable e) {
+            log.warn("Failed to get runtime metrics", e);
+        }
+    }
 
     /**
      * Determine whether the OS is the linux kernel.
@@ -66,9 +93,14 @@ public class LinuxInfoUtils {
      */
     public static boolean isCGroupEnabled() {
         try {
-            return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
+            if (metrics == null) {
+                return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
+            }
+            String provider = (String) 
getMetricsProviderMethod.invoke(metrics);
+            log.info("[LinuxInfo] The system metrics provider is: {}", 
provider);
+            return provider.contains("cgroup");
         } catch (Exception e) {
-            log.warn("[LinuxInfo] Failed to check cgroup CPU usage file: {}", 
e.getMessage());
+            log.warn("[LinuxInfo] Failed to check cgroup CPU: {}", 
e.getMessage());
             return false;
         }
     }
@@ -81,13 +113,21 @@ public class LinuxInfoUtils {
     public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
         if (isCGroupsEnabled) {
             try {
-                long quota = 
readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
-                long period = 
readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
+                long quota;
+                long period;
+                if (metrics != null && getCpuQuotaMethod != null && 
getCpuPeriodMethod != null) {
+                    quota = (long) getCpuQuotaMethod.invoke(metrics);
+                    period = (long) getCpuPeriodMethod.invoke(metrics);
+                } else {
+                    quota = 
readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
+                    period = 
readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
+                }
+
                 if (quota > 0) {
                     return 100.0 * quota / period;
                 }
-            } catch (IOException e) {
-                log.warn("[LinuxInfo] Failed to read CPU quotas from cgroups", 
e);
+            } catch (Exception e) {
+                log.warn("[LinuxInfo] Failed to read CPU quotas from cgroup", 
e);
                 // Fallback to availableProcessors
             }
         }
@@ -99,11 +139,14 @@ public class LinuxInfoUtils {
      * Get CGroup cpu usage.
      * @return Cpu usage
      */
-    public static double getCpuUsageForCGroup() {
+    public static long getCpuUsageForCGroup() {
         try {
+            if (metrics != null && getCpuUsageMethod != null) {
+                return (long) getCpuUsageMethod.invoke(metrics);
+            }
             return readLongFromFile(Paths.get(CGROUPS_CPU_USAGE_PATH));
-        } catch (IOException e) {
-            log.error("[LinuxInfo] Failed to read CPU usage from {}", 
CGROUPS_CPU_USAGE_PATH, e);
+        } catch (Exception e) {
+            log.error("[LinuxInfo] Failed to read CPU usage from cgroup", e);
             return -1;
         }
     }
@@ -291,6 +334,11 @@ public class LinuxInfoUtils {
         UP
     }
 
+    @VisibleForTesting
+    public static Object getMetrics() {
+        return metrics;
+    }
+
     @AllArgsConstructor
     public enum NICUsageType {
         // transport
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
index 8412658d86a..2f7ca614943 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
@@ -140,7 +140,7 @@ public class LinuxBrokerHostUsageImpl implements 
BrokerHostUsage {
     }
 
     private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) {
-        double usage = getCpuUsageForCGroup();
+        double usage = (double) getCpuUsageForCGroup();
         double currentUsage = usage - lastCpuUsage;
         lastCpuUsage = usage;
         return 100 * currentUsage / elapsedTimeSeconds / 
TimeUnit.SECONDS.toNanos(1);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
index 6de2eb90f12..28dde8b7f55 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.loadbalance;
 
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.Optional;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -28,6 +30,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -96,4 +99,27 @@ public class SimpleBrokerStartTest {
     }
 
 
+    @Test
+    public void testCGroupMetrics() {
+        if (!LinuxInfoUtils.isLinux()) {
+            return;
+        }
+
+        boolean existsCGroup = Files.exists(Paths.get("/sys/fs/cgroup"));
+        boolean cGroupEnabled = LinuxInfoUtils.isCGroupEnabled();
+        Assert.assertEquals(cGroupEnabled, existsCGroup);
+
+        double totalCpuLimit = LinuxInfoUtils.getTotalCpuLimit(cGroupEnabled);
+        log.info("totalCpuLimit: {}", totalCpuLimit);
+        Assert.assertTrue(totalCpuLimit > 0.0);
+
+        if (cGroupEnabled) {
+            Assert.assertNotNull(LinuxInfoUtils.getMetrics());
+
+            long cpuUsageForCGroup = LinuxInfoUtils.getCpuUsageForCGroup();
+            log.info("cpuUsageForCGroup: {}", cpuUsageForCGroup);
+            Assert.assertTrue(cpuUsageForCGroup > 0);
+        }
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
index 39298dce0b1..563f707c445 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
@@ -18,15 +18,19 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import lombok.Cleanup;
-import org.testng.Assert;
-import org.testng.annotations.Test;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
+@Slf4j
 public class LinuxBrokerHostUsageImplTest {
 
     @Test
@@ -42,4 +46,31 @@ public class LinuxBrokerHostUsageImplTest {
         double totalLimit = 
linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics);
         Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
     }
+
+    @Test
+    public void testCpuUsage() throws InterruptedException {
+        if (!LinuxInfoUtils.isLinux()) {
+            return;
+        }
+
+        @Cleanup("shutdown")
+        ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+        LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
+                new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, 
Optional.empty(), executorService);
+
+        linuxBrokerHostUsage.calculateBrokerHostUsage();
+        TimeUnit.SECONDS.sleep(1);
+        linuxBrokerHostUsage.calculateBrokerHostUsage();
+
+        double usage = 
linuxBrokerHostUsage.getBrokerHostUsage().getCpu().usage;
+        double limit = 
linuxBrokerHostUsage.getBrokerHostUsage().getCpu().limit;
+        float percentUsage = 
linuxBrokerHostUsage.getBrokerHostUsage().getCpu().percentUsage();
+
+        Assert.assertTrue(usage > 0);
+        Assert.assertTrue(limit > 0);
+        Assert.assertTrue(limit >= usage);
+        Assert.assertTrue(percentUsage > 0);
+
+        log.info("usage: {}, limit: {}, percentUsage: {}", usage, limit, 
percentUsage);
+    }
 }

Reply via email to