This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 6a6917e  [bot-cherry-pick]Clear delayed messages when clear backlog. (#8714)
6a6917e is described below

commit 6a6917ed34be75b7dcae1c4eeb5c4b9b64cc9071
Author: lipenghui <[email protected]>
AuthorDate: Tue Dec 1 10:39:24 2020 +0800

    [bot-cherry-pick]Clear delayed messages when clear backlog. (#8714)
    
    Clear delayed messages when clear backlog.
    
    (cherry-pick from a022d28735ea99e8f5c13053bfab3fef7f095c31)
---
 .../broker/delayed/DelayedDeliveryTracker.java     |  5 ++++
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  5 ++++
 .../apache/pulsar/broker/service/Dispatcher.java   |  4 +++
 .../PersistentDispatcherMultipleConsumers.java     |  5 ++++
 .../service/persistent/PersistentSubscription.java |  1 +
 .../service/persistent/DelayedDeliveryTest.java    | 32 ++++++++++++++++++++++
 .../util/collections/TripleLongPriorityQueue.java  |  8 ++++++
 7 files changed, 60 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index e772d22..6b122bf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -69,6 +69,11 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
     void resetTickTime(long tickTime);
 
     /**
+     * Clear all delayed messages from the tracker.
+     */
+    void clear();
+
+    /**
      * Close the subscription tracker and release all resources.
      */
     void close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index a702a01..5c37b81 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -135,6 +135,11 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
     }
 
     @Override
+    public void clear() {
+        this.priorityQueue.clear();
+    }
+
+    @Override
     public long getNumberOfDelayedMessages() {
         return priorityQueue.size();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 2bd053f..f4eed47 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -108,6 +108,10 @@ public interface Dispatcher {
         return 0;
     }
 
+    default void clearDelayedMessages() {
+        //No-op
+    }
+
     default void cursorIsReset() {
         //No-op
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 63f1691..aec3556 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -778,6 +778,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     @Override
+    public void clearDelayedMessages() {
+        this.delayedDeliveryTracker.ifPresent(DelayedDeliveryTracker::clear);
+    }
+
+    @Override
     public void cursorIsReset() {
         if (this.lastIndividualDeletedRangeFromCursorRecovery != null) {
             this.lastIndividualDeletedRangeFromCursorRecovery = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 9b9a2c8..aa9f263 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -587,6 +587,7 @@ public class PersistentSubscription implements Subscription {
                     log.debug("[{}][{}] Backlog size after clearing: {}", topicName, subName,
                             cursor.getNumberOfEntriesInBacklog(false));
                 }
+                dispatcher.clearDelayedMessages();
                 future.complete(null);
             }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 82f6241..4528257 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -26,10 +26,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -37,6 +40,8 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -315,4 +320,31 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
             }
         }
     }
+
+    @Test
+    public void testClearDelayedMessagesWhenClearBacklog() throws PulsarClientException, PulsarAdminException {
+        final String topic = "persistent://public/default/testClearDelayedMessagesWhenClearBacklog-" + UUID.randomUUID().toString();
+        final String subName = "my-sub";
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic).create();
+
+        final int messages = 100;
+        for (int i = 0; i < messages; i++) {
+            producer.newMessage().deliverAfter(1, TimeUnit.HOURS).value("Delayed Message - " + i).send();
+        }
+
+        Dispatcher dispatcher = pulsar.getBrokerService().getTopicReference(topic).get().getSubscription(subName).getDispatcher();
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), messages));
+
+        admin.topics().skipAllMessages(topic, subName);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
index 6a9168a..5075eb1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
@@ -139,6 +139,14 @@ public class TripleLongPriorityQueue implements AutoCloseable {
         return size;
     }
 
+    /**
+     * Clear all items.
+     */
+    public void clear() {
+        this.buffer.clear();
+        this.size = 0;
+    }
+
     private void increaseCapacity() {
         // For bigger sizes, increase by 50%
         this.capacity += (capacity <= 256 ? capacity : capacity / 2);

Reply via email to