mattisonchao commented on code in PR #17711:
URL: https://github.com/apache/pulsar/pull/17711#discussion_r979530556


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java:
##########
@@ -148,6 +149,33 @@ public void getManagedLedgerInfo(String ledgerName, 
boolean createIfMissing, Map
                 });
     }
 
+    public CompletableFuture<Map<String, String>> 
getManagedLedgerPropertiesAsync(String name) {
+        CompletableFuture<Map<String, String>> result = new 
CompletableFuture<>();
+        getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(MLDataFormats.ManagedLedgerInfo 
mlInfo, Stat stat) {
+                HashMap<String, String> propertiesMap = new HashMap<>();

Review Comment:
   It's better to give map an init value, `mlInfo.getPropertiesCount()`?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -1408,14 +1408,38 @@ protected CompletableFuture<Optional<Topic>> 
loadOrCreatePersistentTopic(final S
         return topicFuture;
     }
 
+    CompletableFuture<Map<String, String>> fetchTopicPropertiesAsync(TopicName 
topicName) {
+        if (!topicName.isPartitioned()) {
+            return 
managedLedgerFactory.getManagedLedgerPropertiesAsync(topicName.getPersistenceNamingEncoding());
+        } else {
+            TopicName partitionedTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
+            return fetchPartitionedTopicMetadataAsync(partitionedTopicName)
+                    .thenCompose(metadata -> {
+                        if (metadata.partitions == 0) {
+                            return 
managedLedgerFactory.getManagedLedgerPropertiesAsync(
+                                    topicName.getPersistenceNamingEncoding());
+                        }
+                        return 
CompletableFuture.completedFuture(metadata.properties);
+                    });
+        }
+    }
+
     private void checkOwnershipAndCreatePersistentTopic(final String topic, 
boolean createIfMissing,
                                        CompletableFuture<Optional<Topic>> 
topicFuture,
                                        Map<String, String> properties) {
         TopicName topicName = TopicName.get(topic);
         pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName)
                 .thenAccept(isActive -> {
                     if (isActive) {
-                        createPersistentTopic(topic, createIfMissing, 
topicFuture, properties);
+                        CompletableFuture<Map<String, String>> 
propertiesFuture;
+                        if (properties == null) {
+                            //Read properties from storage when loading topic.
+                            propertiesFuture = 
fetchTopicPropertiesAsync(topicName);
+                        } else {
+                            propertiesFuture = 
CompletableFuture.completedFuture(properties);
+                        }
+                        propertiesFuture.thenAccept(finalProperties->

Review Comment:
   what about `propertiesFuture` got an exception?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -177,6 +178,8 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     private final ConcurrentOpenHashMap<String/*ShadowTopic*/, Replicator> 
shadowReplicators;
     @Getter
     private volatile List<String> shadowTopics;
+    @Getter

Review Comment:
   I'd like to add get method return Optional to avoid potential NPE. :)



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java:
##########
@@ -148,6 +149,33 @@ public void getManagedLedgerInfo(String ledgerName, 
boolean createIfMissing, Map
                 });
     }
 
+    public CompletableFuture<Map<String, String>> 
getManagedLedgerPropertiesAsync(String name) {
+        CompletableFuture<Map<String, String>> result = new 
CompletableFuture<>();
+        getManagedLedgerInfo(name, false, new MetaStoreCallback<>() {
+            @Override
+            public void operationComplete(MLDataFormats.ManagedLedgerInfo 
mlInfo, Stat stat) {
+                HashMap<String, String> propertiesMap = new HashMap<>();
+                if (mlInfo.getPropertiesCount() > 0) {
+                    for (int i = 0; i < mlInfo.getPropertiesCount(); i++) {
+                        MLDataFormats.KeyValue property = 
mlInfo.getProperties(i);
+                        propertiesMap.put(property.getKey(), 
property.getValue());
+                    }
+                }
+                result.complete(propertiesMap);
+            }
+
+            @Override
+            public void operationFailed(MetaStoreException e) {
+                if (e instanceof MetadataNotFoundException) {
+                    result.complete(null);

Review Comment:
   If we don't want to return the `MetadataNotFoundException` exception out, we 
can give it the empty map, which will avoid NPE.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to