This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 752319e Fix unload namespaces bundle hangs. (#9116)
752319e is described below
commit 752319eff0f990071ee0b6e3c35d23a112cfe342
Author: lipenghui <[email protected]>
AuthorDate: Tue Jan 5 07:31:04 2021 +0800
Fix unload namespaces bundle hangs. (#9116)
### Motivation
Fix a hang when unloading namespace bundles. The BrokerService maintains a
ConcurrentOpenHashMap that stores all topic references. #8968 added cleanup of
these topics when a namespace bundle is unloaded, see
https://github.com/apache/pulsar/pull/8968/files#diff-0210356c8a88e4efa89eb769a027fa6c166db479dbad8bbbbc704c6ed6e317f5R1572-R1579
StampedLock is not reentrant, and the `forEach` method of
ConcurrentOpenHashMap also acquires a read lock. Calling `remove` from inside
the `forEach` callback therefore waits for the write lock on the same section,
which can block the namespace unloading. Here is the thread dump:
```
"pulsar-io-16-7" #132 prio=5 os_prio=31 tid=0x00007ff370ae2800 nid=0x1f603 waiting on condition [0x00007000121d0000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x0000000780a0be18> (a org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section)
    at java.util.concurrent.locks.StampedLock.acquireWrite(StampedLock.java:1119)
    at java.util.concurrent.locks.StampedLock.writeLock(StampedLock.java:354)
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.remove(ConcurrentOpenHashMap.java:306)
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.access$200(ConcurrentOpenHashMap.java:180)
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.remove(ConcurrentOpenHashMap.java:135)
    at org.apache.pulsar.broker.service.BrokerService.removeTopicFromCache(BrokerService.java:1658)
    at org.apache.pulsar.broker.service.BrokerService.lambda$cleanUnloadedTopicFromCache$61(BrokerService.java:1611)
    at org.apache.pulsar.broker.service.BrokerService$$Lambda$1003/2064147704.accept(Unknown Source)
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387)
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
    at org.apache.pulsar.broker.service.BrokerService.cleanUnloadedTopicFromCache(BrokerService.java:1607)
    at org.apache.pulsar.broker.namespace.OwnedBundle.lambda$handleUnloadRequest$1(OwnedBundle.java:140)
    at org.apache.pulsar.broker.namespace.OwnedBundle$$Lambda$999/503902413.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$null$18(NonPersistentTopic.java:442)
    at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic$$Lambda$994/682846231.run(Unknown Source)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
```
This also makes the current CI unstable when shutting down the mock broker
after the tests.
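The lock behaviour behind this hang can be reproduced in isolation. The
following is a minimal sketch, not Pulsar code: the class and method names are
hypothetical, and it uses the non-blocking `tryWriteLock()` so the demo
terminates instead of parking forever like the thread in the dump.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical demo class, not part of Pulsar.
class StampedLockReentrancyDemo {

    // Returns true if the write lock can be acquired while the same thread
    // already holds a read lock on the same StampedLock.
    static boolean canWriteWhileReading() {
        StampedLock lock = new StampedLock();
        long readStamp = lock.readLock();
        try {
            // StampedLock has no notion of an owning thread, so the read lock
            // held above blocks this write attempt. A plain writeLock() call
            // here would wait indefinitely; tryWriteLock() returns 0 instead.
            long writeStamp = lock.tryWriteLock();
            if (writeStamp != 0L) {
                lock.unlockWrite(writeStamp);
                return true;
            }
            return false;
        } finally {
            lock.unlockRead(readStamp);
        }
    }

    public static void main(String[] args) {
        System.out.println("write lock acquired while reading: " + canWriteWhileReading());
    }
}
```

With a reentrant read-write lock the outcome would still depend on upgrade
rules, but with StampedLock the write attempt from the reading thread never
succeeds until the read stamp is released.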
### Modifications
Use the `keys()` method of `ConcurrentOpenHashMap` to get a new list of keys
and iterate over that copy, so `remove` is no longer called from inside
`forEach`.
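The snapshot-then-remove pattern can be sketched as follows. This is an
illustration under stated assumptions: `LockedMapSketch` is a hypothetical
stand-in guarded by a single StampedLock, not Pulsar's
`ConcurrentOpenHashMap`, and the prefix match stands in for
`serviceUnit.includes(topicName)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.StampedLock;

// Hypothetical stand-in for a lock-guarded map; NOT ConcurrentOpenHashMap.
class LockedMapSketch {
    private final StampedLock lock = new StampedLock();
    private final Map<String, String> entries = new HashMap<>();

    void put(String key, String value) {
        long stamp = lock.writeLock();
        try {
            entries.put(key, value);
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    boolean remove(String key) {
        long stamp = lock.writeLock();
        try {
            return entries.remove(key) != null;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Copies the keys under the read lock and releases it before returning,
    // so callers iterate without holding any lock.
    List<String> keys() {
        long stamp = lock.readLock();
        try {
            return new ArrayList<>(entries.keySet());
        } finally {
            lock.unlockRead(stamp);
        }
    }

    int size() {
        long stamp = lock.readLock();
        try {
            return entries.size();
        } finally {
            lock.unlockRead(stamp);
        }
    }

    // Same shape as the new loop: iterate a key snapshot, then remove.
    static int removeMatching(LockedMapSketch map, String prefix) {
        int removed = 0;
        for (String key : map.keys()) {
            if (key.startsWith(prefix) && map.remove(key)) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        LockedMapSketch map = new LockedMapSketch();
        map.put("tenant/ns-a/topic-1", "ref");
        map.put("tenant/ns-a/topic-2", "ref");
        map.put("tenant/ns-b/topic-1", "ref");
        System.out.println("removed: " + removeMatching(map, "tenant/ns-a/"));
        System.out.println("remaining: " + map.size());
    }
}
```

The trade-off is that the snapshot may be slightly stale: a key added after
`keys()` returns is not visited, and a key removed concurrently simply makes
`remove` return false.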
---
.../main/java/org/apache/pulsar/broker/service/BrokerService.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index acab311..2e8fdf7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1597,12 +1597,12 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     }
 
     public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) {
-        topics.forEach((name, topicFuture) -> {
-            TopicName topicName = TopicName.get(name);
+        for (String topic : topics.keys()) {
+            TopicName topicName = TopicName.get(topic);
             if (serviceUnit.includes(topicName)) {
                 pulsar.getBrokerService().removeTopicFromCache(topicName.toString());
             }
-        });
+        }
     }
 
     public AuthorizationService getAuthorizationService() {