This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 4392da6db7b [fix][client] Fix DeadLetterProducer creation callback blocking client io thread. (#19930)
4392da6db7b is described below

commit 4392da6db7b38b3136d25f4e51454cc4d726a29b
Author: lifepuzzlefun <[email protected]>
AuthorDate: Wed Apr 5 19:40:24 2023 +0800

    [fix][client] Fix DeadLetterProducer creation callback blocking client io thread. (#19930)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java   | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3d0f70dc40e..df55f4ff966 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -652,7 +652,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 if (reconsumetimes > 
this.deadLetterPolicy.getMaxRedeliverCount()
                         && 
StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
                     initDeadLetterProducerIfNeeded();
-                    deadLetterProducer.thenAccept(dlqProducer -> {
+                    deadLetterProducer.thenAcceptAsync(dlqProducer -> {
                         TypedMessageBuilder<byte[]> typedMessageBuilderNew =
                                 
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
                                         .value(retryMessage.getData())
@@ -668,7 +668,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                             result.completeExceptionally(ex);
                             return null;
                         });
-                    }).exceptionally(ex -> {
+                    }, internalPinnedExecutor).exceptionally(ex -> {
                         result.completeExceptionally(ex);
                         deadLetterProducer = null;
                         return null;
@@ -2030,7 +2030,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                                 return null;
                     });
                 }
-            }).exceptionally(ex -> {
+            }, internalPinnedExecutor).exceptionally(ex -> {
                 log.error("Dead letter producer exception with topic: {}", 
deadLetterPolicy.getDeadLetterTopic(), ex);
                 deadLetterProducer = null;
                 result.complete(false);

Reply via email to