This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d4055b5 [Broker] Add verification when terminating non-persistent topic (#11903)
d4055b5 is described below
commit d4055b5877c7935b999314eafc85b80b5e219c23
Author: 包子 <[email protected]>
AuthorDate: Tue Sep 7 00:20:19 2021 +0800
[Broker] Add verification when terminating non-persistent topic (#11903)
* [Broker] Add verification when terminating non-persistent topic
* [Broker] add unit test
* code format
---
.../apache/pulsar/broker/admin/AdminResource.java | 15 ++++++++++
.../pulsar/broker/admin/v1/PersistentTopics.java | 3 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 3 +-
.../pulsar/broker/admin/AdminResourceTest.java | 35 ++++++++++++++++++++++
.../pulsar/broker/admin/PersistentTopicsTest.java | 29 ++++++++++++++++++
5 files changed, 83 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index de13b54..cf96e92 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -241,6 +241,13 @@ public abstract class AdminResource extends PulsarWebResource {
}
}
+ protected void validatePersistentTopicName(String property, String
namespace, String encodedTopic) {
+ validateTopicName(property, namespace, encodedTopic);
+ if (topicName.getDomain() != TopicDomain.persistent) {
+ throw new RestException(Status.NOT_ACCEPTABLE, "Need to provide a
persistent topic name");
+ }
+ }
+
protected void validatePartitionedTopicName(String tenant, String
namespace, String encodedTopic) {
// first, it has to be a validate topic name
validateTopicName(tenant, namespace, encodedTopic);
@@ -278,6 +285,14 @@ public abstract class AdminResource extends PulsarWebResource {
}
}
+ @Deprecated
+ protected void validatePersistentTopicName(String property, String
cluster, String namespace, String encodedTopic) {
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ if (topicName.getDomain() != TopicDomain.persistent) {
+ throw new RestException(Status.NOT_ACCEPTABLE, "Need to provide a
persistent topic name");
+ }
+ }
+
protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
final String namespace = namespaceName.toString();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index fee8707..32f5ab3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -673,11 +673,12 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 307, message = "Current broker doesn't serve
the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 405, message = "Operation not allowed on
non-persistent topic"),
+ @ApiResponse(code = 406, message = "Need to provide a persistent
topic name"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public MessageId terminate(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic")
@Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateTopicName(property, cluster, namespace, encodedTopic);
+ validatePersistentTopicName(property, cluster, namespace,
encodedTopic);
return internalTerminate(authoritative);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 4930915..1e26907 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2442,6 +2442,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Termination of a partitioned
topic is not allowed"),
+ @ApiResponse(code = 406, message = "Need to provide a persistent
topic name"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global
cluster configuration")})
@@ -2454,7 +2455,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this
operation")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateTopicName(tenant, namespace, encodedTopic);
+ validatePersistentTopicName(tenant, namespace, encodedTopic);
return internalTerminate(authoritative);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
index 982c0b2..e047d31 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminResourceTest.java
@@ -57,6 +57,41 @@ public class AdminResourceTest extends BrokerTestBase {
};
}
+ private static AdminResource mockNonPersistentResource() {
+ return new AdminResource() {
+
+ @Override
+ protected String domain() {
+ return "non-persistent";
+ }
+ };
+ }
+
+ @Test
+ public void testValidatePersistentTopicNameSuccess() {
+ String tenant = "test-tenant";
+ String namespace = "test-namespace";
+ String topic = Codec.encode("test-topic");
+
+ AdminResource resource = mockResource();
+ resource.validatePersistentTopicName(tenant, namespace, topic);
+ }
+
+ @Test
+ public void testValidatePersistentTopicNameInvalid() {
+ String tenant = "test-tenant";
+ String namespace = "test-namespace";
+ String topic = Codec.encode("test-topic");
+
+ AdminResource nPResource = mockNonPersistentResource();
+ try {
+ nPResource.validatePersistentTopicName(tenant, namespace, topic);
+ fail("Should fail validation on non-persistent topic");
+ } catch (RestException e) {
+ assertEquals(Status.NOT_ACCEPTABLE.getStatusCode(),
e.getResponse().getStatus());
+ }
+ }
+
@Test
public void testValidatePartitionedTopicNameSuccess() {
String tenant = "test-tenant";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 182d2cc..9871b7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -333,6 +333,35 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
}
@Test
+ public void testTerminate() {
+ String testLocalTopicName = "topic-not-found";
+
+ // 1) Create the nonPartitionTopic topic
+ persistentTopics.createNonPartitionedTopic(testTenant, testNamespace,
testLocalTopicName, true);
+
+ // 2) Create a subscription
+ AsyncResponse response = mock(AsyncResponse.class);
+ persistentTopics.createSubscription(response, testTenant,
testNamespace, testLocalTopicName, "test", true,
+ (MessageIdImpl) MessageId.earliest, false);
+ ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
+
+ // 3) Assert terminate persistent topic
+ MessageId messageId = persistentTopics.terminate(testTenant,
testNamespace, testLocalTopicName, true);
+ Assert.assertEquals(messageId, new MessageIdImpl(3, -1, -1));
+
+ // 4) Assert terminate non-persistent topic
+ String nonPersistentTopicName = "non-persistent-topic";
+ try {
+ nonPersistentTopic.terminate(testTenant, testNamespace,
nonPersistentTopicName, true);
+ Assert.fail("Should fail validation on non-persistent topic");
+ } catch (RestException e) {
+
Assert.assertEquals(Response.Status.NOT_ACCEPTABLE.getStatusCode(),
e.getResponse().getStatus());
+ }
+ }
+
+ @Test
public void testNonPartitionedTopics() {
final String nonPartitionTopic = "non-partitioned-topic";
AsyncResponse response = mock(AsyncResponse.class);