This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5f05fd28841ad4f0f7b2d18de329530d6b5edf3e
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jul 9 22:17:54 2020 -0700

    Use CGroup CPU usage when present (#7475)
    
    * Use CGroup CPU usage when present
    
    * Also read cpu limits from cgroups
    
    * Fixed test
    
    * Fixed string trimming
    
    * Addressed comments
    
    (cherry picked from commit 8bc4880044cd80bc2975dcf18ef21e8018595f3d)
---
 .../loadbalance/impl/LinuxBrokerHostUsageImpl.java | 116 +++++++++++++--------
 1 file changed, 75 insertions(+), 41 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
index 2d67f10..992b743 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
+import com.google.common.base.Charsets;
 import com.sun.management.OperatingSystemMXBean;
+
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.file.Files;
@@ -33,28 +35,33 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Class that will return the broker host usage.
  */
+@Slf4j
 public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
     private long lastCollection;
     private double lastTotalNicUsageTx;
     private double lastTotalNicUsageRx;
-    private CpuStat lastCpuStat;
+    private double lastCpuUsage;
+    private double lastCpuTotalTime;
     private OperatingSystemMXBean systemBean;
     private SystemResourceUsage usage;
 
     private final Optional<Double> overrideBrokerNicSpeedGbps;
+    private final boolean isCGroupsEnabled;
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(LinuxBrokerHostUsageImpl.class);
+    private static final String CGROUPS_CPU_USAGE_PATH = 
"/sys/fs/cgroup/cpu/cpuacct.usage";
+    private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH = 
"/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
+    private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH = 
"/sys/fs/cgroup/cpu/cpu.cfs_period_us";
 
     public LinuxBrokerHostUsageImpl(PulsarService pulsar) {
         this(
@@ -73,6 +80,16 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
         this.overrideBrokerNicSpeedGbps = overrideBrokerNicSpeedGbps;
         executorService.scheduleAtFixedRate(this::calculateBrokerHostUsage, 0,
                 hostUsageCheckIntervalMin, TimeUnit.MINUTES);
+
+        boolean isCGroupsEnabled = false;
+        try {
+             isCGroupsEnabled = 
Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
+        } catch (Exception e) {
+            log.warn("Failed to check cgroup CPU usage file: {}", 
e.getMessage());
+        }
+
+        this.isCGroupsEnabled = isCGroupsEnabled;
+        calculateBrokerHostUsage();
     }
 
     @Override
@@ -87,29 +104,20 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
         double totalNicUsageTx = getTotalNicUsageTxKb(nics);
         double totalNicUsageRx = getTotalNicUsageRxKb(nics);
         double totalCpuLimit = getTotalCpuLimit();
-        CpuStat cpuStat = getTotalCpuUsage();
 
         SystemResourceUsage usage = new SystemResourceUsage();
         long now = System.currentTimeMillis();
+        double elapsedSeconds = (now - lastCollection) / 1000d;
+        double cpuUsage = getTotalCpuUsage(elapsedSeconds);
 
         if (lastCollection == 0L) {
             usage.setMemory(getMemUsage());
             usage.setBandwidthIn(new ResourceUsage(0d, totalNicLimit));
             usage.setBandwidthOut(new ResourceUsage(0d, totalNicLimit));
-            usage.setCpu(new ResourceUsage(0d, totalCpuLimit));
         } else {
-            double elapsedSeconds = (now - lastCollection) / 1000d;
             double nicUsageTx = (totalNicUsageTx - lastTotalNicUsageTx) / 
elapsedSeconds;
             double nicUsageRx = (totalNicUsageRx - lastTotalNicUsageRx) / 
elapsedSeconds;
 
-            if (cpuStat != null && lastCpuStat != null) {
-                // we need two non null stats to get a usage report
-                long cpuTimeDiff = cpuStat.getTotalTime() - 
lastCpuStat.getTotalTime();
-                long cpuUsageDiff = cpuStat.getUsage() - 
lastCpuStat.getUsage();
-                double cpuUsage = ((double) cpuUsageDiff / (double) 
cpuTimeDiff) * totalCpuLimit;
-                usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit));
-            }
-
             usage.setMemory(getMemUsage());
             usage.setBandwidthIn(new ResourceUsage(nicUsageRx, totalNicLimit));
             usage.setBandwidthOut(new ResourceUsage(nicUsageTx, 
totalNicLimit));
@@ -117,15 +125,37 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
 
         lastTotalNicUsageTx = totalNicUsageTx;
         lastTotalNicUsageRx = totalNicUsageRx;
-        lastCpuStat = cpuStat;
         lastCollection = System.currentTimeMillis();
         this.usage = usage;
+        usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit));
     }
 
     private double getTotalCpuLimit() {
+        if (isCGroupsEnabled) {
+            try {
+                long quota = readLongFromFile(CGROUPS_CPU_LIMIT_QUOTA_PATH);
+                long period = readLongFromFile(CGROUPS_CPU_LIMIT_PERIOD_PATH);
+                if (quota > 0) {
+                    return 100.0 * quota / period;
+                }
+            } catch (IOException e) {
+                log.warn("Failed to read CPU quotas from cgroups", e);
+                // Fallback to availableProcessors
+            }
+        }
+
+        // Fallback to JVM reported CPU quota
         return 100 * Runtime.getRuntime().availableProcessors();
     }
 
+    private double getTotalCpuUsage(double elapsedTimeSeconds) {
+        if (isCGroupsEnabled) {
+            return getTotalCpuUsageForCGroup(elapsedTimeSeconds);
+        } else {
+            return getTotalCpuUsageForEntireHost();
+        }
+    }
+
     /**
      * Reads first line of /proc/stat to get total cpu usage.
      *
@@ -137,18 +167,36 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
      * Line is split in "words", filtering the first. The sum of all numbers 
give the amount of cpu cycles used this
      * far. Real CPU usage should equal the sum substracting the idle cycles, 
this would include iowait, irq and steal.
      */
-    private CpuStat getTotalCpuUsage() {
+    private double getTotalCpuUsageForEntireHost() {
         try (Stream<String> stream = Files.lines(Paths.get("/proc/stat"))) {
             String[] words = stream.findFirst().get().split("\\s+");
 
             long total = Arrays.stream(words).filter(s -> 
!s.contains("cpu")).mapToLong(Long::parseLong).sum();
-
             long idle = Long.parseLong(words[4]);
+            long usage = total - idle;
+
+            double currentUsage = (usage - lastCpuUsage)  / (total - 
lastCpuTotalTime) * getTotalCpuLimit();
+
+            lastCpuUsage = usage;
+            lastCpuTotalTime = total;
 
-            return new CpuStat(total, total - idle);
+            return currentUsage;
         } catch (IOException e) {
-            LOG.error("Failed to read CPU usage from /proc/stat", e);
-            return null;
+            log.error("Failed to read CPU usage from /proc/stat", e);
+            return -1;
+        }
+    }
+
+    private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) {
+        try {
+            long usage = readLongFromFile(CGROUPS_CPU_USAGE_PATH);
+            double currentUsage = usage - lastCpuUsage;
+            lastCpuUsage = usage;
+
+            return 100 * currentUsage / elapsedTimeSeconds / 
TimeUnit.SECONDS.toNanos(1);
+        } catch (IOException e) {
+            log.error("Failed to read CPU usage from {}", 
CGROUPS_CPU_USAGE_PATH, e);
+            return -1;
         }
     }
 
@@ -163,7 +211,7 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
             return stream.filter(this::isPhysicalNic).map(path -> 
path.getFileName().toString())
                     .collect(Collectors.toList());
         } catch (IOException e) {
-            LOG.error("Failed to find NICs", e);
+            log.error("Failed to find NICs", e);
             return Collections.emptyList();
         }
     }
@@ -194,7 +242,7 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
                     try {
                         return Double.parseDouble(new 
String(Files.readAllBytes(getNicSpeedPath(s))));
                     } catch (IOException e) {
-                        LOG.error("Failed to read speed for nic " + s, e);
+                        log.error("Failed to read speed for nic " + s, e);
                         return 0d;
                     }
                 }).sum() * 1024);
@@ -213,7 +261,7 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
             try {
                 return Double.parseDouble(new 
String(Files.readAllBytes(getNicRxPath(s))));
             } catch (IOException e) {
-                LOG.error("Failed to read rx_bytes for NIC " + s, e);
+                log.error("Failed to read rx_bytes for NIC " + s, e);
                 return 0d;
             }
         }).sum() * 8 / 1024;
@@ -224,27 +272,13 @@ public class LinuxBrokerHostUsageImpl implements BrokerHostUsage {
             try {
                 return Double.parseDouble(new 
String(Files.readAllBytes(getNicTxPath(s))));
             } catch (IOException e) {
-                LOG.error("Failed to read tx_bytes for NIC " + s, e);
+                log.error("Failed to read tx_bytes for NIC " + s, e);
                 return 0d;
             }
         }).sum() * 8 / 1024;
     }
 
-    private class CpuStat {
-        private long totalTime;
-        private long usage;
-
-        CpuStat(long totalTime, long usage) {
-            this.totalTime = totalTime;
-            this.usage = usage;
-        }
-
-        long getTotalTime() {
-            return totalTime;
-        }
-
-        long getUsage() {
-            return usage;
-        }
+    private static long readLongFromFile(String path) throws IOException {
+        return Long.parseLong(new String(Files.readAllBytes(Paths.get(path)), 
Charsets.UTF_8).trim());
     }
 }

Reply via email to