sijie closed pull request #1652: Refactored BrokerService to use Optional in
topics map
URL: https://github.com/apache/incubator-pulsar/pull/1652
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f47796d93..683bb0331 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -137,7 +137,7 @@
private final int port;
private final int tlsPort;
- private final ConcurrentOpenHashMap<String, CompletableFuture<Topic>>
topics;
+ private final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<Topic>>> topics;
private final ConcurrentOpenHashMap<String, PulsarClient>
replicationClients;
@@ -154,7 +154,7 @@
private static final ConcurrentOpenHashMap<String, ConfigField>
dynamicConfigurationMap = prepareDynamicConfigurationMap();
private final ConcurrentOpenHashMap<String, Consumer<?>>
configRegisteredListeners;
- private final ConcurrentLinkedQueue<Pair<String,
CompletableFuture<Topic>>> pendingTopicLoadingQueue;
+ private final ConcurrentLinkedQueue<Pair<String,
CompletableFuture<Optional<Topic>>>> pendingTopicLoadingQueue;
private AuthorizationService authorizationService = null;
private final ScheduledExecutorService statsUpdater;
@@ -422,19 +422,19 @@ public void unloadNamespaceBundlesGracefully() {
}
public CompletableFuture<Optional<Topic>> getTopicIfExists(final String
topic) {
- return getTopic(topic, false /* createIfMissing */ ).thenApply(t ->
Optional.ofNullable(t));
+ return getTopic(topic, false /* createIfMissing */ );
}
public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
- return getTopic(topic, true /* createIfMissing */ );
+ return getTopic(topic, true /* createIfMissing */
).thenApply(Optional::get);
}
- private CompletableFuture<Topic> getTopic(final String topic, boolean
createIfMissing) {
+ private CompletableFuture<Optional<Topic>> getTopic(final String topic,
boolean createIfMissing) {
try {
- CompletableFuture<Topic> topicFuture = topics.get(topic);
+ CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic);
if (topicFuture != null) {
if (topicFuture.isCompletedExceptionally()
- || (topicFuture.isDone() && topicFuture.getNow(null)
== null)) {
+ || (topicFuture.isDone() &&
!topicFuture.getNow(Optional.empty()).isPresent())) {
// Exceptional topics should be recreated.
topics.remove(topic, topicFuture);
} else {
@@ -461,8 +461,8 @@ public void unloadNamespaceBundlesGracefully() {
}
}
- private CompletableFuture<Topic> createNonPersistentTopic(String topic) {
- CompletableFuture<Topic> topicFuture = new CompletableFuture<Topic>();
+ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String
topic) {
+ CompletableFuture<Optional<Topic>> topicFuture = new
CompletableFuture<>();
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
@@ -480,7 +480,7 @@ public void unloadNamespaceBundlesGracefully() {
long topicLoadLatencyMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
- topicFuture.complete(nonPersistentTopic);
+ topicFuture.complete(Optional.of(nonPersistentTopic));
});
replicationFuture.exceptionally((ex) -> {
log.warn("Replication check failed. Removing topic from topics
list {}, {}", topic, ex);
@@ -549,10 +549,10 @@ public PulsarClient getReplicationClient(String cluster) {
* @return CompletableFuture<Topic>
* @throws RuntimeException
*/
- protected CompletableFuture<Topic> loadOrCreatePersistentTopic(final
String topic, boolean createIfMissing) throws RuntimeException {
+ protected CompletableFuture<Optional<Topic>>
loadOrCreatePersistentTopic(final String topic, boolean createIfMissing) throws
RuntimeException {
checkTopicNsOwnership(topic);
- final CompletableFuture<Topic> topicFuture = new CompletableFuture<>();
+ final CompletableFuture<Optional<Topic>> topicFuture = new
CompletableFuture<>();
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}",
topic);
@@ -572,7 +572,7 @@ public PulsarClient getReplicationClient(String cluster) {
return null;
});
} else {
- pendingTopicLoadingQueue.add(new ImmutablePair<String,
CompletableFuture<Topic>>(topic, topicFuture));
+ pendingTopicLoadingQueue.add(new ImmutablePair<String,
CompletableFuture<Optional<Topic>>>(topic, topicFuture));
if (log.isDebugEnabled()) {
log.debug("topic-loading for {} added into pending queue",
topic);
}
@@ -580,7 +580,7 @@ public PulsarClient getReplicationClient(String cluster) {
return topicFuture;
}
- private void createPersistentTopic(final String topic, boolean
createIfMissing, CompletableFuture<Topic> topicFuture) {
+ private void createPersistentTopic(final String topic, boolean
createIfMissing, CompletableFuture<Optional<Topic>> topicFuture) {
final long topicCreateTimeMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
TopicName topicName = TopicName.get(topic);
@@ -615,7 +615,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object
ctx) {
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
addTopicToStatsMaps(topicName,
persistentTopic);
- topicFuture.complete(persistentTopic);
+
topicFuture.complete(Optional.of(persistentTopic));
}).exceptionally((ex) -> {
log.warn(
"Replication or dedup check
failed. Removing topic from topics list {}, {}",
@@ -638,7 +638,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object
ctx) {
public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
if (!createIfMissing && exception instanceof
ManagedLedgerNotFoundException) {
// We were just trying to load a topic and the
topic doesn't exist
- topicFuture.complete(null);
+ topicFuture.complete(Optional.empty());
} else {
log.warn("Failed to create topic {}", topic,
exception);
pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
@@ -782,9 +782,9 @@ public void invalidateOfflineTopicStatCache(TopicName
topicName) {
* This method will not make the broker attempt to load the topic if it's
not already.
*/
public Optional<Topic> getTopicReference(String topic) {
- CompletableFuture<Topic> future = topics.get(topic);
+ CompletableFuture<Optional<Topic>> future = topics.get(topic);
if (future != null && future.isDone() &&
!future.isCompletedExceptionally()) {
- return Optional.ofNullable(future.join());
+ return future.join();
} else {
return Optional.empty();
}
@@ -814,27 +814,27 @@ public Semaphore getLookupRequestSemaphore() {
public void checkGC(int gcIntervalInSeconds) {
topics.forEach((n, t) -> {
- Topic topic = t.isCompletedExceptionally() ? null : t.getNow(null);
- if (topic != null) {
- topic.checkGC(gcIntervalInSeconds);
+ Optional<Topic> topic = extractTopic(t);
+ if (topic.isPresent()) {
+ topic.get().checkGC(gcIntervalInSeconds);
}
});
}
public void checkMessageExpiry() {
topics.forEach((n, t) -> {
- Topic topic = t.getNow(null);
- if (topic != null) {
- topic.checkMessageExpiry();
+ Optional<Topic> topic = extractTopic(t);
+ if (topic.isPresent()) {
+ topic.get().checkMessageExpiry();
}
});
}
public void checkMessageDeduplicationInfo() {
topics.forEach((n, t) -> {
- Topic topic = t.getNow(null);
- if (topic != null) {
- topic.checkMessageDeduplicationInfo();
+ Optional<Topic> topic = extractTopic(t);
+ if (topic.isPresent()) {
+ topic.get().checkMessageDeduplicationInfo();
}
});
}
@@ -868,8 +868,9 @@ public boolean isBacklogExceeded(PersistentTopic topic) {
public void monitorBacklogQuota() {
topics.forEach((n, t) -> {
try {
- if (t.getNow(null) != null && t.getNow(null) instanceof
PersistentTopic) {
- PersistentTopic topic = (PersistentTopic) t.getNow(null);
+ Optional<Topic> optionalTopic = extractTopic(t);
+ if (optionalTopic.isPresent() && optionalTopic.get()
instanceof PersistentTopic) {
+ PersistentTopic topic = (PersistentTopic)
optionalTopic.get();
if (isBacklogExceeded(topic)) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic);
} else if (topic == null) {
@@ -921,7 +922,8 @@ void checkTopicNsOwnership(final String topic) throws
RuntimeException {
if (serviceUnit.includes(topicName)) {
// Topic needs to be unloaded
log.info("[{}] Unloading topic", topicName);
- closeFutures.add(topicFuture.thenCompose(Topic::close));
+ closeFutures.add(topicFuture
+ .thenCompose(t -> t.isPresent() ? t.get().close() :
CompletableFuture.completedFuture(null)));
}
});
CompletableFuture<Void> aggregator =
FutureUtil.waitForAll(closeFutures);
@@ -978,7 +980,7 @@ public int getNumberOfNamespaceBundles() {
return this.numberOfNamespaceBundles;
}
- public ConcurrentOpenHashMap<String, CompletableFuture<Topic>> getTopics()
{
+ public ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
getTopics() {
return topics;
}
@@ -996,7 +998,8 @@ public void onUpdate(String path, Policies data, Stat stat)
{
if (log.isDebugEnabled()) {
log.debug("Notifying topic that policies have changed:
{}", name);
}
- topic.onPoliciesUpdate(data);
+
+ topic.ifPresent(t -> t.onPoliciesUpdate(data));
});
}
});
@@ -1033,9 +1036,9 @@ public String generateUniqueProducerName() {
public Map<String, PersistentTopicStats> getTopicStats() {
HashMap<String, PersistentTopicStats> stats = new HashMap<>();
topics.forEach((name, topicFuture) -> {
- Topic currentTopic = topicFuture.getNow(null);
- if (currentTopic != null) {
- stats.put(name, currentTopic.getStats());
+ Optional<Topic> topic = extractTopic(topicFuture);
+ if (topic.isPresent()) {
+ stats.put(name, topic.get().getStats());
}
});
return stats;
@@ -1128,18 +1131,12 @@ private void updateTopicMessageDispatchRate() {
this.pulsar().getExecutor().execute(() -> {
// update message-rate for each topic
topics.forEach((name, topicFuture) -> {
- if (topicFuture.isDone()) {
- String topicName = null;
- try {
- if (topicFuture.get() instanceof PersistentTopic) {
- PersistentTopic topic = (PersistentTopic)
topicFuture.get();
- topicName = topicFuture.get().getName();
- // it first checks namespace-policy rate and if
not present then applies broker-config
-
topic.getDispatchRateLimiter().updateDispatchRate();
- }
- } catch (Exception e) {
- log.warn("[{}] failed to update message-dispatch rate
{}", topicName, e.getMessage());
- }
+ Optional<Topic> topic = extractTopic(topicFuture);
+
+ if (topic.isPresent() && topic.get() instanceof
PersistentTopic) {
+ PersistentTopic persistentTopic = (PersistentTopic)
topic.get();
+ // it first checks namespace-policy rate and if not
present then applies broker-config
+
persistentTopic.getDispatchRateLimiter().updateDispatchRate();
}
});
});
@@ -1149,22 +1146,19 @@ private void updateSubscriptionMessageDispatchRate() {
this.pulsar().getExecutor().submit(() -> {
// update message-rate for each topic subscription
topics.forEach((name, topicFuture) -> {
- if (topicFuture.isDone()) {
- try {
- topicFuture.get().getSubscriptions().forEach((subName,
persistentSubscription) -> {
- if (persistentSubscription
- .getDispatcher() instanceof
PersistentDispatcherMultipleConsumers) {
- ((PersistentDispatcherMultipleConsumers)
persistentSubscription
-
.getDispatcher()).getDispatchRateLimiter().updateDispatchRate();
- } else if (persistentSubscription
+ Optional<Topic> topic = extractTopic(topicFuture);
+
+ if (topic.isPresent()) {
+ topic.get().getSubscriptions().forEach((subName,
persistentSubscription) -> {
+ if (persistentSubscription.getDispatcher() instanceof
PersistentDispatcherMultipleConsumers) {
+ ((PersistentDispatcherMultipleConsumers)
persistentSubscription.getDispatcher())
+
.getDispatchRateLimiter().updateDispatchRate();
+ } else if (persistentSubscription
.getDispatcher() instanceof
PersistentDispatcherSingleActiveConsumer) {
- ((PersistentDispatcherSingleActiveConsumer)
persistentSubscription
-
.getDispatcher()).getDispatchRateLimiter().updateDispatchRate();
- }
- });
- } catch (Exception e) {
- log.warn("Failed to get topic from future while update
subscription dispatch rate ", e);
- }
+ ((PersistentDispatcherSingleActiveConsumer)
persistentSubscription.getDispatcher())
+
.getDispatchRateLimiter().updateDispatchRate();
+ }
+ });
}
});
});
@@ -1177,11 +1171,13 @@ private void updateManagedLedgerConfig() {
if (topicFuture.isDone()) {
String topicName = null;
try {
- if (topicFuture.getNow(null) instanceof
PersistentTopic) {
- PersistentTopic topic = (PersistentTopic)
topicFuture.get();
- topicName = topicFuture.get().getName();
+ Optional<Topic> topic = extractTopic(topicFuture);
+
+ if (topic.isPresent() && topic.get() instanceof
PersistentTopic) {
+ PersistentTopic persistentTopic =
(PersistentTopic) topic.get();
+ topicName = persistentTopic.getName();
// update skipNonRecoverableLedger configuration
-
topic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
+
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(
pulsar.getConfiguration().isAutoSkipNonRecoverableData());
}
} catch (Exception e) {
@@ -1340,7 +1336,7 @@ public static boolean validateDynamicConfiguration(String
key, String value) {
* permit if it was successful to acquire it.
*/
private void createPendingLoadTopic() {
- Pair<String, CompletableFuture<Topic>> pendingTopic =
pendingTopicLoadingQueue.poll();
+ Pair<String, CompletableFuture<Optional<Topic>>> pendingTopic =
pendingTopicLoadingQueue.poll();
if (pendingTopic == null) {
return;
}
@@ -1348,7 +1344,7 @@ private void createPendingLoadTopic() {
final String topic = pendingTopic.getLeft();
try {
checkTopicNsOwnership(topic);
- CompletableFuture<Topic> pendingFuture = pendingTopic.getRight();
+ CompletableFuture<Optional<Topic>> pendingFuture =
pendingTopic.getRight();
final Semaphore topicLoadSemaphore =
topicLoadRequestSemaphore.get();
final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
createPersistentTopic(topic, true, pendingFuture);
@@ -1442,25 +1438,21 @@ private void blockDispatchersWithLargeUnAckMessages() {
lock.readLock().lock();
try {
topics.forEach((name, topicFuture) -> {
- if (topicFuture.isDone()) {
- try {
- topicFuture.get().getSubscriptions().forEach((subName,
persistentSubscription) -> {
- if (persistentSubscription
- .getDispatcher() instanceof
PersistentDispatcherMultipleConsumers) {
- PersistentDispatcherMultipleConsumers
dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription
- .getDispatcher();
- int dispatcherUnAckMsgs =
dispatcher.getTotalUnackedMessages();
- if (dispatcherUnAckMsgs >
maxUnackedMsgsPerDispatcher) {
- log.info("[{}] Blocking dispatcher due to
reached max broker limit {}",
- dispatcher.getName(),
dispatcher.getTotalUnackedMessages());
- dispatcher.blockDispatcherOnUnackedMsgs();
- blockedDispatchers.add(dispatcher);
- }
+ Optional<Topic> topic = extractTopic(topicFuture);
+ if (topic.isPresent()) {
+ topic.get().getSubscriptions().forEach((subName,
persistentSubscription) -> {
+ if (persistentSubscription.getDispatcher() instanceof
PersistentDispatcherMultipleConsumers) {
+ PersistentDispatcherMultipleConsumers dispatcher =
(PersistentDispatcherMultipleConsumers) persistentSubscription
+ .getDispatcher();
+ int dispatcherUnAckMsgs =
dispatcher.getTotalUnackedMessages();
+ if (dispatcherUnAckMsgs >
maxUnackedMsgsPerDispatcher) {
+ log.info("[{}] Blocking dispatcher due to
reached max broker limit {}",
+ dispatcher.getName(),
dispatcher.getTotalUnackedMessages());
+ dispatcher.blockDispatcherOnUnackedMsgs();
+ blockedDispatchers.add(dispatcher);
}
- });
- } catch (Exception e) {
- log.warn("Failed to get topic from future ", e);
- }
+ }
+ });
}
});
} finally {
@@ -1496,4 +1488,15 @@ public ConfigField(Field field) {
this.field = field;
}
}
+
+ /**
+ * Safely extract optional topic instance from a future, in a way to avoid
unchecked exceptions and race conditions.
+ */
+ public static Optional<Topic>
extractTopic(CompletableFuture<Optional<Topic>> topicFuture) {
+ if (topicFuture.isDone() && !topicFuture.isCompletedExceptionally()) {
+ return topicFuture.join();
+ } else {
+ return Optional.empty();
+ }
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
index c2ffc5341..914a25c7e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java
@@ -19,9 +19,12 @@
package org.apache.pulsar.broker.stats;
import java.util.Map;
+import java.util.Optional;
import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.TopicName;
@@ -47,8 +50,9 @@ public BookieClientStatsGenerator(PulsarService pulsar) {
private Map<String, Map<String, PendingBookieOpsStats>> generate() throws
Exception {
if (pulsar.getBrokerService() != null &&
pulsar.getBrokerService().getTopics() != null) {
pulsar.getBrokerService().getTopics().forEach((name, topicFuture)
-> {
- PersistentTopic persistentTopic = (PersistentTopic)
topicFuture.getNow(null);
- if (persistentTopic != null) {
+ Optional<Topic> topic =
BrokerService.extractTopic(topicFuture);
+ if (topic.isPresent() && topic.get() instanceof
PersistentTopic) {
+ PersistentTopic persistentTopic = (PersistentTopic)
topic.get();
TopicName topicName =
TopicName.get(persistentTopic.getName());
put(topicName,
persistentTopic.getManagedLedger().getStats().getPendingBookieOpsStats());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 01e388a33..a2ef39726 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -32,11 +32,16 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.hash.Hashing;
+
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -75,10 +80,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
-import com.google.common.collect.Lists;
-import com.google.common.hash.Hashing;
-
public class NamespaceServiceTest extends BrokerTestBase {
@BeforeMethod
@@ -260,10 +261,11 @@ public void testUnloadNamespaceBundleFailure() throws
Exception {
final String topicName =
"persistent://my-property/use/my-ns/my-topic1";
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name").subscribe();
- ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics =
pulsar.getBrokerService().getTopics();
- Topic spyTopic = spy(topics.get(topicName).get());
+ ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
topics = pulsar.getBrokerService()
+ .getTopics();
+ Topic spyTopic = spy(topics.get(topicName).get().get());
topics.clear();
- CompletableFuture<Topic> topicFuture =
CompletableFuture.completedFuture(spyTopic);
+ CompletableFuture<Optional<Topic>> topicFuture =
CompletableFuture.completedFuture(Optional.of(spyTopic));
// add mock topic
topics.put(topicName, topicFuture);
doAnswer(new Answer<CompletableFuture<Void>>() {
@@ -300,10 +302,10 @@ public void testUnloadNamespaceBundleWithStuckTopic()
throws Exception {
final String topicName =
"persistent://my-property/use/my-ns/my-topic1";
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscribe();
- ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics =
pulsar.getBrokerService().getTopics();
- Topic spyTopic = spy(topics.get(topicName).get());
+ ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>
topics = pulsar.getBrokerService().getTopics();
+ Topic spyTopic = spy(topics.get(topicName).get().get());
topics.clear();
- CompletableFuture<Topic> topicFuture =
CompletableFuture.completedFuture(spyTopic);
+ CompletableFuture<Optional<Topic>> topicFuture =
CompletableFuture.completedFuture(Optional.of(spyTopic));
// add mock topic
topics.put(topicName, topicFuture);
// return uncompleted future as close-topic result.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 0918bc7c9..31773bbd6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -746,7 +746,7 @@ public void testTopicLoadingOnDisableNamespaceBundle()
throws Exception {
pulsar.getNamespaceService().getOwnershipCache().updateBundleState(bundle,
false);
// try to create topic which should fail as bundle is disable
- CompletableFuture<Topic> futureResult =
pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
+ CompletableFuture<Optional<Topic>> futureResult =
pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true);
try {
futureResult.get();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services