This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 610ba53ea28720b606ea4f096f1f3d138cc2c601 Author: fengyubiao <[email protected]> AuthorDate: Mon Sep 2 21:33:41 2024 +0800 [fix][broker] Fix brokers still retry start replication after closed the topic (#23237) (cherry picked from commit aee2ee5070d07c683c54877bc1457a58e273440b) --- .../broker/service/persistent/PersistentTopic.java | 25 ++++++++++++ .../broker/service/OneWayReplicatorTest.java | 44 ++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0a2e1e0916a..9e0c6c180aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1805,6 +1805,28 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return closeFuture; } + private boolean isClosed() { + if (closeFutures == null) { + return false; + } + if (closeFutures.transferring != null + && closeFutures.transferring.isDone() + && !closeFutures.transferring.isCompletedExceptionally()) { + return true; + } + if (closeFutures.notWaitDisconnectClients != null + && closeFutures.notWaitDisconnectClients.isDone() + && !closeFutures.notWaitDisconnectClients.isCompletedExceptionally()) { + return true; + } + if (closeFutures.waitDisconnectClients != null + && closeFutures.waitDisconnectClients.isDone() + && !closeFutures.waitDisconnectClients.isCompletedExceptionally()) { + return true; + } + return false; + } + private void disposeTopic(CompletableFuture<?> closeFuture) { brokerService.removeTopicFromCache(PersistentTopic.this) .thenRun(() -> { @@ -1827,6 +1849,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @VisibleForTesting CompletableFuture<Void> checkReplicationAndRetryOnFailure() { + if (isClosed()) { + return CompletableFuture.completedFuture(null); + } CompletableFuture<Void> result = new CompletableFuture<Void>(); checkReplication().thenAccept(res -> { result.complete(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 1745d4dc90f..5665433b9c0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -31,6 +31,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.google.common.collect.Sets; import io.netty.util.concurrent.FastThreadLocalThread; import java.lang.reflect.Field; @@ -45,6 +46,7 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -1160,4 +1162,46 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase { admin1.namespaces().deleteNamespace(ns); admin2.namespaces().deleteNamespace(ns); } + + /** + * This test used to confirm the "start replicator retry task" will be skipped after the topic is closed. + */ + @Test + public void testCloseTopicAfterStartReplicationFailed() throws Exception { + Field fieldTopicNameCache = TopicName.class.getDeclaredField("cache"); + fieldTopicNameCache.setAccessible(true); + ConcurrentHashMap<String, TopicName> topicNameCache = + (ConcurrentHashMap<String, TopicName>) fieldTopicNameCache.get(null); + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); + // 1.Create topic, does not enable replication now. + admin1.topics().createNonPartitionedTopic(topicName); + Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create(); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + + // We inject an error to make "start replicator" to fail. + AsyncLoadingCache<String, Boolean> existsCache = + WhiteboxImpl.getInternalState(pulsar1.getConfigurationMetadataStore(), "existsCache"); + String path = "/admin/partitioned-topics/" + TopicName.get(topicName).getPersistenceNamingEncoding(); + existsCache.put(path, CompletableFuture.completedFuture(true)); + + // 2.Enable replication and unload topic after failed to start replicator. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); + Thread.sleep(3000); + producer1.close(); + existsCache.synchronous().invalidate(path); + admin1.topics().unload(topicName); + // Verify: the "start replicator retry task" will be skipped after the topic is closed. + // - Retry delay is "PersistentTopic.POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS": 60s, so wait for 70s. + // - Since the topic should not be touched anymore, we use "TopicName" to confirm whether it be used by + // Replication again. + Thread.sleep(10 * 1000); + topicNameCache.remove(topicName); + Thread.sleep(60 * 1000); + assertTrue(!topicNameCache.containsKey(topicName)); + + // cleanup. + admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); + admin1.topics().delete(topicName, false); + } }
