This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aee2ee5070d [fix][broker] Fix brokers still retry start replication
after closed the topic (#23237)
aee2ee5070d is described below
commit aee2ee5070d07c683c54877bc1457a58e273440b
Author: fengyubiao <[email protected]>
AuthorDate: Mon Sep 2 21:33:41 2024 +0800
[fix][broker] Fix brokers still retry start replication after closed the
topic (#23237)
---
.../broker/service/persistent/PersistentTopic.java | 25 ++++++++++++
.../broker/service/OneWayReplicatorTest.java | 44 ++++++++++++++++++++++
2 files changed, 69 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d814e7ce115..b8cde7619af 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1810,6 +1810,28 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return closeFuture;
}
+ private boolean isClosed() { // Reports whether any tracked close future of this topic has completed without an exception.
+ if (closeFutures == null) { // no close futures recorded: report "not closed"
+ return false;
+ }
+ if (closeFutures.transferring != null
+ && closeFutures.transferring.isDone()
+ && !closeFutures.transferring.isCompletedExceptionally()) { // "transferring" future finished normally
+ return true;
+ }
+ if (closeFutures.notWaitDisconnectClients != null
+ && closeFutures.notWaitDisconnectClients.isDone()
+ &&
!closeFutures.notWaitDisconnectClients.isCompletedExceptionally()) { // "notWaitDisconnectClients" future finished normally
+ return true;
+ }
+ if (closeFutures.waitDisconnectClients != null
+ && closeFutures.waitDisconnectClients.isDone()
+ &&
!closeFutures.waitDisconnectClients.isCompletedExceptionally()) { // "waitDisconnectClients" future finished normally
+ return true;
+ }
+ return false; // every future is either absent, still pending, or completed exceptionally
+ }
+
private void disposeTopic(CompletableFuture<?> closeFuture) {
brokerService.removeTopicFromCache(PersistentTopic.this)
.thenRun(() -> {
@@ -1832,6 +1854,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
@VisibleForTesting
CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
+ if (isClosed()) {
+ return CompletableFuture.completedFuture(null);
+ }
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
result.complete(null);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 74604dd990c..440e90da2b6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -31,6 +31,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocalThread;
@@ -46,6 +47,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -1281,4 +1283,46 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
admin1.topics().delete(topicName, false);
admin2.topics().delete(topicName, false);
}
+
+ /**
+ * This test is used to confirm that the "start replicator retry task" will be
skipped after the topic is closed.
+ */
+ @Test
+ public void testCloseTopicAfterStartReplicationFailed() throws Exception {
+ Field fieldTopicNameCache = TopicName.class.getDeclaredField("cache"); // reflective handle on TopicName's static "cache" field (read via get(null) below)
+ fieldTopicNameCache.setAccessible(true);
+ ConcurrentHashMap<String, TopicName> topicNameCache =
+ (ConcurrentHashMap<String, TopicName>)
fieldTopicNameCache.get(null);
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ nonReplicatedNamespace + "/tp_");
+ // 1.Create topic, does not enable replication now.
+ admin1.topics().createNonPartitionedTopic(topicName);
+ Producer<byte[]> producer1 =
client1.newProducer().topic(topicName).create();
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+ // We inject an error to make "start replicator" to fail.
+ AsyncLoadingCache<String, Boolean> existsCache =
+
WhiteboxImpl.getInternalState(pulsar1.getConfigurationMetadataStore(),
"existsCache");
+ String path = "/admin/partitioned-topics/" +
TopicName.get(topicName).getPersistenceNamingEncoding();
+ existsCache.put(path, CompletableFuture.completedFuture(true)); // pre-populate the exists-cache entry for the partitioned-topic metadata path with "true"
+
+ // 2.Enable replication and unload topic after failed to start
replicator.
+ admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1, cluster2));
+ Thread.sleep(3000); // fixed wait; presumably gives the failing "start replicator" attempt time to run before the unload - TODO confirm
+ producer1.close();
+ existsCache.synchronous().invalidate(path); // remove the injected cache entry again
+ admin1.topics().unload(topicName);
+ // Verify: the "start replicator retry task" will be skipped after the
topic is closed.
+ // - Retry delay is
"PersistentTopic.POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS": 60s, so wait for
70s.
+ // - Since the topic should not be touched anymore, we use "TopicName"
to confirm whether it be used by
+ // Replication again.
+ Thread.sleep(10 * 1000); // first 10s of the 70s total wait
+ topicNameCache.remove(topicName); // drop the cached entry; the assertion below checks that it was not re-added
+ Thread.sleep(60 * 1000); // remaining 60s, matching the retry delay quoted above
+ assertTrue(!topicNameCache.containsKey(topicName)); // the topic name must not have reappeared in the cache
+
+ // cleanup.
+ admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1));
+ admin1.topics().delete(topicName, false);
+ }
}