This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fcf5d29c201 [improve][broker] Support cgroup v2 by using `jdk.internal.platform.Metrics` in Pulsar Loadbalancer (#16832)
fcf5d29c201 is described below
commit fcf5d29c2010570a7e12ee233d58dfbcfc6fea93
Author: Cong Zhao <[email protected]>
AuthorDate: Sat Apr 29 01:52:52 2023 +0800
[improve][broker] Support cgroup v2 by using `jdk.internal.platform.Metrics` in Pulsar Loadbalancer (#16832)
(cherry picked from commit b8543ad979798d89da9f35a9375de7e16b7e5e25)
---
bin/pulsar | 2 +
buildtools/pom.xml | 1 +
pom.xml | 1 +
.../pulsar/broker/loadbalance/LinuxInfoUtils.java | 66 +++++++++++++++++++---
.../loadbalance/impl/LinuxBrokerHostUsageImpl.java | 2 +-
.../broker/loadbalance/SimpleBrokerStartTest.java | 26 +++++++++
.../impl/LinuxBrokerHostUsageImplTest.java | 37 +++++++++++-
7 files changed, 122 insertions(+), 13 deletions(-)
diff --git a/bin/pulsar b/bin/pulsar
index 9d924bb296c..20ed1f7f22b 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -307,6 +307,8 @@ if [[ -z "$IS_JAVA_8" ]]; then
OPTS="$OPTS --add-opens java.management/sun.management=ALL-UNNAMED"
# MBeanStatsGenerator
OPTS="$OPTS --add-opens
jdk.management/com.sun.management.internal=ALL-UNNAMED"
+ # LinuxInfoUtils
+ OPTS="$OPTS --add-opens java.base/jdk.internal.platform=ALL-UNNAMED"
fi
OPTS="-cp $PULSAR_CLASSPATH $OPTS"
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index cbc4dfee05e..1a02938fc53 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -57,6 +57,7 @@
<test.additional.args>
--add-opens java.base/jdk.internal.loader=ALL-UNNAMED
--add-opens java.base/java.lang=ALL-UNNAMED <!--Mockito-->
+ --add-opens java.base/jdk.internal.platform=ALL-UNNAMED
<!--LinuxInfoUtils-->
</test.additional.args>
</properties>
diff --git a/pom.xml b/pom.xml
index c1e6454eb66..6806efce101 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,7 @@ flexible messaging model and an intuitive client API.</description>
--add-opens java.base/sun.net=ALL-UNNAMED <!--netty.DnsResolverUtil-->
--add-opens java.management/sun.management=ALL-UNNAMED
<!--JvmDefaultGCMetricsLogger & MBeanStatsGenerator-->
--add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED
<!--MBeanStatsGenerator-->
+ --add-opens java.base/jdk.internal.platform=ALL-UNNAMED
<!--LinuxInfoUtils-->
</test.additional.args>
<testReuseFork>true</testReuseFork>
<testForkCount>4</testForkCount>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
index c9fce46a305..17aa7170fc6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.loadbalance;
+import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -45,6 +47,7 @@ public class LinuxInfoUtils {
private static final String CGROUPS_CPU_USAGE_PATH =
"/sys/fs/cgroup/cpu/cpuacct.usage";
private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH =
"/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH =
"/sys/fs/cgroup/cpu/cpu.cfs_period_us";
+
// proc states
private static final String PROC_STAT_PATH = "/proc/stat";
private static final String NIC_PATH = "/sys/class/net/";
@@ -52,6 +55,30 @@ public class LinuxInfoUtils {
private static final int ARPHRD_ETHER = 1;
private static final String NIC_SPEED_TEMPLATE = "/sys/class/net/%s/speed";
+ private static Object /*jdk.internal.platform.Metrics*/ metrics;
+ private static Method getMetricsProviderMethod;
+ private static Method getCpuQuotaMethod;
+ private static Method getCpuPeriodMethod;
+ private static Method getCpuUsageMethod;
+
+ static {
+ try {
+ metrics =
Class.forName("jdk.internal.platform.Container").getMethod("metrics")
+ .invoke(null);
+ if (metrics != null) {
+ getMetricsProviderMethod =
metrics.getClass().getMethod("getProvider");
+ getMetricsProviderMethod.setAccessible(true);
+ getCpuQuotaMethod =
metrics.getClass().getMethod("getCpuQuota");
+ getCpuQuotaMethod.setAccessible(true);
+ getCpuPeriodMethod =
metrics.getClass().getMethod("getCpuPeriod");
+ getCpuPeriodMethod.setAccessible(true);
+ getCpuUsageMethod =
metrics.getClass().getMethod("getCpuUsage");
+ getCpuUsageMethod.setAccessible(true);
+ }
+ } catch (Throwable e) {
+ log.warn("Failed to get runtime metrics", e);
+ }
+ }
/**
* Determine whether the OS is the linux kernel.
@@ -66,9 +93,14 @@ public class LinuxInfoUtils {
*/
public static boolean isCGroupEnabled() {
try {
- return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
+ if (metrics == null) {
+ return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
+ }
+ String provider = (String)
getMetricsProviderMethod.invoke(metrics);
+ log.info("[LinuxInfo] The system metrics provider is: {}",
provider);
+ return provider.contains("cgroup");
} catch (Exception e) {
- log.warn("[LinuxInfo] Failed to check cgroup CPU usage file: {}",
e.getMessage());
+ log.warn("[LinuxInfo] Failed to check cgroup CPU: {}",
e.getMessage());
return false;
}
}
@@ -81,13 +113,21 @@ public class LinuxInfoUtils {
public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
if (isCGroupsEnabled) {
try {
- long quota =
readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
- long period =
readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
+ long quota;
+ long period;
+ if (metrics != null && getCpuQuotaMethod != null &&
getCpuPeriodMethod != null) {
+ quota = (long) getCpuQuotaMethod.invoke(metrics);
+ period = (long) getCpuPeriodMethod.invoke(metrics);
+ } else {
+ quota =
readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
+ period =
readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
+ }
+
if (quota > 0) {
return 100.0 * quota / period;
}
- } catch (IOException e) {
- log.warn("[LinuxInfo] Failed to read CPU quotas from cgroups",
e);
+ } catch (Exception e) {
+ log.warn("[LinuxInfo] Failed to read CPU quotas from cgroup",
e);
// Fallback to availableProcessors
}
}
@@ -99,11 +139,14 @@ public class LinuxInfoUtils {
* Get CGroup cpu usage.
* @return Cpu usage
*/
- public static double getCpuUsageForCGroup() {
+ public static long getCpuUsageForCGroup() {
try {
+ if (metrics != null && getCpuUsageMethod != null) {
+ return (long) getCpuUsageMethod.invoke(metrics);
+ }
return readLongFromFile(Paths.get(CGROUPS_CPU_USAGE_PATH));
- } catch (IOException e) {
- log.error("[LinuxInfo] Failed to read CPU usage from {}",
CGROUPS_CPU_USAGE_PATH, e);
+ } catch (Exception e) {
+ log.error("[LinuxInfo] Failed to read CPU usage from cgroup", e);
return -1;
}
}
@@ -291,6 +334,11 @@ public class LinuxInfoUtils {
UP
}
+ @VisibleForTesting
+ public static Object getMetrics() {
+ return metrics;
+ }
+
@AllArgsConstructor
public enum NICUsageType {
// transport
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
index 8412658d86a..2f7ca614943 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
@@ -140,7 +140,7 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
}
private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) {
- double usage = getCpuUsageForCGroup();
+ double usage = (double) getCpuUsageForCGroup();
double currentUsage = usage - lastCpuUsage;
lastCpuUsage = usage;
return 100 * currentUsage / elapsedTimeSeconds /
TimeUnit.SECONDS.toNanos(1);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
index 6de2eb90f12..28dde8b7f55 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.loadbalance;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -28,6 +30,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
import org.testng.annotations.Test;
@Slf4j
@@ -96,4 +99,27 @@ public class SimpleBrokerStartTest {
}
+ @Test
+ public void testCGroupMetrics() {
+ if (!LinuxInfoUtils.isLinux()) {
+ return;
+ }
+
+ boolean existsCGroup = Files.exists(Paths.get("/sys/fs/cgroup"));
+ boolean cGroupEnabled = LinuxInfoUtils.isCGroupEnabled();
+ Assert.assertEquals(cGroupEnabled, existsCGroup);
+
+ double totalCpuLimit = LinuxInfoUtils.getTotalCpuLimit(cGroupEnabled);
+ log.info("totalCpuLimit: {}", totalCpuLimit);
+ Assert.assertTrue(totalCpuLimit > 0.0);
+
+ if (cGroupEnabled) {
+ Assert.assertNotNull(LinuxInfoUtils.getMetrics());
+
+ long cpuUsageForCGroup = LinuxInfoUtils.getCpuUsageForCGroup();
+ log.info("cpuUsageForCGroup: {}", cpuUsageForCGroup);
+ Assert.assertTrue(cpuUsageForCGroup > 0);
+ }
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
index 39298dce0b1..563f707c445 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
@@ -18,15 +18,19 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import lombok.Cleanup;
-import org.testng.Assert;
-import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+@Slf4j
public class LinuxBrokerHostUsageImplTest {
@Test
@@ -42,4 +46,31 @@ public class LinuxBrokerHostUsageImplTest {
double totalLimit =
linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics);
Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
}
+
+ @Test
+ public void testCpuUsage() throws InterruptedException {
+ if (!LinuxInfoUtils.isLinux()) {
+ return;
+ }
+
+ @Cleanup("shutdown")
+ ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+ LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
+ new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE,
Optional.empty(), executorService);
+
+ linuxBrokerHostUsage.calculateBrokerHostUsage();
+ TimeUnit.SECONDS.sleep(1);
+ linuxBrokerHostUsage.calculateBrokerHostUsage();
+
+ double usage =
linuxBrokerHostUsage.getBrokerHostUsage().getCpu().usage;
+ double limit =
linuxBrokerHostUsage.getBrokerHostUsage().getCpu().limit;
+ float percentUsage =
linuxBrokerHostUsage.getBrokerHostUsage().getCpu().percentUsage();
+
+ Assert.assertTrue(usage > 0);
+ Assert.assertTrue(limit > 0);
+ Assert.assertTrue(limit >= usage);
+ Assert.assertTrue(percentUsage > 0);
+
+ log.info("usage: {}, limit: {}, percentUsage: {}", usage, limit,
percentUsage);
+ }
}