This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b6da3335c94a4439a6fedf9f1bcc8afff96045c6
Author: Kai Wang <[email protected]>
AuthorDate: Tue Aug 6 12:05:41 2024 +0800

    [fix][broker] Handle the case when `getOwnedServiceUnits` fails gracefully (#23119)
    
    (cherry picked from commit 4a44f45783772780000878cdddbdc2aefd08bcfe)
---
 .../channel/ServiceUnitStateChannelImpl.java        |  8 ++++++--
 .../pulsar/broker/namespace/NamespaceService.java   |  6 +++++-
 .../channel/ServiceUnitStateChannelTest.java        | 13 +++++++++++++
 .../namespace/NamespaceOwnershipListenerTests.java  | 21 +++++++++++++++++++++
 4 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 5454edac14d..965d03b064f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -127,7 +127,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     private final String brokerId;
     private final Map<String, CompletableFuture<Void>> cleanupJobs;
     private final StateChangeListeners stateChangeListeners;
-    private ExtensibleLoadManagerImpl loadManager;
     private BrokerRegistry brokerRegistry;
     private LeaderElectionService leaderElectionService;
     private TableView<ServiceUnitStateData> tableview;
@@ -285,7 +284,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                 log.warn("Failed to find the channel leader.");
             }
             this.channelState = LeaderElectionServiceStarted;
-            loadManager = getLoadManager();
 
             if (producer != null) {
                 producer.close();
@@ -554,6 +552,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     }
 
     private Optional<String> getOwner(String serviceUnit) {
+        if (!validateChannelState(Started, true)) {
+            throw new IllegalStateException("Invalid channel state:" + channelState.name());
+        }
         ServiceUnitStateData data = tableview.get(serviceUnit);
         ServiceUnitState state = state(data);
         switch (state) {
@@ -1758,6 +1759,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
 
     @Override
     public Set<Map.Entry<String, ServiceUnitStateData>> getOwnershipEntrySet() {
+        if (!validateChannelState(Started, true)) {
+            throw new IllegalStateException("Invalid channel state:" + channelState.name());
+        }
         return tableview.entrySet();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 539f899393a..589195691c6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1304,7 +1304,11 @@ public class NamespaceService implements AutoCloseable {
             }
         }
         pulsar.runWhenReadyForIncomingRequests(() -> {
-            getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners));
+            try {
+                getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners));
+            } catch (Exception e) {
+                LOG.error("Failed to notify namespace bundle ownership listener", e);
+            }
         });
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index aef68aff9a2..e569f0d32d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -40,6 +40,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.expectThrows;
+import static org.testng.Assert.fail;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -1762,6 +1763,18 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest {
 
     }
 
+    @Test(priority = 20)
+    public void testGetOwnershipEntrySetBeforeChannelStart() {
+        var tmpChannel = new ServiceUnitStateChannelImpl(pulsar1);
+        try {
+            tmpChannel.getOwnershipEntrySet();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e instanceof IllegalStateException);
+            assertEquals("Invalid channel state:Constructed", e.getMessage());
+        }
+    }
+
 
     private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
             ServiceUnitStateChannel channel) throws IllegalAccessException {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
index 02787aa1435..8fc19432eb3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
@@ -35,6 +35,8 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertTrue;
 
 @Test(groups = "broker")
@@ -102,6 +104,25 @@ public class NamespaceOwnershipListenerTests extends BrokerTestBase {
         deleteNamespaceWithRetry(namespace, false);
     }
 
+    @Test
+    public void testAddNamespaceBundleOwnershipListenerBeforeLBStart() {
+        NamespaceService namespaceService = spy(new NamespaceService(pulsar));
+        doThrow(new IllegalStateException("The LM is not initialized"))
+                .when(namespaceService).getOwnedServiceUnits();
+        namespaceService.addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {
+            @Override
+            public void onLoad(NamespaceBundle bundle) {}
+
+            @Override
+            public void unLoad(NamespaceBundle bundle) {}
+
+            @Override
+            public boolean test(NamespaceBundle namespaceBundle) {
+                return false;
+            }
+        });
+    }
+
     @Test
     public void testGetAllPartitions() throws Exception {
         final String namespace = "prop/" + UUID.randomUUID().toString();

Reply via email to