RobertIndie opened a new issue, #21888: URL: https://github.com/apache/pulsar/issues/21888
### Search before asking - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Version [c834feb](https://github.com/apache/pulsar/commit/c834feb459369f497c68cd42dbc1625b11551f72) ### Minimal reproduce step Run the test: [testDeadLetterTopicWithInitialSubscriptionAndMultiConsumers](https://github.com/apache/pulsar/blob/c87cfb3f50b32f5c032b5d1c3a6a0b91cde2bbe9/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java#L872) ### What did you expect to see? The test should be passed without any exceptions. ### What did you see instead? Even though the test is passed, It will throw unexpected execeptions: ``` 2024-01-12T09:55:17,713 - ERROR - [pulsar-client-io-35-5:ConsumerImpl@2155] - Dead letter producer exception with topic: persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"} at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?] at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?] at org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$16(ProducerImpl.java:1846) ~[classes/:?] at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990) ~[?:?] at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?] at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:794) ~[classes/:?] at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:192) ~[classes/:?] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[netty-handler-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final] at java.lang.Thread.run(Thread.java:1583) ~[?:?] Caused by: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"} at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1318) ~[classes/:?] at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:795) ~[classes/:?] ... 26 more 2024-01-12T09:55:17,713 - ERROR - [pulsar-client-io-35-5:ConsumerImpl@2155] - Dead letter producer exception with topic: persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"} at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?] at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?] at org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$16(ProducerImpl.java:1846) ~[classes/:?] at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990) ~[?:?] at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?] at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:794) ~[classes/:?] at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:192) ~[classes/:?] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[netty-handler-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final] at java.lang.Thread.run(Thread.java:1583) ~[?:?] Caused by: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"} at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1318) ~[classes/:?] at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:795) ~[classes/:?] ... 26 more 2024-01-12T09:55:17,713 - ERROR - [pulsar-client-io-35-5:ConsumerImpl@2155] - Dead letter producer exception with topic: persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"} at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?] at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?] at org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$16(ProducerImpl.java:1846) ~[classes/:?] at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990) ~[?:?] at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?] at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:794) ~[classes/:?] at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:192) ~[classes/:?] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[netty-handler-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final] at java.lang.Thread.run(Thread.java:1583) ~[?:?] Caused by: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"} at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1318) ~[classes/:?] at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:795) ~[classes/:?] ... 26 more 2024-01-12T09:55:17,713 - ERROR - [pulsar-client-io-35-5:ConsumerImpl@2155] - Dead letter producer exception with topic: persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"} at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?] at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?] at org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$16(ProducerImpl.java:1846) ~[classes/:?] at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990) ~[?:?] at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?] at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?] at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:794) ~[classes/:?] at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:192) ~[classes/:?] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[netty-handler-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final] at java.lang.Thread.run(Thread.java:1583) ~[?:?] Caused by: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"} at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1318) ~[classes/:?] at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:795) ~[classes/:?] ... 26 more ``` ### Anything else? The root cause is that a regression bug is introduced in https://github.com/apache/pulsar/pull/21589. The producer name will be conflicted when multiple consumers in the same topic and subscription send messages to DLQ concurrently. ### Are you willing to submit a PR? - [X] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
