This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 7209a5b Fix that maxProducersPerTopic cannot be disabled at the
namespace level (#9157)
7209a5b is described below
commit 7209a5bf8d2d9cb5f4ffae5fc67c636856c87782
Author: feynmanlin <[email protected]>
AuthorDate: Mon Jan 11 11:55:07 2021 +0800
Fix that maxProducersPerTopic cannot be disabled at the namespace level
(#9157)
Master Issue: #9146
`maxProducersPerTopic` cannot be disabled at the namespace-level
Let `maxProducersPerTopic` can be null and no longer uses broker-level
policy as the default value
AdminApiTest2#testMaxProducersPerTopicUnlimited
(cherry picked from commit ab8802b51975ed525bda7a80f0af81665e2e7a29)
---
.../apache/pulsar/broker/admin/AdminResource.java | 3 --
.../pulsar/broker/admin/impl/NamespacesBase.java | 6 +--
.../apache/pulsar/broker/admin/v2/Namespaces.java | 14 +++++-
.../pulsar/broker/service/AbstractTopic.java | 3 +-
.../apache/pulsar/broker/admin/AdminApiTest2.java | 53 ++++++++++++++++++++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 2 +-
.../org/apache/pulsar/client/admin/Namespaces.java | 22 ++++++++-
.../client/admin/internal/NamespacesImpl.java | 24 +++++++++-
.../pulsar/admin/cli/PulsarAdminToolTest.java | 3 ++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 14 ++++++
.../pulsar/common/policies/data/Policies.java | 4 +-
11 files changed, 135 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 02181e1..e1fdea1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -491,9 +491,6 @@ public abstract class AdminResource extends
PulsarWebResource {
}
final ServiceConfiguration config = pulsar().getConfiguration();
- if (policies.max_producers_per_topic < 1) {
- policies.max_producers_per_topic =
config.getMaxProducersPerTopic();
- }
if (policies.max_consumers_per_topic < 1) {
policies.max_consumers_per_topic =
config.getMaxConsumersPerTopic();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 2a142f9..87d0d2f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2492,7 +2492,7 @@ public abstract class NamespacesBase extends
AdminResource {
"specific limit. To disable retention both limits must
be set to 0.");
}
- protected int internalGetMaxProducersPerTopic() {
+ protected Integer internalGetMaxProducersPerTopic() {
validateNamespacePolicyOperation(namespaceName,
PolicyName.MAX_PRODUCERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_producers_per_topic;
}
@@ -2510,7 +2510,7 @@ public abstract class NamespacesBase extends
AdminResource {
internalSetPolicies("deduplicationSnapshotIntervalSeconds", interval);
}
- protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) {
+ protected void internalSetMaxProducersPerTopic(Integer
maxProducersPerTopic) {
validateNamespacePolicyOperation(namespaceName,
PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
@@ -2519,7 +2519,7 @@ public abstract class NamespacesBase extends
AdminResource {
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content,
Policies.class);
- if (maxProducersPerTopic < 0) {
+ if (maxProducersPerTopic != null && maxProducersPerTopic < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxProducersPerTopic must be 0 or more");
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 6befe4d..7351d5b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -919,7 +919,7 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public int getMaxProducersPerTopic(@PathParam("tenant") String tenant,
+ public Integer getMaxProducersPerTopic(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetMaxProducersPerTopic();
@@ -938,6 +938,18 @@ public class Namespaces extends NamespacesBase {
internalSetMaxProducersPerTopic(maxProducersPerTopic);
}
+ @DELETE
+ @Path("/{tenant}/{namespace}/maxProducersPerTopic")
+ @ApiOperation(value = "Remove maxProducersPerTopic configuration on a
namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification") })
+ public void removeMaxProducersPerTopic(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String
namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalSetMaxProducersPerTopic(null);
+ }
+
@GET
@Path("/{tenant}/{namespace}/deduplicationSnapshotInterval")
@ApiOperation(value = "Get deduplicationSnapshotInterval config on a
namespace.")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 429248a..2cab994 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -145,7 +145,8 @@ public abstract class AbstractTopic implements Topic {
}
maxProducers = policies.max_producers_per_topic;
}
- maxProducers = maxProducers > 0 ? maxProducers :
brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+ maxProducers = maxProducers != null ? maxProducers :
brokerService.pulsar()
+ .getConfiguration().getMaxProducersPerTopic();
if (maxProducers > 0 && maxProducers <= producers.size()) {
return true;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 62eb997..70d2542 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -1605,5 +1605,58 @@ public class AdminApiTest2 extends
MockedPulsarServiceBaseTest {
client.close();
}
+ @Test
+ public void testMaxProducersPerTopicUnlimited() throws Exception {
+ final int maxProducersPerTopic = 1;
+ super.internalCleanup();
+ mockPulsarSetup.cleanup();
+ conf.setMaxProducersPerTopic(maxProducersPerTopic);
+ super.internalSetup();
+ //init namespace
+ admin.clusters().createCluster("test", new
ClusterData(pulsar.getWebServiceAddress()));
+ TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1",
"role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant("prop-xyz", tenantInfo);
+ final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+ final String topic = "persistent://" + myNamespace +
"/testMaxProducersPerTopicUnlimited";
+ //the policy is set to 0, so there will be no restrictions
+ admin.namespaces().setMaxProducersPerTopic(myNamespace, 0);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+ -> admin.namespaces().getMaxProducersPerTopic(myNamespace) ==
0);
+ List<Producer<byte[]>> producers = new ArrayList<>();
+ for (int i = 0; i < maxProducersPerTopic + 1; i++) {
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ producers.add(producer);
+ }
+
+ admin.namespaces().removeMaxProducersPerTopic(myNamespace);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+ -> admin.namespaces().getMaxProducersPerTopic(myNamespace) ==
null);
+ try {
+ pulsarClient.newProducer().topic(topic).create();
+ fail("should fail");
+ } catch (PulsarClientException ignore) {
+ assertTrue(ignore.getMessage().contains("Topic reached max
producers limit"));
+ }
+ //set the limit to 3
+ admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+ -> admin.namespaces().getMaxProducersPerTopic(myNamespace) ==
3);
+ // should success
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ producers.add(producer);
+ try {
+ pulsarClient.newProducer().topic(topic).create();
+ fail("should fail");
+ } catch (PulsarClientException ignore) {
+ assertTrue(ignore.getMessage().contains("Topic reached max
producers limit"));
+ }
+
+ //clean up
+ for (Producer<byte[]> tempProducer : producers) {
+ tempProducer.close();
+ }
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 99cb773..aaba1bd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -472,7 +472,7 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
Awaitility.await().atMost(3, TimeUnit.SECONDS)
- .untilAsserted(() ->
Assert.assertEquals(admin.namespaces().getMaxProducersPerTopic(myNamespace),
3));
+ .untilAsserted(() ->
Assert.assertEquals(admin.namespaces().getMaxProducersPerTopic(myNamespace).intValue(),
3));
log.info("MaxProducers: {} will set to the namespace: {}", 3,
myNamespace);
try {
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index a4637c4..fcf2211 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -2522,7 +2522,7 @@ public interface Namespaces {
* @throws PulsarAdminException
* Unexpected error
*/
- int getMaxProducersPerTopic(String namespace) throws PulsarAdminException;
+ Integer getMaxProducersPerTopic(String namespace) throws
PulsarAdminException;
/**
* Get the maxProducersPerTopic for a namespace asynchronously.
@@ -2578,6 +2578,26 @@ public interface Namespaces {
CompletableFuture<Void> setMaxProducersPerTopicAsync(String namespace, int
maxProducersPerTopic);
/**
+ * Remove maxProducersPerTopic for a namespace.
+ * @param namespace Namespace name
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeMaxProducersPerTopic(String namespace) throws
PulsarAdminException;
+
+ /**
+ * Set maxProducersPerTopic for a namespace asynchronously.
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Void> removeMaxProducersPerTopicAsync(String namespace);
+
+ /**
* Get the maxProducersPerTopic for a namespace.
* <p/>
* Response example:
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index b8e1a67..ff428be 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -2121,7 +2121,7 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
}
@Override
- public int getMaxProducersPerTopic(String namespace) throws
PulsarAdminException {
+ public Integer getMaxProducersPerTopic(String namespace) throws
PulsarAdminException {
try {
return getMaxProducersPerTopicAsync(namespace).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
@@ -2178,6 +2178,28 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
}
@Override
+ public void removeMaxProducersPerTopic(String namespace) throws
PulsarAdminException {
+ try {
+ removeMaxProducersPerTopicAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeMaxProducersPerTopicAsync(String
namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxProducersPerTopic");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public int getMaxConsumersPerTopic(String namespace) throws
PulsarAdminException {
try {
return getMaxConsumersPerTopicAsync(namespace).
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index dac618d..bdd46e6 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -471,6 +471,9 @@ public class PulsarAdminToolTest {
namespaces.run(split("set-max-producers-per-topic myprop/clust/ns1 -p
1"));
verify(mockNamespaces).setMaxProducersPerTopic("myprop/clust/ns1", 1);
+ namespaces.run(split("remove-max-producers-per-topic
myprop/clust/ns1"));
+ verify(mockNamespaces).removeMaxProducersPerTopic("myprop/clust/ns1");
+
namespaces.run(split("get-max-consumers-per-topic myprop/clust/ns1"));
verify(mockNamespaces).getMaxConsumersPerTopic("myprop/clust/ns1");
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index d2052cd..418d255 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1253,6 +1253,18 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Remove max producers per topic for a
namespace")
+ private class RemoveMaxProducersPerTopic extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().removeMaxProducersPerTopic(namespace);
+ }
+ }
+
@Parameters(commandDescription = "Set maxProducersPerTopic for a
namespace")
private class SetMaxProducersPerTopic extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -1918,6 +1930,8 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("get-max-producers-per-topic", new
GetMaxProducersPerTopic());
jcommander.addCommand("set-max-producers-per-topic", new
SetMaxProducersPerTopic());
+ jcommander.addCommand("remove-max-producers-per-topic", new
RemoveMaxProducersPerTopic());
+
jcommander.addCommand("get-max-consumers-per-topic", new
GetMaxConsumersPerTopic());
jcommander.addCommand("set-max-consumers-per-topic", new
SetMaxConsumersPerTopic());
jcommander.addCommand("get-max-consumers-per-subscription", new
GetMaxConsumersPerSubscription());
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index a7ee4f7..e314ab5 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -77,7 +77,7 @@ public class Policies {
public SubscriptionAuthMode subscription_auth_mode =
SubscriptionAuthMode.None;
@SuppressWarnings("checkstyle:MemberName")
- public int max_producers_per_topic = 0;
+ public Integer max_producers_per_topic = null;
@SuppressWarnings("checkstyle:MemberName")
public int max_consumers_per_topic = 0;
@SuppressWarnings("checkstyle:MemberName")
@@ -164,7 +164,7 @@ public class Policies {
&& Objects.equals(delayed_delivery_policies,
other.delayed_delivery_policies)
&& Objects.equals(inactive_topic_policies,
other.inactive_topic_policies)
&& Objects.equals(subscription_auth_mode,
other.subscription_auth_mode)
- && max_producers_per_topic == other.max_producers_per_topic
+ && Objects.equals(max_producers_per_topic,
other.max_producers_per_topic)
&& max_consumers_per_topic == other.max_consumers_per_topic
&& max_consumers_per_subscription ==
other.max_consumers_per_subscription
&& max_unacked_messages_per_consumer ==
other.max_unacked_messages_per_consumer