This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ed37afa0ae [improve][broker] don't do load shedding when metadata 
service not available (#23040)
3ed37afa0ae is described below

commit 3ed37afa0ae03e0df04378d82c3b62863dc510ab
Author: congbo <[email protected]>
AuthorDate: Mon Jul 22 15:20:26 2024 +0800

    [improve][broker] don't do load shedding when metadata service not 
available (#23040)
    
    ### Motivation
    
    Do not perform load shedding when the metadata service is not available.
    
    If a bundle is unloaded while the metadata service is not available, the
    topics in that bundle cannot recover the current ledger and resume
    send/read operations.
    ### Modifications
    
    1. Check the metadata service state when running the load shedding task.
    2. Do not interrupt a task that is already in progress at that time.
---
 .../java/org/apache/pulsar/broker/PulsarService.java   |  3 ++-
 .../pulsar/broker/loadbalance/LoadSheddingTask.java    | 12 +++++++++++-
 .../broker/loadbalance/SimpleLoadManagerImplTest.java  | 18 +++++++++++++++++-
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 848484fe376..c623f5d4e5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1320,7 +1320,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
         if (isRunning()) {
             long resourceQuotaUpdateInterval = TimeUnit.MINUTES
                     
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
-            loadSheddingTask = new LoadSheddingTask(loadManager, 
loadManagerExecutor, config);
+            loadSheddingTask = new LoadSheddingTask(loadManager, 
loadManagerExecutor,
+                    config, getManagedLedgerFactory());
             loadSheddingTask.start();
             loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
                     new LoadResourceQuotaUpdaterTask(loadManager), 
resourceQuotaUpdateInterval,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
index eb7eacec608..25a0a2752d1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadSheddingTask.java
@@ -22,6 +22,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,12 +42,16 @@ public class LoadSheddingTask implements Runnable {
 
     private volatile ScheduledFuture<?> future;
 
+    private final ManagedLedgerFactory factory;
+
     public LoadSheddingTask(AtomicReference<LoadManager> loadManager,
                             ScheduledExecutorService loadManagerExecutor,
-                            ServiceConfiguration config) {
+                            ServiceConfiguration config,
+                            ManagedLedgerFactory factory) {
         this.loadManager = loadManager;
         this.loadManagerExecutor = loadManagerExecutor;
         this.config = config;
+        this.factory = factory;
     }
 
     @Override
@@ -53,6 +59,10 @@ public class LoadSheddingTask implements Runnable {
         if (isCancel) {
             return;
         }
+        if (factory instanceof ManagedLedgerFactoryImpl
+                && !((ManagedLedgerFactoryImpl) 
factory).isMetadataServiceAvailable()) {
+            return;
+        }
         try {
             loadManager.get().doLoadShedding();
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 8f7aa17d0d7..acf096751d7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgsRecordingInvocations;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -489,7 +491,21 @@ public class SimpleLoadManagerImplTest {
         task1.run();
         verify(loadManager, times(1)).writeResourceQuotasToZooKeeper();
 
-        LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager, null, 
null);
+        LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager, null, 
null, null);
+        task2.run();
+        verify(loadManager, times(1)).doLoadShedding();
+    }
+
+    @Test
+    public void testMetadataServiceNotAvailable() {
+        LoadManager loadManager = mock(LoadManager.class);
+        AtomicReference<LoadManager> atomicLoadManager = new 
AtomicReference<>(loadManager);
+        ManagedLedgerFactoryImpl factory = 
mock(ManagedLedgerFactoryImpl.class);
+        doReturn(false).when(factory).isMetadataServiceAvailable();
+        LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager, null, 
null, factory);
+        task2.run();
+        verify(loadManager, times(0)).doLoadShedding();
+        doReturn(true).when(factory).isMetadataServiceAvailable();
         task2.run();
         verify(loadManager, times(1)).doLoadShedding();
     }

Reply via email to