This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 64caf495a71 [fix][broker] Fix thread safety issue in info-internal admin api for partitioned topics (#19021)
64caf495a71 is described below
commit 64caf495a714073c5f854c31db361b33ce464d0e
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Dec 23 03:42:48 2022 +0200
[fix][broker] Fix thread safety issue in info-internal admin api for
partitioned topics (#19021)
---
.../broker/admin/impl/PersistentTopicsBase.java | 42 +++++++++++-----------
1 file changed, 20 insertions(+), 22 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a3cfecc0750..78c137f94e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -25,6 +25,7 @@ import com.github.zafarkhaja.semver.Version;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
@@ -63,6 +64,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.admin.AdminResource;
@@ -1288,38 +1290,28 @@ public class PersistentTopicsBase extends AdminResource {
getPartitionedTopicMetadataAsync(topicName, authoritative,
false)
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
- final List<CompletableFuture<String>> futures =
+ final List<CompletableFuture<Pair<String,
ManagedLedgerInfo>>> futures =
new ArrayList<>(partitionMetadata.partitions);
- PartitionedManagedLedgerInfo
partitionedManagedLedgerInfo = new PartitionedManagedLedgerInfo();
for (int i = 0; i < partitionMetadata.partitions; i++)
{
TopicName topicNamePartition =
topicName.getPartition(i);
try {
futures.add(pulsar().getAdminClient().topics()
.getInternalInfoAsync(topicNamePartition.toString())
- .whenComplete((response, throwable) ->
{
- if (throwable != null) {
- log.error("[{}] Failed to get
managed info for {}",
- clientAppId(),
topicNamePartition, throwable);
- asyncResponse.resume(new
RestException(throwable));
- }
+ .thenApply((response) -> {
try {
-
partitionedManagedLedgerInfo.partitions
-
.put(topicNamePartition.toString(), jsonMapper()
-
.readValue(response, ManagedLedgerInfo.class));
- } catch (JsonProcessingException
ex) {
- log.error("[{}] Failed to
parse ManagedLedgerInfo for {} from [{}]",
- clientAppId(),
topicNamePartition, response, ex);
+ return
Pair.of(topicNamePartition.toString(), jsonMapper()
+ .readValue(response,
ManagedLedgerInfo.class));
+ } catch (JsonProcessingException
e) {
+ throw new
UncheckedIOException(e);
}
- })
- );
+ }));
} catch (PulsarServerException e) {
log.error("[{}] Failed to get admin client
while get managed info for {}" ,
clientAppId(), topicNamePartition, e);
throw new RestException(e);
}
}
-
- FutureUtil.waitForAll(futures).handle((result,
exception) -> {
+ FutureUtil.waitForAll(futures).whenComplete((result,
exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
@@ -1329,11 +1321,17 @@ public class PersistentTopicsBase extends AdminResource {
log.error("[{}] Failed to get managed info
for {}", clientAppId(), topicName, t);
asyncResponse.resume(new RestException(t));
}
+ } else {
+ PartitionedManagedLedgerInfo
partitionedManagedLedgerInfo =
+ new PartitionedManagedLedgerInfo();
+ for (CompletableFuture<Pair<String,
ManagedLedgerInfo>> infoFuture : futures) {
+ Pair<String, ManagedLedgerInfo> info =
infoFuture.getNow(null);
+
partitionedManagedLedgerInfo.partitions.put(info.getKey(), info.getValue());
+ }
+ asyncResponse.resume((StreamingOutput) output
-> {
+ jsonMapper().writer().writeValue(output,
partitionedManagedLedgerInfo);
+ });
}
- asyncResponse.resume((StreamingOutput) output -> {
- jsonMapper().writer().writeValue(output,
partitionedManagedLedgerInfo);
- });
- return null;
});
} else {
internalGetManagedLedgerInfoForNonPartitionedTopic(asyncResponse);