This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new bbf52cf23db [fix][client] Fix DeadLetterProducer creation callback
blocking client io thread. (#19930)
bbf52cf23db is described below
commit bbf52cf23db10d934894985ca13dd2b90460c540
Author: lifepuzzlefun <[email protected]>
AuthorDate: Wed Apr 5 19:40:24 2023 +0800
[fix][client] Fix DeadLetterProducer creation callback blocking client io
thread. (#19930)
---
.../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8c3a400ea44..c7af6d3589c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -688,7 +688,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
if (reconsumetimes >
this.deadLetterPolicy.getMaxRedeliverCount()
&&
StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
initDeadLetterProducerIfNeeded();
- deadLetterProducer.thenAccept(dlqProducer -> {
+ deadLetterProducer.thenAcceptAsync(dlqProducer -> {
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
.value(retryMessage.getData())
@@ -704,7 +704,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
result.completeExceptionally(ex);
return null;
});
- }).exceptionally(ex -> {
+ }, internalPinnedExecutor).exceptionally(ex -> {
result.completeExceptionally(ex);
deadLetterProducer = null;
return null;
@@ -2089,7 +2089,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return null;
});
}
- }).exceptionally(ex -> {
+ }, internalPinnedExecutor).exceptionally(ex -> {
log.error("Dead letter producer exception with topic: {}",
deadLetterPolicy.getDeadLetterTopic(), ex);
deadLetterProducer = null;
result.complete(false);