This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dba0b65  Fix topic getting recreated immediately after deletion (#7524)
dba0b65 is described below

commit dba0b65a2fd03586cb3d17a306fbda5322318d22
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jul 13 17:00:59 2020 -0700

    Fix topic getting recreated immediately after deletion (#7524)
---
 .../pulsar/broker/lookup/TopicLookupBase.java      | 10 ++-
 .../pulsar/broker/namespace/LookupOptions.java     | 54 ++++++++++++++
 .../pulsar/broker/namespace/NamespaceService.java  | 83 ++++++++--------------
 .../pulsar/broker/web/PulsarWebResource.java       | 23 +++++-
 .../apache/pulsar/broker/admin/NamespacesTest.java | 29 ++++----
 .../namespace/NamespaceOwnershipListenerTests.java |  6 +-
 6 files changed, 130 insertions(+), 75 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 3c33949..cd140d5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
@@ -79,7 +80,7 @@ public class TopicLookupBase extends PulsarWebResource {
         }
 
         CompletableFuture<Optional<LookupResult>> lookupFuture = 
pulsar().getNamespaceService()
-                .getBrokerServiceUrlAsync(topicName, authoritative);
+                .getBrokerServiceUrlAsync(topicName, 
LookupOptions.builder().authoritative(authoritative).loadTopicsInBundle(false).build());
 
         lookupFuture.thenAccept(optionalResult -> {
             if (optionalResult == null || !optionalResult.isPresent()) {
@@ -251,7 +252,12 @@ public class TopicLookupBase extends PulsarWebResource {
             if (validationFailureResponse != null) {
                 lookupfuture.complete(validationFailureResponse);
             } else {
-                
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, 
authoritative, advertisedListenerName)
+                LookupOptions options = LookupOptions.builder()
+                        .authoritative(authoritative)
+                        .advertisedListenerName(advertisedListenerName)
+                        .loadTopicsInBundle(true)
+                        .build();
+                
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)
                         .thenAccept(lookupResult -> {
 
                             if (log.isDebugEnabled()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java
new file mode 100644
index 0000000..c3a659d
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.namespace;
+
+import lombok.Builder;
+import lombok.Data;
+
+import org.apache.commons.lang3.StringUtils;
+
+@Data
+@Builder
+public class LookupOptions {
+    /**
+     * If authoritative, it means the lookup had already been redirected here 
by a different broker
+     */
+    private final boolean authoritative;
+
+    /**
+     * If read-only, do not attempt to acquire ownership
+     */
+    private final boolean readOnly;
+
+    /**
+     * After acquiring the ownership, load all the topics
+     */
+    private final boolean loadTopicsInBundle;
+
+    /**
+     * The lookup request was made through HTTPs
+     */
+    private final boolean requestHttps;
+
+    private final String advertisedListenerName;
+
+    public boolean hasAdvertisedListenerName() {
+        return StringUtils.isNotBlank(advertisedListenerName);
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 77bd62e..160c418 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -172,15 +172,9 @@ public class NamespaceService {
         }
     }
 
-    public CompletableFuture<Optional<LookupResult>> 
getBrokerServiceUrlAsync(TopicName topic,
-            boolean authoritative) {
-        return getBrokerServiceUrlAsync(topic, authoritative, null);
-    }
-
-    public CompletableFuture<Optional<LookupResult>> 
getBrokerServiceUrlAsync(TopicName topic, boolean authoritative,
-                                                                              
final String advertisedListenerName) {
+    public CompletableFuture<Optional<LookupResult>> 
getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
         return getBundleAsync(topic)
-                .thenCompose(bundle -> findBrokerServiceUrl(bundle, 
authoritative, false /* read-only */, advertisedListenerName));
+                .thenCompose(bundle -> findBrokerServiceUrl(bundle, options));
     }
 
     public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
@@ -210,38 +204,36 @@ public class NamespaceService {
      *
      * If the service unit is not owned, return an empty optional
      */
-    public Optional<URL> getWebServiceUrl(ServiceUnitId suName, boolean 
authoritative, boolean isRequestHttps,
-            boolean readOnly) throws Exception {
+    public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions 
options) throws Exception {
         if (suName instanceof TopicName) {
             TopicName name = (TopicName) suName;
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Getting web service URL of topic: {} - auth: {}", 
name, authoritative);
+                LOG.debug("Getting web service URL of topic: {} - options: 
{}", name, options);
             }
-            return this.internalGetWebServiceUrl(getBundle(name), 
authoritative, isRequestHttps, readOnly)
+            return this.internalGetWebServiceUrl(getBundle(name), options)
                     
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
         }
 
         if (suName instanceof NamespaceName) {
-            return this.internalGetWebServiceUrl(getFullBundle((NamespaceName) 
suName), authoritative, isRequestHttps,
-                    
readOnly).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), 
SECONDS);
+            return this.internalGetWebServiceUrl(getFullBundle((NamespaceName) 
suName), options)
+                    
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
         }
 
         if (suName instanceof NamespaceBundle) {
-            return this.internalGetWebServiceUrl((NamespaceBundle) suName, 
authoritative, isRequestHttps, readOnly)
+            return this.internalGetWebServiceUrl((NamespaceBundle) suName, 
options)
                     
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
         }
 
         throw new IllegalArgumentException("Unrecognized class of 
NamespaceBundle: " + suName.getClass().getName());
     }
 
-    private CompletableFuture<Optional<URL>> 
internalGetWebServiceUrl(NamespaceBundle bundle, boolean authoritative,
-            boolean isRequestHttps, boolean readOnly) {
+    private CompletableFuture<Optional<URL>> 
internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) {
 
-        return findBrokerServiceUrl(bundle, authoritative, 
readOnly).thenApply(lookupResult -> {
+        return findBrokerServiceUrl(bundle, options).thenApply(lookupResult -> 
{
             if (lookupResult.isPresent()) {
                 try {
                     LookupData lookupData = lookupResult.get().getLookupData();
-                    final String redirectUrl = isRequestHttps ? 
lookupData.getHttpUrlTls() : lookupData.getHttpUrl();
+                    final String redirectUrl = options.isRequestHttps() ? 
lookupData.getHttpUrlTls() : lookupData.getHttpUrl();
                     return Optional.of(new URL(redirectUrl));
                 } catch (Exception e) {
                     // just log the exception, nothing else to do
@@ -329,19 +321,6 @@ public class NamespaceService {
         = new ConcurrentOpenHashMap<>();
 
     /**
-     * Main internal method to lookup and setup ownership of service unit to a 
broker.
-     *
-     * @param bundle
-     * @param authoritative
-     * @param readOnly
-     * @return
-     */
-    private CompletableFuture<Optional<LookupResult>> 
findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
-                                                                           
boolean readOnly) {
-        return findBrokerServiceUrl(bundle, authoritative, readOnly, null);
-    }
-
-    /**
      * Main internal method to lookup and setup ownership of service unit to a 
broker
      *
      * @param bundle
@@ -351,14 +330,13 @@ public class NamespaceService {
      * @return
      * @throws PulsarServerException
      */
-    private CompletableFuture<Optional<LookupResult>> 
findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
-            boolean readOnly, final String advertisedListenerName) {
+    private CompletableFuture<Optional<LookupResult>> 
findBrokerServiceUrl(NamespaceBundle bundle, LookupOptions options) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("findBrokerServiceUrl: {} - read-only: {}", bundle, 
readOnly);
+            LOG.debug("findBrokerServiceUrl: {} - options: {}", bundle, 
options);
         }
 
         ConcurrentOpenHashMap<NamespaceBundle, 
CompletableFuture<Optional<LookupResult>>> targetMap;
-        if (authoritative) {
+        if (options.isAuthoritative()) {
             targetMap = findingBundlesAuthoritative;
         } else {
             targetMap = findingBundlesNotAuthoritative;
@@ -372,13 +350,13 @@ public class NamespaceService {
                 if (!nsData.isPresent()) {
                     // No one owns this bundle
 
-                    if (readOnly) {
+                    if (options.isReadOnly()) {
                         // Do not attempt to acquire ownership
                         future.complete(Optional.empty());
                     } else {
                         // Now, no one owns the namespace yet. Hence, we will 
try to dynamically assign it
                         pulsar.getExecutor().execute(() -> {
-                            searchForCandidateBroker(bundle, future, 
authoritative, advertisedListenerName);
+                            searchForCandidateBroker(bundle, future, options);
                         });
                     }
                 } else if (nsData.get().isDisabled()) {
@@ -389,11 +367,11 @@ public class NamespaceService {
                         LOG.debug("Namespace bundle {} already owned by {} ", 
bundle, nsData);
                     }
                     // find the target
-                    if (StringUtils.isNotBlank(advertisedListenerName)) {
-                        AdvertisedListener listener = 
nsData.get().getAdvertisedListeners().get(advertisedListenerName);
+                    if (options.hasAdvertisedListenerName()) {
+                        AdvertisedListener listener = 
nsData.get().getAdvertisedListeners().get(options.getAdvertisedListenerName());
                         if (listener == null) {
                             future.completeExceptionally(
-                                    new PulsarServerException("the broker do 
not have " + advertisedListenerName + " listener"));
+                                    new PulsarServerException("the broker do 
not have " + options.getAdvertisedListenerName() + " listener"));
                         } else {
                             future.complete(Optional.of(new 
LookupResult(nsData.get(),
                                     listener.getBrokerServiceUrl().toString(), 
listener.getBrokerServiceUrlTls().toString())));
@@ -418,13 +396,8 @@ public class NamespaceService {
     }
 
     private void searchForCandidateBroker(NamespaceBundle bundle,
-            CompletableFuture<Optional<LookupResult>> lookupFuture, boolean 
authoritative) {
-        searchForCandidateBroker(bundle, lookupFuture, authoritative, null);
-    }
-
-    private void searchForCandidateBroker(NamespaceBundle bundle,
-                                          
CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative,
-                                          final String advertisedListenerName) 
{
+                                          
CompletableFuture<Optional<LookupResult>> lookupFuture,
+                                          LookupOptions options) {
         String candidateBroker = null;
         boolean authoritativeRedirect = 
pulsar.getLeaderElectionService().isLeader();
 
@@ -440,7 +413,7 @@ public class NamespaceService {
             }
 
             if (candidateBroker == null) {
-                if (authoritative) {
+                if (options.isAuthoritative()) {
                     // leader broker already assigned the current broker as 
owner
                     candidateBroker = pulsar.getSafeWebServiceAddress();
                 } else if (!this.loadManager.get().isCentralized()
@@ -486,14 +459,16 @@ public class NamespaceService {
                     } else {
                         // Found owner for the namespace bundle
 
-                        // Schedule the task to pre-load topics
-                        pulsar.loadNamespaceTopics(bundle);
+                        if (options.isLoadTopicsInBundle()) {
+                            // Schedule the task to pre-load topics
+                            pulsar.loadNamespaceTopics(bundle);
+                        }
                         // find the target
-                        if (StringUtils.isNotBlank(advertisedListenerName)) {
-                            AdvertisedListener listener = 
ownerInfo.getAdvertisedListeners().get(advertisedListenerName);
+                        if (options.hasAdvertisedListenerName()) {
+                            AdvertisedListener listener = 
ownerInfo.getAdvertisedListeners().get(options.getAdvertisedListenerName());
                             if (listener == null) {
                                 lookupFuture.completeExceptionally(
-                                        new PulsarServerException("the broker 
do not have " + advertisedListenerName + " listener"));
+                                        new PulsarServerException("the broker 
do not have " + options.getAdvertisedListenerName() + " listener"));
                                 return;
                             } else {
                                 lookupFuture.complete(Optional.of(new 
LookupResult(ownerInfo, listener.getBrokerServiceUrl().toString(),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 076c88a..fb7877e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -488,8 +489,14 @@ public abstract class PulsarWebResource {
             String bundleRange) {
         NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, 
bundleRange);
         NamespaceService nsService = pulsar().getNamespaceService();
+
+        LookupOptions options = LookupOptions.builder()
+                .authoritative(false)
+                .requestHttps(isRequestHttps())
+                .readOnly(true)
+                .loadTopicsInBundle(false).build();
         try {
-            return nsService.getWebServiceUrl(nsBundle, /*authoritative */ 
false, isRequestHttps(), /* read-only */ true).isPresent();
+            return nsService.getWebServiceUrl(nsBundle, options).isPresent();
         } catch (Exception e) {
             log.error("[{}] Failed to check whether namespace bundle is owned 
{}/{}", clientAppId(), fqnn.toString(), bundleRange, e);
             throw new RestException(e);
@@ -525,7 +532,12 @@ public abstract class PulsarWebResource {
             // - If authoritative is false and this broker is not leader, 
forward to leader
             // - If authoritative is false and this broker is leader, 
determine owner and forward w/ authoritative=true
             // - If authoritative is true, own the namespace and continue
-            Optional<URL> webUrl = nsService.getWebServiceUrl(bundle, 
authoritative, isRequestHttps(), readOnly);
+            LookupOptions options = LookupOptions.builder()
+                    .authoritative(authoritative)
+                    .requestHttps(isRequestHttps())
+                    .readOnly(readOnly)
+                    .loadTopicsInBundle(false).build();
+            Optional<URL> webUrl = nsService.getWebServiceUrl(bundle, options);
             // Ensure we get a url
             if (webUrl == null || !webUrl.isPresent()) {
                 log.warn("Unable to get web service url");
@@ -581,7 +593,12 @@ public abstract class PulsarWebResource {
 
         try {
             // per function name, this is trying to acquire the whole 
namespace ownership
-            Optional<URL> webUrl = nsService.getWebServiceUrl(topicName, 
authoritative, isRequestHttps(), false);
+            LookupOptions options = LookupOptions.builder()
+                    .authoritative(authoritative)
+                    .requestHttps(isRequestHttps())
+                    .readOnly(false)
+                    .loadTopicsInBundle(false).build();
+            Optional<URL> webUrl = nsService.getWebServiceUrl(topicName, 
options);
             // Ensure we get a url
             if (webUrl == null || !webUrl.isPresent()) {
                 log.info("Unable to get web service url");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 3bcb1d1..21235b6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.admin.v1.Namespaces;
 import org.apache.pulsar.broker.admin.v1.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.namespace.LookupOptions;
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
@@ -642,7 +643,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
                     public boolean matches(NamespaceName nsname) {
                         return 
nsname.equals(NamespacesTest.this.testGlobalNamespaces.get(0));
                     }
-                }), Mockito.anyBoolean(), Mockito.anyBoolean(), 
Mockito.anyBoolean());
+                }), Mockito.any());
 
         
admin.namespaces().setNamespaceReplicationClusters(testGlobalNamespaces.get(0).toString(),
                 Sets.newHashSet("usw"));
@@ -677,7 +678,8 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
         // setup ownership to localhost
         URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
-        
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, 
false, false, false);
+        LookupOptions options = 
LookupOptions.builder().authoritative(false).readOnly(false).requestHttps(false).build();
+        
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, 
options);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
 
         response = mock(AsyncResponse.class);
@@ -705,7 +707,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
         testNs = this.testGlobalNamespaces.get(0);
         // setup ownership to localhost
-        
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, 
false, false, false);
+        
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, 
options);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
         response = mock(AsyncResponse.class);
         namespaces.deleteNamespace(response, testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName(), false);
@@ -715,7 +717,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
         testNs = this.testLocalNamespaces.get(0);
         // setup ownership to localhost
-        
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, 
false, false, false);
+        
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, 
options);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
         response = mock(AsyncResponse.class);
         namespaces.deleteNamespace(response, testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName(), false);
@@ -731,7 +733,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         // ensure refreshed topics list in the cache
         pulsar.getLocalZkCacheService().managedLedgerListCache().clearTree();
         // setup ownership to localhost
-        
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, 
false, false, false);
+        
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, 
options);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
         response = mock(AsyncResponse.class);
         namespaces.deleteNamespace(response, testNs.getTenant(), 
testNs.getCluster(), testNs.getLocalName(), false);
@@ -757,7 +759,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
             public boolean matches(NamespaceBundle bundle) {
                 return bundle.getNamespaceObject().equals(testNs);
             }
-        }), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
+        }), Mockito.any());
         doReturn(false).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new 
ArgumentMatcher<NamespaceBundle>() {
             @Override
             public boolean matches(NamespaceBundle bundle) {
@@ -794,8 +796,8 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
         NamespaceBundles nsBundles = 
nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
         // make one bundle owned
-        
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0),
 false,
-                true, false);
+        LookupOptions optionsHttps = 
LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
+        
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0),
 optionsHttps);
         
doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
         
doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync(
                 testTenant + "/" + testLocalCluster + "/" + bundledNsLocal, 
"0x00000000_0x80000000");
@@ -816,7 +818,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
         // ensure all three bundles are owned by the local broker
         for (NamespaceBundle bundle : nsBundles.getBundles()) {
-            
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(bundle, 
false, true, false);
+            
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(bundle, 
optionsHttps);
             doReturn(true).when(nsSvc).isServiceUnitOwned(bundle);
         }
         
doNothing().when(namespacesAdmin).deleteNamespaceBundle(Mockito.anyString(), 
Mockito.anyString());
@@ -827,7 +829,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         final NamespaceName testNs = this.testLocalNamespaces.get(1);
         URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
-                .getWebServiceUrl(Mockito.argThat(ns -> ns.equals(testNs)), 
Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
+                .getWebServiceUrl(Mockito.argThat(ns -> ns.equals(testNs)), 
Mockito.any());
         doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(ns -> 
ns.equals(testNs)));
 
         NamespaceBundle bundle = 
nsSvc.getNamespaceBundleFactory().getFullBundle(testNs);
@@ -907,14 +909,15 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
 
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
                 .getWebServiceUrl(Mockito.argThat(bundle -> 
bundle.getNamespaceObject().equals(testNs)),
-                        Mockito.anyBoolean(), Mockito.anyBoolean(), 
Mockito.anyBoolean());
+                        Mockito.any());
         doReturn(true).when(nsSvc)
                 .isServiceUnitOwned(Mockito.argThat(bundle -> 
bundle.getNamespaceObject().equals(testNs)));
 
         NamespaceBundles nsBundles = 
nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
         NamespaceBundle testBundle = nsBundles.getBundles().get(0);
         // make one bundle owned
-        
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testBundle,
 false, true, false);
+        LookupOptions optionsHttps = 
LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
+        
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testBundle,
 optionsHttps);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testBundle);
         
doReturn(CompletableFuture.completedFuture(null)).when(nsSvc).unloadNamespaceBundle(testBundle);
         AsyncResponse response = mock(AsyncResponse.class);
@@ -1291,7 +1294,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
     private void mockWebUrl(URL localWebServiceUrl, NamespaceName namespace) 
throws Exception {
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
                 .getWebServiceUrl(Mockito.argThat(bundle -> 
bundle.getNamespaceObject().equals(namespace)),
-                        Mockito.anyBoolean(), Mockito.anyBoolean(), 
Mockito.anyBoolean());
+                        Mockito.any());
         doReturn(true).when(nsSvc)
                 .isServiceUnitOwned(Mockito.argThat(bundle -> 
bundle.getNamespaceObject().equals(namespace)));
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
index f2fe3f5..e1bf228 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
@@ -62,13 +62,13 @@ public class NamespaceOwnershipListenerTests extends 
BrokerTestBase {
         final AtomicBoolean onLoad = new AtomicBoolean(false);
         final AtomicBoolean unLoad = new AtomicBoolean(false);
 
-        final String namespace = "prop/ns-test-1";
+        final String namespace = "prop/" + UUID.randomUUID().toString();
 
         pulsar.getNamespaceService().addNamespaceBundleOwnershipListener(new 
NamespaceBundleOwnershipListener() {
 
             @Override
             public boolean test(NamespaceBundle namespaceBundle) {
-                return 
namespaceBundle.getNamespaceObject().getLocalName().equals("ns-test-1");
+                return 
namespaceBundle.getNamespaceObject().toString().equals(namespace);
             }
 
             @Override
@@ -95,7 +95,7 @@ public class NamespaceOwnershipListenerTests extends 
BrokerTestBase {
 
         producer.close();
 
-        admin.namespaces().unload("prop/ns-test-1");
+        admin.namespaces().unload(namespace);
 
         countDownLatch.await();
 

Reply via email to