This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e904735203f78e003ac8aeb30edb68ed279bd62d Author: feynmanlin <[email protected]> AuthorDate: Fri Nov 5 20:43:43 2021 +0800 Support retry when creating reader of Topic Policies (#12622) (cherry picked from commit 2ecafbfa4ca540230accb6f4f18f774fa4757adf) --- .../SystemTopicBasedTopicPoliciesService.java | 23 +++++++++++++-- .../SystemTopicBasedTopicPoliciesServiceTest.java | 34 ++++++++++++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index f5dc4b7..70556826 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -35,6 +36,8 @@ import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.client.util.RetryUtil; import org.apache.pulsar.common.events.ActionType; import org.apache.pulsar.common.events.EventType; import org.apache.pulsar.common.events.PulsarEvent; @@ -194,12 +197,10 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic ownedBundlesCountPerNamespace.get(namespace).incrementAndGet(); result.complete(null); } else { - SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory - .createTopicPoliciesSystemTopicClient(namespace); ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); policyCacheInitMap.put(namespace, false); CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = - systemTopicClient.newReaderAsync(); + creatSystemTopicClientWithRetry(namespace); readerCaches.put(namespace, readerCompletableFuture); readerCompletableFuture.whenComplete((reader, ex) -> { if (ex != null) { @@ -215,6 +216,22 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic return result; } + protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> creatSystemTopicClientWithRetry( + NamespaceName namespace) { + SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory + .createTopicPoliciesSystemTopicClient(namespace); + CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>(); + Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS); + RetryUtil.retryAsynchronously(() -> { + try { + return systemTopicClient.newReader(); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + }, backoff, pulsarService.getExecutor(), result); + return result; + } + @Override public CompletableFuture<Void> removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { NamespaceName namespace = namespaceBundle.getNamespaceObject(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 61365fe..49d8699 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -18,11 +18,18 @@ */ package org.apache.pulsar.broker.service; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; import static org.testng.AssertJUnit.assertTrue; import com.google.common.collect.Sets; +import java.lang.reflect.Field; import java.util.List; import java.util.Map; import java.util.UUID; @@ -31,9 +38,15 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; +import org.apache.pulsar.broker.systopic.SystemTopicClient; +import org.apache.pulsar.broker.systopic.TopicPoliciesSystemTopicClient; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.BackoffBuilder; +import org.apache.pulsar.client.impl.ReaderImpl; +import org.apache.pulsar.common.events.PulsarEvent; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; @@ -245,4 +258,25 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic long cost = System.currentTimeMillis() - start; assertTrue("actual:" + cost, cost >= 5000 - 1000); } + + @Test + public void testCreatSystemTopicClientWithRetry() throws Exception { + SystemTopicBasedTopicPoliciesService service = + spy((SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService()); + Field field = SystemTopicBasedTopicPoliciesService.class + .getDeclaredField("namespaceEventsSystemTopicFactory"); + field.setAccessible(true); + NamespaceEventsSystemTopicFactory factory = spy((NamespaceEventsSystemTopicFactory) field.get(service)); + SystemTopicClient<PulsarEvent> client = mock(TopicPoliciesSystemTopicClient.class); + doReturn(client).when(factory).createTopicPoliciesSystemTopicClient(any()); + field.set(service, factory); + + SystemTopicClient.Reader<PulsarEvent> reader = mock(SystemTopicClient.Reader.class); + // Throw an exception first, create successfully after retrying + doThrow(new PulsarClientException("test")).doReturn(reader).when(client).newReader(); + + SystemTopicClient.Reader<PulsarEvent> reader1 = service.creatSystemTopicClientWithRetry(null).get(); + + assertEquals(reader1, reader); + } }
