This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 454da897c355457e124371fabeb6c37db806bc4c
Author: feynmanlin <[email protected]>
AuthorDate: Wed Aug 4 00:30:01 2021 +0800

    Make getTopicPoliciesAsyncWithRetry as a default method (#11518)
    
    The Admin API needs to use `getTopicPoliciesAsyncWithRetry`, and it also needs to
    be used in BrokerService, so move it into TopicPoliciesService as a default method.
    
    (cherry picked from commit 27ecd41bea020f1af2edaf472f9b10977b3520f8)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 45 ++--------------------
 .../SystemTopicBasedTopicPoliciesService.java      |  7 ++--
 .../broker/service/TopicPoliciesService.java       | 36 +++++++++++++++++
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 40 ++++++++++++++-----
 4 files changed, 75 insertions(+), 53 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 8f1e780..568d569 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -29,9 +29,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import javax.servlet.ServletContext;
 import javax.ws.rs.WebApplicationException;
@@ -42,12 +39,9 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
-import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
-import org.apache.pulsar.client.impl.Backoff;
-import org.apache.pulsar.client.impl.BackoffBuilder;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -85,7 +79,6 @@ public abstract class AdminResource extends PulsarWebResource 
{
     public static final String POLICIES_READONLY_FLAG_PATH = 
"/admin/flags/policies-readonly";
     public static final String PARTITIONED_TOPIC_PATH_ZNODE = 
"partitioned-topics";
     private static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers";
-    private static final long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000;
 
     protected BookKeeper bookKeeper() {
         return pulsar().getBookKeeperClient();
@@ -370,47 +363,17 @@ public abstract class AdminResource extends 
PulsarWebResource {
     }
 
     protected CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsyncWithRetry(TopicName topicName) {
-        return internalGetTopicPoliciesAsyncWithRetry(topicName,
-                new AtomicLong(DEFAULT_GET_TOPIC_POLICY_TIMEOUT), null, null);
-    }
-
-    protected CompletableFuture<Optional<TopicPolicies>> 
internalGetTopicPoliciesAsyncWithRetry(TopicName topicName,
-            final AtomicLong remainingTime, final Backoff backoff, 
CompletableFuture<Optional<TopicPolicies>> future) {
-        CompletableFuture<Optional<TopicPolicies>> response = future == null ? 
new CompletableFuture<>() : future;
         try {
             checkTopicLevelPolicyEnable();
-            response.complete(Optional.ofNullable(pulsar()
-                    .getTopicPoliciesService().getTopicPolicies(topicName)));
-        } catch (RestException re) {
-            response.completeExceptionally(re);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            Backoff usedBackoff = backoff == null ? new BackoffBuilder()
-                    .setInitialTime(500, TimeUnit.MILLISECONDS)
-                    .setMandatoryStop(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, 
TimeUnit.MILLISECONDS)
-                    .setMax(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, 
TimeUnit.MILLISECONDS)
-                    .create() : backoff;
-            long nextDelay = Math.min(usedBackoff.next(), remainingTime.get());
-            if (nextDelay <= 0) {
-                response.completeExceptionally(new TimeoutException(
-                        String.format("Failed to get topic policy withing 
configured timeout %s ms",
-                                DEFAULT_GET_TOPIC_POLICY_TIMEOUT)));
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.error("Topic {} policies have not been initialized 
yet, retry after {}ms",
-                            topicName, nextDelay);
-                }
-                pulsar().getExecutor().schedule(() -> {
-                    remainingTime.addAndGet(-nextDelay);
-                    internalGetTopicPoliciesAsyncWithRetry(topicName, 
remainingTime, usedBackoff, response);
-                }, nextDelay, TimeUnit.MILLISECONDS);
-            }
+            return pulsar().getTopicPoliciesService()
+                    .getTopicPoliciesAsyncWithRetry(topicName, null, 
pulsar().getExecutor());
         } catch (Exception e) {
             log.error("[{}] Failed to get topic policies {}", clientAppId(), 
topicName, e);
-            response.completeExceptionally(e);
+            return FutureUtil.failedFuture(e);
         }
-        return response;
     }
 
+
     protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies 
retention) {
         if (retention == null || retention.getRetentionSizeInMB() <= 0 || 
retention.getRetentionTimeInMinutes() <= 0) {
             return true;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 315de98..a21eb07 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -55,14 +55,15 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     private final PulsarService pulsarService;
     private volatile NamespaceEventsSystemTopicFactory 
namespaceEventsSystemTopicFactory;
 
-    private final Map<TopicName, TopicPolicies> policiesCache = new 
ConcurrentHashMap<>();
+    @VisibleForTesting
+    final Map<TopicName, TopicPolicies> policiesCache = new 
ConcurrentHashMap<>();
 
     private final Map<NamespaceName, AtomicInteger> 
ownedBundlesCountPerNamespace = new ConcurrentHashMap<>();
 
     private final Map<NamespaceName, 
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
             readerCaches = new ConcurrentHashMap<>();
-
-    private final Map<NamespaceName, Boolean> policyCacheInitMap = new 
ConcurrentHashMap<>();
+    @VisibleForTesting
+    final Map<NamespaceName, Boolean> policyCacheInitMap = new 
ConcurrentHashMap<>();
 
     private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> 
listeners = new ConcurrentHashMap<>();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 2f42b64..48d2f1e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -18,8 +18,14 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.BackoffBuilder;
+import org.apache.pulsar.client.util.RetryUtil;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
@@ -31,6 +37,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 public interface TopicPoliciesService {
 
     TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
+    long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000;
 
     /**
      * Delete policies for a topic async.
@@ -55,6 +62,35 @@ public interface TopicPoliciesService {
     TopicPolicies getTopicPolicies(TopicName topicName) throws 
TopicPoliciesCacheNotInitException;
 
     /**
+     * When getting TopicPolicies, if the initialization has not been 
completed,
+     * we will go back off and try again until time out.
+     * @param topicName topic name
+     * @param backoff back off policy
+     * @return CompletableFuture<Optional<TopicPolicies>>
+     */
+    default CompletableFuture<Optional<TopicPolicies>> 
getTopicPoliciesAsyncWithRetry(TopicName topicName,
+              final Backoff backoff, ScheduledExecutorService 
scheduledExecutorService) {
+        CompletableFuture<Optional<TopicPolicies>> response = new 
CompletableFuture<>();
+        Backoff usedBackoff = backoff == null ? new BackoffBuilder()
+                .setInitialTime(500, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, 
TimeUnit.MILLISECONDS)
+                .setMax(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, 
TimeUnit.MILLISECONDS)
+                .create() : backoff;
+        try {
+            RetryUtil.retryAsynchronously(() -> {
+                try {
+                    return Optional.ofNullable(getTopicPolicies(topicName));
+                } catch 
(BrokerServiceException.TopicPoliciesCacheNotInitException exception) {
+                    throw new RuntimeException(exception);
+                }
+            }, usedBackoff, scheduledExecutorService, response);
+        } catch (Exception e) {
+            response.completeExceptionally(e);
+        }
+        return response;
+    }
+
+    /**
      * Get policies for a topic without cache async.
      * @param topicName topic name
      * @return future of the topic policies
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 63680db..61365fe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -18,15 +18,25 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.assertNull;
+import static org.testng.AssertJUnit.assertTrue;
 import com.google.common.collect.Sets;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.BackoffBuilder;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.awaitility.Awaitility;
@@ -35,14 +45,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertNotNull;
-import static org.testng.AssertJUnit.assertNull;
-
 @Test(groups = "broker")
 public class SystemTopicBasedTopicPoliciesServiceTest extends 
MockedPulsarServiceBaseTest {
 
@@ -223,4 +225,24 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         systemTopicFactory = new 
NamespaceEventsSystemTopicFactory(pulsarClient);
         systemTopicBasedTopicPoliciesService = 
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
     }
+
+    @Test
+    public void testGetPolicyTimeout() throws Exception {
+        SystemTopicBasedTopicPoliciesService service = 
(SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+        Awaitility.await().untilAsserted(() -> 
assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject())));
+        service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false);
+        long start = System.currentTimeMillis();
+        Backoff backoff = new BackoffBuilder()
+                .setInitialTime(500, TimeUnit.MILLISECONDS)
+                .setMandatoryStop(5000, TimeUnit.MILLISECONDS)
+                .setMax(1000, TimeUnit.MILLISECONDS)
+                .create();
+        try {
+            service.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, 
pulsar.getExecutor()).get();
+        } catch (Exception e) {
+            assertTrue(e.getCause().getCause() instanceof 
TopicPoliciesCacheNotInitException);
+        }
+        long cost = System.currentTimeMillis() - start;
+        assertTrue("actual:" + cost, cost >= 5000 - 1000);
+    }
 }

Reply via email to