This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fb0f3e3  Async the DLQ process (#9552)
fb0f3e3 is described below

commit fb0f3e39cf1d6eaefb58825291f090cdfe2c4904
Author: lipenghui <[email protected]>
AuthorDate: Thu Feb 11 09:48:37 2021 +0800

    Async the DLQ process (#9552)
    
    Fixes #9540
    
    ### Motivation
    
    Async the DLQ process. Currently, the DLQ process is a synchronous process. 
Since we process the DLQ in the timer and the timer will acquire a write lock 
during writing the data to the DLQ, the data writing process will use the IO 
thread and the messages that add to the UnAckedMessageTracker also use the IO 
thread and if also acquire the same write lock. So this will result in a dead 
lock.
---
 .../pulsar/client/api/DeadLetterTopicTest.java     |   2 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 194 +++++++++++++--------
 .../pulsar/client/impl/UnAckedMessageTracker.java  |  13 +-
 3 files changed, 128 insertions(+), 81 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index 1f396b7..1bc7cfc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -123,7 +123,7 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         newPulsarClient.close();
     }
 
-    @Test(timeOut = 10000)
+    @Test(timeOut = 30000)
     public void testDLQDisabledForKeySharedSubtype() throws Exception {
         final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6f92047..893e4e1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -35,7 +35,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -163,7 +162,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
     private final DeadLetterPolicy deadLetterPolicy;
 
-    private volatile Producer<T> deadLetterProducer;
+    private volatile CompletableFuture<Producer<T>> deadLetterProducer;
 
     private volatile Producer<T> retryLetterProducer;
     private final ReadWriteLock createProducerLock = new 
ReentrantReadWriteLock();
@@ -580,6 +579,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 createProducerLock.writeLock().unlock();
             }
         }
+        CompletableFuture<Void> result = new CompletableFuture<>();
         if (retryLetterProducer != null) {
             try {
                 MessageImpl<T> retryMessage = null;
@@ -613,32 +613,33 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, 
String.valueOf(unit.toMillis(delayTime)));
 
                 if (reconsumetimes > 
this.deadLetterPolicy.getMaxRedeliverCount() && 
StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
-                   processPossibleToDLQ((MessageIdImpl)messageId);
-                    if (deadLetterProducer == null) {
-                        try {
-                            createProducerLock.writeLock().lock();
-                            if (deadLetterProducer == null) {
-                                deadLetterProducer = client.newProducer(schema)
-                                        .topic(this.deadLetterPolicy
-                                        .getDeadLetterTopic())
-                                        .blockIfQueueFull(false)
-                                        .create();
-                            }
-                        } catch (Exception e) {
-                           log.error("Create dead letter producer exception 
with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
-                        } finally {
-                           createProducerLock.writeLock().unlock();
-                        }
-                   }
-                   if (deadLetterProducer != null) {
-                       
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, 
originTopicNameStr);
-                       
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, 
originMessageIdStr);
-                       TypedMessageBuilder<T> typedMessageBuilderNew = 
deadLetterProducer.newMessage()
-                               .value(retryMessage.getValue())
-                               .properties(propertiesMap);
-                       typedMessageBuilderNew.send();
-                       return doAcknowledge(messageId, ackType, properties, 
null);
-                   }
+                    initDeadLetterProducerIfNeeded();
+                    MessageId finalMessageId = messageId;
+                    String finalOriginTopicNameStr = originTopicNameStr;
+                    String finalOriginMessageIdStr = originMessageIdStr;
+                    MessageImpl<T> finalRetryMessage = retryMessage;
+                    deadLetterProducer.thenAccept(dlqProducer -> {
+                        
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, 
finalOriginTopicNameStr);
+                        
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, 
finalOriginMessageIdStr);
+                        TypedMessageBuilder<T> typedMessageBuilderNew = 
dlqProducer.newMessage()
+                                .value(finalRetryMessage.getValue())
+                                .properties(propertiesMap);
+                        typedMessageBuilderNew.sendAsync().thenAccept(msgId -> 
{
+                            doAcknowledge(finalMessageId, ackType, properties, 
null).thenAccept(v -> {
+                                result.complete(null);
+                            }).exceptionally(ex -> {
+                                result.completeExceptionally(ex);
+                                return null;
+                            });
+                        }).exceptionally(ex -> {
+                            result.completeExceptionally(ex);
+                            return null;
+                        });
+                    }).exceptionally(ex -> {
+                        result.completeExceptionally(ex);
+                        deadLetterProducer = null;
+                        return null;
+                    });
                 } else {
                     TypedMessageBuilder<T> typedMessageBuilderNew = 
retryLetterProducer.newMessage()
                             .value(retryMessage.getValue())
@@ -654,13 +655,19 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 }
             } catch (Exception e) {
                 log.error("Send to retry letter topic exception with topic: 
{}, messageId: {}", retryLetterProducer.getTopic(), messageId, e);
-                Set<MessageId> messageIds = new HashSet<>();
-                messageIds.add(messageId);
+                Set<MessageId> messageIds = Collections.singleton(messageId);
                 unAckedMessageTracker.remove(messageId);
                 redeliverUnacknowledgedMessages(messageIds);
             }
         }
-        return CompletableFuture.completedFuture(null);
+        MessageId finalMessageId = messageId;
+        result.exceptionally(ex -> {
+            Set<MessageId> messageIds = Collections.singleton(finalMessageId);
+            unAckedMessageTracker.remove(finalMessageId);
+            redeliverUnacknowledgedMessages(messageIds);
+            return null;
+        });
+        return result;
     }
 
     @Override
@@ -1635,18 +1642,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     .map(messageId -> (MessageIdImpl)messageId)
                     .collect(Collectors.toSet()), 
MAX_REDELIVER_UNACKNOWLEDGED);
             batches.forEach(ids -> {
-                List<MessageIdData> messageIdDatas = ids.stream()
-                        .filter(messageId -> !processPossibleToDLQ(messageId))
-                        .map(messageId -> {
-                            return new MessageIdData()
-                                    
.setPartition(messageId.getPartitionIndex())
-                                    .setLedgerId(messageId.getLedgerId())
-                                    .setEntryId(messageId.getEntryId());
-                        }).collect(Collectors.toList());
-                if (!messageIdDatas.isEmpty()) {
-                    ByteBuf cmd = 
Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas);
-                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
-                }
+                getRedeliveryMessageIdData(ids).thenAccept(messageIdData -> {
+                    if (!messageIdData.isEmpty()) {
+                        ByteBuf cmd = 
Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdData);
+                        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                    }
+                });
             });
             if (messagesFromQueue > 0) {
                 increaseAvailablePermits(cnx, messagesFromQueue);
@@ -1670,48 +1671,91 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         notifyPendingBatchReceivedCallBack(op);
     }
 
-    private boolean processPossibleToDLQ(MessageIdImpl messageId) {
+    private CompletableFuture<List<MessageIdData>> 
getRedeliveryMessageIdData(List<MessageIdImpl> messageIds) {
+        if (messageIds == null || messageIds.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+        List<MessageIdData> data = new ArrayList<>(messageIds.size());
+        List<CompletableFuture<Boolean>> futures = new 
ArrayList<>(messageIds.size());
+        messageIds.forEach(messageId ->  {
+            CompletableFuture<Boolean> future = 
processPossibleToDLQ(messageId);
+            futures.add(future);
+            future.thenAccept(sendToDLQ -> {
+                if (!sendToDLQ) {
+                    data.add(new MessageIdData()
+                            .setPartition(messageId.getPartitionIndex())
+                            .setLedgerId(messageId.getLedgerId())
+                            .setEntryId(messageId.getEntryId()));
+                }
+            });
+        });
+        return FutureUtil.waitForAll(futures).thenCompose(v -> 
CompletableFuture.completedFuture(data));
+    }
+
+    private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl 
messageId) {
         List<MessageImpl<T>> deadLetterMessages = null;
         if (possibleSendToDeadLetterTopicMessages != null) {
             if (messageId instanceof BatchMessageIdImpl) {
-                deadLetterMessages = 
possibleSendToDeadLetterTopicMessages.get(new 
MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
-                        getPartitionIndex()));
-            } else {
-                deadLetterMessages = 
possibleSendToDeadLetterTopicMessages.get(messageId);
+                messageId = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(),
+                        getPartitionIndex());
             }
+            deadLetterMessages = 
possibleSendToDeadLetterTopicMessages.get(messageId);
         }
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
         if (deadLetterMessages != null) {
-            if (deadLetterProducer == null) {
-                try {
-                    createProducerLock.writeLock().lock();
-                    if (deadLetterProducer == null) {
-                        deadLetterProducer = client.newProducer(schema)
-                                
.topic(this.deadLetterPolicy.getDeadLetterTopic())
-                                .blockIfQueueFull(false)
-                                .create();
-                    }
-                } catch (Exception e) {
-                    log.error("Create dead letter producer exception with 
topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
-                } finally {
-                    createProducerLock.writeLock().unlock();
+            initDeadLetterProducerIfNeeded();
+            List<MessageImpl<T>> finalDeadLetterMessages = deadLetterMessages;
+            MessageIdImpl finalMessageId = messageId;
+            deadLetterProducer.thenAccept(producerDLQ -> {
+                for (MessageImpl<T> message : finalDeadLetterMessages) {
+                    producerDLQ.newMessage()
+                            .value(message.getValue())
+                            .properties(message.getProperties())
+                            .sendAsync()
+                            .thenAccept(messageIdInDLQ -> {
+                                
possibleSendToDeadLetterTopicMessages.remove(finalMessageId);
+                                
acknowledgeAsync(finalMessageId).whenComplete((v, ex) -> {
+                                    if (ex != null) {
+                                        log.warn("[{}] [{}] [{}] Failed to 
acknowledge the message {} of the original topic but send to the DLQ 
successfully.",
+                                                topicName, subscription, 
consumerName, finalMessageId, ex);
+                                    } else {
+                                        result.complete(true);
+                                    }
+                                });
+                            }).exceptionally(ex -> {
+                                log.warn("[{}] [{}] [{}] Failed to send DLQ 
message to {} for message id {}",
+                                        topicName, subscription, consumerName, 
finalMessageId, ex);
+                                result.complete(false);
+                                return null;
+                    });
                 }
-            }
-            if (deadLetterProducer != null) {
-                try {
-                    for (MessageImpl<T> message : deadLetterMessages) {
-                        deadLetterProducer.newMessage()
-                                .value(message.getValue())
-                                .properties(message.getProperties())
-                                .send();
-                    }
-                    acknowledge(messageId);
-                    return true;
-                } catch (Exception e) {
-                    log.error("Send to dead letter topic exception with topic: 
{}, messageId: {}", deadLetterProducer.getTopic(), messageId, e);
+            }).exceptionally(ex -> {
+                deadLetterProducer = null;
+                result.complete(false);
+                return null;
+            });
+        } else {
+            result.complete(false);
+        }
+        return result;
+    }
+
+    private void initDeadLetterProducerIfNeeded() {
+        if (deadLetterProducer == null) {
+            try {
+                createProducerLock.writeLock().lock();
+                if (deadLetterProducer == null) {
+                    deadLetterProducer = client.newProducer(schema)
+                            .topic(this.deadLetterPolicy.getDeadLetterTopic())
+                            .blockIfQueueFull(false)
+                            .createAsync();
                 }
+            } catch (Exception e) {
+                log.error("Create dead letter producer exception with topic: 
{}", deadLetterPolicy.getDeadLetterTopic(), e);
+            } finally {
+                createProducerLock.writeLock().unlock();
             }
         }
-        return false;
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 312763d..bac1b2d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -139,12 +139,15 @@ public class UnAckedMessageTracker implements Closeable {
                     headPartition.clear();
                     timePartitions.addLast(headPartition);
                 } finally {
-                    if (messageIds.size() > 0) {
-                        consumerBase.onAckTimeoutSend(messageIds);
-                        
consumerBase.redeliverUnacknowledgedMessages(messageIds);
+                    try {
+                        if (messageIds.size() > 0) {
+                            consumerBase.onAckTimeoutSend(messageIds);
+                            
consumerBase.redeliverUnacknowledgedMessages(messageIds);
+                        }
+                        timeout = client.timer().newTimeout(this, 
tickDurationInMs, TimeUnit.MILLISECONDS);
+                    } finally {
+                        writeLock.unlock();
                     }
-                    timeout = client.timer().newTimeout(this, 
tickDurationInMs, TimeUnit.MILLISECONDS);
-                    writeLock.unlock();
                 }
             }
         }, this.tickDurationInMs, TimeUnit.MILLISECONDS);

Reply via email to