This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5356b64770f [fix][broker] Fix potential NPE in 
InMemTransactionBuffer.appendBufferToTxn by returning a valid Position (#25039)
5356b64770f is described below

commit 5356b64770ff222cd44906a197212503ebe37c41
Author: Ruimin MA <[email protected]>
AuthorDate: Mon Dec 8 00:43:18 2025 +0800

    [fix][broker] Fix potential NPE in InMemTransactionBuffer.appendBufferToTxn 
by returning a valid Position (#25039)
---
 .../buffer/impl/InMemTransactionBuffer.java        |  7 +++---
 .../buffer/TopicTransactionBufferTest.java         | 26 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index 4da7a48e96c..55310537b00 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
@@ -47,7 +48,7 @@ import 
org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 /**
  * The in-memory implementation of {@link TransactionBuffer}.
  */
-class InMemTransactionBuffer implements TransactionBuffer {
+public class InMemTransactionBuffer implements TransactionBuffer {
 
     /**
      * A class represents the buffer of a transaction.
@@ -269,10 +270,10 @@ class InMemTransactionBuffer implements TransactionBuffer 
{
                                                      ByteBuf buffer) {
         TxnBuffer txnBuffer = getTxnBufferOrCreateIfNotExist(txnId);
 
-        CompletableFuture appendFuture = new CompletableFuture();
+        CompletableFuture<Position> appendFuture = new CompletableFuture<>();
         try {
             txnBuffer.appendEntry(sequenceId, buffer);
-            appendFuture.complete(null);
+            appendFuture.complete(PositionFactory.EARLIEST);
         } catch (TransactionBufferException.TransactionSealedException e) {
             appendFuture.completeExceptionally(e);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index d76a5a88dbd..82e5473e283 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.RandomUtils;
@@ -50,6 +51,8 @@ import 
org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBuffer;
+import 
org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import 
org.apache.pulsar.broker.transaction.buffer.utils.TransactionBufferTestImpl;
@@ -574,4 +577,27 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
         byteBuf2.release();
         byteBuf3.release();
     }
+
+    @Test(timeOut = 10000)
+    public void testAppendBufferToTxnWithInMemTransactionBuffer() throws 
Exception {
+        // 1. Prepare test resource
+        this.pulsarServiceList.forEach(pulsarService ->  {
+            pulsarService.setTransactionBufferProvider(new 
InMemTransactionBufferProvider());
+        });
+        String topic = "persistent://" + NAMESPACE1 + 
"/testAppendBufferToTxnWithInMemTransactionBuffer";
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) 
pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+        InMemTransactionBuffer topicTransactionBuffer = 
(InMemTransactionBuffer) persistentTopic
+                .getTransactionBuffer();
+        ByteBuf byteBuf = Unpooled.buffer();
+        Position position = topicTransactionBuffer.appendBufferToTxn(new 
TxnID(1, 1), 1L, byteBuf)
+                .get(5, TimeUnit.SECONDS);
+        // 2.position should be PositionFactory.EARLIEST with 
InMemTransactionBuffer
+        assertEquals(PositionFactory.EARLIEST, position);
+        // 3. release resource
+        byteBuf.release();
+    }
 }

Reply via email to