This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a3eec3084a127be0cc7e0f9c38da64be06d68d92 Author: Omar Yasin <[email protected]> AuthorDate: Wed Aug 14 01:00:23 2024 -0700 [fix][client] Create the retry producer async (#23157) Co-authored-by: Ómar Yasin <[email protected]> (cherry picked from commit a0259380e1eb86dbe4e80d27c585188671b25135) --- .../apache/pulsar/client/impl/ConsumerImpl.java | 96 ++++++++++++---------- 1 file changed, 52 insertions(+), 44 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 4b21d0908bc..2b2f06b8333 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -198,7 +198,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer; - private volatile Producer<byte[]> retryLetterProducer; + private volatile CompletableFuture<Producer<byte[]>> retryLetterProducer; private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock(); protected volatile boolean paused; @@ -600,6 +600,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle Map<String, String> customProperties, long delayTime, TimeUnit unit) { + MessageId messageId = message.getMessageId(); if (messageId == null) { return FutureUtil.failedFuture(new PulsarClientException @@ -616,29 +617,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } return FutureUtil.failedFuture(exception); } - if (delayTime < 0) { - delayTime = 0; - } - if (retryLetterProducer == null) { - createProducerLock.writeLock().lock(); - try { - if (retryLetterProducer == null) { - retryLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) - .topic(this.deadLetterPolicy.getRetryLetterTopic()) - .enableBatching(false) - .enableChunking(true) - .blockIfQueueFull(false) - .create(); - stats.setRetryLetterProducerStats(retryLetterProducer.getStats()); - } - } catch (Exception e) { - log.error("Create retry letter producer exception with topic: {}", - deadLetterPolicy.getRetryLetterTopic(), e); - return FutureUtil.failedFuture(e); - } finally { - createProducerLock.writeLock().unlock(); - } - } + + initRetryLetterProducerIfNeeded(); CompletableFuture<Void> result = new CompletableFuture<>(); if (retryLetterProducer != null) { try { @@ -658,7 +638,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumeTimes)); propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, - String.valueOf(unit.toMillis(delayTime))); + String.valueOf(unit.toMillis(delayTime < 0 ? 0 : delayTime))); MessageId finalMessageId = messageId; if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount() @@ -687,23 +667,29 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle }); } else { assert retryMessage != null; - TypedMessageBuilder<byte[]> typedMessageBuilderNew = retryLetterProducer - .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) - .value(retryMessage.getData()) - .properties(propertiesMap); - if (delayTime > 0) { - typedMessageBuilderNew.deliverAfter(delayTime, unit); - } - if (message.hasKey()) { - typedMessageBuilderNew.key(message.getKey()); - } - typedMessageBuilderNew.sendAsync() - .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) - .thenAccept(v -> result.complete(null)) - .exceptionally(ex -> { - result.completeExceptionally(ex); - return null; - }); + retryLetterProducer.thenAcceptAsync(rtlProducer -> { + TypedMessageBuilder<byte[]> typedMessageBuilderNew = rtlProducer + .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) + .value(retryMessage.getData()) + .properties(propertiesMap); + if (delayTime > 0) { + typedMessageBuilderNew.deliverAfter(delayTime, unit); + } + if (message.hasKey()) { + typedMessageBuilderNew.key(message.getKey()); + } + typedMessageBuilderNew.sendAsync() + .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) + .thenAccept(v -> result.complete(null)) + .exceptionally(ex -> { + result.completeExceptionally(ex); + return null; + }); + }, internalPinnedExecutor).exceptionally(ex -> { + result.completeExceptionally(ex); + retryLetterProducer = null; + return null; + }); } } catch (Exception e) { result.completeExceptionally(e); @@ -712,7 +698,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle MessageId finalMessageId = messageId; result.exceptionally(ex -> { log.error("Send to retry letter topic exception with topic: {}, messageId: {}", - retryLetterProducer.getTopic(), finalMessageId, ex); + this.deadLetterPolicy.getRetryLetterTopic(), finalMessageId, ex); Set<MessageId> messageIds = Collections.singleton(finalMessageId); unAckedMessageTracker.remove(finalMessageId); redeliverUnacknowledgedMessages(messageIds); @@ -1087,7 +1073,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4); closeFutures.add(closeFuture); if (retryLetterProducer != null) { - closeFutures.add(retryLetterProducer.closeAsync().whenComplete((ignore, ex) -> { + closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { if (ex != null) { log.warn("Exception ignored in closing retryLetterProducer of consumer", ex); } @@ -2209,6 +2195,28 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } } + private void initRetryLetterProducerIfNeeded() { + if (retryLetterProducer == null) { + createProducerLock.writeLock().lock(); + try { + if (retryLetterProducer == null) { + retryLetterProducer = client + .newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) + .topic(this.deadLetterPolicy.getRetryLetterTopic()) + .enableBatching(false) + .enableChunking(true) + .blockIfQueueFull(false) + .createAsync(); + retryLetterProducer.thenAccept(rtlProducer -> { + stats.setRetryLetterProducerStats(rtlProducer.getStats()); + }); + } + } finally { + createProducerLock.writeLock().unlock(); + } + } + } + @Override public void seek(MessageId messageId) throws PulsarClientException { try {
