This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 752319e  Fix unload namespaces bundle hangs. (#9116)
752319e is described below

commit 752319eff0f990071ee0b6e3c35d23a112cfe342
Author: lipenghui <[email protected]>
AuthorDate: Tue Jan 5 07:31:04 2021 +0800

    Fix unload namespaces bundle hangs. (#9116)
    
    ### Motivation
    
    Fix hangs when unloading namespace bundles. The BrokerService maintains a
    ConcurrentOpenHashMap that stores references to all topics. #8968 added a
    cleanup of these topics when namespace bundles are unloaded, see
    https://github.com/apache/pulsar/pull/8968/files#diff-0210356c8a88e4efa89eb769a027fa6c166db479dbad8bbbbc704c6ed6e317f5R1572-R1579
    
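    Since StampedLock is not reentrant, and `forEach` of ConcurrentOpenHashMap
    may hold a section's read lock while invoking the callback, calling
    `removeTopicFromCache` from inside the callback (which needs the write lock
    on the same section) can block namespace unloading indefinitely. Here is
    the thread dump: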
    
    ```
    "pulsar-io-16-7" #132 prio=5 os_prio=31 tid=0x00007ff370ae2800 nid=0x1f603 waiting on condition [0x00007000121d0000]
       java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000780a0be18> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
        at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
        at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.remove(ConcurrentOpenHashMap.java:306)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.access$200(ConcurrentOpenHashMap.java:180)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.remove(ConcurrentOpenHashMap.java:135)
        at org.apache.pulsar.broker.service.BrokerService.removeTopicFromCache(BrokerService.java:1658)
        at org.apache.pulsar.broker.service.BrokerService.lambda$cleanUnloadedTopicFromCache$61(BrokerService.java:1611)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$1003/2064147704.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.BrokerService.cleanUnloadedTopicFromCache(BrokerService.java:1607)
        at org.apache.pulsar.broker.namespace.OwnedBundle.lambda$handleUnloadRequest$1(OwnedBundle.java:140)
        at org.apache.pulsar.broker.namespace.OwnedBundle$$Lambda$999/503902413.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$null$18(NonPersistentTopic.java:442)
        at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic$$Lambda$994/682846231.run(Unknown Source)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    ```
    This also makes the current CI unstable when the mock broker is shut down
    after the tests.
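    
    A minimal sketch of the underlying problem, using only the JDK's
    `java.util.concurrent.locks.StampedLock` (this is not Pulsar code): while a
    thread holds a read stamp, it cannot acquire the write lock on the same
    lock. The real `Section.remove` calls `writeLock()`, which parks forever in
    that situation; the sketch uses `tryWriteLock()`, which returns 0 instead
    of blocking, so the demo terminates. The class name
    `StampedLockReentrancyDemo` is illustrative, and
    `canWriteWhileHoldingRead()` returns `false`.
    
    ```java
    import java.util.concurrent.locks.StampedLock;

    // Illustrative sketch, not Pulsar code: a thread that holds a StampedLock
    // read stamp cannot also take the write lock on the same lock, because
    // StampedLock is not reentrant.
    final class StampedLockReentrancyDemo {

        // Returns true only if the write lock could be taken while this
        // thread still holds a read lock on the same StampedLock.
        static boolean canWriteWhileHoldingRead() {
            StampedLock lock = new StampedLock();
            // Stands in for Section.forEach holding the section read lock.
            long readStamp = lock.readLock();
            try {
                // Section.remove calls writeLock(), which would park forever here.
                // tryWriteLock() returns 0 instead of blocking, so the demo ends.
                long writeStamp = lock.tryWriteLock();
                if (writeStamp != 0L) {
                    lock.unlockWrite(writeStamp);
                    return true;
                }
                return false;
            } finally {
                lock.unlockRead(readStamp);
            }
        }

        public static void main(String[] args) {
            System.out.println("write lock acquired while reading: "
                    + canWriteWhileHoldingRead());
        }
    }
    ```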
    
    ### Modifications
    
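    Use the `keys()` method of `ConcurrentOpenHashMap` to get a new list
    containing a copy of the keys, and iterate over that list instead, so that
    `removeTopicFromCache` no longer runs while `forEach` holds the read lock.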
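    
    The same pattern can be sketched with a hypothetical single-section
    `LockedMap` that loosely imitates a `ConcurrentOpenHashMap` section:
    `forEach` holds the read lock while invoking the callback, and `tryRemove`
    needs the write lock. `LockedMap`, `tryRemove`, `SnapshotRemovalDemo`, and
    the key-prefix check standing in for `NamespaceBundle.includes` are all
    assumptions for illustration. Removing from inside `forEach` fails for
    every matching key, while removing from a `keys()` snapshot succeeds
    because the read lock has already been released.
    
    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.locks.StampedLock;
    import java.util.function.BiConsumer;

    // Hypothetical single-section map, loosely modeled on a ConcurrentOpenHashMap
    // section: forEach() holds the read lock while calling back, and removal
    // needs the write lock on the same StampedLock.
    final class LockedMap<K, V> {
        private final StampedLock lock = new StampedLock();
        private final Map<K, V> data = new HashMap<>();

        void put(K key, V value) {
            long stamp = lock.writeLock();
            try {
                data.put(key, value);
            } finally {
                lock.unlockWrite(stamp);
            }
        }

        // Uses tryWriteLock() so the sketch reports failure instead of hanging
        // like the writeLock() call in the thread dump.
        boolean tryRemove(K key) {
            long stamp = lock.tryWriteLock();
            if (stamp == 0L) {
                return false;
            }
            try {
                data.remove(key);
                return true;
            } finally {
                lock.unlockWrite(stamp);
            }
        }

        void forEach(BiConsumer<? super K, ? super V> action) {
            long stamp = lock.readLock();
            try {
                data.forEach(action);
            } finally {
                lock.unlockRead(stamp);
            }
        }

        // Copies the keys under the read lock; the lock is released on return.
        List<K> keys() {
            List<K> keys = new ArrayList<>();
            forEach((k, v) -> keys.add(k));
            return keys;
        }

        int size() {
            long stamp = lock.readLock();
            try {
                return data.size();
            } finally {
                lock.unlockRead(stamp);
            }
        }
    }

    final class SnapshotRemovalDemo {
        // Old pattern: remove while forEach still holds the read lock.
        // Returns the number of removals that could not get the write lock.
        static int removeInsideForEach(LockedMap<String, Integer> map, String prefix) {
            int[] failed = {0};
            map.forEach((name, value) -> {
                if (name.startsWith(prefix) && !map.tryRemove(name)) {
                    failed[0]++;
                }
            });
            return failed[0];
        }

        // New pattern, as in the fix: iterate over a snapshot of the keys.
        static int removeFromSnapshot(LockedMap<String, Integer> map, String prefix) {
            int failed = 0;
            for (String name : map.keys()) {
                if (name.startsWith(prefix) && !map.tryRemove(name)) {
                    failed++;
                }
            }
            return failed;
        }
    }
    ```
    
    With three keys of which two match the prefix, `removeInsideForEach`
    reports two failed removals and leaves the map unchanged, and
    `removeFromSnapshot` then removes both, leaving one entry. The cost of the
    snapshot approach is one extra list allocation, and keys added after the
    snapshot is taken are not visited.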
---
 .../main/java/org/apache/pulsar/broker/service/BrokerService.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index acab311..2e8fdf7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1597,12 +1597,12 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     }
 
     public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) {
-        topics.forEach((name, topicFuture) -> {
-            TopicName topicName = TopicName.get(name);
+        for (String topic : topics.keys()) {
+            TopicName topicName = TopicName.get(topic);
             if (serviceUnit.includes(topicName)) {
                 pulsar.getBrokerService().removeTopicFromCache(topicName.toString());
             }
-        });
+        }
     }
 
     public AuthorizationService getAuthorizationService() {
