This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 610ba53ea28720b606ea4f096f1f3d138cc2c601
Author: fengyubiao <[email protected]>
AuthorDate: Mon Sep 2 21:33:41 2024 +0800

    [fix][broker] Fix brokers still retry start replication after closed the 
topic (#23237)
    
    (cherry picked from commit aee2ee5070d07c683c54877bc1457a58e273440b)
---
 .../broker/service/persistent/PersistentTopic.java | 25 ++++++++++++
 .../broker/service/OneWayReplicatorTest.java       | 44 ++++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0a2e1e0916a..9e0c6c180aa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1805,6 +1805,28 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return closeFuture;
     }
 
+    private boolean isClosed() {
+        if (closeFutures == null) {
+            return false;
+        }
+        if (closeFutures.transferring != null
+                && closeFutures.transferring.isDone()
+                && !closeFutures.transferring.isCompletedExceptionally()) {
+            return true;
+        }
+        if (closeFutures.notWaitDisconnectClients != null
+                && closeFutures.notWaitDisconnectClients.isDone()
+                && 
!closeFutures.notWaitDisconnectClients.isCompletedExceptionally()) {
+            return true;
+        }
+        if (closeFutures.waitDisconnectClients != null
+                && closeFutures.waitDisconnectClients.isDone()
+                && 
!closeFutures.waitDisconnectClients.isCompletedExceptionally()) {
+            return true;
+        }
+        return false;
+    }
+
     private void disposeTopic(CompletableFuture<?> closeFuture) {
         brokerService.removeTopicFromCache(PersistentTopic.this)
                 .thenRun(() -> {
@@ -1827,6 +1849,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
     @VisibleForTesting
     CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
+        if (isClosed()) {
+            return CompletableFuture.completedFuture(null);
+        }
         CompletableFuture<Void> result = new CompletableFuture<Void>();
         checkReplication().thenAccept(res -> {
             result.complete(null);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 1745d4dc90f..5665433b9c0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -31,6 +31,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.google.common.collect.Sets;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import java.lang.reflect.Field;
@@ -45,6 +46,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1160,4 +1162,46 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         admin1.namespaces().deleteNamespace(ns);
         admin2.namespaces().deleteNamespace(ns);
     }
+
+    /**
+     * This test used to confirm the "start replicator retry task" will be 
skipped after the topic is closed.
+     */
+    @Test
+    public void testCloseTopicAfterStartReplicationFailed() throws Exception {
+        Field fieldTopicNameCache = TopicName.class.getDeclaredField("cache");
+        fieldTopicNameCache.setAccessible(true);
+        ConcurrentHashMap<String, TopicName> topicNameCache =
+                (ConcurrentHashMap<String, TopicName>) 
fieldTopicNameCache.get(null);
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+        // 1.Create topic, does not enable replication now.
+        admin1.topics().createNonPartitionedTopic(topicName);
+        Producer<byte[]> producer1 = 
client1.newProducer().topic(topicName).create();
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+        // We inject an error to make "start replicator" to fail.
+        AsyncLoadingCache<String, Boolean> existsCache =
+                
WhiteboxImpl.getInternalState(pulsar1.getConfigurationMetadataStore(), 
"existsCache");
+        String path = "/admin/partitioned-topics/" + 
TopicName.get(topicName).getPersistenceNamingEncoding();
+        existsCache.put(path, CompletableFuture.completedFuture(true));
+
+        // 2.Enable replication and unload topic after failed to start 
replicator.
+        admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+        Thread.sleep(3000);
+        producer1.close();
+        existsCache.synchronous().invalidate(path);
+        admin1.topics().unload(topicName);
+        // Verify: the "start replicator retry task" will be skipped after the 
topic is closed.
+        // - Retry delay is 
"PersistentTopic.POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS": 60s, so wait for 
70s.
+        // - Since the topic should not be touched anymore, we use "TopicName" 
to confirm whether it be used by
+        //   Replication again.
+        Thread.sleep(10 * 1000);
+        topicNameCache.remove(topicName);
+        Thread.sleep(60 * 1000);
+        assertTrue(!topicNameCache.containsKey(topicName));
+
+        // cleanup.
+        admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+        admin1.topics().delete(topicName, false);
+    }
 }

Reply via email to