This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fff41c31c5d [fix][broker] Handle the case when `getOwnedServiceUnits` fails gracefully (#23119)
fff41c31c5d is described below
commit fff41c31c5db70402fa9e31b0e96d8c93f7d5198
Author: Kai Wang <[email protected]>
AuthorDate: Tue Aug 6 12:05:41 2024 +0800
[fix][broker] Handle the case when `getOwnedServiceUnits` fails gracefully (#23119)
---
.../channel/ServiceUnitStateChannelImpl.java | 8 ++++++--
.../pulsar/broker/namespace/NamespaceService.java | 6 +++++-
.../channel/ServiceUnitStateChannelTest.java | 13 +++++++++++++
.../namespace/NamespaceOwnershipListenerTests.java | 21 +++++++++++++++++++++
4 files changed, 45 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 02e641f69a7..e7804806d9c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -130,7 +130,6 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private final String brokerId;
private final Map<String, CompletableFuture<Void>> cleanupJobs;
private final StateChangeListeners stateChangeListeners;
- private ExtensibleLoadManagerImpl loadManager;
private BrokerRegistry brokerRegistry;
private LeaderElectionService leaderElectionService;
private TableView<ServiceUnitStateData> tableview;
@@ -296,7 +295,6 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
log.warn("Failed to find the channel leader.");
}
this.channelState = LeaderElectionServiceStarted;
- loadManager = getLoadManager();
if (producer != null) {
producer.close();
@@ -561,6 +559,9 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
private Optional<String> getOwner(String serviceUnit) {
+ if (!validateChannelState(Started, true)) {
+ throw new IllegalStateException("Invalid channel state:" +
channelState.name());
+ }
ServiceUnitStateData data = tableview.get(serviceUnit);
ServiceUnitState state = state(data);
switch (state) {
@@ -1715,6 +1716,9 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
@Override
public Set<Map.Entry<String, ServiceUnitStateData>> getOwnershipEntrySet()
{
+ if (!validateChannelState(Started, true)) {
+ throw new IllegalStateException("Invalid channel state:" +
channelState.name());
+ }
return tableview.entrySet();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index b02653b53ef..b8ca38624d0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1301,7 +1301,11 @@ public class NamespaceService implements AutoCloseable {
}
}
pulsar.runWhenReadyForIncomingRequests(() -> {
- getOwnedServiceUnits().forEach(bundle ->
notifyNamespaceBundleOwnershipListener(bundle, listeners));
+ try {
+ getOwnedServiceUnits().forEach(bundle ->
notifyNamespaceBundleOwnershipListener(bundle, listeners));
+ } catch (Exception e) {
+ LOG.error("Failed to notify namespace bundle ownership
listener", e);
+ }
});
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
index 24f27a58e76..e8f0c9e43a8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
@@ -40,6 +40,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertThrows;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.expectThrows;
+import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -1759,6 +1760,18 @@ public class ServiceUnitStateChannelTest extends
MockedPulsarServiceBaseTest {
}
+ @Test(priority = 20)
+ public void testGetOwnershipEntrySetBeforeChannelStart() {
+ var tmpChannel = new ServiceUnitStateChannelImpl(pulsar1);
+ try {
+ tmpChannel.getOwnershipEntrySet();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e instanceof IllegalStateException);
+ assertEquals("Invalid channel state:Constructed", e.getMessage());
+ }
+ }
+
private static ConcurrentHashMap<String,
CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
index 02787aa1435..8fc19432eb3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
@@ -35,6 +35,8 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertTrue;
@Test(groups = "broker")
@@ -102,6 +104,25 @@ public class NamespaceOwnershipListenerTests extends
BrokerTestBase {
deleteNamespaceWithRetry(namespace, false);
}
+ @Test
+ public void testAddNamespaceBundleOwnershipListenerBeforeLBStart() {
+ NamespaceService namespaceService = spy(new NamespaceService(pulsar));
+ doThrow(new IllegalStateException("The LM is not initialized"))
+ .when(namespaceService).getOwnedServiceUnits();
+ namespaceService.addNamespaceBundleOwnershipListener(new
NamespaceBundleOwnershipListener() {
+ @Override
+ public void onLoad(NamespaceBundle bundle) {}
+
+ @Override
+ public void unLoad(NamespaceBundle bundle) {}
+
+ @Override
+ public boolean test(NamespaceBundle namespaceBundle) {
+ return false;
+ }
+ });
+ }
+
@Test
public void testGetAllPartitions() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString();