This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4cef40c60879657c6c21bbe1b0e0aed1fa83017a Author: fengyubiao <[email protected]> AuthorDate: Thu Sep 8 10:05:59 2022 +0800 [fix][flaky-test]NamespaceServiceTest.flaky/testModularLoadManagerRemoveBundleAndLoad (#17487) --- .../broker/namespace/NamespaceServiceTest.java | 68 ++++++++++++++++++---- 1 file changed, 58 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 9f03c026225..dba2d35a21c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -42,6 +42,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -76,6 +77,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataCache; +import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; import org.apache.pulsar.policies.data.loadbalancer.LoadReport; @@ -84,6 +86,7 @@ import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.policies.data.loadbalancer.BundleData; import org.awaitility.Awaitility; import org.mockito.stubbing.Answer; +import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -725,19 +728,18 @@ public class NamespaceServiceTest extends BrokerTestBase { public void 
testModularLoadManagerRemoveBundleAndLoad() throws Exception { final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data"; final String namespace = "prop/ns-abc"; + final String bundleName = namespace + "/0x00000000_0xffffffff"; final String topic1 = "persistent://" + namespace + "/topic1"; final String topic2 = "persistent://" + namespace + "/topic2"; // configure broker with ModularLoadManager conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); conf.setForceDeleteNamespaceAllowed(true); + // Make sure LoadReportUpdaterTask has a 100% chance to write ZK. + conf.setLoadBalancerReportUpdateMaxIntervalMinutes(-1); restartBroker(); - LoadManager loadManager = spy(pulsar.getLoadManager().get()); - Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager"); - loadManagerField.setAccessible(true); - doReturn(true).when(loadManager).isCentralized(); - loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager)); + LoadManager loadManager = pulsar.getLoadManager().get(); NamespaceName nsname = NamespaceName.get(namespace); @Cleanup @@ -757,10 +759,9 @@ public class NamespaceServiceTest extends BrokerTestBase { //create znode for bundle-data pulsar.getBrokerService().updateRates(); - loadManager.writeLoadReportOnZookeeper(); - loadManager.writeResourceQuotasToZooKeeper(); - String path = BUNDLE_DATA_PATH + "/" + nsname.toString() + "/0x00000000_0xffffffff"; + waitResourceDataUpdateToZK(loadManager); + String path = BUNDLE_DATA_PATH + "/" + bundleName; Optional<GetResult> getResult = pulsar.getLocalMetadataStore().get(path).get(); assertTrue(getResult.isPresent()); @@ -771,12 +772,59 @@ public class NamespaceServiceTest extends BrokerTestBase { TimeUnit.SECONDS.sleep(5); // update broker bundle report to zk - loadManager.writeLoadReportOnZookeeper(); - loadManager.writeResourceQuotasToZooKeeper(); + waitResourceDataUpdateToZK(loadManager); getResult = pulsar.getLocalMetadataStore().get(path).get(); 
assertFalse(getResult.isPresent()); + } + + /** + * 1. Manually trigger "LoadReportUpdaterTask". + * 2. Register another new ZK node listener, "waitForBrokerChangeNotice". + * 3. Wait until "waitForBrokerChangeNotice" is done. This listener will be executed after + * {@link ModularLoadManagerImpl#handleDataNotification(Notification)}, because it is registered later than + * {@link ModularLoadManagerImpl#handleDataNotification(Notification)}. So once "waitForBrokerChangeNotice" is done, + * we can guarantee that {@link ModularLoadManagerImpl#handleDataNotification(Notification)} is done. At this point + * we still cannot guarantee that {@link ModularLoadManagerImpl#handleDataNotification(Notification)} has + * finished all of its work, because an async task is submitted to "ModularLoadManagerImpl.scheduler" by + * {@link ModularLoadManagerImpl#handleDataNotification(Notification)}. + * 4. Submit a new task to "scheduler" (it is a single-thread executor). + * 5. Wait for the new task to finish; once it has finished, we can guarantee that + * {@link ModularLoadManagerImpl#handleDataNotification(Notification)} has finished all of its work. + * 6. Manually trigger "LoadResourceQuotaUpdaterTask". + */ + private void waitResourceDataUpdateToZK(LoadManager loadManager) throws Exception { + CompletableFuture<Void> waitForBrokerChangeNotice = registryBrokerDataChangeNotice(); + // Manually trigger "LoadReportUpdaterTask" + loadManager.writeLoadReportOnZookeeper(); + waitForBrokerChangeNotice.join(); + // Wait until "ModularLoadManager" completes processing the ZK notification. 
+ ModularLoadManagerWrapper modularLoadManagerWrapper = (ModularLoadManagerWrapper) loadManager; + ModularLoadManagerImpl modularLoadManager = (ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager(); + ScheduledExecutorService scheduler = Whitebox.getInternalState(modularLoadManager, "scheduler"); + CompletableFuture<Void> waitForNoticeHandleFinishByLoadManager = new CompletableFuture<>(); + scheduler.execute(() -> { + waitForNoticeHandleFinishByLoadManager.complete(null); + }); + waitForNoticeHandleFinishByLoadManager.join(); + // Manually trigger "LoadResourceQuotaUpdaterTask" + loadManager.writeResourceQuotasToZooKeeper(); + } + public CompletableFuture<Void> registryBrokerDataChangeNotice() { + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + + (conf.getWebServicePort().isPresent() ? conf.getWebServicePort().get() + : conf.getWebServicePortTls().get()); + String brokerDataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress; + pulsar.getLocalMetadataStore().registerListener(notice -> { + if (brokerDataPath.equals(notice.getPath())){ + if (!completableFuture.isDone()) { + completableFuture.complete(null); + } + } + }); + return completableFuture; } @SuppressWarnings("unchecked")
