This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dc7ce73ed199c932ad015453fe2ea99330a5866f
Author: Wenzhi Feng <[email protected]>
AuthorDate: Mon Jun 3 10:14:31 2024 +0800

    [fix] [broker] maintain last active info in memory only. (#22794)
    
    (cherry picked from commit 2c3909c17b0c68a39f2f6f2a50c19216ef3a6ffc)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 -
 managed-ledger/src/main/proto/MLDataFormats.proto  |  3 +-
 .../broker/service/persistent/PersistentTopic.java | 31 ++++++-----
 .../pulsar/broker/service/BrokerServiceTest.java   | 62 ++++++++++++++++++++++
 4 files changed, 82 insertions(+), 16 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index e47e03c39bf..dea2868b3fa 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -470,9 +470,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new 
MetaStoreCallback<ManagedCursorInfo>() {
             @Override
             public void operationComplete(ManagedCursorInfo info, Stat stat) {
-
                 updateCursorLedgerStat(info, stat);
-                lastActive = info.getLastActive() != 0 ? info.getLastActive() 
: lastActive;
 
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] Recover cursor last active to [{}]", 
ledger.getName(), name, lastActive);
diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto
index c4e502819fa..fdffed6762d 100644
--- a/managed-ledger/src/main/proto/MLDataFormats.proto
+++ b/managed-ledger/src/main/proto/MLDataFormats.proto
@@ -124,7 +124,8 @@ message ManagedCursorInfo {
     // the current cursor position
     repeated LongProperty properties = 5;
 
-    optional int64 lastActive = 6;
+    // deprecated, do not persist this field anymore
+    optional int64 lastActive = 6 [deprecated = true];
 
     // Store which index in the batch message has been deleted
     repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5eb0600a9fd..cc85d782631 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3163,19 +3163,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             final Integer nsExpirationTime = 
policies.subscription_expiration_time_minutes;
             final long expirationTimeMillis = TimeUnit.MINUTES
                     .toMillis(nsExpirationTime == null ? defaultExpirationTime 
: nsExpirationTime);
-            if (expirationTimeMillis > 0) {
-                subscriptions.forEach((subName, sub) -> {
-                    if (sub.dispatcher != null && 
sub.dispatcher.isConsumerConnected()
-                            || sub.isReplicated()
-                            || isCompactionSubscription(subName)) {
-                        return;
-                    }
-                    if (System.currentTimeMillis() - 
sub.cursor.getLastActive() > expirationTimeMillis) {
-                        sub.delete().thenAccept(v -> log.info("[{}][{}] The 
subscription was deleted due to expiration "
-                                + "with last active [{}]", topic, subName, 
sub.cursor.getLastActive()));
-                    }
-                });
-            }
+            checkInactiveSubscriptions(expirationTimeMillis);
         } catch (Exception e) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error getting policies", topic);
@@ -3183,6 +3171,23 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
     }
 
+    @VisibleForTesting
+    public void checkInactiveSubscriptions(long expirationTimeMillis) {
+        if (expirationTimeMillis > 0) {
+            subscriptions.forEach((subName, sub) -> {
+                if (sub.dispatcher != null && 
sub.dispatcher.isConsumerConnected()
+                        || sub.isReplicated()
+                        || isCompactionSubscription(subName)) {
+                    return;
+                }
+                if (System.currentTimeMillis() - sub.cursor.getLastActive() > 
expirationTimeMillis) {
+                    sub.delete().thenAccept(v -> log.info("[{}][{}] The 
subscription was deleted due to expiration "
+                            + "with last active [{}]", topic, subName, 
sub.cursor.getLastActive()));
+                }
+            });
+        }
+    }
+
     @Override
     public void checkBackloggedCursors() {
         subscriptions.forEach((subName, subscription) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index ab0b8f813ea..1279b97a675 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORD
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -1311,6 +1312,67 @@ public class BrokerServiceTest extends BrokerTestBase {
 
     }
 
+    @Test
+    public void testCheckInactiveSubscriptionWhenNoMessageToAck() throws 
Exception {
+        String namespace = "prop/testInactiveSubscriptionWhenNoMessageToAck";
+
+        try {
+            admin.namespaces().createNamespace(namespace);
+        } catch (PulsarAdminException.ConflictException e) {
+            // Ok.. (if test fails intermittently and namespace is already 
created)
+        }
+
+        String topic = "persistent://" + namespace + "/my-topic";
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        producer.send("test".getBytes());
+        producer.close();
+
+        // create consumer to consume all messages
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+        consumer.acknowledge(consumer.receive());
+
+        Optional<Topic> topicOptional = 
pulsar.getBrokerService().getTopic(topic, true).get();
+        assertTrue(topicOptional.isPresent());
+        PersistentTopic persistentTopic = (PersistentTopic) 
topicOptional.get();
+
+        // wait for 1s, but consumer is still connected all the time.
+        // so subscription should not be deleted.
+        Thread.sleep(1000);
+        persistentTopic.checkInactiveSubscriptions(1000);
+        PersistentTopic finalPersistentTopic = persistentTopic;
+        Awaitility.await().pollDelay(3, TimeUnit.SECONDS).until(() ->
+                finalPersistentTopic.getSubscriptions().containsKey("sub1"));
+        PersistentSubscription sub = persistentTopic.getSubscription("sub1");
+
+        // shutdown pulsar ungracefully
+        // disable the updateLastActive method to simulate the ungraceful 
shutdown
+        ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor();
+        ManagedCursorImpl spyCursor = Mockito.spy(cursor);
+        doNothing().when(spyCursor).updateLastActive();
+        Field cursorField = 
PersistentSubscription.class.getDeclaredField("cursor");
+        cursorField.setAccessible(true);
+        cursorField.set(sub, spyCursor);
+
+        // restart pulsar
+        consumer.close();
+        restartBroker();
+
+        admin.lookups().lookupTopic(topic);
+        topicOptional = pulsar.getBrokerService().getTopic(topic, true).get();
+        assertTrue(topicOptional.isPresent());
+        persistentTopic = (PersistentTopic) topicOptional.get();
+        persistentTopic.checkInactiveSubscriptions(1000);
+
+        // check if subscription is still present
+        PersistentTopic finalPersistentTopic1 = persistentTopic;
+        Awaitility.await().pollDelay(3, TimeUnit.SECONDS).until(() ->
+                finalPersistentTopic1.getSubscriptions().containsKey("sub1"));
+        sub = persistentTopic.getSubscription("sub1");
+        assertNotNull(sub);
+    }
+
+
     /**
      * Verifies brokerService should not have deadlock and successfully remove 
topic from topicMap on topic-failure and
      * it should not introduce deadlock while performing it.

Reply via email to