This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 4392da6db7b [fix][client] Fix DeadLetterProducer creation callback
blocking client io thread. (#19930)
4392da6db7b is described below
commit 4392da6db7b38b3136d25f4e51454cc4d726a29b
Author: lifepuzzlefun <[email protected]>
AuthorDate: Wed Apr 5 19:40:24 2023 +0800
[fix][client] Fix DeadLetterProducer creation callback blocking client io
thread. (#19930)
---
.../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3d0f70dc40e..df55f4ff966 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -652,7 +652,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (reconsumetimes >
this.deadLetterPolicy.getMaxRedeliverCount()
&&
StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
initDeadLetterProducerIfNeeded();
- deadLetterProducer.thenAccept(dlqProducer -> {
+ deadLetterProducer.thenAcceptAsync(dlqProducer -> {
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
.value(retryMessage.getData())
@@ -668,7 +668,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
result.completeExceptionally(ex);
return null;
});
- }).exceptionally(ex -> {
+ }, internalPinnedExecutor).exceptionally(ex -> {
result.completeExceptionally(ex);
deadLetterProducer = null;
return null;
@@ -2030,7 +2030,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return null;
});
}
- }).exceptionally(ex -> {
+ }, internalPinnedExecutor).exceptionally(ex -> {
log.error("Dead letter producer exception with topic: {}",
deadLetterPolicy.getDeadLetterTopic(), ex);
deadLetterProducer = null;
result.complete(false);