sijie closed pull request #1652: Refactored BrokerService to use Optional in topics map
URL: https://github.com/apache/incubator-pulsar/pull/1652
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f47796d93..683bb0331 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -137,7 +137,7 @@
     private final int port;
     private final int tlsPort;
 
-    private final ConcurrentOpenHashMap<String, CompletableFuture<Topic>> 
topics;
+    private final ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<Topic>>> topics;
 
     private final ConcurrentOpenHashMap<String, PulsarClient> 
replicationClients;
 
@@ -154,7 +154,7 @@
     private static final ConcurrentOpenHashMap<String, ConfigField> 
dynamicConfigurationMap = prepareDynamicConfigurationMap();
     private final ConcurrentOpenHashMap<String, Consumer<?>> 
configRegisteredListeners;
 
-    private final ConcurrentLinkedQueue<Pair<String, 
CompletableFuture<Topic>>> pendingTopicLoadingQueue;
+    private final ConcurrentLinkedQueue<Pair<String, 
CompletableFuture<Optional<Topic>>>> pendingTopicLoadingQueue;
 
     private AuthorizationService authorizationService = null;
     private final ScheduledExecutorService statsUpdater;
@@ -422,19 +422,19 @@ public void unloadNamespaceBundlesGracefully() {
     }
 
     public CompletableFuture<Optional<Topic>> getTopicIfExists(final String 
topic) {
-        return getTopic(topic, false /* createIfMissing */ ).thenApply(t -> 
Optional.ofNullable(t));
+        return getTopic(topic, false /* createIfMissing */ );
     }
 
     public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
-        return getTopic(topic, true /* createIfMissing */ );
+        return getTopic(topic, true /* createIfMissing */ 
).thenApply(Optional::get);
     }
 
-    private CompletableFuture<Topic> getTopic(final String topic, boolean 
createIfMissing) {
+    private CompletableFuture<Optional<Topic>> getTopic(final String topic, 
boolean createIfMissing) {
         try {
-            CompletableFuture<Topic> topicFuture = topics.get(topic);
+            CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
             if (topicFuture != null) {
                 if (topicFuture.isCompletedExceptionally()
-                        || (topicFuture.isDone() && topicFuture.getNow(null) 
== null)) {
+                        || (topicFuture.isDone() && 
!topicFuture.getNow(Optional.empty()).isPresent())) {
                     // Exceptional topics should be recreated.
                     topics.remove(topic, topicFuture);
                 } else {
@@ -461,8 +461,8 @@ public void unloadNamespaceBundlesGracefully() {
         }
     }
 
-    private CompletableFuture<Topic> createNonPersistentTopic(String topic) {
-        CompletableFuture<Topic> topicFuture = new CompletableFuture<Topic>();
+    private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String 
topic) {
+        CompletableFuture<Optional<Topic>> topicFuture = new 
CompletableFuture<>();
 
         if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
             if (log.isDebugEnabled()) {
@@ -480,7 +480,7 @@ public void unloadNamespaceBundlesGracefully() {
             long topicLoadLatencyMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
             pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
             addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
-            topicFuture.complete(nonPersistentTopic);
+            topicFuture.complete(Optional.of(nonPersistentTopic));
         });
         replicationFuture.exceptionally((ex) -> {
             log.warn("Replication check failed. Removing topic from topics 
list {}, {}", topic, ex);
@@ -549,10 +549,10 @@ public PulsarClient getReplicationClient(String cluster) {
      * @return CompletableFuture<Topic>
      * @throws RuntimeException
      */
-    protected CompletableFuture<Topic> loadOrCreatePersistentTopic(final 
String topic, boolean createIfMissing) throws RuntimeException {
+    protected CompletableFuture<Optional<Topic>> 
loadOrCreatePersistentTopic(final String topic, boolean createIfMissing) throws 
RuntimeException {
         checkTopicNsOwnership(topic);
 
-        final CompletableFuture<Topic> topicFuture = new CompletableFuture<>();
+        final CompletableFuture<Optional<Topic>> topicFuture = new 
CompletableFuture<>();
         if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
             if (log.isDebugEnabled()) {
                 log.debug("Broker is unable to load persistent topic {}", 
topic);
@@ -572,7 +572,7 @@ public PulsarClient getReplicationClient(String cluster) {
                 return null;
             });
         } else {
-            pendingTopicLoadingQueue.add(new ImmutablePair<String, 
CompletableFuture<Topic>>(topic, topicFuture));
+            pendingTopicLoadingQueue.add(new ImmutablePair<String, 
CompletableFuture<Optional<Topic>>>(topic, topicFuture));
             if (log.isDebugEnabled()) {
                 log.debug("topic-loading for {} added into pending queue", 
topic);
             }
@@ -580,7 +580,7 @@ public PulsarClient getReplicationClient(String cluster) {
         return topicFuture;
     }
 
-    private void createPersistentTopic(final String topic, boolean 
createIfMissing, CompletableFuture<Topic> topicFuture) {
+    private void createPersistentTopic(final String topic, boolean 
createIfMissing, CompletableFuture<Optional<Topic>> topicFuture) {
 
         final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         TopicName topicName = TopicName.get(topic);
@@ -615,7 +615,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object 
ctx) {
                                             - topicCreateTimeMs;
                                     
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
                                     addTopicToStatsMaps(topicName, 
persistentTopic);
-                                    topicFuture.complete(persistentTopic);
+                                    
topicFuture.complete(Optional.of(persistentTopic));
                                 }).exceptionally((ex) -> {
                                     log.warn(
                                             "Replication or dedup check 
failed. Removing topic from topics list {}, {}",
@@ -638,7 +638,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object 
ctx) {
                         public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
                             if (!createIfMissing && exception instanceof 
ManagedLedgerNotFoundException) {
                                 // We were just trying to load a topic and the 
topic doesn't exist
-                                topicFuture.complete(null);
+                                topicFuture.complete(Optional.empty());
                             } else {
                                 log.warn("Failed to create topic {}", topic, 
exception);
                                 pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
@@ -782,9 +782,9 @@ public void invalidateOfflineTopicStatCache(TopicName 
topicName) {
      * This method will not make the broker attempt to load the topic if it's 
not already.
      */
     public Optional<Topic> getTopicReference(String topic) {
-          CompletableFuture<Topic> future = topics.get(topic);
+          CompletableFuture<Optional<Topic>> future = topics.get(topic);
         if (future != null && future.isDone() && 
!future.isCompletedExceptionally()) {
-            return Optional.ofNullable(future.join());
+            return future.join();
         } else {
             return Optional.empty();
         }
@@ -814,27 +814,27 @@ public Semaphore getLookupRequestSemaphore() {
 
     public void checkGC(int gcIntervalInSeconds) {
         topics.forEach((n, t) -> {
-            Topic topic = t.isCompletedExceptionally() ? null : t.getNow(null);
-            if (topic != null) {
-                topic.checkGC(gcIntervalInSeconds);
+            Optional<Topic> topic = extractTopic(t);
+            if (topic.isPresent()) {
+                topic.get().checkGC(gcIntervalInSeconds);
             }
         });
     }
 
     public void checkMessageExpiry() {
         topics.forEach((n, t) -> {
-            Topic topic = t.getNow(null);
-            if (topic != null) {
-                topic.checkMessageExpiry();
+            Optional<Topic> topic = extractTopic(t);
+            if (topic.isPresent()) {
+                topic.get().checkMessageExpiry();
             }
         });
     }
 
     public void checkMessageDeduplicationInfo() {
         topics.forEach((n, t) -> {
-            Topic topic = t.getNow(null);
-            if (topic != null) {
-                topic.checkMessageDeduplicationInfo();
+            Optional<Topic> topic = extractTopic(t);
+            if (topic.isPresent()) {
+                topic.get().checkMessageDeduplicationInfo();
             }
         });
     }
@@ -868,8 +868,9 @@ public boolean isBacklogExceeded(PersistentTopic topic) {
     public void monitorBacklogQuota() {
         topics.forEach((n, t) -> {
             try {
-                if (t.getNow(null) != null && t.getNow(null) instanceof 
PersistentTopic) {
-                    PersistentTopic topic = (PersistentTopic) t.getNow(null);
+                Optional<Topic> optionalTopic = extractTopic(t);
+                if (optionalTopic.isPresent() && optionalTopic.get() 
instanceof PersistentTopic) {
+                    PersistentTopic topic = (PersistentTopic) 
optionalTopic.get();
                     if (isBacklogExceeded(topic)) {
                         
getBacklogQuotaManager().handleExceededBacklogQuota(topic);
                     } else if (topic == null) {
@@ -921,7 +922,8 @@ void checkTopicNsOwnership(final String topic) throws 
RuntimeException {
             if (serviceUnit.includes(topicName)) {
                 // Topic needs to be unloaded
                 log.info("[{}] Unloading topic", topicName);
-                closeFutures.add(topicFuture.thenCompose(Topic::close));
+                closeFutures.add(topicFuture
+                        .thenCompose(t -> t.isPresent() ? t.get().close() : 
CompletableFuture.completedFuture(null)));
             }
         });
         CompletableFuture<Void> aggregator = 
FutureUtil.waitForAll(closeFutures);
@@ -978,7 +980,7 @@ public int getNumberOfNamespaceBundles() {
         return this.numberOfNamespaceBundles;
     }
 
-    public ConcurrentOpenHashMap<String, CompletableFuture<Topic>> getTopics() 
{
+    public ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
getTopics() {
         return topics;
     }
 
@@ -996,7 +998,8 @@ public void onUpdate(String path, Policies data, Stat stat) 
{
                     if (log.isDebugEnabled()) {
                         log.debug("Notifying topic that policies have changed: 
{}", name);
                     }
-                    topic.onPoliciesUpdate(data);
+
+                    topic.ifPresent(t -> t.onPoliciesUpdate(data));
                 });
             }
         });
@@ -1033,9 +1036,9 @@ public String generateUniqueProducerName() {
     public Map<String, PersistentTopicStats> getTopicStats() {
         HashMap<String, PersistentTopicStats> stats = new HashMap<>();
         topics.forEach((name, topicFuture) -> {
-            Topic currentTopic = topicFuture.getNow(null);
-            if (currentTopic != null) {
-                stats.put(name, currentTopic.getStats());
+            Optional<Topic> topic = extractTopic(topicFuture);
+            if (topic.isPresent()) {
+                stats.put(name, topic.get().getStats());
             }
         });
         return stats;
@@ -1128,18 +1131,12 @@ private void updateTopicMessageDispatchRate() {
         this.pulsar().getExecutor().execute(() -> {
             // update message-rate for each topic
             topics.forEach((name, topicFuture) -> {
-                if (topicFuture.isDone()) {
-                    String topicName = null;
-                    try {
-                        if (topicFuture.get() instanceof PersistentTopic) {
-                            PersistentTopic topic = (PersistentTopic) 
topicFuture.get();
-                            topicName = topicFuture.get().getName();
-                            // it first checks namespace-policy rate and if 
not present then applies broker-config
-                            
topic.getDispatchRateLimiter().updateDispatchRate();
-                        }
-                    } catch (Exception e) {
-                        log.warn("[{}] failed to update message-dispatch rate 
{}", topicName, e.getMessage());
-                    }
+                Optional<Topic> topic = extractTopic(topicFuture);
+
+                if (topic.isPresent() && topic.get() instanceof 
PersistentTopic) {
+                    PersistentTopic persistentTopic = (PersistentTopic) 
topic.get();
+                    // it first checks namespace-policy rate and if not 
present then applies broker-config
+                    
persistentTopic.getDispatchRateLimiter().updateDispatchRate();
                 }
             });
         });
@@ -1149,22 +1146,19 @@ private void updateSubscriptionMessageDispatchRate() {
         this.pulsar().getExecutor().submit(() -> {
             // update message-rate for each topic subscription
             topics.forEach((name, topicFuture) -> {
-                if (topicFuture.isDone()) {
-                    try {
-                        topicFuture.get().getSubscriptions().forEach((subName, 
persistentSubscription) -> {
-                            if (persistentSubscription
-                                .getDispatcher() instanceof 
PersistentDispatcherMultipleConsumers) {
-                                ((PersistentDispatcherMultipleConsumers) 
persistentSubscription
-                                    
.getDispatcher()).getDispatchRateLimiter().updateDispatchRate();
-                            } else if (persistentSubscription
+                Optional<Topic> topic = extractTopic(topicFuture);
+
+                if (topic.isPresent()) {
+                    topic.get().getSubscriptions().forEach((subName, 
persistentSubscription) -> {
+                        if (persistentSubscription.getDispatcher() instanceof 
PersistentDispatcherMultipleConsumers) {
+                            ((PersistentDispatcherMultipleConsumers) 
persistentSubscription.getDispatcher())
+                                    
.getDispatchRateLimiter().updateDispatchRate();
+                        } else if (persistentSubscription
                                 .getDispatcher() instanceof 
PersistentDispatcherSingleActiveConsumer) {
-                                ((PersistentDispatcherSingleActiveConsumer) 
persistentSubscription
-                                    
.getDispatcher()).getDispatchRateLimiter().updateDispatchRate();
-                            }
-                        });
-                    } catch (Exception e) {
-                        log.warn("Failed to get topic from future while update 
subscription dispatch rate ", e);
-                    }
+                            ((PersistentDispatcherSingleActiveConsumer) 
persistentSubscription.getDispatcher())
+                                    
.getDispatchRateLimiter().updateDispatchRate();
+                        }
+                    });
                 }
             });
         });
@@ -1177,11 +1171,13 @@ private void updateManagedLedgerConfig() {
                 if (topicFuture.isDone()) {
                     String topicName = null;
                     try {
-                        if (topicFuture.getNow(null) instanceof 
PersistentTopic) {
-                            PersistentTopic topic = (PersistentTopic) 
topicFuture.get();
-                            topicName = topicFuture.get().getName();
+                        Optional<Topic> topic = extractTopic(topicFuture);
+
+                        if (topic.isPresent() && topic.get() instanceof 
PersistentTopic) {
+                            PersistentTopic persistentTopic = 
(PersistentTopic) topic.get();
+                            topicName = persistentTopic.getName();
                             // update skipNonRecoverableLedger configuration
-                            
topic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
+                            
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
                                     
pulsar.getConfiguration().isAutoSkipNonRecoverableData());
                         }
                     } catch (Exception e) {
@@ -1340,7 +1336,7 @@ public static boolean validateDynamicConfiguration(String 
key, String value) {
      * permit if it was successful to acquire it.
      */
     private void createPendingLoadTopic() {
-        Pair<String, CompletableFuture<Topic>> pendingTopic = 
pendingTopicLoadingQueue.poll();
+        Pair<String, CompletableFuture<Optional<Topic>>> pendingTopic = 
pendingTopicLoadingQueue.poll();
         if (pendingTopic == null) {
             return;
         }
@@ -1348,7 +1344,7 @@ private void createPendingLoadTopic() {
         final String topic = pendingTopic.getLeft();
         try {
             checkTopicNsOwnership(topic);
-            CompletableFuture<Topic> pendingFuture = pendingTopic.getRight();
+            CompletableFuture<Optional<Topic>> pendingFuture = 
pendingTopic.getRight();
             final Semaphore topicLoadSemaphore = 
topicLoadRequestSemaphore.get();
             final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
             createPersistentTopic(topic, true, pendingFuture);
@@ -1442,25 +1438,21 @@ private void blockDispatchersWithLargeUnAckMessages() {
         lock.readLock().lock();
         try {
             topics.forEach((name, topicFuture) -> {
-                if (topicFuture.isDone()) {
-                    try {
-                        topicFuture.get().getSubscriptions().forEach((subName, 
persistentSubscription) -> {
-                            if (persistentSubscription
-                                    .getDispatcher() instanceof 
PersistentDispatcherMultipleConsumers) {
-                                PersistentDispatcherMultipleConsumers 
dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription
-                                        .getDispatcher();
-                                int dispatcherUnAckMsgs = 
dispatcher.getTotalUnackedMessages();
-                                if (dispatcherUnAckMsgs > 
maxUnackedMsgsPerDispatcher) {
-                                    log.info("[{}] Blocking dispatcher due to 
reached max broker limit {}",
-                                            dispatcher.getName(), 
dispatcher.getTotalUnackedMessages());
-                                    dispatcher.blockDispatcherOnUnackedMsgs();
-                                    blockedDispatchers.add(dispatcher);
-                                }
+                Optional<Topic> topic = extractTopic(topicFuture);
+                if (topic.isPresent()) {
+                    topic.get().getSubscriptions().forEach((subName, 
persistentSubscription) -> {
+                        if (persistentSubscription.getDispatcher() instanceof 
PersistentDispatcherMultipleConsumers) {
+                            PersistentDispatcherMultipleConsumers dispatcher = 
(PersistentDispatcherMultipleConsumers) persistentSubscription
+                                    .getDispatcher();
+                            int dispatcherUnAckMsgs = 
dispatcher.getTotalUnackedMessages();
+                            if (dispatcherUnAckMsgs > 
maxUnackedMsgsPerDispatcher) {
+                                log.info("[{}] Blocking dispatcher due to 
reached max broker limit {}",
+                                        dispatcher.getName(), 
dispatcher.getTotalUnackedMessages());
+                                dispatcher.blockDispatcherOnUnackedMsgs();
+                                blockedDispatchers.add(dispatcher);
                             }
-                        });
-                    } catch (Exception e) {
-                        log.warn("Failed to get topic from future ", e);
-                    }
+                        }
+                    });
                 }
             });
         } finally {
@@ -1496,4 +1488,15 @@ public ConfigField(Field field) {
             this.field = field;
         }
     }
+
+    /**
+     * Safely extract optional topic instance from a future, in a way to avoid 
unchecked exceptions and race conditions.
+     */
+    public static Optional<Topic> 
extractTopic(CompletableFuture<Optional<Topic>> topicFuture) {
+        if (topicFuture.isDone() && !topicFuture.isCompletedExceptionally()) {
+            return topicFuture.join();
+        } else {
+            return Optional.empty();
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
index c2ffc5341..914a25c7e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
@@ -19,9 +19,12 @@
 package org.apache.pulsar.broker.stats;
 
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.naming.TopicName;
 
@@ -47,8 +50,9 @@ public BookieClientStatsGenerator(PulsarService pulsar) {
     private Map<String, Map<String, PendingBookieOpsStats>> generate() throws 
Exception {
         if (pulsar.getBrokerService() != null && 
pulsar.getBrokerService().getTopics() != null) {
             pulsar.getBrokerService().getTopics().forEach((name, topicFuture) 
-> {
-                PersistentTopic persistentTopic = (PersistentTopic) 
topicFuture.getNow(null);
-                if (persistentTopic != null) {
+                Optional<Topic> topic = 
BrokerService.extractTopic(topicFuture);
+                if (topic.isPresent() && topic.get() instanceof 
PersistentTopic) {
+                    PersistentTopic persistentTopic = (PersistentTopic) 
topic.get();
                     TopicName topicName = 
TopicName.get(persistentTopic.getName());
                     put(topicName, 
persistentTopic.getManagedLedger().getStats().getPendingBookieOpsStats());
                 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 01e388a33..a2ef39726 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -32,11 +32,16 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -75,10 +80,6 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
-import com.google.common.collect.Lists;
-import com.google.common.hash.Hashing;
-
 public class NamespaceServiceTest extends BrokerTestBase {
 
     @BeforeMethod
@@ -260,10 +261,11 @@ public void testUnloadNamespaceBundleFailure() throws 
Exception {
         final String topicName = 
"persistent://my-property/use/my-ns/my-topic1";
         
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").subscribe();
 
-        ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics = 
pulsar.getBrokerService().getTopics();
-        Topic spyTopic = spy(topics.get(topicName).get());
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
topics = pulsar.getBrokerService()
+                .getTopics();
+        Topic spyTopic = spy(topics.get(topicName).get().get());
         topics.clear();
-        CompletableFuture<Topic> topicFuture = 
CompletableFuture.completedFuture(spyTopic);
+        CompletableFuture<Optional<Topic>> topicFuture = 
CompletableFuture.completedFuture(Optional.of(spyTopic));
         // add mock topic
         topics.put(topicName, topicFuture);
         doAnswer(new Answer<CompletableFuture<Void>>() {
@@ -300,10 +302,10 @@ public void testUnloadNamespaceBundleWithStuckTopic() 
throws Exception {
         final String topicName = 
"persistent://my-property/use/my-ns/my-topic1";
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
                 .subscribe();
-        ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics = 
pulsar.getBrokerService().getTopics();
-        Topic spyTopic = spy(topics.get(topicName).get());
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
topics = pulsar.getBrokerService().getTopics();
+        Topic spyTopic = spy(topics.get(topicName).get().get());
         topics.clear();
-        CompletableFuture<Topic> topicFuture = 
CompletableFuture.completedFuture(spyTopic);
+        CompletableFuture<Optional<Topic>> topicFuture = 
CompletableFuture.completedFuture(Optional.of(spyTopic));
         // add mock topic
         topics.put(topicName, topicFuture);
         // return uncompleted future as close-topic result.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 0918bc7c9..31773bbd6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -746,7 +746,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() 
throws Exception {
         
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(bundle, 
false);
 
         // try to create topic which should fail as bundle is disable
-        CompletableFuture<Topic> futureResult = 
pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
+        CompletableFuture<Optional<Topic>> futureResult = 
pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
 
         try {
             futureResult.get();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to