Technoboy- commented on a change in pull request #11217:
URL: https://github.com/apache/pulsar/pull/11217#discussion_r663503738
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1233,86 +1246,84 @@ public void closeFailed(ManagedLedgerException
exception, Object ctx) {
log.debug("[{}] Checking replication status", name);
}
- Policies policies = null;
- try {
- policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES, name.getNamespace()))
- .orElseThrow(() -> new KeeperException.NoNodeException());
- } catch (Exception e) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- future.completeExceptionally(new ServerMetadataException(e));
- return future;
- }
- //Ignore current broker's config for messageTTL for replication.
- final int newMessageTTLinSeconds;
- try {
- newMessageTTLinSeconds = getMessageTTL();
- } catch (Exception e) {
- return FutureUtil.failedFuture(new ServerMetadataException(e));
- }
+ CompletableFuture<Policies> policiesFuture =
brokerService.pulsar().getPulsarResources()
+ .getNamespaceResources()
+ .getAsync(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()))
+ .thenCompose(optPolicies -> {
+ if (!optPolicies.isPresent()) {
+ return FutureUtil.failedFuture(
+ new ServerMetadataException(
+ new
MetadataStoreException.NotFoundException()));
+ }
- Set<String> configuredClusters;
- if (policies.replication_clusters != null) {
- configuredClusters =
Sets.newTreeSet(policies.replication_clusters);
- } else {
- configuredClusters = Collections.emptySet();
- }
+ return
CompletableFuture.completedFuture(optPolicies.get());
+ });
- String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
+ CompletableFuture<Integer> ttlFuture = getMessageTTL();
- // if local cluster is removed from global namespace cluster-list :
then delete topic forcefully because pulsar
- // doesn't serve global topic without local repl-cluster configured.
- if (TopicName.get(topic).isGlobal() &&
!configuredClusters.contains(localCluster)) {
- log.info("Deleting topic [{}] because local cluster is not part of
global namespace repl list {}",
- topic, configuredClusters);
- return deleteForcefully();
- }
+ return CompletableFuture.allOf(policiesFuture, ttlFuture)
+ .thenCompose(__ -> {
+ Policies policies = policiesFuture.join();
+ int newMessageTTLinSeconds = ttlFuture.join();
- List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ Set<String> configuredClusters;
+ if (policies.replication_clusters != null) {
+ configuredClusters =
Sets.newTreeSet(policies.replication_clusters);
+ } else {
+ configuredClusters = Collections.emptySet();
+ }
- // Check for missing replicators
- for (String cluster : configuredClusters) {
- if (cluster.equals(localCluster)) {
- continue;
- }
+ String localCluster =
brokerService.pulsar().getConfiguration().getClusterName();
- if (!replicators.containsKey(cluster)) {
- futures.add(startReplicator(cluster));
- }
- }
+ // if local cluster is removed from global namespace
cluster-list : then delete topic forcefully
+ // because pulsar doesn't serve global topic without local
repl-cluster configured.
+ if (TopicName.get(topic).isGlobal() &&
!configuredClusters.contains(localCluster)) {
+ log.info("Deleting topic [{}] because local cluster is
not part of " +
Review comment:
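Checkstyle failed for
PersistentTopic.java:[1281,94] (whitespace) OperatorWrap: '+' should be on a
new line. Please move the trailing '+' from the end of that line to the start
of the continuation line of the concatenated log message.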
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]