This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new a8068c8 Not allow sub auto create by admin when disable topic auto
create (#6685)
a8068c8 is described below
commit a8068c80e1dd4690b831301fe8e55be06c96d030
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Apr 8 22:20:37 2020 +0800
Not allow sub auto create by admin when disable topic auto create (#6685)
### Motivation
Not allow sub auto create by admin when disable topic auto create
### Modifications
change admin code to not allow sub auto create by admin when disable topic
auto create
add tests
### Verifying this change
ut passed
* fix sub auto created by admin
* fix test error: create sub partition when update it
* fix flaky test
---
.../apache/pulsar/broker/admin/AdminResource.java | 2 +-
.../broker/admin/impl/PersistentTopicsBase.java | 12 ++++--
.../BrokerServiceAutoTopicCreationTest.java | 44 ++++++++++++++++++++++
.../pulsar/client/api/PartitionCreationTest.java | 2 +-
4 files changed, 55 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 3805d53..42e7a7b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -282,7 +282,7 @@ public abstract class AdminResource extends
PulsarWebResource {
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.info("[{}] Topic partition {} is exists, doing
nothing.", clientAppId(),
topicName.getPartition(partition));
-
result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS));
+ result.complete(null);
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Fail to create topic partition {} with
concurrent modification, retry now.",
clientAppId(), topicName.getPartition(partition));
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 7186330..a1e22ad 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -522,11 +523,12 @@ public class PersistentTopicsBase extends AdminResource {
}
return;
}
-
+
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of
partitions should be more than 0");
}
try {
+ tryCreatePartitionsAsync(numPartitions).get();
updatePartitionedTopic(topicName, numPartitions).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
@@ -1501,7 +1503,8 @@ public class PersistentTopicsBase extends AdminResource {
if (topicName.isPartitioned()) {
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse,
subscriptionName, targetMessageId, authoritative, replicated);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative,
false).thenAccept(partitionMetadata -> {
+ boolean allowAutoTopicCreation =
pulsar().getConfiguration().isAllowAutoTopicCreation();
+ getPartitionedTopicMetadataAsync(topicName, authoritative,
allowAutoTopicCreation).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new
CompletableFuture<>();
@@ -1578,7 +1581,10 @@ public class PersistentTopicsBase extends AdminResource {
MessageIdImpl targetMessageId, boolean authoritative, boolean
replicated) {
try {
validateAdminAccessForSubscriber(subscriptionName, authoritative);
- PersistentTopic topic = (PersistentTopic)
getOrCreateTopic(topicName);
+
+ boolean isAllowAutoTopicCreation =
pulsar().getConfiguration().isAllowAutoTopicCreation();
+ PersistentTopic topic = (PersistentTopic)
pulsar().getBrokerService()
+ .getTopic(topicName.toString(),
isAllowAutoTopicCreation).thenApply(Optional::get).join();
if (topic.getSubscriptions().containsKey(subscriptionName)) {
asyncResponse.resume(new RestException(Status.CONFLICT,
"Subscription already exists for topic"));
return;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 929f9c2..6f537da 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -120,4 +120,48 @@ public class BrokerServiceAutoTopicCreationTest extends
BrokerTestBase{
assertEquals(partitions, 0);
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
}
+
+
+ @Test
+ public void testNotAllowSubscriptionTopicCreation() throws Exception{
+ pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+ String topicName = "persistent://prop/ns-abc/non-partitioned-topic" +
System.currentTimeMillis();
+ String subscriptionName = "non-partitioned-topic-sub";
+
+ try {
+ admin.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
+ fail("should fail to create subscription once not
allowAutoTopicCreation");
+ } catch (Exception e) {
+ // expected
+ }
+
+ try {
+ admin.topics().createSubscription(topicName + "-partition-0",
+ subscriptionName, MessageId.earliest);
+ fail("should fail to create subscription once not
allowAutoTopicCreation");
+ } catch (Exception e) {
+ // expected
+ }
+
+
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+
+ try {
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, subscriptionName,
MessageId.earliest);
+ } catch (Exception e) {
+ // expected
+ fail("should success to create subscription once topic created");
+ }
+
+ try {
+ String partitionTopic =
"persistent://prop/ns-abc/partitioned-topic" + System.currentTimeMillis();
+ admin.topics().createPartitionedTopic(partitionTopic, 1);
+ admin.topics().createSubscription(partitionTopic + "-partition-0",
subscriptionName, MessageId.earliest);
+ } catch (Exception e) {
+ // expected
+ fail("should success to create subscription once topic created");
+ }
+
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
index 2647bbb..5e1ad1d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
@@ -104,7 +104,7 @@ public class PartitionCreationTest extends
ProducerConsumerBase {
}
@Test
- public void
testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation()
throws PulsarClientException, PulsarAdminException {
+ public void
testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation()
throws Exception {
conf.setAllowAutoTopicCreation(false);
final String topic =
"testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation";
admin.topics().createPartitionedTopic(topic, 3);