This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f2e1920569e93cbc050f12cf778afaa30f8d78d8 Author: Omar Yasin <[email protected]> AuthorDate: Wed Aug 14 01:00:23 2024 -0700 [fix][client] Create the retry producer async (#23157) Co-authored-by: Ómar Yasin <[email protected]> (cherry picked from commit a0259380e1eb86dbe4e80d27c585188671b25135) --- .../apache/pulsar/client/impl/ConsumerImpl.java | 92 ++++++++++++---------- 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index fa5a8bf3c5a..58e2692fea5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -193,7 +193,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer; - private volatile Producer<byte[]> retryLetterProducer; + private volatile CompletableFuture<Producer<byte[]>> retryLetterProducer; private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock(); protected volatile boolean paused; @@ -594,6 +594,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle Map<String, String> customProperties, long delayTime, TimeUnit unit) { + MessageId messageId = message.getMessageId(); if (messageId == null) { return FutureUtil.failedFuture(new PulsarClientException @@ -610,28 +611,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } return FutureUtil.failedFuture(exception); } - if (delayTime < 0) { - delayTime = 0; - } - if (retryLetterProducer == null) { - createProducerLock.writeLock().lock(); - try { - if (retryLetterProducer == null) { - retryLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) - .topic(this.deadLetterPolicy.getRetryLetterTopic()) - .enableBatching(false) - .enableChunking(true) - .blockIfQueueFull(false) - .create(); - } - } catch (Exception e) { - log.error("Create retry letter producer exception with topic: {}", - deadLetterPolicy.getRetryLetterTopic(), e); - return FutureUtil.failedFuture(e); - } finally { - createProducerLock.writeLock().unlock(); - } - } + + initRetryLetterProducerIfNeeded(); CompletableFuture<Void> result = new CompletableFuture<>(); if (retryLetterProducer != null) { try { @@ -651,7 +632,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumeTimes)); propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, - String.valueOf(unit.toMillis(delayTime))); + String.valueOf(unit.toMillis(delayTime < 0 ? 0 : delayTime))); MessageId finalMessageId = messageId; if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount() @@ -680,23 +661,29 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle }); } else { assert retryMessage != null; - TypedMessageBuilder<byte[]> typedMessageBuilderNew = retryLetterProducer - .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) - .value(retryMessage.getData()) - .properties(propertiesMap); - if (delayTime > 0) { - typedMessageBuilderNew.deliverAfter(delayTime, unit); - } - if (message.hasKey()) { - typedMessageBuilderNew.key(message.getKey()); - } - typedMessageBuilderNew.sendAsync() - .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) - .thenAccept(v -> result.complete(null)) - .exceptionally(ex -> { - result.completeExceptionally(ex); - return null; - }); + retryLetterProducer.thenAcceptAsync(rtlProducer -> { + TypedMessageBuilder<byte[]> typedMessageBuilderNew = rtlProducer + .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) + .value(retryMessage.getData()) + .properties(propertiesMap); + if (delayTime > 0) { + typedMessageBuilderNew.deliverAfter(delayTime, unit); + } + if (message.hasKey()) { + typedMessageBuilderNew.key(message.getKey()); + } + typedMessageBuilderNew.sendAsync() + .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) + .thenAccept(v -> result.complete(null)) + .exceptionally(ex -> { + result.completeExceptionally(ex); + return null; + }); + }, internalPinnedExecutor).exceptionally(ex -> { + result.completeExceptionally(ex); + retryLetterProducer = null; + return null; + }); } } catch (Exception e) { result.completeExceptionally(e); @@ -705,7 +692,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle MessageId finalMessageId = messageId; result.exceptionally(ex -> { log.error("Send to retry letter topic exception with topic: {}, messageId: {}", - retryLetterProducer.getTopic(), finalMessageId, ex); + this.deadLetterPolicy.getRetryLetterTopic(), finalMessageId, ex); Set<MessageId> messageIds = Collections.singleton(finalMessageId); unAckedMessageTracker.remove(finalMessageId); redeliverUnacknowledgedMessages(messageIds); @@ -1070,7 +1057,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4); closeFutures.add(closeFuture); if (retryLetterProducer != null) { - closeFutures.add(retryLetterProducer.closeAsync().whenComplete((ignore, ex) -> { + closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { if (ex != null) { log.warn("Exception ignored in closing retryLetterProducer of consumer", ex); } @@ -2186,6 +2173,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } } + private void initRetryLetterProducerIfNeeded() { + if (retryLetterProducer == null) { + createProducerLock.writeLock().lock(); + try { + if (retryLetterProducer == null) { + retryLetterProducer = client + .newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) + .topic(this.deadLetterPolicy.getRetryLetterTopic()) + .enableBatching(false) + .enableChunking(true) + .blockIfQueueFull(false) + .createAsync(); + } + } finally { + createProducerLock.writeLock().unlock(); + } + } + } + @Override public void seek(MessageId messageId) throws PulsarClientException { try {
