lovelle commented on a change in pull request #4062: Delayed message delivery
implementation
URL: https://github.com/apache/pulsar/pull/4062#discussion_r276704232
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -643,6 +654,59 @@ public void
initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
this.dispatchRateLimiter = Optional.of(new
DispatchRateLimiter(topic, name));
}
}
-
+
+ @Override
+ public synchronized boolean trackDelayedDelivery(long ledgerId, long
entryId, MessageMetadata msgMetadata) {
+ if (!isDelayedDeliveryEnabled) {
+ // If broker has the feature disabled, always deliver messages
immediately
+ return false;
+ }
+
+ if (!delayedDeliveryTracker.isPresent()) {
+ // Initialize the tracker the first time we need to use it
+ delayedDeliveryTracker = Optional.of(new
DelayedDeliveryTracker(this));
+ }
+
+ return delayedDeliveryTracker.get().addMessage(ledgerId, entryId,
msgMetadata.getDeliverAtTime());
+ }
+
+ /**
+ * Returns whether we have any message that could be immediately replayed.
+ * This could be a message that was requested to be re-delivered or a
delayed
+ * delivery.
+ */
+ private boolean hasMessagesToReplay() {
+ if (!messagesToRedeliver.isEmpty()) {
+ return true;
+ }
+
+ if (delayedDeliveryTracker.isPresent() &&
delayedDeliveryTracker.get().hasMessageAvailable()) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+ if (!messagesToRedeliver.isEmpty()) {
+ return messagesToRedeliver.items(maxMessagesToRead,
Review comment:
I think we should protect this too, here we have a check-then-act race
condition.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services