This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new be9cbc3b286 [fix][broker]Consumer can't consume messages because there
has two sames topics in one broker (#17526)
be9cbc3b286 is described below
commit be9cbc3b28665615fb913d02c67cf9e4d232afa0
Author: fengyubiao <[email protected]>
AuthorDate: Wed Sep 21 21:28:38 2022 +0800
[fix][broker]Consumer can't consume messages because there has two sames
topics in one broker (#17526)
---
.../pulsar/broker/service/BrokerService.java | 56 ++++++++++++++++++++--
.../service/nonpersistent/NonPersistentTopic.java | 4 +-
.../broker/service/persistent/PersistentTopic.java | 6 +--
3 files changed, 56 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 48f3713f5cb..b704044e139 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1945,7 +1945,7 @@ public class BrokerService implements Closeable {
TopicName topicName = TopicName.get(topic);
if (serviceUnit.includes(topicName) &&
getTopicReference(topic).isPresent()) {
log.info("[{}][{}] Clean unloaded topic from cache.",
serviceUnit.toString(), topic);
-
pulsar.getBrokerService().removeTopicFromCache(topicName.toString(),
serviceUnit);
+
pulsar.getBrokerService().removeTopicFromCache(topicName.toString(),
serviceUnit, null);
}
}
}
@@ -1954,15 +1954,56 @@ public class BrokerService implements Closeable {
return authorizationService;
}
- public CompletableFuture<Void> removeTopicFromCache(String topic) {
+ public CompletableFuture<Void> removeTopicFromCache(String topicName) {
+ return removeTopicFutureFromCache(topicName, null);
+ }
+
+ public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
+ Optional<CompletableFuture<Optional<Topic>>> createTopicFuture =
findTopicFutureInCache(topic);
+ if (createTopicFuture.isEmpty()){
+ return CompletableFuture.completedFuture(null);
+ }
+ return removeTopicFutureFromCache(topic.getName(),
createTopicFuture.get());
+ }
+
+ private Optional<CompletableFuture<Optional<Topic>>>
findTopicFutureInCache(Topic topic){
+ if (topic == null){
+ return Optional.empty();
+ }
+ final CompletableFuture<Optional<Topic>> createTopicFuture =
topics.get(topic.getName());
+ // If not exists in cache, do nothing.
+ if (createTopicFuture == null){
+ return Optional.empty();
+ }
+ // If the future in cache is not yet complete, the topic instance in
the cache is not the same with the topic.
+ if (!createTopicFuture.isDone()){
+ return Optional.empty();
+ }
+ // If the future in cache has exception complete,
+ // the topic instance in the cache is not the same with the topic.
+ if (createTopicFuture.isCompletedExceptionally()){
+ return Optional.empty();
+ }
+ Optional<Topic> optionalTopic = createTopicFuture.join();
+ Topic topicInCache = optionalTopic.orElse(null);
+ if (topicInCache == null || topicInCache != topic){
+ return Optional.empty();
+ } else {
+ return Optional.of(createTopicFuture);
+ }
+ }
+
+ private CompletableFuture<Void> removeTopicFutureFromCache(String topic,
+
CompletableFuture<Optional<Topic>> createTopicFuture) {
TopicName topicName = TopicName.get(topic);
return pulsar.getNamespaceService().getBundleAsync(topicName)
.thenAccept(namespaceBundle -> {
- removeTopicFromCache(topic, namespaceBundle);
+ removeTopicFromCache(topic, namespaceBundle,
createTopicFuture);
});
}
- public void removeTopicFromCache(String topic, NamespaceBundle
namespaceBundle) {
+ private void removeTopicFromCache(String topic, NamespaceBundle
namespaceBundle,
+ CompletableFuture<Optional<Topic>>
createTopicFuture) {
String bundleName = namespaceBundle.toString();
String namespaceName =
TopicName.get(topic).getNamespaceObject().toString();
@@ -1989,7 +2030,12 @@ public class BrokerService implements Closeable {
}
}
}
- topics.remove(topic);
+
+ if (createTopicFuture == null) {
+ topics.remove(topic);
+ } else {
+ topics.remove(topic, createTopicFuture);
+ }
Compactor compactor = pulsar.getNullableCompactor();
if (compactor != null) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index db77d12e713..b178e99b5c8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -439,7 +439,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
// topic GC iterates over topics map and removing
from the map with the same thread creates
// deadlock. so, execute it in different thread
brokerService.executor().execute(() -> {
- brokerService.removeTopicFromCache(topic);
+
brokerService.removeTopicFromCache(NonPersistentTopic.this);
unregisterTopicPolicyListener();
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
@@ -506,7 +506,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
// unload topic iterates over topics map and removing from the map
with the same thread creates deadlock.
// so, execute it in different thread
brokerService.executor().execute(() -> {
- brokerService.removeTopicFromCache(topic);
+ brokerService.removeTopicFromCache(NonPersistentTopic.this);
unregisterTopicPolicyListener();
closeFuture.complete(null);
});
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 60b08413235..5114ddcee54 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1190,7 +1190,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
ledger.asyncDelete(new
AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void
deleteLedgerComplete(Object ctx) {
-
brokerService.removeTopicFromCache(topic);
+
brokerService.removeTopicFromCache(PersistentTopic.this);
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
@@ -1290,7 +1290,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public void closeComplete(Object ctx) {
// Everything is now closed, remove the topic from map
- brokerService.removeTopicFromCache(topic)
+ brokerService.removeTopicFromCache(PersistentTopic.this)
.thenRun(() -> {
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
@@ -1312,7 +1312,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@Override
public void closeFailed(ManagedLedgerException exception,
Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding
anyway.", topic, exception);
- brokerService.removeTopicFromCache(topic);
+ brokerService.removeTopicFromCache(PersistentTopic.this);
unregisterTopicPolicyListener();
closeFuture.complete(null);
}