This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4fceca75e7f133342ddd342e890492687ed12f3c
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Feb 5 02:55:39 2021 -0800

    Do not use a static map of listeners in TopicPoliciesService (#9486)
    
    CI jobs are likely failing with OOM in the broker unit tests. The Surefire 
worker is configured with 4 processes, each with an -Xmx of 1G.
    
    The problem was introduced in #7863, where a static map of listeners was 
added to an interface. As a result, that map retains all the `PulsarService` 
instances created during test execution, keeping references to 
everything else.
    
    The map should instead be scoped to the specific instance.
    
    (cherry picked from commit 31ee4541a16bd7194d50b576434eb7742ef87e99)
---
 .../service/SystemTopicBasedTopicPoliciesService.java     | 15 +++++++++------
 .../pulsar/broker/service/TopicPoliciesService.java       |  7 +------
 2 files changed, 10 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 2627f4d..8b1ec1e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -21,6 +21,11 @@ package org.apache.pulsar.broker.service;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -40,11 +45,6 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * Cached topic policies service will cache the system topic reader and the 
topic policies
  *
@@ -63,6 +63,8 @@ public class SystemTopicBasedTopicPoliciesService implements 
TopicPoliciesServic
 
     private final Map<NamespaceName, Boolean> policyCacheInitMap = new 
ConcurrentHashMap<>();
 
+    private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> 
listeners = new ConcurrentHashMap<>();
+
     public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
         this.pulsarService = pulsarService;
     }
@@ -123,7 +125,8 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             return;
         }
         TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();
-        TopicName topicName = TopicName.get(event.getDomain(), 
event.getTenant(), event.getNamespace(), event.getTopic());
+        TopicName topicName = TopicName.get(event.getDomain(), 
event.getTenant(),
+                event.getNamespace(), event.getTopic());
         if (listeners.get(topicName) != null) {
             TopicPolicies policies = event.getPolicies();
             for (TopicPolicyListener<TopicPolicies> listener : 
listeners.get(topicName)) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index a4da39f..82b0abf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -18,24 +18,19 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.util.concurrent.CompletableFuture;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Topic policies service
  */
 public interface TopicPoliciesService {
 
     TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
-    Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new 
ConcurrentHashMap<>();
 
     /**
      * Update policies for a topic async

Reply via email to