This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2c3909c17b0 [fix] [broker] maintain last active info in memory only. (#22794)
2c3909c17b0 is described below
commit 2c3909c17b0c68a39f2f6f2a50c19216ef3a6ffc
Author: Wenzhi Feng <[email protected]>
AuthorDate: Mon Jun 3 10:14:31 2024 +0800
[fix] [broker] maintain last active info in memory only. (#22794)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 -
managed-ledger/src/main/proto/MLDataFormats.proto | 3 +-
.../broker/service/persistent/PersistentTopic.java | 31 ++++++-----
.../pulsar/broker/service/BrokerServiceTest.java | 62 ++++++++++++++++++++++
4 files changed, 82 insertions(+), 16 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 18d9cd7cb05..1d2065ef8e3 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -475,9 +475,7 @@ public class ManagedCursorImpl implements ManagedCursor {
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new
MetaStoreCallback<ManagedCursorInfo>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
-
updateCursorLedgerStat(info, stat);
- lastActive = info.getLastActive() != 0 ? info.getLastActive()
: lastActive;
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Recover cursor last active to [{}]",
ledger.getName(), name, lastActive);
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto
b/managed-ledger/src/main/proto/MLDataFormats.proto
index c4e502819fa..fdffed6762d 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -124,7 +124,8 @@ message ManagedCursorInfo {
// the current cursor position
repeated LongProperty properties = 5;
- optional int64 lastActive = 6;
+ // deprecated, do not persist this field anymore
+ optional int64 lastActive = 6 [deprecated = true];
// Store which index in the batch message has been deleted
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 69c7f404fdd..18e69250c16 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3227,19 +3227,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
final Integer nsExpirationTime =
policies.subscription_expiration_time_minutes;
final long expirationTimeMillis = TimeUnit.MINUTES
.toMillis(nsExpirationTime == null ? defaultExpirationTime
: nsExpirationTime);
- if (expirationTimeMillis > 0) {
- subscriptions.forEach((subName, sub) -> {
- if (sub.dispatcher != null &&
sub.dispatcher.isConsumerConnected()
- || sub.isReplicated()
- || isCompactionSubscription(subName)) {
- return;
- }
- if (System.currentTimeMillis() -
sub.cursor.getLastActive() > expirationTimeMillis) {
- sub.delete().thenAccept(v -> log.info("[{}][{}] The
subscription was deleted due to expiration "
- + "with last active [{}]", topic, subName,
sub.cursor.getLastActive()));
- }
- });
- }
+ checkInactiveSubscriptions(expirationTimeMillis);
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
@@ -3247,6 +3235,23 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
}
+ @VisibleForTesting
+ public void checkInactiveSubscriptions(long expirationTimeMillis) {
+ if (expirationTimeMillis > 0) {
+ subscriptions.forEach((subName, sub) -> {
+ if (sub.dispatcher != null &&
sub.dispatcher.isConsumerConnected()
+ || sub.isReplicated()
+ || isCompactionSubscription(subName)) {
+ return;
+ }
+ if (System.currentTimeMillis() - sub.cursor.getLastActive() >
expirationTimeMillis) {
+ sub.delete().thenAccept(v -> log.info("[{}][{}] The
subscription was deleted due to expiration "
+ + "with last active [{}]", topic, subName,
sub.cursor.getLastActive()));
+ }
+ });
+ }
+ }
+
@Override
public void checkBackloggedCursors() {
subscriptions.forEach((subName, subscription) -> {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index be1221b7fab..172842b5ed3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -22,6 +22,7 @@ import static
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORD
import static
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -1313,6 +1314,67 @@ public class BrokerServiceTest extends BrokerTestBase {
}
+ @Test
+ public void testCheckInactiveSubscriptionWhenNoMessageToAck() throws
Exception {
+ String namespace = "prop/testInactiveSubscriptionWhenNoMessageToAck";
+
+ try {
+ admin.namespaces().createNamespace(namespace);
+ } catch (PulsarAdminException.ConflictException e) {
+ // Ok.. (if test fails intermittently and namespace is already created)
+ }
+
+ String topic = "persistent://" + namespace + "/my-topic";
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topic).create();
+ producer.send("test".getBytes());
+ producer.close();
+
+ // create consumer to consume all messages
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+ consumer.acknowledge(consumer.receive());
+
+ Optional<Topic> topicOptional =
pulsar.getBrokerService().getTopic(topic, true).get();
+ assertTrue(topicOptional.isPresent());
+ PersistentTopic persistentTopic = (PersistentTopic)
topicOptional.get();
+
+ // wait for 1s, but consumer is still connected all the time.
+ // so subscription should not be deleted.
+ Thread.sleep(1000);
+ persistentTopic.checkInactiveSubscriptions(1000);
+ PersistentTopic finalPersistentTopic = persistentTopic;
+ Awaitility.await().pollDelay(3, TimeUnit.SECONDS).until(() ->
+ finalPersistentTopic.getSubscriptions().containsKey("sub1"));
+ PersistentSubscription sub = persistentTopic.getSubscription("sub1");
+
+ // shutdown pulsar ungracefully
+ // disable the updateLastActive method to simulate the ungraceful shutdown
+ ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor();
+ ManagedCursorImpl spyCursor = Mockito.spy(cursor);
+ doNothing().when(spyCursor).updateLastActive();
+ Field cursorField =
PersistentSubscription.class.getDeclaredField("cursor");
+ cursorField.setAccessible(true);
+ cursorField.set(sub, spyCursor);
+
+ // restart pulsar
+ consumer.close();
+ restartBroker();
+
+ admin.lookups().lookupTopic(topic);
+ topicOptional = pulsar.getBrokerService().getTopic(topic, true).get();
+ assertTrue(topicOptional.isPresent());
+ persistentTopic = (PersistentTopic) topicOptional.get();
+ persistentTopic.checkInactiveSubscriptions(1000);
+
+ // check if subscription is still present
+ PersistentTopic finalPersistentTopic1 = persistentTopic;
+ Awaitility.await().pollDelay(3, TimeUnit.SECONDS).until(() ->
+ finalPersistentTopic1.getSubscriptions().containsKey("sub1"));
+ sub = persistentTopic.getSubscription("sub1");
+ assertNotNull(sub);
+ }
+
+
/**
* Verifies brokerService should not have deadlock and successfully remove
topic from topicMap on topic-failure and
* it should not introduce deadlock while performing it.