This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 44c033ceaf4c00e08746c68ecbeea7d6499a968e Author: fengyubiao <[email protected]> AuthorDate: Thu Sep 8 10:05:59 2022 +0800 [fix][flaky-test]NamespaceServiceTest.flaky/testModularLoadManagerRemoveBundleAndLoad (#17487) (cherry picked from commit 5c67ded8f858c54026ba69bd64854f885b55a5be) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 1 + .../broker/namespace/NamespaceServiceTest.java | 75 +++++++++++++++++----- 2 files changed, 59 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index ce8791ff8d6..24d2b97fc26 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -813,6 +813,7 @@ public abstract class NamespacesBase extends AdminResource { pulsar().getNamespaceService().removeOwnedServiceUnit(bundle); pulsar().getBrokerService().getBundleStats().remove(bundle.toString()); } catch (WebApplicationException wae) { + log.error("validateNamespaceBundleOwnership failed with exception", wae); throw wae; } catch (Exception e) { log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 90c76937413..81b4bb556cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -42,9 +42,8 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - 
-import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.commons.collections.CollectionUtils; @@ -72,6 +71,7 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.apache.pulsar.metadata.api.GetResult; +import org.apache.pulsar.metadata.api.Notification; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; import org.apache.pulsar.policies.data.loadbalancer.LoadReport; @@ -79,13 +79,13 @@ import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.awaitility.Awaitility; import org.mockito.stubbing.Answer; +import org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; - import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -684,19 +684,18 @@ public class NamespaceServiceTest extends BrokerTestBase { public void testModularLoadManagerRemoveBundleAndLoad() throws Exception { final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data"; final String namespace = "prop/ns-abc"; + final String bundleName = namespace + "/0x00000000_0xffffffff"; final String topic1 = "persistent://" + namespace + "/topic1"; final String topic2 = "persistent://" + namespace + "/topic2"; // configure broker with ModularLoadManager conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); conf.setForceDeleteNamespaceAllowed(true); + // Make sure LoadReportUpdaterTask has a 100% chance to write ZK. 
+ conf.setLoadBalancerReportUpdateMaxIntervalMinutes(-1); restartBroker(); - LoadManager loadManager = spy(pulsar.getLoadManager().get()); - Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager"); - loadManagerField.setAccessible(true); - doReturn(true).when(loadManager).isCentralized(); - loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager)); + LoadManager loadManager = pulsar.getLoadManager().get(); NamespaceName nsname = NamespaceName.get(namespace); @Cleanup @@ -708,18 +707,14 @@ public class NamespaceServiceTest extends BrokerTestBase { Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic2) .subscriptionName("my-subscriber-name2").subscribe(); - NamespaceBundle bundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic1)); - loadManager.getLeastLoaded(bundle); //create znode for bundle-data pulsar.getBrokerService().updateRates(); - loadManager.writeLoadReportOnZookeeper(); - loadManager.writeResourceQuotasToZooKeeper(); - - String path = BUNDLE_DATA_PATH + "/" + nsname.toString() + "/0x00000000_0xffffffff"; + waitResourceDataUpdateToZK(loadManager); + String path = BUNDLE_DATA_PATH + "/" + bundleName; Optional<GetResult> getResult = pulsar.getLocalMetadataStore().get(path).get(); assertTrue(getResult.isPresent()); @@ -730,12 +725,58 @@ public class NamespaceServiceTest extends BrokerTestBase { TimeUnit.SECONDS.sleep(5); // update broker bundle report to zk + waitResourceDataUpdateToZK(loadManager); + Optional<GetResult> result = pulsar.getLocalMetadataStore().get(path).get(); + assertFalse(result.isPresent()); + } + + /** + * 1. Manually trigger "LoadReportUpdaterTask" + * 2. Registry another new zk-node-listener "waitForBrokerChangeNotice". + * 3. 
Wait until "waitForBrokerChangeNotice" is done. This listener runs after + * {@link ModularLoadManagerImpl#handleDataNotification(Notification)} because it is registered later than + * {@link ModularLoadManagerImpl#handleDataNotification(Notification)}, so once "waitForBrokerChangeNotice" is done + * we can guarantee {@link ModularLoadManagerImpl#handleDataNotification(Notification)} has returned. At that point + * we still cannot guarantee that {@link ModularLoadManagerImpl#handleDataNotification(Notification)} has + * finished all of its work, because an async task is submitted to "ModularLoadManagerImpl.scheduler" by + * {@link ModularLoadManagerImpl#handleDataNotification(Notification)}. + * 4. Submit a new task to "scheduler" (a single-threaded executor). + * 5. Wait for the new task to complete; once it has, we can guarantee that + * {@link ModularLoadManagerImpl#handleDataNotification(Notification)} has finished all of its work. + * 6. Manually trigger "LoadResourceQuotaUpdaterTask". + */ + private void waitResourceDataUpdateToZK(LoadManager loadManager) throws Exception { + CompletableFuture<Void> waitForBrokerChangeNotice = registryBrokerDataChangeNotice(); + // Manually trigger "LoadReportUpdaterTask" loadManager.writeLoadReportOnZookeeper(); + waitForBrokerChangeNotice.join(); + // Wait until "ModularLoadManager" completes processing the ZK notification.
+ ModularLoadManagerWrapper modularLoadManagerWrapper = (ModularLoadManagerWrapper) loadManager; + ModularLoadManagerImpl modularLoadManager = (ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager(); + ScheduledExecutorService scheduler = Whitebox.getInternalState(modularLoadManager, "scheduler"); + CompletableFuture<Void> waitForNoticeHandleFinishByLoadManager = new CompletableFuture<>(); + scheduler.execute(() -> { + waitForNoticeHandleFinishByLoadManager.complete(null); + }); + waitForNoticeHandleFinishByLoadManager.join(); + // Manually trigger "LoadResourceQuotaUpdaterTask" loadManager.writeResourceQuotasToZooKeeper(); + } - getResult = pulsar.getLocalMetadataStore().get(path).get(); - assertFalse(getResult.isPresent()); - + public CompletableFuture<Void> registryBrokerDataChangeNotice() { + CompletableFuture<Void> completableFuture = new CompletableFuture<>(); + String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":" + + (conf.getWebServicePort().isPresent() ? conf.getWebServicePort().get() + : conf.getWebServicePortTls().get()); + String brokerDataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress; + pulsar.getLocalMetadataStore().registerListener(notice -> { + if (brokerDataPath.equals(notice.getPath())){ + if (!completableFuture.isDone()) { + completableFuture.complete(null); + } + } + }); + return completableFuture; } @SuppressWarnings("unchecked")
