This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8a40b30cf47 [fix][broker] Closed topics won't be removed from the
cache (#23884)
8a40b30cf47 is described below
commit 8a40b30cf47a91ec02d931e6371d02409ba5951e
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Feb 6 10:14:28 2025 +0800
[fix][broker] Closed topics won't be removed from the cache (#23884)
---
.../pulsar/broker/service/AbstractTopic.java | 8 ++
.../pulsar/broker/service/BrokerService.java | 51 ++-----
.../buffer/impl/TopicTransactionBuffer.java | 27 +++-
.../impl/TransactionPersistentTopicTest.java | 148 +++++++++++++++++++++
4 files changed, 188 insertions(+), 46 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 69a38bc50de..9a115e6d1ca 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -48,6 +48,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.ToLongFunction;
import javax.annotation.Nonnull;
import lombok.Getter;
+import lombok.Setter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
@@ -96,6 +97,13 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
protected final String topic;
+ // Reference to the CompletableFuture returned when creating this topic in
BrokerService.
+ // Used to safely remove the topic from BrokerService's cache by ensuring
we remove the exact
+ // topic instance that was created.
+ @Getter
+ @Setter
+ protected volatile CompletableFuture<Optional<Topic>> createFuture;
+
// Producers currently connected to this topic
protected final ConcurrentHashMap<String, Producer> producers;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 79e6fb2b02e..ddd436b0854 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1326,6 +1326,7 @@ public class BrokerService implements Closeable {
NonPersistentTopic nonPersistentTopic;
try {
nonPersistentTopic = newTopic(topic, null, this,
NonPersistentTopic.class);
+ nonPersistentTopic.setCreateFuture(topicFuture);
} catch (Throwable e) {
log.warn("Failed to create topic {}", topic, e);
topicFuture.completeExceptionally(e);
@@ -1800,6 +1801,7 @@ public class BrokerService implements Closeable {
PersistentTopic persistentTopic =
isSystemTopic(topic)
? new SystemTopic(topic, ledger,
BrokerService.this)
: newTopic(topic, ledger,
BrokerService.this, PersistentTopic.class);
+ persistentTopic.setCreateFuture(topicFuture);
persistentTopic
.initialize()
.thenCompose(__ ->
persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
@@ -2409,47 +2411,18 @@ public class BrokerService implements Closeable {
return authorizationService;
}
- public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
- Optional<CompletableFuture<Optional<Topic>>> createTopicFuture =
findTopicFutureInCache(topic);
- if (createTopicFuture.isEmpty()){
- return CompletableFuture.completedFuture(null);
- }
- return removeTopicFutureFromCache(topic.getName(),
createTopicFuture.get());
- }
-
- private Optional<CompletableFuture<Optional<Topic>>>
findTopicFutureInCache(Topic topic){
- if (topic == null){
- return Optional.empty();
- }
- final CompletableFuture<Optional<Topic>> createTopicFuture =
topics.get(topic.getName());
- // If not exists in cache, do nothing.
- if (createTopicFuture == null){
- return Optional.empty();
- }
- // If the future in cache is not yet complete, the topic instance in
the cache is not the same with the topic.
- if (!createTopicFuture.isDone()){
- return Optional.empty();
- }
- // If the future in cache has exception complete,
- // the topic instance in the cache is not the same with the topic.
- if (createTopicFuture.isCompletedExceptionally()){
- return Optional.empty();
- }
- Optional<Topic> optionalTopic = createTopicFuture.join();
- Topic topicInCache = optionalTopic.orElse(null);
- if (topicInCache == null || topicInCache != topic){
- return Optional.empty();
- } else {
- return Optional.of(createTopicFuture);
- }
- }
-
- private CompletableFuture<Void> removeTopicFutureFromCache(String topic,
-
CompletableFuture<Optional<Topic>> createTopicFuture) {
- TopicName topicName = TopicName.get(topic);
+ /**
+ * Removes the topic from the cache only if the topicName and associated
createFuture match exactly.
+ * The TopicEvent.UNLOAD event will be triggered before and after removal.
+ *
+ * @param topic The topic to be removed.
+ * @return A CompletableFuture that completes when the operation is done.
+ */
+ public CompletableFuture<Void> removeTopicFromCache(AbstractTopic topic) {
+ TopicName topicName = TopicName.get(topic.getName());
return pulsar.getNamespaceService().getBundleAsync(topicName)
.thenAccept(namespaceBundle -> {
- removeTopicFromCache(topic, namespaceBundle,
createTopicFuture);
+ removeTopicFromCache(topic.getName(), namespaceBundle,
topic.getCreateFuture());
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 41977e6b61d..c43f0ed7fb9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -109,7 +109,25 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private final AbortedTxnProcessor.SnapshotType snapshotType;
private final MaxReadPositionCallBack maxReadPositionCallBack;
+ private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic
topic) {
+ return
topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()
+ ? new SnapshotSegmentAbortedTxnProcessorImpl(topic)
+ : new SingleSnapshotAbortedTxnProcessorImpl(topic);
+ }
+
+ private static AbortedTxnProcessor.SnapshotType
determineSnapshotType(PersistentTopic topic) {
+ return
topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()
+ ? AbortedTxnProcessor.SnapshotType.Segment
+ : AbortedTxnProcessor.SnapshotType.Single;
+ }
+
public TopicTransactionBuffer(PersistentTopic topic) {
+ this(topic, createSnapshotProcessor(topic),
determineSnapshotType(topic));
+ }
+
+ @VisibleForTesting
+ TopicTransactionBuffer(PersistentTopic topic, AbortedTxnProcessor
snapshotAbortedTxnProcessor,
+ AbortedTxnProcessor.SnapshotType snapshotType) {
super(State.None);
this.topic = topic;
this.timer =
topic.getBrokerService().getPulsar().getTransactionTimer();
@@ -118,13 +136,8 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
.getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
this.maxReadPosition =
topic.getManagedLedger().getLastConfirmedEntry();
- if
(topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled())
{
- snapshotAbortedTxnProcessor = new
SnapshotSegmentAbortedTxnProcessorImpl(topic);
- snapshotType = AbortedTxnProcessor.SnapshotType.Segment;
- } else {
- snapshotAbortedTxnProcessor = new
SingleSnapshotAbortedTxnProcessorImpl(topic);
- snapshotType = AbortedTxnProcessor.SnapshotType.Single;
- }
+ this.snapshotAbortedTxnProcessor = snapshotAbortedTxnProcessor;
+ this.snapshotType = snapshotType;
this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
this.recover();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java
new file mode 100644
index 00000000000..508423adce4
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.TopicFactory;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker")
+public class TransactionPersistentTopicTest extends ProducerConsumerBase {
+
+ private static CountDownLatch topicInitSuccessSignal = new
CountDownLatch(1);
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ // Intercept when the `topicFuture` is about to complete and wait
until the topic close operation finishes.
+ conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
+ conf.setTransactionCoordinatorEnabled(true);
+ conf.setBrokerDeduplicationEnabled(false);
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testNoOrphanClosedTopicIfTxnInternalFailed() {
+ String tpName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp2");
+
+ BrokerService brokerService = pulsar.getBrokerService();
+
+ // 1. Mock close topic when create transactionBuffer
+ TransactionBufferProvider mockTransactionBufferProvider = originTopic
-> {
+ AbortedTxnProcessor abortedTxnProcessor =
mock(AbortedTxnProcessor.class);
+ doAnswer(invocation -> {
+ topicInitSuccessSignal.await();
+ return CompletableFuture.failedFuture(new
RuntimeException("Mock recovery failed"));
+ }).when(abortedTxnProcessor).recoverFromSnapshot();
+
when(abortedTxnProcessor.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+ return new TopicTransactionBuffer(
+ (PersistentTopic) originTopic, abortedTxnProcessor,
AbortedTxnProcessor.SnapshotType.Single);
+ };
+ TransactionBufferProvider originalTransactionBufferProvider =
pulsar.getTransactionBufferProvider();
+ pulsar.setTransactionBufferProvider(mockTransactionBufferProvider);
+
+ // 2. Trigger create topic and assert topic load success.
+ CompletableFuture<Optional<Topic>> firstLoad =
brokerService.getTopic(tpName, true);
+ Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS)
+ .pollInterval(200, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ assertTrue(firstLoad.isDone());
+ assertFalse(firstLoad.isCompletedExceptionally());
+ });
+
+ // 3. Assert topic removed from cache
+ Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ assertFalse(brokerService.getTopics().containsKey(tpName));
+ });
+
+ // 4. Set txn provider to back
+ pulsar.setTransactionBufferProvider(originalTransactionBufferProvider);
+ }
+
+ public static class MyTopicFactory implements TopicFactory {
+ @Override
+ public <T extends Topic> T create(String topic, ManagedLedger ledger,
BrokerService brokerService,
+ Class<T> topicClazz) {
+ try {
+ if (topicClazz == NonPersistentTopic.class) {
+ return (T) new NonPersistentTopic(topic, brokerService);
+ } else {
+ return (T) new MyPersistentTopic(topic, ledger,
brokerService);
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // No-op
+ }
+ }
+
+ public static class MyPersistentTopic extends PersistentTopic {
+
+ public MyPersistentTopic(String topic, ManagedLedger ledger,
BrokerService brokerService) {
+ super(topic, ledger, brokerService);
+ }
+
+ @SneakyThrows
+ @Override
+ public CompletableFuture<Void> checkDeduplicationStatus() {
+ topicInitSuccessSignal.countDown();
+ // Sleep 1s pending txn buffer recover failed and close topic
+ Thread.sleep(1000);
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+}