This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new f9475fe  Branch-2.7 fix Delayed Messages (#11374)
f9475fe is described below

commit f9475fe192500c892be47b02b58b3ce0fd05e8f9
Author: Enrico Olivelli <[email protected]>
AuthorDate: Tue Jul 20 04:42:29 2021 +0200

    Branch-2.7 fix Delayed Messages (#11374)
    
    Cherry-picking #10762 broke the Delayed messages feature in branch-2.7.
    
    This patch restores the method that has been dropped
---
 .../PersistentDispatcherMultipleConsumers.java        | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index adc5b2b..4d2e3f4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -59,6 +59,7 @@ import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
 import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
 import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.DispatchRate;
@@ -794,6 +795,24 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         }
     }
 
+    @Override
+    public boolean trackDelayedDelivery(long ledgerId, long entryId, PulsarApi.MessageMetadata msgMetadata) {
+        if (!topic.isDelayedDeliveryEnabled()) {
+            // If broker has the feature disabled, always deliver messages immediately
+            return false;
+        }
+        synchronized (this) {
+            if (!delayedDeliveryTracker.isPresent()) {
+                // Initialize the tracker the first time we need to use it
+                delayedDeliveryTracker = Optional
+                        .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
+            }
+
+            delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
+            return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());
+        }
+    }
+
     protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
         if (!redeliveryMessages.isEmpty()) {
             return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);

Reply via email to