This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dba0b65 Fix topic getting recreated immediately after deletion (#7524)
dba0b65 is described below
commit dba0b65a2fd03586cb3d17a306fbda5322318d22
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jul 13 17:00:59 2020 -0700
Fix topic getting recreated immediately after deletion (#7524)
---
.../pulsar/broker/lookup/TopicLookupBase.java | 10 ++-
.../pulsar/broker/namespace/LookupOptions.java | 54 ++++++++++++++
.../pulsar/broker/namespace/NamespaceService.java | 83 ++++++++--------------
.../pulsar/broker/web/PulsarWebResource.java | 23 +++++-
.../apache/pulsar/broker/admin/NamespacesTest.java | 29 ++++----
.../namespace/NamespaceOwnershipListenerTests.java | 6 +-
6 files changed, 130 insertions(+), 75 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 3c33949..cd140d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
@@ -79,7 +80,7 @@ public class TopicLookupBase extends PulsarWebResource {
}
CompletableFuture<Optional<LookupResult>> lookupFuture =
pulsar().getNamespaceService()
- .getBrokerServiceUrlAsync(topicName, authoritative);
+ .getBrokerServiceUrlAsync(topicName,
LookupOptions.builder().authoritative(authoritative).loadTopicsInBundle(false).build());
lookupFuture.thenAccept(optionalResult -> {
if (optionalResult == null || !optionalResult.isPresent()) {
@@ -251,7 +252,12 @@ public class TopicLookupBase extends PulsarWebResource {
if (validationFailureResponse != null) {
lookupfuture.complete(validationFailureResponse);
} else {
-
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName,
authoritative, advertisedListenerName)
+ LookupOptions options = LookupOptions.builder()
+ .authoritative(authoritative)
+ .advertisedListenerName(advertisedListenerName)
+ .loadTopicsInBundle(true)
+ .build();
+
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)
.thenAccept(lookupResult -> {
if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java
new file mode 100644
index 0000000..c3a659d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/LookupOptions.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.namespace;
+
+import lombok.Builder;
+import lombok.Data;
+
+import org.apache.commons.lang3.StringUtils;
+
+@Data
+@Builder
+public class LookupOptions {
+ /**
+ * If authoritative, it means the lookup had already been redirected here by a different broker
+ */
+ private final boolean authoritative;
+
+ /**
+ * If read-only, do not attempt to acquire ownership
+ */
+ private final boolean readOnly;
+
+ /**
+ * After acquiring the ownership, load all the topics
+ */
+ private final boolean loadTopicsInBundle;
+
+ /**
+ * The lookup request was made through HTTPs
+ */
+ private final boolean requestHttps;
+
+ private final String advertisedListenerName;
+
+ public boolean hasAdvertisedListenerName() {
+ return StringUtils.isNotBlank(advertisedListenerName);
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 77bd62e..160c418 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -172,15 +172,9 @@ public class NamespaceService {
}
}
- public CompletableFuture<Optional<LookupResult>>
getBrokerServiceUrlAsync(TopicName topic,
- boolean authoritative) {
- return getBrokerServiceUrlAsync(topic, authoritative, null);
- }
-
- public CompletableFuture<Optional<LookupResult>>
getBrokerServiceUrlAsync(TopicName topic, boolean authoritative,
-
final String advertisedListenerName) {
+ public CompletableFuture<Optional<LookupResult>>
getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
return getBundleAsync(topic)
- .thenCompose(bundle -> findBrokerServiceUrl(bundle,
authoritative, false /* read-only */, advertisedListenerName));
+ .thenCompose(bundle -> findBrokerServiceUrl(bundle, options));
}
public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
@@ -210,38 +204,36 @@ public class NamespaceService {
*
* If the service unit is not owned, return an empty optional
*/
- public Optional<URL> getWebServiceUrl(ServiceUnitId suName, boolean
authoritative, boolean isRequestHttps,
- boolean readOnly) throws Exception {
+ public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions
options) throws Exception {
if (suName instanceof TopicName) {
TopicName name = (TopicName) suName;
if (LOG.isDebugEnabled()) {
- LOG.debug("Getting web service URL of topic: {} - auth: {}",
name, authoritative);
+ LOG.debug("Getting web service URL of topic: {} - options: {}", name, options);
}
- return this.internalGetWebServiceUrl(getBundle(name),
authoritative, isRequestHttps, readOnly)
+ return this.internalGetWebServiceUrl(getBundle(name), options)
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}
if (suName instanceof NamespaceName) {
- return this.internalGetWebServiceUrl(getFullBundle((NamespaceName)
suName), authoritative, isRequestHttps,
-
readOnly).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(),
SECONDS);
+ return this.internalGetWebServiceUrl(getFullBundle((NamespaceName)
suName), options)
+
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}
if (suName instanceof NamespaceBundle) {
- return this.internalGetWebServiceUrl((NamespaceBundle) suName,
authoritative, isRequestHttps, readOnly)
+ return this.internalGetWebServiceUrl((NamespaceBundle) suName,
options)
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}
throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName());
}
- private CompletableFuture<Optional<URL>>
internalGetWebServiceUrl(NamespaceBundle bundle, boolean authoritative,
- boolean isRequestHttps, boolean readOnly) {
+ private CompletableFuture<Optional<URL>>
internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) {
- return findBrokerServiceUrl(bundle, authoritative,
readOnly).thenApply(lookupResult -> {
+ return findBrokerServiceUrl(bundle, options).thenApply(lookupResult ->
{
if (lookupResult.isPresent()) {
try {
LookupData lookupData = lookupResult.get().getLookupData();
- final String redirectUrl = isRequestHttps ?
lookupData.getHttpUrlTls() : lookupData.getHttpUrl();
+ final String redirectUrl = options.isRequestHttps() ?
lookupData.getHttpUrlTls() : lookupData.getHttpUrl();
return Optional.of(new URL(redirectUrl));
} catch (Exception e) {
// just log the exception, nothing else to do
@@ -329,19 +321,6 @@ public class NamespaceService {
= new ConcurrentOpenHashMap<>();
/**
- * Main internal method to lookup and setup ownership of service unit to a
broker.
- *
- * @param bundle
- * @param authoritative
- * @param readOnly
- * @return
- */
- private CompletableFuture<Optional<LookupResult>>
findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
-
boolean readOnly) {
- return findBrokerServiceUrl(bundle, authoritative, readOnly, null);
- }
-
- /**
* Main internal method to lookup and setup ownership of service unit to a
broker
*
* @param bundle
@@ -351,14 +330,13 @@ public class NamespaceService {
* @return
* @throws PulsarServerException
*/
- private CompletableFuture<Optional<LookupResult>>
findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
- boolean readOnly, final String advertisedListenerName) {
+ private CompletableFuture<Optional<LookupResult>>
findBrokerServiceUrl(NamespaceBundle bundle, LookupOptions options) {
if (LOG.isDebugEnabled()) {
- LOG.debug("findBrokerServiceUrl: {} - read-only: {}", bundle,
readOnly);
+ LOG.debug("findBrokerServiceUrl: {} - options: {}", bundle,
options);
}
ConcurrentOpenHashMap<NamespaceBundle,
CompletableFuture<Optional<LookupResult>>> targetMap;
- if (authoritative) {
+ if (options.isAuthoritative()) {
targetMap = findingBundlesAuthoritative;
} else {
targetMap = findingBundlesNotAuthoritative;
@@ -372,13 +350,13 @@ public class NamespaceService {
if (!nsData.isPresent()) {
// No one owns this bundle
- if (readOnly) {
+ if (options.isReadOnly()) {
// Do not attempt to acquire ownership
future.complete(Optional.empty());
} else {
// Now, no one owns the namespace yet. Hence, we will
try to dynamically assign it
pulsar.getExecutor().execute(() -> {
- searchForCandidateBroker(bundle, future,
authoritative, advertisedListenerName);
+ searchForCandidateBroker(bundle, future, options);
});
}
} else if (nsData.get().isDisabled()) {
@@ -389,11 +367,11 @@ public class NamespaceService {
LOG.debug("Namespace bundle {} already owned by {} ",
bundle, nsData);
}
// find the target
- if (StringUtils.isNotBlank(advertisedListenerName)) {
- AdvertisedListener listener =
nsData.get().getAdvertisedListeners().get(advertisedListenerName);
+ if (options.hasAdvertisedListenerName()) {
+ AdvertisedListener listener =
nsData.get().getAdvertisedListeners().get(options.getAdvertisedListenerName());
if (listener == null) {
future.completeExceptionally(
- new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
+ new PulsarServerException("the broker do not have " + options.getAdvertisedListenerName() + " listener"));
} else {
future.complete(Optional.of(new
LookupResult(nsData.get(),
listener.getBrokerServiceUrl().toString(),
listener.getBrokerServiceUrlTls().toString())));
@@ -418,13 +396,8 @@ public class NamespaceService {
}
private void searchForCandidateBroker(NamespaceBundle bundle,
- CompletableFuture<Optional<LookupResult>> lookupFuture, boolean
authoritative) {
- searchForCandidateBroker(bundle, lookupFuture, authoritative, null);
- }
-
- private void searchForCandidateBroker(NamespaceBundle bundle,
-
CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative,
- final String advertisedListenerName)
{
+
CompletableFuture<Optional<LookupResult>> lookupFuture,
+ LookupOptions options) {
String candidateBroker = null;
boolean authoritativeRedirect =
pulsar.getLeaderElectionService().isLeader();
@@ -440,7 +413,7 @@ public class NamespaceService {
}
if (candidateBroker == null) {
- if (authoritative) {
+ if (options.isAuthoritative()) {
// leader broker already assigned the current broker as
owner
candidateBroker = pulsar.getSafeWebServiceAddress();
} else if (!this.loadManager.get().isCentralized()
@@ -486,14 +459,16 @@ public class NamespaceService {
} else {
// Found owner for the namespace bundle
- // Schedule the task to pre-load topics
- pulsar.loadNamespaceTopics(bundle);
+ if (options.isLoadTopicsInBundle()) {
+ // Schedule the task to pre-load topics
+ pulsar.loadNamespaceTopics(bundle);
+ }
// find the target
- if (StringUtils.isNotBlank(advertisedListenerName)) {
- AdvertisedListener listener =
ownerInfo.getAdvertisedListeners().get(advertisedListenerName);
+ if (options.hasAdvertisedListenerName()) {
+ AdvertisedListener listener =
ownerInfo.getAdvertisedListeners().get(options.getAdvertisedListenerName());
if (listener == null) {
lookupFuture.completeExceptionally(
- new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
+ new PulsarServerException("the broker do not have " + options.getAdvertisedListenerName() + " listener"));
return;
} else {
lookupFuture.complete(Optional.of(new
LookupResult(ownerInfo, listener.getBrokerServiceUrl().toString(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 076c88a..fb7877e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -488,8 +489,14 @@ public abstract class PulsarWebResource {
String bundleRange) {
NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles,
bundleRange);
NamespaceService nsService = pulsar().getNamespaceService();
+
+ LookupOptions options = LookupOptions.builder()
+ .authoritative(false)
+ .requestHttps(isRequestHttps())
+ .readOnly(true)
+ .loadTopicsInBundle(false).build();
try {
- return nsService.getWebServiceUrl(nsBundle, /*authoritative */
false, isRequestHttps(), /* read-only */ true).isPresent();
+ return nsService.getWebServiceUrl(nsBundle, options).isPresent();
} catch (Exception e) {
log.error("[{}] Failed to check whether namespace bundle is owned
{}/{}", clientAppId(), fqnn.toString(), bundleRange, e);
throw new RestException(e);
@@ -525,7 +532,12 @@ public abstract class PulsarWebResource {
// - If authoritative is false and this broker is not leader,
forward to leader
// - If authoritative is false and this broker is leader,
determine owner and forward w/ authoritative=true
// - If authoritative is true, own the namespace and continue
- Optional<URL> webUrl = nsService.getWebServiceUrl(bundle,
authoritative, isRequestHttps(), readOnly);
+ LookupOptions options = LookupOptions.builder()
+ .authoritative(authoritative)
+ .requestHttps(isRequestHttps())
+ .readOnly(readOnly)
+ .loadTopicsInBundle(false).build();
+ Optional<URL> webUrl = nsService.getWebServiceUrl(bundle, options);
// Ensure we get a url
if (webUrl == null || !webUrl.isPresent()) {
log.warn("Unable to get web service url");
@@ -581,7 +593,12 @@ public abstract class PulsarWebResource {
try {
// per function name, this is trying to acquire the whole
namespace ownership
- Optional<URL> webUrl = nsService.getWebServiceUrl(topicName,
authoritative, isRequestHttps(), false);
+ LookupOptions options = LookupOptions.builder()
+ .authoritative(authoritative)
+ .requestHttps(isRequestHttps())
+ .readOnly(false)
+ .loadTopicsInBundle(false).build();
+ Optional<URL> webUrl = nsService.getWebServiceUrl(topicName,
options);
// Ensure we get a url
if (webUrl == null || !webUrl.isPresent()) {
log.info("Unable to get web service url");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 3bcb1d1..21235b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.v1.Namespaces;
import org.apache.pulsar.broker.admin.v1.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnershipCache;
@@ -642,7 +643,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
public boolean matches(NamespaceName nsname) {
return
nsname.equals(NamespacesTest.this.testGlobalNamespaces.get(0));
}
- }), Mockito.anyBoolean(), Mockito.anyBoolean(),
Mockito.anyBoolean());
+ }), Mockito.any());
admin.namespaces().setNamespaceReplicationClusters(testGlobalNamespaces.get(0).toString(),
Sets.newHashSet("usw"));
@@ -677,7 +678,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
// setup ownership to localhost
URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
-
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs,
false, false, false);
+ LookupOptions options =
LookupOptions.builder().authoritative(false).readOnly(false).requestHttps(false).build();
+
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs,
options);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
response = mock(AsyncResponse.class);
@@ -705,7 +707,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
testNs = this.testGlobalNamespaces.get(0);
// setup ownership to localhost
-
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs,
false, false, false);
+
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs,
options);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
response = mock(AsyncResponse.class);
namespaces.deleteNamespace(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), false);
@@ -715,7 +717,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
testNs = this.testLocalNamespaces.get(0);
// setup ownership to localhost
-
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs,
false, false, false);
+
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs,
options);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
response = mock(AsyncResponse.class);
namespaces.deleteNamespace(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), false);
@@ -731,7 +733,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
// ensure refreshed topics list in the cache
pulsar.getLocalZkCacheService().managedLedgerListCache().clearTree();
// setup ownership to localhost
-
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs,
false, false, false);
+
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs,
options);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
response = mock(AsyncResponse.class);
namespaces.deleteNamespace(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), false);
@@ -757,7 +759,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
public boolean matches(NamespaceBundle bundle) {
return bundle.getNamespaceObject().equals(testNs);
}
- }), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
+ }), Mockito.any());
doReturn(false).when(nsSvc).isServiceUnitOwned(Mockito.argThat(new
ArgumentMatcher<NamespaceBundle>() {
@Override
public boolean matches(NamespaceBundle bundle) {
@@ -794,8 +796,8 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
NamespaceBundles nsBundles =
nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
// make one bundle owned
-
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0),
false,
- true, false);
+ LookupOptions optionsHttps =
LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
+
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0),
optionsHttps);
doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync(
testTenant + "/" + testLocalCluster + "/" + bundledNsLocal,
"0x00000000_0x80000000");
@@ -816,7 +818,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
// ensure all three bundles are owned by the local broker
for (NamespaceBundle bundle : nsBundles.getBundles()) {
-
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(bundle,
false, true, false);
+
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(bundle,
optionsHttps);
doReturn(true).when(nsSvc).isServiceUnitOwned(bundle);
}
doNothing().when(namespacesAdmin).deleteNamespaceBundle(Mockito.anyString(),
Mockito.anyString());
@@ -827,7 +829,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
final NamespaceName testNs = this.testLocalNamespaces.get(1);
URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
- .getWebServiceUrl(Mockito.argThat(ns -> ns.equals(testNs)),
Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean());
+ .getWebServiceUrl(Mockito.argThat(ns -> ns.equals(testNs)),
Mockito.any());
doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(ns ->
ns.equals(testNs)));
NamespaceBundle bundle =
nsSvc.getNamespaceBundleFactory().getFullBundle(testNs);
@@ -907,14 +909,15 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
.getWebServiceUrl(Mockito.argThat(bundle ->
bundle.getNamespaceObject().equals(testNs)),
- Mockito.anyBoolean(), Mockito.anyBoolean(),
Mockito.anyBoolean());
+ Mockito.any());
doReturn(true).when(nsSvc)
.isServiceUnitOwned(Mockito.argThat(bundle ->
bundle.getNamespaceObject().equals(testNs)));
NamespaceBundles nsBundles =
nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
NamespaceBundle testBundle = nsBundles.getBundles().get(0);
// make one bundle owned
-
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testBundle,
false, true, false);
+ LookupOptions optionsHttps =
LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
+
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testBundle,
optionsHttps);
doReturn(true).when(nsSvc).isServiceUnitOwned(testBundle);
doReturn(CompletableFuture.completedFuture(null)).when(nsSvc).unloadNamespaceBundle(testBundle);
AsyncResponse response = mock(AsyncResponse.class);
@@ -1291,7 +1294,7 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
private void mockWebUrl(URL localWebServiceUrl, NamespaceName namespace)
throws Exception {
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc)
.getWebServiceUrl(Mockito.argThat(bundle ->
bundle.getNamespaceObject().equals(namespace)),
- Mockito.anyBoolean(), Mockito.anyBoolean(),
Mockito.anyBoolean());
+ Mockito.any());
doReturn(true).when(nsSvc)
.isServiceUnitOwned(Mockito.argThat(bundle ->
bundle.getNamespaceObject().equals(namespace)));
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
index f2fe3f5..e1bf228 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceOwnershipListenerTests.java
@@ -62,13 +62,13 @@ public class NamespaceOwnershipListenerTests extends
BrokerTestBase {
final AtomicBoolean onLoad = new AtomicBoolean(false);
final AtomicBoolean unLoad = new AtomicBoolean(false);
- final String namespace = "prop/ns-test-1";
+ final String namespace = "prop/" + UUID.randomUUID().toString();
pulsar.getNamespaceService().addNamespaceBundleOwnershipListener(new
NamespaceBundleOwnershipListener() {
@Override
public boolean test(NamespaceBundle namespaceBundle) {
- return
namespaceBundle.getNamespaceObject().getLocalName().equals("ns-test-1");
+ return
namespaceBundle.getNamespaceObject().toString().equals(namespace);
}
@Override
@@ -95,7 +95,7 @@ public class NamespaceOwnershipListenerTests extends
BrokerTestBase {
producer.close();
- admin.namespaces().unload("prop/ns-test-1");
+ admin.namespaces().unload(namespace);
countDownLatch.await();