This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a6986b1b664 [improve][broker] Improve
SystemTopicBasedTopicPoliciesService reader to reduce GC pressure (#23780)
a6986b1b664 is described below
commit a6986b1b664bf2a8da6e99c2db39c65b08b9f3a4
Author: 道君 <[email protected]>
AuthorDate: Fri Jan 3 16:35:39 2025 +0800
[improve][broker] Improve SystemTopicBasedTopicPoliciesService reader to
reduce GC pressure (#23780)
---
.../service/SystemTopicBasedTopicPoliciesService.java | 14 +++++++++++---
.../broker/systopic/TopicPoliciesSystemTopicClient.java | 1 +
.../apache/pulsar/broker/service/TopicPolicyTestUtils.java | 5 ++++-
.../systopic/NamespaceEventsSystemTopicServiceTest.java | 2 ++
.../pulsar/broker/systopic/PartitionedSystemTopicTest.java | 6 +++++-
5 files changed, 23 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 5488d5563f6..f2206f5cada 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -428,7 +428,11 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
}
if (hasMore) {
reader.readNextAsync().thenAccept(msg -> {
- refreshTopicPoliciesCache(msg);
+ try {
+ refreshTopicPoliciesCache(msg);
+ } finally {
+ msg.release();
+ }
if (log.isDebugEnabled()) {
log.debug("[{}] Loop next event reading for system topic.",
reader.getSystemTopic().getTopicName().getNamespaceObject());
@@ -505,8 +509,12 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
}
reader.readNextAsync()
.thenAccept(msg -> {
- refreshTopicPoliciesCache(msg);
- notifyListener(msg);
+ try {
+ refreshTopicPoliciesCache(msg);
+ notifyListener(msg);
+ } finally {
+ msg.release();
+ }
})
.whenComplete((__, ex) -> {
if (ex == null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
index ea3ac507d11..6ada9db2967 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
@@ -73,6 +73,7 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar
.subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX)
.startMessageId(MessageId.earliest)
.readCompacted(true)
+ .poolMessages(true)
.createAsync()
.thenApply(reader -> {
if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
index 9cf688d62ed..d4275cdfd20 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import lombok.Cleanup;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
@@ -67,7 +68,9 @@ public class TopicPolicyTestUtils {
.newReader();
PulsarEvent event = null;
while (reader.hasMoreEvents()) {
- event = reader.readNext().getValue();
+ @Cleanup("release")
+ Message<PulsarEvent> message = reader.readNext();
+ event = message.getValue();
}
return Optional.ofNullable(event).map(e ->
e.getTopicPoliciesEvent().getPolicies());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index e66140efb32..aaa719515c9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -129,6 +129,7 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
.build();
systemTopicClientForNamespace1.newWriter().write(getEventKey(event),
event);
SystemTopicClient.Reader reader =
systemTopicClientForNamespace1.newReader();
+ @Cleanup("release")
Message<PulsarEvent> received = reader.readNext();
log.info("Receive pulsar event from system topic : {}",
received.getValue());
@@ -139,6 +140,7 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
// test new reader read
SystemTopicClient.Reader reader1 =
systemTopicClientForNamespace1.newReader();
+ @Cleanup("release")
Message<PulsarEvent> received1 = reader1.readNext();
log.info("Receive pulsar event from system topic : {}",
received1.getValue());
Assert.assertEquals(received1.getValue(), event);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index e7bfa3278e3..e31f78665b3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -338,9 +338,13 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
FutureUtil.waitForAll(List.of(writer1, writer2, f1)).join();
Assert.assertTrue(reader1.hasMoreEvents());
- Assert.assertNotNull(reader1.readNext());
+ Message<?> message = reader1.readNext();
+ Assert.assertNotNull(message);
+ message.release();
Assert.assertTrue(reader2.hasMoreEvents());
+ message = reader2.readNext();
Assert.assertNotNull(reader2.readNext());
+ message.release();
reader1.close();
reader2.close();
writer1.get().close();