This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 3eb3b1cd23d [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379) 3eb3b1cd23d is described below commit 3eb3b1cd23d2cc11424bf882e244d3bc2e92bf27 Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com> AuthorDate: Mon Apr 1 01:52:21 2024 -0700 [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379) --- .../extensions/manager/UnloadManager.java | 14 ++++++- .../pulsar/broker/service/BrokerService.java | 11 +++++- .../pulsar/broker/web/PulsarWebResource.java | 5 +++ .../ExtensibleLoadManagerImplBaseTest.java | 8 ---- .../extensions/ExtensibleLoadManagerImplTest.java | 27 +++++++++++++ .../extensions/manager/UnloadManagerTest.java | 44 ++++++++++++---------- 6 files changed, 80 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java index b210dedbfe8..ffae9475243 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.loadbalance.extensions.manager; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning; +import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure; import static org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown; import com.google.common.annotations.VisibleForTesting; @@ -28,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData; import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter; @@ -170,6 +173,15 @@ public class UnloadManager implements StateChangeListener { @Override public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { + ServiceUnitState state = ServiceUnitStateData.state(data); + + if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) { + if (log.isDebugEnabled()) { + log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit); + } + return; + } + if (t != null) { if (log.isDebugEnabled()) { log.debug("Handling {} for service unit {} with exception.", data, serviceUnit, t); @@ -181,7 +193,7 @@ public class UnloadManager implements StateChangeListener { if (log.isDebugEnabled()) { log.debug("Handling {} for service unit {}", data, serviceUnit); } - ServiceUnitState state = ServiceUnitStateData.state(data); + switch (state) { case Free, Owned -> complete(serviceUnit, t); case Releasing -> LatencyMetric.RELEASE.endMeasurement(serviceUnit); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 98a0ed95b1a..549dfef896c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2244,9 +2244,18 @@ public class BrokerService implements Closeable { closeFutures.add(topicFuture .thenCompose(t -> t.isPresent() ? t.get().close( disconnectClients, closeWithoutWaitingClientDisconnect) - : CompletableFuture.completedFuture(null))); + : CompletableFuture.completedFuture(null)) + .exceptionally(e -> { + if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException + && e.getMessage().contains("Please redo the lookup")) { + log.warn("[{}] Topic ownership check failed. Skipping it", topicName); + return null; + } + throw FutureUtil.wrapToCompletionException(e); + })); } }); + if (getPulsar().getConfig().isTransactionCoordinatorEnabled() && serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) { TransactionMetadataStoreService metadataStoreService = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index e23286ae449..07c33107d4b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -610,11 +610,16 @@ public abstract class PulsarWebResource { NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange); NamespaceService nsService = pulsar().getNamespaceService(); + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { + return nsService.checkOwnershipPresentAsync(nsBundle); + } + LookupOptions options = LookupOptions.builder() .authoritative(false) .requestHttps(isRequestHttps()) .readOnly(true) .loadTopicsInBundle(false).build(); + return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java index 9e20fccff6d..651a544a04e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -37,7 +37,6 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.policies.data.TopicType; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -65,18 +64,11 @@ public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServ } protected ServiceConfiguration initConfig(ServiceConfiguration conf) { - // Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid - // stuck when doing unload. - conf.setLoadBalancerInFlightServiceUnitStateWaitingTimeInMillis(5 * 1000); - conf.setLoadBalancerServiceUnitStateMonitorIntervalInSeconds(1); conf.setForceDeleteNamespaceAllowed(true); - conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); - conf.setAllowAutoTopicCreation(true); conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); conf.setLoadBalancerSheddingEnabled(false); conf.setLoadBalancerDebugModeEnabled(true); - conf.setTopicLevelPoliciesEnabled(true); return conf; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 308a755235c..aee57f9d260 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -67,6 +67,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -264,6 +265,32 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase assertTrue(brokerLookupData.isPresent()); } + @Test(timeOut = 30 * 1000) + public void testUnloadUponTopicLookupFailure() throws Exception { + TopicName topicName = + TopicName.get("public/test/testUnloadUponTopicLookupFailure"); + NamespaceBundle bundle = pulsar1.getNamespaceService().getBundle(topicName); + primaryLoadManager.assign(Optional.empty(), bundle).get(); + + CompletableFuture future1 = new CompletableFuture(); + CompletableFuture future2 = new CompletableFuture(); + try { + pulsar1.getBrokerService().getTopics().put(topicName.toString(), future1); + pulsar2.getBrokerService().getTopics().put(topicName.toString(), future2); + CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS).execute(() -> { + future1.completeExceptionally(new CompletionException( + new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup"))); + future2.completeExceptionally(new CompletionException( + new BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup"))); + }); + admin.namespaces().unloadNamespaceBundle(bundle.getNamespaceObject().toString(), bundle.getBundleRange()); + } finally { + pulsar1.getBrokerService().getTopics().remove(topicName.toString()); + pulsar2.getBrokerService().getTopics().remove(topicName.toString()); + } + } + + @Test(timeOut = 30 * 1000) public void testUnloadAdminAPI() throws Exception { Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-unload"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java index 56c28966ac2..5d0abea3357 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java @@ -93,53 +93,59 @@ public class UnloadManagerTest { public void testSuccess() throws IllegalAccessException, ExecutionException, InterruptedException { UnloadCounter counter = new UnloadCounter(); UnloadManager manager = new UnloadManager(counter, "mockBrokerId"); + String dstBroker = "broker-2"; + String srcBroker = "broker-1"; + String bundle = "bundle-1"; var unloadDecision = - new UnloadDecision(new Unload("broker-1", "bundle-1"), Success, Admin); + new UnloadDecision(new Unload(srcBroker, bundle), Success, Admin); CompletableFuture<Void> future = manager.waitAsync(CompletableFuture.completedFuture(null), - "bundle-1", unloadDecision, 5, TimeUnit.SECONDS); + bundle, unloadDecision, 5, TimeUnit.SECONDS); Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent("bundle-1", - new ServiceUnitStateData(ServiceUnitState.Assigning, "broker-1", VERSION_ID_INIT), null); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Assigning, null, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent("bundle-1", - new ServiceUnitStateData(ServiceUnitState.Deleted, "broker-1", VERSION_ID_INIT), null); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Deleted, null, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent("bundle-1", - new ServiceUnitStateData(ServiceUnitState.Splitting, "broker-1", VERSION_ID_INIT), null); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Splitting, null, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent("bundle-1", - new ServiceUnitStateData(ServiceUnitState.Releasing, "broker-1", VERSION_ID_INIT), null); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Releasing, null, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent("bundle-1", - new ServiceUnitStateData(ServiceUnitState.Init, "broker-1", VERSION_ID_INIT), null); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent("bundle-1", - new ServiceUnitStateData(ServiceUnitState.Free, "broker-1", VERSION_ID_INIT), null); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 0); future.get(); assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1); // Success with Owned state. future = manager.waitAsync(CompletableFuture.completedFuture(null), - "bundle-1", unloadDecision, 5, TimeUnit.SECONDS); + bundle, unloadDecision, 5, TimeUnit.SECONDS); inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent("bundle-1", - new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT), null); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 0); - future.get(); + future.get(); assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2); } @@ -157,7 +163,7 @@ public class UnloadManagerTest { assertEquals(inFlightUnloadRequestMap.size(), 1); manager.handleEvent("bundle-1", - new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", VERSION_ID_INIT), + new ServiceUnitStateData(ServiceUnitState.Owned, null, "broker-1", VERSION_ID_INIT), new IllegalStateException("Failed stage.")); try {