This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4a93e2503bf [improve][txn] Improve Reader in TransactionBuffer to
reduce GC pressure (#23779)
4a93e2503bf is described below
commit 4a93e2503bf1d890f5d45f97374d37ee575372d3
Author: 道君 <[email protected]>
AuthorDate: Fri Jan 3 16:39:55 2025 +0800
[improve][txn] Improve Reader in TransactionBuffer to reduce GC pressure
(#23779)
---
.../TransactionBufferSnapshotBaseSystemTopicClient.java | 1 +
.../impl/SnapshotSegmentAbortedTxnProcessorImpl.java | 12 ++++++++++--
.../pulsar/broker/transaction/buffer/impl/TableView.java | 14 +++++++++-----
.../broker/transaction/SegmentAbortedTxnProcessorTest.java | 2 ++
.../transaction/TopicTransactionBufferRecoverTest.java | 4 +++-
5 files changed, 25 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
index 4023cd88bef..7ba01b09b27 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
@@ -205,6 +205,7 @@ public class
TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTo
.subscriptionRolePrefix(SystemTopicNames.SYSTEM_READER_PREFIX)
.startMessageId(MessageId.earliest)
.readCompacted(true)
+ .poolMessages(true)
.createAsync()
.thenApply(reader -> {
if (log.isDebugEnabled()) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index f2ff5d519d8..779d083289b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -768,8 +768,16 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
try {
while (wait(reader.hasMoreEventsAsync(), "has more
events")) {
final var message = wait(reader.readNextAsync(),
"read next");
- if
(topic.getName().equals(message.getValue().getTopicName())) {
-
snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
+ final String topicName;
+ final String key;
+ try {
+ topicName = message.getValue().getTopicName();
+ key = message.getKey();
+ } finally {
+ message.release();
+ }
+ if (topic.getName().equals(topicName)) {
+
snapshotSegmentsWriter.getFuture().get().write(key, null);
}
}
future.complete(null);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
index 7608a393cc9..40adec74884 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
@@ -61,12 +61,16 @@ public class TableView<T> {
final var reader = getReader(topic);
while (wait(reader.hasMoreEventsAsync(), "has more events")) {
final var msg = wait(reader.readNextAsync(), "read message");
- if (msg.getKey() != null) {
- if (msg.getValue() != null) {
- snapshots.put(msg.getKey(), msg.getValue());
- } else {
- snapshots.remove(msg.getKey());
+ try {
+ if (msg.getKey() != null) {
+ if (msg.getValue() != null) {
+ snapshots.put(msg.getKey(), msg.getValue());
+ } else {
+ snapshots.remove(msg.getKey());
+ }
}
+ } finally {
+ msg.release();
}
}
return snapshots.get(topic);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
index d9ba825f02e..b9f4c4f632c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
@@ -348,6 +348,7 @@ public class SegmentAbortedTxnProcessorTest extends
TransactionTestBase {
.createReader(TopicName.get(topic)).get();
int segmentCount = 0;
while (reader.hasMoreEvents()) {
+ @Cleanup("release")
Message<TransactionBufferSnapshotSegment> message =
reader.readNextAsync()
.get(5, TimeUnit.SECONDS);
if (topic.equals(message.getValue().getTopicName())) {
@@ -364,6 +365,7 @@ public class SegmentAbortedTxnProcessorTest extends
TransactionTestBase {
.createReader(TopicName.get(topic)).get();
int indexCount = 0;
while (reader.hasMoreEvents()) {
+ @Cleanup("release")
Message<TransactionBufferSnapshotIndexes> message =
reader.readNextAsync()
.get(5, TimeUnit.SECONDS);
if (topic.equals(message.getValue().getTopicName())) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 14cc813a17d..ccf99936439 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -715,7 +715,9 @@ public class TopicTransactionBufferRecoverTest extends
TransactionTestBase {
indexesWriter.write(SNAPSHOT_INDEX,
transactionBufferTransactionBufferSnapshotIndexes);
assertTrue(indexesReader.hasMoreEvents());
- transactionBufferTransactionBufferSnapshotIndexes =
indexesReader.readNext().getValue();
+ @Cleanup("release")
+ Message<TransactionBufferSnapshotIndexes> message =
indexesReader.readNext();
+ transactionBufferTransactionBufferSnapshotIndexes = message.getValue();
assertEquals(transactionBufferTransactionBufferSnapshotIndexes.getTopicName(),
SNAPSHOT_INDEX);
assertEquals(transactionBufferTransactionBufferSnapshotIndexes.getIndexList().size(),
5);
assertNull(transactionBufferTransactionBufferSnapshotIndexes.getSnapshot());