This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a272c5b76a0 [improve] [broker] Add overrideBrokerNics for adaptation 
of heterogeneous network environments (#24883)
a272c5b76a0 is described below

commit a272c5b76a03631c0157d0771e34af8981a81874
Author: dong_zhong_hua <[email protected]>
AuthorDate: Tue Dec 2 14:18:12 2025 +0800

    [improve] [broker] Add overrideBrokerNics for adaptation of heterogeneous 
network environments (#24883)
    
    Co-authored-by: dongzhonghua03 <[email protected]>
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 +++-
 .../loadbalance/impl/LinuxBrokerHostUsageImpl.java |  6 +++-
 .../pulsar/broker/tools/LoadReportCommand.java     |  3 +-
 .../impl/LinuxBrokerHostUsageImplTest.java         | 40 ++++++++++++++++++++--
 4 files changed, 50 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a3183316ccf..c6bedcfd01d 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2994,7 +2994,11 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         doc = "Option to override the auto-detected network interfaces max 
speed"
     )
     private Optional<Double> loadBalancerOverrideBrokerNicSpeedGbps = 
Optional.empty();
-
+    @FieldContext(
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Option to override the auto-detected network interfaces"
+    )
+    private List<String> loadBalancerOverrideBrokerNics = new ArrayList<>();
     @FieldContext(
         category = CATEGORY_LOAD_BALANCER,
         dynamic = true,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
index 6d0e6bb9073..86634c4fdb2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImpl.java
@@ -56,23 +56,27 @@ public class LinuxBrokerHostUsageImpl implements 
BrokerHostUsage {
     private OperatingSystemMXBean systemBean;
     private SystemResourceUsage usage;
     private final Optional<Double> overrideBrokerNicSpeedGbps;
+    private final List<String> overrideBrokerNics;
     private final boolean isCGroupsEnabled;
 
     public LinuxBrokerHostUsageImpl(PulsarService pulsar) {
         this(
             
pulsar.getConfiguration().getLoadBalancerHostUsageCheckIntervalMinutes(),
             
pulsar.getConfiguration().getLoadBalancerOverrideBrokerNicSpeedGbps(),
+            pulsar.getConfiguration().getLoadBalancerOverrideBrokerNics(),
             pulsar.getLoadManagerExecutor()
         );
     }
 
     public LinuxBrokerHostUsageImpl(int hostUsageCheckIntervalMin,
                                     Optional<Double> 
overrideBrokerNicSpeedGbps,
+                                    List<String> overrideBrokerNics,
                                     ScheduledExecutorService executorService) {
         this.systemBean = (OperatingSystemMXBean) 
ManagementFactory.getOperatingSystemMXBean();
         this.lastCollection = 0L;
         this.usage = new SystemResourceUsage();
         this.overrideBrokerNicSpeedGbps = overrideBrokerNicSpeedGbps;
+        this.overrideBrokerNics = overrideBrokerNics;
         this.isCGroupsEnabled = isCGroupEnabled();
         // Call now to initialize values before the constructor returns
         calculateBrokerHostUsage();
@@ -88,7 +92,7 @@ public class LinuxBrokerHostUsageImpl implements 
BrokerHostUsage {
 
     @Override
     public void calculateBrokerHostUsage() {
-        List<String> nics = getUsablePhysicalNICs();
+        List<String> nics = !overrideBrokerNics.isEmpty() ? overrideBrokerNics 
: getUsablePhysicalNICs();
         double totalNicLimit = getTotalNicLimitWithConfiguration(nics);
         double totalNicUsageTx = getTotalNicUsage(nics, NICUsageType.TX, 
BitRateUnit.Kilobit);
         double totalNicUsageRx = getTotalNicUsage(nics, NICUsageType.RX, 
BitRateUnit.Kilobit);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
index f1f4a917571..bc240a05d29 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/tools/LoadReportCommand.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.tools;
 
+import java.util.ArrayList;
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
@@ -67,7 +68,7 @@ public class LoadReportCommand implements Callable<Integer> {
         try {
             if (isLinux) {
                 hostUsage = new LinuxBrokerHostUsageImpl(
-                    Integer.MAX_VALUE, Optional.empty(), scheduler
+                    Integer.MAX_VALUE, Optional.empty(), new ArrayList<>(), 
scheduler
                 );
             } else {
                 hostUsage = new GenericBrokerHostUsageImpl(
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
index 563f707c445..f6d0a53d8c2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/LinuxBrokerHostUsageImplTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -26,7 +29,11 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -38,7 +45,7 @@ public class LinuxBrokerHostUsageImplTest {
         @Cleanup("shutdown")
         ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
         LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
-                new LinuxBrokerHostUsageImpl(1, Optional.of(3.0), 
executorService);
+                new LinuxBrokerHostUsageImpl(1, Optional.of(3.0), new 
ArrayList<>(), executorService);
         List<String> nics = new ArrayList<>();
         nics.add("1");
         nics.add("2");
@@ -47,6 +54,34 @@ public class LinuxBrokerHostUsageImplTest {
         Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
     }
 
+    @Test
+    public void checkOverrideBrokerNics() {
+        try (MockedStatic<LinuxInfoUtils> mockedUtils = 
Mockito.mockStatic(LinuxInfoUtils.class)) {
+            mockedUtils.when(() -> LinuxInfoUtils.getTotalNicUsage(any(), 
any(), any())).thenReturn(3.0d);
+            
mockedUtils.when(LinuxInfoUtils::getCpuUsageForEntireHost).thenReturn(LinuxInfoUtils.ResourceUsage.empty());
+            List<String> nics = new ArrayList<>();
+            nics.add("1");
+            nics.add("2");
+            nics.add("3");
+            ServiceConfiguration config = new ServiceConfiguration();
+            
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(3.0d));
+            config.setLoadBalancerOverrideBrokerNics(nics);
+            PulsarService pulsarService = mock(PulsarService.class);
+            when(pulsarService.getConfiguration()).thenReturn(config);
+            @Cleanup("shutdown")
+            ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+            
when(pulsarService.getLoadManagerExecutor()).thenReturn(executorService);
+            LinuxBrokerHostUsageImpl linuxBrokerHostUsage = new 
LinuxBrokerHostUsageImpl(pulsarService);
+            linuxBrokerHostUsage.calculateBrokerHostUsage();
+            double totalLimit = 
linuxBrokerHostUsage.getTotalNicLimitWithConfiguration(nics);
+            Assert.assertEquals(totalLimit, 3.0 * 1000 * 1000 * 3);
+            double totalNicLimitRx = 
linuxBrokerHostUsage.getBrokerHostUsage().getBandwidthIn().limit;
+            double totalNicLimitTx = 
linuxBrokerHostUsage.getBrokerHostUsage().getBandwidthOut().limit;
+            Assert.assertEquals(totalNicLimitRx, 3.0 * 1000 * 1000 * 3);
+            Assert.assertEquals(totalNicLimitTx, 3.0 * 1000 * 1000 * 3);
+        }
+    }
+
     @Test
     public void testCpuUsage() throws InterruptedException {
         if (!LinuxInfoUtils.isLinux()) {
@@ -56,7 +91,8 @@ public class LinuxBrokerHostUsageImplTest {
         @Cleanup("shutdown")
         ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
         LinuxBrokerHostUsageImpl linuxBrokerHostUsage =
-                new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, 
Optional.empty(), executorService);
+                new LinuxBrokerHostUsageImpl(Integer.MAX_VALUE, 
Optional.empty(),
+                        new ArrayList<>(), executorService);
 
         linuxBrokerHostUsage.calculateBrokerHostUsage();
         TimeUnit.SECONDS.sleep(1);

Reply via email to