This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 508dfb358c81edea97c70e439aa8cfea54244be1
Author: lipenghui <[email protected]>
AuthorDate: Thu Nov 26 09:25:52 2020 +0800

    Clear delayed messages when clear backlog. (#8691)
    
    Clear delayed messages when clear backlog.
    
    (cherry picked from commit a022d28735ea99e8f5c13053bfab3fef7f095c31)
---
 .../broker/delayed/DelayedDeliveryTracker.java     |  5 ++++
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  5 ++++
 .../apache/pulsar/broker/service/Dispatcher.java   |  4 +++
 .../PersistentDispatcherMultipleConsumers.java     |  5 ++++
 .../service/persistent/PersistentSubscription.java |  1 +
 .../service/persistent/DelayedDeliveryTest.java    | 31 ++++++++++++++++++++++
 .../util/collections/TripleLongPriorityQueue.java  |  8 ++++++
 7 files changed, 59 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index e772d22..6b122bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -69,6 +69,11 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
     void resetTickTime(long tickTime);
 
     /**
+     * Clear all delayed messages from the tracker.
+     */
+    void clear();
+
+    /**
      * Close the subscription tracker and release all resources.
      */
     void close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index a702a01..5c37b81 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -135,6 +135,11 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
     }
 
     @Override
+    public void clear() {
+        this.priorityQueue.clear();
+    }
+
+    @Override
     public long getNumberOfDelayedMessages() {
         return priorityQueue.size();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index a16adc9..483190f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -108,6 +108,10 @@ public interface Dispatcher {
         return 0;
     }
 
+    default void clearDelayedMessages() {
+        //No-op
+    }
+
     default void cursorIsReset() {
         //No-op
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index be9a7aa..265186b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -798,6 +798,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     @Override
+    public void clearDelayedMessages() {
+        this.delayedDeliveryTracker.ifPresent(DelayedDeliveryTracker::clear);
+    }
+
+    @Override
     public void cursorIsReset() {
         if (this.lastIndividualDeletedRangeFromCursorRecovery != null) {
             this.lastIndividualDeletedRangeFromCursorRecovery = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 5e22dad..0a578d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -493,6 +493,7 @@ public class PersistentSubscription implements Subscription {
                     log.debug("[{}][{}] Backlog size after clearing: {}", topicName, subName,
                             cursor.getNumberOfEntriesInBacklog(false));
                 }
+                dispatcher.clearDelayedMessages();
                 future.complete(null);
             }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 3c527d8..799d7f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -34,6 +34,8 @@ import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -42,6 +44,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -447,4 +451,31 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
         msg = consumer.receive(3, TimeUnit.SECONDS);
         assertNotNull(msg);
     }
+
+    @Test
+    public void testClearDelayedMessagesWhenClearBacklog() throws PulsarClientException, PulsarAdminException {
+        final String topic = "persistent://public/default/testClearDelayedMessagesWhenClearBacklog-" + UUID.randomUUID().toString();
+        final String subName = "my-sub";
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic).create();
+
+        final int messages = 100;
+        for (int i = 0; i < messages; i++) {
+            producer.newMessage().deliverAfter(1, TimeUnit.HOURS).value("Delayed Message - " + i).send();
+        }
+
+        Dispatcher dispatcher = pulsar.getBrokerService().getTopicReference(topic).get().getSubscription(subName).getDispatcher();
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), messages));
+
+        admin.topics().skipAllMessages(topic, subName);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
index 6a9168a..5075eb1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
@@ -139,6 +139,14 @@ public class TripleLongPriorityQueue implements AutoCloseable {
         return size;
     }
 
+    /**
+     * Clear all items.
+     */
+    public void clear() {
+        this.buffer.clear();
+        this.size = 0;
+    }
+
     private void increaseCapacity() {
         // For bigger sizes, increase by 50%
         this.capacity += (capacity <= 256 ? capacity : capacity / 2);

Reply via email to