This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e904735203f78e003ac8aeb30edb68ed279bd62d
Author: feynmanlin <[email protected]>
AuthorDate: Fri Nov 5 20:43:43 2021 +0800

    Support retry when creating reader of Topic Policies (#12622)
    
    (cherry picked from commit 2ecafbfa4ca540230accb6f4f18f774fa4757adf)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 23 +++++++++++++--
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 34 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index f5dc4b7..70556826 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -35,6 +36,8 @@ import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.util.RetryUtil;
 import org.apache.pulsar.common.events.ActionType;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.events.PulsarEvent;
@@ -194,12 +197,10 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
                 ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
                 result.complete(null);
             } else {
-                SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
-                        .createTopicPoliciesSystemTopicClient(namespace);
                 ownedBundlesCountPerNamespace.putIfAbsent(namespace, new 
AtomicInteger(1));
                 policyCacheInitMap.put(namespace, false);
                 CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
-                        systemTopicClient.newReaderAsync();
+                        creatSystemTopicClientWithRetry(namespace);
                 readerCaches.put(namespace, readerCompletableFuture);
                 readerCompletableFuture.whenComplete((reader, ex) -> {
                     if (ex != null) {
@@ -215,6 +216,22 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         return result;
     }
 
+    protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
creatSystemTopicClientWithRetry(
+            NamespaceName namespace) {
+        SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
+                .createTopicPoliciesSystemTopicClient(namespace);
+        CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new 
CompletableFuture<>();
+        Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, 
TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
+        RetryUtil.retryAsynchronously(() -> {
+            try {
+                return systemTopicClient.newReader();
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        }, backoff, pulsarService.getExecutor(), result);
+        return result;
+    }
+
     @Override
     public CompletableFuture<Void> 
removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
         NamespaceName namespace = namespaceBundle.getNamespaceObject();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 61365fe..49d8699 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -18,11 +18,18 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNotNull;
 import static org.testng.AssertJUnit.assertNull;
 import static org.testng.AssertJUnit.assertTrue;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -31,9 +38,15 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
+import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.BackoffBuilder;
+import org.apache.pulsar.client.impl.ReaderImpl;
+import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -245,4 +258,25 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         long cost = System.currentTimeMillis() - start;
         assertTrue("actual:" + cost, cost >= 5000 - 1000);
     }
+
+    @Test
+    public void testCreatSystemTopicClientWithRetry() throws Exception {
+        SystemTopicBasedTopicPoliciesService service =
+                spy((SystemTopicBasedTopicPoliciesService) 
pulsar.getTopicPoliciesService());
+        Field field = SystemTopicBasedTopicPoliciesService.class
+                .getDeclaredField("namespaceEventsSystemTopicFactory");
+        field.setAccessible(true);
+        NamespaceEventsSystemTopicFactory factory = 
spy((NamespaceEventsSystemTopicFactory) field.get(service));
+        SystemTopicClient<PulsarEvent> client = 
mock(TopicPoliciesSystemTopicClient.class);
+        
doReturn(client).when(factory).createTopicPoliciesSystemTopicClient(any());
+        field.set(service, factory);
+
+        SystemTopicClient.Reader<PulsarEvent> reader = 
mock(SystemTopicClient.Reader.class);
+        // Throw an exception first, create successfully after retrying
+        doThrow(new 
PulsarClientException("test")).doReturn(reader).when(client).newReader();
+
+        SystemTopicClient.Reader<PulsarEvent> reader1 = 
service.creatSystemTopicClientWithRetry(null).get();
+
+        assertEquals(reader1, reader);
+    }
 }

Reply via email to