lovelle commented on a change in pull request #4062: Delayed message delivery 
implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r276710330
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##########
 @@ -643,6 +654,59 @@ public void 
initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
             this.dispatchRateLimiter = Optional.of(new 
DispatchRateLimiter(topic, name));
         }
     }
-    
+
+    @Override
+    public synchronized boolean trackDelayedDelivery(long ledgerId, long 
entryId, MessageMetadata msgMetadata) {
+        if (!isDelayedDeliveryEnabled) {
+            // If broker has the feature disabled, always deliver messages 
immediately
+            return false;
+        }
+
+        if (!delayedDeliveryTracker.isPresent()) {
+            // Initialize the tracker the first time we need to use it
+            delayedDeliveryTracker = Optional.of(new 
DelayedDeliveryTracker(this));
+        }
+
+        return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, 
msgMetadata.getDeliverAtTime());
+    }
+
+    /**
+     * Returns whether we have any message that could be immediately replayed.
+     * This could be a message that was requested to be re-delivered or a 
delayed
+     * delivery.
+     */
+    private boolean hasMessagesToReplay() {
 
 Review comment:
   I think this method should be synchronized too, because of 
`delayedDeliveryTracker` and `messagesToRedeliver`
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to