This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f2e1920569e93cbc050f12cf778afaa30f8d78d8
Author: Omar Yasin <[email protected]>
AuthorDate: Wed Aug 14 01:00:23 2024 -0700

    [fix][client] Create the retry producer async (#23157)
    
    Co-authored-by: Ómar Yasin <[email protected]>
    (cherry picked from commit a0259380e1eb86dbe4e80d27c585188671b25135)
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 92 ++++++++++++----------
 1 file changed, 49 insertions(+), 43 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index fa5a8bf3c5a..58e2692fea5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -193,7 +193,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer;
 
-    private volatile Producer<byte[]> retryLetterProducer;
+    private volatile CompletableFuture<Producer<byte[]>> retryLetterProducer;
     private final ReadWriteLock createProducerLock = new 
ReentrantReadWriteLock();
 
     protected volatile boolean paused;
@@ -594,6 +594,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                                        Map<String, String> 
customProperties,
                                                        long delayTime,
                                                        TimeUnit unit) {
+
         MessageId messageId = message.getMessageId();
         if (messageId == null) {
             return FutureUtil.failedFuture(new PulsarClientException
@@ -610,28 +611,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             }
             return FutureUtil.failedFuture(exception);
         }
-        if (delayTime < 0) {
-            delayTime = 0;
-        }
-        if (retryLetterProducer == null) {
-            createProducerLock.writeLock().lock();
-            try {
-                if (retryLetterProducer == null) {
-                    retryLetterProducer = 
client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
-                            .topic(this.deadLetterPolicy.getRetryLetterTopic())
-                            .enableBatching(false)
-                            .enableChunking(true)
-                            .blockIfQueueFull(false)
-                            .create();
-                }
-            } catch (Exception e) {
-                log.error("Create retry letter producer exception with topic: 
{}",
-                        deadLetterPolicy.getRetryLetterTopic(), e);
-                return FutureUtil.failedFuture(e);
-            } finally {
-                createProducerLock.writeLock().unlock();
-            }
-        }
+
+        initRetryLetterProducerIfNeeded();
         CompletableFuture<Void> result = new CompletableFuture<>();
         if (retryLetterProducer != null) {
             try {
@@ -651,7 +632,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 }
                 
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, 
String.valueOf(reconsumeTimes));
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME,
-                        String.valueOf(unit.toMillis(delayTime)));
+                        String.valueOf(unit.toMillis(delayTime < 0 ? 0 : 
delayTime)));
 
                 MessageId finalMessageId = messageId;
                 if (reconsumeTimes > 
this.deadLetterPolicy.getMaxRedeliverCount()
@@ -680,23 +661,29 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     });
                 } else {
                     assert retryMessage != null;
-                    TypedMessageBuilder<byte[]> typedMessageBuilderNew = 
retryLetterProducer
-                            
.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
-                            .value(retryMessage.getData())
-                            .properties(propertiesMap);
-                    if (delayTime > 0) {
-                        typedMessageBuilderNew.deliverAfter(delayTime, unit);
-                    }
-                    if (message.hasKey()) {
-                        typedMessageBuilderNew.key(message.getKey());
-                    }
-                    typedMessageBuilderNew.sendAsync()
-                            .thenCompose(__ -> doAcknowledge(finalMessageId, 
ackType, Collections.emptyMap(), null))
-                            .thenAccept(v -> result.complete(null))
-                            .exceptionally(ex -> {
-                                result.completeExceptionally(ex);
-                                return null;
-                            });
+                    retryLetterProducer.thenAcceptAsync(rtlProducer -> {
+                        TypedMessageBuilder<byte[]> typedMessageBuilderNew = 
rtlProducer
+                                
.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+                                .value(retryMessage.getData())
+                                .properties(propertiesMap);
+                        if (delayTime > 0) {
+                            typedMessageBuilderNew.deliverAfter(delayTime, 
unit);
+                        }
+                        if (message.hasKey()) {
+                            typedMessageBuilderNew.key(message.getKey());
+                        }
+                        typedMessageBuilderNew.sendAsync()
+                                .thenCompose(__ -> 
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
+                                .thenAccept(v -> result.complete(null))
+                                .exceptionally(ex -> {
+                                    result.completeExceptionally(ex);
+                                    return null;
+                                });
+                    }, internalPinnedExecutor).exceptionally(ex -> {
+                        result.completeExceptionally(ex);
+                        retryLetterProducer = null;
+                        return null;
+                    });
                 }
             } catch (Exception e) {
                 result.completeExceptionally(e);
@@ -705,7 +692,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         MessageId finalMessageId = messageId;
         result.exceptionally(ex -> {
             log.error("Send to retry letter topic exception with topic: {}, 
messageId: {}",
-                    retryLetterProducer.getTopic(), finalMessageId, ex);
+                    this.deadLetterPolicy.getRetryLetterTopic(), 
finalMessageId, ex);
             Set<MessageId> messageIds = Collections.singleton(finalMessageId);
             unAckedMessageTracker.remove(finalMessageId);
             redeliverUnacknowledgedMessages(messageIds);
@@ -1070,7 +1057,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
         closeFutures.add(closeFuture);
         if (retryLetterProducer != null) {
-            
closeFutures.add(retryLetterProducer.closeAsync().whenComplete((ignore, ex) -> {
+            closeFutures.add(retryLetterProducer.thenCompose(p -> 
p.closeAsync()).whenComplete((ignore, ex) -> {
                 if (ex != null) {
                     log.warn("Exception ignored in closing retryLetterProducer 
of consumer", ex);
                 }
@@ -2186,6 +2173,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
     }
 
+    private void initRetryLetterProducerIfNeeded() {
+        if (retryLetterProducer == null) {
+            createProducerLock.writeLock().lock();
+            try {
+                if (retryLetterProducer == null) {
+                    retryLetterProducer = client
+                            .newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
+                            .topic(this.deadLetterPolicy.getRetryLetterTopic())
+                            .enableBatching(false)
+                            .enableChunking(true)
+                            .blockIfQueueFull(false)
+                            .createAsync();
+                }
+            } finally {
+                createProducerLock.writeLock().unlock();
+            }
+        }
+    }
+
     @Override
     public void seek(MessageId messageId) throws PulsarClientException {
         try {

Reply via email to