This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new bbf52cf23db [fix][client] Fix DeadLetterProducer creation callback blocking client io thread. (#19930)
bbf52cf23db is described below

commit bbf52cf23db10d934894985ca13dd2b90460c540
Author: lifepuzzlefun <[email protected]>
AuthorDate: Wed Apr 5 19:40:24 2023 +0800

    [fix][client] Fix DeadLetterProducer creation callback blocking client io thread. (#19930)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8c3a400ea44..c7af6d3589c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -688,7 +688,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 if (reconsumetimes > 
this.deadLetterPolicy.getMaxRedeliverCount()
                         && 
StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
                     initDeadLetterProducerIfNeeded();
-                    deadLetterProducer.thenAccept(dlqProducer -> {
+                    deadLetterProducer.thenAcceptAsync(dlqProducer -> {
                         TypedMessageBuilder<byte[]> typedMessageBuilderNew =
                                 
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
                                         .value(retryMessage.getData())
@@ -704,7 +704,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                             result.completeExceptionally(ex);
                             return null;
                         });
-                    }).exceptionally(ex -> {
+                    }, internalPinnedExecutor).exceptionally(ex -> {
                         result.completeExceptionally(ex);
                         deadLetterProducer = null;
                         return null;
@@ -2089,7 +2089,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                 return null;
                     });
                 }
-            }).exceptionally(ex -> {
+            }, internalPinnedExecutor).exceptionally(ex -> {
                 log.error("Dead letter producer exception with topic: {}", 
deadLetterPolicy.getDeadLetterTopic(), ex);
                 deadLetterProducer = null;
                 result.complete(false);

Reply via email to