This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 454da897c355457e124371fabeb6c37db806bc4c Author: feynmanlin <[email protected]> AuthorDate: Wed Aug 4 00:30:01 2021 +0800 Make getTopicPoliciesAsyncWithRetry as a default method (#11518) Admin API needs to use `getTopicPoliciesAsyncWithRetry`, it also needs to be used in BrokerService, so move it to TopicService (cherry picked from commit 27ecd41bea020f1af2edaf472f9b10977b3520f8) --- .../apache/pulsar/broker/admin/AdminResource.java | 45 ++-------------------- .../SystemTopicBasedTopicPoliciesService.java | 7 ++-- .../broker/service/TopicPoliciesService.java | 36 +++++++++++++++++ .../SystemTopicBasedTopicPoliciesServiceTest.java | 40 ++++++++++++++----- 4 files changed, 75 insertions(+), 53 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 8f1e780..568d569 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -29,9 +29,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import javax.servlet.ServletContext; import javax.ws.rs.WebApplicationException; @@ -42,12 +39,9 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; -import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.internal.TopicsImpl; -import org.apache.pulsar.client.impl.Backoff; -import org.apache.pulsar.client.impl.BackoffBuilder; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -85,7 +79,6 @@ public abstract class AdminResource extends PulsarWebResource { public static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly"; public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics"; private static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers"; - private static final long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000; protected BookKeeper bookKeeper() { return pulsar().getBookKeeperClient(); @@ -370,47 +363,17 @@ public abstract class AdminResource extends PulsarWebResource { } protected CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName) { - return internalGetTopicPoliciesAsyncWithRetry(topicName, - new AtomicLong(DEFAULT_GET_TOPIC_POLICY_TIMEOUT), null, null); - } - - protected CompletableFuture<Optional<TopicPolicies>> internalGetTopicPoliciesAsyncWithRetry(TopicName topicName, - final AtomicLong remainingTime, final Backoff backoff, CompletableFuture<Optional<TopicPolicies>> future) { - CompletableFuture<Optional<TopicPolicies>> response = future == null ? new CompletableFuture<>() : future; try { checkTopicLevelPolicyEnable(); - response.complete(Optional.ofNullable(pulsar() - .getTopicPoliciesService().getTopicPolicies(topicName))); - } catch (RestException re) { - response.completeExceptionally(re); - } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { - Backoff usedBackoff = backoff == null ? new BackoffBuilder() - .setInitialTime(500, TimeUnit.MILLISECONDS) - .setMandatoryStop(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS) - .setMax(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS) - .create() : backoff; - long nextDelay = Math.min(usedBackoff.next(), remainingTime.get()); - if (nextDelay <= 0) { - response.completeExceptionally(new TimeoutException( - String.format("Failed to get topic policy withing configured timeout %s ms", - DEFAULT_GET_TOPIC_POLICY_TIMEOUT))); - } else { - if (log.isDebugEnabled()) { - log.error("Topic {} policies have not been initialized yet, retry after {}ms", - topicName, nextDelay); - } - pulsar().getExecutor().schedule(() -> { - remainingTime.addAndGet(-nextDelay); - internalGetTopicPoliciesAsyncWithRetry(topicName, remainingTime, usedBackoff, response); - }, nextDelay, TimeUnit.MILLISECONDS); - } + return pulsar().getTopicPoliciesService() + .getTopicPoliciesAsyncWithRetry(topicName, null, pulsar().getExecutor()); } catch (Exception e) { log.error("[{}] Failed to get topic policies {}", clientAppId(), topicName, e); - response.completeExceptionally(e); + return FutureUtil.failedFuture(e); } - return response; } + protected boolean checkBacklogQuota(BacklogQuota quota, RetentionPolicies retention) { if (retention == null || retention.getRetentionSizeInMB() <= 0 || retention.getRetentionTimeInMinutes() <= 0) { return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 315de98..a21eb07 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -55,14 +55,15 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private final PulsarService pulsarService; private volatile NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory; - private final Map<TopicName, TopicPolicies> policiesCache = new ConcurrentHashMap<>(); + @VisibleForTesting + final Map<TopicName, TopicPolicies> policiesCache = new ConcurrentHashMap<>(); private final Map<NamespaceName, AtomicInteger> ownedBundlesCountPerNamespace = new ConcurrentHashMap<>(); private final Map<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>> readerCaches = new ConcurrentHashMap<>(); - - private final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>(); + @VisibleForTesting + final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>(); private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index 2f42b64..48d2f1e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -18,8 +18,14 @@ */ package org.apache.pulsar.broker.service; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; +import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.client.impl.BackoffBuilder; +import org.apache.pulsar.client.util.RetryUtil; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; @@ -31,6 +37,7 @@ import org.apache.pulsar.common.util.FutureUtil; public interface TopicPoliciesService { TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled(); + long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000; /** * Delete policies for a topic async. @@ -55,6 +62,35 @@ public interface TopicPoliciesService { TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException; /** + * When getting TopicPolicies, if the initialization has not been completed, + * we will go back off and try again until time out. + * @param topicName topic name + * @param backoff back off policy + * @return CompletableFuture<Optional<TopicPolicies>> + */ + default CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName, + final Backoff backoff, ScheduledExecutorService scheduledExecutorService) { + CompletableFuture<Optional<TopicPolicies>> response = new CompletableFuture<>(); + Backoff usedBackoff = backoff == null ? new BackoffBuilder() + .setInitialTime(500, TimeUnit.MILLISECONDS) + .setMandatoryStop(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS) + .setMax(DEFAULT_GET_TOPIC_POLICY_TIMEOUT, TimeUnit.MILLISECONDS) + .create() : backoff; + try { + RetryUtil.retryAsynchronously(() -> { + try { + return Optional.ofNullable(getTopicPolicies(topicName)); + } catch (BrokerServiceException.TopicPoliciesCacheNotInitException exception) { + throw new RuntimeException(exception); + } + }, usedBackoff, scheduledExecutorService, response); + } catch (Exception e) { + response.completeExceptionally(e); + } + return response; + } + + /** * Get policies for a topic without cache async. * @param topicName topic name * @return future of the topic policies diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 63680db..61365fe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -18,15 +18,25 @@ */ package org.apache.pulsar.broker.service; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; +import static org.testng.AssertJUnit.assertNull; +import static org.testng.AssertJUnit.assertTrue; import com.google.common.collect.Sets; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.client.impl.BackoffBuilder; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.awaitility.Awaitility; @@ -35,14 +45,6 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertNotNull; -import static org.testng.AssertJUnit.assertNull; - @Test(groups = "broker") public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest { @@ -223,4 +225,24 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient); systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); } + + @Test + public void testGetPolicyTimeout() throws Exception { + SystemTopicBasedTopicPoliciesService service = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + Awaitility.await().untilAsserted(() -> assertTrue(service.policyCacheInitMap.get(TOPIC1.getNamespaceObject()))); + service.policyCacheInitMap.put(TOPIC1.getNamespaceObject(), false); + long start = System.currentTimeMillis(); + Backoff backoff = new BackoffBuilder() + .setInitialTime(500, TimeUnit.MILLISECONDS) + .setMandatoryStop(5000, TimeUnit.MILLISECONDS) + .setMax(1000, TimeUnit.MILLISECONDS) + .create(); + try { + service.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, pulsar.getExecutor()).get(); + } catch (Exception e) { + assertTrue(e.getCause().getCause() instanceof TopicPoliciesCacheNotInitException); + } + long cost = System.currentTimeMillis() - start; + assertTrue("actual:" + cost, cost >= 5000 - 1000); + } }
