This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 6538d32 Fix topic ownership is not checked when get topic policy
(target branch-2.7) (#9781)
6538d32 is described below
commit 6538d32e6a9728957ae07ff2863d92d962b577a8
Author: Yong Zhang <[email protected]>
AuthorDate: Fri Mar 5 10:18:34 2021 +0800
Fix topic ownership is not checked when get topic policy (target
branch-2.7) (#9781)
### Motivation
1. Currently, the API of topic policies does not check the ownership of
topic. In the case of multiple brokers, a `cache not init error` will appear.
2. The parameter validation of the read and write API should not be the
same, and the read API should not verify `validatePoliciesReadOnlyAccess()`. If
`ReadOnly` is set, the read API will also be abnormal.
3. General API should not use `validateAdminAccessForTenant()`.
The original PR is #9767
---
.../broker/admin/impl/PersistentTopicsBase.java | 51 +-----------
.../pulsar/broker/admin/v2/PersistentTopics.java | 97 +++++++++++++---------
.../apache/pulsar/broker/admin/AdminApiTest2.java | 69 ++++++++++++++-
.../broker/auth/MockedPulsarServiceBaseTest.java | 14 +++-
4 files changed, 139 insertions(+), 92 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b2c2324..6b5fc60 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2401,15 +2401,9 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
- validateAdminAccessForTenant(namespaceName.getTenant());
- validatePoliciesReadOnlyAccess();
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
if (backlogQuotaType == null) {
backlogQuotaType =
BacklogQuota.BacklogQuotaType.destination_storage;
}
- checkTopicLevelPolicyEnable();
TopicPolicies topicPolicies;
try {
topicPolicies =
pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
@@ -2476,7 +2470,6 @@ public class PersistentTopicsBase extends AdminResource {
if (ttlInSecond != null && ttlInSecond < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value
for message TTL");
}
- preValidation();
TopicPolicies topicPolicies;
//Update existing topic policy or create a new one if not exist.
try {
@@ -2527,7 +2520,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected void internalGetRetention(AsyncResponse asyncResponse){
- preValidation();
Optional<RetentionPolicies> retention = getTopicPolicies(topicName)
.map(TopicPolicies::getRetentionPolicies);
if (!retention.isPresent()) {
@@ -2538,7 +2530,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalSetRetention(RetentionPolicies
retention) {
- preValidation();
if (retention == null) {
return CompletableFuture.completedFuture(null);
}
@@ -2563,7 +2554,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalRemoveRetention() {
- preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
@@ -2573,12 +2563,10 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected Optional<PersistencePolicies> internalGetPersistence(){
- preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getPersistence);
}
protected CompletableFuture<Void>
internalSetPersistence(PersistencePolicies persistencePolicies) {
- preValidation();
validatePersistencePolicies(persistencePolicies);
TopicPolicies topicPolicies =
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
@@ -2587,7 +2575,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalRemovePersistence() {
- preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
@@ -2603,20 +2590,16 @@ public class PersistentTopicsBase extends AdminResource
{
"and must be smaller than that in the broker-level");
}
- preValidation();
-
TopicPolicies topicPolicies =
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxMessageSize(maxMessageSize);
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies);
}
protected Optional<Integer> internalGetMaxMessageSize() {
- preValidation();
return
getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize);
}
protected Optional<Integer> internalGetMaxProducers() {
- preValidation();
return
getTopicPolicies(topicName).map(TopicPolicies::getMaxProducerPerTopic);
}
@@ -2625,16 +2608,12 @@ public class PersistentTopicsBase extends AdminResource
{
throw new RestException(Status.PRECONDITION_FAILED,
"maxProducers must be 0 or more");
}
-
- preValidation();
-
TopicPolicies topicPolicies =
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxProducerPerTopic(maxProducers);
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies);
}
protected Optional<Integer> internalGetMaxSubscriptionsPerTopic() {
- preValidation();
return
getTopicPolicies(topicName).map(TopicPolicies::getMaxSubscriptionsPerTopic);
}
@@ -2643,24 +2622,21 @@ public class PersistentTopicsBase extends AdminResource
{
throw new RestException(Status.PRECONDITION_FAILED,
"maxSubscriptionsPerTopic must be 0 or more");
}
- preValidation();
TopicPolicies topicPolicies =
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies);
}
- private void preValidation() {
- validateAdminAccessForTenant(namespaceName.getTenant());
- validatePoliciesReadOnlyAccess();
+ protected void preValidation() {
+ checkTopicLevelPolicyEnable();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
- checkTopicLevelPolicyEnable();
+ validateTopicOwnership(topicName, false);
}
protected CompletableFuture<Void> internalRemoveMaxProducers() {
- preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
@@ -2670,7 +2646,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected Optional<Integer> internalGetMaxConsumers() {
- preValidation();
return
getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumerPerTopic);
}
@@ -2679,7 +2654,6 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(Status.PRECONDITION_FAILED,
"maxConsumers must be 0 or more");
}
- preValidation();
TopicPolicies topicPolicies =
getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxConsumerPerTopic(maxConsumers);
@@ -2687,7 +2661,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalRemoveMaxConsumers() {
- preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
@@ -3492,12 +3465,10 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected Optional<DispatchRate> internalGetDispatchRate() {
- preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getDispatchRate);
}
protected CompletableFuture<Void> internalSetDispatchRate(DispatchRate
dispatchRate) {
- preValidation();
if (dispatchRate == null) {
return CompletableFuture.completedFuture(null);
}
@@ -3508,7 +3479,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalRemoveDispatchRate() {
- preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
@@ -3519,12 +3489,10 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() {
- preValidation();
return
getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate);
}
protected CompletableFuture<Void>
internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
- preValidation();
if (dispatchRate == null) {
return CompletableFuture.completedFuture(null);
}
@@ -3535,7 +3503,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate()
{
- preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
@@ -3546,7 +3513,6 @@ public class PersistentTopicsBase extends AdminResource {
protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
- preValidation();
return
getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumersPerSubscription);
}
@@ -3554,7 +3520,6 @@ public class PersistentTopicsBase extends AdminResource {
if (maxConsumersPerSubscription != null && maxConsumersPerSubscription
< 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value
for maxConsumersPerSubscription");
}
- preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName)
.orElseGet(TopicPolicies::new);
@@ -3563,7 +3528,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void>
internalRemoveMaxConsumersPerSubscription() {
- preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
@@ -3573,7 +3537,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected Optional<Long> internalGetCompactionThreshold() {
- preValidation();
return
getTopicPolicies(topicName).map(TopicPolicies::getCompactionThreshold);
}
@@ -3581,7 +3544,6 @@ public class PersistentTopicsBase extends AdminResource {
if (compactionThreshold != null && compactionThreshold < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value
for compactionThreshold");
}
- preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName)
.orElseGet(TopicPolicies::new);
@@ -3590,7 +3552,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
- preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
@@ -3600,13 +3561,11 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected Optional<PublishRate> internalGetPublishRate() {
- preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getPublishRate);
}
protected CompletableFuture<Void> internalSetPublishRate(PublishRate
publishRate) {
- preValidation();
if (publishRate == null) {
return CompletableFuture.completedFuture(null);
}
@@ -3617,7 +3576,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalRemovePublishRate() {
- preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
@@ -3627,12 +3585,10 @@ public class PersistentTopicsBase extends AdminResource
{
}
protected Optional<SubscribeRate> internalGetSubscribeRate() {
- preValidation();
return
getTopicPolicies(topicName).map(TopicPolicies::getSubscribeRate);
}
protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate
subscribeRate) {
- preValidation();
if (subscribeRate == null) {
return CompletableFuture.completedFuture(null);
}
@@ -3643,7 +3599,6 @@ public class PersistentTopicsBase extends AdminResource {
}
protected CompletableFuture<Void> internalRemoveSubscribeRate() {
- preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 4474fab..45b1b30 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -265,6 +265,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("namespace")
String namespace,
@PathParam("topic")
@Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new
TopicPolicies());
if (topicPolicies.isOffloadPoliciesSet()) {
asyncResponse.resume(topicPolicies.getOffloadPolicies());
@@ -285,12 +286,7 @@ public class PersistentTopics extends PersistentTopicsBase
{
@ApiParam(value = "Offload
policies for the specified topic")
OffloadPolicies
offloadPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
- validateAdminAccessForTenant(tenant);
- validatePoliciesReadOnlyAccess();
- checkTopicLevelPolicyEnable();
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
+ preValidation();
internalSetOffloadPolicies(offloadPolicies).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed set offloadPolicies", ex);
@@ -314,6 +310,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
setOffloadPolicies(asyncResponse, tenant, namespace, encodedTopic,
null);
}
@@ -328,6 +325,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("namespace")
String namespace,
@PathParam("topic")
@Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new
TopicPolicies());
if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
@@ -348,12 +346,7 @@ public class PersistentTopics extends PersistentTopicsBase
{
@ApiParam(value = "Max
unacked messages on consumer policies for the specified topic")
Integer
maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
- validateAdminAccessForTenant(tenant);
- validatePoliciesReadOnlyAccess();
- checkTopicLevelPolicyEnable();
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
+ preValidation();
internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex)
-> {
if (ex instanceof RestException) {
log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
@@ -378,6 +371,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("namespace")
String namespace,
@PathParam("topic")
@Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new
TopicPolicies());
if (topicPolicies.isDeduplicationSnapshotIntervalSecondsSet()) {
asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds());
@@ -398,9 +392,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value =
"Interval to take deduplication snapshot for the specified topic")
Integer interval) {
validateTopicName(tenant, namespace, encodedTopic);
- validateAdminAccessForTenant(tenant);
- validatePoliciesReadOnlyAccess();
- checkTopicLevelPolicyEnable();
+ preValidation();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
@@ -427,12 +419,7 @@ public class PersistentTopics extends PersistentTopicsBase
{
@PathParam("namespace")
String namespace,
@PathParam("topic")
@Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
- validateAdminAccessForTenant(tenant);
- validatePoliciesReadOnlyAccess();
- checkTopicLevelPolicyEnable();
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
+ preValidation();
internalSetDeduplicationSnapshotInterval(null).whenComplete((res, ex)
-> {
if (ex instanceof RestException) {
log.error("Failed delete deduplicationSnapshotInterval", ex);
@@ -470,6 +457,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("namespace") String
namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new
TopicPolicies());
if (topicPolicies.isInactiveTopicPoliciesSet()) {
asyncResponse.resume(topicPolicies.getInactiveTopicPolicies());
@@ -490,12 +478,7 @@ public class PersistentTopics extends PersistentTopicsBase
{
@ApiParam(value = "inactive
topic policies for the specified topic")
InactiveTopicPolicies
inactiveTopicPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
- validateAdminAccessForTenant(tenant);
- validatePoliciesReadOnlyAccess();
- checkTopicLevelPolicyEnable();
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
+ preValidation();
internalSetInactiveTopicPolicies(inactiveTopicPolicies).whenComplete((res, ex)
-> {
if (ex instanceof RestException) {
log.error("Failed set InactiveTopicPolicies", ex);
@@ -533,6 +516,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("namespace")
String namespace,
@PathParam("topic")
@Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new
TopicPolicies());
if (topicPolicies.isMaxUnackedMessagesOnSubscriptionSet()) {
asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnSubscription());
@@ -553,12 +537,7 @@ public class PersistentTopics extends PersistentTopicsBase
{
@ApiParam(value = "Max
unacked messages on subscription policies for the specified topic")
Integer
maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
- validateAdminAccessForTenant(tenant);
- validatePoliciesReadOnlyAccess();
- checkTopicLevelPolicyEnable();
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
+ preValidation();
internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum).whenComplete((res,
ex) -> {
if (ex instanceof RestException) {
log.error("Failed set MaxUnackedMessagesOnSubscription", ex);
@@ -598,6 +577,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("namespace") String
namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new
TopicPolicies());
if (topicPolicies.isDelayedDeliveryEnabledSet() &&
topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
asyncResponse.resume(new
DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
@@ -618,12 +598,7 @@ public class PersistentTopics extends PersistentTopicsBase
{
@PathParam("topic") @Encoded String
encodedTopic,
@ApiParam(value = "Delayed delivery
policies for the specified topic") DelayedDeliveryPolicies deliveryPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
- validateAdminAccessForTenant(tenant);
- validatePoliciesReadOnlyAccess();
- checkTopicLevelPolicyEnable();
- if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
+ preValidation();
internalSetDelayedDeliveryPolicies(asyncResponse, deliveryPolicies);
}
@@ -1483,6 +1458,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
return getTopicPolicies(topicName)
.map(TopicPolicies::getBackLogQuotaMap)
.map(map -> {
@@ -1507,6 +1483,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType
backlogQuotaType, BacklogQuota backlogQuota) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetBacklogQuota(asyncResponse, backlogQuotaType, backlogQuota);
}
@@ -1521,6 +1498,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType
backlogQuotaType) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
}
@@ -1534,6 +1512,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic)
{
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
return getTopicPolicies(topicName)
.map(TopicPolicies::getMessageTTLInSeconds)
.orElse(0); //same as default ttl at namespace level
@@ -1553,6 +1532,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "TTL in seconds for the
specified namespace", required = true)
@QueryParam("messageTTL") int messageTTL) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetMessageTTL(asyncResponse, messageTTL);
}
@@ -1568,6 +1548,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetMessageTTL(asyncResponse, null);
}
@@ -1582,6 +1563,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic)
{
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new
TopicPolicies());
if (topicPolicies.isDeduplicationSet()) {
asyncResponse.resume(topicPolicies.getDeduplicationEnabled());
@@ -1602,6 +1584,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "DeduplicationEnabled policies
for the specified topic") Boolean enabled) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetDeduplicationEnabled(enabled).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated deduplication", ex);
@@ -1642,6 +1625,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
try {
internalGetRetention(asyncResponse);
} catch (RestException e) {
@@ -1665,6 +1649,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Retention policies for the specified
namespace") RetentionPolicies retention) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetRetention(retention).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated retention", ex);
@@ -1698,6 +1683,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic)
{
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalRemoveRetention().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed updated retention", ex);
@@ -1724,6 +1710,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
try {
Optional<PersistencePolicies> persistencePolicies =
internalGetPersistence();
if (!persistencePolicies.isPresent()) {
@@ -1752,6 +1739,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String
encodedTopic,
@ApiParam(value = "Bookkeeper persistence
policies for specified topic") PersistencePolicies persistencePolicies) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetPersistence(persistencePolicies).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated persistence policies", ex);
@@ -1785,6 +1773,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalRemovePersistence().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed updated retention", ex);
@@ -1812,6 +1801,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
try {
Optional<Integer> maxSubscriptionsPerTopic =
internalGetMaxSubscriptionsPerTopic();
if (!maxSubscriptionsPerTopic.isPresent()) {
@@ -1841,6 +1831,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String
encodedTopic,
@ApiParam(value = "The max subscriptions of
the topic") int maxSubscriptionsPerTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic).whenComplete((r,
ex) -> {
if (ex instanceof RestException) {
log.error("Updating maxSubscriptionsPerTopic failed", ex);
@@ -1870,6 +1861,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetMaxSubscriptionsPerTopic(null).whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove maxSubscriptionsPerTopic", ex);
@@ -1896,6 +1888,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
try {
Optional<Integer> maxProducers = internalGetMaxProducers();
if (!maxProducers.isPresent()) {
@@ -1924,6 +1917,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String
encodedTopic,
@ApiParam(value = "The max producers of the
topic") int maxProducers) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetMaxProducers(maxProducers).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated persistence policies", ex);
@@ -1954,6 +1948,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalRemoveMaxProducers().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove maxProducers", ex);
@@ -1980,6 +1975,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
try {
Optional<Integer> maxConsumers = internalGetMaxConsumers();
if (!maxConsumers.isPresent()) {
@@ -2008,6 +2004,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String
encodedTopic,
@ApiParam(value = "The max consumers of the
topic") int maxConsumers) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetMaxConsumers(maxConsumers).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated persistence policies", ex);
@@ -2038,6 +2035,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalRemoveMaxConsumers().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove maxConsumers", ex);
@@ -2064,6 +2062,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
try {
Optional<Integer> policies = internalGetMaxMessageSize();
if (policies.isPresent()) {
@@ -2092,6 +2091,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String
encodedTopic,
@ApiParam(value = "The max message size of
the topic") int maxMessageSize) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetMaxMessageSize(maxMessageSize).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated persistence policies", ex);
@@ -2122,6 +2122,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetMaxMessageSize(null).whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove maxMessageSize", ex);
@@ -2351,6 +2352,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
try {
Optional<DispatchRate> dispatchRate = internalGetDispatchRate();
if (!dispatchRate.isPresent()) {
@@ -2378,6 +2380,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String
encodedTopic,
@ApiParam(value = "Dispatch rate for the
specified topic") DispatchRate dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetDispatchRate(dispatchRate).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic dispatch rate", ex);
@@ -2411,6 +2414,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalRemoveDispatchRate().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic dispatch rate", ex);
@@ -2438,6 +2442,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
try {
Optional<DispatchRate> dispatchRate =
internalGetSubscriptionDispatchRate();
if (!dispatchRate.isPresent()) {
@@ -2465,6 +2470,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String
encodedTopic,
@ApiParam(value = "Subscription message
dispatch rate for the specified topic") DispatchRate dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetSubscriptionDispatchRate(dispatchRate).whenComplete((r, ex)
-> {
if (ex instanceof RestException) {
log.error("Failed to set topic: {} subscription dispatch
rate", topicName.getLocalName(), ex);
@@ -2498,6 +2504,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalRemoveSubscriptionDispatchRate().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic: {} subscription dispatch
rate", topicName.getLocalName(), ex);
@@ -2525,6 +2532,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String
namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
try {
Optional<Long> compactionThreshold =
internalGetCompactionThreshold();
if (!compactionThreshold.isPresent()) {
@@ -2552,6 +2560,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String
encodedTopic,
@ApiParam(value = "Dispatch rate for the
specified topic") long compactionThreshold) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetCompactionThreshold(compactionThreshold).whenComplete((r,
ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic dispatch rate", ex);
@@ -2585,6 +2594,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalRemoveCompactionThreshold().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic dispatch rate", ex);
@@ -2612,6 +2622,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String
namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
try {
Optional<Integer> maxConsumersPerSubscription =
internalGetMaxConsumersPerSubscription();
if (!maxConsumersPerSubscription.isPresent()) {
@@ -2639,6 +2650,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String
encodedTopic,
@ApiParam(value = "Dispatch rate for
the specified topic") int maxConsumersPerSubscription) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription).whenComplete((r,
ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic {} max consumers per
subscription ", topicName.getLocalName(), ex);
@@ -2672,6 +2684,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String
namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalRemoveMaxConsumersPerSubscription().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic {} max consuners per
subscription", topicName.getLocalName(), ex);
@@ -2699,6 +2712,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
try {
Optional<PublishRate> publishRate = internalGetPublishRate();
if (!publishRate.isPresent()) {
@@ -2726,6 +2740,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String
encodedTopic,
@ApiParam(value = "Dispatch rate for the
specified topic") PublishRate publishRate) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetPublishRate(publishRate).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic dispatch rate", ex);
@@ -2759,6 +2774,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalRemovePublishRate().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic publish rate", ex);
@@ -2786,6 +2802,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
try {
Optional<SubscribeRate> subscribeRate = internalGetSubscribeRate();
if (!subscribeRate.isPresent()) {
@@ -2813,6 +2830,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("topic") @Encoded String
encodedTopic,
@ApiParam(value = "Subscribe rate for the
specified topic") SubscribeRate subscribeRate) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalSetSubscribeRate(subscribeRate).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic {} subscribe rate",
topicName.getLocalName(), ex);
@@ -2846,6 +2864,7 @@ public class PersistentTopics extends
PersistentTopicsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String
encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
+ preValidation();
internalRemoveSubscribeRate().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic {} subscribe rate ",
topicName.getLocalName(), ex);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index f0c6a25..1e2b2d1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -122,7 +123,9 @@ public class AdminApiTest2 extends
MockedPulsarServiceBaseTest {
@Override
public void cleanup() throws Exception {
super.internalCleanup();
- mockPulsarSetup.cleanup();
+ if (mockPulsarSetup != null) {
+ mockPulsarSetup.cleanup();
+ }
}
@DataProvider(name = "topicType")
@@ -242,6 +245,70 @@ public class AdminApiTest2 extends
MockedPulsarServiceBaseTest {
consumer2.close();
}
+ @Test
+ public void testTopicPoliciesWithMultiBroker() throws Exception {
+ //setup cluster with 3 broker
+ cleanup();
+ conf.setSystemTopicEnabled(true);
+ conf.setTopicLevelPoliciesEnabled(true);
+ super.internalSetup();
+ admin.clusters().createCluster("test"
+ , new ClusterData(pulsar.getWebServiceAddress() +
",localhost:1026," + "localhost:2050"));
+ TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1",
"role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant("prop-xyz", tenantInfo);
+ admin.namespaces().createNamespace("prop-xyz/ns1",
Sets.newHashSet("test"));
+ conf.setBrokerServicePort(Optional.of(1024));
+ conf.setBrokerServicePortTls(Optional.of(1025));
+ conf.setWebServicePort(Optional.of(1026));
+ conf.setWebServicePortTls(Optional.of(1027));
+ @Cleanup
+ PulsarService pulsar2 = startBrokerWithoutAuthorization(conf);
+ conf.setBrokerServicePort(Optional.of(2048));
+ conf.setBrokerServicePortTls(Optional.of(2049));
+ conf.setWebServicePort(Optional.of(2050));
+ conf.setWebServicePortTls(Optional.of(2051));
+ @Cleanup
+ PulsarService pulsar3 = startBrokerWithoutAuthorization(conf);
+ @Cleanup
+ PulsarAdmin admin2 =
PulsarAdmin.builder().serviceHttpUrl(pulsar2.getWebServiceAddress()).build();
+ @Cleanup
+ PulsarAdmin admin3 =
PulsarAdmin.builder().serviceHttpUrl(pulsar3.getWebServiceAddress()).build();
+
+ //for partitioned topic, we can get topic policies from every broker
+ final String topic = "persistent://prop-xyz/ns1/test-" +
UUID.randomUUID();
+ int partitionNum = 3;
+ admin.topics().createPartitionedTopic(topic, partitionNum);
+
pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close();
+ TopicName topicName = TopicName.get(topic);
+ Awaitility.await().until(()->
pulsar.getTopicPoliciesService().cacheIsInitialized(topicName));
+
+ setTopicPoliciesAndValidate(admin2, admin3, topic);
+ //for non-partitioned topic, we can get topic policies from every
broker
+ final String topic2 = "persistent://prop-xyz/ns1/test-" +
UUID.randomUUID();
+
pulsarClient.newConsumer().topic(topic2).subscriptionName("sub").subscribe().close();
+ setTopicPoliciesAndValidate(admin2, admin3, topic2);
+ }
+
+ private void setTopicPoliciesAndValidate(PulsarAdmin admin2
+ , PulsarAdmin admin3, String topic) throws Exception {
+ admin.topics().setMaxUnackedMessagesOnConsumer(topic, 100);
+ Awaitility.await().untilAsserted(() ->
assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topic)));
+ admin.topics().setMaxConsumers(topic, 101);
+ Awaitility.await().untilAsserted(() ->
assertNotNull(admin.topics().getMaxConsumers(topic)));
+ admin.topics().setMaxProducers(topic, 102);
+ Awaitility.await().untilAsserted(() ->
assertNotNull(admin.topics().getMaxProducers(topic)));
+
+
assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(),
100);
+
assertEquals(admin2.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(),
100);
+
assertEquals(admin3.topics().getMaxUnackedMessagesOnConsumer(topic).intValue(),
100);
+ assertEquals(admin.topics().getMaxConsumers(topic).intValue(), 101);
+ assertEquals(admin2.topics().getMaxConsumers(topic).intValue(), 101);
+ assertEquals(admin3.topics().getMaxConsumers(topic).intValue(), 101);
+ assertEquals(admin.topics().getMaxProducers(topic).intValue(), 102);
+ assertEquals(admin2.topics().getMaxProducers(topic).intValue(), 102);
+ assertEquals(admin3.topics().getMaxProducers(topic).intValue(), 102);
+ }
+
/**
* verifies admin api command for non-persistent topic. It verifies:
partitioned-topic, stats
*
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index adcaedf..1efc66c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -256,15 +256,21 @@ public abstract class MockedPulsarServiceBaseTest {
}
protected PulsarService startBroker(ServiceConfiguration conf) throws
Exception {
- PulsarService pulsar = spy(new PulsarService(conf));
- setupBrokerMocks(pulsar);
boolean isAuthorizationEnabled = conf.isAuthorizationEnabled();
// enable authorization to initialize authorization service which is
used by grant-permission
conf.setAuthorizationEnabled(true);
- pulsar.start();
+ PulsarService pulsar = startBrokerWithoutAuthorization(conf);
conf.setAuthorizationEnabled(isAuthorizationEnabled);
+ return pulsar;
+ }
+ protected PulsarService
startBrokerWithoutAuthorization(ServiceConfiguration conf) throws Exception {
+ PulsarService pulsar = spy(new PulsarService(conf));
+ setupBrokerMocks(pulsar);
+ pulsar.start();
+ log.info("Pulsar started. brokerServiceUrl: {} webServiceAddress: {}",
pulsar.getBrokerServiceUrl(),
+ pulsar.getWebServiceAddress());
return pulsar;
}
@@ -414,4 +420,4 @@ public abstract class MockedPulsarServiceBaseTest {
}
private static final Logger log =
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
-}
\ No newline at end of file
+}