mattisonchao commented on code in PR #17711:
URL: https://github.com/apache/pulsar/pull/17711#discussion_r979530556
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java:
##########
@@ -148,6 +149,33 @@ public void getManagedLedgerInfo(String ledgerName,
boolean createIfMissing, Map
});
}
+ public CompletableFuture<Map<String, String>>
getManagedLedgerPropertiesAsync(String name) {
+ CompletableFuture<Map<String, String>> result = new
CompletableFuture<>();
+ getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
+ @Override
+ public void operationComplete(MLDataFormats.ManagedLedgerInfo
mlInfo, Stat stat) {
+ HashMap<String, String> propertiesMap = new HashMap<>();
Review Comment:
It's better to give map an init value, `mlInfo.getPropertiesCount()`?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1408,14 +1408,38 @@ protected CompletableFuture<Optional<Topic>>
loadOrCreatePersistentTopic(final S
return topicFuture;
}
+ CompletableFuture<Map<String, String>> fetchTopicPropertiesAsync(TopicName
topicName) {
+ if (!topicName.isPartitioned()) {
+ return
managedLedgerFactory.getManagedLedgerPropertiesAsync(topicName.getPersistenceNamingEncoding());
+ } else {
+ TopicName partitionedTopicName =
TopicName.get(topicName.getPartitionedTopicName());
+ return fetchPartitionedTopicMetadataAsync(partitionedTopicName)
+ .thenCompose(metadata -> {
+ if (metadata.partitions == 0) {
+ return
managedLedgerFactory.getManagedLedgerPropertiesAsync(
+ topicName.getPersistenceNamingEncoding());
+ }
+ return
CompletableFuture.completedFuture(metadata.properties);
+ });
+ }
+ }
+
private void checkOwnershipAndCreatePersistentTopic(final String topic,
boolean createIfMissing,
CompletableFuture<Optional<Topic>>
topicFuture,
Map<String, String> properties) {
TopicName topicName = TopicName.get(topic);
pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
.thenAccept(isActive -> {
if (isActive) {
- createPersistentTopic(topic, createIfMissing,
topicFuture, properties);
+ CompletableFuture<Map<String, String>>
propertiesFuture;
+ if (properties == null) {
+ //Read properties from storage when loading topic.
+ propertiesFuture =
fetchTopicPropertiesAsync(topicName);
+ } else {
+ propertiesFuture =
CompletableFuture.completedFuture(properties);
+ }
+ propertiesFuture.thenAccept(finalProperties->
Review Comment:
what about `propertiesFuture` got an exception?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -177,6 +178,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
private final ConcurrentOpenHashMap<String/*ShadowTopic*/, Replicator>
shadowReplicators;
@Getter
private volatile List<String> shadowTopics;
+ @Getter
Review Comment:
I'd like to add get method return Optional to avoid potential NPE. :)
##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java:
##########
@@ -148,6 +149,33 @@ public void getManagedLedgerInfo(String ledgerName,
boolean createIfMissing, Map
});
}
+ public CompletableFuture<Map<String, String>>
getManagedLedgerPropertiesAsync(String name) {
+ CompletableFuture<Map<String, String>> result = new
CompletableFuture<>();
+ getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
+ @Override
+ public void operationComplete(MLDataFormats.ManagedLedgerInfo
mlInfo, Stat stat) {
+ HashMap<String, String> propertiesMap = new HashMap<>();
+ if (mlInfo.getPropertiesCount() > 0) {
+ for (int i = 0; i < mlInfo.getPropertiesCount(); i++) {
+ MLDataFormats.KeyValue property =
mlInfo.getProperties(i);
+ propertiesMap.put(property.getKey(),
property.getValue());
+ }
+ }
+ result.complete(propertiesMap);
+ }
+
+ @Override
+ public void operationFailed(MetaStoreException e) {
+ if (e instanceof MetadataNotFoundException) {
+ result.complete(null);
Review Comment:
If we don't want to return the `MetadataNotFoundException` exception out, we
can give it the empty map, which will avoid NPE.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]