eolivelli commented on a change in pull request #14367:
URL: https://github.com/apache/pulsar/pull/14367#discussion_r817505653



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
##########
@@ -338,39 +336,60 @@ public void healthCheck(@Suspended AsyncResponse 
asyncResponse,
         NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
                 ? 
NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), 
pulsar().getConfiguration())
                 : 
NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(), 
pulsar().getConfiguration());
-        String topicName = String.format("persistent://%s/%s", namespaceName, 
HEALTH_CHECK_TOPIC_SUFFIX);
+        final String topicName = String.format("persistent://%s/%s", 
namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
         LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), 
topicName);
-        String messageStr = UUID.randomUUID().toString();
+        final String messageStr = UUID.randomUUID().toString();
+        final String subscriptionName = "healthCheck-" + messageStr;
         // create non-partitioned topic manually and close the previous reader 
if present.
         return pulsar().getBrokerService().getTopic(topicName, true)
-                // check and clean all subscriptions
                 .thenCompose(topicOptional -> {
                     if (!topicOptional.isPresent()) {
                         LOG.error("[{}] Fail to run health check while get 
topic {}. because get null value.",
                                 clientAppId(), topicName);
-                        throw new RestException(Status.NOT_FOUND, "Topic [{}] 
not found after create.");
+                        throw new RestException(Status.NOT_FOUND,
+                                String.format("Topic [%s] not found after 
create.", topicName));
                     }
-                    Topic topic = topicOptional.get();
-                    // clean all subscriptions
-                    return 
FutureUtil.waitForAll(topic.getSubscriptions().values()
-                            
.stream().map(Subscription::deleteForcefully).collect(Collectors.toList()))
-                            .thenApply(__ -> topic);
-                }).thenCompose(topic -> {
                     try {
                         PulsarClient client = pulsar().getClient();
                         return 
client.newProducer(Schema.STRING).topic(topicName).createAsync()
-                                        
.thenCombine(client.newReader(Schema.STRING).topic(topicName)
+                                
.thenCombine(client.newReader(Schema.STRING).topic(topicName)
+                                        .subscriptionName(subscriptionName)
                                         
.startMessageId(MessageId.latest).createAsync(), (producer, reader) ->
-                                                        
producer.sendAsync(messageStr).thenCompose(__ ->
-                                                                
healthCheckRecursiveReadNext(reader, messageStr))
-                                                        .thenCompose(__ -> {
-                                                            
List<CompletableFuture<Void>> closeFutures =
-                                                                    new 
ArrayList<>();
-                                                            
closeFutures.add(producer.closeAsync());
-                                                            
closeFutures.add(reader.closeAsync());
-                                                            return 
FutureUtil.waitForAll(closeFutures);
-                                                        })
-                                        ).thenAccept(ignore -> {});
+                                        producer.sendAsync(messageStr)
+                                                .thenCompose(__ -> 
healthCheckRecursiveReadNext(reader, messageStr))

Review comment:
       are you sure that the 'handle' will run even if there is a problem in 
"sendAsync" ?
   my understanding is that "thenCompose" is triggered only in case of success.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
##########
@@ -338,39 +336,60 @@ public void healthCheck(@Suspended AsyncResponse 
asyncResponse,
         NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
                 ? 
NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), 
pulsar().getConfiguration())
                 : 
NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(), 
pulsar().getConfiguration());
-        String topicName = String.format("persistent://%s/%s", namespaceName, 
HEALTH_CHECK_TOPIC_SUFFIX);
+        final String topicName = String.format("persistent://%s/%s", 
namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
         LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), 
topicName);
-        String messageStr = UUID.randomUUID().toString();
+        final String messageStr = UUID.randomUUID().toString();
+        final String subscriptionName = "healthCheck-" + messageStr;
         // create non-partitioned topic manually and close the previous reader 
if present.
         return pulsar().getBrokerService().getTopic(topicName, true)
-                // check and clean all subscriptions
                 .thenCompose(topicOptional -> {
                     if (!topicOptional.isPresent()) {
                         LOG.error("[{}] Fail to run health check while get 
topic {}. because get null value.",
                                 clientAppId(), topicName);
-                        throw new RestException(Status.NOT_FOUND, "Topic [{}] 
not found after create.");
+                        throw new RestException(Status.NOT_FOUND,
+                                String.format("Topic [%s] not found after 
create.", topicName));
                     }
-                    Topic topic = topicOptional.get();
-                    // clean all subscriptions
-                    return 
FutureUtil.waitForAll(topic.getSubscriptions().values()
-                            
.stream().map(Subscription::deleteForcefully).collect(Collectors.toList()))
-                            .thenApply(__ -> topic);
-                }).thenCompose(topic -> {
                     try {
                         PulsarClient client = pulsar().getClient();
                         return 
client.newProducer(Schema.STRING).topic(topicName).createAsync()
-                                        
.thenCombine(client.newReader(Schema.STRING).topic(topicName)
+                                
.thenCombine(client.newReader(Schema.STRING).topic(topicName)
+                                        .subscriptionName(subscriptionName)
                                         
.startMessageId(MessageId.latest).createAsync(), (producer, reader) ->
-                                                        
producer.sendAsync(messageStr).thenCompose(__ ->
-                                                                
healthCheckRecursiveReadNext(reader, messageStr))
-                                                        .thenCompose(__ -> {
-                                                            
List<CompletableFuture<Void>> closeFutures =
-                                                                    new 
ArrayList<>();
-                                                            
closeFutures.add(producer.closeAsync());
-                                                            
closeFutures.add(reader.closeAsync());
-                                                            return 
FutureUtil.waitForAll(closeFutures);
-                                                        })
-                                        ).thenAccept(ignore -> {});
+                                        producer.sendAsync(messageStr)
+                                                .thenCompose(__ -> 
healthCheckRecursiveReadNext(reader, messageStr))
+                                                .handle((unused, exception) -> 
{
+                                                    // no matter exception or 
success, we still need to
+                                                    // close producer/reader
+                                                    
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
+                                                    
closeFutures.add(producer.closeAsync());
+                                                    
closeFutures.add(reader.closeAsync());

Review comment:
       if the reader creation fails this 'reader' variable is null
   
   so we are missing the handling of the failure of the Reader creation, in 
that case we will miss to close the producer




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to