This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a93e2503bf [improve][txn] Improve Reader in TransactionBuffer to 
reduce GC pressure (#23779)
4a93e2503bf is described below

commit 4a93e2503bf1d890f5d45f97374d37ee575372d3
Author: 道君 <[email protected]>
AuthorDate: Fri Jan 3 16:39:55 2025 +0800

    [improve][txn] Improve Reader in TransactionBuffer to reduce GC pressure 
(#23779)
---
 .../TransactionBufferSnapshotBaseSystemTopicClient.java    |  1 +
 .../impl/SnapshotSegmentAbortedTxnProcessorImpl.java       | 12 ++++++++++--
 .../pulsar/broker/transaction/buffer/impl/TableView.java   | 14 +++++++++-----
 .../broker/transaction/SegmentAbortedTxnProcessorTest.java |  2 ++
 .../transaction/TopicTransactionBufferRecoverTest.java     |  4 +++-
 5 files changed, 25 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
index 4023cd88bef..7ba01b09b27 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
@@ -205,6 +205,7 @@ public class  
TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTo
                 .subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX)
                 .startMessageId(MessageId.earliest)
                 .readCompacted(true)
+                .poolMessages(true)
                 .createAsync()
                 .thenApply(reader -> {
                     if (log.isDebugEnabled()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index f2ff5d519d8..779d083289b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -768,8 +768,16 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
                     try {
                         while (wait(reader.hasMoreEventsAsync(), "has more 
events")) {
                             final var message = wait(reader.readNextAsync(), 
"read next");
-                            if 
(topic.getName().equals(message.getValue().getTopicName())) {
-                                
snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
+                            final String topicName;
+                            final String key;
+                            try {
+                                topicName = message.getValue().getTopicName();
+                                key = message.getKey();
+                            } finally {
+                                message.release();
+                            }
+                            if (topic.getName().equals(topicName)) {
+                                
snapshotSegmentsWriter.getFuture().get().write(key, null);
                             }
                         }
                         future.complete(null);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
index 7608a393cc9..40adec74884 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
@@ -61,12 +61,16 @@ public class TableView<T> {
         final var reader = getReader(topic);
         while (wait(reader.hasMoreEventsAsync(), "has more events")) {
             final var msg = wait(reader.readNextAsync(), "read message");
-            if (msg.getKey() != null) {
-                if (msg.getValue() != null) {
-                    snapshots.put(msg.getKey(), msg.getValue());
-                } else {
-                    snapshots.remove(msg.getKey());
+            try {
+                if (msg.getKey() != null) {
+                    if (msg.getValue() != null) {
+                        snapshots.put(msg.getKey(), msg.getValue());
+                    } else {
+                        snapshots.remove(msg.getKey());
+                    }
                 }
+            } finally {
+                msg.release();
             }
         }
         return snapshots.get(topic);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
index d9ba825f02e..b9f4c4f632c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
@@ -348,6 +348,7 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
                         .createReader(TopicName.get(topic)).get();
         int segmentCount = 0;
         while (reader.hasMoreEvents()) {
+            @Cleanup("release")
             Message<TransactionBufferSnapshotSegment> message = 
reader.readNextAsync()
                     .get(5, TimeUnit.SECONDS);
             if (topic.equals(message.getValue().getTopicName())) {
@@ -364,6 +365,7 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
                         .createReader(TopicName.get(topic)).get();
         int indexCount = 0;
         while (reader.hasMoreEvents()) {
+            @Cleanup("release")
             Message<TransactionBufferSnapshotIndexes> message = 
reader.readNextAsync()
                     .get(5, TimeUnit.SECONDS);
             if (topic.equals(message.getValue().getTopicName())) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 14cc813a17d..ccf99936439 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -715,7 +715,9 @@ public class TopicTransactionBufferRecoverTest extends 
TransactionTestBase {
         indexesWriter.write(SNAPSHOT_INDEX, 
transactionBufferTransactionBufferSnapshotIndexes);
 
         assertTrue(indexesReader.hasMoreEvents());
-        transactionBufferTransactionBufferSnapshotIndexes = 
indexesReader.readNext().getValue();
+        @Cleanup("release")
+        Message<TransactionBufferSnapshotIndexes> message = 
indexesReader.readNext();
+        transactionBufferTransactionBufferSnapshotIndexes = message.getValue();
         
assertEquals(transactionBufferTransactionBufferSnapshotIndexes.getTopicName(), 
SNAPSHOT_INDEX);
         
assertEquals(transactionBufferTransactionBufferSnapshotIndexes.getIndexList().size(),
 5);
         
assertNull(transactionBufferTransactionBufferSnapshotIndexes.getSnapshot());

Reply via email to