315157973 commented on a change in pull request #10790:
URL: https://github.com/apache/pulsar/pull/10790#discussion_r644498422
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,126 @@ protected void internalTruncateTopic(AsyncResponse
asyncResponse, boolean author
});
}
}
+
+ protected void internalSetReplicatedSubscriptionStatus(AsyncResponse
asyncResponse, String subName,
+ boolean authoritative, Boolean enabled) {
+ log.info("[{}] Attempting to change replicated subscription status to
{} - {} {}", clientAppId(), enabled,
+ topicName, subName);
+
+ if (enabled == null) {
Review comment:
required = true, it is impossible to be null here, can we use boolean?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse
asyncResponse, boolean author
});
}
}
+
+ protected void internalSetReplicatedSubscriptionStatus(AsyncResponse
asyncResponse, String subName,
+ boolean authoritative, Boolean enabled) {
+ log.info("[{}] Attempting to change replicated subscription status to
{} - {} {}", clientAppId(), enabled,
+ topicName, subName);
+
+ if (enabled == null) {
+ asyncResponse.resume(new RestException(Status.BAD_REQUEST,
"Boolean type request body is required"));
+ return;
+ }
+
+ // Reject the request if the topic is not persistent
+ if (!topicName.isPersistent()) {
+ asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+ "Cannot enable/disable replicated subscriptions on
non-persistent topics"));
+ return;
+ }
+
+ // Reject the request if the topic is not global
+ if (!topicName.isGlobal()) {
+ asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+ "Cannot enable/disable replicated subscriptions on
non-global topics"));
+ return;
+ }
+
+ // Permission to consume this topic is required
+ try {
+ validateTopicOperation(topicName,
TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+ } catch (Exception e) {
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+
+ // Redirect the request to the peer-cluster if the local cluster is
not included in the replication clusters
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (topicName.isPartitioned()) {
+
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse,
subName, authoritative,
+ enabled);
+ } else {
+ getPartitionedTopicMetadataAsync(topicName, authoritative,
false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+
futures.add(pulsar().getAdminClient().topics().setReplicatedSubscriptionStatusAsync(
+ topicNamePartition.toString(), subName,
enabled));
+ } catch (Exception e) {
+ log.warn("[{}] Failed to change replicated
subscription status to {} - {} {}",
+ clientAppId(), enabled,
topicNamePartition, subName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+ }
+
+ FutureUtil.waitForAll(futures).handle((result, exception)
-> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse
+ .resume(new
RestException(Status.NOT_FOUND, "Topic or subscription not found"));
+ return null;
+ } else if (t instanceof
PreconditionFailedException) {
+ asyncResponse.resume(new
RestException(Status.PRECONDITION_FAILED,
+ "Cannot enable/disable replicated
subscriptions on non-global topics"));
+ return null;
+ } else {
+ log.warn("[{}] Failed to change replicated
subscription status to {} - {} {}",
+ clientAppId(), enabled, topicName,
subName, t);
+ asyncResponse.resume(new RestException(t));
+ return null;
+ }
+ }
+
+ asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ } else {
+
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(asyncResponse,
subName, authoritative,
+ enabled);
+ }
+ }).exceptionally(ex -> {
+ log.warn("[{}] Failed to change replicated subscription status
to {} - {} {}", clientAppId(), enabled,
+ topicName, subName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+ }
+
+ private void
internalSetReplicatedSubscriptionStatusForNonPartitionedTopic(AsyncResponse
asyncResponse,
+ String subName, boolean authoritative, Boolean enabled) {
Review comment:
Should we use boolean
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4035,4 +4035,134 @@ protected void internalTruncateTopic(AsyncResponse
asyncResponse, boolean author
});
}
}
+
+ protected void internalSetReplicatedSubscriptionStatus(AsyncResponse
asyncResponse, String subName,
+ boolean authoritative, Boolean enabled) {
+ log.info("[{}] Attempting to change replicated subscription status to
{} - {} {}", clientAppId(), enabled,
+ topicName, subName);
+
+ if (enabled == null) {
+ asyncResponse.resume(new RestException(Status.BAD_REQUEST,
"Boolean type request body is required"));
+ return;
+ }
+
+ // Reject the request if the topic is not persistent
+ if (!topicName.isPersistent()) {
+ asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+ "Cannot enable/disable replicated subscriptions on
non-persistent topics"));
+ return;
+ }
+
+ // Reject the request if the topic is not global
+ if (!topicName.isGlobal()) {
+ asyncResponse.resume(new RestException(Status.METHOD_NOT_ALLOWED,
+ "Cannot enable/disable replicated subscriptions on
non-global topics"));
+ return;
+ }
+
+ // Permission to consume this topic is required
+ try {
+ validateTopicOperation(topicName,
TopicOperation.SET_REPLICATED_SUBSCRIPTION_STATUS, subName);
+ } catch (Exception e) {
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+
+ // Redirect the request to the peer-cluster if the local cluster is
not included in the replication clusters
+ try {
+ validateGlobalNamespaceOwnership(namespaceName);
+ } catch (Exception e) {
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (topicName.isPartitioned()) {
Review comment:
Is there missing one `!`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]