This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fb0f3e3 Async the DLQ process (#9552)
fb0f3e3 is described below
commit fb0f3e39cf1d6eaefb58825291f090cdfe2c4904
Author: lipenghui <[email protected]>
AuthorDate: Thu Feb 11 09:48:37 2021 +0800
Async the DLQ process (#9552)
Fixes #9540
### Motivation
Make the DLQ process asynchronous. Currently, the DLQ process is synchronous.
The DLQ is processed in the timer, and the timer acquires a write lock while
writing the data to the DLQ. The data writing process uses the IO thread, and
the messages that are added to the UnAckedMessageTracker also use the IO
thread and also acquire the same write lock. So this will result in a
deadlock.
---
.../pulsar/client/api/DeadLetterTopicTest.java | 2 +-
.../apache/pulsar/client/impl/ConsumerImpl.java | 194 +++++++++++++--------
.../pulsar/client/impl/UnAckedMessageTracker.java | 13 +-
3 files changed, 128 insertions(+), 81 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 1f396b7..1bc7cfc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -123,7 +123,7 @@ public class DeadLetterTopicTest extends
ProducerConsumerBase {
newPulsarClient.close();
}
- @Test(timeOut = 10000)
+ @Test(timeOut = 30000)
public void testDLQDisabledForKeySharedSubtype() throws Exception {
final String topic =
"persistent://my-property/my-ns/dead-letter-topic";
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6f92047..893e4e1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -35,7 +35,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -163,7 +162,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final DeadLetterPolicy deadLetterPolicy;
- private volatile Producer<T> deadLetterProducer;
+ private volatile CompletableFuture<Producer<T>> deadLetterProducer;
private volatile Producer<T> retryLetterProducer;
private final ReadWriteLock createProducerLock = new
ReentrantReadWriteLock();
@@ -580,6 +579,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
createProducerLock.writeLock().unlock();
}
}
+ CompletableFuture<Void> result = new CompletableFuture<>();
if (retryLetterProducer != null) {
try {
MessageImpl<T> retryMessage = null;
@@ -613,32 +613,33 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME,
String.valueOf(unit.toMillis(delayTime)));
if (reconsumetimes >
this.deadLetterPolicy.getMaxRedeliverCount() &&
StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
- processPossibleToDLQ((MessageIdImpl)messageId);
- if (deadLetterProducer == null) {
- try {
- createProducerLock.writeLock().lock();
- if (deadLetterProducer == null) {
- deadLetterProducer = client.newProducer(schema)
- .topic(this.deadLetterPolicy
- .getDeadLetterTopic())
- .blockIfQueueFull(false)
- .create();
- }
- } catch (Exception e) {
- log.error("Create dead letter producer exception
with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
- } finally {
- createProducerLock.writeLock().unlock();
- }
- }
- if (deadLetterProducer != null) {
-
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC,
originTopicNameStr);
-
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID,
originMessageIdStr);
- TypedMessageBuilder<T> typedMessageBuilderNew =
deadLetterProducer.newMessage()
- .value(retryMessage.getValue())
- .properties(propertiesMap);
- typedMessageBuilderNew.send();
- return doAcknowledge(messageId, ackType, properties,
null);
- }
+ initDeadLetterProducerIfNeeded();
+ MessageId finalMessageId = messageId;
+ String finalOriginTopicNameStr = originTopicNameStr;
+ String finalOriginMessageIdStr = originMessageIdStr;
+ MessageImpl<T> finalRetryMessage = retryMessage;
+ deadLetterProducer.thenAccept(dlqProducer -> {
+
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC,
finalOriginTopicNameStr);
+
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID,
finalOriginMessageIdStr);
+ TypedMessageBuilder<T> typedMessageBuilderNew =
dlqProducer.newMessage()
+ .value(finalRetryMessage.getValue())
+ .properties(propertiesMap);
+ typedMessageBuilderNew.sendAsync().thenAccept(msgId ->
{
+ doAcknowledge(finalMessageId, ackType, properties,
null).thenAccept(v -> {
+ result.complete(null);
+ }).exceptionally(ex -> {
+ result.completeExceptionally(ex);
+ return null;
+ });
+ }).exceptionally(ex -> {
+ result.completeExceptionally(ex);
+ return null;
+ });
+ }).exceptionally(ex -> {
+ result.completeExceptionally(ex);
+ deadLetterProducer = null;
+ return null;
+ });
} else {
TypedMessageBuilder<T> typedMessageBuilderNew =
retryLetterProducer.newMessage()
.value(retryMessage.getValue())
@@ -654,13 +655,19 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
} catch (Exception e) {
log.error("Send to retry letter topic exception with topic:
{}, messageId: {}", retryLetterProducer.getTopic(), messageId, e);
- Set<MessageId> messageIds = new HashSet<>();
- messageIds.add(messageId);
+ Set<MessageId> messageIds = Collections.singleton(messageId);
unAckedMessageTracker.remove(messageId);
redeliverUnacknowledgedMessages(messageIds);
}
}
- return CompletableFuture.completedFuture(null);
+ MessageId finalMessageId = messageId;
+ result.exceptionally(ex -> {
+ Set<MessageId> messageIds = Collections.singleton(finalMessageId);
+ unAckedMessageTracker.remove(finalMessageId);
+ redeliverUnacknowledgedMessages(messageIds);
+ return null;
+ });
+ return result;
}
@Override
@@ -1635,18 +1642,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
.map(messageId -> (MessageIdImpl)messageId)
.collect(Collectors.toSet()),
MAX_REDELIVER_UNACKNOWLEDGED);
batches.forEach(ids -> {
- List<MessageIdData> messageIdDatas = ids.stream()
- .filter(messageId -> !processPossibleToDLQ(messageId))
- .map(messageId -> {
- return new MessageIdData()
-
.setPartition(messageId.getPartitionIndex())
- .setLedgerId(messageId.getLedgerId())
- .setEntryId(messageId.getEntryId());
- }).collect(Collectors.toList());
- if (!messageIdDatas.isEmpty()) {
- ByteBuf cmd =
Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas);
- cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
- }
+ getRedeliveryMessageIdData(ids).thenAccept(messageIdData -> {
+ if (!messageIdData.isEmpty()) {
+ ByteBuf cmd =
Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdData);
+ cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+ }
+ });
});
if (messagesFromQueue > 0) {
increaseAvailablePermits(cnx, messagesFromQueue);
@@ -1670,48 +1671,91 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
notifyPendingBatchReceivedCallBack(op);
}
- private boolean processPossibleToDLQ(MessageIdImpl messageId) {
+ private CompletableFuture<List<MessageIdData>>
getRedeliveryMessageIdData(List<MessageIdImpl> messageIds) {
+ if (messageIds == null || messageIds.isEmpty()) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ List<MessageIdData> data = new ArrayList<>(messageIds.size());
+ List<CompletableFuture<Boolean>> futures = new
ArrayList<>(messageIds.size());
+ messageIds.forEach(messageId -> {
+ CompletableFuture<Boolean> future =
processPossibleToDLQ(messageId);
+ futures.add(future);
+ future.thenAccept(sendToDLQ -> {
+ if (!sendToDLQ) {
+ data.add(new MessageIdData()
+ .setPartition(messageId.getPartitionIndex())
+ .setLedgerId(messageId.getLedgerId())
+ .setEntryId(messageId.getEntryId()));
+ }
+ });
+ });
+ return FutureUtil.waitForAll(futures).thenCompose(v ->
CompletableFuture.completedFuture(data));
+ }
+
+ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl
messageId) {
List<MessageImpl<T>> deadLetterMessages = null;
if (possibleSendToDeadLetterTopicMessages != null) {
if (messageId instanceof BatchMessageIdImpl) {
- deadLetterMessages =
possibleSendToDeadLetterTopicMessages.get(new
MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
- getPartitionIndex()));
- } else {
- deadLetterMessages =
possibleSendToDeadLetterTopicMessages.get(messageId);
+ messageId = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(),
+ getPartitionIndex());
}
+ deadLetterMessages =
possibleSendToDeadLetterTopicMessages.get(messageId);
}
+ CompletableFuture<Boolean> result = new CompletableFuture<>();
if (deadLetterMessages != null) {
- if (deadLetterProducer == null) {
- try {
- createProducerLock.writeLock().lock();
- if (deadLetterProducer == null) {
- deadLetterProducer = client.newProducer(schema)
-
.topic(this.deadLetterPolicy.getDeadLetterTopic())
- .blockIfQueueFull(false)
- .create();
- }
- } catch (Exception e) {
- log.error("Create dead letter producer exception with
topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
- } finally {
- createProducerLock.writeLock().unlock();
+ initDeadLetterProducerIfNeeded();
+ List<MessageImpl<T>> finalDeadLetterMessages = deadLetterMessages;
+ MessageIdImpl finalMessageId = messageId;
+ deadLetterProducer.thenAccept(producerDLQ -> {
+ for (MessageImpl<T> message : finalDeadLetterMessages) {
+ producerDLQ.newMessage()
+ .value(message.getValue())
+ .properties(message.getProperties())
+ .sendAsync()
+ .thenAccept(messageIdInDLQ -> {
+
possibleSendToDeadLetterTopicMessages.remove(finalMessageId);
+
acknowledgeAsync(finalMessageId).whenComplete((v, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] [{}] [{}] Failed to
acknowledge the message {} of the original topic but send to the DLQ
successfully.",
+ topicName, subscription,
consumerName, finalMessageId, ex);
+ } else {
+ result.complete(true);
+ }
+ });
+ }).exceptionally(ex -> {
+ log.warn("[{}] [{}] [{}] Failed to send DLQ
message to {} for message id {}",
+ topicName, subscription, consumerName,
finalMessageId, ex);
+ result.complete(false);
+ return null;
+ });
}
- }
- if (deadLetterProducer != null) {
- try {
- for (MessageImpl<T> message : deadLetterMessages) {
- deadLetterProducer.newMessage()
- .value(message.getValue())
- .properties(message.getProperties())
- .send();
- }
- acknowledge(messageId);
- return true;
- } catch (Exception e) {
- log.error("Send to dead letter topic exception with topic:
{}, messageId: {}", deadLetterProducer.getTopic(), messageId, e);
+ }).exceptionally(ex -> {
+ deadLetterProducer = null;
+ result.complete(false);
+ return null;
+ });
+ } else {
+ result.complete(false);
+ }
+ return result;
+ }
+
+ private void initDeadLetterProducerIfNeeded() {
+ if (deadLetterProducer == null) {
+ try {
+ createProducerLock.writeLock().lock();
+ if (deadLetterProducer == null) {
+ deadLetterProducer = client.newProducer(schema)
+ .topic(this.deadLetterPolicy.getDeadLetterTopic())
+ .blockIfQueueFull(false)
+ .createAsync();
}
+ } catch (Exception e) {
+ log.error("Create dead letter producer exception with topic:
{}", deadLetterPolicy.getDeadLetterTopic(), e);
+ } finally {
+ createProducerLock.writeLock().unlock();
}
}
- return false;
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 312763d..bac1b2d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -139,12 +139,15 @@ public class UnAckedMessageTracker implements Closeable {
headPartition.clear();
timePartitions.addLast(headPartition);
} finally {
- if (messageIds.size() > 0) {
- consumerBase.onAckTimeoutSend(messageIds);
-
consumerBase.redeliverUnacknowledgedMessages(messageIds);
+ try {
+ if (messageIds.size() > 0) {
+ consumerBase.onAckTimeoutSend(messageIds);
+
consumerBase.redeliverUnacknowledgedMessages(messageIds);
+ }
+ timeout = client.timer().newTimeout(this,
tickDurationInMs, TimeUnit.MILLISECONDS);
+ } finally {
+ writeLock.unlock();
}
- timeout = client.timer().newTimeout(this,
tickDurationInMs, TimeUnit.MILLISECONDS);
- writeLock.unlock();
}
}
}, this.tickDurationInMs, TimeUnit.MILLISECONDS);