This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e583b055915bcd9b295bbed0d1b99f6d139ec568 Author: gaozhangmin <[email protected]> AuthorDate: Tue Feb 22 02:18:56 2022 +0800 [Broker] waitingCursors potential heap memory leak (#13939) (cherry picked from commit 478fd36227c2ede3e1162dd9a4361cffc5dbfceb) --- .../apache/bookkeeper/mledger/ManagedLedger.java | 7 +++++++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 4 ++++ .../service/persistent/PersistentSubscription.java | 1 + .../broker/admin/CreateSubscriptionTest.java | 22 ++++++++++++++++++++++ .../offload/jcloud/impl/MockManagedLedger.java | 5 +++++ 5 files changed, 39 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 0200e25..cd39919 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -289,6 +289,13 @@ public interface ManagedLedger { void deleteCursor(String name) throws InterruptedException, ManagedLedgerException; /** + * Remove a ManagedCursor from this ManagedLedger's waitingCursors. + * + * @param cursor the ManagedCursor + */ + void removeWaitingCursor(ManagedCursor cursor); + + /** * Open a ManagedCursor asynchronously. * * @see #openCursor(String) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 5288160..8728753 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3415,6 +3415,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } + public void removeWaitingCursor(ManagedCursor cursor) { + this.waitingCursors.remove(cursor); + } + public boolean isCursorActive(ManagedCursor cursor) { return activeCursors.get(cursor.getName()) != null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index f69db68..8637ecd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -299,6 +299,7 @@ public class PersistentSubscription implements Subscription { if (dispatcher != null && dispatcher.getConsumers().isEmpty()) { deactivateCursor(); + topic.getManagedLedger().removeWaitingCursor(cursor); if (!cursor.isDurable()) { // If cursor is not durable, we need to clean up the subscription as well diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java index e0d1720..09f2c91 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java @@ -19,15 +19,22 @@ package org.apache.pulsar.broker.admin; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; import javax.ws.rs.ClientErrorException; import javax.ws.rs.core.Response.Status; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -127,4 +134,19 @@ public class CreateSubscriptionTest extends ProducerConsumerBase { Lists.newArrayList("sub-1")); } } + + @Test + public void testWaitingCurosrCausedMemoryLeak() throws Exception { + String topic = "persistent://my-property/my-ns/my-topic"; + for (int i = 0; i < 10; i ++) { + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionType(SubscriptionType.Failover).subscriptionName("test" + i).subscribe(); + Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected())); + consumer.close(); + } + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl)(topicRef.getManagedLedger()); + assertEquals(ml.getWaitingCursorsCount(), 0); + } + } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java index 229cd66..907aba6 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java @@ -138,6 +138,11 @@ public class MockManagedLedger implements ManagedLedger { } @Override + public void removeWaitingCursor(ManagedCursor cursor) { + + } + + @Override public void asyncOpenCursor(String name, AsyncCallbacks.OpenCursorCallback callback, Object ctx) { }
