This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a4c980d146 [improve][broker] Introduce `BitRateUnit` for calculate bitrate (#15435)
0a4c980d146 is described below

commit 0a4c980d1469e8a23d008b726fce5aa5f011c47b
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu May 5 11:28:47 2022 +0800

    [improve][broker] Introduce `BitRateUnit` for calculate bitrate (#15435)
---
 .../java/org/apache/pulsar/broker/BitRateUnit.java | 175 +++++++++++++++++++++
 .../org/apache/pulsar/broker/BitRateUnitTest.java  | 110 +++++++++++++
 .../pulsar/broker/loadbalance/LinuxInfoUtils.java  |  41 ++---
 .../loadbalance/impl/LinuxBrokerHostUsageImpl.java |  11 +-
 .../broker/loadbalance/LinuxInfoUtilsTest.java     |  31 ----
 5 files changed, 300 insertions(+), 68 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java
new file mode 100644
index 00000000000..7e0d0dcfda1
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+public enum BitRateUnit { // Bit-rate units; each constant converts a value expressed in its own unit.
+
+    Bit {
+        public double toBit(double bitRate) {
+            return bitRate;
+        }
+
+        public double toKilobit(double bitRate) {
+            return bitRate / C0; // C0 (1000) bits per kilobit
+        }
+
+        public double toMegabit(double bitRate) {
+            return bitRate / Math.pow(C0, 2); // 1000^2 bits per megabit
+        }
+
+        public double toGigabit(double bitRate) {
+            return bitRate / Math.pow(C0, 3); // 1000^3 bits per gigabit
+        }
+
+        public double toByte(double bitRate) {
+            return bitRate / C1; // C1 (8) bits per byte
+        }
+
+        public double convert(double bitRate, BitRateUnit bitRateUnit) {
+            return bitRateUnit.toBit(bitRate); // bitRate is expressed in bitRateUnit; result is in bits
+        }
+    },
+    Kilobit {
+        public double toBit(double bitRate) {
+            return bitRate * C0;
+        }
+
+        public double toKilobit(double bitRate) {
+            return bitRate;
+        }
+
+        public double toMegabit(double bitRate) {
+            return bitRate / C0;
+        }
+
+        public double toGigabit(double bitRate) {
+            return bitRate / Math.pow(C0, 2);
+        }
+
+        public double toByte(double bitRate) {
+            return bitRate * C0 / C1; // kilobits -> bits -> bytes
+        }
+
+        public double convert(double bitRate, BitRateUnit bitRateUnit) {
+            return bitRateUnit.toKilobit(bitRate); // bitRate is expressed in bitRateUnit; result is in kilobits
+        }
+    },
+    Megabit {
+        public double toBit(double bitRate) {
+            return bitRate * Math.pow(C0, 2);
+        }
+
+        public double toKilobit(double bitRate) {
+            return bitRate * C0;
+        }
+
+        public double toMegabit(double bitRate) {
+            return bitRate;
+        }
+
+        public double toGigabit(double bitRate) {
+            return bitRate / C0;
+        }
+
+        public double toByte(double bitRate) {
+            return bitRate * Math.pow(C0, 2) / C1; // megabits -> bits -> bytes
+        }
+
+        public double convert(double bitRate, BitRateUnit bitRateUnit) {
+            return bitRateUnit.toMegabit(bitRate); // bitRate is expressed in bitRateUnit; result is in megabits
+        }
+    },
+    Gigabit {
+        public double toBit(double bitRate) {
+            return bitRate * Math.pow(C0, 3);
+        }
+
+        public double toKilobit(double bitRate) {
+            return bitRate * Math.pow(C0, 2);
+        }
+
+        public double toMegabit(double bitRate) {
+            return bitRate * C0;
+        }
+
+        public double toGigabit(double bitRate) {
+            return bitRate;
+        }
+
+        public double toByte(double bitRate) {
+            return bitRate * Math.pow(C0, 3) / C1; // gigabits -> bits -> bytes
+        }
+
+        public double convert(double bitRate, BitRateUnit bitRateUnit) {
+            return bitRateUnit.toGigabit(bitRate); // bitRate is expressed in bitRateUnit; result is in gigabits
+        }
+    },
+    Byte {
+        public double toBit(double bitRate) {
+            return bitRate * C1;
+        }
+
+        public double toKilobit(double bitRate) {
+            return bitRate * C1 / C0; // bytes -> bits -> kilobits
+        }
+
+        public double toMegabit(double bitRate) {
+            return bitRate * C1 / Math.pow(C0, 2);
+        }
+
+        public double toGigabit(double bitRate) {
+            return bitRate * C1 / Math.pow(C0, 3);
+        }
+
+        public double toByte(double bitRate) {
+            return bitRate;
+        }
+
+        public double convert(double bitRate, BitRateUnit bitRateUnit) {
+            return bitRateUnit.toByte(bitRate); // bitRate is expressed in bitRateUnit; result is in bytes
+        }
+    };
+
+    static final int C0 = 1000; // decimal step between bit, kilobit, megabit and gigabit
+    static final int C1 = 8; // number of bits in one byte
+
+    public double toBit(double bitRate) { // returns bitRate (given in this unit) expressed in bits
+        throw new AbstractMethodError(); // every constant above overrides this method
+    }
+
+    public double toKilobit(double bitRate) { // returns bitRate (given in this unit) expressed in kilobits
+        throw new AbstractMethodError(); // every constant above overrides this method
+    }
+
+    public double toMegabit(double bitRate) { // returns bitRate (given in this unit) expressed in megabits
+        throw new AbstractMethodError(); // every constant above overrides this method
+    }
+
+    public double toGigabit(double bitRate) { // returns bitRate (given in this unit) expressed in gigabits
+        throw new AbstractMethodError(); // every constant above overrides this method
+    }
+
+    public double toByte(double bitRate) { // returns bitRate (given in this unit) expressed in bytes
+        throw new AbstractMethodError(); // every constant above overrides this method
+    }
+
+    public double convert(double bitRate, BitRateUnit bitRateUnit) { // converts bitRate FROM bitRateUnit INTO this unit
+        throw new AbstractMethodError(); // every constant above overrides this method
+    }
+}
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java
new file mode 100644
index 00000000000..14adcc70273
--- /dev/null
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.testng.Assert.assertEquals;
+import org.testng.annotations.Test;
+
+public class BitRateUnitTest { // NOTE(review): two-arg assertEquals on doubles looks exact (no delta) - confirm
+
+    @Test
+    public void testBps() { // Bit -> every unit, checked against plain arithmetic
+        double bps = 1231434.12;
+        assertEquals(BitRateUnit.Bit.toBit(bps), bps);
+        assertEquals(BitRateUnit.Bit.toByte(bps), bps / 8);
+        assertEquals(BitRateUnit.Bit.toKilobit(bps), bps / 1000);
+        assertEquals(BitRateUnit.Bit.toMegabit(bps), bps / 1000 / 1000);
+        assertEquals(BitRateUnit.Bit.toGigabit(bps), bps / 1000 / 1000 / 1000);
+    }
+
+    @Test
+    public void testKbps() { // Kilobit -> every unit, checked against plain arithmetic
+        double kbps = 1231434.12;
+        assertEquals(BitRateUnit.Kilobit.toBit(kbps), kbps * 1000);
+        assertEquals(BitRateUnit.Kilobit.toByte(kbps), kbps * 1000 / 8);
+        assertEquals(BitRateUnit.Kilobit.toKilobit(kbps), kbps);
+        assertEquals(BitRateUnit.Kilobit.toMegabit(kbps), kbps / 1000);
+        assertEquals(BitRateUnit.Kilobit.toGigabit(kbps), kbps / 1000 / 1000);
+    }
+
+    @Test
+    public void testMbps() { // Megabit -> every unit, checked against plain arithmetic
+        double mbps = 1231434.12;
+        assertEquals(BitRateUnit.Megabit.toBit(mbps), mbps * 1000 * 1000);
+        assertEquals(BitRateUnit.Megabit.toByte(mbps), mbps * 1000 * 1000 / 8);
+        assertEquals(BitRateUnit.Megabit.toKilobit(mbps), mbps * 1000);
+        assertEquals(BitRateUnit.Megabit.toMegabit(mbps), mbps);
+        assertEquals(BitRateUnit.Megabit.toGigabit(mbps), mbps / 1000);
+    }
+
+    @Test
+    public void testGbps() { // Gigabit -> every unit, checked against plain arithmetic
+        double gbps = 1231434.12;
+        assertEquals(BitRateUnit.Gigabit.toBit(gbps),gbps * 1000 * 1000 * 1000 
);
+        assertEquals(BitRateUnit.Gigabit.toByte(gbps), gbps * 1000 * 1000 * 
1000 / 8);
+        assertEquals(BitRateUnit.Gigabit.toKilobit(gbps), gbps * 1000 * 1000);
+        assertEquals(BitRateUnit.Gigabit.toMegabit(gbps), gbps * 1000);
+        assertEquals(BitRateUnit.Gigabit.toGigabit(gbps), gbps);
+    }
+
+    @Test
+    public void testByte() { // Byte -> every unit, checked against plain arithmetic
+        double bytes = 1231434.12;
+        assertEquals(BitRateUnit.Byte.toBit(bytes), bytes * 8);
+        assertEquals(BitRateUnit.Byte.toByte(bytes), bytes);
+        assertEquals(BitRateUnit.Byte.toKilobit(bytes), bytes / 1000 * 8);
+        assertEquals(BitRateUnit.Byte.toMegabit(bytes), bytes / 1000 / 1000 * 
8);
+        assertEquals(BitRateUnit.Byte.toGigabit(bytes), bytes / 1000 / 1000 / 
1000 * 8);
+    }
+
+
+    @Test
+    public void testConvert() { // for every pair of units: X.convert(v, Y) must equal Y.toX(v)
+        double unit = 12334125.1234;
+        assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Bit), 
BitRateUnit.Bit.toBit(unit));
+        assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Kilobit), 
BitRateUnit.Kilobit.toBit(unit));
+        assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Megabit), 
BitRateUnit.Megabit.toBit(unit));
+        assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Gigabit), 
BitRateUnit.Gigabit.toBit(unit));
+        assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Byte), 
BitRateUnit.Byte.toBit(unit));
+
+        assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Bit),  
BitRateUnit.Bit.toKilobit(unit));
+        assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Kilobit), 
BitRateUnit.Kilobit.toKilobit(unit));
+        assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Megabit), 
BitRateUnit.Megabit.toKilobit(unit));
+        assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Gigabit), 
BitRateUnit.Gigabit.toKilobit(unit));
+        assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Byte), 
BitRateUnit.Byte.toKilobit(unit));
+
+        assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Bit), 
BitRateUnit.Bit.toMegabit(unit));
+        assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Kilobit), 
BitRateUnit.Kilobit.toMegabit(unit));
+        assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Megabit), 
BitRateUnit.Megabit.toMegabit(unit));
+        assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Gigabit), 
BitRateUnit.Gigabit.toMegabit(unit));
+        assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Byte), 
BitRateUnit.Byte.toMegabit(unit));
+
+        assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Bit), 
BitRateUnit.Bit.toGigabit(unit));
+        assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Kilobit), 
BitRateUnit.Kilobit.toGigabit(unit));
+        assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Megabit), 
BitRateUnit.Megabit.toGigabit(unit));
+        assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Gigabit), 
BitRateUnit.Gigabit.toGigabit(unit));
+        assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Byte), 
BitRateUnit.Byte.toGigabit(unit));
+
+        assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Bit), 
BitRateUnit.Bit.toByte(unit));
+        assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Kilobit), 
BitRateUnit.Kilobit.toByte(unit));
+        assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Megabit), 
BitRateUnit.Megabit.toByte(unit));
+        assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Gigabit), 
BitRateUnit.Gigabit.toByte(unit));
+        assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Byte), 
BitRateUnit.Byte.toByte(unit));
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
index 62d80776d85..c23b3e8d2a6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
@@ -35,6 +35,7 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.SystemUtils;
+import org.apache.pulsar.broker.BitRateUnit;
 
 @Slf4j
 public class LinuxInfoUtils {
@@ -165,36 +166,36 @@ public class LinuxInfoUtils {
     /**
      * Get all physical nic limit.
      * @param nics All nic path
-     * @param nicUnit Nic speed unit
+     * @param bitRateUnit Unit the summed NIC speed is returned in
      * @return Total nic limit
      */
-    public static double getTotalNicLimit(List<String> nics, NICUnit nicUnit) {
-        return nicUnit.convertBy(nics.stream().mapToDouble(nicPath -> {
+    public static double getTotalNicLimit(List<String> nics, BitRateUnit bitRateUnit) {
+        return bitRateUnit.convert(nics.stream().mapToDouble(nicPath -> {
             try {
                 return readDoubleFromFile(getReplacedNICPath(NIC_SPEED_TEMPLATE, nicPath));
             } catch (IOException e) {
                 log.error("[LinuxInfo] Failed to get total nic limit.", e);
                 return 0d;
             }
-        }).sum());
+        }).sum(), BitRateUnit.Megabit); // raw speed is Mbit/s (old NICUnit.Kbps multiplied it by 1000)
     }
 
     /**
      * Get all physical nic usage.
      * @param nics All nic path
      * @param type Nic's usage type:  transport, receive
-     * @param unit Nic usage unit
+     * @param bitRateUnit Unit the summed usage is converted into
      * @return Total nic usage
      */
-    public static double getTotalNicUsage(List<String> nics, NICUsageType 
type, UsageUnit unit) {
-        return unit.convertBy(nics.stream().mapToDouble(nic -> {
+    public static double getTotalNicUsage(List<String> nics, NICUsageType 
type, BitRateUnit bitRateUnit) {
+        return bitRateUnit.convert(nics.stream().mapToDouble(nic -> {
             try {
                 return readDoubleFromFile(getReplacedNICPath(type.template, 
nic));
             } catch (IOException e) {
                 log.error("[LinuxInfo] Failed to read {} bytes for NIC {} ", 
type, nic, e);
                 return 0d;
             }
-        }).sum());
+        }).sum(), BitRateUnit.Byte); // the summed counters are treated as bytes (old factor was 8 / 1000 for Kbps)
     }
 
     /**
@@ -221,7 +222,7 @@ public class LinuxInfoUtils {
         if (CollectionUtils.isEmpty(physicalNICs)) {
             return false;
         }
-        double totalNicLimit = getTotalNicLimit(physicalNICs, NICUnit.Kbps);
+        double totalNicLimit = getTotalNicLimit(physicalNICs, 
BitRateUnit.Kilobit);
         return totalNicLimit > 0;
     }
 
@@ -241,28 +242,6 @@ public class LinuxInfoUtils {
         return Double.parseDouble(readTrimStringFromFile(path));
     }
 
-    @AllArgsConstructor
-    public enum NICUnit {
-        Kbps(1000);
-
-        private final int convertUnit;
-
-        public double convertBy(double usageBytes) {
-            return this.convertUnit * usageBytes;
-        }
-    }
-
-    @AllArgsConstructor
-    public enum UsageUnit {
-        Kbps(8d / 1000);
-
-        private final double convertUnit;
-
-        public double convertBy(double usageBytes) {
-            return this.convertUnit * usageBytes;
-        }
-    }
-
     @AllArgsConstructor
     public enum NICUsageType {
         // transport
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
index 67a62b52896..42ef33fbce9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.NICUnit;
 import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.NICUsageType;
-import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.UsageUnit;
 import static 
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getCpuUsageForCGroup;
 import static 
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getCpuUsageForEntireHost;
 import static 
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getPhysicalNICs;
@@ -36,6 +34,7 @@ import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BitRateUnit;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
 import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
@@ -90,8 +89,8 @@ public class LinuxBrokerHostUsageImpl implements 
BrokerHostUsage {
     public void calculateBrokerHostUsage() {
         List<String> nics = getPhysicalNICs();
         double totalNicLimit = getTotalNicLimitWithConfiguration(nics);
-        double totalNicUsageTx = getTotalNicUsage(nics, NICUsageType.TX, 
UsageUnit.Kbps);
-        double totalNicUsageRx = getTotalNicUsage(nics, NICUsageType.RX, 
UsageUnit.Kbps);
+        double totalNicUsageTx = getTotalNicUsage(nics, NICUsageType.TX, 
BitRateUnit.Kilobit);
+        double totalNicUsageRx = getTotalNicUsage(nics, NICUsageType.RX, 
BitRateUnit.Kilobit);
         double totalCpuLimit = getTotalCpuLimit(isCGroupsEnabled);
         long now = System.currentTimeMillis();
         double elapsedSeconds = (now - lastCollection) / 1000d;
@@ -125,8 +124,8 @@ public class LinuxBrokerHostUsageImpl implements 
BrokerHostUsage {
     private double getTotalNicLimitWithConfiguration(List<String> nics) {
         // Use the override value as configured. Return the total max speed 
across all available NICs, converted
         // from Gbps into Kbps
-        return overrideBrokerNicSpeedGbps.map(aDouble -> aDouble * nics.size() 
* 1000 * 1000)
-                .orElseGet(() -> getTotalNicLimit(nics, NICUnit.Kbps));
+        return overrideBrokerNicSpeedGbps.map(BitRateUnit.Gigabit::toKilobit) // NOTE(review): nics.size() factor was dropped - confirm intended
+                .orElseGet(() -> getTotalNicLimit(nics, BitRateUnit.Kilobit)); // no override: use the detected NIC speeds
     }
 
     private double getTotalCpuUsage(double elapsedTimeSeconds) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtilsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtilsTest.java
deleted file mode 100644
index f1e14009bd2..00000000000
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtilsTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.broker.loadbalance;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-public class LinuxInfoUtilsTest {
-    @Test
-    public void testUsageUnit() {
-        Assert.assertTrue(LinuxInfoUtils.UsageUnit.Kbps.convertBy(100) > 0);
-    }
-}
\ No newline at end of file

Reply via email to