This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new f9475fe Branch-2.7 fix Delayed Messages (#11374)
f9475fe is described below
commit f9475fe192500c892be47b02b58b3ce0fd05e8f9
Author: Enrico Olivelli <[email protected]>
AuthorDate: Tue Jul 20 04:42:29 2021 +0200
Branch-2.7 fix Delayed Messages (#11374)
Cherry-picking #10762 broke the Delayed messages feature in branch-2.7.
This patch restores the method that has been dropped
---
.../PersistentDispatcherMultipleConsumers.java | 19 +++++++++++++++++++
1 file changed, 19 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index adc5b2b..4d2e3f4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -59,6 +59,7 @@ import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import
org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
@@ -794,6 +795,24 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
}
+ @Override
+ public boolean trackDelayedDelivery(long ledgerId, long entryId,
PulsarApi.MessageMetadata msgMetadata) {
+ if (!topic.isDelayedDeliveryEnabled()) {
+ // If broker has the feature disabled, always deliver messages
immediately
+ return false;
+ }
+ synchronized (this) {
+ if (!delayedDeliveryTracker.isPresent()) {
+ // Initialize the tracker the first time we need to use it
+ delayedDeliveryTracker = Optional
+
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
+ }
+
+
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
+ return delayedDeliveryTracker.get().addMessage(ledgerId, entryId,
msgMetadata.getDeliverAtTime());
+ }
+ }
+
protected synchronized Set<PositionImpl> getMessagesToReplayNow(int
maxMessagesToRead) {
if (!redeliveryMessages.isEmpty()) {
return
redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);