This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a6986b1b664 [improve][broker] Improve 
SystemTopicBasedTopicPoliciesService reader to reduce GC pressure (#23780)
a6986b1b664 is described below

commit a6986b1b664bf2a8da6e99c2db39c65b08b9f3a4
Author: 道君 <[email protected]>
AuthorDate: Fri Jan 3 16:35:39 2025 +0800

    [improve][broker] Improve SystemTopicBasedTopicPoliciesService reader to 
reduce GC pressure (#23780)
---
 .../service/SystemTopicBasedTopicPoliciesService.java      | 14 +++++++++++---
 .../broker/systopic/TopicPoliciesSystemTopicClient.java    |  1 +
 .../apache/pulsar/broker/service/TopicPolicyTestUtils.java |  5 ++++-
 .../systopic/NamespaceEventsSystemTopicServiceTest.java    |  2 ++
 .../pulsar/broker/systopic/PartitionedSystemTopicTest.java |  6 +++++-
 5 files changed, 23 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 5488d5563f6..f2206f5cada 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -428,7 +428,11 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
             }
             if (hasMore) {
                 reader.readNextAsync().thenAccept(msg -> {
-                    refreshTopicPoliciesCache(msg);
+                    try {
+                        refreshTopicPoliciesCache(msg);
+                    } finally {
+                        msg.release();
+                    }
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Loop next event reading for system 
topic.",
                                 
reader.getSystemTopic().getTopicName().getNamespaceObject());
@@ -505,8 +509,12 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
         reader.readNextAsync()
                 .thenAccept(msg -> {
-                    refreshTopicPoliciesCache(msg);
-                    notifyListener(msg);
+                    try {
+                        refreshTopicPoliciesCache(msg);
+                        notifyListener(msg);
+                    } finally {
+                        msg.release();
+                    }
                 })
                 .whenComplete((__, ex) -> {
                     if (ex == null) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
index ea3ac507d11..6ada9db2967 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
@@ -73,6 +73,7 @@ public class TopicPoliciesSystemTopicClient extends 
SystemTopicClientBase<Pulsar
                 .subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX)
                 .startMessageId(MessageId.earliest)
                 .readCompacted(true)
+                .poolMessages(true)
                 .createAsync()
                 .thenApply(reader -> {
                     if (log.isDebugEnabled()) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
index 9cf688d62ed..d4275cdfd20 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import lombok.Cleanup;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.common.events.PulsarEvent;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
@@ -67,7 +68,9 @@ public class TopicPolicyTestUtils {
                 .newReader();
         PulsarEvent event = null;
         while (reader.hasMoreEvents()) {
-            event = reader.readNext().getValue();
+            @Cleanup("release")
+            Message<PulsarEvent> message = reader.readNext();
+            event = message.getValue();
         }
         return Optional.ofNullable(event).map(e -> 
e.getTopicPoliciesEvent().getPolicies());
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index e66140efb32..aaa719515c9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -129,6 +129,7 @@ public class NamespaceEventsSystemTopicServiceTest extends 
MockedPulsarServiceBa
             .build();
         systemTopicClientForNamespace1.newWriter().write(getEventKey(event), 
event);
         SystemTopicClient.Reader reader = 
systemTopicClientForNamespace1.newReader();
+        @Cleanup("release")
         Message<PulsarEvent> received = reader.readNext();
         log.info("Receive pulsar event from system topic : {}", 
received.getValue());
 
@@ -139,6 +140,7 @@ public class NamespaceEventsSystemTopicServiceTest extends 
MockedPulsarServiceBa
 
         // test new reader read
         SystemTopicClient.Reader reader1 = 
systemTopicClientForNamespace1.newReader();
+        @Cleanup("release")
         Message<PulsarEvent> received1 = reader1.readNext();
         log.info("Receive pulsar event from system topic : {}", 
received1.getValue());
         Assert.assertEquals(received1.getValue(), event);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index e7bfa3278e3..e31f78665b3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -338,9 +338,13 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
 
         FutureUtil.waitForAll(List.of(writer1, writer2, f1)).join();
         Assert.assertTrue(reader1.hasMoreEvents());
-        Assert.assertNotNull(reader1.readNext());
+        Message<?> message = reader1.readNext();
+        Assert.assertNotNull(message);
+        message.release();
         Assert.assertTrue(reader2.hasMoreEvents());
+        message = reader2.readNext();
         Assert.assertNotNull(reader2.readNext());
+        message.release();
         reader1.close();
         reader2.close();
         writer1.get().close();

Reply via email to