mattisonchao commented on a change in pull request #14367:
URL: https://github.com/apache/pulsar/pull/14367#discussion_r815252492



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
##########
@@ -343,41 +350,81 @@ public void healthCheck(@Suspended AsyncResponse asyncResponse,
         String messageStr = UUID.randomUUID().toString();
         // create non-partitioned topic manually and close the previous reader if present.
         return pulsar().getBrokerService().getTopic(topicName, true)
-                // check and clean all subscriptions
                 .thenCompose(topicOptional -> {
                     if (!topicOptional.isPresent()) {
                         LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
                                 clientAppId(), topicName);
                         throw new RestException(Status.NOT_FOUND, "Topic [{}] not found after create.");
                     }
-                    Topic topic = topicOptional.get();
-                    // clean all subscriptions
-                    return FutureUtil.waitForAll(topic.getSubscriptions().values()
-                            .stream().map(Subscription::deleteForcefully).collect(Collectors.toList()))
-                            .thenApply(__ -> topic);
-                }).thenCompose(topic -> {
+                    return tryCleanPreviousSubscriptions(topicOptional.get());
+                }).thenCompose(unused-> {
                     try {
                         PulsarClient client = pulsar().getClient();
                         return client.newProducer(Schema.STRING).topic(topicName).createAsync()
-                                        .thenCombine(client.newReader(Schema.STRING).topic(topicName)
+                                .thenCombine(client.newReader(Schema.STRING).topic(topicName)
                                         .startMessageId(MessageId.latest).createAsync(), (producer, reader) ->
-                                                        producer.sendAsync(messageStr).thenCompose(__ ->
-                                                                healthCheckRecursiveReadNext(reader, messageStr))
-                                                        .thenCompose(__ -> {
-                                                            List<CompletableFuture<Void>> closeFutures =
-                                                                    new ArrayList<>();
-                                                            closeFutures.add(producer.closeAsync());
-                                                            closeFutures.add(reader.closeAsync());
-                                                            return FutureUtil.waitForAll(closeFutures);
-                                                        })
-                                        ).thenAccept(ignore -> {});
+                                        producer.sendAsync(messageStr).thenCompose(__ ->
+                                                        healthCheckRecursiveReadNext(reader, messageStr))
+                                                .thenCompose(__ -> {
+                                                    List<CompletableFuture<Void>> closeFutures =
+                                                            new ArrayList<>();
+                                                    closeFutures.add(producer.closeAsync());
+                                                    closeFutures.add(reader.closeAsync());
+                                                    return FutureUtil.waitForAll(closeFutures);
+                                                })
+                                ).thenAccept(ignore -> {});
                     } catch (PulsarServerException e) {
                         LOG.error("[{}] Fail to run health check while get client.", clientAppId());
                         throw new RestException(e);
                     }
                 });
     }
 
+    private CompletableFuture<Void> tryCleanPreviousSubscriptions(Topic topic) {
+        // clean all subscriptions
+        return FutureUtil.waitForAll(topic.getSubscriptions().values()
+                .stream().filter(subscription ->
+                // All system topics are using compaction, even though is not explicitly set in the policies.
+                        !subscription.getName().equals(Compactor.COMPACTION_SUBSCRIPTION))
+                .map(Subscription::delete).collect(Collectors.toList()))

Review comment:
       @Technoboy- 
   Every system topic has a compaction subscription named ``__compaction``. In 
addition, each health check call creates a new reader whose subscription is 
named ``reader-xxx``. We only need to remove the reader subscriptions; the 
``__compaction`` subscription should be left in place.
   I'm not sure I explained this clearly, so please let me know what you think. 
Thanks!
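
   To make the intent concrete, here is a minimal standalone sketch of the filtering rule. It is not the broker code: plain strings stand in for `Subscription` objects, the class and method names are hypothetical, and the constant is a local stand-in for `Compactor.COMPACTION_SUBSCRIPTION` (assumed to be ``__compaction``, as described above).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch only: plain strings stand in for Pulsar Subscription objects.
final class SubscriptionCleanupSketch {

    // Hypothetical local stand-in for Compactor.COMPACTION_SUBSCRIPTION.
    static final String COMPACTION_SUBSCRIPTION = "__compaction";

    // Returns the subscription names the health check would clean up:
    // every subscription except the compaction one.
    static List<String> subscriptionsToDelete(Collection<String> subscriptionNames) {
        return subscriptionNames.stream()
                .filter(name -> !name.equals(COMPACTION_SUBSCRIPTION))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Example names: two leftover reader subscriptions plus the compaction one.
        List<String> names = Arrays.asList("reader-1a2b", "__compaction", "reader-3c4d");
        System.out.println(subscriptionsToDelete(names));
    }
}
```

   In the actual change, the same predicate is applied to `topic.getSubscriptions().values()` before mapping each remaining subscription to `Subscription::delete`.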
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


