This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3eb3b1cd23d [fix][broker] Skip topic.close during unloading if the 
topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to 
use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379)
3eb3b1cd23d is described below

commit 3eb3b1cd23d2cc11424bf882e244d3bc2e92bf27
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Mon Apr 1 01:52:21 2024 -0700

    [fix][broker] Skip topic.close during unloading if the topic future fails 
with ownership check, and fix isBundleOwnedByAnyBroker to use 
ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer (#22379)
---
 .../extensions/manager/UnloadManager.java          | 14 ++++++-
 .../pulsar/broker/service/BrokerService.java       | 11 +++++-
 .../pulsar/broker/web/PulsarWebResource.java       |  5 +++
 .../ExtensibleLoadManagerImplBaseTest.java         |  8 ----
 .../extensions/ExtensibleLoadManagerImplTest.java  | 27 +++++++++++++
 .../extensions/manager/UnloadManagerTest.java      | 44 ++++++++++++----------
 6 files changed, 80 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
index b210dedbfe8..ffae9475243 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.loadbalance.extensions.manager;
 
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigning;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Label.Failure;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
 import com.google.common.annotations.VisibleForTesting;
@@ -28,6 +30,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
@@ -170,6 +173,15 @@ public class UnloadManager implements StateChangeListener {
 
     @Override
     public void handleEvent(String serviceUnit, ServiceUnitStateData data, 
Throwable t) {
+        ServiceUnitState state = ServiceUnitStateData.state(data);
+
+        if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || 
state == Assigning)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Skipping {} for service unit {} from the assignment 
command.", data, serviceUnit);
+            }
+            return;
+        }
+
         if (t != null) {
             if (log.isDebugEnabled()) {
                 log.debug("Handling {} for service unit {} with exception.", 
data, serviceUnit, t);
@@ -181,7 +193,7 @@ public class UnloadManager implements StateChangeListener {
         if (log.isDebugEnabled()) {
             log.debug("Handling {} for service unit {}", data, serviceUnit);
         }
-        ServiceUnitState state = ServiceUnitStateData.state(data);
+
         switch (state) {
             case Free, Owned -> complete(serviceUnit, t);
             case Releasing -> 
LatencyMetric.RELEASE.endMeasurement(serviceUnit);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 98a0ed95b1a..549dfef896c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2244,9 +2244,18 @@ public class BrokerService implements Closeable {
                 closeFutures.add(topicFuture
                         .thenCompose(t -> t.isPresent() ? t.get().close(
                                 disconnectClients, 
closeWithoutWaitingClientDisconnect)
-                                : CompletableFuture.completedFuture(null)));
+                                : CompletableFuture.completedFuture(null))
+                        .exceptionally(e -> {
+                            if (e.getCause() instanceof 
BrokerServiceException.ServiceUnitNotReadyException
+                                    && e.getMessage().contains("Please redo 
the lookup")) {
+                                log.warn("[{}] Topic ownership check failed. 
Skipping it", topicName);
+                                return null;
+                            }
+                            throw FutureUtil.wrapToCompletionException(e);
+                        }));
             }
         });
+
         if (getPulsar().getConfig().isTransactionCoordinatorEnabled()
                 && 
serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) {
             TransactionMetadataStoreService metadataStoreService =
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index e23286ae449..07c33107d4b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -610,11 +610,16 @@ public abstract class PulsarWebResource {
         NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, 
bundleRange);
         NamespaceService nsService = pulsar().getNamespaceService();
 
+        if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
+            return nsService.checkOwnershipPresentAsync(nsBundle);
+        }
+
         LookupOptions options = LookupOptions.builder()
                 .authoritative(false)
                 .requestHttps(isRequestHttps())
                 .readOnly(true)
                 .loadTopicsInBundle(false).build();
+
         return nsService.getWebServiceUrlAsync(nsBundle, 
options).thenApply(Optional::isPresent);
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
index 9e20fccff6d..651a544a04e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java
@@ -37,7 +37,6 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.policies.data.TopicType;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -65,18 +64,11 @@ public abstract class ExtensibleLoadManagerImplBaseTest 
extends MockedPulsarServ
     }
 
     protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
-        // Set the inflight state waiting time and ownership monitor delay 
time to 5 seconds to avoid
-        // stuck when doing unload.
-        conf.setLoadBalancerInFlightServiceUnitStateWaitingTimeInMillis(5 * 
1000);
-        conf.setLoadBalancerServiceUnitStateMonitorIntervalInSeconds(1);
         conf.setForceDeleteNamespaceAllowed(true);
-        conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
-        conf.setAllowAutoTopicCreation(true);
         
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
         
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
         conf.setLoadBalancerSheddingEnabled(false);
         conf.setLoadBalancerDebugModeEnabled(true);
-        conf.setTopicLevelPoliciesEnabled(true);
         return conf;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 308a755235c..aee57f9d260 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -67,6 +67,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -264,6 +265,32 @@ public class ExtensibleLoadManagerImplTest extends 
ExtensibleLoadManagerImplBase
         assertTrue(brokerLookupData.isPresent());
     }
 
+    @Test(timeOut = 30 * 1000)
+    public void testUnloadUponTopicLookupFailure() throws Exception {
+        TopicName topicName =
+                TopicName.get("public/test/testUnloadUponTopicLookupFailure");
+        NamespaceBundle bundle = 
pulsar1.getNamespaceService().getBundle(topicName);
+        primaryLoadManager.assign(Optional.empty(), bundle).get();
+
+        CompletableFuture future1 = new CompletableFuture();
+        CompletableFuture future2 = new CompletableFuture();
+        try {
+            pulsar1.getBrokerService().getTopics().put(topicName.toString(), 
future1);
+            pulsar2.getBrokerService().getTopics().put(topicName.toString(), 
future2);
+            CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS).execute(() 
-> {
+                future1.completeExceptionally(new CompletionException(
+                        new 
BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
+                future2.completeExceptionally(new CompletionException(
+                        new 
BrokerServiceException.ServiceUnitNotReadyException("Please redo the lookup")));
+            });
+            
admin.namespaces().unloadNamespaceBundle(bundle.getNamespaceObject().toString(),
 bundle.getBundleRange());
+        } finally {
+            
pulsar1.getBrokerService().getTopics().remove(topicName.toString());
+            
pulsar2.getBrokerService().getTopics().remove(topicName.toString());
+        }
+    }
+
+
     @Test(timeOut = 30 * 1000)
     public void testUnloadAdminAPI() throws Exception {
         Pair<TopicName, NamespaceBundle> topicAndBundle = 
getBundleIsNotOwnByChangeEventTopic("test-unload");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
index 56c28966ac2..5d0abea3357 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java
@@ -93,53 +93,59 @@ public class UnloadManagerTest {
     public void testSuccess() throws IllegalAccessException, 
ExecutionException, InterruptedException {
         UnloadCounter counter = new UnloadCounter();
         UnloadManager manager = new UnloadManager(counter, "mockBrokerId");
+        String dstBroker = "broker-2";
+        String srcBroker = "broker-1";
+        String bundle = "bundle-1";
         var unloadDecision =
-                new UnloadDecision(new Unload("broker-1", "bundle-1"), 
Success, Admin);
+                new UnloadDecision(new Unload(srcBroker, bundle), Success, 
Admin);
         CompletableFuture<Void> future =
                 manager.waitAsync(CompletableFuture.completedFuture(null),
-                        "bundle-1", unloadDecision, 5, TimeUnit.SECONDS);
+                        bundle, unloadDecision, 5, TimeUnit.SECONDS);
         Map<String, CompletableFuture<Void>> inFlightUnloadRequestMap = 
getInFlightUnloadRequestMap(manager);
 
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Assigning, 
"broker-1", VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Assigning, null, 
srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Deleted, "broker-1", 
VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Deleted, null, 
srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Splitting, 
"broker-1", VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Splitting, null, 
srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Releasing, 
"broker-1", VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Releasing, null, 
srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Init, "broker-1", 
VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Init, null, 
srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Free, "broker-1", 
VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Free, null, 
srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 0);
         future.get();
         
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1);
 
         // Success with Owned state.
         future = manager.waitAsync(CompletableFuture.completedFuture(null),
-                "bundle-1", unloadDecision, 5, TimeUnit.SECONDS);
+                bundle, unloadDecision, 5, TimeUnit.SECONDS);
         inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager);
+        assertEquals(inFlightUnloadRequestMap.size(), 1);
 
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, 
null, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
-        manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", 
VERSION_ID_INIT), null);
+        manager.handleEvent(bundle,
+                new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, 
srcBroker, VERSION_ID_INIT), null);
         assertEquals(inFlightUnloadRequestMap.size(), 0);
-        future.get();
 
+        future.get();
         
assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2);
     }
 
@@ -157,7 +163,7 @@ public class UnloadManagerTest {
         assertEquals(inFlightUnloadRequestMap.size(), 1);
 
         manager.handleEvent("bundle-1",
-                new ServiceUnitStateData(ServiceUnitState.Owned, "broker-1", 
VERSION_ID_INIT),
+                new ServiceUnitStateData(ServiceUnitState.Owned, null, 
"broker-1", VERSION_ID_INIT),
                 new IllegalStateException("Failed stage."));
 
         try {

Reply via email to