lovelle commented on a change in pull request #4062: Delayed message delivery 
implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r276704232
 
 

 ##########
 File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 ##########
 @@ -643,6 +654,59 @@ public void 
initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
             this.dispatchRateLimiter = Optional.of(new 
DispatchRateLimiter(topic, name));
         }
     }
-    
+
+    @Override
+    public synchronized boolean trackDelayedDelivery(long ledgerId, long 
entryId, MessageMetadata msgMetadata) {
+        if (!isDelayedDeliveryEnabled) {
+            // If broker has the feature disabled, always deliver messages 
immediately
+            return false;
+        }
+
+        if (!delayedDeliveryTracker.isPresent()) {
+            // Initialize the tracker the first time we need to use it
+            delayedDeliveryTracker = Optional.of(new 
DelayedDeliveryTracker(this));
+        }
+
+        return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, 
msgMetadata.getDeliverAtTime());
+    }
+
+    /**
+     * Returns whether we have any message that could be immediately replayed.
+     * This could be a message that was requested to be re-delivered or a 
delayed
+     * delivery.
+     */
+    private boolean hasMessagesToReplay() {
+        if (!messagesToRedeliver.isEmpty()) {
+            return true;
+        }
+
+        if (delayedDeliveryTracker.isPresent() && 
delayedDeliveryTracker.get().hasMessageAvailable()) {
+            return true;
+        }
+
+        return false;
+    }
+
+    private Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+        if (!messagesToRedeliver.isEmpty()) {
+            return messagesToRedeliver.items(maxMessagesToRead,
 
 Review comment:
   I think we should protect this method, here we have a check-then-act race 
condition because we call this method from `readMoreEntries` which is not 
synchronized.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to