massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r644513191
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
Review comment:
`enabled` can be null if the request body is empty.
```sh
$ curl -i \
-X POST \
-d '' \
-H 'Content-Type: application/json' \
http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub/replicatedSubscriptionStatus
HTTP/1.1 400 Bad Request
Date: Thu, 03 Jun 2021 06:04:10 GMT
broker-address: localhost
Content-Type: application/json
Content-Length: 50
Server: Jetty(9.4.39.v20210325)
{"reason":"Boolean type request body is required"}
```
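I think `required = true` is only used when generating the Swagger
documentation. It does not make the framework reject an empty request body,
so `enabled` can still arrive as null and the explicit check is needed.
https://docs.swagger.io/swagger-core/v1.5.0/apidocs/io/swagger/annotations/ApiParam.html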
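To illustrate the point about the null check, here is a minimal, self-contained sketch. `RequestBodyCheck` and `validateEnabled` are hypothetical names used only for this illustration and are not part of the PR; the error message is the one returned in the curl output above.

```java
import java.util.Optional;

// Hypothetical illustration only; not code from the pull request.
final class RequestBodyCheck {

    // A boxed Boolean can be null when no request body was sent, so the
    // handler has to check for that case itself.
    static Optional<String> validateEnabled(Boolean enabled) {
        if (enabled == null) {
            return Optional.of("Boolean type request body is required");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Empty body: the boxed parameter is null and the request is rejected.
        System.out.println(validateEnabled(null).orElse("ok"));
        // Body present: validation passes.
        System.out.println(validateEnabled(Boolean.TRUE).orElse("ok"));
    }
}
```

In the actual handler the message is wrapped in a `RestException` with `Status.BAD_REQUEST`, as the diff shows.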
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
Review comment:
No, `!` is not needed here. `isPartitioned()` returns true when the topic
name ends with a `-partition-x` suffix, i.e. when it already names a single
partition.
```java
System.out.println(TopicName.get("persistent://public/default/massakam").isPartitioned());
System.out.println(TopicName.get("persistent://public/default/massakam-partition-0").isPartitioned());
```
```sh
false
true
```
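For readers without a Pulsar checkout, the suffix rule can be sketched in plain Java. This is an assumption-laden illustration, not the actual `TopicName` implementation: `PartitionSuffixSketch`, `partitionIndex` and `isPartitionName` are hypothetical names, and the real class may parse names differently.

```java
// Hypothetical illustration of the "-partition-x" suffix rule; this is not
// Pulsar's TopicName class.
final class PartitionSuffixSketch {

    private static final String PARTITION_SUFFIX = "-partition-";

    // Returns the partition index encoded in the topic name, or -1 if the
    // name does not end with "-partition-<number>".
    static int partitionIndex(String topic) {
        int pos = topic.lastIndexOf(PARTITION_SUFFIX);
        if (pos < 0) {
            return -1;
        }
        String tail = topic.substring(pos + PARTITION_SUFFIX.length());
        if (tail.isEmpty()) {
            return -1;
        }
        for (char c : tail.toCharArray()) {
            if (c < '0' || c > '9') {
                return -1;
            }
        }
        try {
            return Integer.parseInt(tail);
        } catch (NumberFormatException e) {
            // The digits do not fit in an int; treat as "not a partition name".
            return -1;
        }
    }

    // True when the name already refers to one specific partition.
    static boolean isPartitionName(String topic) {
        return partitionIndex(topic) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(isPartitionName("persistent://public/default/massakam"));
        System.out.println(isPartitionName("persistent://public/default/massakam-partition-0"));
    }
}
```

Under this rule a name that already carries the suffix can go straight to the non-partitioned code path, which is why the condition in the diff is not negated.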
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
Review comment:
If `boolean` is used instead of `Boolean`, it seems that an error response
is returned unless the request body can be converted to a boolean value.
```sh
$ curl -i -X POST -d '' -H 'Content-Type: application/json' \
    http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
HTTP/1.1 400 Bad Request
Date: Thu, 03 Jun 2021 06:56:00 GMT
broker-address: localhost
Content-Type: application/json
Content-Length: 35
Server: Jetty(9.4.39.v20210325)
The request entity cannot be empty.
$ curl -i -X POST -d '"foo"' -H 'Content-Type: application/json' \
    http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
HTTP/1.1 400 Bad Request
Date: Thu, 03 Jun 2021 07:01:15 GMT
broker-address: localhost
Content-Type: text/plain
Content-Length: 248
Server: Jetty(9.4.39.v20210325)
Cannot deserialize value of type `boolean` from String "foo": only "true"/"True"/"TRUE" or "false"/"False"/"FALSE" recognized
 at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 1]
$ curl -i -X POST -d 0 -H 'Content-Type: application/json' \
    http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
HTTP/1.1 204 No Content
Date: Thu, 03 Jun 2021 06:56:21 GMT
broker-address: localhost
Server: Jetty(9.4.39.v20210325)
$ curl -i -X POST -d 1 -H 'Content-Type: application/json' \
    http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
HTTP/1.1 204 No Content
Date: Thu, 03 Jun 2021 06:56:34 GMT
broker-address: localhost
Server: Jetty(9.4.39.v20210325)
```
Since this behavior is not a problem, I changed the type of `enabled` to
`boolean`.
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                    enabled);
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
+                                    topicNamePartition.toString(), subName, enabled));
+                        } catch (Exception e) {
+                            log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                    clientAppId(), enabled, topicNamePartition, subName, e);
+                            resumeAsyncResponseExceptionally(asyncResponse, e);
+                            return;
+                        }
+                    }
+
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable t = exception.getCause();
+                            if (t instanceof NotFoundException) {
+                                asyncResponse
+                                        .resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found"));
+                                return null;
+                            } else if (t instanceof PreconditionFailedException) {
+                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                        "Cannot enable/disable replicated subscriptions on non-global topics"));
+                                return null;
+                            } else {
+                                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                        clientAppId(), enabled, topicName, subName, t);
+                                asyncResponse.resume(new RestException(t));
+                                return null;
+                            }
+                        }
+
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                            enabled);
+                }
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                        topicName, subName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        }
+    }
+
+    private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse,
+            String subName, boolean authoritative, Boolean enabled) {
Review comment:
Fixed.