Technoboy- commented on a change in pull request #11217:
URL: https://github.com/apache/pulsar/pull/11217#discussion_r663503738



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1233,86 +1246,84 @@ public void closeFailed(ManagedLedgerException 
exception, Object ctx) {
             log.debug("[{}] Checking replication status", name);
         }
 
-        Policies policies = null;
-        try {
-            policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, name.getNamespace()))
-                    .orElseThrow(() -> new KeeperException.NoNodeException());
-        } catch (Exception e) {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            future.completeExceptionally(new ServerMetadataException(e));
-            return future;
-        }
-        //Ignore current broker's config for messageTTL for replication.
-        final int newMessageTTLinSeconds;
-        try {
-            newMessageTTLinSeconds = getMessageTTL();
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(new ServerMetadataException(e));
-        }
+        CompletableFuture<Policies> policiesFuture = 
brokerService.pulsar().getPulsarResources()
+                .getNamespaceResources()
+                .getAsync(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()))
+                .thenCompose(optPolicies -> {
+                            if (!optPolicies.isPresent()) {
+                                return FutureUtil.failedFuture(
+                                        new ServerMetadataException(
+                                                new 
MetadataStoreException.NotFoundException()));
+                            }
 
-        Set<String> configuredClusters;
-        if (policies.replication_clusters != null) {
-            configuredClusters = 
Sets.newTreeSet(policies.replication_clusters);
-        } else {
-            configuredClusters = Collections.emptySet();
-        }
+                            return 
CompletableFuture.completedFuture(optPolicies.get());
+                        });
 
-        String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
+        CompletableFuture<Integer> ttlFuture = getMessageTTL();
 
-        // if local cluster is removed from global namespace cluster-list : 
then delete topic forcefully because pulsar
-        // doesn't serve global topic without local repl-cluster configured.
-        if (TopicName.get(topic).isGlobal() && 
!configuredClusters.contains(localCluster)) {
-            log.info("Deleting topic [{}] because local cluster is not part of 
global namespace repl list {}",
-                    topic, configuredClusters);
-            return deleteForcefully();
-        }
+        return CompletableFuture.allOf(policiesFuture, ttlFuture)
+                .thenCompose(__ -> {
+                    Policies policies = policiesFuture.join();
+                    int newMessageTTLinSeconds = ttlFuture.join();
 
-        List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                    Set<String> configuredClusters;
+                    if (policies.replication_clusters != null) {
+                        configuredClusters = 
Sets.newTreeSet(policies.replication_clusters);
+                    } else {
+                        configuredClusters = Collections.emptySet();
+                    }
 
-        // Check for missing replicators
-        for (String cluster : configuredClusters) {
-            if (cluster.equals(localCluster)) {
-                continue;
-            }
+                    String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
 
-            if (!replicators.containsKey(cluster)) {
-                futures.add(startReplicator(cluster));
-            }
-        }
+                    // if local cluster is removed from global namespace 
cluster-list : then delete topic forcefully
+                    // because pulsar doesn't serve global topic without local 
repl-cluster configured.
+                    if (TopicName.get(topic).isGlobal() && 
!configuredClusters.contains(localCluster)) {
+                        log.info("Deleting topic [{}] because local cluster is 
not part of " +

Review comment:
       Checkstyle failed for this line:
   PersistentTopic.java:[1281,94] (whitespace) OperatorWrap: '+' should be on a new line.
   Please move the trailing '+' to the start of the continuation line.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to