This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new a47c6acb7a2 Revert "[improve][broker][branch-2.10] Backport Linux metrics changes from master branch (#20659)"
a47c6acb7a2 is described below
commit a47c6acb7a250c677fabd03c12f5db298c9fb5e8
Author: fengyubiao <[email protected]>
AuthorDate: Wed Nov 29 18:12:35 2023 +0800
Revert "[improve][broker][branch-2.10] Backport Linux metrics changes from master branch (#20659)"
This reverts commit b355d3176f7b9fa029e97352c9cb0f4dd0e33649.
---
bin/pulsar | 2 -
pom.xml | 1 -
.../java/org/apache/pulsar/broker/BitRateUnit.java | 175 ----------
.../org/apache/pulsar/broker/BitRateUnitTest.java | 110 ------
.../org/apache/pulsar/broker/PulsarService.java | 9 -
.../pulsar/broker/loadbalance/LinuxInfoUtils.java | 369 ---------------------
.../loadbalance/impl/LinuxBrokerHostUsageImpl.java | 199 ++++++++---
.../pulsar/broker/EmbeddedPulsarCluster.java | 1 -
.../apache/pulsar/broker/PulsarServiceTest.java | 1 -
.../apache/pulsar/broker/SLAMonitoringTest.java | 1 -
.../broker/admin/BrokerAdminClientTlsAuthTest.java | 1 -
.../broker/auth/MockedPulsarServiceBaseTest.java | 2 -
.../AntiAffinityNamespaceGroupTest.java | 2 -
.../loadbalance/LeaderElectionServiceTest.java | 1 -
.../broker/loadbalance/LoadBalancerTest.java | 1 -
.../loadbalance/LoadReportNetworkLimitTest.java | 23 +-
.../broker/loadbalance/SimpleBrokerStartTest.java | 125 -------
.../loadbalance/SimpleLoadManagerImplTest.java | 2 -
.../loadbalance/impl/BundleSplitterTaskTest.java | 1 -
.../impl/LinuxBrokerHostUsageImplTest.java | 76 -----
.../impl/ModularLoadManagerImplTest.java | 3 -
.../OwnerShipForCurrentServerTestBase.java | 1 -
.../broker/service/BacklogQuotaManagerTest.java | 1 -
.../pulsar/broker/service/BkEnsemblesTestBase.java | 1 -
.../broker/service/BrokerBookieIsolationTest.java | 5 -
.../pulsar/broker/service/MaxMessageSizeTest.java | 1 -
.../PersistentDispatcherFailoverConsumerTest.java | 2 -
.../service/PersistentTopicConcurrentTest.java | 2 -
.../pulsar/broker/service/PersistentTopicTest.java | 1 -
.../pulsar/broker/service/ReplicatorTestBase.java | 1 -
.../pulsar/broker/service/ServerCnxTest.java | 1 -
.../pulsar/broker/service/TopicOwnerTest.java | 1 -
.../persistent/PersistentSubscriptionTest.java | 1 -
.../broker/transaction/TransactionTestBase.java | 1 -
.../coordinator/TransactionMetaStoreTestBase.java | 1 -
.../apache/pulsar/broker/web/WebServiceTest.java | 1 -
.../pulsar/client/api/BrokerServiceLookupTest.java | 6 -
.../client/api/ClientDeduplicationFailureTest.java | 1 -
.../pulsar/client/api/NonPersistentTopicTest.java | 3 -
.../pulsar/client/api/ServiceUrlProviderTest.java | 1 -
.../worker/PulsarFunctionE2ESecurityTest.java | 1 -
.../worker/PulsarFunctionLocalRunTest.java | 1 -
.../worker/PulsarFunctionPublishTest.java | 1 -
.../functions/worker/PulsarFunctionTlsTest.java | 1 -
.../worker/PulsarWorkerAssignmentTest.java | 1 -
.../apache/pulsar/io/AbstractPulsarE2ETest.java | 1 -
.../apache/pulsar/io/PulsarFunctionAdminTest.java | 1 -
.../apache/pulsar/io/PulsarFunctionTlsTest.java | 2 -
.../integration/topologies/PulsarCluster.java | 1 -
49 files changed, 169 insertions(+), 978 deletions(-)
diff --git a/bin/pulsar b/bin/pulsar
index 7c2a75aad66..8f8d77ed1f7 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -322,8 +322,6 @@ if [[ -z "$IS_JAVA_8" ]]; then
OPTS="$OPTS --add-opens java.management/sun.management=ALL-UNNAMED"
# MBeanStatsGenerator
OPTS="$OPTS --add-opens
jdk.management/com.sun.management.internal=ALL-UNNAMED"
- # LinuxInfoUtils
- OPTS="$OPTS --add-opens java.base/jdk.internal.platform=ALL-UNNAMED"
fi
ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=*
-Dzookeeper.snapshot.trust.empty=true -Dzookeeper.tcpKeepAlive=true"
diff --git a/pom.xml b/pom.xml
index a0eb4668399..4902bb4d276 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1921,7 +1921,6 @@ flexible messaging model and an intuitive client
API.</description>
--add-opens java.base/java.io=ALL-UNNAMED <!--Bookkeeper NativeIO -->
--add-opens java.management/sun.management=ALL-UNNAMED
<!--JvmDefaultGCMetricsLogger & MBeanStatsGenerator-->
--add-opens jdk.management/com.sun.management.internal=ALL-UNNAMED
<!--MBeanStatsGenerator-->
- --add-opens java.base/jdk.internal.platform=ALL-UNNAMED
<!--LinuxInfoUtils-->
</test.additional.args>
</properties>
<build>
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java
deleted file mode 100644
index 7e0d0dcfda1..00000000000
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/BitRateUnit.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker;
-
-public enum BitRateUnit {
-
- Bit {
- public double toBit(double bitRate) {
- return bitRate;
- }
-
- public double toKilobit(double bitRate) {
- return bitRate / C0;
- }
-
- public double toMegabit(double bitRate) {
- return bitRate / Math.pow(C0, 2);
- }
-
- public double toGigabit(double bitRate) {
- return bitRate / Math.pow(C0, 3);
- }
-
- public double toByte(double bitRate) {
- return bitRate / C1;
- }
-
- public double convert(double bitRate, BitRateUnit bitRateUnit) {
- return bitRateUnit.toBit(bitRate);
- }
- },
- Kilobit {
- public double toBit(double bitRate) {
- return bitRate * C0;
- }
-
- public double toKilobit(double bitRate) {
- return bitRate;
- }
-
- public double toMegabit(double bitRate) {
- return bitRate / C0;
- }
-
- public double toGigabit(double bitRate) {
- return bitRate / Math.pow(C0, 2);
- }
-
- public double toByte(double bitRate) {
- return bitRate * C0 / C1;
- }
-
- public double convert(double bitRate, BitRateUnit bitRateUnit) {
- return bitRateUnit.toKilobit(bitRate);
- }
- },
- Megabit {
- public double toBit(double bitRate) {
- return bitRate * Math.pow(C0, 2);
- }
-
- public double toKilobit(double bitRate) {
- return bitRate * C0;
- }
-
- public double toMegabit(double bitRate) {
- return bitRate;
- }
-
- public double toGigabit(double bitRate) {
- return bitRate / C0;
- }
-
- public double toByte(double bitRate) {
- return bitRate * Math.pow(C0, 2) / C1;
- }
-
- public double convert(double bitRate, BitRateUnit bitRateUnit) {
- return bitRateUnit.toMegabit(bitRate);
- }
- },
- Gigabit {
- public double toBit(double bitRate) {
- return bitRate * Math.pow(C0, 3);
- }
-
- public double toKilobit(double bitRate) {
- return bitRate * Math.pow(C0, 2);
- }
-
- public double toMegabit(double bitRate) {
- return bitRate * C0;
- }
-
- public double toGigabit(double bitRate) {
- return bitRate;
- }
-
- public double toByte(double bitRate) {
- return bitRate * Math.pow(C0, 3) / C1;
- }
-
- public double convert(double bitRate, BitRateUnit bitRateUnit) {
- return bitRateUnit.toGigabit(bitRate);
- }
- },
- Byte {
- public double toBit(double bitRate) {
- return bitRate * C1;
- }
-
- public double toKilobit(double bitRate) {
- return bitRate * C1 / C0;
- }
-
- public double toMegabit(double bitRate) {
- return bitRate * C1 / Math.pow(C0, 2);
- }
-
- public double toGigabit(double bitRate) {
- return bitRate * C1 / Math.pow(C0, 3);
- }
-
- public double toByte(double bitRate) {
- return bitRate;
- }
-
- public double convert(double bitRate, BitRateUnit bitRateUnit) {
- return bitRateUnit.toByte(bitRate);
- }
- };
-
- static final int C0 = 1000;
- static final int C1 = 8;
-
- public double toBit(double bitRate) {
- throw new AbstractMethodError();
- }
-
- public double toKilobit(double bitRate) {
- throw new AbstractMethodError();
- }
-
- public double toMegabit(double bitRate) {
- throw new AbstractMethodError();
- }
-
- public double toGigabit(double bitRate) {
- throw new AbstractMethodError();
- }
-
- public double toByte(double bitRate) {
- throw new AbstractMethodError();
- }
-
- public double convert(double bitRate, BitRateUnit bitRateUnit) {
- throw new AbstractMethodError();
- }
-}
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java
deleted file mode 100644
index 14adcc70273..00000000000
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/BitRateUnitTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker;
-
-import static org.testng.Assert.assertEquals;
-import org.testng.annotations.Test;
-
-public class BitRateUnitTest {
-
- @Test
- public void testBps() {
- double bps = 1231434.12;
- assertEquals(BitRateUnit.Bit.toBit(bps), bps);
- assertEquals(BitRateUnit.Bit.toByte(bps), bps / 8);
- assertEquals(BitRateUnit.Bit.toKilobit(bps), bps / 1000);
- assertEquals(BitRateUnit.Bit.toMegabit(bps), bps / 1000 / 1000);
- assertEquals(BitRateUnit.Bit.toGigabit(bps), bps / 1000 / 1000 / 1000);
- }
-
- @Test
- public void testKbps() {
- double kbps = 1231434.12;
- assertEquals(BitRateUnit.Kilobit.toBit(kbps), kbps * 1000);
- assertEquals(BitRateUnit.Kilobit.toByte(kbps), kbps * 1000 / 8);
- assertEquals(BitRateUnit.Kilobit.toKilobit(kbps), kbps);
- assertEquals(BitRateUnit.Kilobit.toMegabit(kbps), kbps / 1000);
- assertEquals(BitRateUnit.Kilobit.toGigabit(kbps), kbps / 1000 / 1000);
- }
-
- @Test
- public void testMbps() {
- double mbps = 1231434.12;
- assertEquals(BitRateUnit.Megabit.toBit(mbps), mbps * 1000 * 1000);
- assertEquals(BitRateUnit.Megabit.toByte(mbps), mbps * 1000 * 1000 / 8);
- assertEquals(BitRateUnit.Megabit.toKilobit(mbps), mbps * 1000);
- assertEquals(BitRateUnit.Megabit.toMegabit(mbps), mbps);
- assertEquals(BitRateUnit.Megabit.toGigabit(mbps), mbps / 1000);
- }
-
- @Test
- public void testGbps() {
- double gbps = 1231434.12;
- assertEquals(BitRateUnit.Gigabit.toBit(gbps),gbps * 1000 * 1000 * 1000
);
- assertEquals(BitRateUnit.Gigabit.toByte(gbps), gbps * 1000 * 1000 *
1000 / 8);
- assertEquals(BitRateUnit.Gigabit.toKilobit(gbps), gbps * 1000 * 1000);
- assertEquals(BitRateUnit.Gigabit.toMegabit(gbps), gbps * 1000);
- assertEquals(BitRateUnit.Gigabit.toGigabit(gbps), gbps);
- }
-
- @Test
- public void testByte() {
- double bytes = 1231434.12;
- assertEquals(BitRateUnit.Byte.toBit(bytes), bytes * 8);
- assertEquals(BitRateUnit.Byte.toByte(bytes), bytes);
- assertEquals(BitRateUnit.Byte.toKilobit(bytes), bytes / 1000 * 8);
- assertEquals(BitRateUnit.Byte.toMegabit(bytes), bytes / 1000 / 1000 *
8);
- assertEquals(BitRateUnit.Byte.toGigabit(bytes), bytes / 1000 / 1000 /
1000 * 8);
- }
-
-
- @Test
- public void testConvert() {
- double unit = 12334125.1234;
- assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Bit),
BitRateUnit.Bit.toBit(unit));
- assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Kilobit),
BitRateUnit.Kilobit.toBit(unit));
- assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Megabit),
BitRateUnit.Megabit.toBit(unit));
- assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Gigabit),
BitRateUnit.Gigabit.toBit(unit));
- assertEquals(BitRateUnit.Bit.convert(unit, BitRateUnit.Byte),
BitRateUnit.Byte.toBit(unit));
-
- assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Bit),
BitRateUnit.Bit.toKilobit(unit));
- assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Kilobit),
BitRateUnit.Kilobit.toKilobit(unit));
- assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Megabit),
BitRateUnit.Megabit.toKilobit(unit));
- assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Gigabit),
BitRateUnit.Gigabit.toKilobit(unit));
- assertEquals(BitRateUnit.Kilobit.convert(unit, BitRateUnit.Byte),
BitRateUnit.Byte.toKilobit(unit));
-
- assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Bit),
BitRateUnit.Bit.toMegabit(unit));
- assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Kilobit),
BitRateUnit.Kilobit.toMegabit(unit));
- assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Megabit),
BitRateUnit.Megabit.toMegabit(unit));
- assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Gigabit),
BitRateUnit.Gigabit.toMegabit(unit));
- assertEquals(BitRateUnit.Megabit.convert(unit, BitRateUnit.Byte),
BitRateUnit.Byte.toMegabit(unit));
-
- assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Bit),
BitRateUnit.Bit.toGigabit(unit));
- assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Kilobit),
BitRateUnit.Kilobit.toGigabit(unit));
- assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Megabit),
BitRateUnit.Megabit.toGigabit(unit));
- assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Gigabit),
BitRateUnit.Gigabit.toGigabit(unit));
- assertEquals(BitRateUnit.Gigabit.convert(unit, BitRateUnit.Byte),
BitRateUnit.Byte.toGigabit(unit));
-
- assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Bit),
BitRateUnit.Bit.toByte(unit));
- assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Kilobit),
BitRateUnit.Kilobit.toByte(unit));
- assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Megabit),
BitRateUnit.Megabit.toByte(unit));
- assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Gigabit),
BitRateUnit.Gigabit.toByte(unit));
- assertEquals(BitRateUnit.Byte.convert(unit, BitRateUnit.Byte),
BitRateUnit.Byte.toByte(unit));
- }
-}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 7dd28df8a64..c75f68129f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -85,7 +85,6 @@ import
org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.intercept.BrokerInterceptors;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
-import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
@@ -662,14 +661,6 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
+ "authenticationEnabled=true when authorization is
enabled with authorizationEnabled=true.");
}
- if (!config.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
- && config.isLoadBalancerEnabled()
- && LinuxInfoUtils.isLinux()
- && !LinuxInfoUtils.checkHasNicSpeeds()) {
- throw new IllegalStateException("Unable to read VM NIC speed.
You must set "
- + "[loadBalancerOverrideBrokerNicSpeedGbps] to
override it when load balancer is enabled.");
- }
-
localMetadataStore = createLocalMetadataStore();
localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
deleted file mode 100644
index 2a423045b1b..00000000000
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LinuxInfoUtils.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.loadbalance;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.SystemUtils;
-import org.apache.pulsar.broker.BitRateUnit;
-
-@Slf4j
-public class LinuxInfoUtils {
-
- // CGROUP
- private static final String CGROUPS_CPU_USAGE_PATH =
"/sys/fs/cgroup/cpu/cpuacct.usage";
- private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH =
"/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
- private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH =
"/sys/fs/cgroup/cpu/cpu.cfs_period_us";
-
- // proc states
- private static final String PROC_STAT_PATH = "/proc/stat";
- private static final String NIC_PATH = "/sys/class/net/";
- // NIC type
- private static final int ARPHRD_ETHER = 1;
- private static final String NIC_SPEED_TEMPLATE = "/sys/class/net/%s/speed";
-
- private static Object /*jdk.internal.platform.Metrics*/ metrics;
- private static Method getMetricsProviderMethod;
- private static Method getCpuQuotaMethod;
- private static Method getCpuPeriodMethod;
- private static Method getCpuUsageMethod;
-
- static {
- try {
- metrics =
Class.forName("jdk.internal.platform.Container").getMethod("metrics")
- .invoke(null);
- if (metrics != null) {
- getMetricsProviderMethod =
metrics.getClass().getMethod("getProvider");
- getMetricsProviderMethod.setAccessible(true);
- getCpuQuotaMethod =
metrics.getClass().getMethod("getCpuQuota");
- getCpuQuotaMethod.setAccessible(true);
- getCpuPeriodMethod =
metrics.getClass().getMethod("getCpuPeriod");
- getCpuPeriodMethod.setAccessible(true);
- getCpuUsageMethod =
metrics.getClass().getMethod("getCpuUsage");
- getCpuUsageMethod.setAccessible(true);
- }
- } catch (Throwable e) {
- log.warn("Failed to get runtime metrics", e);
- }
- }
-
- /**
- * Determine whether the OS is the linux kernel.
- * @return Whether the OS is the linux kernel
- */
- public static boolean isLinux() {
- return SystemUtils.IS_OS_LINUX;
- }
-
- /**
- * Determine whether the OS enable CG Group.
- */
- public static boolean isCGroupEnabled() {
- try {
- if (metrics == null) {
- return Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
- }
- String provider = (String)
getMetricsProviderMethod.invoke(metrics);
- log.info("[LinuxInfo] The system metrics provider is: {}",
provider);
- return provider.contains("cgroup");
- } catch (Exception e) {
- log.warn("[LinuxInfo] Failed to check cgroup CPU: {}",
e.getMessage());
- return false;
- }
- }
-
- /**
- * Get total cpu limit.
- * @param isCGroupsEnabled Whether CGroup is enabled
- * @return Total cpu limit
- */
- public static double getTotalCpuLimit(boolean isCGroupsEnabled) {
- if (isCGroupsEnabled) {
- try {
- long quota;
- long period;
- if (metrics != null && getCpuQuotaMethod != null &&
getCpuPeriodMethod != null) {
- quota = (long) getCpuQuotaMethod.invoke(metrics);
- period = (long) getCpuPeriodMethod.invoke(metrics);
- } else {
- quota =
readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_QUOTA_PATH));
- period =
readLongFromFile(Paths.get(CGROUPS_CPU_LIMIT_PERIOD_PATH));
- }
-
- if (quota > 0) {
- return 100.0 * quota / period;
- }
- } catch (Exception e) {
- log.warn("[LinuxInfo] Failed to read CPU quotas from cgroup",
e);
- // Fallback to availableProcessors
- }
- }
- // Fallback to JVM reported CPU quota
- return 100 * Runtime.getRuntime().availableProcessors();
- }
-
- /**
- * Get CGroup cpu usage.
- * @return Cpu usage
- */
- public static long getCpuUsageForCGroup() {
- try {
- if (metrics != null && getCpuUsageMethod != null) {
- return (long) getCpuUsageMethod.invoke(metrics);
- }
- return readLongFromFile(Paths.get(CGROUPS_CPU_USAGE_PATH));
- } catch (Exception e) {
- log.error("[LinuxInfo] Failed to read CPU usage from cgroup", e);
- return -1;
- }
- }
-
-
- /**
- * Reads first line of /proc/stat to get total cpu usage.
- *
- * <pre>
- * cpu user nice system idle iowait irq softirq steal guest
guest_nice
- * cpu 317808 128 58637 2503692 7634 0 13472 0 0 0
- * </pre>
- * <p>
- * Line is split in "words", filtering the first. The sum of all numbers
give the amount of cpu cycles used this
- * far. Real CPU usage should equal the sum subtracting the idle cycles,
this would include iowait, irq and steal.
- */
- public static ResourceUsage getCpuUsageForEntireHost() {
- try (Stream<String> stream = Files.lines(Paths.get(PROC_STAT_PATH))) {
- Optional<String> first = stream.findFirst();
- if (!first.isPresent()) {
- log.error("[LinuxInfo] Failed to read CPU usage from
/proc/stat, because of empty values.");
- return ResourceUsage.empty();
- }
- String[] words = first.get().split("\\s+");
- long total = Arrays.stream(words)
- .filter(s -> !s.contains("cpu"))
- .mapToLong(Long::parseLong)
- .sum();
- long idle = Long.parseLong(words[4]);
- return ResourceUsage.builder()
- .usage(total - idle)
- .idle(idle)
- .total(total).build();
- } catch (IOException e) {
- log.error("[LinuxInfo] Failed to read CPU usage from /proc/stat",
e);
- return ResourceUsage.empty();
- }
- }
-
- /**
- * Determine whether the VM has physical nic.
- * @param nicPath Nic path
- * @return whether The VM has physical nic.
- */
- private static boolean isPhysicalNic(Path nicPath) {
- try {
- if (nicPath.toString().contains("/virtual/")) {
- return false;
- }
- // Check the type to make sure it's ethernet (type "1")
- String type = readTrimStringFromFile(nicPath.resolve("type"));
- // wireless NICs don't report speed, ignore them.
- return Integer.parseInt(type) == ARPHRD_ETHER;
- } catch (Exception e) {
- log.warn("[LinuxInfo] Failed to read {} NIC type, the detail is:
{}", nicPath, e.getMessage());
- // Read type got error.
- return false;
- }
- }
-
- /**
- * Determine whether nic is usable.
- * @param nicPath Nic path
- * @return whether nic is usable.
- */
- private static boolean isUsable(Path nicPath) {
- try {
- String operstate =
readTrimStringFromFile(nicPath.resolve("operstate"));
- Operstate operState =
Operstate.valueOf(operstate.toUpperCase(Locale.ROOT));
- switch (operState) {
- case UP:
- case UNKNOWN:
- case DORMANT:
- return true;
- default:
- return false;
- }
- } catch (Exception e) {
- log.warn("[LinuxInfo] Failed to read {} NIC operstate, the detail
is: {}", nicPath, e.getMessage());
- // Read operstate got error.
- return false;
- }
- }
-
- /**
- * Get all physical nic limit.
- * @param nics All nic path
- * @param bitRateUnit Bit rate unit
- * @return Total nic limit
- */
- public static double getTotalNicLimit(List<String> nics, BitRateUnit
bitRateUnit) {
- return bitRateUnit.convert(nics.stream().mapToDouble(nicPath -> {
- try {
- return
readDoubleFromFile(getReplacedNICPath(NIC_SPEED_TEMPLATE, nicPath));
- } catch (IOException e) {
- log.error("[LinuxInfo] Failed to get total nic limit.", e);
- return 0d;
- }
- }).sum(), BitRateUnit.Megabit);
- }
-
- /**
- * Get all physical nic usage.
- * @param nics All nic path
- * @param type Nic's usage type: transport, receive
- * @param bitRateUnit Bit rate unit
- * @return Total nic usage
- */
- public static double getTotalNicUsage(List<String> nics, NICUsageType
type, BitRateUnit bitRateUnit) {
- return bitRateUnit.convert(nics.stream().mapToDouble(nic -> {
- try {
- return readDoubleFromFile(getReplacedNICPath(type.template,
nic));
- } catch (IOException e) {
- log.error("[LinuxInfo] Failed to read {} bytes for NIC {} ",
type, nic, e);
- return 0d;
- }
- }).sum(), BitRateUnit.Byte);
- }
-
- /**
- * Get paths of all usable physical nic.
- * @return All usable physical nic paths.
- */
- public static List<String> getUsablePhysicalNICs() {
- try (Stream<Path> stream = Files.list(Paths.get(NIC_PATH))) {
- return stream.filter(LinuxInfoUtils::isPhysicalNic)
- .filter(LinuxInfoUtils::isUsable)
- .map(path -> path.getFileName().toString())
- .collect(Collectors.toList());
- } catch (IOException e) {
- log.error("[LinuxInfo] Failed to find NICs", e);
- return Collections.emptyList();
- }
- }
-
- /**
- * Check this VM has nic speed.
- * @return Whether the VM has nic speed
- */
- public static boolean checkHasNicSpeeds() {
- List<String> physicalNICs = getUsablePhysicalNICs();
- if (CollectionUtils.isEmpty(physicalNICs)) {
- return false;
- }
- double totalNicLimit = getTotalNicLimit(physicalNICs,
BitRateUnit.Kilobit);
- return totalNicLimit > 0;
- }
-
- private static Path getReplacedNICPath(String template, String nic) {
- return Paths.get(String.format(template, nic));
- }
-
- private static String readTrimStringFromFile(Path path) throws IOException
{
- return new String(Files.readAllBytes(path),
StandardCharsets.UTF_8).trim();
- }
-
- private static long readLongFromFile(Path path) throws IOException {
- return Long.parseLong(readTrimStringFromFile(path));
- }
-
- private static double readDoubleFromFile(Path path) throws IOException {
- return Double.parseDouble(readTrimStringFromFile(path));
- }
-
- /**
- * TLV IFLA_OPERSTATE
- * contains RFC2863 state of the interface in numeric representation:
- * See <a
href="https://www.kernel.org/doc/Documentation/networking/operstates.txt">...</a>
- */
- enum Operstate {
- // Interface is in unknown state, neither driver nor userspace has set
- // operational state. Interface must be considered for user data as
- // setting operational state has not been implemented in every driver.
- UNKNOWN,
- // Interface is unable to transfer data on L1, f.e. ethernet is not
- // plugged or interface is ADMIN down.
- DOWN,
- // Interfaces stacked on an interface that is IF_OPER_DOWN show this
- // state (f.e. VLAN).
- LOWERLAYERDOWN,
- // Interface is L1 up, but waiting for an external event, f.e. for a
- // protocol to establish. (802.1X)
- DORMANT,
- // Interface is operational up and can be used.
- UP
- }
-
- @VisibleForTesting
- public static Object getMetrics() {
- return metrics;
- }
-
- @AllArgsConstructor
- public enum NICUsageType {
- // transport
- TX("/sys/class/net/%s/statistics/tx_bytes"),
- // receive
- RX("/sys/class/net/%s/statistics/rx_bytes");
- private final String template;
- }
-
- @Data
- @Builder
- public static class ResourceUsage {
- private final long total;
- private final long idle;
- private final long usage;
-
- public static ResourceUsage empty() {
- return ResourceUsage.builder()
- .total(-1)
- .idle(-1)
- .usage(-1).build();
- }
-
- public boolean isEmpty() {
- return this.total == -1 && idle == -1 && usage == -1;
- }
- }
-}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
index 9c9a00d7f5d..fc6c4116e0c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
@@ -18,27 +18,26 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import static org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.NICUsageType;
-import static
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getCpuUsageForCGroup;
-import static
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getCpuUsageForEntireHost;
-import static
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getTotalCpuLimit;
-import static
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getTotalNicLimit;
-import static
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getTotalNicUsage;
-import static
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.getUsablePhysicalNICs;
-import static
org.apache.pulsar.broker.loadbalance.LinuxInfoUtils.isCGroupEnabled;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
import com.sun.management.OperatingSystemMXBean;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.BitRateUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.BrokerHostUsage;
-import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -55,9 +54,14 @@ public class LinuxBrokerHostUsageImpl implements
BrokerHostUsage {
private double lastCpuTotalTime;
private OperatingSystemMXBean systemBean;
private SystemResourceUsage usage;
+
private final Optional<Double> overrideBrokerNicSpeedGbps;
private final boolean isCGroupsEnabled;
+ private static final String CGROUPS_CPU_USAGE_PATH =
"/sys/fs/cgroup/cpu/cpuacct.usage";
+ private static final String CGROUPS_CPU_LIMIT_QUOTA_PATH =
"/sys/fs/cgroup/cpu/cpu.cfs_quota_us";
+ private static final String CGROUPS_CPU_LIMIT_PERIOD_PATH =
"/sys/fs/cgroup/cpu/cpu.cfs_period_us";
+
public LinuxBrokerHostUsageImpl(PulsarService pulsar) {
this(
pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes(),
@@ -73,7 +77,15 @@ public class LinuxBrokerHostUsageImpl implements
BrokerHostUsage {
this.lastCollection = 0L;
this.usage = new SystemResourceUsage();
this.overrideBrokerNicSpeedGbps = overrideBrokerNicSpeedGbps;
- this.isCGroupsEnabled = isCGroupEnabled();
+
+ boolean isCGroupsEnabled = false;
+ try {
+ isCGroupsEnabled =
Files.exists(Paths.get(CGROUPS_CPU_USAGE_PATH));
+ } catch (Exception e) {
+ log.warn("Failed to check cgroup CPU usage file: {}",
e.getMessage());
+ }
+ this.isCGroupsEnabled = isCGroupsEnabled;
+
// Call now to initialize values before the constructor returns
calculateBrokerHostUsage();
executorService.scheduleWithFixedDelay(catchingAndLoggingThrowables(this::calculateBrokerHostUsage),
@@ -88,17 +100,19 @@ public class LinuxBrokerHostUsageImpl implements
BrokerHostUsage {
@Override
public void calculateBrokerHostUsage() {
- List<String> nics = getUsablePhysicalNICs();
- double totalNicLimit = getTotalNicLimitWithConfiguration(nics);
- double totalNicUsageTx = getTotalNicUsage(nics, NICUsageType.TX,
BitRateUnit.Kilobit);
- double totalNicUsageRx = getTotalNicUsage(nics, NICUsageType.RX,
BitRateUnit.Kilobit);
- double totalCpuLimit = getTotalCpuLimit(isCGroupsEnabled);
+ List<String> nics = getNics();
+ double totalNicLimit = getTotalNicLimitKbps(nics);
+ double totalNicUsageTx = getTotalNicUsageTxKb(nics);
+ double totalNicUsageRx = getTotalNicUsageRxKb(nics);
+ double totalCpuLimit = getTotalCpuLimit();
+
long now = System.currentTimeMillis();
double elapsedSeconds = (now - lastCollection) / 1000d;
if (elapsedSeconds <= 0) {
log.warn("elapsedSeconds {} is not expected, skip this round of calculateBrokerHostUsage", elapsedSeconds);
return;
}
+
SystemResourceUsage usage = new SystemResourceUsage();
double cpuUsage = getTotalCpuUsage(elapsedSeconds);
@@ -114,21 +128,30 @@ public class LinuxBrokerHostUsageImpl implements
BrokerHostUsage {
usage.setBandwidthIn(new ResourceUsage(nicUsageRx, totalNicLimit));
usage.setBandwidthOut(new ResourceUsage(nicUsageTx,
totalNicLimit));
}
- usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit));
lastTotalNicUsageTx = totalNicUsageTx;
lastTotalNicUsageRx = totalNicUsageRx;
lastCollection = System.currentTimeMillis();
this.usage = usage;
+ usage.setCpu(new ResourceUsage(cpuUsage, totalCpuLimit));
}
- @VisibleForTesting
- double getTotalNicLimitWithConfiguration(List<String> nics) {
- // Use the override value as configured. Return the total max speed across all available NICs, converted
- // from Gbps into Kbps
- return overrideBrokerNicSpeedGbps.map(BitRateUnit.Gigabit::toKilobit)
- .map(speed -> speed * nics.size())
- .orElseGet(() -> getTotalNicLimit(nics, BitRateUnit.Kilobit));
+ private double getTotalCpuLimit() {
+ if (isCGroupsEnabled) {
+ try {
+ long quota = readLongFromFile(CGROUPS_CPU_LIMIT_QUOTA_PATH);
+ long period = readLongFromFile(CGROUPS_CPU_LIMIT_PERIOD_PATH);
+ if (quota > 0) {
+ return 100.0 * quota / period;
+ }
+ } catch (IOException e) {
+ log.warn("Failed to read CPU quotas from cgroups", e);
+ // Fallback to availableProcessors
+ }
+ }
+
+ // Fallback to JVM reported CPU quota
+ return 100 * Runtime.getRuntime().availableProcessors();
}
private double getTotalCpuUsage(double elapsedTimeSeconds) {
@@ -139,13 +162,6 @@ public class LinuxBrokerHostUsageImpl implements
BrokerHostUsage {
}
}
- private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) {
- double usage = (double) getCpuUsageForCGroup();
- double currentUsage = usage - lastCpuUsage;
- lastCpuUsage = usage;
- return 100 * currentUsage / elapsedTimeSeconds /
TimeUnit.SECONDS.toNanos(1);
- }
-
/**
* Reads first line of /proc/stat to get total cpu usage.
*
@@ -155,18 +171,39 @@ public class LinuxBrokerHostUsageImpl implements
BrokerHostUsage {
* </pre>
*
* Line is split in "words", filtering the first. The sum of all numbers give the amount of cpu cycles used this
- * far. Real CPU usage should equal the sum subtracting the idle cycles, this would include iowait, irq and steal.
+ * far. Real CPU usage should equal the sum substracting the idle cycles, this would include iowait, irq and steal.
*/
private double getTotalCpuUsageForEntireHost() {
- LinuxInfoUtils.ResourceUsage cpuUsageForEntireHost =
getCpuUsageForEntireHost();
- if (cpuUsageForEntireHost.isEmpty()) {
+ try (Stream<String> stream = Files.lines(Paths.get("/proc/stat"))) {
+ String[] words = stream.findFirst().get().split("\\s+");
+
+ long total = Arrays.stream(words).filter(s ->
!s.contains("cpu")).mapToLong(Long::parseLong).sum();
+ long idle = Long.parseLong(words[4]);
+ long usage = total - idle;
+
+ double currentUsage = (usage - lastCpuUsage) / (total -
lastCpuTotalTime) * getTotalCpuLimit();
+
+ lastCpuUsage = usage;
+ lastCpuTotalTime = total;
+
+ return currentUsage;
+ } catch (IOException e) {
+ log.error("Failed to read CPU usage from /proc/stat", e);
+ return -1;
+ }
+ }
+
+ private double getTotalCpuUsageForCGroup(double elapsedTimeSeconds) {
+ try {
+ long usage = readLongFromFile(CGROUPS_CPU_USAGE_PATH);
+ double currentUsage = usage - lastCpuUsage;
+ lastCpuUsage = usage;
+
+ return 100 * currentUsage / elapsedTimeSeconds /
TimeUnit.SECONDS.toNanos(1);
+ } catch (IOException e) {
+ log.error("Failed to read CPU usage from {}",
CGROUPS_CPU_USAGE_PATH, e);
return -1;
}
- double currentUsage = (cpuUsageForEntireHost.getUsage() - lastCpuUsage)
- / (cpuUsageForEntireHost.getTotal() - lastCpuTotalTime) *
getTotalCpuLimit(isCGroupsEnabled);
- lastCpuUsage = cpuUsageForEntireHost.getUsage();
- lastCpuTotalTime = cpuUsageForEntireHost.getTotal();
- return currentUsage;
}
private ResourceUsage getMemUsage() {
@@ -175,4 +212,86 @@ public class LinuxBrokerHostUsageImpl implements
BrokerHostUsage {
return new ResourceUsage(total - free, total);
}
+ private List<String> getNics() {
+ try (Stream<Path> stream = Files.list(Paths.get("/sys/class/net/"))) {
+ return stream.filter(this::isPhysicalNic).map(path ->
path.getFileName().toString())
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ log.error("Failed to find NICs", e);
+ return Collections.emptyList();
+ }
+ }
+
+ public int getNicCount() {
+ return getNics().size();
+ }
+
+ private boolean isPhysicalNic(Path path) {
+ try {
+ if (path.toString().contains("/virtual/")) {
+ return false;
+ }
+ // Check the type to make sure it's ethernet (type "1")
+ String type = new String(Files.readAllBytes(path.resolve("type")),
StandardCharsets.UTF_8).trim();
+ // wireless NICs don't report speed, ignore them.
+ return Integer.parseInt(type) == 1;
+ } catch (Exception e) {
+ // Read type got error.
+ return false;
+ }
+ }
+
+ private Path getNicSpeedPath(String nic) {
+ return Paths.get(String.format("/sys/class/net/%s/speed", nic));
+ }
+
+ private double getTotalNicLimitKbps(List<String> nics) {
+ // Use the override value as configured. Return the total max speed across all available NICs, converted
+ // from Gbps into Kbps
+ return overrideBrokerNicSpeedGbps.map(aDouble -> aDouble * nics.size()
* 1000 * 1000)
+ .orElseGet(() -> nics.stream().mapToDouble(nicPath -> {
+ // Nic speed is in Mbits/s, return kbits/s
+ try {
+ return Double.parseDouble(new
String(Files.readAllBytes(getNicSpeedPath(nicPath))));
+ } catch (IOException e) {
+ log.error(String.format("Failed to read speed for nic %s, maybe you can set broker"
+ + " config [loadBalancerOverrideBrokerNicSpeedGbps] to override it.", nicPath), e);
+ return 0d;
+ }
+ }).sum() * 1000);
+ }
+
+ private Path getNicTxPath(String nic) {
+ return
Paths.get(String.format("/sys/class/net/%s/statistics/tx_bytes", nic));
+ }
+
+ private Path getNicRxPath(String nic) {
+ return
Paths.get(String.format("/sys/class/net/%s/statistics/rx_bytes", nic));
+ }
+
+ private double getTotalNicUsageRxKb(List<String> nics) {
+ return nics.stream().mapToDouble(s -> {
+ try {
+ return Double.parseDouble(new
String(Files.readAllBytes(getNicRxPath(s))));
+ } catch (IOException e) {
+ log.error("Failed to read rx_bytes for NIC " + s, e);
+ return 0d;
+ }
+ }).sum() * 8d / 1000;
+ }
+
+ private double getTotalNicUsageTxKb(List<String> nics) {
+ return nics.stream().mapToDouble(s -> {
+ try {
+ return Double.parseDouble(new
String(Files.readAllBytes(getNicTxPath(s))));
+ } catch (IOException e) {
+ log.error("Failed to read tx_bytes for NIC " + s, e);
+ return 0d;
+ }
+ }).sum() * 8d / 1000;
+ }
+
+ private static long readLongFromFile(String path) throws IOException {
+ return Long.parseLong(new String(Files.readAllBytes(Paths.get(path)),
Charsets.UTF_8).trim());
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
index 512dc594f29..b9a017504e0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/EmbeddedPulsarCluster.java
@@ -101,7 +101,6 @@ public class EmbeddedPulsarCluster implements AutoCloseable
{
conf.setDefaultNumberOfNamespaceBundles(1);
conf.setMetadataStoreUrl(metadataStoreUrl);
conf.setBrokerShutdownTimeoutMs(0L);
- conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf.setBrokerServicePort(Optional.of(0));
conf.setWebServicePort(Optional.of(0));
conf.setNumExecutorThreadPoolSize(1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index c905f87bce1..ddb1ed4d469 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -68,7 +68,6 @@ public class PulsarServiceTest extends
MockedPulsarServiceBaseTest {
configuration.setClusterName("clusterName");
configuration.setFunctionsWorkerEnabled(true);
configuration.setBrokerShutdownTimeoutMs(0L);
-
configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
WorkerService expectedWorkerService = mock(WorkerService.class);
@Cleanup
PulsarService pulsarService = spy(new PulsarService(configuration, new
WorkerConfig(),
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
index 0156b845ad4..36784372f6b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java
@@ -78,7 +78,6 @@ public class SLAMonitoringTest {
for (int i = 0; i < BROKER_COUNT; i++) {
ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerShutdownTimeoutMs(0L);
-
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setBrokerShutdownTimeoutMs(0L);
config.setClusterName("my-cluster");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
index 19c174d0edf..ce2acf22aec 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
@@ -114,7 +114,6 @@ public class BrokerAdminClientTlsAuthTest extends
MockedPulsarServiceBaseTest {
/***** Start Broker 2 ******/
ServiceConfiguration conf = new ServiceConfiguration();
conf.setBrokerShutdownTimeoutMs(0L);
- conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf.setBrokerServicePort(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePort(Optional.of(0));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index d88a411be34..b61b18ffb7b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -181,7 +181,6 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
protected void doInitConf() throws Exception {
this.conf.setBrokerShutdownTimeoutMs(0L);
- this.conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
this.conf.setBrokerServicePort(Optional.of(0));
this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setAdvertisedAddress("localhost");
@@ -479,7 +478,6 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
configuration.setConfigurationStoreServers("localhost:3181");
configuration.setAllowAutoTopicCreationType("non-partitioned");
configuration.setBrokerShutdownTimeoutMs(0L);
-
configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
configuration.setBrokerServicePort(Optional.of(0));
configuration.setBrokerServicePortTls(Optional.of(0));
configuration.setWebServicePort(Optional.of(0));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
index eb23d6197fc..9e81a3e1db9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java
@@ -111,7 +111,6 @@ public class AntiAffinityNamespaceGroupTest {
config1.setWebServicePort(Optional.of(0));
config1.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config1.setBrokerShutdownTimeoutMs(0L);
- config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config1.setBrokerServicePort(Optional.of(0));
config1.setFailureDomainsEnabled(true);
config1.setLoadBalancerEnabled(true);
@@ -133,7 +132,6 @@ public class AntiAffinityNamespaceGroupTest {
config2.setWebServicePort(Optional.of(0));
config2.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config2.setBrokerShutdownTimeoutMs(0L);
- config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config2.setBrokerServicePort(Optional.of(0));
config2.setFailureDomainsEnabled(true);
config2.setAdvertisedAddress("localhost");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
index 3d49dd9df90..cb01ba43e86 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -69,7 +69,6 @@ public class LeaderElectionServiceTest {
final String clusterName = "elect-test";
ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setWebServicePort(Optional.of(0));
config.setClusterName(clusterName);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index b7d6a391fb0..a64f28384d3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
@@ -125,7 +125,6 @@ public class LoadBalancerTest {
config.setWebServicePortTls(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
-
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config.setAdvertisedAddress(localhost+i);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java
index a1c52e13c69..50bb6b37760 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadReportNetworkLimitTest.java
@@ -22,6 +22,7 @@ import static org.testng.Assert.assertEquals;
import java.util.Optional;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.loadbalance.impl.LinuxBrokerHostUsageImpl;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -29,21 +30,17 @@ import org.testng.annotations.Test;
@Test(groups = "broker")
public class LoadReportNetworkLimitTest extends MockedPulsarServiceBaseTest {
- int usableNicCount;
-
- @Override
- protected void doInitConf() throws Exception {
- super.doInitConf();
- conf.setLoadBalancerEnabled(true);
- conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(5.4));
- }
+ int nicCount;
@BeforeClass
@Override
public void setup() throws Exception {
+ conf.setLoadBalancerEnabled(true);
+ conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(5.4));
super.internalSetup();
+
if (SystemUtils.IS_OS_LINUX) {
- usableNicCount = LinuxInfoUtils.getUsablePhysicalNICs().size();
+ nicCount = new LinuxBrokerHostUsageImpl(pulsar).getNicCount();
}
}
@@ -60,12 +57,12 @@ public class LoadReportNetworkLimitTest extends
MockedPulsarServiceBaseTest {
LoadManagerReport report = admin.brokerStats().getLoadReport();
if (SystemUtils.IS_OS_LINUX) {
- assertEquals(report.getBandwidthIn().limit, usableNicCount * 5.4 *
1000 * 1000, 0.0001);
- assertEquals(report.getBandwidthOut().limit, usableNicCount * 5.4
* 1000 * 1000, 0.0001);
+ assertEquals(report.getBandwidthIn().limit, nicCount * 5.4 * 1000
* 1000);
+ assertEquals(report.getBandwidthOut().limit, nicCount * 5.4 * 1000
* 1000);
} else {
// On non-Linux system we don't report the network usage
- assertEquals(report.getBandwidthIn().limit, -1.0, 0.0001);
- assertEquals(report.getBandwidthOut().limit, -1.0, 0.0001);
+ assertEquals(report.getBandwidthIn().limit, -1.0);
+ assertEquals(report.getBandwidthOut().limit, -1.0);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
deleted file mode 100644
index 80ab6490000..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleBrokerStartTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.loadbalance;
-
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Optional;
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.PulsarServerException;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
-import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-@Slf4j
-@Test(groups = "broker")
-public class SimpleBrokerStartTest {
-
- public void testHasNICSpeed() throws Exception {
- if (!LinuxInfoUtils.isLinux()) {
- return;
- }
- // Start local bookkeeper ensemble
- @Cleanup("stop")
- LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0,
() -> 0);
- bkEnsemble.start();
- // Start broker
- ServiceConfiguration config = new ServiceConfiguration();
- config.setClusterName("use");
- config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1:" +
bkEnsemble.getZookeeperPort());
- config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
- config.setBrokerServicePort(Optional.of(0));
- config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
- config.setBrokerServicePortTls(Optional.of(0));
- config.setWebServicePortTls(Optional.of(0));
- config.setAdvertisedAddress("localhost");
- boolean hasNicSpeeds = LinuxInfoUtils.checkHasNicSpeeds();
- if (hasNicSpeeds) {
- @Cleanup
- PulsarService pulsarService = new PulsarService(config);
- pulsarService.start();
- }
- }
-
- public void testNoNICSpeed() throws Exception {
- if (!LinuxInfoUtils.isLinux()) {
- return;
- }
- // Start local bookkeeper ensemble
- @Cleanup("stop")
- LocalBookkeeperEnsemble bkEnsemble = new LocalBookkeeperEnsemble(3, 0,
() -> 0);
- bkEnsemble.start();
- // Start broker
- ServiceConfiguration config = new ServiceConfiguration();
- config.setClusterName("use");
- config.setWebServicePort(Optional.of(0));
- config.setMetadataStoreUrl("zk:127.0.0.1:" +
bkEnsemble.getZookeeperPort());
- config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
- config.setBrokerServicePort(Optional.of(0));
- config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
- config.setBrokerServicePortTls(Optional.of(0));
- config.setWebServicePortTls(Optional.of(0));
- config.setAdvertisedAddress("localhost");
- boolean hasNicSpeeds = LinuxInfoUtils.checkHasNicSpeeds();
- if (!hasNicSpeeds) {
- @Cleanup
- PulsarService pulsarService = new PulsarService(config);
- try {
- pulsarService.start();
- fail("unexpected behaviour");
- } catch (PulsarServerException ex) {
- assertTrue(ex.getCause() instanceof IllegalStateException);
- }
- }
- }
-
-
- @Test
- public void testCGroupMetrics() {
- if (!LinuxInfoUtils.isLinux()) {
- return;
- }
-
- boolean existsCGroup = Files.exists(Paths.get("/sys/fs/cgroup"));
- boolean cGroupEnabled = LinuxInfoUtils.isCGroupEnabled();
- Assert.assertEquals(cGroupEnabled, existsCGroup);
-
- double totalCpuLimit = LinuxInfoUtils.getTotalCpuLimit(cGroupEnabled);
- log.info("totalCpuLimit: {}", totalCpuLimit);
- Assert.assertTrue(totalCpuLimit > 0.0);
-
- if (cGroupEnabled) {
- Assert.assertNotNull(LinuxInfoUtils.getMetrics());
-
- long cpuUsageForCGroup = LinuxInfoUtils.getCpuUsageForCGroup();
- log.info("cpuUsageForCGroup: {}", cpuUsageForCGroup);
- Assert.assertTrue(cpuUsageForCGroup > 0);
- }
- }
-
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index f71aa9a5225..ceff0d8aaed 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -111,7 +111,6 @@ public class SimpleLoadManagerImplTest {
config1.setWebServicePort(Optional.of(0));
config1.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config1.setBrokerShutdownTimeoutMs(0L);
- config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config1.setBrokerServicePort(Optional.of(0));
config1.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config1.setBrokerServicePortTls(Optional.of(0));
@@ -131,7 +130,6 @@ public class SimpleLoadManagerImplTest {
config2.setWebServicePort(Optional.of(0));
config2.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config2.setBrokerShutdownTimeoutMs(0L);
- config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config2.setBrokerServicePort(Optional.of(0));
config2.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config2.setBrokerServicePortTls(Optional.of(0));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
index 3b9c839775a..9ff266ba96c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -64,7 +64,6 @@ public class BundleSplitterTaskTest {
config.setAdvertisedAddress("localhost");
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
deleted file mode 100644
index b6a625a4fe1..00000000000
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.loadbalance.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import lombok.Cleanup;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-@Slf4j
-public class LinuxBrokerHostUsageImplTest {
-
- @Test
- public void checkOverrideBrokerNicSpeedGbps() {
- @Cleanup("shutdown")
- ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
- LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
- new LinuxBrokerHostUsageImpl(1, Optional.of(3.0),
executorService);
- List<String> nics = new ArrayList<>();
- nics.add("1");
- nics.add("2");
- nics.add("3");
- double totalLimit =
linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics);
- Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
- }
-
- @Test
- public void testCpuUsage() throws InterruptedException {
- if (!LinuxInfoUtils.isLinux()) {
- return;
- }
-
- @Cleanup("shutdown")
- ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
- LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
- new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE,
Optional.empty(), executorService);
-
- linuxBrokerHostUsage.calculateBrokerHostUsage();
- TimeUnit.SECONDS.sleep(1);
- linuxBrokerHostUsage.calculateBrokerHostUsage();
-
- double usage =
linuxBrokerHostUsage.getBrokerHostUsage().getCpu().usage;
- double limit =
linuxBrokerHostUsage.getBrokerHostUsage().getCpu().limit;
- float percentUsage =
linuxBrokerHostUsage.getBrokerHostUsage().getCpu().percentUsage();
-
- Assert.assertTrue(usage > 0);
- Assert.assertTrue(limit > 0);
- Assert.assertTrue(limit >= usage);
- Assert.assertTrue(percentUsage > 0);
-
- log.info("usage: {}, limit: {}, percentUsage: {}", usage, limit,
percentUsage);
- }
-}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index dae160b1421..be87320e648 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -150,7 +150,6 @@ public class ModularLoadManagerImplTest {
config1.setAdvertisedAddress("localhost");
config1.setBrokerShutdownTimeoutMs(0L);
- config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config1.setBrokerServicePort(Optional.of(0));
config1.setBrokerServicePortTls(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
@@ -170,7 +169,6 @@ public class ModularLoadManagerImplTest {
config2.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config2.setAdvertisedAddress("localhost");
config2.setBrokerShutdownTimeoutMs(0L);
- config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config2.setBrokerServicePort(Optional.of(0));
config2.setBrokerServicePortTls(Optional.of(0));
config2.setWebServicePortTls(Optional.of(0));
@@ -624,7 +622,6 @@ public class ModularLoadManagerImplTest {
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
PulsarService pulsar = new PulsarService(config);
// create znode using different zk-session
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
index c24164acbbf..c25c6c23fb0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -109,7 +109,6 @@ public class OwnerShipForCurrentServerTestBase {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
conf.setBrokerShutdownTimeoutMs(0L);
- conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf.setBrokerServicePort(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
conf.setAdvertisedAddress("localhost");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index abe58585d44..1d2878ca644 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -101,7 +101,6 @@ public class BacklogQuotaManagerTest {
config.setWebServicePort(Optional.of(0));
config.setClusterName("usc");
config.setBrokerShutdownTimeoutMs(0L);
-
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setAuthorizationEnabled(false);
config.setAuthenticationEnabled(false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index de7d2bba2d4..b5aaafa6e21 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -81,7 +81,6 @@ public abstract class BkEnsemblesTestBase extends
TestRetrySupport {
config.setWebServicePort(Optional.of(0));
config.setClusterName("usc");
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setAuthorizationEnabled(false);
config.setAuthenticationEnabled(false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index a5d8e62c3bb..6a9ec94de84 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -145,11 +145,8 @@ public class BrokerBookieIsolationTest {
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName(cluster);
config.setWebServicePort(Optional.of(0));
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setAdvertisedAddress("localhost");
config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
@@ -311,7 +308,6 @@ public class BrokerBookieIsolationTest {
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setAdvertisedAddress("localhost");
config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
@@ -453,7 +449,6 @@ public class BrokerBookieIsolationTest {
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setAdvertisedAddress("localhost");
config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
index 480894b51f8..f6a90778c6f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MaxMessageSizeTest.java
@@ -64,7 +64,6 @@ public class MaxMessageSizeTest {
configuration.setWebServicePort(Optional.of(0));
configuration.setClusterName("max_message_test");
configuration.setBrokerShutdownTimeoutMs(0L);
- configuration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
configuration.setBrokerServicePort(Optional.of(0));
configuration.setAuthorizationEnabled(false);
configuration.setAuthenticationEnabled(false);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index 5ec0694ab22..c53d921be11 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -45,7 +45,6 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -120,7 +119,6 @@ public class PersistentDispatcherFailoverConsumerTest {
executor =
OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-failover-test").build();
ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
svcConfig.setBrokerShutdownTimeoutMs(0L);
- svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
svcConfig.setClusterName("pulsar-cluster");
pulsar = spyWithClassAndConstructorArgs(PulsarService.class,
svcConfig);
store = MetadataStoreFactory.create("memory:local",
MetadataStoreConfig.builder().build());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index d81b3041003..1ced87a5a2a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -30,7 +30,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -80,7 +79,6 @@ public class PersistentTopicConcurrentTest extends
MockedBookKeeperTestCase {
super.setUp(m);
ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
svcConfig.setBrokerShutdownTimeoutMs(0L);
- svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
@Cleanup
PulsarService pulsar =
spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
doReturn(svcConfig).when(pulsar).getConfiguration();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 0e7a26f0cf9..98be79a932a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -178,7 +178,6 @@ public class PersistentTopicTest extends
MockedBookKeeperTestCase {
ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
svcConfig.setAdvertisedAddress("localhost");
svcConfig.setBrokerShutdownTimeoutMs(0L);
- svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
svcConfig.setMaxUnackedMessagesPerConsumer(50000);
svcConfig.setClusterName("pulsar-cluster");
pulsar = spyWithClassAndConstructorArgs(PulsarService.class,
svcConfig);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 852ea665977..4270ec448b6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -270,7 +270,6 @@ public abstract class ReplicatorTestBase extends
TestRetrySupport {
config.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(),
TimeUnit.SECONDS));
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setTlsCertificateFilePath(brokerCertFilePath);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index eb1cc05434c..c461dd102d0 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -193,7 +193,6 @@ public class ServerCnxTest {
executor = OrderedExecutor.newBuilder().numThreads(1).build();
svcConfig = new ServiceConfiguration();
svcConfig.setBrokerShutdownTimeoutMs(0L);
- svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
svcConfig.setBacklogQuotaCheckEnabled(false);
svcConfig.setClusterName("use");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index ccdfb7aa77c..8c68c2de91a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -92,7 +92,6 @@ public class TopicOwnerTest {
for (int i = 0; i < BROKER_COUNT; i++) {
ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 2d1e72d45c8..0e2d300f7a8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -112,7 +112,6 @@ public class PersistentSubscriptionTest {
ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
svcConfig.setBrokerShutdownTimeoutMs(0L);
- svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
svcConfig.setTransactionCoordinatorEnabled(true);
svcConfig.setClusterName("pulsar-cluster");
pulsarMock = spyWithClassAndConstructorArgs(PulsarService.class,
svcConfig);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index e8da743a29b..25c555f09b9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -170,7 +170,6 @@ public abstract class TransactionTestBase extends
TestRetrySupport {
conf.setBookkeeperClientExposeStatsToPrometheus(true);
conf.setForceDeleteNamespaceAllowed(true);
conf.setBrokerShutdownTimeoutMs(0L);
- conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf.setBrokerServicePort(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
conf.setAdvertisedAddress("localhost");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
index e017c28693a..aba5b044583 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java
@@ -66,7 +66,6 @@ public abstract class TransactionMetaStoreTestBase extends
TestRetrySupport {
for (int i = 0; i < BROKER_COUNT; i++) {
ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
index 4a2fe5b69f4..0cd72f19a4d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceTest.java
@@ -397,7 +397,6 @@ public class WebServiceTest {
ServiceConfiguration config = new ServiceConfiguration();
config.setAdvertisedAddress("localhost");
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setWebServicePort(Optional.of(0));
if (enableTls) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 71f3ae73ffc..ce4c3269540 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -155,7 +155,6 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
/**** start broker-2 ****/
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setBrokerShutdownTimeoutMs(0L);
- conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf2.setBrokerServicePort(Optional.of(0));
conf2.setWebServicePort(Optional.of(0));
conf2.setAdvertisedAddress("localhost");
@@ -272,7 +271,6 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setAdvertisedAddress("localhost");
conf2.setBrokerShutdownTimeoutMs(0L);
- conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf2.setBrokerServicePort(Optional.of(0));
conf2.setWebServicePort(Optional.of(0));
conf2.setAdvertisedAddress("localhost");
@@ -366,7 +364,6 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setAdvertisedAddress("localhost");
conf2.setBrokerShutdownTimeoutMs(0L);
- conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf2.setBrokerServicePort(Optional.of(0));
conf2.setWebServicePort(Optional.of(0));
conf2.setAdvertisedAddress("localhost");
@@ -443,7 +440,6 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
/**** start broker-2 ****/
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setBrokerShutdownTimeoutMs(0L);
- conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf2.setAdvertisedAddress("localhost");
conf2.setBrokerShutdownTimeoutMs(0L);
conf2.setBrokerServicePort(Optional.of(0));
@@ -560,7 +556,6 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setAdvertisedAddress("localhost");
conf2.setBrokerShutdownTimeoutMs(0L);
- conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf2.setBrokerServicePort(Optional.of(0));
conf2.setWebServicePort(Optional.of(0));
conf2.setAdvertisedAddress("localhost");
@@ -665,7 +660,6 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
// (1) Start broker-1
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setBrokerShutdownTimeoutMs(0L);
- conf2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf2.setAdvertisedAddress("localhost");
conf2.setBrokerShutdownTimeoutMs(0L);
conf2.setBrokerServicePort(Optional.of(0));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
index faf747f36ed..c8325908c6c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java
@@ -86,7 +86,6 @@ public class ClientDeduplicationFailureTest {
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config.setTlsAllowInsecureConnection(true);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 46ea5bf0452..6548dbe26ef 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -944,7 +944,6 @@ public class NonPersistentTopicTest extends
ProducerConsumerBase {
config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(),
TimeUnit.SECONDS));
config1.setBrokerShutdownTimeoutMs(0L);
- config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config1.setBrokerServicePort(Optional.of(0));
config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config1.setAllowAutoTopicCreationType("non-partitioned");
@@ -971,7 +970,6 @@ public class NonPersistentTopicTest extends
ProducerConsumerBase {
config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(),
TimeUnit.SECONDS));
config2.setBrokerShutdownTimeoutMs(0L);
- config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config2.setBrokerServicePort(Optional.of(0));
config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
config2.setAllowAutoTopicCreationType("non-partitioned");
@@ -998,7 +996,6 @@ public class NonPersistentTopicTest extends
ProducerConsumerBase {
config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
inSec(getBrokerServicePurgeInactiveFrequency(),
TimeUnit.SECONDS));
config3.setBrokerShutdownTimeoutMs(0L);
- config3.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config3.setBrokerServicePort(Optional.of(0));
config3.setAllowAutoTopicCreationType("non-partitioned");
pulsar3 = new PulsarService(config3);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
index 80408910141..e6d050f7985 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ServiceUrlProviderTest.java
@@ -107,7 +107,6 @@ public class ServiceUrlProviderTest extends
ProducerConsumerBase {
PulsarService pulsarService1 = pulsar;
conf.setBrokerShutdownTimeoutMs(0L);
- conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf.setBrokerServicePort(Optional.of(0));
conf.setWebServicePort(Optional.of(0));
restartBroker();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 45502165180..173794a2cfb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -130,7 +130,6 @@ public class PulsarFunctionE2ESecurityTest {
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config.setAdvertisedAddress("localhost");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 3f019a39c08..7029dc222db 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -197,7 +197,6 @@ public class PulsarFunctionLocalRunTest {
config.setWebServicePortTls(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 5d830a67778..d985241e290 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -127,7 +127,6 @@ public class PulsarFunctionPublishTest {
config.setWebServicePortTls(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 043c2ecccd6..f4a27506c2e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -90,7 +90,6 @@ public class PulsarFunctionTlsTest {
ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setWebServicePort(Optional.empty());
config.setWebServicePortTls(Optional.of(webPort));
config.setBrokerServicePort(Optional.empty());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 9ba2ccde471..a9fd06dc058 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -94,7 +94,6 @@ public class PulsarWorkerAssignmentTest {
config.setWebServicePort(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config.setAdvertisedAddress("localhost");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index 7c0f2f0bbe1..9e1edea2f80 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -118,7 +118,6 @@ public abstract class AbstractPulsarE2ETest {
config.setWebServicePortTls(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index afe16c63ec2..b3126defa11 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -101,7 +101,6 @@ public class PulsarFunctionAdminTest {
config.setWebServicePortTls(Optional.of(0));
config.setZookeeperServers("127.0.0.1" + ":" +
bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index dd097ce034c..9f6d9f65dbb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -36,7 +36,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -108,7 +107,6 @@ public class PulsarFunctionTlsTest {
config = spy(ServiceConfiguration.class);
config.setBrokerShutdownTimeoutMs(0L);
- config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setClusterName("use");
Set<String> superUsers = Sets.newHashSet("superUser", "admin");
config.setSuperUserRoles(superUsers);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 36ec3086823..2e6f403862b 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -176,7 +176,6 @@ public class PulsarCluster {
.withEnv("configurationStoreServers", CSContainer.NAME
+ ":" + CS_PORT)
.withEnv("clusterName", clusterName)
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
- .withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1")
// used in s3 tests
.withEnv("AWS_ACCESS_KEY_ID", "accesskey")
.withEnv("AWS_SECRET_KEY", "secretkey")