This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4fceca75e7f133342ddd342e890492687ed12f3c Author: Matteo Merli <[email protected]> AuthorDate: Fri Feb 5 02:55:39 2021 -0800 Do not use a static map of listeners in TopicPoliciesService (#9486) Many CI jobs are failing with OOM in the brokers unit tests. The Surefire worker is configured with 4 processes, each with xmx of 1G. The problem was introduced in #7863 where a static map of listeners was added to an interface. That causes the map to retain all the `PulsarService` instances created during test execution, which in turn keep references to everything else. The map should instead be scoped to the specific instance. (cherry picked from commit 31ee4541a16bd7194d50b576434eb7742ef87e99) --- .../service/SystemTopicBasedTopicPoliciesService.java | 15 +++++++++------ .../pulsar/broker/service/TopicPoliciesService.java | 7 +------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 2627f4d..8b1ec1e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -21,6 +21,11 @@ package org.apache.pulsar.broker.service; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; @@ 
-40,11 +45,6 @@ import org.apache.pulsar.common.policies.data.TopicPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - /** * Cached topic policies service will cache the system topic reader and the topic policies * @@ -63,6 +63,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>(); + private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>(); + public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) { this.pulsarService = pulsarService; } @@ -123,7 +125,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic return; } TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent(); - TopicName topicName = TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic()); + TopicName topicName = TopicName.get(event.getDomain(), event.getTenant(), + event.getNamespace(), event.getTopic()); if (listeners.get(topicName) != null) { TopicPolicies policies = event.getPolicies(); for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index a4da39f..82b0abf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -18,24 +18,19 @@ */ package org.apache.pulsar.broker.service; +import java.util.concurrent.CompletableFuture; import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.FutureUtil; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; - /** * Topic policies service */ public interface TopicPoliciesService { TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled(); - Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>(); /** * Update policies for a topic async
