This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eaae9ea Refactored BrokerService to use Optional in topics map (#1652)
eaae9ea is described below
commit eaae9eafd059fba76200593f9a021312c2aa3d44
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 26 13:37:05 2018 -0700
Refactored BrokerService to use Optional in topics map (#1652)
---
.../pulsar/broker/service/BrokerService.java | 175 +++++++++++----------
.../broker/stats/BookieClientStatsGenerator.java | 8 +-
.../broker/namespace/NamespaceServiceTest.java | 22 +--
.../pulsar/broker/service/BrokerServiceTest.java | 2 +-
4 files changed, 108 insertions(+), 99 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f47796d..683bb03 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -137,7 +137,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
private final int port;
private final int tlsPort;
- private final ConcurrentOpenHashMap<String, CompletableFuture<Topic>>
topics;
+ private final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<Topic>>> topics;
private final ConcurrentOpenHashMap<String, PulsarClient>
replicationClients;
@@ -154,7 +154,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
private static final ConcurrentOpenHashMap<String, ConfigField>
dynamicConfigurationMap = prepareDynamicConfigurationMap();
private final ConcurrentOpenHashMap<String, Consumer<?>>
configRegisteredListeners;
- private final ConcurrentLinkedQueue<Pair<String,
CompletableFuture<Topic>>> pendingTopicLoadingQueue;
+ private final ConcurrentLinkedQueue<Pair<String,
CompletableFuture<Optional<Topic>>>> pendingTopicLoadingQueue;
private AuthorizationService authorizationService = null;
private final ScheduledExecutorService statsUpdater;
@@ -422,19 +422,19 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
public CompletableFuture<Optional<Topic>> getTopicIfExists(final String
topic) {
- return getTopic(topic, false /* createIfMissing */ ).thenApply(t ->
Optional.ofNullable(t));
+ return getTopic(topic, false /* createIfMissing */ );
}
public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
- return getTopic(topic, true /* createIfMissing */ );
+ return getTopic(topic, true /* createIfMissing */
).thenApply(Optional::get);
}
- private CompletableFuture<Topic> getTopic(final String topic, boolean
createIfMissing) {
+ private CompletableFuture<Optional<Topic>> getTopic(final String topic,
boolean createIfMissing) {
try {
- CompletableFuture<Topic> topicFuture = topics.get(topic);
+ CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
if (topicFuture != null) {
if (topicFuture.isCompletedExceptionally()
- || (topicFuture.isDone() && topicFuture.getNow(null)
== null)) {
+ || (topicFuture.isDone() &&
!topicFuture.getNow(Optional.empty()).isPresent())) {
// Exceptional topics should be recreated.
topics.remove(topic, topicFuture);
} else {
@@ -461,8 +461,8 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
}
}
- private CompletableFuture<Topic> createNonPersistentTopic(String topic) {
- CompletableFuture<Topic> topicFuture = new CompletableFuture<Topic>();
+ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String
topic) {
+ CompletableFuture<Optional<Topic>> topicFuture = new
CompletableFuture<>();
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
@@ -480,7 +480,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
long topicLoadLatencyMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
- topicFuture.complete(nonPersistentTopic);
+ topicFuture.complete(Optional.of(nonPersistentTopic));
});
replicationFuture.exceptionally((ex) -> {
log.warn("Replication check failed. Removing topic from topics
list {}, {}", topic, ex);
@@ -549,10 +549,10 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
* @return CompletableFuture<Topic>
* @throws RuntimeException
*/
- protected CompletableFuture<Topic> loadOrCreatePersistentTopic(final
String topic, boolean createIfMissing) throws RuntimeException {
+ protected CompletableFuture<Optional<Topic>>
loadOrCreatePersistentTopic(final String topic, boolean createIfMissing) throws
RuntimeException {
checkTopicNsOwnership(topic);
- final CompletableFuture<Topic> topicFuture = new CompletableFuture<>();
+ final CompletableFuture<Optional<Topic>> topicFuture = new
CompletableFuture<>();
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}",
topic);
@@ -572,7 +572,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
return null;
});
} else {
- pendingTopicLoadingQueue.add(new ImmutablePair<String,
CompletableFuture<Topic>>(topic, topicFuture));
+ pendingTopicLoadingQueue.add(new ImmutablePair<String,
CompletableFuture<Optional<Topic>>>(topic, topicFuture));
if (log.isDebugEnabled()) {
log.debug("topic-loading for {} added into pending queue",
topic);
}
@@ -580,7 +580,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
return topicFuture;
}
- private void createPersistentTopic(final String topic, boolean
createIfMissing, CompletableFuture<Topic> topicFuture) {
+ private void createPersistentTopic(final String topic, boolean
createIfMissing, CompletableFuture<Optional<Topic>> topicFuture) {
final long topicCreateTimeMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
TopicName topicName = TopicName.get(topic);
@@ -615,7 +615,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
addTopicToStatsMaps(topicName,
persistentTopic);
- topicFuture.complete(persistentTopic);
+
topicFuture.complete(Optional.of(persistentTopic));
}).exceptionally((ex) -> {
log.warn(
"Replication or dedup check
failed. Removing topic from topics list {}, {}",
@@ -638,7 +638,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
if (!createIfMissing && exception instanceof
ManagedLedgerNotFoundException) {
// We were just trying to load a topic and the
topic doesn't exist
- topicFuture.complete(null);
+ topicFuture.complete(Optional.empty());
} else {
log.warn("Failed to create topic {}", topic,
exception);
pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
@@ -782,9 +782,9 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
* This method will not make the broker attempt to load the topic if it's
not already.
*/
public Optional<Topic> getTopicReference(String topic) {
- CompletableFuture<Topic> future = topics.get(topic);
+ CompletableFuture<Optional<Topic>> future = topics.get(topic);
if (future != null && future.isDone() &&
!future.isCompletedExceptionally()) {
- return Optional.ofNullable(future.join());
+ return future.join();
} else {
return Optional.empty();
}
@@ -814,27 +814,27 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
public void checkGC(int gcIntervalInSeconds) {
topics.forEach((n, t) -> {
- Topic topic = t.isCompletedExceptionally() ? null : t.getNow(null);
- if (topic != null) {
- topic.checkGC(gcIntervalInSeconds);
+ Optional<Topic> topic = extractTopic(t);
+ if (topic.isPresent()) {
+ topic.get().checkGC(gcIntervalInSeconds);
}
});
}
public void checkMessageExpiry() {
topics.forEach((n, t) -> {
- Topic topic = t.getNow(null);
- if (topic != null) {
- topic.checkMessageExpiry();
+ Optional<Topic> topic = extractTopic(t);
+ if (topic.isPresent()) {
+ topic.get().checkMessageExpiry();
}
});
}
public void checkMessageDeduplicationInfo() {
topics.forEach((n, t) -> {
- Topic topic = t.getNow(null);
- if (topic != null) {
- topic.checkMessageDeduplicationInfo();
+ Optional<Topic> topic = extractTopic(t);
+ if (topic.isPresent()) {
+ topic.get().checkMessageDeduplicationInfo();
}
});
}
@@ -868,8 +868,9 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
public void monitorBacklogQuota() {
topics.forEach((n, t) -> {
try {
- if (t.getNow(null) != null && t.getNow(null) instanceof
PersistentTopic) {
- PersistentTopic topic = (PersistentTopic) t.getNow(null);
+ Optional<Topic> optionalTopic = extractTopic(t);
+ if (optionalTopic.isPresent() && optionalTopic.get()
instanceof PersistentTopic) {
+ PersistentTopic topic = (PersistentTopic)
optionalTopic.get();
if (isBacklogExceeded(topic)) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic);
} else if (topic == null) {
@@ -921,7 +922,8 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
if (serviceUnit.includes(topicName)) {
// Topic needs to be unloaded
log.info("[{}] Unloading topic", topicName);
- closeFutures.add(topicFuture.thenCompose(Topic::close));
+ closeFutures.add(topicFuture
+ .thenCompose(t -> t.isPresent() ? t.get().close() :
CompletableFuture.completedFuture(null)));
}
});
CompletableFuture<Void> aggregator =
FutureUtil.waitForAll(closeFutures);
@@ -978,7 +980,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
return this.numberOfNamespaceBundles;
}
- public ConcurrentOpenHashMap<String, CompletableFuture<Topic>> getTopics()
{
+ public ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
getTopics() {
return topics;
}
@@ -996,7 +998,8 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
if (log.isDebugEnabled()) {
log.debug("Notifying topic that policies have changed:
{}", name);
}
- topic.onPoliciesUpdate(data);
+
+ topic.ifPresent(t -> t.onPoliciesUpdate(data));
});
}
});
@@ -1033,9 +1036,9 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
public Map<String, PersistentTopicStats> getTopicStats() {
HashMap<String, PersistentTopicStats> stats = new HashMap<>();
topics.forEach((name, topicFuture) -> {
- Topic currentTopic = topicFuture.getNow(null);
- if (currentTopic != null) {
- stats.put(name, currentTopic.getStats());
+ Optional<Topic> topic = extractTopic(topicFuture);
+ if (topic.isPresent()) {
+ stats.put(name, topic.get().getStats());
}
});
return stats;
@@ -1128,18 +1131,12 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
this.pulsar().getExecutor().execute(() -> {
// update message-rate for each topic
topics.forEach((name, topicFuture) -> {
- if (topicFuture.isDone()) {
- String topicName = null;
- try {
- if (topicFuture.get() instanceof PersistentTopic) {
- PersistentTopic topic = (PersistentTopic)
topicFuture.get();
- topicName = topicFuture.get().getName();
- // it first checks namespace-policy rate and if
not present then applies broker-config
-
topic.getDispatchRateLimiter().updateDispatchRate();
- }
- } catch (Exception e) {
- log.warn("[{}] failed to update message-dispatch rate
{}", topicName, e.getMessage());
- }
+ Optional<Topic> topic = extractTopic(topicFuture);
+
+ if (topic.isPresent() && topic.get() instanceof
PersistentTopic) {
+ PersistentTopic persistentTopic = (PersistentTopic)
topic.get();
+ // it first checks namespace-policy rate and if not
present then applies broker-config
+
persistentTopic.getDispatchRateLimiter().updateDispatchRate();
}
});
});
@@ -1149,22 +1146,19 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
this.pulsar().getExecutor().submit(() -> {
// update message-rate for each topic subscription
topics.forEach((name, topicFuture) -> {
- if (topicFuture.isDone()) {
- try {
- topicFuture.get().getSubscriptions().forEach((subName,
persistentSubscription) -> {
- if (persistentSubscription
- .getDispatcher() instanceof
PersistentDispatcherMultipleConsumers) {
- ((PersistentDispatcherMultipleConsumers)
persistentSubscription
-
.getDispatcher()).getDispatchRateLimiter().updateDispatchRate();
- } else if (persistentSubscription
+ Optional<Topic> topic = extractTopic(topicFuture);
+
+ if (topic.isPresent()) {
+ topic.get().getSubscriptions().forEach((subName,
persistentSubscription) -> {
+ if (persistentSubscription.getDispatcher() instanceof
PersistentDispatcherMultipleConsumers) {
+ ((PersistentDispatcherMultipleConsumers)
persistentSubscription.getDispatcher())
+
.getDispatchRateLimiter().updateDispatchRate();
+ } else if (persistentSubscription
.getDispatcher() instanceof
PersistentDispatcherSingleActiveConsumer) {
- ((PersistentDispatcherSingleActiveConsumer)
persistentSubscription
-
.getDispatcher()).getDispatchRateLimiter().updateDispatchRate();
- }
- });
- } catch (Exception e) {
- log.warn("Failed to get topic from future while update
subscription dispatch rate ", e);
- }
+ ((PersistentDispatcherSingleActiveConsumer)
persistentSubscription.getDispatcher())
+
.getDispatchRateLimiter().updateDispatchRate();
+ }
+ });
}
});
});
@@ -1177,11 +1171,13 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
if (topicFuture.isDone()) {
String topicName = null;
try {
- if (topicFuture.getNow(null) instanceof
PersistentTopic) {
- PersistentTopic topic = (PersistentTopic)
topicFuture.get();
- topicName = topicFuture.get().getName();
+ Optional<Topic> topic = extractTopic(topicFuture);
+
+ if (topic.isPresent() && topic.get() instanceof
PersistentTopic) {
+ PersistentTopic persistentTopic =
(PersistentTopic) topic.get();
+ topicName = persistentTopic.getName();
// update skipNonRecoverableLedger configuration
-
topic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
+
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
pulsar.getConfiguration().isAutoSkipNonRecoverableData());
}
} catch (Exception e) {
@@ -1340,7 +1336,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
* permit if it was successful to acquire it.
*/
private void createPendingLoadTopic() {
- Pair<String, CompletableFuture<Topic>> pendingTopic =
pendingTopicLoadingQueue.poll();
+ Pair<String, CompletableFuture<Optional<Topic>>> pendingTopic =
pendingTopicLoadingQueue.poll();
if (pendingTopic == null) {
return;
}
@@ -1348,7 +1344,7 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
final String topic = pendingTopic.getLeft();
try {
checkTopicNsOwnership(topic);
- CompletableFuture<Topic> pendingFuture = pendingTopic.getRight();
+ CompletableFuture<Optional<Topic>> pendingFuture =
pendingTopic.getRight();
final Semaphore topicLoadSemaphore =
topicLoadRequestSemaphore.get();
final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
createPersistentTopic(topic, true, pendingFuture);
@@ -1442,25 +1438,21 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
lock.readLock().lock();
try {
topics.forEach((name, topicFuture) -> {
- if (topicFuture.isDone()) {
- try {
- topicFuture.get().getSubscriptions().forEach((subName,
persistentSubscription) -> {
- if (persistentSubscription
- .getDispatcher() instanceof
PersistentDispatcherMultipleConsumers) {
- PersistentDispatcherMultipleConsumers
dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription
- .getDispatcher();
- int dispatcherUnAckMsgs =
dispatcher.getTotalUnackedMessages();
- if (dispatcherUnAckMsgs >
maxUnackedMsgsPerDispatcher) {
- log.info("[{}] Blocking dispatcher due to
reached max broker limit {}",
- dispatcher.getName(),
dispatcher.getTotalUnackedMessages());
- dispatcher.blockDispatcherOnUnackedMsgs();
- blockedDispatchers.add(dispatcher);
- }
+ Optional<Topic> topic = extractTopic(topicFuture);
+ if (topic.isPresent()) {
+ topic.get().getSubscriptions().forEach((subName,
persistentSubscription) -> {
+ if (persistentSubscription.getDispatcher() instanceof
PersistentDispatcherMultipleConsumers) {
+ PersistentDispatcherMultipleConsumers dispatcher =
(PersistentDispatcherMultipleConsumers) persistentSubscription
+ .getDispatcher();
+ int dispatcherUnAckMsgs =
dispatcher.getTotalUnackedMessages();
+ if (dispatcherUnAckMsgs >
maxUnackedMsgsPerDispatcher) {
+ log.info("[{}] Blocking dispatcher due to
reached max broker limit {}",
+ dispatcher.getName(),
dispatcher.getTotalUnackedMessages());
+ dispatcher.blockDispatcherOnUnackedMsgs();
+ blockedDispatchers.add(dispatcher);
}
- });
- } catch (Exception e) {
- log.warn("Failed to get topic from future ", e);
- }
+ }
+ });
}
});
} finally {
@@ -1496,4 +1488,15 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
this.field = field;
}
}
+
+ /**
+ * Safely extract optional topic instance from a future, in a way to avoid
unchecked exceptions and race conditions.
+ */
+ public static Optional<Topic>
extractTopic(CompletableFuture<Optional<Topic>> topicFuture) {
+ if (topicFuture.isDone() && !topicFuture.isCompletedExceptionally()) {
+ return topicFuture.join();
+ } else {
+ return Optional.empty();
+ }
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
index c2ffc53..914a25c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
@@ -19,9 +19,12 @@
package org.apache.pulsar.broker.stats;
import java.util.Map;
+import java.util.Optional;
import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.TopicName;
@@ -47,8 +50,9 @@ public class BookieClientStatsGenerator {
private Map<String, Map<String, PendingBookieOpsStats>> generate() throws
Exception {
if (pulsar.getBrokerService() != null &&
pulsar.getBrokerService().getTopics() != null) {
pulsar.getBrokerService().getTopics().forEach((name, topicFuture)
-> {
- PersistentTopic persistentTopic = (PersistentTopic)
topicFuture.getNow(null);
- if (persistentTopic != null) {
+ Optional<Topic> topic =
BrokerService.extractTopic(topicFuture);
+ if (topic.isPresent() && topic.get() instanceof
PersistentTopic) {
+ PersistentTopic persistentTopic = (PersistentTopic)
topic.get();
TopicName topicName =
TopicName.get(persistentTopic.getName());
put(topicName,
persistentTopic.getManagedLedger().getStats().getPendingBookieOpsStats());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 01e388a..a2ef397 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -32,11 +32,16 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -75,10 +80,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
-import com.google.common.collect.Lists;
-import com.google.common.hash.Hashing;
-
public class NamespaceServiceTest extends BrokerTestBase {
@BeforeMethod
@@ -260,10 +261,11 @@ public class NamespaceServiceTest extends BrokerTestBase {
final String topicName =
"persistent://my-property/use/my-ns/my-topic1";
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").subscribe();
- ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics =
pulsar.getBrokerService().getTopics();
- Topic spyTopic = spy(topics.get(topicName).get());
+ ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
topics = pulsar.getBrokerService()
+ .getTopics();
+ Topic spyTopic = spy(topics.get(topicName).get().get());
topics.clear();
- CompletableFuture<Topic> topicFuture =
CompletableFuture.completedFuture(spyTopic);
+ CompletableFuture<Optional<Topic>> topicFuture =
CompletableFuture.completedFuture(Optional.of(spyTopic));
// add mock topic
topics.put(topicName, topicFuture);
doAnswer(new Answer<CompletableFuture<Void>>() {
@@ -300,10 +302,10 @@ public class NamespaceServiceTest extends BrokerTestBase {
final String topicName =
"persistent://my-property/use/my-ns/my-topic1";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscribe();
- ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics =
pulsar.getBrokerService().getTopics();
- Topic spyTopic = spy(topics.get(topicName).get());
+ ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
topics = pulsar.getBrokerService().getTopics();
+ Topic spyTopic = spy(topics.get(topicName).get().get());
topics.clear();
- CompletableFuture<Topic> topicFuture =
CompletableFuture.completedFuture(spyTopic);
+ CompletableFuture<Optional<Topic>> topicFuture =
CompletableFuture.completedFuture(Optional.of(spyTopic));
// add mock topic
topics.put(topicName, topicFuture);
// return uncompleted future as close-topic result.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 0918bc7..31773bb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -746,7 +746,7 @@ public class BrokerServiceTest extends BrokerTestBase {
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(bundle,
false);
// try to create topic which should fail as bundle is disable
- CompletableFuture<Topic> futureResult =
pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
+ CompletableFuture<Optional<Topic>> futureResult =
pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
try {
futureResult.get();
--
To stop receiving notification emails like this one, please contact
[email protected].