RobertIndie opened a new issue, #21888:
URL: https://github.com/apache/pulsar/issues/21888

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   
[c834feb](https://github.com/apache/pulsar/commit/c834feb459369f497c68cd42dbc1625b11551f72)
   
   ### Minimal reproduce step
   
   Run the test: 
[testDeadLetterTopicWithInitialSubscriptionAndMultiConsumers](https://github.com/apache/pulsar/blob/c87cfb3f50b32f5c032b5d1c3a6a0b91cde2bbe9/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java#L872)
   
   
   
   ### What did you expect to see?
   
   The test should be passed without any exceptions.
   
   ### What did you see instead?
   
   Even though the test is passed, It will throw unexpected execeptions:
   ```
   2024-01-12T09:55:17,713 - ERROR - [pulsar-client-io-35-5:ConsumerImpl@2155] 
- Dead letter producer exception with topic: 
persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ
   java.util.concurrent.CompletionException: 
org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: 
{"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException:
 Producer with name 
'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is 
already connected to topic","reqId":2964198371560761575, 
"remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
~[?:?]
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
 ~[?:?]
        at 
org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$16(ProducerImpl.java:1846)
 ~[classes/:?]
        at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
~[?:?]
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
 ~[?:?]
        at 
org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:794) 
~[classes/:?]
        at 
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:192)
 ~[classes/:?]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
 ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
 ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
 ~[netty-handler-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) 
~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) 
~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) 
~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
 ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
~[netty-common-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
        at java.lang.Thread.run(Thread.java:1583) ~[?:?]
   Caused by: 
org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: 
{"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException:
 Producer with name 
'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is 
already connected to topic","reqId":2964198371560761575, 
"remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
        at 
org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1318)
 ~[classes/:?]
        at 
org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:795) 
~[classes/:?]
        ... 26 more
   2024-01-12T09:55:17,713 - ERROR - [pulsar-client-io-35-5:ConsumerImpl@2155] 
- Dead letter producer exception with topic: 
persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ
   java.util.concurrent.CompletionException: 
org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: 
{"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException:
 Producer with name 
'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is 
already connected to topic","reqId":2964198371560761575, 
"remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
~[?:?]
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
 ~[?:?]
        at 
org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$16(ProducerImpl.java:1846)
 ~[classes/:?]
        at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
~[?:?]
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
 ~[?:?]
        at 
org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:794) 
~[classes/:?]
        at 
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:192)
 ~[classes/:?]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
 ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
 ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
 ~[netty-handler-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) 
~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) 
~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) 
~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
 ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
~[netty-common-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
        at java.lang.Thread.run(Thread.java:1583) ~[?:?]
   Caused by: 
org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: 
{"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException:
 Producer with name 
'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is 
already connected to topic","reqId":2964198371560761575, 
"remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
        at 
org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1318)
 ~[classes/:?]
        at 
org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:795) 
~[classes/:?]
        ... 26 more
   2024-01-12T09:55:17,713 - ERROR - [pulsar-client-io-35-5:ConsumerImpl@2155] 
- Dead letter producer exception with topic: 
persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ
   java.util.concurrent.CompletionException: 
org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: 
{"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException:
 Producer with name 
'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is 
already connected to topic","reqId":2964198371560761575, 
"remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
~[?:?]
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
 ~[?:?]
        at 
org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$16(ProducerImpl.java:1846)
 ~[classes/:?]
        at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
~[?:?]
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
 ~[?:?]
        at 
org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:794) 
~[classes/:?]
        at 
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:192)
 ~[classes/:?]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
 ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
 ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
 ~[netty-handler-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) 
~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) 
~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) 
~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
 ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
~[netty-common-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
        at java.lang.Thread.run(Thread.java:1583) ~[?:?]
   Caused by: 
org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: 
{"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException:
 Producer with name 
'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is 
already connected to topic","reqId":2964198371560761575, 
"remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
        at 
org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1318)
 ~[classes/:?]
        at 
org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:795) 
~[classes/:?]
        ... 26 more
   2024-01-12T09:55:17,713 - ERROR - [pulsar-client-io-35-5:ConsumerImpl@2155] 
- Dead letter producer exception with topic: 
persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ
   java.util.concurrent.CompletionException: 
org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: 
{"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException:
 Producer with name 
'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is 
already connected to topic","reqId":2964198371560761575, 
"remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
~[?:?]
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
 ~[?:?]
        at 
org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$16(ProducerImpl.java:1846)
 ~[classes/:?]
        at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974)
 ~[?:?]
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
~[?:?]
        at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
 ~[?:?]
        at 
org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:794) 
~[classes/:?]
        at 
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:192)
 ~[classes/:?]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
 ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
 ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
 ~[netty-handler-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) 
~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
 ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) 
~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) 
~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
 ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 
~[netty-common-4.1.104.Final.jar:4.1.104.Final]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
        at java.lang.Thread.run(Thread.java:1583) ~[?:?]
   Caused by: 
org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: 
{"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException:
 Producer with name 
'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is 
already connected to topic","reqId":2964198371560761575, 
"remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
        at 
org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1318)
 ~[classes/:?]
        at 
org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:795) 
~[classes/:?]
        ... 26 more
   ```
   
   ### Anything else?
   
   The root cause is that a regression bug is introduced in 
https://github.com/apache/pulsar/pull/21589. The producer name will be 
conflicted when multiple consumers in the same topic and subscription send 
messages to DLQ concurrently.
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to