This is an automated email from the ASF dual-hosted git repository.
daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5356b64770f [fix][broker] Fix potential NPE in InMemTransactionBuffer.appendBufferToTxn by returning a valid Position (#25039)
5356b64770f is described below
commit 5356b64770ff222cd44906a197212503ebe37c41
Author: Ruimin MA <[email protected]>
AuthorDate: Mon Dec 8 00:43:18 2025 +0800
[fix][broker] Fix potential NPE in InMemTransactionBuffer.appendBufferToTxn by returning a valid Position (#25039)
---
.../buffer/impl/InMemTransactionBuffer.java | 7 +++---
.../buffer/TopicTransactionBufferTest.java | 26 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 4da7a48e96c..55310537b00 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
@@ -47,7 +48,7 @@ import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
/**
* The in-memory implementation of {@link TransactionBuffer}.
*/
-class InMemTransactionBuffer implements TransactionBuffer {
+public class InMemTransactionBuffer implements TransactionBuffer {
/**
* A class represents the buffer of a transaction.
@@ -269,10 +270,10 @@ class InMemTransactionBuffer implements TransactionBuffer {
ByteBuf buffer) {
TxnBuffer txnBuffer = getTxnBufferOrCreateIfNotExist(txnId);
- CompletableFuture appendFuture = new CompletableFuture();
+ CompletableFuture<Position> appendFuture = new CompletableFuture<>();
try {
txnBuffer.appendEntry(sequenceId, buffer);
- appendFuture.complete(null);
+ appendFuture.complete(PositionFactory.EARLIEST);
} catch (TransactionBufferException.TransactionSealedException e) {
appendFuture.completeExceptionally(e);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index d76a5a88dbd..82e5473e283 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.RandomUtils;
@@ -50,6 +51,8 @@ import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBuffer;
+import
org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import
org.apache.pulsar.broker.transaction.buffer.utils.TransactionBufferTestImpl;
@@ -574,4 +577,27 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
byteBuf2.release();
byteBuf3.release();
}
+
+ @Test(timeOut = 10000)
+ public void testAppendBufferToTxnWithInMemTransactionBuffer() throws
Exception {
+ // 1. Prepare test resource
+ this.pulsarServiceList.forEach(pulsarService -> {
+ pulsarService.setTransactionBufferProvider(new
InMemTransactionBufferProvider());
+ });
+ String topic = "persistent://" + NAMESPACE1 +
"/testAppendBufferToTxnWithInMemTransactionBuffer";
+ admin.topics().createNonPartitionedTopic(topic);
+ PersistentTopic persistentTopic = (PersistentTopic)
pulsarServiceList.get(0).getBrokerService()
+ .getTopic(topic, false)
+ .get()
+ .get();
+ InMemTransactionBuffer topicTransactionBuffer =
(InMemTransactionBuffer) persistentTopic
+ .getTransactionBuffer();
+ ByteBuf byteBuf = Unpooled.buffer();
+ Position position = topicTransactionBuffer.appendBufferToTxn(new
TxnID(1, 1), 1L, byteBuf)
+ .get(5, TimeUnit.SECONDS);
+ // 2. position should be PositionFactory.EARLIEST with InMemTransactionBuffer
+ assertEquals(PositionFactory.EARLIEST, position);
+ // 3. release resource
+ byteBuf.release();
+ }
}