This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 64caf495a71 [fix][broker] Fix thread safety issue in info-internal admin api for partitioned topics (#19021)
64caf495a71 is described below

commit 64caf495a714073c5f854c31db361b33ce464d0e
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Dec 23 03:42:48 2022 +0200

    [fix][broker] Fix thread safety issue in info-internal admin api for partitioned topics (#19021)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 42 +++++++++++-----------
 1 file changed, 20 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a3cfecc0750..78c137f94e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -25,6 +25,7 @@ import com.github.zafarkhaja.semver.Version;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collections;
@@ -63,6 +64,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -1288,38 +1290,28 @@ public class PersistentTopicsBase extends AdminResource 
{
                 getPartitionedTopicMetadataAsync(topicName, authoritative, 
false)
                         .thenAccept(partitionMetadata -> {
                     if (partitionMetadata.partitions > 0) {
-                        final List<CompletableFuture<String>> futures =
+                        final List<CompletableFuture<Pair<String, 
ManagedLedgerInfo>>> futures =
                                 new ArrayList<>(partitionMetadata.partitions);
-                        PartitionedManagedLedgerInfo 
partitionedManagedLedgerInfo = new PartitionedManagedLedgerInfo();
                         for (int i = 0; i < partitionMetadata.partitions; i++) 
{
                             TopicName topicNamePartition = 
topicName.getPartition(i);
                             try {
                                 futures.add(pulsar().getAdminClient().topics()
                                         
.getInternalInfoAsync(topicNamePartition.toString())
-                                        .whenComplete((response, throwable) -> 
{
-                                            if (throwable != null) {
-                                                log.error("[{}] Failed to get 
managed info for {}",
-                                                        clientAppId(), 
topicNamePartition, throwable);
-                                                asyncResponse.resume(new 
RestException(throwable));
-                                            }
+                                        .thenApply((response) -> {
                                             try {
-                                                
partitionedManagedLedgerInfo.partitions
-                                                        
.put(topicNamePartition.toString(), jsonMapper()
-                                                                
.readValue(response, ManagedLedgerInfo.class));
-                                            } catch (JsonProcessingException 
ex) {
-                                                log.error("[{}] Failed to 
parse ManagedLedgerInfo for {} from [{}]",
-                                                        clientAppId(), 
topicNamePartition, response, ex);
+                                                return 
Pair.of(topicNamePartition.toString(), jsonMapper()
+                                                        .readValue(response, 
ManagedLedgerInfo.class));
+                                            } catch (JsonProcessingException 
e) {
+                                                throw new 
UncheckedIOException(e);
                                             }
-                                        })
-                                );
+                                        }));
                             } catch (PulsarServerException e) {
                                 log.error("[{}] Failed to get admin client 
while get managed info for {}" ,
                                         clientAppId(), topicNamePartition, e);
                                 throw new RestException(e);
                             }
                         }
-
-                        FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
+                        FutureUtil.waitForAll(futures).whenComplete((result, 
exception) -> {
                             if (exception != null) {
                                 Throwable t = exception.getCause();
                                 if (t instanceof NotFoundException) {
@@ -1329,11 +1321,17 @@ public class PersistentTopicsBase extends AdminResource 
{
                                     log.error("[{}] Failed to get managed info 
for {}", clientAppId(), topicName, t);
                                     asyncResponse.resume(new RestException(t));
                                 }
+                            } else {
+                                PartitionedManagedLedgerInfo 
partitionedManagedLedgerInfo =
+                                        new PartitionedManagedLedgerInfo();
+                                for (CompletableFuture<Pair<String, 
ManagedLedgerInfo>> infoFuture : futures) {
+                                    Pair<String, ManagedLedgerInfo> info = 
infoFuture.getNow(null);
+                                    
partitionedManagedLedgerInfo.partitions.put(info.getKey(), info.getValue());
+                                }
+                                asyncResponse.resume((StreamingOutput) output 
-> {
+                                    jsonMapper().writer().writeValue(output, 
partitionedManagedLedgerInfo);
+                                });
                             }
-                            asyncResponse.resume((StreamingOutput) output -> {
-                                jsonMapper().writer().writeValue(output, 
partitionedManagedLedgerInfo);
-                            });
-                            return null;
                         });
                     } else {
                         
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);

Reply via email to