This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 7c5f9b0727d [fix][broker] Restore solution for certain topic unloading race conditions (#20527)
7c5f9b0727d is described below
commit 7c5f9b0727d5f08e6e9160e9482db0d1d616fed9
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jun 7 16:39:45 2023 +0300
[fix][broker] Restore solution for certain topic unloading race conditions (#20527)
(cherry picked from commit 03f916702ec2a887833543fdea5a2d5305a87302)
---
.../src/main/java/org/apache/pulsar/broker/service/BrokerService.java | 4 ----
.../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 2 +-
.../java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java | 4 ++--
3 files changed, 3 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index efe7235802f..263ee1fde22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1909,10 +1909,6 @@ public class BrokerService implements Closeable {
return authorizationService;
}
- public CompletableFuture<Void> removeTopicFromCache(String topicName) {
- return removeTopicFutureFromCache(topicName, null);
- }
-
public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
Optional<CompletableFuture<Optional<Topic>>> createTopicFuture =
findTopicFutureInCache(topic);
if (!createTopicFuture.isPresent()){
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c502e8c1073..b807f218db7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1326,7 +1326,7 @@ public class PersistentTopic extends AbstractTopic
}
private void disposeTopic(CompletableFuture<?> closeFuture) {
- brokerService.removeTopicFromCache(topic)
+ brokerService.removeTopicFromCache(PersistentTopic.this)
.thenRun(() -> {
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index aa63b224a9d..b3530dbc444 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -119,7 +119,7 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
// (3) remove topic and managed-ledger from broker which means topic is not closed gracefully
consumer.close();
producer.close();
- pulsar.getBrokerService().removeTopicFromCache(topic1);
+ pulsar.getBrokerService().removeTopicFromCache(topic);
ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl)
pulsar.getManagedLedgerFactory();
Field field =
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
@@ -242,7 +242,7 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
// clean managed-ledger and recreate topic to clean any data from the cache
producer.close();
- pulsar.getBrokerService().removeTopicFromCache(topic1);
+ pulsar.getBrokerService().removeTopicFromCache(topic);
ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl)
pulsar.getManagedLedgerFactory();
Field field =
ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);