This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 44c033ceaf4c00e08746c68ecbeea7d6499a968e
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 8 10:05:59 2022 +0800

    
[fix][flaky-test]NamespaceServiceTest.flaky/testModularLoadManagerRemoveBundleAndLoad
 (#17487)
    
    (cherry picked from commit 5c67ded8f858c54026ba69bd64854f885b55a5be)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  1 +
 .../broker/namespace/NamespaceServiceTest.java     | 75 +++++++++++++++++-----
 2 files changed, 59 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index ce8791ff8d6..24d2b97fc26 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -813,6 +813,7 @@ public abstract class NamespacesBase extends AdminResource {
             pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
             
pulsar().getBrokerService().getBundleStats().remove(bundle.toString());
         } catch (WebApplicationException wae) {
+            log.error("validateNamespaceBundleOwnership failed with 
exception", wae);
             throw wae;
         } catch (Exception e) {
             log.error("[{}] Failed to remove namespace bundle {}/{}", 
clientAppId(), namespaceName.toString(),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 90c76937413..81b4bb556cd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -42,9 +42,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
-import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.commons.collections.CollectionUtils;
@@ -72,6 +71,7 @@ import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
@@ -79,13 +79,13 @@ import 
org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.awaitility.Awaitility;
 import org.mockito.stubbing.Answer;
+import org.powermock.reflect.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-
 import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -684,19 +684,18 @@ public class NamespaceServiceTest extends BrokerTestBase {
     public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
         final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
         final String namespace = "prop/ns-abc";
+        final String bundleName = namespace + "/0x00000000_0xffffffff";
         final String topic1 = "persistent://" + namespace + "/topic1";
         final String topic2 = "persistent://" + namespace + "/topic2";
 
         // configure broker with ModularLoadManager
         conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         conf.setForceDeleteNamespaceAllowed(true);
+        // Make sure LoadReportUpdaterTask has a 100% chance to write ZK.
+        conf.setLoadBalancerReportUpdateMaxIntervalMinutes(-1);
         restartBroker();
 
-        LoadManager loadManager = spy(pulsar.getLoadManager().get());
-        Field loadManagerField = 
NamespaceService.class.getDeclaredField("loadManager");
-        loadManagerField.setAccessible(true);
-        doReturn(true).when(loadManager).isCentralized();
-        loadManagerField.set(pulsar.getNamespaceService(), new 
AtomicReference<>(loadManager));
+        LoadManager loadManager = pulsar.getLoadManager().get();
         NamespaceName nsname = NamespaceName.get(namespace);
 
         @Cleanup
@@ -708,18 +707,14 @@ public class NamespaceServiceTest extends BrokerTestBase {
         Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic2)
                 .subscriptionName("my-subscriber-name2").subscribe();
 
-
         NamespaceBundle bundle =
                 
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic1));
-
         loadManager.getLeastLoaded(bundle);
 
         //create znode for bundle-data
         pulsar.getBrokerService().updateRates();
-        loadManager.writeLoadReportOnZookeeper();
-        loadManager.writeResourceQuotasToZooKeeper();
-
-        String path = BUNDLE_DATA_PATH + "/" + nsname.toString() + 
"/0x00000000_0xffffffff";
+        waitResourceDataUpdateToZK(loadManager);
+        String path = BUNDLE_DATA_PATH + "/" + bundleName;
 
         Optional<GetResult> getResult = 
pulsar.getLocalMetadataStore().get(path).get();
         assertTrue(getResult.isPresent());
@@ -730,12 +725,58 @@ public class NamespaceServiceTest extends BrokerTestBase {
         TimeUnit.SECONDS.sleep(5);
 
         // update broker bundle report to zk
+        waitResourceDataUpdateToZK(loadManager);
+        Optional<GetResult> result = 
pulsar.getLocalMetadataStore().get(path).get();
+        assertFalse(result.isPresent());
+    }
+
+    /**
+     * 1. Manually trigger "LoadReportUpdaterTask"
+     * 2. Registry another new zk-node-listener "waitForBrokerChangeNotice".
+     * 3. Wait "waitForBrokerChangeNotice" is done, this task will be executed 
after
+     *    {@link ModularLoadManagerImpl#handleDataNotification(Notification)}, 
because it is registry later than
+     *    {@link ModularLoadManagerImpl#handleDataNotification(Notification)}. 
So if "waitForBrokerChangeNotice" is done
+     *    we can guarantee {@link 
ModularLoadManagerImpl#handleDataNotification(Notification)} is done. At this 
time
+     *    we still could not guarantee {@link 
ModularLoadManagerImpl#handleDataNotification(Notification)} has
+     *    finished all things, because there has a async task be submitted to 
"ModularLoadManagerImpl.scheduler" by
+     *    {@link ModularLoadManagerImpl#handleDataNotification(Notification)}.
+     * 4. Submit a new task to "scheduler"(it is a singleton thread executor).
+     * 5. Wait the new task done, if the new task done, we can guarantee
+     *    {@link ModularLoadManagerImpl#handleDataNotification(Notification)} 
has finished all things.
+     * 6. Manually trigger "LoadResourceQuotaUpdaterTask".
+     */
+    private void waitResourceDataUpdateToZK(LoadManager loadManager) throws 
Exception {
+        CompletableFuture<Void> waitForBrokerChangeNotice = 
registryBrokerDataChangeNotice();
+        // Manually trigger "LoadReportUpdaterTask"
         loadManager.writeLoadReportOnZookeeper();
+        waitForBrokerChangeNotice.join();
+        // Wait until "ModularLoadManager" completes processing the ZK 
notification.
+        ModularLoadManagerWrapper modularLoadManagerWrapper = 
(ModularLoadManagerWrapper) loadManager;
+        ModularLoadManagerImpl modularLoadManager = (ModularLoadManagerImpl) 
modularLoadManagerWrapper.getLoadManager();
+        ScheduledExecutorService scheduler = 
Whitebox.getInternalState(modularLoadManager, "scheduler");
+        CompletableFuture<Void> waitForNoticeHandleFinishByLoadManager = new 
CompletableFuture<>();
+        scheduler.execute(() -> {
+            waitForNoticeHandleFinishByLoadManager.complete(null);
+        });
+        waitForNoticeHandleFinishByLoadManager.join();
+        // Manually trigger "LoadResourceQuotaUpdaterTask"
         loadManager.writeResourceQuotasToZooKeeper();
+    }
 
-        getResult = pulsar.getLocalMetadataStore().get(path).get();
-        assertFalse(getResult.isPresent());
-
+    public CompletableFuture<Void> registryBrokerDataChangeNotice() {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+                + (conf.getWebServicePort().isPresent() ? 
conf.getWebServicePort().get()
+                : conf.getWebServicePortTls().get());
+        String brokerDataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + 
lookupServiceAddress;
+        pulsar.getLocalMetadataStore().registerListener(notice -> {
+            if (brokerDataPath.equals(notice.getPath())){
+                if (!completableFuture.isDone()) {
+                    completableFuture.complete(null);
+                }
+            }
+        });
+        return completableFuture;
     }
 
     @SuppressWarnings("unchecked")

Reply via email to