This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4cae20ce9e0 [fix][admin] Fix `validatePersistencePolicies` that
Namespace/Topic persistent policies cannot set to < 0 (#18999)
4cae20ce9e0 is described below
commit 4cae20ce9e0bd6be9843893abff2866a03a4556f
Author: Tao Jiuming <[email protected]>
AuthorDate: Sat Jan 28 11:12:24 2023 +0800
[fix][admin] Fix `validatePersistencePolicies` that Namespace/Topic
persistent policies cannot set to < 0 (#18999)
---
.../apache/pulsar/broker/admin/AdminResource.java | 15 ++++++++------
.../apache/pulsar/broker/admin/NamespacesTest.java | 11 ++++++++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 16 +++++++++++++++
.../broker/auth/MockedPulsarServiceBaseTest.java | 13 ++++++++++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 23 +++++++++++++--------
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 23 +++++++++++++--------
.../org/apache/pulsar/admin/cli/CmdTopics.java | 24 ++++++++++++++--------
7 files changed, 94 insertions(+), 31 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index f00717377ef..eecde2ef359 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -808,12 +808,15 @@ public abstract class AdminResource extends
PulsarWebResource {
protected void validatePersistencePolicies(PersistencePolicies
persistence) {
checkNotNull(persistence, "persistence policies should not be null");
final ServiceConfiguration config = pulsar().getConfiguration();
- checkArgument(persistence.getBookkeeperEnsemble() <=
config.getManagedLedgerMaxEnsembleSize(),
- "Bookkeeper-Ensemble must be <= " +
config.getManagedLedgerMaxEnsembleSize());
- checkArgument(persistence.getBookkeeperWriteQuorum() <=
config.getManagedLedgerMaxWriteQuorum(),
- "Bookkeeper-WriteQuorum must be <= " +
config.getManagedLedgerMaxWriteQuorum());
- checkArgument(persistence.getBookkeeperAckQuorum() <=
config.getManagedLedgerMaxAckQuorum(),
- "Bookkeeper-AckQuorum must be <= " +
config.getManagedLedgerMaxAckQuorum());
+ checkArgument(persistence.getBookkeeperEnsemble() <=
config.getManagedLedgerMaxEnsembleSize()
+ && persistence.getBookkeeperEnsemble() > 0,
+ "Bookkeeper-Ensemble must be <= " +
config.getManagedLedgerMaxEnsembleSize() + " and > 0.");
+ checkArgument(persistence.getBookkeeperWriteQuorum() <=
config.getManagedLedgerMaxWriteQuorum()
+ && persistence.getBookkeeperWriteQuorum() > 0,
+ "Bookkeeper-WriteQuorum must be <= " +
config.getManagedLedgerMaxWriteQuorum() + " and > 0.");
+ checkArgument(persistence.getBookkeeperAckQuorum() <=
config.getManagedLedgerMaxAckQuorum()
+ && persistence.getBookkeeperAckQuorum() > 0,
+ "Bookkeeper-AckQuorum must be <= " +
config.getManagedLedgerMaxAckQuorum() + " and > 0.");
checkArgument(
(persistence.getBookkeeperEnsemble() >=
persistence.getBookkeeperWriteQuorum())
&& (persistence.getBookkeeperWriteQuorum() >=
persistence.getBookkeeperAckQuorum()),
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 22200fdbf23..f26a209d957 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1162,6 +1162,17 @@ public class NamespacesTest extends
MockedPulsarServiceBaseTest {
assertEquals(persistence2, persistence1);
}
+ @Test(dataProvider = "invalidPersistentPolicies")
+ public void testSetIncorrectPersistentPolicies(int ensembleSize, int
writeQuorum, int ackQuorum) throws Exception {
+ NamespaceName testNs = this.testLocalNamespaces.get(0);
+ PersistencePolicies persistence1 = new
PersistencePolicies(ensembleSize, writeQuorum, ackQuorum, 0.0);
+ AsyncResponse response = mock(AsyncResponse.class);
+ namespaces.setPersistence(response, testNs.getTenant(),
testNs.getCluster(), testNs.getLocalName(), persistence1);
+ ArgumentCaptor<RestException> responseCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ assertEquals(responseCaptor.getValue().getResponse().getStatus(),
Status.BAD_REQUEST.getStatusCode());
+ }
+
@Test
public void testPersistenceUnauthorized() throws Exception {
NamespaceName testNs = this.testLocalNamespaces.get(3);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index db26f250067..a84e955cc9d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -869,6 +869,22 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
consumer.close();
}
+ @Test(dataProvider = "invalidPersistentPolicies")
+ public void testSetIncorrectPersistentPolicies(int ensembleSize, int
writeQuorum, int ackQuorum) throws Exception {
+ admin.topics().createNonPartitionedTopic(persistenceTopic);
+ PersistencePolicies persistence1 = new
PersistencePolicies(ensembleSize, writeQuorum, ackQuorum, 0.0);
+
+ boolean failed = false;
+ try {
+ admin.topicPolicies().setPersistence(persistenceTopic,
persistence1);
+ } catch (PulsarAdminException e) {
+ failed = true;
+ Assert.assertEquals(e.getStatusCode(), 400);
+ }
+ assertTrue(failed);
+ admin.topics().delete(persistenceTopic);
+ }
+
@Test
public void testGetDispatchRateApplied() throws Exception {
final String topic = testTopic + UUID.randomUUID();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index f5c389dfc9a..fba92992f57 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -89,6 +89,7 @@ import org.mockito.internal.util.MockUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testng.annotations.DataProvider;
/**
* Base class for all tests that need a Pulsar instance without a ZK and BK
cluster.
@@ -711,5 +712,17 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
});
}
+ @DataProvider(name = "invalidPersistentPolicies")
+ public Object[][] incorrectPersistentPolicies() {
+ return new Object[][] {
+ {0, 0, 0},
+ {1, 0, 0},
+ {0, 0, 1},
+ {0, 1, 0},
+ {1, 1, 0},
+ {1, 0, 1}
+ };
+ }
+
private static final Logger log =
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index f673ff40b79..c92f1f8838c 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1382,26 +1382,33 @@ public class CmdNamespaces extends CmdBase {
private java.util.List<String> params;
@Parameter(names = { "-e",
- "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic", required = true)
- private int bookkeeperEnsemble;
+ "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic")
+ private int bookkeeperEnsemble = 2;
@Parameter(names = { "-w",
- "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry", required = true)
- private int bookkeeperWriteQuorum;
+ "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry")
+ private int bookkeeperWriteQuorum = 2;
@Parameter(names = { "-a",
"--bookkeeper-ack-quorum" },
- description = "Number of acks (guaranteed copies) to wait for
each entry", required = true)
- private int bookkeeperAckQuorum;
+ description = "Number of acks (guaranteed copies) to wait for
each entry")
+ private int bookkeeperAckQuorum = 2;
@Parameter(names = { "-r",
"--ml-mark-delete-max-rate" },
- description = "Throttling rate of mark-delete operation (0
means no throttle)", required = true)
- private double managedLedgerMaxMarkDeleteRate;
+ description = "Throttling rate of mark-delete operation (0
means no throttle)")
+ private double managedLedgerMaxMarkDeleteRate = 0;
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
+ if (bookkeeperEnsemble <= 0 || bookkeeperWriteQuorum <= 0 ||
bookkeeperAckQuorum <= 0) {
+ throw new ParameterException("[--bookkeeper-ensemble],
[--bookkeeper-write-quorum] "
+ + "and [--bookkeeper-ack-quorum] must greater than
0.");
+ }
+ if (managedLedgerMaxMarkDeleteRate < 0) {
+ throw new ParameterException("[--ml-mark-delete-max-rate]
cannot less than 0.");
+ }
getAdmin().namespaces().setPersistence(namespace, new
PersistencePolicies(bookkeeperEnsemble,
bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate));
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 98e65300826..7cd3b49796f 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -1189,20 +1189,20 @@ public class CmdTopicPolicies extends CmdBase {
private java.util.List<String> params;
@Parameter(names = { "-e",
- "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic", required = true)
- private int bookkeeperEnsemble;
+ "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic")
+ private int bookkeeperEnsemble = 2;
@Parameter(names = { "-w",
- "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry", required = true)
- private int bookkeeperWriteQuorum;
+ "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry")
+ private int bookkeeperWriteQuorum = 2;
@Parameter(names = { "-a", "--bookkeeper-ack-quorum" },
- description = "Number of acks (guaranteed copies) to wait for
each entry", required = true)
- private int bookkeeperAckQuorum;
+ description = "Number of acks (guaranteed copies) to wait for
each entry")
+ private int bookkeeperAckQuorum = 2;
@Parameter(names = { "-r", "--ml-mark-delete-max-rate" },
- description = "Throttling rate of mark-delete operation (0
means no throttle)", required = true)
- private double managedLedgerMaxMarkDeleteRate;
+ description = "Throttling rate of mark-delete operation (0
means no throttle)")
+ private double managedLedgerMaxMarkDeleteRate = 0;
@Parameter(names = { "--global", "-g" }, description = "Whether to set
this policy globally. "
+ "If set to true, the policy will be replicate to other
clusters asynchronously", arity = 0)
@@ -1211,6 +1211,13 @@ public class CmdTopicPolicies extends CmdBase {
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
+ if (bookkeeperEnsemble <= 0 || bookkeeperWriteQuorum <= 0 ||
bookkeeperAckQuorum <= 0) {
+ throw new ParameterException("[--bookkeeper-ensemble],
[--bookkeeper-write-quorum] "
+ + "and [--bookkeeper-ack-quorum] must greater than
0.");
+ }
+ if (managedLedgerMaxMarkDeleteRate < 0) {
+ throw new ParameterException("[--ml-mark-delete-max-rate]
cannot less than 0.");
+ }
getTopicPolicies(isGlobal).setPersistence(persistentTopic, new
PersistencePolicies(bookkeeperEnsemble,
bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate));
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 19277025892..827699283bf 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -2210,26 +2210,32 @@ public class CmdTopics extends CmdBase {
private java.util.List<String> params;
@Parameter(names = { "-e",
- "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic", required = true)
- private int bookkeeperEnsemble;
+ "--bookkeeper-ensemble" }, description = "Number of bookies to
use for a topic")
+ private int bookkeeperEnsemble = 2;
@Parameter(names = { "-w",
- "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry", required = true)
- private int bookkeeperWriteQuorum;
+ "--bookkeeper-write-quorum" }, description = "How many writes
to make of each entry")
+ private int bookkeeperWriteQuorum = 2;
@Parameter(names = { "-a",
- "--bookkeeper-ack-quorum" }, description = "Number of acks
(guaranteed copies) to wait for each entry",
- required = true)
- private int bookkeeperAckQuorum;
+ "--bookkeeper-ack-quorum" }, description = "Number of acks
(guaranteed copies) to wait for each entry")
+ private int bookkeeperAckQuorum = 2;
@Parameter(names = { "-r",
"--ml-mark-delete-max-rate" }, description = "Throttling rate
of mark-delete operation "
- + "(0 means no throttle)", required = true)
- private double managedLedgerMaxMarkDeleteRate;
+ + "(0 means no throttle)")
+ private double managedLedgerMaxMarkDeleteRate = 0;
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
+ if (bookkeeperEnsemble <= 0 || bookkeeperWriteQuorum <= 0 ||
bookkeeperAckQuorum <= 0) {
+ throw new ParameterException("[--bookkeeper-ensemble],
[--bookkeeper-write-quorum] "
+ + "and [--bookkeeper-ack-quorum] must greater than
0.");
+ }
+ if (managedLedgerMaxMarkDeleteRate < 0) {
+ throw new ParameterException("[--ml-mark-delete-max-rate]
cannot less than 0.");
+ }
getTopics().setPersistence(persistentTopic, new
PersistencePolicies(bookkeeperEnsemble,
bookkeeperWriteQuorum, bookkeeperAckQuorum,
managedLedgerMaxMarkDeleteRate));
}