This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3adb59c51b3 [fix][client] Prevent retry topic and dead letter topic producer leaks when sending of message fails (#23824)
3adb59c51b3 is described below

commit 3adb59c51b3e28ee6fa003959612ce3e914dd145
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jan 9 11:10:49 2025 -0800

    [fix][client] Prevent retry topic and dead letter topic producer leaks when sending of message fails (#23824)
    
    (cherry picked from commit 04e89fe2d841246e655bf875ba52cda2c2de0e3d)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     |  93 ++++++
 .../apache/pulsar/client/api/RetryTopicTest.java   | 166 ++++++++---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 315 +++++++++++++--------
 .../client/impl/MultiTopicsConsumerImpl.java       |   9 +-
 .../src/main/resources/findbugsExclude.xml         |   4 +
 5 files changed, 423 insertions(+), 164 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index e46fddeacc1..ab26949c04f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -40,9 +41,11 @@ import java.util.regex.Pattern;
 import lombok.Cleanup;
 import lombok.Data;
 import org.apache.avro.reflect.Nullable;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
 import org.apache.pulsar.client.util.RetryMessageUtil;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1167,4 +1170,94 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         consumerBuilder.loadConf(config);
         
assertEquals(((ConsumerBuilderImpl)consumerBuilder).getConf().getDeadLetterPolicy(),
 policy);
     }
+
+    @Data
+    static class Payload {
+        String number;
+
+        public Payload() {
+
+        }
+
+        public Payload(String number) {
+            this.number = number;
+        }
+    }
+
+    @Data
+    static class PayloadIncompatible {
+        long number;
+
+        public PayloadIncompatible() {
+
+        }
+
+        public PayloadIncompatible(long number) {
+            this.number = number;
+        }
+    }
+
+    // reproduce issue reported in 
https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
+    @Test
+    public void 
testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak() throws 
Exception {
+        String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
+        admin.namespaces().createNamespace(namespace);
+        // don't enforce schema validation
+        admin.namespaces().setSchemaValidationEnforced(namespace, false);
+        // set schema compatibility strategy to always compatible
+        admin.namespaces().setSchemaCompatibilityStrategy(namespace, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        Schema<Payload> schema = Schema.AVRO(Payload.class);
+        Schema<PayloadIncompatible> schemaIncompatible = 
Schema.AVRO(PayloadIncompatible.class);
+        String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+                        + 
"/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
+        String dlqTopic = topic + "-DLQ";
+
+        // create topics
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createNonPartitionedTopic(dlqTopic);
+
+        AtomicInteger nackCounter = new AtomicInteger(0);
+        Consumer<Payload> payloadConsumer = null;
+        try {
+            payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
+                    
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
+                    .ackTimeout(1, TimeUnit.SECONDS)
+                    .negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
+                    
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build())
+                    .messageListener((c, msg) -> {
+                        if (nackCounter.incrementAndGet() < 10) {
+                            c.negativeAcknowledge(msg);
+                        }
+                    }).subscribe();
+
+            // send a message to the topic with the incompatible schema
+            PayloadIncompatible payloadIncompatible = new 
PayloadIncompatible(123);
+            try (Producer<PayloadIncompatible> producer = 
pulsarClient.newProducer(schemaIncompatible).topic(topic)
+                    .create()) {
+                producer.send(payloadIncompatible);
+            }
+
+            Thread.sleep(2000L);
+
+            
assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
+                    .describedAs("producer count of dlq topic %s should be <= 
1 so that it doesn't leak producers",
+                            dlqTopic)
+                    .isLessThanOrEqualTo(1);
+
+        } finally {
+            if (payloadConsumer != null) {
+                try {
+                    payloadConsumer.close();
+                } catch (PulsarClientException e) {
+                    // ignore
+                }
+            }
+        }
+
+        
assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
+                .describedAs("producer count of dlq topic %s should be 0 here",
+                        dlqTopic)
+                .isEqualTo(0);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index cd598585c8e..91b97fa4758 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -18,12 +18,12 @@
  */
 package org.apache.pulsar.client.api;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -36,11 +36,10 @@ import lombok.Cleanup;
 import lombok.Data;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.reflect.Nullable;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.util.RetryMessageUtil;
-import org.reflections.ReflectionUtils;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -617,10 +616,12 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
     @Test(timeOut = 30000L)
     public void testRetryTopicException() throws Exception {
-        final String topic = "persistent://my-property/my-ns/retry-topic";
+        String retryLetterTopic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
         final int maxRedeliveryCount = 2;
         final int sendMessages = 1;
         // subscribe before publish
+        @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic)
                 .subscriptionName("my-subscription")
@@ -629,7 +630,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
                 .receiverQueueSize(100)
                 .deadLetterPolicy(DeadLetterPolicy.builder()
                         .maxRedeliverCount(maxRedeliveryCount)
-                        
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+                        .retryLetterTopic(retryLetterTopic)
                         .build())
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -642,30 +643,16 @@ public class RetryTopicTest extends ProducerConsumerBase {
         }
         producer.close();
 
-        // mock a retry producer exception when reconsumelater is called
-        MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = 
(MultiTopicsConsumerImpl<byte[]>) consumer;
-        List<ConsumerImpl<byte[]>> consumers = 
multiTopicsConsumer.getConsumers();
-        for (ConsumerImpl<byte[]> c : consumers) {
-            Set<Field> deadLetterPolicyField =
-                    ReflectionUtils.getAllFields(c.getClass(), 
ReflectionUtils.withName("deadLetterPolicy"));
-
-            if (deadLetterPolicyField.size() != 0) {
-                Field field = deadLetterPolicyField.iterator().next();
-                field.setAccessible(true);
-                DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) 
field.get(c);
-                
deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
-            }
-        }
+        admin.topics().terminateTopic(retryLetterTopic);
+
         Message<byte[]> message = consumer.receive();
         log.info("consumer received message : {} {}", message.getMessageId(), 
new String(message.getData()));
         try {
             consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
-        } catch (PulsarClientException.InvalidTopicNameException e) {
-            assertEquals(e.getClass(), 
PulsarClientException.InvalidTopicNameException.class);
-        } catch (Exception e) {
-            fail("exception should be 
PulsarClientException.InvalidTopicNameException");
+            fail("exception should be 
PulsarClientException.TopicTerminatedException");
+        } catch (PulsarClientException.TopicTerminatedException e) {
+            // ok
         }
-        consumer.close();
     }
 
 
@@ -718,10 +705,12 @@ public class RetryTopicTest extends ProducerConsumerBase {
 
     @Test(timeOut = 30000L)
     public void testRetryTopicExceptionWithConcurrent() throws Exception {
-        final String topic = "persistent://my-property/my-ns/retry-topic";
+        String retryLetterTopic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
         final int maxRedeliveryCount = 2;
         final int sendMessages = 10;
         // subscribe before publish
+        @Cleanup
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
                 .topic(topic)
                 .subscriptionName("my-subscription")
@@ -730,7 +719,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
                 .receiverQueueSize(100)
                 .deadLetterPolicy(DeadLetterPolicy.builder()
                         .maxRedeliverCount(maxRedeliveryCount)
-                        
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+                        .retryLetterTopic(retryLetterTopic)
                         .build())
                 
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .subscribe();
@@ -739,24 +728,11 @@ public class RetryTopicTest extends ProducerConsumerBase {
                 .topic(topic)
                 .create();
         for (int i = 0; i < sendMessages; i++) {
-            producer.newMessage().key("1").value(String.format("Hello Pulsar 
[%d]", i).getBytes()).send();
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
         }
         producer.close();
 
-        // mock a retry producer exception when reconsumelater is called
-        MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = 
(MultiTopicsConsumerImpl<byte[]>) consumer;
-        List<ConsumerImpl<byte[]>> consumers = 
multiTopicsConsumer.getConsumers();
-        for (ConsumerImpl<byte[]> c : consumers) {
-            Set<Field> deadLetterPolicyField =
-                    ReflectionUtils.getAllFields(c.getClass(), 
ReflectionUtils.withName("deadLetterPolicy"));
-
-            if (deadLetterPolicyField.size() != 0) {
-                Field field = deadLetterPolicyField.iterator().next();
-                field.setAccessible(true);
-                DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) 
field.get(c);
-                
deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#");
-            }
-        }
+        admin.topics().terminateTopic(retryLetterTopic);
 
         List<Message<byte[]>> messages = Lists.newArrayList();
         for (int i = 0; i < sendMessages; i++) {
@@ -769,16 +745,114 @@ public class RetryTopicTest extends ProducerConsumerBase 
{
             new Thread(() -> {
                 try {
                     consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
-                } catch (Exception ignore) {
-
-                } finally {
+                } catch (PulsarClientException.TopicTerminatedException e) {
+                    // ok
                     latch.countDown();
+                } catch (PulsarClientException e) {
+                    // unexpected exception
+                    fail("unexpected exception", e);
                 }
             }).start();
         }
 
-        latch.await();
+        latch.await(sendMessages, TimeUnit.SECONDS);
         consumer.close();
     }
 
+    @Data
+    static class Payload {
+        String number;
+
+        public Payload() {
+
+        }
+
+        public Payload(String number) {
+            this.number = number;
+        }
+    }
+
+    @Data
+    static class PayloadIncompatible {
+        long number;
+
+        public PayloadIncompatible() {
+
+        }
+
+        public PayloadIncompatible(long number) {
+            this.number = number;
+        }
+    }
+
+    // reproduce similar issue as reported in 
https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
+    // but for retry topic
+    @Test
+    public void 
testCloseRetryLetterTopicProducerOnExceptionToPreventProducerLeak() throws 
Exception {
+        String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
+        admin.namespaces().createNamespace(namespace);
+        // don't enforce schema validation
+        admin.namespaces().setSchemaValidationEnforced(namespace, false);
+        // set schema compatibility strategy to always compatible
+        admin.namespaces().setSchemaCompatibilityStrategy(namespace, 
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+        Schema<Payload> schema = Schema.AVRO(Payload.class);
+        Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(
+                PayloadIncompatible.class);
+        String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+                + 
"/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
+        String dlqTopic = topic + "-DLQ";
+        String retryTopic = topic + "-RETRY";
+
+        // create topics
+        admin.topics().createNonPartitionedTopic(topic);
+        admin.topics().createNonPartitionedTopic(dlqTopic);
+        admin.topics().createNonPartitionedTopic(retryTopic);
+
+        Consumer<Payload> payloadConsumer = null;
+        try {
+            payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
+                    
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
+                    .ackTimeout(1, TimeUnit.SECONDS)
+                    .negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
+                    .enableRetry(true)
+                    
.deadLetterPolicy(DeadLetterPolicy.builder().retryLetterTopic(retryTopic).maxRedeliverCount(3)
+                            .deadLetterTopic(dlqTopic).build())
+                    .messageListener((c, msg) -> {
+                        try {
+                            c.reconsumeLater(msg, 1, TimeUnit.MILLISECONDS);
+                        } catch (PulsarClientException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }).subscribe();
+
+            // send a message to the topic with the incompatible schema
+            PayloadIncompatible payloadIncompatible = new 
PayloadIncompatible(123);
+            try (Producer<PayloadIncompatible> producer = 
pulsarClient.newProducer(schemaIncompatible).topic(topic)
+                    .create()) {
+                producer.send(payloadIncompatible);
+            }
+
+            Thread.sleep(2000L);
+
+            
assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
+                    .describedAs("producer count of retry topic %s should be 
<= 1 so that it doesn't leak producers",
+                            retryTopic)
+                    .isLessThanOrEqualTo(1);
+
+        } finally {
+            if (payloadConsumer != null) {
+                try {
+                    payloadConsumer.close();
+                } catch (PulsarClientException e) {
+                    // ignore
+                }
+            }
+        }
+
+        
assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
+                .describedAs("producer count of retry topic %s should be 0 
here",
+                        retryTopic)
+                .isEqualTo(0);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 86af4bdaf58..77a91a944ee 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -67,6 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -202,8 +203,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     private final DeadLetterPolicy deadLetterPolicy;
 
     private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer;
-
+    private volatile int deadLetterProducerFailureCount;
     private volatile CompletableFuture<Producer<byte[]>> retryLetterProducer;
+    private volatile int retryLetterProducerFailureCount;
     private final ReadWriteLock createProducerLock = new 
ReentrantReadWriteLock();
 
     protected volatile boolean paused;
@@ -682,9 +684,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return FutureUtil.failedFuture(exception);
         }
 
-        initRetryLetterProducerIfNeeded();
         CompletableFuture<Void> result = new CompletableFuture<>();
-        if (retryLetterProducer != null) {
+        if (initRetryLetterProducerIfNeeded() != null) {
             try {
                 MessageImpl<T> retryMessage = (MessageImpl<T>) 
getMessageImpl(message);
                 String originMessageIdStr = message.getMessageId().toString();
@@ -707,52 +708,61 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 MessageId finalMessageId = messageId;
                 if (reconsumeTimes > 
this.deadLetterPolicy.getMaxRedeliverCount()
                         && 
StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
-                    initDeadLetterProducerIfNeeded();
-                    deadLetterProducer.thenAcceptAsync(dlqProducer -> {
-                        TypedMessageBuilder<byte[]> typedMessageBuilderNew =
-                                
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
-                                        .value(retryMessage.getData())
-                                        .properties(propertiesMap);
-                        copyMessageKeysIfNeeded(message, 
typedMessageBuilderNew);
-                        typedMessageBuilderNew.sendAsync().thenAccept(msgId -> 
{
-                            consumerDlqMessagesCounter.increment();
-
-                            doAcknowledge(finalMessageId, ackType, 
Collections.emptyMap(), null).thenAccept(v -> {
-                                result.complete(null);
+                    
initDeadLetterProducerIfNeeded().thenAcceptAsync(dlqProducer -> {
+                        try {
+                            TypedMessageBuilder<byte[]> typedMessageBuilderNew 
=
+                                    dlqProducer.newMessage(
+                                                    
Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
+                                            .value(retryMessage.getData())
+                                            .properties(propertiesMap);
+                            copyMessageKeysIfNeeded(message, 
typedMessageBuilderNew);
+                            
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
+                                consumerDlqMessagesCounter.increment();
+
+                                doAcknowledge(finalMessageId, ackType, 
Collections.emptyMap(), null).thenAccept(v -> {
+                                    result.complete(null);
+                                }).exceptionally(ex -> {
+                                    result.completeExceptionally(ex);
+                                    return null;
+                                });
                             }).exceptionally(ex -> {
                                 result.completeExceptionally(ex);
                                 return null;
                             });
-                        }).exceptionally(ex -> {
-                            result.completeExceptionally(ex);
-                            return null;
-                        });
+                        } catch (Exception e) {
+                            result.completeExceptionally(e);
+                        }
                     }, internalPinnedExecutor).exceptionally(ex -> {
                         result.completeExceptionally(ex);
-                        deadLetterProducer = null;
                         return null;
                     });
                 } else {
                     assert retryMessage != null;
-                    retryLetterProducer.thenAcceptAsync(rtlProducer -> {
-                        TypedMessageBuilder<byte[]> typedMessageBuilderNew = 
rtlProducer
-                                
.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
-                                .value(retryMessage.getData())
-                                .properties(propertiesMap);
-                        if (delayTime > 0) {
-                            typedMessageBuilderNew.deliverAfter(delayTime, 
unit);
+                    
initRetryLetterProducerIfNeeded().thenAcceptAsync(rtlProducer -> {
+                        try {
+                            TypedMessageBuilder<byte[]> typedMessageBuilderNew 
= rtlProducer
+                                    
.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+                                    .value(retryMessage.getData())
+                                    .properties(propertiesMap);
+                            if (delayTime > 0) {
+                                typedMessageBuilderNew.deliverAfter(delayTime, 
unit);
+                            }
+                            copyMessageKeysIfNeeded(message, 
typedMessageBuilderNew);
+                            typedMessageBuilderNew.sendAsync()
+                                    .thenCompose(
+                                            __ -> 
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
+                                    .thenAccept(v -> {
+                                        result.complete(null);
+                                    })
+                                    .exceptionally(ex -> {
+                                        result.completeExceptionally(ex);
+                                        return null;
+                                    });
+                        } catch (Exception e) {
+                            result.completeExceptionally(e);
                         }
-                        copyMessageKeysIfNeeded(message, 
typedMessageBuilderNew);
-                        typedMessageBuilderNew.sendAsync()
-                                .thenCompose(__ -> 
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
-                                .thenAccept(v -> result.complete(null))
-                                .exceptionally(ex -> {
-                                    result.completeExceptionally(ex);
-                                    return null;
-                                });
                     }, internalPinnedExecutor).exceptionally(ex -> {
                         result.completeExceptionally(ex);
-                        retryLetterProducer = null;
                         return null;
                     });
                 }
@@ -1099,10 +1109,29 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
     public synchronized CompletableFuture<Void> closeAsync() {
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
 
+        ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
+        closeFutures.add(closeFuture);
+        if (retryLetterProducer != null) {
+            closeFutures.add(retryLetterProducer.thenCompose(p -> 
p.closeAsync()).whenComplete((ignore, ex) -> {
+                if (ex != null) {
+                    log.warn("Exception ignored in closing retryLetterProducer 
of consumer", ex);
+                }
+            }));
+        }
+        if (deadLetterProducer != null) {
+            closeFutures.add(deadLetterProducer.thenCompose(p -> 
p.closeAsync()).whenComplete((ignore, ex) -> {
+                if (ex != null) {
+                    log.warn("Exception ignored in closing deadLetterProducer 
of consumer", ex);
+                }
+            }));
+        }
+        CompletableFuture<Void> compositeCloseFuture = 
FutureUtil.waitForAll(closeFutures);
+
+
         if (getState() == State.Closing || getState() == State.Closed) {
             closeConsumerTasks();
             failPendingReceive().whenComplete((r, t) -> 
closeFuture.complete(null));
-            return closeFuture;
+            return compositeCloseFuture;
         }
 
         consumersClosedCounter.increment();
@@ -1114,7 +1143,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             deregisterFromClientCnx();
             client.cleanupConsumer(this);
             failPendingReceive().whenComplete((r, t) -> 
closeFuture.complete(null));
-            return closeFuture;
+            return compositeCloseFuture;
         }
 
         stats.getStatTimeout().ifPresent(Timeout::cancel);
@@ -1141,23 +1170,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             });
         }
 
-        ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
-        closeFutures.add(closeFuture);
-        if (retryLetterProducer != null) {
-            closeFutures.add(retryLetterProducer.thenCompose(p -> 
p.closeAsync()).whenComplete((ignore, ex) -> {
-                if (ex != null) {
-                    log.warn("Exception ignored in closing retryLetterProducer 
of consumer", ex);
-                }
-            }));
-        }
-        if (deadLetterProducer != null) {
-            closeFutures.add(deadLetterProducer.thenCompose(p -> 
p.closeAsync()).whenComplete((ignore, ex) -> {
-                if (ex != null) {
-                    log.warn("Exception ignored in closing deadLetterProducer 
of consumer", ex);
-                }
-            }));
-        }
-        return FutureUtil.waitForAll(closeFutures);
+        return compositeCloseFuture;
     }
 
     private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable 
exception) {
@@ -2216,47 +2229,54 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
         CompletableFuture<Boolean> result = new CompletableFuture<>();
         if (deadLetterMessages != null) {
-            initDeadLetterProducerIfNeeded();
             List<MessageImpl<T>> finalDeadLetterMessages = deadLetterMessages;
-            deadLetterProducer.thenAcceptAsync(producerDLQ -> {
+            initDeadLetterProducerIfNeeded().thenAcceptAsync(producerDLQ -> {
                 for (MessageImpl<T> message : finalDeadLetterMessages) {
-                    String originMessageIdStr = 
message.getMessageId().toString();
-                    String originTopicNameStr = getOriginTopicNameStr(message);
-                    TypedMessageBuilder<byte[]> typedMessageBuilderNew =
-                            
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
-                            .value(message.getData())
-                            .properties(getPropertiesMap(message, 
originMessageIdStr, originTopicNameStr));
-                    copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
-                    typedMessageBuilderNew.sendAsync()
-                            .thenAccept(messageIdInDLQ -> {
-                                
possibleSendToDeadLetterTopicMessages.remove(messageId);
-                                acknowledgeAsync(messageId).whenComplete((v, 
ex) -> {
-                                    if (ex != null) {
-                                        log.warn("[{}] [{}] [{}] Failed to 
acknowledge the message {} of the original"
-                                                        + " topic but send to 
the DLQ successfully.",
-                                                topicName, subscription, 
consumerName, messageId, ex);
-                                        result.complete(false);
+                    try {
+                        String originMessageIdStr = 
message.getMessageId().toString();
+                        String originTopicNameStr = 
getOriginTopicNameStr(message);
+                        TypedMessageBuilder<byte[]> typedMessageBuilderNew =
+                                
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+                                        .value(message.getData())
+                                        .properties(getPropertiesMap(message, 
originMessageIdStr, originTopicNameStr));
+                        copyMessageKeysIfNeeded(message, 
typedMessageBuilderNew);
+                        typedMessageBuilderNew.sendAsync()
+                                .thenAccept(messageIdInDLQ -> {
+                                    
possibleSendToDeadLetterTopicMessages.remove(messageId);
+                                    
acknowledgeAsync(messageId).whenComplete((v, ex) -> {
+                                        if (ex != null) {
+                                            log.warn(
+                                                    "[{}] [{}] [{}] Failed to 
acknowledge the message {} of the "
+                                                            + "original topic 
but send to the DLQ successfully.",
+                                                    topicName, subscription, 
consumerName, messageId, ex);
+                                            result.complete(false);
+                                        } else {
+                                            result.complete(true);
+                                        }
+                                    });
+                                }).exceptionally(ex -> {
+                                    if (ex instanceof 
PulsarClientException.ProducerQueueIsFullError) {
+                                        log.warn(
+                                                "[{}] [{}] [{}] Failed to send 
DLQ message to {} for message id {}: {}",
+                                                topicName, subscription, 
consumerName,
+                                                
deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage());
                                     } else {
-                                        result.complete(true);
+                                        log.warn("[{}] [{}] [{}] Failed to 
send DLQ message to {} for message id {}",
+                                                topicName, subscription, 
consumerName,
+                                                
deadLetterPolicy.getDeadLetterTopic(), messageId, ex);
                                     }
+                                    result.complete(false);
+                                    return null;
                                 });
-                            }).exceptionally(ex -> {
-                                if (ex instanceof 
PulsarClientException.ProducerQueueIsFullError) {
-                                    log.warn("[{}] [{}] [{}] Failed to send 
DLQ message to {} for message id {}: {}",
-                                            topicName, subscription, 
consumerName,
-                                            
deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage());
-                                } else {
-                                    log.warn("[{}] [{}] [{}] Failed to send 
DLQ message to {} for message id {}",
-                                            topicName, subscription, 
consumerName,
-                                            
deadLetterPolicy.getDeadLetterTopic(), messageId, ex);
-                                }
-                                result.complete(false);
-                                return null;
-                    });
+                    } catch (Exception e) {
+                        log.warn("[{}] [{}] [{}] Failed to send DLQ message to 
{} for message id {}",
+                                topicName, subscription, consumerName, 
deadLetterPolicy.getDeadLetterTopic(), messageId,
+                                e);
+                        result.complete(false);
+                    }
                 }
             }, internalPinnedExecutor).exceptionally(ex -> {
                 log.error("Dead letter producer exception with topic: {}", 
deadLetterPolicy.getDeadLetterTopic(), ex);
-                deadLetterProducer = null;
                 result.complete(false);
                 return null;
             });
@@ -2266,51 +2286,112 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return result;
     }
 
-    private void initDeadLetterProducerIfNeeded() {
-        if (deadLetterProducer == null) {
+    private CompletableFuture<Producer<byte[]>> 
initDeadLetterProducerIfNeeded() {
+        CompletableFuture<Producer<byte[]>> p = deadLetterProducer;
+        if (p == null || p.isCompletedExceptionally()) {
             createProducerLock.writeLock().lock();
             try {
-                if (deadLetterProducer == null) {
-                    deadLetterProducer =
-                            ((ProducerBuilderImpl<byte[]>) 
client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
-                                    
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
-                                    
.topic(this.deadLetterPolicy.getDeadLetterTopic())
-                                    
.producerName(String.format("%s-%s-%s-%s-DLQ", this.topicName, 
this.subscription,
-                                            this.consumerName, 
RandomStringUtils.randomAlphanumeric(5)))
-                                    .blockIfQueueFull(false)
-                                    .enableBatching(false)
-                                    .enableChunking(true)
-                                    .createAsync();
-                    deadLetterProducer.thenAccept(dlqProducer -> {
-                        
stats.setDeadLetterProducerStats(dlqProducer.getStats());
-                    });
+                p = deadLetterProducer;
+                if (p == null || p.isCompletedExceptionally()) {
+                    p = createProducerWithBackOff(() -> {
+                        CompletableFuture<Producer<byte[]>> newProducer =
+                                ((ProducerBuilderImpl<byte[]>) 
client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
+                                        
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
+                                        
.topic(this.deadLetterPolicy.getDeadLetterTopic())
+                                        .producerName(
+                                                
String.format("%s-%s-%s-%s-DLQ", this.topicName, this.subscription,
+                                                        this.consumerName, 
RandomStringUtils.randomAlphanumeric(5)))
+                                        .blockIfQueueFull(false)
+                                        .enableBatching(false)
+                                        .enableChunking(true)
+                                        .createAsync();
+                        newProducer.whenComplete((producer, ex) -> {
+                            if (ex != null) {
+                                log.error("[{}] [{}] [{}] Failed to create 
dead letter producer for topic {}",
+                                        topicName, subscription, consumerName, 
deadLetterPolicy.getDeadLetterTopic(),
+                                        ex);
+                                deadLetterProducerFailureCount++;
+                            } else {
+                                deadLetterProducerFailureCount = 0;
+                                
stats.setDeadLetterProducerStats(producer.getStats());
+                            }
+                        });
+                        return newProducer;
+                    }, deadLetterProducerFailureCount, () -> "dead letter 
producer (topic: "
+                            + deadLetterPolicy.getDeadLetterTopic() + ")");
+                    deadLetterProducer = p;
                 }
             } finally {
                 createProducerLock.writeLock().unlock();
             }
         }
+        return p;
     }
 
-    private void initRetryLetterProducerIfNeeded() {
-        if (retryLetterProducer == null) {
+    private CompletableFuture<Producer<byte[]>> createProducerWithBackOff(
+            Supplier<CompletableFuture<Producer<byte[]>>> producerSupplier, 
int failureCount,
+            Supplier<String> logDescription) {
+        if (failureCount == 0) {
+            return producerSupplier.get();
+        } else {
+            // calculate backoff time for given failure count
+            Backoff backoff = new BackoffBuilder()
+                    .setInitialTime(100, TimeUnit.MILLISECONDS)
+                    
.setMandatoryStop(client.getConfiguration().getOperationTimeoutMs() * 2,
+                            TimeUnit.MILLISECONDS)
+                    .setMax(1, TimeUnit.MINUTES)
+                    .create();
+            long backoffTimeMillis = 0;
+            for (int i = 0; i < failureCount; i++) {
+                backoffTimeMillis = backoff.next();
+            }
+            CompletableFuture<Producer<byte[]>> newProducer = new 
CompletableFuture<>();
+            ScheduledExecutorService executor =
+                    (ScheduledExecutorService) 
client.getScheduledExecutorProvider().getExecutor(this);
+            log.info("Creating {} with backoff time of {} ms", 
logDescription.get(), backoffTimeMillis);
+            executor.schedule(() -> {
+                FutureUtil.completeAfter(newProducer, producerSupplier.get());
+            }, backoffTimeMillis, TimeUnit.MILLISECONDS);
+            return newProducer;
+        }
+    }
+
+    private CompletableFuture<Producer<byte[]>> 
initRetryLetterProducerIfNeeded() {
+        CompletableFuture<Producer<byte[]>> p = retryLetterProducer;
+        if (p == null || p.isCompletedExceptionally()) {
             createProducerLock.writeLock().lock();
             try {
-                if (retryLetterProducer == null) {
-                    retryLetterProducer = client
-                            .newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
-                            .topic(this.deadLetterPolicy.getRetryLetterTopic())
-                            .enableBatching(false)
-                            .enableChunking(true)
-                            .blockIfQueueFull(false)
-                            .createAsync();
-                    retryLetterProducer.thenAccept(rtlProducer -> {
-                        
stats.setRetryLetterProducerStats(rtlProducer.getStats());
-                    });
+                p = retryLetterProducer;
+                if (p == null || p.isCompletedExceptionally()) {
+                    p = createProducerWithBackOff(() -> {
+                        CompletableFuture<Producer<byte[]>> newProducer = 
client
+                                .newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
+                                
.topic(this.deadLetterPolicy.getRetryLetterTopic())
+                                .enableBatching(false)
+                                .enableChunking(true)
+                                .blockIfQueueFull(false)
+                                .createAsync();
+                        newProducer.whenComplete((producer, ex) -> {
+                            if (ex != null) {
+                                log.error("[{}] [{}] [{}] Failed to create 
retry letter producer for topic {}",
+                                        topicName, subscription, consumerName, 
deadLetterPolicy.getRetryLetterTopic(),
+                                        ex);
+                                retryLetterProducerFailureCount++;
+                            } else {
+                                retryLetterProducerFailureCount = 0;
+                                
stats.setRetryLetterProducerStats(producer.getStats());
+                            }
+                        });
+                        return newProducer;
+                    }, retryLetterProducerFailureCount, () -> "retry letter 
producer (topic: "
+                            + deadLetterPolicy.getRetryLetterTopic() + ")");
+                    retryLetterProducer = p;
                 }
             } finally {
                 createProducerLock.writeLock().unlock();
             }
         }
+        return p;
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6f9c5b47c55..341272cd69b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -638,7 +638,14 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         List<CompletableFuture<Void>> futureList = consumers.values().stream()
-            .map(ConsumerImpl::closeAsync).collect(Collectors.toList());
+            .map(consumer -> consumer.closeAsync().exceptionally(t -> {
+                Throwable cause = FutureUtil.unwrapCompletionException(t);
+                if (!(cause instanceof 
PulsarClientException.AlreadyClosedException)) {
+                    log.warn("[{}] [{}] Error closing individual consumer", 
consumer.getTopic(),
+                            consumer.getSubscription(), cause);
+                }
+                return null;
+            })).collect(Collectors.toList());
 
         FutureUtil.waitForAll(futureList)
             .thenComposeAsync((r) -> {
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml 
b/pulsar-client/src/main/resources/findbugsExclude.xml
index 0e05d20cb9b..f7cf6b9cfd5 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -1043,4 +1043,8 @@
         <Method name="getStats"/>
         <Bug pattern="EI_EXPOSE_REP"/>
     </Match>
+    <Match>
+        <Class name="org.apache.pulsar.client.impl.ConsumerImpl"/>
+        <Bug pattern="VO_VOLATILE_INCREMENT"/>
+    </Match>
 </FindBugsFilter>


Reply via email to