This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b6da3335c94a4439a6fedf9f1bcc8afff96045c6 Author: Kai Wang <[email protected]> AuthorDate: Tue Aug 6 12:05:41 2024 +0800 [fix][broker] Handle the case when `getOwnedServiceUnits` fails gracefully (#23119) (cherry picked from commit 4a44f45783772780000878cdddbdc2aefd08bcfe) --- .../channel/ServiceUnitStateChannelImpl.java | 8 ++++++-- .../pulsar/broker/namespace/NamespaceService.java | 6 +++++- .../channel/ServiceUnitStateChannelTest.java | 13 +++++++++++++ .../namespace/NamespaceOwnershipListenerTests.java | 21 +++++++++++++++++++++ 4 files changed, 45 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 5454edac14d..965d03b064f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -127,7 +127,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private final String brokerId; private final Map<String, CompletableFuture<Void>> cleanupJobs; private final StateChangeListeners stateChangeListeners; - private ExtensibleLoadManagerImpl loadManager; private BrokerRegistry brokerRegistry; private LeaderElectionService leaderElectionService; private TableView<ServiceUnitStateData> tableview; @@ -285,7 +284,6 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { log.warn("Failed to find the channel leader."); } this.channelState = LeaderElectionServiceStarted; - loadManager = getLoadManager(); if (producer != null) { producer.close(); @@ -554,6 +552,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { } private Optional<String> getOwner(String serviceUnit) { + if (!validateChannelState(Started, true)) { + throw new IllegalStateException("Invalid channel state:" + channelState.name()); + } ServiceUnitStateData data = tableview.get(serviceUnit); ServiceUnitState state = state(data); switch (state) { @@ -1758,6 +1759,9 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { @Override public Set<Map.Entry<String, ServiceUnitStateData>> getOwnershipEntrySet() { + if (!validateChannelState(Started, true)) { + throw new IllegalStateException("Invalid channel state:" + channelState.name()); + } return tableview.entrySet(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 539f899393a..589195691c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1304,7 +1304,11 @@ public class NamespaceService implements AutoCloseable { } } pulsar.runWhenReadyForIncomingRequests(() -> { - getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners)); + try { + getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners)); + } catch (Exception e) { + LOG.error("Failed to notify namespace bundle ownership listener", e); + } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index aef68aff9a2..e569f0d32d5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -40,6 +40,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertThrows; import static org.testng.Assert.expectThrows; +import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -1762,6 +1763,18 @@ public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { } + @Test(priority = 20) + public void testGetOwnershipEntrySetBeforeChannelStart() { + var tmpChannel = new ServiceUnitStateChannelImpl(pulsar1); + try { + tmpChannel.getOwnershipEntrySet(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof IllegalStateException); + assertEquals("Invalid channel state:Constructed", e.getMessage()); + } + } + private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests( ServiceUnitStateChannel channel) throws IllegalAccessException { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java index 02787aa1435..8fc19432eb3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java @@ -35,6 +35,8 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertTrue; @Test(groups = "broker") @@ -102,6 +104,25 @@ public class NamespaceOwnershipListenerTests extends BrokerTestBase { deleteNamespaceWithRetry(namespace, false); } + @Test + public void testAddNamespaceBundleOwnershipListenerBeforeLBStart() { + NamespaceService namespaceService = spy(new NamespaceService(pulsar)); + doThrow(new IllegalStateException("The LM is not initialized")) + .when(namespaceService).getOwnedServiceUnits(); + namespaceService.addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() { + @Override + public void onLoad(NamespaceBundle bundle) {} + + @Override + public void unLoad(NamespaceBundle bundle) {} + + @Override + public boolean test(NamespaceBundle namespaceBundle) { + return false; + } + }); + } + @Test public void testGetAllPartitions() throws Exception { final String namespace = "prop/" + UUID.randomUUID().toString();
