This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new eaae9ea  Refactored BrokerService to use Optional in topics map (#1652)
eaae9ea is described below

commit eaae9eafd059fba76200593f9a021312c2aa3d44
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 26 13:37:05 2018 -0700

    Refactored BrokerService to use Optional in topics map (#1652)
---
 .../pulsar/broker/service/BrokerService.java       | 175 +++++++++++----------
 .../broker/stats/BookieClientStatsGenerator.java   |   8 +-
 .../broker/namespace/NamespaceServiceTest.java     |  22 +--
 .../pulsar/broker/service/BrokerServiceTest.java   |   2 +-
 4 files changed, 108 insertions(+), 99 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f47796d..683bb03 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -137,7 +137,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
     private final int port;
     private final int tlsPort;
 
-    private final ConcurrentOpenHashMap<String, CompletableFuture<Topic>> 
topics;
+    private final ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<Topic>>> topics;
 
     private final ConcurrentOpenHashMap<String, PulsarClient> 
replicationClients;
 
@@ -154,7 +154,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
     private static final ConcurrentOpenHashMap<String, ConfigField> 
dynamicConfigurationMap = prepareDynamicConfigurationMap();
     private final ConcurrentOpenHashMap<String, Consumer<?>> 
configRegisteredListeners;
 
-    private final ConcurrentLinkedQueue<Pair<String, 
CompletableFuture<Topic>>> pendingTopicLoadingQueue;
+    private final ConcurrentLinkedQueue<Pair<String, 
CompletableFuture<Optional<Topic>>>> pendingTopicLoadingQueue;
 
     private AuthorizationService authorizationService = null;
     private final ScheduledExecutorService statsUpdater;
@@ -422,19 +422,19 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
     }
 
     public CompletableFuture<Optional<Topic>> getTopicIfExists(final String 
topic) {
-        return getTopic(topic, false /* createIfMissing */ ).thenApply(t -> 
Optional.ofNullable(t));
+        return getTopic(topic, false /* createIfMissing */ );
     }
 
     public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
-        return getTopic(topic, true /* createIfMissing */ );
+        return getTopic(topic, true /* createIfMissing */ 
).thenApply(Optional::get);
     }
 
-    private CompletableFuture<Topic> getTopic(final String topic, boolean 
createIfMissing) {
+    private CompletableFuture<Optional<Topic>> getTopic(final String topic, 
boolean createIfMissing) {
         try {
-            CompletableFuture<Topic> topicFuture = topics.get(topic);
+            CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
             if (topicFuture != null) {
                 if (topicFuture.isCompletedExceptionally()
-                        || (topicFuture.isDone() && topicFuture.getNow(null) 
== null)) {
+                        || (topicFuture.isDone() && 
!topicFuture.getNow(Optional.empty()).isPresent())) {
                     // Exceptional topics should be recreated.
                     topics.remove(topic, topicFuture);
                 } else {
@@ -461,8 +461,8 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         }
     }
 
-    private CompletableFuture<Topic> createNonPersistentTopic(String topic) {
-        CompletableFuture<Topic> topicFuture = new CompletableFuture<Topic>();
+    private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String 
topic) {
+        CompletableFuture<Optional<Topic>> topicFuture = new 
CompletableFuture<>();
 
         if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
             if (log.isDebugEnabled()) {
@@ -480,7 +480,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
             long topicLoadLatencyMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
             pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
             addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
-            topicFuture.complete(nonPersistentTopic);
+            topicFuture.complete(Optional.of(nonPersistentTopic));
         });
         replicationFuture.exceptionally((ex) -> {
             log.warn("Replication check failed. Removing topic from topics 
list {}, {}", topic, ex);
@@ -549,10 +549,10 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
      * @return CompletableFuture<Topic>
      * @throws RuntimeException
      */
-    protected CompletableFuture<Topic> loadOrCreatePersistentTopic(final 
String topic, boolean createIfMissing) throws RuntimeException {
+    protected CompletableFuture<Optional<Topic>> 
loadOrCreatePersistentTopic(final String topic, boolean createIfMissing) throws 
RuntimeException {
         checkTopicNsOwnership(topic);
 
-        final CompletableFuture<Topic> topicFuture = new CompletableFuture<>();
+        final CompletableFuture<Optional<Topic>> topicFuture = new 
CompletableFuture<>();
         if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
             if (log.isDebugEnabled()) {
                 log.debug("Broker is unable to load persistent topic {}", 
topic);
@@ -572,7 +572,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                 return null;
             });
         } else {
-            pendingTopicLoadingQueue.add(new ImmutablePair<String, 
CompletableFuture<Topic>>(topic, topicFuture));
+            pendingTopicLoadingQueue.add(new ImmutablePair<String, 
CompletableFuture<Optional<Topic>>>(topic, topicFuture));
             if (log.isDebugEnabled()) {
                 log.debug("topic-loading for {} added into pending queue", 
topic);
             }
@@ -580,7 +580,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         return topicFuture;
     }
 
-    private void createPersistentTopic(final String topic, boolean 
createIfMissing, CompletableFuture<Topic> topicFuture) {
+    private void createPersistentTopic(final String topic, boolean 
createIfMissing, CompletableFuture<Optional<Topic>> topicFuture) {
 
         final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         TopicName topicName = TopicName.get(topic);
@@ -615,7 +615,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                                             - topicCreateTimeMs;
                                     
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
                                     addTopicToStatsMaps(topicName, 
persistentTopic);
-                                    topicFuture.complete(persistentTopic);
+                                    
topicFuture.complete(Optional.of(persistentTopic));
                                 }).exceptionally((ex) -> {
                                     log.warn(
                                             "Replication or dedup check 
failed. Removing topic from topics list {}, {}",
@@ -638,7 +638,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                         public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
                             if (!createIfMissing && exception instanceof 
ManagedLedgerNotFoundException) {
                                 // We were just trying to load a topic and the 
topic doesn't exist
-                                topicFuture.complete(null);
+                                topicFuture.complete(Optional.empty());
                             } else {
                                 log.warn("Failed to create topic {}", topic, 
exception);
                                 pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
@@ -782,9 +782,9 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
      * This method will not make the broker attempt to load the topic if it's 
not already.
      */
     public Optional<Topic> getTopicReference(String topic) {
-          CompletableFuture<Topic> future = topics.get(topic);
+          CompletableFuture<Optional<Topic>> future = topics.get(topic);
         if (future != null && future.isDone() && 
!future.isCompletedExceptionally()) {
-            return Optional.ofNullable(future.join());
+            return future.join();
         } else {
             return Optional.empty();
         }
@@ -814,27 +814,27 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
 
     public void checkGC(int gcIntervalInSeconds) {
         topics.forEach((n, t) -> {
-            Topic topic = t.isCompletedExceptionally() ? null : t.getNow(null);
-            if (topic != null) {
-                topic.checkGC(gcIntervalInSeconds);
+            Optional<Topic> topic = extractTopic(t);
+            if (topic.isPresent()) {
+                topic.get().checkGC(gcIntervalInSeconds);
             }
         });
     }
 
     public void checkMessageExpiry() {
         topics.forEach((n, t) -> {
-            Topic topic = t.getNow(null);
-            if (topic != null) {
-                topic.checkMessageExpiry();
+            Optional<Topic> topic = extractTopic(t);
+            if (topic.isPresent()) {
+                topic.get().checkMessageExpiry();
             }
         });
     }
 
     public void checkMessageDeduplicationInfo() {
         topics.forEach((n, t) -> {
-            Topic topic = t.getNow(null);
-            if (topic != null) {
-                topic.checkMessageDeduplicationInfo();
+            Optional<Topic> topic = extractTopic(t);
+            if (topic.isPresent()) {
+                topic.get().checkMessageDeduplicationInfo();
             }
         });
     }
@@ -868,8 +868,9 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
     public void monitorBacklogQuota() {
         topics.forEach((n, t) -> {
             try {
-                if (t.getNow(null) != null && t.getNow(null) instanceof 
PersistentTopic) {
-                    PersistentTopic topic = (PersistentTopic) t.getNow(null);
+                Optional<Topic> optionalTopic = extractTopic(t);
+                if (optionalTopic.isPresent() && optionalTopic.get() 
instanceof PersistentTopic) {
+                    PersistentTopic topic = (PersistentTopic) 
optionalTopic.get();
                     if (isBacklogExceeded(topic)) {
                         
getBacklogQuotaManager().handleExceededBacklogQuota(topic);
                     } else if (topic == null) {
@@ -921,7 +922,8 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
             if (serviceUnit.includes(topicName)) {
                 // Topic needs to be unloaded
                 log.info("[{}] Unloading topic", topicName);
-                closeFutures.add(topicFuture.thenCompose(Topic::close));
+                closeFutures.add(topicFuture
+                        .thenCompose(t -> t.isPresent() ? t.get().close() : 
CompletableFuture.completedFuture(null)));
             }
         });
         CompletableFuture<Void> aggregator = 
FutureUtil.waitForAll(closeFutures);
@@ -978,7 +980,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         return this.numberOfNamespaceBundles;
     }
 
-    public ConcurrentOpenHashMap<String, CompletableFuture<Topic>> getTopics() 
{
+    public ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
getTopics() {
         return topics;
     }
 
@@ -996,7 +998,8 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                     if (log.isDebugEnabled()) {
                         log.debug("Notifying topic that policies have changed: 
{}", name);
                     }
-                    topic.onPoliciesUpdate(data);
+
+                    topic.ifPresent(t -> t.onPoliciesUpdate(data));
                 });
             }
         });
@@ -1033,9 +1036,9 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
     public Map<String, PersistentTopicStats> getTopicStats() {
         HashMap<String, PersistentTopicStats> stats = new HashMap<>();
         topics.forEach((name, topicFuture) -> {
-            Topic currentTopic = topicFuture.getNow(null);
-            if (currentTopic != null) {
-                stats.put(name, currentTopic.getStats());
+            Optional<Topic> topic = extractTopic(topicFuture);
+            if (topic.isPresent()) {
+                stats.put(name, topic.get().getStats());
             }
         });
         return stats;
@@ -1128,18 +1131,12 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         this.pulsar().getExecutor().execute(() -> {
             // update message-rate for each topic
             topics.forEach((name, topicFuture) -> {
-                if (topicFuture.isDone()) {
-                    String topicName = null;
-                    try {
-                        if (topicFuture.get() instanceof PersistentTopic) {
-                            PersistentTopic topic = (PersistentTopic) 
topicFuture.get();
-                            topicName = topicFuture.get().getName();
-                            // it first checks namespace-policy rate and if 
not present then applies broker-config
-                            
topic.getDispatchRateLimiter().updateDispatchRate();
-                        }
-                    } catch (Exception e) {
-                        log.warn("[{}] failed to update message-dispatch rate 
{}", topicName, e.getMessage());
-                    }
+                Optional<Topic> topic = extractTopic(topicFuture);
+
+                if (topic.isPresent() && topic.get() instanceof 
PersistentTopic) {
+                    PersistentTopic persistentTopic = (PersistentTopic) 
topic.get();
+                    // it first checks namespace-policy rate and if not 
present then applies broker-config
+                    
persistentTopic.getDispatchRateLimiter().updateDispatchRate();
                 }
             });
         });
@@ -1149,22 +1146,19 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         this.pulsar().getExecutor().submit(() -> {
             // update message-rate for each topic subscription
             topics.forEach((name, topicFuture) -> {
-                if (topicFuture.isDone()) {
-                    try {
-                        topicFuture.get().getSubscriptions().forEach((subName, 
persistentSubscription) -> {
-                            if (persistentSubscription
-                                .getDispatcher() instanceof 
PersistentDispatcherMultipleConsumers) {
-                                ((PersistentDispatcherMultipleConsumers) 
persistentSubscription
-                                    
.getDispatcher()).getDispatchRateLimiter().updateDispatchRate();
-                            } else if (persistentSubscription
+                Optional<Topic> topic = extractTopic(topicFuture);
+
+                if (topic.isPresent()) {
+                    topic.get().getSubscriptions().forEach((subName, 
persistentSubscription) -> {
+                        if (persistentSubscription.getDispatcher() instanceof 
PersistentDispatcherMultipleConsumers) {
+                            ((PersistentDispatcherMultipleConsumers) 
persistentSubscription.getDispatcher())
+                                    
.getDispatchRateLimiter().updateDispatchRate();
+                        } else if (persistentSubscription
                                 .getDispatcher() instanceof 
PersistentDispatcherSingleActiveConsumer) {
-                                ((PersistentDispatcherSingleActiveConsumer) 
persistentSubscription
-                                    
.getDispatcher()).getDispatchRateLimiter().updateDispatchRate();
-                            }
-                        });
-                    } catch (Exception e) {
-                        log.warn("Failed to get topic from future while update 
subscription dispatch rate ", e);
-                    }
+                            ((PersistentDispatcherSingleActiveConsumer) 
persistentSubscription.getDispatcher())
+                                    
.getDispatchRateLimiter().updateDispatchRate();
+                        }
+                    });
                 }
             });
         });
@@ -1177,11 +1171,13 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                 if (topicFuture.isDone()) {
                     String topicName = null;
                     try {
-                        if (topicFuture.getNow(null) instanceof 
PersistentTopic) {
-                            PersistentTopic topic = (PersistentTopic) 
topicFuture.get();
-                            topicName = topicFuture.get().getName();
+                        Optional<Topic> topic = extractTopic(topicFuture);
+
+                        if (topic.isPresent() && topic.get() instanceof 
PersistentTopic) {
+                            PersistentTopic persistentTopic = 
(PersistentTopic) topic.get();
+                            topicName = persistentTopic.getName();
                             // update skipNonRecoverableLedger configuration
-                            
topic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
+                            
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
                                     
pulsar.getConfiguration().isAutoSkipNonRecoverableData());
                         }
                     } catch (Exception e) {
@@ -1340,7 +1336,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
      * permit if it was successful to acquire it.
      */
     private void createPendingLoadTopic() {
-        Pair<String, CompletableFuture<Topic>> pendingTopic = 
pendingTopicLoadingQueue.poll();
+        Pair<String, CompletableFuture<Optional<Topic>>> pendingTopic = 
pendingTopicLoadingQueue.poll();
         if (pendingTopic == null) {
             return;
         }
@@ -1348,7 +1344,7 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         final String topic = pendingTopic.getLeft();
         try {
             checkTopicNsOwnership(topic);
-            CompletableFuture<Topic> pendingFuture = pendingTopic.getRight();
+            CompletableFuture<Optional<Topic>> pendingFuture = 
pendingTopic.getRight();
             final Semaphore topicLoadSemaphore = 
topicLoadRequestSemaphore.get();
             final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
             createPersistentTopic(topic, true, pendingFuture);
@@ -1442,25 +1438,21 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
         lock.readLock().lock();
         try {
             topics.forEach((name, topicFuture) -> {
-                if (topicFuture.isDone()) {
-                    try {
-                        topicFuture.get().getSubscriptions().forEach((subName, 
persistentSubscription) -> {
-                            if (persistentSubscription
-                                    .getDispatcher() instanceof 
PersistentDispatcherMultipleConsumers) {
-                                PersistentDispatcherMultipleConsumers 
dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription
-                                        .getDispatcher();
-                                int dispatcherUnAckMsgs = 
dispatcher.getTotalUnackedMessages();
-                                if (dispatcherUnAckMsgs > 
maxUnackedMsgsPerDispatcher) {
-                                    log.info("[{}] Blocking dispatcher due to 
reached max broker limit {}",
-                                            dispatcher.getName(), 
dispatcher.getTotalUnackedMessages());
-                                    dispatcher.blockDispatcherOnUnackedMsgs();
-                                    blockedDispatchers.add(dispatcher);
-                                }
+                Optional<Topic> topic = extractTopic(topicFuture);
+                if (topic.isPresent()) {
+                    topic.get().getSubscriptions().forEach((subName, 
persistentSubscription) -> {
+                        if (persistentSubscription.getDispatcher() instanceof 
PersistentDispatcherMultipleConsumers) {
+                            PersistentDispatcherMultipleConsumers dispatcher = 
(PersistentDispatcherMultipleConsumers) persistentSubscription
+                                    .getDispatcher();
+                            int dispatcherUnAckMsgs = 
dispatcher.getTotalUnackedMessages();
+                            if (dispatcherUnAckMsgs > 
maxUnackedMsgsPerDispatcher) {
+                                log.info("[{}] Blocking dispatcher due to 
reached max broker limit {}",
+                                        dispatcher.getName(), 
dispatcher.getTotalUnackedMessages());
+                                dispatcher.blockDispatcherOnUnackedMsgs();
+                                blockedDispatchers.add(dispatcher);
                             }
-                        });
-                    } catch (Exception e) {
-                        log.warn("Failed to get topic from future ", e);
-                    }
+                        }
+                    });
                 }
             });
         } finally {
@@ -1496,4 +1488,15 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
             this.field = field;
         }
     }
+
+    /**
+     * Safely extract optional topic instance from a future, in a way to avoid 
unchecked exceptions and race conditions.
+     */
+    public static Optional<Topic> 
extractTopic(CompletableFuture<Optional<Topic>> topicFuture) {
+        if (topicFuture.isDone() && !topicFuture.isCompletedExceptionally()) {
+            return topicFuture.join();
+        } else {
+            return Optional.empty();
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
index c2ffc53..914a25c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
@@ -19,9 +19,12 @@
 package org.apache.pulsar.broker.stats;
 
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.naming.TopicName;
 
@@ -47,8 +50,9 @@ public class BookieClientStatsGenerator {
     private Map<String, Map<String, PendingBookieOpsStats>> generate() throws 
Exception {
         if (pulsar.getBrokerService() != null && 
pulsar.getBrokerService().getTopics() != null) {
             pulsar.getBrokerService().getTopics().forEach((name, topicFuture) 
-> {
-                PersistentTopic persistentTopic = (PersistentTopic) 
topicFuture.getNow(null);
-                if (persistentTopic != null) {
+                Optional<Topic> topic = 
BrokerService.extractTopic(topicFuture);
+                if (topic.isPresent() && topic.get() instanceof 
PersistentTopic) {
+                    PersistentTopic persistentTopic = (PersistentTopic) 
topic.get();
                     TopicName topicName = 
TopicName.get(persistentTopic.getName());
                     put(topicName, 
persistentTopic.getManagedLedger().getStats().getPendingBookieOpsStats());
                 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 01e388a..a2ef397 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -32,11 +32,16 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -75,10 +80,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
-import com.google.common.collect.Lists;
-import com.google.common.hash.Hashing;
-
 public class NamespaceServiceTest extends BrokerTestBase {
 
     @BeforeMethod
@@ -260,10 +261,11 @@ public class NamespaceServiceTest extends BrokerTestBase {
         final String topicName = 
"persistent://my-property/use/my-ns/my-topic1";
         
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").subscribe();
 
-        ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics = 
pulsar.getBrokerService().getTopics();
-        Topic spyTopic = spy(topics.get(topicName).get());
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
topics = pulsar.getBrokerService()
+                .getTopics();
+        Topic spyTopic = spy(topics.get(topicName).get().get());
         topics.clear();
-        CompletableFuture<Topic> topicFuture = 
CompletableFuture.completedFuture(spyTopic);
+        CompletableFuture<Optional<Topic>> topicFuture = 
CompletableFuture.completedFuture(Optional.of(spyTopic));
         // add mock topic
         topics.put(topicName, topicFuture);
         doAnswer(new Answer<CompletableFuture<Void>>() {
@@ -300,10 +302,10 @@ public class NamespaceServiceTest extends BrokerTestBase {
         final String topicName = 
"persistent://my-property/use/my-ns/my-topic1";
         Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
                 .subscribe();
-        ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics = 
pulsar.getBrokerService().getTopics();
-        Topic spyTopic = spy(topics.get(topicName).get());
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
topics = pulsar.getBrokerService().getTopics();
+        Topic spyTopic = spy(topics.get(topicName).get().get());
         topics.clear();
-        CompletableFuture<Topic> topicFuture = 
CompletableFuture.completedFuture(spyTopic);
+        CompletableFuture<Optional<Topic>> topicFuture = 
CompletableFuture.completedFuture(Optional.of(spyTopic));
         // add mock topic
         topics.put(topicName, topicFuture);
         // return uncompleted future as close-topic result.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 0918bc7..31773bb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -746,7 +746,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(bundle, 
false);
 
         // try to create topic which should fail as bundle is disable
-        CompletableFuture<Topic> futureResult = 
pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
+        CompletableFuture<Optional<Topic>> futureResult = 
pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
 
         try {
             futureResult.get();

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to