This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 6a6917e [bot-cherry-pick]Clear delayed messages when clear backlog.
(#8714)
6a6917e is described below
commit 6a6917ed34be75b7dcae1c4eeb5c4b9b64cc9071
Author: lipenghui <[email protected]>
AuthorDate: Tue Dec 1 10:39:24 2020 +0800
[bot-cherry-pick]Clear delayed messages when clear backlog. (#8714)
Clear delayed messages when clearing the backlog.
(cherry-picked from commit a022d28735ea99e8f5c13053bfab3fef7f095c31)
---
.../broker/delayed/DelayedDeliveryTracker.java | 5 ++++
.../delayed/InMemoryDelayedDeliveryTracker.java | 5 ++++
.../apache/pulsar/broker/service/Dispatcher.java | 4 +++
.../PersistentDispatcherMultipleConsumers.java | 5 ++++
.../service/persistent/PersistentSubscription.java | 1 +
.../service/persistent/DelayedDeliveryTest.java | 32 ++++++++++++++++++++++
.../util/collections/TripleLongPriorityQueue.java | 8 ++++++
7 files changed, 60 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
index e772d22..6b122bf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java
@@ -69,6 +69,11 @@ public interface DelayedDeliveryTracker extends
AutoCloseable {
void resetTickTime(long tickTime);
/**
+ * Clear all delayed messages from the tracker.
+ */
+ void clear();
+
+ /**
* Close the subscription tracker and release all resources.
*/
void close();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index a702a01..5c37b81 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -135,6 +135,11 @@ public class InMemoryDelayedDeliveryTracker implements
DelayedDeliveryTracker, T
}
@Override
+ public void clear() {
+ this.priorityQueue.clear();
+ }
+
+ @Override
public long getNumberOfDelayedMessages() {
return priorityQueue.size();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 2bd053f..f4eed47 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -108,6 +108,10 @@ public interface Dispatcher {
return 0;
}
+ default void clearDelayedMessages() {
+ //No-op
+ }
+
default void cursorIsReset() {
//No-op
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 63f1691..aec3556 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -778,6 +778,11 @@ public class PersistentDispatcherMultipleConsumers extends
AbstractDispatcherMul
}
@Override
+ public void clearDelayedMessages() {
+ this.delayedDeliveryTracker.ifPresent(DelayedDeliveryTracker::clear);
+ }
+
+ @Override
public void cursorIsReset() {
if (this.lastIndividualDeletedRangeFromCursorRecovery != null) {
this.lastIndividualDeletedRangeFromCursorRecovery = null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 9b9a2c8..aa9f263 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -587,6 +587,7 @@ public class PersistentSubscription implements Subscription
{
log.debug("[{}][{}] Backlog size after clearing: {}",
topicName, subName,
cursor.getNumberOfEntriesInBacklog(false));
}
+ dispatcher.clearDelayedMessages();
future.complete(null);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 82f6241..4528257 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -26,10 +26,13 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -37,6 +40,8 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -315,4 +320,31 @@ public class DelayedDeliveryTest extends
ProducerConsumerBase {
}
}
}
+
+ @Test
+ public void testClearDelayedMessagesWhenClearBacklog() throws
PulsarClientException, PulsarAdminException {
+ final String topic =
"persistent://public/default/testClearDelayedMessagesWhenClearBacklog-" +
UUID.randomUUID().toString();
+ final String subName = "my-sub";
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic).create();
+
+ final int messages = 100;
+ for (int i = 0; i < messages; i++) {
+ producer.newMessage().deliverAfter(1,
TimeUnit.HOURS).value("Delayed Message - " + i).send();
+ }
+
+ Dispatcher dispatcher =
pulsar.getBrokerService().getTopicReference(topic).get().getSubscription(subName).getDispatcher();
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), messages));
+
+ admin.topics().skipAllMessages(topic, subName);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
+ }
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
index 6a9168a..5075eb1 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/TripleLongPriorityQueue.java
@@ -139,6 +139,14 @@ public class TripleLongPriorityQueue implements
AutoCloseable {
return size;
}
+ /**
+ * Clear all items.
+ */
+ public void clear() {
+ this.buffer.clear();
+ this.size = 0;
+ }
+
private void increaseCapacity() {
// For bigger sizes, increase by 50%
this.capacity += (capacity <= 256 ? capacity : capacity / 2);