This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f29ca21976d [fix][broker]Transactional messages can never be sent 
successfully if concurrently taking transaction buffer snapshot (#24945)
f29ca21976d is described below

commit f29ca21976d63a92371785fbdbe712f9f5e54cf2
Author: fengyubiao <[email protected]>
AuthorDate: Tue Nov 11 17:23:45 2025 +0800

    [fix][broker]Transactional messages can never be sent successfully if 
concurrently taking transaction buffer snapshot (#24945)
---
 .../buffer/impl/TopicTransactionBuffer.java        | 194 ++++++++++++++++-----
 .../buffer/impl/TopicTransactionBufferState.java   |  21 ++-
 .../broker/transaction/TransactionConsumeTest.java | 101 +++++++++++
 .../buffer/TopicTransactionBufferTest.java         |  18 +-
 .../buffer/utils/TransactionBufferTestImpl.java    |  15 ++
 5 files changed, 285 insertions(+), 64 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 0c777afaa26..2df6e717981 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.Timeout;
 import io.netty.util.Timer;
 import io.netty.util.TimerTask;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -92,9 +93,6 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
     private final CompletableFuture<Void> transactionBufferFuture = new 
CompletableFuture<>();
 
-    private CompletableFuture<Position> publishFuture = 
getTransactionBufferFuture()
-            .thenApply(__ -> PositionFactory.EARLIEST);
-
     /**
      * The map is used to store the lowWaterMarks which key is TC ID and value 
is lowWaterMark of the TC.
      */
@@ -108,6 +106,8 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
     private final AbortedTxnProcessor.SnapshotType snapshotType;
     private final MaxReadPositionCallBack maxReadPositionCallBack;
+    /** if the first snapshot is in progress, it will pending following 
publishing tasks. **/
+    private final LinkedList<PendingAppendingTxnBufferTask> 
pendingAppendingTxnBufferTasks = new LinkedList<>();
 
     private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic 
topic) {
         return 
topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled()
@@ -232,16 +232,6 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         return CompletableFuture.completedFuture(null);
     }
 
-    @VisibleForTesting
-    public void setPublishFuture(CompletableFuture<Position> publishFuture) {
-        this.publishFuture = publishFuture;
-    }
-
-    @VisibleForTesting
-    public CompletableFuture<Position> getPublishFuture() {
-        return publishFuture;
-    }
-
     @VisibleForTesting
     public CompletableFuture<Void> getTransactionBufferFuture() {
         return transactionBufferFuture;
@@ -267,47 +257,146 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
         return this.txnCommittedCounter.sum();
     }
 
+    private record PendingAppendingTxnBufferTask(TxnID txnId, long sequenceId, 
ByteBuf buffer,
+                                         CompletableFuture<Position> 
pendingPublishFuture) {
+
+        void fail(Throwable throwable) {
+            buffer.release();
+            pendingPublishFuture.completeExceptionally(throwable);
+        }
+    }
+
     @Override
     public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long 
sequenceId, ByteBuf buffer) {
-        // Method `takeAbortedTxnsSnapshot` will be executed in the different 
thread.
-        // So we need to retain the buffer in this thread. It will be released 
after message persistent.
-        buffer.retain();
-        CompletableFuture<Position> future = 
getPublishFuture().thenCompose(ignore -> {
-            if (checkIfNoSnapshot()) {
-                CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
-                // `publishFuture` will be completed after message persistent, 
so there will not be two threads
-                // writing snapshots at the same time.
-                
snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() 
-> {
-                    if (changeToReadyStateFromNoSnapshot()) {
-                        timer.newTimeout(TopicTransactionBuffer.this,
-                                takeSnapshotIntervalTime, 
TimeUnit.MILLISECONDS);
-                        completableFuture.complete(null);
-                    } else {
-                        log.error("[{}]Failed to change state of transaction 
buffer to Ready from NoSnapshot",
-                                topic.getName());
-                        completableFuture.completeExceptionally(new 
BrokerServiceException.ServiceUnitNotReadyException(
-                                "Transaction Buffer take first snapshot 
failed, the current state is: " + getState()));
-                    }
-                }).exceptionally(exception -> {
-                    log.error("Topic {} failed to take snapshot", 
this.topic.getName());
-                    completableFuture.completeExceptionally(exception);
-                    return null;
-                });
-                return completableFuture.thenCompose(__ -> 
internalAppendBufferToTxn(txnId, buffer));
-            } else if (checkIfReady()) {
-                return internalAppendBufferToTxn(txnId, buffer);
-            } else {
-                // `publishFuture` will be completed after transaction buffer 
recover completely
-                // during initializing, so this case should not happen.
+        synchronized (pendingAppendingTxnBufferTasks) {
+            // The first snapshot is in progress, the following publish tasks 
will be pending.
+            if (!pendingAppendingTxnBufferTasks.isEmpty()) {
+                CompletableFuture<Position> res = new CompletableFuture<>();
+                buffer.retain();
+                pendingAppendingTxnBufferTasks.offer(new 
PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res));
+                return res;
+            }
+
+            // `publishFuture` will be completed after transaction buffer 
recover completely
+            // during initializing, so this case should not happen.
+            if (!checkIfReady() && !checkIfNoSnapshot() && 
!checkIfFirstSnapshotting() && !checkIfInitializing()) {
+                log.error("[{}] unexpected state: {} when try to take the 
first transaction buffer snapshot",
+                        topic.getName(), getState());
                 return FutureUtil.failedFuture(new 
BrokerServiceException.ServiceUnitNotReadyException(
                         "Transaction Buffer recover failed, the current state 
is: " + getState()));
             }
-        }).whenComplete(((position, throwable) -> buffer.release()));
-        setPublishFuture(future);
-        return future;
+
+            // The transaction buffer is ready to write.
+            if (checkIfReady()) {
+                return internalAppendBufferToTxn(txnId, buffer, sequenceId);
+            }
+
+            // Pending the current publishing and trigger new snapshot if 
needed.
+            CompletableFuture<Position> res = new CompletableFuture<>();
+            buffer.retain();
+            pendingAppendingTxnBufferTasks.offer(new 
PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res));
+
+            final java.util.function.Consumer<Throwable> failPendingTasks = 
throwable -> {
+                synchronized (pendingAppendingTxnBufferTasks) {
+                    PendingAppendingTxnBufferTask pendingTask = null;
+                    while ((pendingTask = 
pendingAppendingTxnBufferTasks.poll()) != null) {
+                        pendingTask.fail(throwable);
+                    }
+                }
+            };
+
+            final Runnable flushPendingTasks = () -> {
+                PendingAppendingTxnBufferTask pendingTask = null;
+                try {
+                    synchronized (pendingAppendingTxnBufferTasks) {
+                        while ((pendingTask = 
pendingAppendingTxnBufferTasks.poll()) != null) {
+                            final ByteBuf data = pendingTask.buffer;
+                            final CompletableFuture<Position> pendingFuture =
+                                    pendingTask.pendingPublishFuture;
+                            internalAppendBufferToTxn(pendingTask.txnId, 
pendingTask.buffer,
+                                    pendingTask.sequenceId)
+                                    .whenComplete((positionAdded, ex3) -> {
+                                        data.release();
+                                        if (ex3 != null) {
+                                            
pendingFuture.completeExceptionally(ex3);
+                                            return;
+                                        }
+                                        pendingFuture.complete(positionAdded);
+                                    });
+                        }
+                    }
+                } catch (Exception e) {
+                    // If there are some error when adding entries or caching 
entries, this log will be printed.
+                    log.error("[{}] Failed to flush pending publishing 
requests after taking the first"
+                                    + " snapshot.",
+                            topic.getName(), e);
+                    if (pendingTask != null) {
+                        pendingTask.fail(e);
+                    }
+                    failPendingTasks.accept(e);
+                }
+            };
+
+            // Trigger the first snapshot.
+            transactionBufferFuture.whenComplete((ignore1, ex1) -> {
+                if (ex1 != null) {
+                    log.error("[{}] Transaction buffer recover failed", 
topic.getName(), ex1);
+                    failPendingTasks.accept(ex1);
+                    return;
+                }
+                if (changeToFirstSnapshotting()) {
+                    log.info("[{}] Start to take the first snapshot", 
topic.getName());
+                    // Flush pending publishing after the first snapshot 
finished.
+                    takeFirstSnapshot().whenComplete((ignore2, ex2) -> {
+                        if (ex2 != null) {
+                            log.error("[{}] Failed to take the first snapshot, 
flushing failed publishing requests",
+                                    topic.getName(), ex2);
+                            failPendingTasks.accept(ex2);
+                            return;
+                        }
+                        log.info("[{}] Finished to take the first snapshot, 
flushing publishing {} requests",
+                                topic.getName(), 
pendingAppendingTxnBufferTasks.size());
+                        flushPendingTasks.run();
+                    });
+                } else if (checkIfReady()) {
+                    log.info("[{}] No need to take the first snapshot, 
flushing publishing {} requests",
+                            topic.getName(), 
pendingAppendingTxnBufferTasks.size());
+                    flushPendingTasks.run();
+                } else {
+                    log.error("[{}] Transaction buffer recover failed, current 
state is {}", topic.getName(),
+                            getState());
+                    failPendingTasks.accept(new 
BrokerServiceException.ServiceUnitNotReadyException(
+                            "Transaction Buffer recover failed, the current 
state is: " + getState()));
+                }
+            });
+            return res;
+        }
+    }
+
+    private CompletableFuture<Void> takeFirstSnapshot() {
+        CompletableFuture<Void> firstSnapshottingFuture = new 
CompletableFuture<>();
+        
snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() 
-> {
+            if (changeToReadyStateFromNoSnapshot()) {
+                timer.newTimeout(TopicTransactionBuffer.this,
+                        takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
+                firstSnapshottingFuture.complete(null);
+            } else {
+                log.error("[{}]Failed to change state of transaction buffer to 
Ready from NoSnapshot",
+                        topic.getName());
+                firstSnapshottingFuture.completeExceptionally(new 
BrokerServiceException
+                        .ServiceUnitNotReadyException(
+                        "Transaction Buffer take first snapshot failed, the 
current state is: " + getState()));
+            }
+        }).exceptionally(exception -> {
+            log.error("Topic {} failed to take snapshot", 
this.topic.getName());
+            firstSnapshottingFuture.completeExceptionally(exception);
+            return null;
+        });
+        return firstSnapshottingFuture;
     }
 
-    private CompletableFuture<Position> internalAppendBufferToTxn(TxnID txnId, 
ByteBuf buffer) {
+    @VisibleForTesting
+    protected CompletableFuture<Position> internalAppendBufferToTxn(TxnID 
txnId, ByteBuf buffer, long seq) {
         CompletableFuture<Position> completableFuture = new 
CompletableFuture<>();
         Long lowWaterMark = lowWaterMarks.get(txnId.getMostSigBits());
         if (lowWaterMark != null && lowWaterMark >= txnId.getLeastSigBits()) {
@@ -550,7 +639,16 @@ public class TopicTransactionBuffer extends 
TopicTransactionBufferState implemen
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        changeToCloseState();
+        synchronized (pendingAppendingTxnBufferTasks) {
+            if (!checkIfClosed()) {
+                PendingAppendingTxnBufferTask pendingTask = null;
+                Throwable t = new 
BrokerServiceException.ServiceUnitNotReadyException("Topic is closed");
+                while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) 
!= null) {
+                    pendingTask.fail(t);
+                }
+            }
+            changeToCloseState();
+        }
         return this.snapshotAbortedTxnProcessor.closeAsync();
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
index 92ab1d07b69..9a8f2041bf4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java
@@ -33,7 +33,8 @@ public abstract class TopicTransactionBufferState {
         Initializing,
         Ready,
         Close,
-        NoSnapshot
+        NoSnapshot,
+        FirstSnapshotting
     }
 
     private static final 
AtomicReferenceFieldUpdater<TopicTransactionBufferState, State> STATE_UPDATER =
@@ -59,13 +60,25 @@ public abstract class TopicTransactionBufferState {
     }
 
     protected boolean changeToReadyStateFromNoSnapshot() {
-        return STATE_UPDATER.compareAndSet(this, State.NoSnapshot, 
State.Ready);
+        return STATE_UPDATER.compareAndSet(this, State.FirstSnapshotting, 
State.Ready);
+    }
+
+    protected boolean changeToFirstSnapshotting() {
+        return STATE_UPDATER.compareAndSet(this, State.NoSnapshot, 
State.FirstSnapshotting);
     }
 
     protected void changeToCloseState() {
         STATE_UPDATER.set(this, State.Close);
     }
 
+    public boolean checkIfInitializing() {
+        return STATE_UPDATER.get(this) == State.Initializing;
+    }
+
+    public boolean checkIfFirstSnapshotting() {
+        return STATE_UPDATER.get(this) == State.FirstSnapshotting;
+    }
+
     public boolean checkIfReady() {
         return STATE_UPDATER.get(this) == State.Ready;
     }
@@ -74,6 +87,10 @@ public abstract class TopicTransactionBufferState {
         return STATE_UPDATER.get(this) == State.NoSnapshot;
     }
 
+    public boolean checkIfClosed() {
+        return STATE_UPDATER.get(this) == State.Close;
+    }
+
     public State getState() {
         return STATE_UPDATER.get(this);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index a7e2aac5174..16ce35214dc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -19,7 +19,10 @@
 package org.apache.pulsar.broker.transaction;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
@@ -28,18 +31,23 @@ import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -62,6 +70,7 @@ import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
@@ -417,4 +426,96 @@ public class TransactionConsumeTest extends 
TransactionTestBase {
         
Assert.assertEquals(admin.topics().getStats(CONSUME_TOPIC).getSubscriptions().get(subName)
                 .getUnackedMessages(), 0);
     }
+
+    @DataProvider
+    public Object[][] doCommitTxn() {
+        return new Object[][] {
+                {true},
+                {false}
+        };
+    }
+
+    @Test(dataProvider = "doCommitTxn", timeOut = 60_000, invocationCount = 3)
+    public void testFirstTnxBufferSnapshotAndRecoveryConcurrently(boolean 
doCommitTxn) throws Exception {
+        String topic = 
BrokerTestUtil.newUniqueName("persistent://public/txn/tp");
+        // Create many clients and publish with transaction, which will 
trigger transaction buffer snapshot
+        // concurrently.
+        int producerCount = 10;
+        List<PulsarClient> clientList = new ArrayList<>();
+        List<Producer<String>> producerList = new ArrayList<>();
+        List<CompletableFuture<MessageId>> sendResults = new ArrayList<>();
+        List<Transaction> pendingTnxList = new ArrayList<>();
+        for (int i = 0; i < producerCount; i++) {
+            clientList.add(PulsarClient.builder()
+                    .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                    .enableTransaction(true)
+                    .build());
+        }
+        for (int i = 0; i < producerCount; i++) {
+            
producerList.add(clientList.get(i).newProducer(Schema.STRING).topic(topic).create());
+        }
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionName("s1").subscribe();
+        for (int i = 0; i < producerCount; i++) {
+            Transaction transaction = clientList.get(i).newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.HOURS)
+                    .build().get();
+            pendingTnxList.add(transaction);
+            final int index = i;
+            Producer<String> producer = producerList.get(i);
+            new Thread(() -> {
+                sendResults.add(producer.newMessage(transaction).value(index + 
"").sendAsync());
+            }).start();
+        }
+
+        // Verify that the transaction buffer snapshot succeed.
+        AtomicReference<TopicTransactionBuffer> topicTransactionBuffer = new 
AtomicReference<>();
+        for (PulsarService pulsar : pulsarServiceList) {
+            if (pulsar.getBrokerService().getTopics().containsKey(topic)) {
+                PersistentTopic persistentTopic = (PersistentTopic) 
pulsar.getBrokerService()
+                        .getTopic(topic, false).get().get();
+                topicTransactionBuffer.set((TopicTransactionBuffer) 
persistentTopic.getTransactionBuffer());
+                break;
+            }
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(topicTransactionBuffer.get());
+            assertEquals(topicTransactionBuffer.get().getState().toString(), 
"Ready");
+            
assertTrue(topicTransactionBuffer.get().getTransactionBufferFuture().isDone());
+            
assertFalse(topicTransactionBuffer.get().getTransactionBufferFuture().isCompletedExceptionally());
+        });
+
+        // Verify that all messages are sent successfully.
+        for (int i = 0; i < producerCount; i++) {
+            sendResults.get(i).get();
+            if (doCommitTxn) {
+                pendingTnxList.get(i).commit();
+            } else {
+                pendingTnxList.get(i).abort();
+            }
+        }
+        Set<String> msgReceived = new HashSet<>();
+        while (true) {
+            Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            msgReceived.add(msg.getValue());
+        }
+        if (doCommitTxn) {
+            for (int i = 0; i < producerCount; i++) {
+                assertTrue(msgReceived.contains(i + ""));
+            }
+        } else {
+            assertTrue(msgReceived.isEmpty());
+        }
+
+        // cleanup.
+        consumer.close();
+        for (int i = 0; i < producerCount; i++) {
+            producerList.get(i).close();
+            clientList.get(i).close();
+        }
+        admin.topics().delete(topic, false);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 5a54b37a637..d76a5a88dbd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.RandomUtils;
@@ -69,7 +68,6 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -513,14 +511,6 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
                 .withTransactionTimeout(5, TimeUnit.HOURS)
                 .build().get();
 
-        // 2. Set a new future in transaction buffer as 
`transactionBufferFuture` to simulate whether the
-        // transaction buffer recover completely.
-        TransactionBufferTestImpl topicTransactionBuffer = 
(TransactionBufferTestImpl) persistentTopic
-                .getTransactionBuffer();
-        CompletableFuture<Position> completableFuture = new 
CompletableFuture<>();
-        CompletableFuture<Position> originalFuture = 
topicTransactionBuffer.getPublishFuture();
-        topicTransactionBuffer.setPublishFuture(completableFuture);
-        
topicTransactionBuffer.setState(TopicTransactionBufferState.State.Ready);
         // Register this topic to the transaction in advance to avoid the 
sending request pending here.
         ((TransactionImpl) transaction).registerProducedTopic(topic).get(5, 
TimeUnit.SECONDS);
         // 3. Test the messages sent before transaction buffer ready is in 
order.
@@ -528,7 +518,6 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
             producer.newMessage(transaction).value(i).sendAsync();
         }
         // 4. Test the messages sent after transaction buffer ready is in 
order.
-        completableFuture.complete(originalFuture.get());
         for (int i = 50; i < 100; i++) {
             producer.newMessage(transaction).value(i).sendAsync();
         }
@@ -569,16 +558,17 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
                 .get(5, TimeUnit.SECONDS);
         Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(byteBuf2.refCnt(), 1));
         // 2.3 Test sending message failed.
-        topicTransactionBuffer.setPublishFuture(FutureUtil.failedFuture(new 
Exception("fail")));
+        topicTransactionBuffer.setFollowingInternalAppendBufferToTxnFail(true);
         ByteBuf byteBuf3 = Unpooled.buffer();
         try {
             topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, 
byteBuf1)
                     .get(5, TimeUnit.SECONDS);
-            fail();
+            fail("this appending should fail because we injected an error");
         } catch (Exception e) {
-            assertEquals(e.getCause().getMessage(), "fail");
+            assertEquals(e.getCause().getMessage(), "failed because an 
injected error for test");
         }
         Awaitility.await().untilAsserted(() -> 
Assert.assertEquals(byteBuf3.refCnt(), 1));
+        
topicTransactionBuffer.setFollowingInternalAppendBufferToTxnFail(false);
         // 3. release resource
         byteBuf1.release();
         byteBuf2.release();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
index b1168d08501..f1a003ff194 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
@@ -18,14 +18,21 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.utils;
 
+import io.netty.buffer.ByteBuf;
+import java.util.concurrent.CompletableFuture;
 import lombok.Setter;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import org.apache.pulsar.client.api.transaction.TxnID;
 
 public class TransactionBufferTestImpl extends TopicTransactionBuffer {
     @Setter
     public State state = null;
 
+    @Setter
+    private boolean followingInternalAppendBufferToTxnFail;
+
     public TransactionBufferTestImpl(PersistentTopic topic) {
         super(topic);
     }
@@ -34,4 +41,12 @@ public class TransactionBufferTestImpl extends 
TopicTransactionBuffer {
     public State getState() {
         return state == null ? super.getState() : state;
     }
+
+    @Override
+    protected CompletableFuture<Position> internalAppendBufferToTxn(TxnID 
txnId, ByteBuf buffer, long seq) {
+        if (followingInternalAppendBufferToTxnFail) {
+            return CompletableFuture.failedFuture(new RuntimeException("failed 
because an injected error for test"));
+        }
+        return super.internalAppendBufferToTxn(txnId, buffer, seq);
+    }
 }

Reply via email to