This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new ad09404ca6d [improve][broker] Reduce the consumers list sort by
priority level (#16243)
ad09404ca6d is described below
commit ad09404ca6dc50303580fd1fadc77d8ad1c59310
Author: lipenghui <[email protected]>
AuthorDate: Tue Jun 28 09:11:47 2022 +0800
[improve][broker] Reduce the consumers list sort by priority level (#16243)
### Motivation
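When many consumers are created (> 10,000), the IO threads stay in the BLOCKED
state for a long time, which affects message publishing and subsequent
consumer creation.
The first thread dump below shows an IO thread waiting for the
PersistentSubscription monitor. The second shows the thread that holds that
monitor while it sorts the consumer list in
PersistentDispatcherMultipleConsumers.addConsumer.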
```
"pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry [0x0000700019642000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207)
    - waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
    at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
    at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
    at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
    at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
    at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniApplyNow([email protected]/CompletableFuture.java:684)
    at java.util.concurrent.CompletableFuture.uniApplyStage([email protected]/CompletableFuture.java:662)
    at java.util.concurrent.CompletableFuture.thenApply([email protected]/CompletableFuture.java:2168)
    at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
    at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run([email protected]/Thread.java:833)
```
```
"pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable [0x00007000171d5000]
   java.lang.Thread.State: RUNNABLE
    at java.util.TimSort.countRunAndMakeAscending([email protected]/TimSort.java:360)
    at java.util.TimSort.sort([email protected]/TimSort.java:234)
    at java.util.Arrays.sort([email protected]/Arrays.java:1307)
    at java.util.concurrent.CopyOnWriteArrayList.sortRange([email protected]/CopyOnWriteArrayList.java:896)
    at java.util.concurrent.CopyOnWriteArrayList.sort([email protected]/CopyOnWriteArrayList.java:888)
    - locked <0x00001000158237d8> (a java.lang.Object)
    at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159)
    - locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers)
    at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287)
    - locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription)
    at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206)
    at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698)
    at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674)
    at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078)
    at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniComposeStage([email protected]/CompletableFuture.java:1187)
    at java.util.concurrent.CompletableFuture.thenCompose([email protected]/CompletableFuture.java:2309)
    at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042)
    at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source)
    at java.util.concurrent.CompletableFuture.uniApplyNow([email protected]/CompletableFuture.java:684)
    at java.util.concurrent.CompletableFuture.uniApplyStage([email protected]/CompletableFuture.java:662)
    at java.util.concurrent.CompletableFuture.thenApply([email protected]/CompletableFuture.java:2168)
    at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
    at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run([email protected]/Thread.java:833)
```
### Modification
- Sort the consumer list only when the new consumer has a higher priority
(a lower priority-level value) than the consumer that was last in the list
before the add. This avoids the sort for all consumers that do not set a
priority level (the client always passes 0 when the priority level is absent).
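
The idea can be sketched outside the broker as follows. This is a minimal, self-contained illustration and not the broker code: `ConditionalSortSketch`, `SketchConsumer`, `isSorted`, and the boolean return value of `addConsumer` are hypothetical names introduced only for this example. It relies on the list already being in ascending priority-level order before each add, so appending an element that is not smaller than the previous last element keeps the order intact.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ConditionalSortSketch {

    // Hypothetical stand-in for the broker's Consumer; only the priority level matters here.
    static final class SketchConsumer {
        final String name;
        final int priorityLevel;

        SketchConsumer(String name, int priorityLevel) {
            this.name = name;
            this.priorityLevel = priorityLevel;
        }
    }

    // Appends the consumer and re-sorts only when the append broke the ascending order.
    // Returns true if a full sort ran (the return value exists for illustration only).
    static boolean addConsumer(List<SketchConsumer> consumerList, SketchConsumer consumer) {
        consumerList.add(consumer);
        int size = consumerList.size();
        // size - 2 is the element that was last before this add.
        if (size > 1 && consumer.priorityLevel < consumerList.get(size - 2).priorityLevel) {
            consumerList.sort(Comparator.comparingInt((SketchConsumer c) -> c.priorityLevel));
            return true;
        }
        return false;
    }

    // Checks that priority levels are in ascending (non-decreasing) order.
    static boolean isSorted(List<SketchConsumer> list) {
        for (int i = 1; i < list.size(); i++) {
            if (list.get(i - 1).priorityLevel > list.get(i).priorityLevel) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<SketchConsumer> list = new CopyOnWriteArrayList<>();
        int sorts = 0;
        // Consumers without an explicit priority level all carry 0.
        for (int i = 0; i < 1000; i++) {
            if (addConsumer(list, new SketchConsumer("c" + i, 0))) {
                sorts++;
            }
        }
        System.out.println("sorts for 1000 default-priority consumers: " + sorts);

        // A lower-priority consumer (level 1) appended at the end needs no sort.
        System.out.println("sorted after level 1: " + addConsumer(list, new SketchConsumer("low", 1)));
        // A level-0 consumer arriving after it does trigger one sort.
        System.out.println("sorted after level 0: " + addConsumer(list, new SketchConsumer("high", 0)));
        System.out.println("list in order: " + isSorted(list));
    }
}
```

With all consumers at the default level, the loop performs no sort at all; a sort runs only in the last step, when a level-0 consumer is added after a level-1 consumer.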
(cherry picked from commit 291fedcd2dc2781ef5a7ea2a6d3d653aa882eb3a)
---
.../service/persistent/PersistentDispatcherMultipleConsumers.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a36b1eded3d..02fc8050763 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -159,7 +159,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         }
         consumerList.add(consumer);
-        consumerList.sort(Comparator.comparingInt(Consumer::getPriorityLevel));
+        if (consumerList.size() > 1
+                && consumer.getPriorityLevel() < consumerList.get(consumerList.size() - 2).getPriorityLevel()) {
+            consumerList.sort(Comparator.comparingInt(Consumer::getPriorityLevel));
+        }
         consumerSet.add(consumer);
     }