This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a0259380e1e [fix][client] Create the retry producer async (#23157)
a0259380e1e is described below
commit a0259380e1eb86dbe4e80d27c585188671b25135
Author: Omar Yasin <[email protected]>
AuthorDate: Wed Aug 14 01:00:23 2024 -0700
[fix][client] Create the retry producer async (#23157)
Co-authored-by: Ómar Yasin <[email protected]>
---
.../apache/pulsar/client/impl/ConsumerImpl.java | 96 ++++++++++++----------
1 file changed, 52 insertions(+), 44 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1806d13493b..3acf55afaed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -203,7 +203,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer;
- private volatile Producer<byte[]> retryLetterProducer;
+ private volatile CompletableFuture<Producer<byte[]>> retryLetterProducer;
private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
protected volatile boolean paused;
@@ -643,6 +643,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
Map<String, String> customProperties,
long delayTime,
TimeUnit unit) {
+
MessageId messageId = message.getMessageId();
if (messageId == null) {
return FutureUtil.failedFuture(new PulsarClientException
@@ -659,29 +660,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
return FutureUtil.failedFuture(exception);
}
- if (delayTime < 0) {
- delayTime = 0;
- }
- if (retryLetterProducer == null) {
- createProducerLock.writeLock().lock();
- try {
- if (retryLetterProducer == null) {
- retryLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
- .topic(this.deadLetterPolicy.getRetryLetterTopic())
- .enableBatching(false)
- .enableChunking(true)
- .blockIfQueueFull(false)
- .create();
- stats.setRetryLetterProducerStats(retryLetterProducer.getStats());
- }
- } catch (Exception e) {
- log.error("Create retry letter producer exception with topic: {}",
- deadLetterPolicy.getRetryLetterTopic(), e);
- return FutureUtil.failedFuture(e);
- } finally {
- createProducerLock.writeLock().unlock();
- }
- }
+
+ initRetryLetterProducerIfNeeded();
CompletableFuture<Void> result = new CompletableFuture<>();
if (retryLetterProducer != null) {
try {
@@ -701,7 +681,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumeTimes));
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME,
- String.valueOf(unit.toMillis(delayTime)));
+ String.valueOf(unit.toMillis(delayTime < 0 ? 0 : delayTime)));
MessageId finalMessageId = messageId;
if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount()
@@ -732,23 +712,29 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
});
} else {
assert retryMessage != null;
- TypedMessageBuilder<byte[]> typedMessageBuilderNew = retryLetterProducer
- .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
- .value(retryMessage.getData())
- .properties(propertiesMap);
- if (delayTime > 0) {
- typedMessageBuilderNew.deliverAfter(delayTime, unit);
- }
- if (message.hasKey()) {
- typedMessageBuilderNew.key(message.getKey());
- }
- typedMessageBuilderNew.sendAsync()
- .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
- .thenAccept(v -> result.complete(null))
- .exceptionally(ex -> {
- result.completeExceptionally(ex);
- return null;
- });
+ retryLetterProducer.thenAcceptAsync(rtlProducer -> {
+ TypedMessageBuilder<byte[]> typedMessageBuilderNew = rtlProducer
+ .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+ .value(retryMessage.getData())
+ .properties(propertiesMap);
+ if (delayTime > 0) {
+ typedMessageBuilderNew.deliverAfter(delayTime, unit);
+ }
+ if (message.hasKey()) {
+ typedMessageBuilderNew.key(message.getKey());
+ }
+ typedMessageBuilderNew.sendAsync()
+ .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
+ .thenAccept(v -> result.complete(null))
+ .exceptionally(ex -> {
+ result.completeExceptionally(ex);
+ return null;
+ });
+ }, internalPinnedExecutor).exceptionally(ex -> {
+ result.completeExceptionally(ex);
+ retryLetterProducer = null;
+ return null;
+ });
}
} catch (Exception e) {
result.completeExceptionally(e);
@@ -757,7 +743,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
MessageId finalMessageId = messageId;
result.exceptionally(ex -> {
log.error("Send to retry letter topic exception with topic: {}, messageId: {}",
- retryLetterProducer.getTopic(), finalMessageId, ex);
+ this.deadLetterPolicy.getRetryLetterTopic(), finalMessageId, ex);
Set<MessageId> messageIds = Collections.singleton(finalMessageId);
unAckedMessageTracker.remove(finalMessageId);
redeliverUnacknowledgedMessages(messageIds);
@@ -1136,7 +1122,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
closeFutures.add(closeFuture);
if (retryLetterProducer != null) {
- closeFutures.add(retryLetterProducer.closeAsync().whenComplete((ignore, ex) -> {
+ closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("Exception ignored in closing retryLetterProducer of consumer", ex);
}
@@ -2267,6 +2253,28 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
}
+ private void initRetryLetterProducerIfNeeded() {
+ if (retryLetterProducer == null) {
+ createProducerLock.writeLock().lock();
+ try {
+ if (retryLetterProducer == null) {
+ retryLetterProducer = client
+ .newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
+ .topic(this.deadLetterPolicy.getRetryLetterTopic())
+ .enableBatching(false)
+ .enableChunking(true)
+ .blockIfQueueFull(false)
+ .createAsync();
+ retryLetterProducer.thenAccept(rtlProducer -> {
+ stats.setRetryLetterProducerStats(rtlProducer.getStats());
+ });
+ }
+ } finally {
+ createProducerLock.writeLock().unlock();
+ }
+ }
+ }
+
@Override
public void seek(MessageId messageId) throws PulsarClientException {
try {