massakam commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r644513191



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {

Review comment:
       `enabled` can be null if the request body is empty.
   ```sh
   $ curl -i \
     -X POST \
     -d '' \
     -H 'Content-Type: application/json' \
     http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub/replicatedSubscriptionStatus
   
   HTTP/1.1 400 Bad Request
   Date: Thu, 03 Jun 2021 06:04:10 GMT
   broker-address: localhost
   Content-Type: application/json
   Content-Length: 50
   Server: Jetty(9.4.39.v20210325)
   
   {"reason":"Boolean type request body is required"}
   ```
   I think the `required = true` specification is just an option used when generating Swagger documentation, so it does not reject an empty request body:
   https://docs.swagger.io/swagger-core/v1.5.0/apidocs/io/swagger/annotations/ApiParam.html
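
To illustrate the point above, here is a minimal sketch in plain Java (not the broker code) of why a boxed `Boolean` parameter needs an explicit null check before it is used. `NullBodyGuardSketch`, `Result`, and `handle` are hypothetical names introduced only for this example; the status code and message mirror the response shown above.

```java
// Hypothetical sketch: none of these names exist in Pulsar.
class NullBodyGuardSketch {
    /** Minimal stand-in for an HTTP response: status code plus reason text. */
    static final class Result {
        final int status;
        final String reason;

        Result(int status, String reason) {
            this.status = status;
            this.reason = reason;
        }
    }

    /** A boxed Boolean may be null (e.g. empty request body), so guard before unboxing. */
    static Result handle(Boolean enabled) {
        if (enabled == null) {
            return new Result(400, "Boolean type request body is required");
        }
        boolean value = enabled; // safe to unbox after the null check
        return new Result(204, value ? "enabled" : "disabled");
    }

    public static void main(String[] args) {
        System.out.println(handle(null).status);
        System.out.println(handle(true).status);
    }
}
```

Without the guard, unboxing a null `Boolean` would throw a `NullPointerException` instead of producing a clean 400 response.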

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {

Review comment:
       No, "!" is not needed here. This method returns true if the topic name suffix is "-partition-x".
   ```java
   System.out.println(TopicName.get("persistent://public/default/massakam").isPartitioned());
   System.out.println(TopicName.get("persistent://public/default/massakam-partition-0").isPartitioned());
   ```
   ```sh
   false
   true
   ```
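
As a rough illustration of the suffix rule described above, the following standalone sketch checks whether a topic name ends with "-partition-" followed by a non-negative integer. It is an assumption-based approximation, not the actual `TopicName` implementation; `PartitionSuffixSketch`, `partitionIndex`, and `isPartitionName` are hypothetical names.

```java
// Hypothetical sketch of a "-partition-x" suffix check; not Pulsar's TopicName code.
class PartitionSuffixSketch {
    private static final String PARTITION_SUFFIX = "-partition-";

    /** Returns the partition index encoded in the name, or -1 if there is none. */
    static int partitionIndex(String topic) {
        int pos = topic.lastIndexOf(PARTITION_SUFFIX);
        if (pos < 0) {
            return -1;
        }
        String tail = topic.substring(pos + PARTITION_SUFFIX.length());
        if (tail.isEmpty()) {
            return -1;
        }
        // Accept decimal digits only, so signs and other characters are rejected.
        for (int i = 0; i < tail.length(); i++) {
            if (!Character.isDigit(tail.charAt(i))) {
                return -1;
            }
        }
        try {
            return Integer.parseInt(tail);
        } catch (NumberFormatException e) {
            return -1; // digits only, but too large for an int
        }
    }

    /** True when the name looks like a single partition of a partitioned topic. */
    static boolean isPartitionName(String topic) {
        return partitionIndex(topic) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(isPartitionName("persistent://public/default/massakam"));
        System.out.println(isPartitionName("persistent://public/default/massakam-partition-0"));
    }
}
```

Under this reading, a name that already carries the suffix refers to one concrete partition, which is why the branch can skip the partitioned-topic metadata lookup.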

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {

Review comment:
       If `boolean` is used instead of `Boolean`, it seems that an error response is returned unless the request body can be converted to a boolean value.
   ```sh
   $ curl -i -X POST -d '' -H 'Content-Type: application/json' http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
   
   HTTP/1.1 400 Bad Request
   Date: Thu, 03 Jun 2021 06:56:00 GMT
   broker-address: localhost
   Content-Type: application/json
   Content-Length: 35
   Server: Jetty(9.4.39.v20210325)
   
   The request entity cannot be empty.
   
   $ curl -i -X POST -d '"foo"' -H 'Content-Type: application/json' http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
   
   HTTP/1.1 400 Bad Request
   Date: Thu, 03 Jun 2021 07:01:15 GMT
   broker-address: localhost
   Content-Type: text/plain
   Content-Length: 248
   Server: Jetty(9.4.39.v20210325)
   
   Cannot deserialize value of type `boolean` from String "foo": only "true"/"True"/"TRUE" or "false"/"False"/"FALSE" recognized
    at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 1]
   
   $ curl -i -X POST -d 0 -H 'Content-Type: application/json' http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
   
   HTTP/1.1 204 No Content
   Date: Thu, 03 Jun 2021 06:56:21 GMT
   broker-address: localhost
   Server: Jetty(9.4.39.v20210325)
   
   $ curl -i -X POST -d 1 -H 'Content-Type: application/json' http://localhost:8080/admin/v2/persistent/public/default/massakam/subscription/sub1/replicatedSubscriptionStatus
   
   HTTP/1.1 204 No Content
   Date: Thu, 03 Jun 2021 06:56:34 GMT
   broker-address: localhost
   Server: Jetty(9.4.39.v20210325)
   ```
   Since this behavior is not a problem, I changed the type of `enabled` to `boolean`.
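
The responses above can be summarized with a small standalone sketch. It only imitates the observed behavior for a primitive `boolean` body and is not the deserializer the broker actually uses; `BooleanBodySketch` and `parse` are hypothetical names, and the mapping of `0` to false and any other integer to true is an assumption (the 204 responses above do not show which value was applied).

```java
// Hypothetical sketch that imitates the observed request-body handling; not library code.
class BooleanBodySketch {
    /** Converts a raw JSON body to a boolean, or throws if it cannot be converted. */
    static boolean parse(String body) {
        if (body == null || body.trim().isEmpty()) {
            throw new IllegalArgumentException("The request entity cannot be empty.");
        }
        String s = body.trim();
        boolean quoted = s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"");
        if (quoted) {
            // JSON string: only the six spellings named in the error message are accepted.
            String text = s.substring(1, s.length() - 1);
            switch (text) {
                case "true": case "True": case "TRUE":
                    return true;
                case "false": case "False": case "FALSE":
                    return false;
                default:
                    throw new IllegalArgumentException(
                            "Cannot deserialize value of type `boolean` from String \"" + text + "\"");
            }
        }
        if (s.equals("true")) {
            return true;
        }
        if (s.equals("false")) {
            return false;
        }
        try {
            // Assumption: integers are coerced, 0 meaning false and anything else true.
            return Long.parseLong(s) != 0L;
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Cannot deserialize value of type `boolean` from " + s);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1"));
        System.out.println(parse("0"));
    }
}
```

With a primitive parameter, the empty-body and invalid-body cases are rejected before the handler runs, so the explicit null check in the handler is no longer needed.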

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean author
             });
         }
     }
+
+    protected void internalSetReplicatedSubscriptionStatus(AsyncResponse asyncResponse, String subName,
+            boolean authoritative, Boolean enabled) {
+        log.info("[{}] Attempting to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                topicName, subName);
+
+        if (enabled == null) {
+            asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Boolean type request body is required"));
+            return;
+        }
+
+        // Reject the request if the topic is not persistent
+        if (!topicName.isPersistent()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-persistent topics"));
+            return;
+        }
+
+        // Reject the request if the topic is not global
+        if (!topicName.isGlobal()) {
+            asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot enable/disable replicated subscriptions on non-global topics"));
+            return;
+        }
+
+        // Permission to consume this topic is required
+        try {
+            validateTopicOperation(topicName, TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // Redirect the request to the peer-cluster if the local cluster is not included in the replication clusters
+        try {
+            validateGlobalNamespaceOwnership(namespaceName);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition topic metadata again
+        if (topicName.isPartitioned()) {
+            internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                    enabled);
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions > 0) {
+                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        TopicName topicNamePartition = topicName.getPartition(i);
+                        try {
+                            futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
+                                    topicNamePartition.toString(), subName, enabled));
+                        } catch (Exception e) {
+                            log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                    clientAppId(), enabled, topicNamePartition, subName, e);
+                            resumeAsyncResponseExceptionally(asyncResponse, e);
+                            return;
+                        }
+                    }
+
+                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        if (exception != null) {
+                            Throwable t = exception.getCause();
+                            if (t instanceof NotFoundException) {
+                                asyncResponse
+                                        .resume(new RestException(Status.NOT_FOUND, "Topic or subscription not found"));
+                                return null;
+                            } else if (t instanceof PreconditionFailedException) {
+                                asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
+                                        "Cannot enable/disable replicated subscriptions on non-global topics"));
+                                return null;
+                            } else {
+                                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}",
+                                        clientAppId(), enabled, topicName, subName, t);
+                                asyncResponse.resume(new RestException(t));
+                                return null;
+                            }
+                        }
+
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse, subName, authoritative,
+                            enabled);
+                }
+            }).exceptionally(ex -> {
+                log.warn("[{}] Failed to change replicated subscription status to {} - {} {}", clientAppId(), enabled,
+                        topicName, subName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        }
+    }
+
+    private void internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse asyncResponse,
+            String subName, boolean authoritative, Boolean enabled) {

Review comment:
       Fixed.



