This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5f05fd28841ad4f0f7b2d18de329530d6b5edf3e Author: Matteo Merli <[email protected]> AuthorDate: Thu Jul 9 22:17:54 2020 -0700 Use CGroup CPU usage when present (#7475) * Use CGroup CPU usage when present * Also read cpu limits from cgroups * Fixed test * Fixed string trimming * Addressed comments (cherry picked from commit 8bc4880044cd80bc2975dcf18ef21e8018595f3d) --- .../loadbalance/impl/LinuxBrokerHostUsageImpl.java | 116 +++++++++++++-------- 1 file changed, 75 insertions(+), 41 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java index 2d67f10..992b743 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.broker.loadbalance.impl; +import com.google.common.base.Charsets; import com.sun.management.OperatingSystemMXBean; + import java.io.IOException; import java.lang.management.ManagementFactory; import java.nio.file.Files; @@ -33,28 +35,33 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import lombok.extern.slf4j.Slf4j; + import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.loadbalance.BrokerHostUsage; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Class that will return the broker host usage. */ +@Slf4j public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { private long lastCollection; private double lastTotalNicUsageTx; private double lastTotalNicUsageRx; - private CpuStat lastCpuStat; + private double lastCpuUsage; + private double lastCpuTotalTime; private OperatingSystemMXBean systemBean; private SystemResourceUsage usage; private final Optional<Double> overrideBrokerNicSpeedGbps; + private final boolean isCGroupsEnabled; - private static final Logger LOG = LoggerFactory.getLogger(LinuxBrokerHostUsageImpl.class); + private static final String CGROUPS_CPU_USAGE_PATH = "/sys/fs/cgroup/cpu/cpuacct.usage"; + private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; + private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; public LinuxBrokerHostUsageImpl(PulsarService pulsar) { this( @@ -73,6 +80,16 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { this.overrideBrokerNicSpeedGbps = overrideBrokerNicSpeedGbps; executorService.scheduleAtFixedRate(this::calculateBrokerHostUsage, 0, hostUsageCheckIntervalMin, TimeUnit.MINUTES); + + boolean isCGroupsEnabled = false; + try { + isCGroupsEnabled = Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH)); + } catch (Exception e) { + log.warn("Failed to check cgroup CPU usage file: {}", e.getMessage()); + } + + this.isCGroupsEnabled = isCGroupsEnabled; + calculateBrokerHostUsage(); } @Override @@ -87,29 +104,20 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { double totalNicUsageTx = getTotalNicUsageTxKb(nics); double totalNicUsageRx = getTotalNicUsageRxKb(nics); double totalCpuLimit = getTotalCpuLimit(); - CpuStat cpuStat = getTotalCpuUsage(); SystemResourceUsage usage = new SystemResourceUsage(); long now = System.currentTimeMillis(); + double elapsedSeconds = (now - lastCollection) / 1000d; + double cpuUsage = getTotalCpuUsage(elapsedSeconds); if (lastCollection == 0L) { usage.setMemory(getMemUsage()); usage.setBandwidthIn(new ResourceUsage(0d, totalNicLimit)); usage.setBandwidthOut(new ResourceUsage(0d, totalNicLimit)); - usage.setCpu(new ResourceUsage(0d, totalCpuLimit)); } else { - double elapsedSeconds = (now - lastCollection) / 1000d; double nicUsageTx = (totalNicUsageTx - lastTotalNicUsageTx) / elapsedSeconds; double nicUsageRx = (totalNicUsageRx - lastTotalNicUsageRx) / elapsedSeconds; - if (cpuStat != null && lastCpuStat != null) { - // we need two non null stats to get a usage report - long cpuTimeDiff = cpuStat.getTotalTime() - lastCpuStat.getTotalTime(); - long cpuUsageDiff = cpuStat.getUsage() - lastCpuStat.getUsage(); - double cpuUsage = ((double) cpuUsageDiff / (double) cpuTimeDiff) * totalCpuLimit; - usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit)); - } - usage.setMemory(getMemUsage()); usage.setBandwidthIn(new ResourceUsage(nicUsageRx, totalNicLimit)); usage.setBandwidthOut(new ResourceUsage(nicUsageTx, totalNicLimit)); @@ -117,15 +125,37 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { lastTotalNicUsageTx = totalNicUsageTx; lastTotalNicUsageRx = totalNicUsageRx; - lastCpuStat = cpuStat; lastCollection = System.currentTimeMillis(); this.usage = usage; + usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit)); } private double getTotalCpuLimit() { + if (isCGroupsEnabled) { + try { + long quota = readLongFromFile(CGROUPS_CPU_LIMIT_QUOTA_PATH); + long period = readLongFromFile(CGROUPS_CPU_LIMIT_PERIOD_PATH); + if (quota > 0) { + return 100.0 * quota / period; + } + } catch (IOException e) { + log.warn("Failed to read CPU quotas from cgroups", e); + // Fallback to availableProcessors + } + } + + // Fallback to JVM reported CPU quota return 100 * Runtime.getRuntime().availableProcessors(); } + private double getTotalCpuUsage(double elapsedTimeSeconds) { + if (isCGroupsEnabled) { + return getTotalCpuUsageForCGroup(elapsedTimeSeconds); + } else { + return getTotalCpuUsageForEntireHost(); + } + } + /** * Reads first line of /proc/stat to get total cpu usage. * @@ -137,18 +167,36 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { * Line is split in "words", filtering the first. The sum of all numbers give the amount of cpu cycles used this * far. Real CPU usage should equal the sum substracting the idle cycles, this would include iowait, irq and steal. */ - private CpuStat getTotalCpuUsage() { + private double getTotalCpuUsageForEntireHost() { try (Stream<String> stream = Files.lines(Paths.get("/proc/stat"))) { String[] words = stream.findFirst().get().split("\\s+"); long total = Arrays.stream(words).filter(s -> !s.contains("cpu")).mapToLong(Long::parseLong).sum(); - long idle = Long.parseLong(words[4]); + long usage = total - idle; + + double currentUsage = (usage - lastCpuUsage) / (total - lastCpuTotalTime) * getTotalCpuLimit(); + + lastCpuUsage = usage; + lastCpuTotalTime = total; - return new CpuStat(total, total - idle); + return currentUsage; } catch (IOException e) { - LOG.error("Failed to read CPU usage from /proc/stat", e); - return null; + log.error("Failed to read CPU usage from /proc/stat", e); + return -1; + } + } + + private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) { + try { + long usage = readLongFromFile(CGROUPS_CPU_USAGE_PATH); + double currentUsage = usage - lastCpuUsage; + lastCpuUsage = usage; + + return 100 * currentUsage / elapsedTimeSeconds / TimeUnit.SECONDS.toNanos(1); + } catch (IOException e) { + log.error("Failed to read CPU usage from {}", CGROUPS_CPU_USAGE_PATH, e); + return -1; } } @@ -163,7 +211,7 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { return stream.filter(this::isPhysicalNic).map(path -> path.getFileName().toString()) .collect(Collectors.toList()); } catch (IOException e) { - LOG.error("Failed to find NICs", e); + log.error("Failed to find NICs", e); return Collections.emptyList(); } } @@ -194,7 +242,7 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { try { return Double.parseDouble(new String(Files.readAllBytes(getNicSpeedPath(s)))); } catch (IOException e) { - LOG.error("Failed to read speed for nic " + s, e); + log.error("Failed to read speed for nic " + s, e); return 0d; } }).sum() * 1024); @@ -213,7 +261,7 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { try { return Double.parseDouble(new String(Files.readAllBytes(getNicRxPath(s)))); } catch (IOException e) { - LOG.error("Failed to read rx_bytes for NIC " + s, e); + log.error("Failed to read rx_bytes for NIC " + s, e); return 0d; } }).sum() * 8 / 1024; @@ -224,27 +272,13 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage { try { return Double.parseDouble(new String(Files.readAllBytes(getNicTxPath(s)))); } catch (IOException e) { - LOG.error("Failed to read tx_bytes for NIC " + s, e); + log.error("Failed to read tx_bytes for NIC " + s, e); return 0d; } }).sum() * 8 / 1024; } - private class CpuStat { - private long totalTime; - private long usage; - - CpuStat(long totalTime, long usage) { - this.totalTime = totalTime; - this.usage = usage; - } - - long getTotalTime() { - return totalTime; - } - - long getUsage() { - return usage; - } + private static long readLongFromFile(String path) throws IOException { + return Long.parseLong(new String(Files.readAllBytes(Paths.get(path)), Charsets.UTF_8).trim()); } }
