This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b43b9de72c [improve][broker] Explicitly close LB internal topics when playing a follower (ExtensibleLoadManagerImpl only) (#23144)
1b43b9de72c is described below

commit 1b43b9de72ccf57b56375870b43d551def59d8a3
Author: Heesung Sohn <[email protected]>
AuthorDate: Sat Aug 10 02:24:26 2024 -0700

    [improve][broker] Explicitly close LB internal topics when playing a follower (ExtensibleLoadManagerImpl only) (#23144)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |  20 +-
 .../extensions/ExtensibleLoadManagerImplTest.java  | 235 +++++++++++++--------
 2 files changed, 162 insertions(+), 93 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 9450c2a9cc4..95882cfb21b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -124,7 +124,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
 
     private static final String ELECTION_ROOT = 
"/loadbalance/extension/leader";
 
-    private static final Set<String> INTERNAL_TOPICS =
+    public static final Set<String> INTERNAL_TOPICS =
             Set.of(BROKER_LOAD_DATA_STORE_TOPIC, 
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TOPIC);
 
     @VisibleForTesting
@@ -146,7 +146,10 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
     @Getter
     private IsolationPoliciesHelper isolationPoliciesHelper;
 
+    @Getter
     private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
+
+    @Getter
     private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;
 
     private LoadManagerScheduler unloadScheduler;
@@ -259,6 +262,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
         Follower
     }
 
+    @Getter
     private volatile Role role;
 
     /**
@@ -903,6 +907,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
                 }
                 unloadScheduler.close();
                 serviceUnitStateChannel.cancelOwnershipMonitor();
+                closeInternalTopics();
                 brokerLoadDataStore.init();
                 topBundlesLoadDataStore.close();
                 topBundlesLoadDataStore.startProducer();
@@ -1006,12 +1011,13 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager, BrokerS
     private void closeInternalTopics() {
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         for (String name : INTERNAL_TOPICS) {
-            futures.add(pulsar.getBrokerService().getTopicIfExists(name)
-                    .thenAccept(topicOptional -> topicOptional.ifPresent(topic 
-> topic.close(true)))
-                    .exceptionally(__ -> {
-                        log.warn("Failed to close internal topic:{}", name);
-                        return null;
-                    }));
+            pulsar.getBrokerService()
+                    .getTopicReference(name)
+                    .ifPresent(topic -> futures.add(topic.close(true)
+                            .exceptionally(__ -> {
+                                log.warn("Failed to close internal topic:{}", 
name);
+                                return null;
+                            })));
         }
         try {
             FutureUtil.waitForAll(futures)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 4a9b80c798f..69a65caf294 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -102,7 +102,6 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
 import 
org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
 import 
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
-import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.namespace.LookupOptions;
@@ -122,7 +121,6 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.LookupService;
-import org.apache.pulsar.client.impl.TableViewImpl;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -134,6 +132,7 @@ import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.awaitility.Awaitility;
@@ -1186,55 +1185,49 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         assertEquals(result, expectedBrokerServiceUrl);
     }
 
-    @Test(priority = 10)
-    public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws 
Exception {
-        var topBundlesLoadDataStorePrimary =
-                (LoadDataStore) 
FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore", 
true);
-        var serviceUnitStateChannelPrimary =
-                (ServiceUnitStateChannelImpl) 
FieldUtils.readDeclaredField(primaryLoadManager,
-                        "serviceUnitStateChannel", true);
-        var tvPrimary =
-                (TableViewImpl) 
FieldUtils.readDeclaredField(topBundlesLoadDataStorePrimary, "tableView", true);
-
-        var topBundlesLoadDataStoreSecondary =
-                (LoadDataStore) 
FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore", 
true);
-        var tvSecondary =
-                (TableViewImpl) 
FieldUtils.readDeclaredField(topBundlesLoadDataStoreSecondary, "tableView", 
true);
-
-        if (serviceUnitStateChannelPrimary.isChannelOwnerAsync().get(5, 
TimeUnit.SECONDS)) {
-            assertNotNull(tvPrimary);
-            assertNull(tvSecondary);
-        } else {
-            assertNull(tvPrimary);
-            assertNotNull(tvSecondary);
+
+    private void makePrimaryAsLeader() throws Exception {
+        log.info("makePrimaryAsLeader");
+        if (channel2.isChannelOwner()) {
+            pulsar2.getLeaderElectionService().close();
+            Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() 
-> {
+                assertTrue(channel1.isChannelOwner());
+            });
+            pulsar2.getLeaderElectionService().start();
         }
 
-        restartBroker();
-        pulsar1 = pulsar;
-        setPrimaryLoadManager();
-        
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
-                Sets.newHashSet(this.conf.getClusterName()));
-
-        var serviceUnitStateChannelPrimaryNew =
-                (ServiceUnitStateChannelImpl) 
FieldUtils.readDeclaredField(primaryLoadManager,
-                        "serviceUnitStateChannel", true);
-        var topBundlesLoadDataStorePrimaryNew =
-                (LoadDataStore) 
FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore"
-                        , true);
-        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
-                    
assertFalse(serviceUnitStateChannelPrimaryNew.isChannelOwnerAsync().get(5, 
TimeUnit.SECONDS));
-                    
assertNotNull(FieldUtils.readDeclaredField(topBundlesLoadDataStoreSecondary, 
"tableView"
-                            , true));
-                    
assertNull(FieldUtils.readDeclaredField(topBundlesLoadDataStorePrimaryNew, 
"tableView"
-                            , true));
-                }
-        );
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertTrue(channel1.isChannelOwner());
+        });
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertFalse(channel2.isChannelOwner());
+        });
     }
 
-    @Test
-    public void testRoleChange() throws Exception {
-        var topBundlesLoadDataStorePrimary = 
(LoadDataStore<TopBundlesLoadData>)
-                FieldUtils.readDeclaredField(primaryLoadManager, 
"topBundlesLoadDataStore", true);
+    private void makeSecondaryAsLeader() throws Exception {
+        log.info("makeSecondaryAsLeader");
+        if (channel1.isChannelOwner()) {
+            pulsar1.getLeaderElectionService().close();
+            Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() 
-> {
+                assertTrue(channel2.isChannelOwner());
+            });
+            pulsar1.getLeaderElectionService().start();
+        }
+
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertTrue(channel2.isChannelOwner());
+        });
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertFalse(channel1.isChannelOwner());
+        });
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testRoleChangeIdempotency() throws Exception {
+
+        makePrimaryAsLeader();
+
+        var topBundlesLoadDataStorePrimary = 
primaryLoadManager.getTopBundlesLoadDataStore();
         var topBundlesLoadDataStorePrimarySpy = 
spy(topBundlesLoadDataStorePrimary);
         AtomicInteger countPri = new AtomicInteger(3);
         AtomicInteger countPri2 = new AtomicInteger(3);
@@ -1255,8 +1248,7 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
             return null;
         }).when(topBundlesLoadDataStorePrimarySpy).closeTableView();
 
-        var topBundlesLoadDataStoreSecondary = 
(LoadDataStore<TopBundlesLoadData>)
-                FieldUtils.readDeclaredField(secondaryLoadManager, 
"topBundlesLoadDataStore", true);
+        var topBundlesLoadDataStoreSecondary = 
secondaryLoadManager.getTopBundlesLoadDataStore();
         var topBundlesLoadDataStoreSecondarySpy = 
spy(topBundlesLoadDataStoreSecondary);
         AtomicInteger countSec = new AtomicInteger(3);
         AtomicInteger countSec2 = new AtomicInteger(3);
@@ -1284,51 +1276,30 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
                     topBundlesLoadDataStoreSecondarySpy, true);
 
 
-            if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
-                primaryLoadManager.playLeader();
-                secondaryLoadManager.playFollower();
-                verify(topBundlesLoadDataStorePrimarySpy, 
times(3)).startTableView();
-                verify(topBundlesLoadDataStorePrimarySpy, 
times(5)).closeTableView();
-                verify(topBundlesLoadDataStoreSecondarySpy, 
times(0)).startTableView();
-                verify(topBundlesLoadDataStoreSecondarySpy, 
times(3)).closeTableView();
-            } else {
-                primaryLoadManager.playFollower();
-                secondaryLoadManager.playLeader();
-                verify(topBundlesLoadDataStoreSecondarySpy, 
times(3)).startTableView();
-                verify(topBundlesLoadDataStoreSecondarySpy, 
times(5)).closeTableView();
-                verify(topBundlesLoadDataStorePrimarySpy, 
times(0)).startTableView();
-                verify(topBundlesLoadDataStorePrimarySpy, 
times(3)).closeTableView();
-            }
+
+            primaryLoadManager.playLeader();
+            secondaryLoadManager.playFollower();
+            verify(topBundlesLoadDataStorePrimarySpy, 
times(3)).startTableView();
+            verify(topBundlesLoadDataStorePrimarySpy, 
times(5)).closeTableView();
+            verify(topBundlesLoadDataStoreSecondarySpy, 
times(0)).startTableView();
+            verify(topBundlesLoadDataStoreSecondarySpy, 
times(3)).closeTableView();
+
 
             primaryLoadManager.playFollower();
             secondaryLoadManager.playFollower();
+            assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+                    primaryLoadManager.getRole());
+            assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+                    secondaryLoadManager.getRole());
 
-            if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
-                assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
-                        FieldUtils.readDeclaredField(primaryLoadManager, 
"role", true));
-                assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
-                        FieldUtils.readDeclaredField(secondaryLoadManager, 
"role", true));
-            } else {
-                assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
-                        FieldUtils.readDeclaredField(primaryLoadManager, 
"role", true));
-                assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
-                        FieldUtils.readDeclaredField(secondaryLoadManager, 
"role", true));
-            }
 
             primaryLoadManager.playLeader();
             secondaryLoadManager.playLeader();
+            assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+                    primaryLoadManager.getRole());
+            assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+                    secondaryLoadManager.getRole());
 
-            if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
-                assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
-                        FieldUtils.readDeclaredField(primaryLoadManager, 
"role", true));
-                assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
-                        FieldUtils.readDeclaredField(secondaryLoadManager, 
"role", true));
-            } else {
-                assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
-                        FieldUtils.readDeclaredField(primaryLoadManager, 
"role", true));
-                assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
-                        FieldUtils.readDeclaredField(secondaryLoadManager, 
"role", true));
-            }
         } finally {
             FieldUtils.writeDeclaredField(primaryLoadManager, 
"topBundlesLoadDataStore",
                     topBundlesLoadDataStorePrimary, true);
@@ -1336,6 +1307,98 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
                     topBundlesLoadDataStoreSecondary, true);
         }
     }
+    @Test(timeOut = 30 * 1000)
+    public void testRoleChange() throws Exception {
+        makePrimaryAsLeader();
+
+        var leader = primaryLoadManager;
+        var follower = secondaryLoadManager;
+
+        BrokerLoadData brokerLoadExpected = new BrokerLoadData();
+        SystemResourceUsage usage = new SystemResourceUsage();
+        var cpu = new ResourceUsage(1.0, 100.0);
+        String key = "b1";
+        usage.setCpu(cpu);
+        brokerLoadExpected.update(usage, 0, 0, 0, 0, 0, 0, conf);
+        String bundle = "public/default/0x00000000_0xffffffff";
+        TopBundlesLoadData topBundlesExpected = new TopBundlesLoadData();
+        topBundlesExpected.getTopBundlesLoadData().clear();
+        topBundlesExpected.getTopBundlesLoadData().add(new 
TopBundlesLoadData.BundleLoadData(bundle, new NamespaceBundleStats()));
+
+        follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected);
+        follower.getTopBundlesLoadDataStore().pushAsync(bundle, 
topBundlesExpected);
+
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+
+            
assertNotNull(FieldUtils.readDeclaredField(leader.getTopBundlesLoadDataStore(), 
"tableView", true));
+            
assertNull(FieldUtils.readDeclaredField(follower.getTopBundlesLoadDataStore(), 
"tableView", true));
+
+            for (String internalTopic : 
ExtensibleLoadManagerImpl.INTERNAL_TOPICS) {
+                
assertTrue(leader.pulsar.getBrokerService().getTopicReference(internalTopic)
+                        .isPresent());
+                
assertTrue(follower.pulsar.getBrokerService().getTopicReference(internalTopic)
+                        .isEmpty());
+
+                assertTrue(leader.pulsar.getNamespaceService()
+                        
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
+                assertFalse(follower.pulsar.getNamespaceService()
+                        
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
+            }
+
+            var actualBrokerLoadLeader = 
leader.getBrokerLoadDataStore().get(key);
+            if (actualBrokerLoadLeader.isPresent()) {
+                assertEquals(actualBrokerLoadLeader.get(), brokerLoadExpected);
+            }
+
+            var actualTopBundlesLeader = 
leader.getTopBundlesLoadDataStore().get(bundle);
+            if (actualTopBundlesLeader.isPresent()) {
+                assertEquals(actualTopBundlesLeader.get(), topBundlesExpected);
+            }
+
+            var actualBrokerLoadFollower = 
follower.getBrokerLoadDataStore().get(key);
+            if (actualBrokerLoadFollower.isPresent()) {
+                assertEquals(actualBrokerLoadFollower.get(), 
brokerLoadExpected);
+            }
+        });
+
+        makeSecondaryAsLeader();
+
+        var leader2 = secondaryLoadManager;
+        var follower2 = primaryLoadManager;
+
+        brokerLoadExpected.update(usage, 1, 0, 0, 0, 0, 0, conf);
+        topBundlesExpected.getTopBundlesLoadData().get(0).stats().msgRateIn = 
1;
+
+        follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected);
+        follower.getTopBundlesLoadDataStore().pushAsync(bundle, 
topBundlesExpected);
+
+        Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+            
assertNotNull(FieldUtils.readDeclaredField(leader2.getTopBundlesLoadDataStore(),
 "tableView", true));
+            
assertNull(FieldUtils.readDeclaredField(follower2.getTopBundlesLoadDataStore(), 
"tableView", true));
+
+            for (String internalTopic : 
ExtensibleLoadManagerImpl.INTERNAL_TOPICS) {
+                
assertTrue(leader2.pulsar.getBrokerService().getTopicReference(internalTopic)
+                        .isPresent());
+                
assertTrue(follower2.pulsar.getBrokerService().getTopicReference(internalTopic)
+                        .isEmpty());
+
+                assertTrue(leader2.pulsar.getNamespaceService()
+                        
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
+                assertFalse(follower2.pulsar.getNamespaceService()
+                        
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
+            }
+
+
+            var actualBrokerLoadLeader = 
leader2.getBrokerLoadDataStore().get(key);
+            assertEquals(actualBrokerLoadLeader.get(), brokerLoadExpected);
+
+            var actualTopBundlesLeader = 
leader2.getTopBundlesLoadDataStore().get(bundle);
+            assertEquals(actualTopBundlesLeader.get(), topBundlesExpected);
+
+            var actualBrokerLoadFollower = 
follower2.getBrokerLoadDataStore().get(key);
+            assertEquals(actualBrokerLoadFollower.get(), brokerLoadExpected);
+        });
+    }
 
     @Test
     public void testGetMetrics() throws Exception {

Reply via email to