This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0a4c980d146 [improve][broker] Introduce `BitRateUnit` for calculate
bitrate (#15435)
0a4c980d146 is described below
commit 0a4c980d1469e8a23d008b726fce5aa5f011c47b
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu May 5 11:28:47 2022 +0800
[improve][broker] Introduce `BitRateUnit` for calculate bitrate (#15435)
---
.../java/org/apache/pulsar/broker/BitRateUnit.java | 175 +++++++++++++++++++++
.../org/apache/pulsar/broker/BitRateUnitTest.java | 110 +++++++++++++
.../pulsar/broker/loadbalance/LinuxInfoUtils.java | 41 ++---
.../loadbalance/impl/LinuxBrokerHostUsageImpl.java | 11 +-
.../broker/loadbalance/LinuxInfoUtilsTest.java | 31 ----
5 files changed, 300 insertions(+), 68 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java
new file mode 100644
index 00000000000..7e0d0dcfda1
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+/**
+ * Units in which a bit rate can be expressed, together with the conversions between them.
+ *
+ * <p>Prefixes are decimal (one kilobit is {@code 1000} bits) and one byte is {@code 8} bits.
+ * Every {@code toXxx} method interprets its argument in the unit it is invoked on, whereas
+ * {@link #convert(double, BitRateUnit)} interprets its argument in the unit passed alongside it.
+ */
+public enum BitRateUnit {
+
+    Bit {
+        @Override
+        public double toBit(double bitRate) {
+            return bitRate;
+        }
+
+        @Override
+        public double toKilobit(double bitRate) {
+            return bitRate / C0;
+        }
+
+        @Override
+        public double toMegabit(double bitRate) {
+            return bitRate / Math.pow(C0, 2);
+        }
+
+        @Override
+        public double toGigabit(double bitRate) {
+            return bitRate / Math.pow(C0, 3);
+        }
+
+        @Override
+        public double toByte(double bitRate) {
+            return bitRate / C1;
+        }
+
+        @Override
+        public double convert(double bitRate, BitRateUnit bitRateUnit) {
+            return bitRateUnit.toBit(bitRate);
+        }
+    },
+    Kilobit {
+        @Override
+        public double toBit(double bitRate) {
+            return bitRate * C0;
+        }
+
+        @Override
+        public double toKilobit(double bitRate) {
+            return bitRate;
+        }
+
+        @Override
+        public double toMegabit(double bitRate) {
+            return bitRate / C0;
+        }
+
+        @Override
+        public double toGigabit(double bitRate) {
+            return bitRate / Math.pow(C0, 2);
+        }
+
+        @Override
+        public double toByte(double bitRate) {
+            return bitRate * C0 / C1;
+        }
+
+        @Override
+        public double convert(double bitRate, BitRateUnit bitRateUnit) {
+            return bitRateUnit.toKilobit(bitRate);
+        }
+    },
+    Megabit {
+        @Override
+        public double toBit(double bitRate) {
+            return bitRate * Math.pow(C0, 2);
+        }
+
+        @Override
+        public double toKilobit(double bitRate) {
+            return bitRate * C0;
+        }
+
+        @Override
+        public double toMegabit(double bitRate) {
+            return bitRate;
+        }
+
+        @Override
+        public double toGigabit(double bitRate) {
+            return bitRate / C0;
+        }
+
+        @Override
+        public double toByte(double bitRate) {
+            return bitRate * Math.pow(C0, 2) / C1;
+        }
+
+        @Override
+        public double convert(double bitRate, BitRateUnit bitRateUnit) {
+            return bitRateUnit.toMegabit(bitRate);
+        }
+    },
+    Gigabit {
+        @Override
+        public double toBit(double bitRate) {
+            return bitRate * Math.pow(C0, 3);
+        }
+
+        @Override
+        public double toKilobit(double bitRate) {
+            return bitRate * Math.pow(C0, 2);
+        }
+
+        @Override
+        public double toMegabit(double bitRate) {
+            return bitRate * C0;
+        }
+
+        @Override
+        public double toGigabit(double bitRate) {
+            return bitRate;
+        }
+
+        @Override
+        public double toByte(double bitRate) {
+            return bitRate * Math.pow(C0, 3) / C1;
+        }
+
+        @Override
+        public double convert(double bitRate, BitRateUnit bitRateUnit) {
+            return bitRateUnit.toGigabit(bitRate);
+        }
+    },
+    Byte {
+        @Override
+        public double toBit(double bitRate) {
+            return bitRate * C1;
+        }
+
+        @Override
+        public double toKilobit(double bitRate) {
+            return bitRate * C1 / C0;
+        }
+
+        @Override
+        public double toMegabit(double bitRate) {
+            return bitRate * C1 / Math.pow(C0, 2);
+        }
+
+        @Override
+        public double toGigabit(double bitRate) {
+            return bitRate * C1 / Math.pow(C0, 3);
+        }
+
+        @Override
+        public double toByte(double bitRate) {
+            return bitRate;
+        }
+
+        @Override
+        public double convert(double bitRate, BitRateUnit bitRateUnit) {
+            return bitRateUnit.toByte(bitRate);
+        }
+    };
+
+    /** Factor between two adjacent decimal prefixes (bit, kilobit, megabit, gigabit). */
+    static final int C0 = 1000;
+    /** Number of bits in one byte. */
+    static final int C1 = 8;
+
+    // The conversions are abstract so that a constant missing one of them is rejected by the
+    // compiler instead of failing with an AbstractMethodError when it is first called.
+
+    /**
+     * Converts a rate expressed in this unit to bits.
+     *
+     * @param bitRate the rate, expressed in this unit
+     * @return the same rate expressed in bits
+     */
+    public abstract double toBit(double bitRate);
+
+    /**
+     * Converts a rate expressed in this unit to kilobits.
+     *
+     * @param bitRate the rate, expressed in this unit
+     * @return the same rate expressed in kilobits
+     */
+    public abstract double toKilobit(double bitRate);
+
+    /**
+     * Converts a rate expressed in this unit to megabits.
+     *
+     * @param bitRate the rate, expressed in this unit
+     * @return the same rate expressed in megabits
+     */
+    public abstract double toMegabit(double bitRate);
+
+    /**
+     * Converts a rate expressed in this unit to gigabits.
+     *
+     * @param bitRate the rate, expressed in this unit
+     * @return the same rate expressed in gigabits
+     */
+    public abstract double toGigabit(double bitRate);
+
+    /**
+     * Converts a rate expressed in this unit to bytes.
+     *
+     * @param bitRate the rate, expressed in this unit
+     * @return the same rate expressed in bytes
+     */
+    public abstract double toByte(double bitRate);
+
+    /**
+     * Converts a rate expressed in {@code bitRateUnit} to this unit.
+     *
+     * <p>For example {@code Kilobit.convert(1, Megabit)} yields {@code 1000}.
+     *
+     * @param bitRate the rate, expressed in {@code bitRateUnit}
+     * @param bitRateUnit the unit in which {@code bitRate} is expressed
+     * @return the same rate expressed in this unit
+     */
+    public abstract double convert(double bitRate, BitRateUnit bitRateUnit);
+}
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java
new file mode 100644
index 00000000000..14adcc70273
--- /dev/null
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.testng.Assert.assertEquals;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for the conversions offered by {@link BitRateUnit}.
+ */
+public class BitRateUnitTest {
+
+    /** A rate given in bits is converted to every supported unit. */
+    @Test
+    public void testBps() {
+        final double rate = 1231434.12;
+        assertEquals(BitRateUnit.Bit.toBit(rate), rate);
+        assertEquals(BitRateUnit.Bit.toByte(rate), rate / 8);
+        assertEquals(BitRateUnit.Bit.toKilobit(rate), rate / 1000);
+        assertEquals(BitRateUnit.Bit.toMegabit(rate), rate / 1000 / 1000);
+        assertEquals(BitRateUnit.Bit.toGigabit(rate), rate / 1000 / 1000 / 1000);
+    }
+
+    /** A rate given in kilobits is converted to every supported unit. */
+    @Test
+    public void testKbps() {
+        final double rate = 1231434.12;
+        assertEquals(BitRateUnit.Kilobit.toBit(rate), rate * 1000);
+        assertEquals(BitRateUnit.Kilobit.toByte(rate), rate * 1000 / 8);
+        assertEquals(BitRateUnit.Kilobit.toKilobit(rate), rate);
+        assertEquals(BitRateUnit.Kilobit.toMegabit(rate), rate / 1000);
+        assertEquals(BitRateUnit.Kilobit.toGigabit(rate), rate / 1000 / 1000);
+    }
+
+    /** A rate given in megabits is converted to every supported unit. */
+    @Test
+    public void testMbps() {
+        final double rate = 1231434.12;
+        assertEquals(BitRateUnit.Megabit.toBit(rate), rate * 1000 * 1000);
+        assertEquals(BitRateUnit.Megabit.toByte(rate), rate * 1000 * 1000 / 8);
+        assertEquals(BitRateUnit.Megabit.toKilobit(rate), rate * 1000);
+        assertEquals(BitRateUnit.Megabit.toMegabit(rate), rate);
+        assertEquals(BitRateUnit.Megabit.toGigabit(rate), rate / 1000);
+    }
+
+    /** A rate given in gigabits is converted to every supported unit. */
+    @Test
+    public void testGbps() {
+        final double rate = 1231434.12;
+        assertEquals(BitRateUnit.Gigabit.toBit(rate), rate * 1000 * 1000 * 1000);
+        assertEquals(BitRateUnit.Gigabit.toByte(rate), rate * 1000 * 1000 * 1000 / 8);
+        assertEquals(BitRateUnit.Gigabit.toKilobit(rate), rate * 1000 * 1000);
+        assertEquals(BitRateUnit.Gigabit.toMegabit(rate), rate * 1000);
+        assertEquals(BitRateUnit.Gigabit.toGigabit(rate), rate);
+    }
+
+    /** A rate given in bytes is converted to every supported unit. */
+    @Test
+    public void testByte() {
+        final double rate = 1231434.12;
+        assertEquals(BitRateUnit.Byte.toBit(rate), rate * 8);
+        assertEquals(BitRateUnit.Byte.toByte(rate), rate);
+        assertEquals(BitRateUnit.Byte.toKilobit(rate), rate / 1000 * 8);
+        assertEquals(BitRateUnit.Byte.toMegabit(rate), rate / 1000 / 1000 * 8);
+        assertEquals(BitRateUnit.Byte.toGigabit(rate), rate / 1000 / 1000 / 1000 * 8);
+    }
+
+    /**
+     * For every pair of units, {@code target.convert(value, source)} must return exactly what the
+     * matching {@code source.toXxx(value)} returns.
+     */
+    @Test
+    public void testConvert() {
+        final double value = 12334125.1234;
+        for (BitRateUnit source : BitRateUnit.values()) {
+            assertEquals(BitRateUnit.Bit.convert(value, source), source.toBit(value));
+            assertEquals(BitRateUnit.Kilobit.convert(value, source), source.toKilobit(value));
+            assertEquals(BitRateUnit.Megabit.convert(value, source), source.toMegabit(value));
+            assertEquals(BitRateUnit.Gigabit.convert(value, source), source.toGigabit(value));
+            assertEquals(BitRateUnit.Byte.convert(value, source), source.toByte(value));
+        }
+    }
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
index 62d80776d85..c23b3e8d2a6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
@@ -35,6 +35,7 @@ import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.SystemUtils;
+import org.apache.pulsar.broker.BitRateUnit;
@Slf4j
public class LinuxInfoUtils {
@@ -165,36 +166,36 @@ public class LinuxInfoUtils {
/**
* Get all physical nic limit.
* @param nics All nic path
- * @param nicUnit Nic speed unit
+ * @param bitRateUnit Unit in which the total limit is returned
* @return Total nic limit
*/
- public static double getTotalNicLimit(List<String> nics, NICUnit nicUnit) {
- return nicUnit.convertBy(nics.stream().mapToDouble(nicPath -> {
+ public static double getTotalNicLimit(List<String> nics, BitRateUnit bitRateUnit) {
+ // The summed per-NIC speed is in Mbit/s: the replaced NICUnit.Kbps multiplied it by 1000 to obtain Kbit/s.
+ return bitRateUnit.convert(nics.stream().mapToDouble(nicPath -> {
try {
return readDoubleFromFile(getReplacedNICPath(NIC_SPEED_TEMPLATE, nicPath));
} catch (IOException e) {
log.error("[LinuxInfo] Failed to get total nic limit.", e);
return 0d;
}
- }).sum());
+ }).sum(), BitRateUnit.Megabit);
}
/**
* Get all physical nic usage.
* @param nics All nic path
* @param type Nic's usage type: transport, receive
- * @param unit Nic usage unit
+ * @param bitRateUnit Unit in which the summed usage, read as bytes, is returned
* @return Total nic usage
*/
- public static double getTotalNicUsage(List<String> nics, NICUsageType
type, UsageUnit unit) {
- return unit.convertBy(nics.stream().mapToDouble(nic -> {
+ public static double getTotalNicUsage(List<String> nics, NICUsageType
type, BitRateUnit bitRateUnit) {
+ return bitRateUnit.convert(nics.stream().mapToDouble(nic -> {
try {
return readDoubleFromFile(getReplacedNICPath(type.template,
nic));
} catch (IOException e) {
log.error("[LinuxInfo] Failed to read {} bytes for NIC {} ",
type, nic, e);
return 0d;
}
- }).sum());
+ }).sum(), BitRateUnit.Byte);
}
/**
@@ -221,7 +222,7 @@ public class LinuxInfoUtils {
if (CollectionUtils.isEmpty(physicalNICs)) {
return false;
}
- double totalNicLimit = getTotalNicLimit(physicalNICs, NICUnit.Kbps);
+ double totalNicLimit = getTotalNicLimit(physicalNICs,
BitRateUnit.Kilobit);
return totalNicLimit > 0;
}
@@ -241,28 +242,6 @@ public class LinuxInfoUtils {
return Double.parseDouble(readTrimStringFromFile(path));
}
- @AllArgsConstructor
- public enum NICUnit {
- Kbps(1000);
-
- private final int convertUnit;
-
- public double convertBy(double usageBytes) {
- return this.convertUnit * usageBytes;
- }
- }
-
- @AllArgsConstructor
- public enum UsageUnit {
- Kbps(8d / 1000);
-
- private final double convertUnit;
-
- public double convertBy(double usageBytes) {
- return this.convertUnit * usageBytes;
- }
- }
-
@AllArgsConstructor
public enum NICUsageType {
// transport
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
index 67a62b52896..42ef33fbce9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
@@ -18,9 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.NICUnit;
import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.NICUsageType;
-import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.UsageUnit;
import static
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getCpuUsageForCGroup;
import static
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getCpuUsageForEntireHost;
import static
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getPhysicalNICs;
@@ -36,6 +34,7 @@ import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BitRateUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
@@ -90,8 +89,8 @@ public class LinuxBrokerHostUsageImpl implements
BrokerHostUsage {
public void calculateBrokerHostUsage() {
List<String> nics = getPhysicalNICs();
double totalNicLimit = getTotalNicLimitWithConfiguration(nics);
- double totalNicUsageTx = getTotalNicUsage(nics, NICUsageType.TX,
UsageUnit.Kbps);
- double totalNicUsageRx = getTotalNicUsage(nics, NICUsageType.RX,
UsageUnit.Kbps);
+ double totalNicUsageTx = getTotalNicUsage(nics, NICUsageType.TX,
BitRateUnit.Kilobit);
+ double totalNicUsageRx = getTotalNicUsage(nics, NICUsageType.RX,
BitRateUnit.Kilobit);
double totalCpuLimit = getTotalCpuLimit(isCGroupsEnabled);
long now = System.currentTimeMillis();
double elapsedSeconds = (now - lastCollection) / 1000d;
@@ -125,8 +124,8 @@ public class LinuxBrokerHostUsageImpl implements
BrokerHostUsage {
private double getTotalNicLimitWithConfiguration(List<String> nics) {
// Use the override value as configured. Return the total max speed
across all available NICs, converted
// from Gbps into Kbps
- return overrideBrokerNicSpeedGbps.map(aDouble -> aDouble * nics.size()
* 1000 * 1000)
- .orElseGet(() -> getTotalNicLimit(nics, NICUnit.Kbps));
+ // NOTE(review): unlike the removed line, the override is no longer multiplied by nics.size() - confirm this is intended.
+ return overrideBrokerNicSpeedGbps.map(BitRateUnit.Gigabit::toKilobit)
+ .orElseGet(() -> getTotalNicLimit(nics, BitRateUnit.Kilobit));
}
private double getTotalCpuUsage(double elapsedTimeSeconds) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtilsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtilsTest.java
deleted file mode 100644
index f1e14009bd2..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtilsTest.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.broker.loadbalance;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-public class LinuxInfoUtilsTest {
- @Test
- public void testUsageUnit() {
- Assert.assertTrue(LinuxInfoUtils.UsageUnit.Kbps.convertBy(100) > 0);
- }
-}
\ No newline at end of file