Technoboy- commented on a change in pull request #12136:
URL: https://github.com/apache/pulsar/pull/12136#discussion_r716460872



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2708,6 +2709,60 @@ protected PersistentOfflineTopicStats 
internalGetBacklog(boolean authoritative)
             });
     }
 
+    protected CompletableFuture<Void> 
internalSetReplicationClusters(List<String> clusterIds) {
+        validateTopicPolicyOperation(topicName, PolicyName.REPLICATION, 
PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+
+        Set<String> replicationClusters = Sets.newHashSet(clusterIds);
+        if (replicationClusters.contains("global")) {
+            throw new RestException(Status.PRECONDITION_FAILED,
+                    "Cannot specify global in the list of replication 
clusters");
+        }
+        Set<String> clusters = clusters();
+        for (String clusterId : replicationClusters) {
+            if (!clusters.contains(clusterId)) {
+                throw new RestException(Status.FORBIDDEN, "Invalid cluster id: 
" + clusterId);
+            }
+            validatePeerClusterConflict(clusterId, replicationClusters);
+            validateClusterForTenant(namespaceName.getTenant(), clusterId);
+        }
+
+        return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
+                    TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
+                    
topicPolicies.setReplicationClusters(Lists.newArrayList(replicationClusters));
+                    return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies)
+                            .thenRun(() -> {
+                                log.info("[{}] Successfully set replication 
clusters for namespace={}, "
+                                                + "topic={}, clusters={}",
+                                        clientAppId(),
+                                        namespaceName,
+                                        topicName.getLocalName(),
+                                        
topicPolicies.getReplicationClusters());
+                            });
+                }
+        );
+    }
+
+    protected CompletableFuture<Void> internalRemoveReplicationClusters() {
+        validateTopicPolicyOperation(topicName, PolicyName.REPLICATION, 
PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+
+        return getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op -> {
+                    TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
+                    
topicPolicies.setReplicationClusters(Collections.emptyList());

Review comment:
       ok

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -2641,4 +2642,23 @@ public void 
testDoNotCreateSystemTopicForHeartbeatNamespace() {
         });
     }
 
+    @Test(timeOut = 30000)

Review comment:
       done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to