This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fff41c31c5d [fix][broker] Handle the case when `getOwnedServiceUnits` 
fails gracefully (#23119)
fff41c31c5d is described below

commit fff41c31c5db70402fa9e31b0e96d8c93f7d5198
Author: Kai Wang <[email protected]>
AuthorDate: Tue Aug 6 12:05:41 2024 +0800

    [fix][broker] Handle the case when `getOwnedServiceUnits` fails gracefully 
(#23119)
---
 .../channel/ServiceUnitStateChannelImpl.java        |  8 ++++++--
 .../pulsar/broker/namespace/NamespaceService.java   |  6 +++++-
 .../channel/ServiceUnitStateChannelTest.java        | 13 +++++++++++++
 .../namespace/NamespaceOwnershipListenerTests.java  | 21 +++++++++++++++++++++
 4 files changed, 45 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 02e641f69a7..e7804806d9c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -130,7 +130,6 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     private final String brokerId;
     private final Map<String, CompletableFuture<Void>> cleanupJobs;
     private final StateChangeListeners stateChangeListeners;
-    private ExtensibleLoadManagerImpl loadManager;
     private BrokerRegistry brokerRegistry;
     private LeaderElectionService leaderElectionService;
     private TableView<ServiceUnitStateData> tableview;
@@ -296,7 +295,6 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
                 log.warn("Failed to find the channel leader.");
             }
             this.channelState = LeaderElectionServiceStarted;
-            loadManager = getLoadManager();
 
             if (producer != null) {
                 producer.close();
@@ -561,6 +559,9 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
     }
 
     private Optional<String> getOwner(String serviceUnit) {
+        if (!validateChannelState(Started, true)) {
+            throw new IllegalStateException("Invalid channel state:" + 
channelState.name());
+        }
         ServiceUnitStateData data = tableview.get(serviceUnit);
         ServiceUnitState state = state(data);
         switch (state) {
@@ -1715,6 +1716,9 @@ public class ServiceUnitStateChannelImpl implements 
ServiceUnitStateChannel {
 
     @Override
     public Set<Map.Entry<String, ServiceUnitStateData>> getOwnershipEntrySet() 
{
+        if (!validateChannelState(Started, true)) {
+            throw new IllegalStateException("Invalid channel state:" + 
channelState.name());
+        }
         return tableview.entrySet();
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index b02653b53ef..b8ca38624d0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1301,7 +1301,11 @@ public class NamespaceService implements AutoCloseable {
             }
         }
         pulsar.runWhenReadyForIncomingRequests(() -> {
-            getOwnedServiceUnits().forEach(bundle -> 
notifyNamespaceBundleOwnershipListener(bundle, listeners));
+            try {
+                getOwnedServiceUnits().forEach(bundle -> 
notifyNamespaceBundleOwnershipListener(bundle, listeners));
+            } catch (Exception e) {
+                LOG.error("Failed to notify namespace bundle ownership 
listener", e);
+            }
         });
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 24f27a58e76..e8f0c9e43a8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -40,6 +40,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertThrows;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.expectThrows;
+import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -1759,6 +1760,18 @@ public class ServiceUnitStateChannelTest extends 
MockedPulsarServiceBaseTest {
 
     }
 
+    @Test(priority = 20)
+    public void testGetOwnershipEntrySetBeforeChannelStart() {
+        var tmpChannel = new ServiceUnitStateChannelImpl(pulsar1);
+        try {
+            tmpChannel.getOwnershipEntrySet();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e instanceof IllegalStateException);
+            assertEquals("Invalid channel state:Constructed", e.getMessage());
+        }
+    }
+
 
     private static ConcurrentHashMap<String, 
CompletableFuture<Optional<String>>> getOwnerRequests(
             ServiceUnitStateChannel channel) throws IllegalAccessException {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
index 02787aa1435..8fc19432eb3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
@@ -35,6 +35,8 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertTrue;
 
 @Test(groups = "broker")
@@ -102,6 +104,25 @@ public class NamespaceOwnershipListenerTests extends 
BrokerTestBase {
         deleteNamespaceWithRetry(namespace, false);
     }
 
+    @Test
+    public void testAddNamespaceBundleOwnershipListenerBeforeLBStart() {
+        NamespaceService namespaceService = spy(new NamespaceService(pulsar));
+        doThrow(new IllegalStateException("The LM is not initialized"))
+                .when(namespaceService).getOwnedServiceUnits();
+        namespaceService.addNamespaceBundleOwnershipListener(new 
NamespaceBundleOwnershipListener() {
+            @Override
+            public void onLoad(NamespaceBundle bundle) {}
+
+            @Override
+            public void unLoad(NamespaceBundle bundle) {}
+
+            @Override
+            public boolean test(NamespaceBundle namespaceBundle) {
+                return false;
+            }
+        });
+    }
+
     @Test
     public void testGetAllPartitions() throws Exception {
         final String namespace = "prop/" + UUID.randomUUID().toString();

Reply via email to