This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 508dfb358c81edea97c70e439aa8cfea54244be1 Author: lipenghui <[email protected]> AuthorDate: Thu Nov 26 09:25:52 2020 +0800 Clear delayed messages when clear backlog. (#8691) Clear delayed messages when clear backlog. (cherry picked from commit a022d28735ea99e8f5c13053bfab3fef7f095c31) --- .../broker/delayed/DelayedDeliveryTracker.java | 5 ++++ .../delayed/InMemoryDelayedDeliveryTracker.java | 5 ++++ .../apache/pulsar/broker/service/Dispatcher.java | 4 +++ .../PersistentDispatcherMultipleConsumers.java | 5 ++++ .../service/persistent/PersistentSubscription.java | 1 + .../service/persistent/DelayedDeliveryTest.java | 31 ++++++++++++++++++++++ .../util/collections/TripleLongPriorityQueue.java | 8 ++++++ 7 files changed, 59 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java index e772d22..6b122bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java @@ -69,6 +69,11 @@ public interface DelayedDeliveryTracker extends AutoCloseable { void resetTickTime(long tickTime); /** + * Clear all delayed messages from the tracker. + */ + void clear(); + + /** * Close the subscription tracker and release all resources. */ void close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index a702a01..5c37b81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -135,6 +135,11 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T } @Override + public void clear() { + this.priorityQueue.clear(); + } + + @Override public long getNumberOfDelayedMessages() { return priorityQueue.size(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index a16adc9..483190f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -108,6 +108,10 @@ public interface Dispatcher { return 0; } + default void clearDelayedMessages() { + //No-op + } + default void cursorIsReset() { //No-op } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index be9a7aa..265186b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -798,6 +798,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } @Override + public void clearDelayedMessages() { + this.delayedDeliveryTracker.ifPresent(DelayedDeliveryTracker::clear); + } + + @Override public void cursorIsReset() { if (this.lastIndividualDeletedRangeFromCursorRecovery != null) { this.lastIndividualDeletedRangeFromCursorRecovery = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 5e22dad..0a578d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -493,6 +493,7 @@ public class PersistentSubscription implements Subscription { log.debug("[{}][{}] Backlog size after clearing: {}", topicName, subName, cursor.getNumberOfEntriesInBacklog(false)); } + dispatcher.clearDelayedMessages(); future.complete(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index 3c527d8..799d7f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -42,6 +44,8 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -447,4 +451,31 @@ public class DelayedDeliveryTest extends ProducerConsumerBase { msg = consumer.receive(3, TimeUnit.SECONDS); assertNotNull(msg); } + + @Test + public void testClearDelayedMessagesWhenClearBacklog() throws PulsarClientException, PulsarAdminException { + final String topic = "persistent://public/default/testClearDelayedMessagesWhenClearBacklog-" + UUID.randomUUID().toString(); + final String subName = "my-sub"; + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic).create(); + + final int messages = 100; + for (int i = 0; i < messages; i++) { + producer.newMessage().deliverAfter(1, TimeUnit.HOURS).value("Delayed Message - " + i).send(); + } + + Dispatcher dispatcher = pulsar.getBrokerService().getTopicReference(topic).get().getSubscription(subName).getDispatcher(); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), messages)); + + admin.topics().skipAllMessages(topic, subName); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0)); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java index 6a9168a..5075eb1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java @@ -139,6 +139,14 @@ public class TripleLongPriorityQueue implements AutoCloseable { return size; } + /** + * Clear all items. + */ + public void clear() { + this.buffer.clear(); + this.size = 0; + } + private void increaseCapacity() { // For bigger sizes, increase by 50% this.capacity += (capacity <= 256 ? capacity : capacity / 2);
