Technoboy- commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r716460872
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2708,6 +2709,60 @@ protected PersistentOfflineTopicStats
internalGetBacklog(boolean authoritative)
});
}
+ protected CompletableFuture<Void>
internalSetReplicationClusters(List<String> clusterIds) {
+ validateTopicPolicyOperation(topicName, PolicyName.REPLICATION,
PolicyOperation.WRITE);
+ validatePoliciesReadOnlyAccess();
+
+ Set<String> replicationClusters = Sets.newHashSet(clusterIds);
+ if (replicationClusters.contains("global")) {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Cannot specify global in the list of replication
clusters");
+ }
+ Set<String> clusters = clusters();
+ for (String clusterId : replicationClusters) {
+ if (!clusters.contains(clusterId)) {
+ throw new RestException(Status.FORBIDDEN, "Invalid cluster id:
" + clusterId);
+ }
+ validatePeerClusterConflict(clusterId, replicationClusters);
+ validateClusterForTenant(namespaceName.getTenant(), clusterId);
+ }
+
+ return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
+ TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
+
topicPolicies.setReplicationClusters(Lists.newArrayList(replicationClusters));
+ return
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName,
topicPolicies)
+ .thenRun(() -> {
+ log.info("[{}] Successfully set replication
clusters for namespace={}, "
+ + "topic={}, clusters={}",
+ clientAppId(),
+ namespaceName,
+ topicName.getLocalName(),
+
topicPolicies.getReplicationClusters());
+ });
+ }
+ );
+ }
+
+ protected CompletableFuture<Void> internalRemoveReplicationClusters() {
+ validateTopicPolicyOperation(topicName, PolicyName.REPLICATION,
PolicyOperation.WRITE);
+ validatePoliciesReadOnlyAccess();
+
+ return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
+ TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);
+
topicPolicies.setReplicationClusters(Collections.emptyList());
Review comment:
ok
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -2641,4 +2642,23 @@ public void
testDoNotCreateSystemTopicForHeartbeatNamespace() {
});
}
+ @Test(timeOut = 30000)
Review comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]