hecter-java-architect opened a new issue, #20924: URL: https://github.com/apache/pulsar/issues/20924
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Version

3.0.0

### Minimal reproduce step

```
2023-07-28T09:13:34,703+0800 [pulsar-io-12-13] WARN org.apache.pulsar.broker.service.ServerCnx - [/10.65.17.39:65308] Got exception java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2
    at java.base/java.util.concurrent.CopyOnWriteArrayList.elementAt(CopyOnWriteArrayList.java:385)
    at java.base/java.util.concurrent.CopyOnWriteArrayList.get(CopyOnWriteArrayList.java:398)
    at org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers.getNextConsumerFromSameOrLowerLevel(AbstractDispatcherMultipleConsumers.java:192)
    at org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers.getNextConsumer(AbstractDispatcherMultipleConsumers.java:137)
    at org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers.sendMessages(NonPersistentDispatcherMultipleConsumers.java:188)
    at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$publishMessage$2(NonPersistentTopic.java:202)
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:554)
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:277)
    at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.publishMessage(NonPersistentTopic.java:196)
    at org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:281)
    at org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:194)
    at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1733)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:222)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202)
    at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
```

Multiple threads invoke `org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers#getNextConsumer` concurrently. Even though `consumerList.size()` is checked first, the check and the subsequent `consumerList.get()` are not atomic, so another thread can shrink `consumerList` between the two calls and the stale index then overruns the list (a stand-alone demonstration of this check-then-act race is sketched at the end of this report):

```java
if (currentConsumerRoundRobinIndex >= consumerList.size()) {
    currentConsumerRoundRobinIndex = 0;
}
int currentRoundRobinConsumerPriority = consumerList.get(currentConsumerRoundRobinIndex).getPriorityLevel();
```

### What did you expect to see?

`getNextConsumer()` should either be called in a thread-safe way by its callers or be made thread-safe itself, as the persistent dispatcher already does: `org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers` is declared `synchronized` (a rough sketch of such a change is included at the end of this report):

```java
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
    if (needTrimAckedMessages()) {
        cursor.trimDeletedEntries(entries);
    }
```

### What did you see instead?

In contrast, `org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers#sendMessages` calls `getNextConsumer()` without any such synchronization:

```java
@Override
public void sendMessages(List<Entry> entries) {
    Consumer consumer = TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0 ? getNextConsumer() : null;
```

### Anything else?

_No response_

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!
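To make the race concrete, below is a minimal stand-alone sketch (hypothetical class and list contents, not Pulsar code) of the same check-then-act pattern on a `CopyOnWriteArrayList`: `size()` and `get()` are each thread-safe on their own, but the pair is not atomic, so a concurrent removal between them can trigger the same `ArrayIndexOutOfBoundsException` seen in the broker log above.

```java
import java.util.concurrent.CopyOnWriteArrayList;

public class CheckThenActRaceDemo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> consumers = new CopyOnWriteArrayList<>();

        // Writer keeps growing and clearing the list so that it regularly
        // shrinks right after the reader has validated its index.
        Thread writer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                consumers.add("consumer-1");
                consumers.add("consumer-2");
                consumers.clear();
            }
        });
        writer.setDaemon(true);
        writer.start();

        // Reader mimics the dispatcher's pattern: bounds check first,
        // then get() with the same index a moment later.
        int index = 1;
        for (long i = 0; i < 1_000_000_000L; i++) {
            if (index < consumers.size()) {      // check
                try {
                    consumers.get(index);        // act: the list may have shrunk in between
                } catch (IndexOutOfBoundsException e) {
                    // On the JDK in the log above this surfaces as ArrayIndexOutOfBoundsException.
                    System.out.println("Race reproduced after " + i + " iterations: " + e);
                    return;
                }
            }
        }
        System.out.println("Race not reproduced in this run (timing dependent).");
    }
}
```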

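For reference, here is a rough, hypothetical sketch (illustrative class name and a `String` stand-in for `Consumer`; not a patch against the Pulsar code base) of two ways the window could be closed: serialize the selection the way the persistent dispatcher's `synchronized` `trySendMessagesToConsumers` does, or read a single snapshot of the copy-on-write array so the length used for the bounds check cannot diverge from the array being indexed.

```java
import java.util.concurrent.CopyOnWriteArrayList;

// Illustration only; a real fix would live in NonPersistentDispatcherMultipleConsumers
// and/or AbstractDispatcherMultipleConsumers.
public class RoundRobinSelectionSketch {
    private final CopyOnWriteArrayList<String> consumerList = new CopyOnWriteArrayList<>();
    private int currentConsumerRoundRobinIndex = 0;

    // Option 1: serialize the whole check-then-get, mirroring the persistent
    // dispatcher. This only works if add/remove hold the same monitor (below).
    public synchronized String nextConsumerSynchronized() {
        if (consumerList.isEmpty()) {
            return null;
        }
        if (currentConsumerRoundRobinIndex >= consumerList.size()) {
            currentConsumerRoundRobinIndex = 0;
        }
        return consumerList.get(currentConsumerRoundRobinIndex++);
    }

    // Option 2: take one snapshot of the copy-on-write contents, so the length
    // used for the bounds check and the array being indexed are the same copy.
    // The round-robin index can still race (uneven distribution), but an
    // ArrayIndexOutOfBoundsException is no longer possible.
    public String nextConsumerFromSnapshot() {
        Object[] snapshot = consumerList.toArray();
        if (snapshot.length == 0) {
            return null;
        }
        int index = Math.floorMod(currentConsumerRoundRobinIndex, snapshot.length);
        currentConsumerRoundRobinIndex = index + 1;
        return (String) snapshot[index];
    }

    // Mutators are synchronized on the same monitor so Option 1 cannot observe
    // the list shrinking between its size() check and its get().
    public synchronized void addConsumer(String consumer) {
        consumerList.add(consumer);
    }

    public synchronized void removeConsumer(String consumer) {
        consumerList.remove(consumer);
    }
}
```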