This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dc7ce73ed199c932ad015453fe2ea99330a5866f Author: Wenzhi Feng <[email protected]> AuthorDate: Mon Jun 3 10:14:31 2024 +0800 [fix] [broker] maintain last active info in memory only. (#22794) (cherry picked from commit 2c3909c17b0c68a39f2f6f2a50c19216ef3a6ffc) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 - managed-ledger/src/main/proto/MLDataFormats.proto | 3 +- .../broker/service/persistent/PersistentTopic.java | 31 ++++++----- .../pulsar/broker/service/BrokerServiceTest.java | 62 ++++++++++++++++++++++ 4 files changed, 82 insertions(+), 16 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index e47e03c39bf..dea2868b3fa 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -470,9 +470,7 @@ public class ManagedCursorImpl implements ManagedCursor { ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<ManagedCursorInfo>() { @Override public void operationComplete(ManagedCursorInfo info, Stat stat) { - updateCursorLedgerStat(info, stat); - lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive; if (log.isDebugEnabled()) { log.debug("[{}] [{}] Recover cursor last active to [{}]", ledger.getName(), name, lastActive); diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index c4e502819fa..fdffed6762d 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -124,7 +124,8 @@ message ManagedCursorInfo { // the current cursor position repeated LongProperty properties = 5; - optional int64 lastActive = 6; + // deprecated, do not persist this field anymore + optional int64 lastActive = 6 [deprecated = true]; // Store which index in the batch message has been deleted repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 5eb0600a9fd..cc85d782631 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3163,19 +3163,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal final Integer nsExpirationTime = policies.subscription_expiration_time_minutes; final long expirationTimeMillis = TimeUnit.MINUTES .toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime); - if (expirationTimeMillis > 0) { - subscriptions.forEach((subName, sub) -> { - if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() - || sub.isReplicated() - || isCompactionSubscription(subName)) { - return; - } - if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) { - sub.delete().thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration " - + "with last active [{}]", topic, subName, sub.cursor.getLastActive())); - } - }); - } + checkInactiveSubscriptions(expirationTimeMillis); } catch (Exception e) { if (log.isDebugEnabled()) { log.debug("[{}] Error getting policies", topic); @@ -3183,6 +3171,23 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } } + @VisibleForTesting + public void checkInactiveSubscriptions(long expirationTimeMillis) { + if (expirationTimeMillis > 0) { + subscriptions.forEach((subName, sub) -> { + if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() + || sub.isReplicated() + || isCompactionSubscription(subName)) { + return; + } + if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) { + sub.delete().thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration " + + "with last active [{}]", topic, subName, sub.cursor.getLastActive())); + } + }); + } + } + @Override public void checkBackloggedCursors() { subscriptions.forEach((subName, subscription) -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index ab0b8f813ea..1279b97a675 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORD import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -1311,6 +1312,67 @@ public class BrokerServiceTest extends BrokerTestBase { } + @Test + public void testCheckInactiveSubscriptionWhenNoMessageToAck() throws Exception { + String namespace = "prop/testInactiveSubscriptionWhenNoMessageToAck"; + + try { + admin.namespaces().createNamespace(namespace); + } catch (PulsarAdminException.ConflictException e) { + // Ok.. (if test fails intermittently and namespace is already created) + } + + String topic = "persistent://" + namespace + "/my-topic"; + Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); + producer.send("test".getBytes()); + producer.close(); + + // create consumer to consume all messages + Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); + consumer.acknowledge(consumer.receive()); + + Optional<Topic> topicOptional = pulsar.getBrokerService().getTopic(topic, true).get(); + assertTrue(topicOptional.isPresent()); + PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get(); + + // wait for 1s, but consumer is still connected all the time. + // so subscription should not be deleted. + Thread.sleep(1000); + persistentTopic.checkInactiveSubscriptions(1000); + PersistentTopic finalPersistentTopic = persistentTopic; + Awaitility.await().pollDelay(3, TimeUnit.SECONDS).until(() -> + finalPersistentTopic.getSubscriptions().containsKey("sub1")); + PersistentSubscription sub = persistentTopic.getSubscription("sub1"); + + // shutdown pulsar ungracefully + // disable the updateLastActive method to simulate the ungraceful shutdown + ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor(); + ManagedCursorImpl spyCursor = Mockito.spy(cursor); + doNothing().when(spyCursor).updateLastActive(); + Field cursorField = PersistentSubscription.class.getDeclaredField("cursor"); + cursorField.setAccessible(true); + cursorField.set(sub, spyCursor); + + // restart pulsar + consumer.close(); + restartBroker(); + + admin.lookups().lookupTopic(topic); + topicOptional = pulsar.getBrokerService().getTopic(topic, true).get(); + assertTrue(topicOptional.isPresent()); + persistentTopic = (PersistentTopic) topicOptional.get(); + persistentTopic.checkInactiveSubscriptions(1000); + + // check if subscription is still present + PersistentTopic finalPersistentTopic1 = persistentTopic; + Awaitility.await().pollDelay(3, TimeUnit.SECONDS).until(() -> + finalPersistentTopic1.getSubscriptions().containsKey("sub1")); + sub = persistentTopic.getSubscription("sub1"); + assertNotNull(sub); + } + + /** * Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and * it should not introduce deadlock while performing it.
