This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0a2772c1cfa2cb572a8b29ebb2335bcfc3ac4bd9 Author: fengyubiao <[email protected]> AuthorDate: Tue Nov 11 17:23:45 2025 +0800 [fix][broker]Transactional messages can never be sent successfully if concurrently taking transaction buffer snapshot (#24945) (cherry picked from commit f29ca21976d63a92371785fbdbe712f9f5e54cf2) --- .../buffer/impl/TopicTransactionBuffer.java | 194 ++++++++++++++++----- .../buffer/impl/TopicTransactionBufferState.java | 21 ++- .../broker/transaction/TransactionConsumeTest.java | 101 +++++++++++ .../buffer/TopicTransactionBufferTest.java | 18 +- .../buffer/utils/TransactionBufferTestImpl.java | 15 ++ 5 files changed, 285 insertions(+), 64 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 0c777afaa26..2df6e717981 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -92,9 +93,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final CompletableFuture<Void> transactionBufferFuture = new CompletableFuture<>(); - private CompletableFuture<Position> publishFuture = getTransactionBufferFuture() - .thenApply(__ -> PositionFactory.EARLIEST); - /** * The map is used to store the lowWaterMarks which key is TC ID and value is lowWaterMark of the TC. */ @@ -108,6 +106,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen private final AbortedTxnProcessor.SnapshotType snapshotType; private final MaxReadPositionCallBack maxReadPositionCallBack; + /** if the first snapshot is in progress, it will pending following publishing tasks. **/ + private final LinkedList<PendingAppendingTxnBufferTask> pendingAppendingTxnBufferTasks = new LinkedList<>(); private static AbortedTxnProcessor createSnapshotProcessor(PersistentTopic topic) { return topic.getBrokerService().getPulsar().getConfiguration().isTransactionBufferSegmentedSnapshotEnabled() @@ -232,16 +232,6 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen return CompletableFuture.completedFuture(null); } - @VisibleForTesting - public void setPublishFuture(CompletableFuture<Position> publishFuture) { - this.publishFuture = publishFuture; - } - - @VisibleForTesting - public CompletableFuture<Position> getPublishFuture() { - return publishFuture; - } - @VisibleForTesting public CompletableFuture<Void> getTransactionBufferFuture() { return transactionBufferFuture; @@ -267,47 +257,146 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen return this.txnCommittedCounter.sum(); } + private record PendingAppendingTxnBufferTask(TxnID txnId, long sequenceId, ByteBuf buffer, + CompletableFuture<Position> pendingPublishFuture) { + + void fail(Throwable throwable) { + buffer.release(); + pendingPublishFuture.completeExceptionally(throwable); + } + } + @Override public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) { - // Method `takeAbortedTxnsSnapshot` will be executed in the different thread. - // So we need to retain the buffer in this thread. It will be released after message persistent. - buffer.retain(); - CompletableFuture<Position> future = getPublishFuture().thenCompose(ignore -> { - if (checkIfNoSnapshot()) { - CompletableFuture<Void> completableFuture = new CompletableFuture<>(); - // `publishFuture` will be completed after message persistent, so there will not be two threads - // writing snapshots at the same time. - snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { - if (changeToReadyStateFromNoSnapshot()) { - timer.newTimeout(TopicTransactionBuffer.this, - takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); - completableFuture.complete(null); - } else { - log.error("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot", - topic.getName()); - completableFuture.completeExceptionally(new BrokerServiceException.ServiceUnitNotReadyException( - "Transaction Buffer take first snapshot failed, the current state is: " + getState())); - } - }).exceptionally(exception -> { - log.error("Topic {} failed to take snapshot", this.topic.getName()); - completableFuture.completeExceptionally(exception); - return null; - }); - return completableFuture.thenCompose(__ -> internalAppendBufferToTxn(txnId, buffer)); - } else if (checkIfReady()) { - return internalAppendBufferToTxn(txnId, buffer); - } else { - // `publishFuture` will be completed after transaction buffer recover completely - // during initializing, so this case should not happen. + synchronized (pendingAppendingTxnBufferTasks) { + // The first snapshot is in progress, the following publish tasks will be pending. + if (!pendingAppendingTxnBufferTasks.isEmpty()) { + CompletableFuture<Position> res = new CompletableFuture<>(); + buffer.retain(); + pendingAppendingTxnBufferTasks.offer(new PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res)); + return res; + } + + // `publishFuture` will be completed after transaction buffer recover completely + // during initializing, so this case should not happen. + if (!checkIfReady() && !checkIfNoSnapshot() && !checkIfFirstSnapshotting() && !checkIfInitializing()) { + log.error("[{}] unexpected state: {} when try to take the first transaction buffer snapshot", + topic.getName(), getState()); return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException( "Transaction Buffer recover failed, the current state is: " + getState())); } - }).whenComplete(((position, throwable) -> buffer.release())); - setPublishFuture(future); - return future; + + // The transaction buffer is ready to write. + if (checkIfReady()) { + return internalAppendBufferToTxn(txnId, buffer, sequenceId); + } + + // Pending the current publishing and trigger new snapshot if needed. + CompletableFuture<Position> res = new CompletableFuture<>(); + buffer.retain(); + pendingAppendingTxnBufferTasks.offer(new PendingAppendingTxnBufferTask(txnId, sequenceId, buffer, res)); + + final java.util.function.Consumer<Throwable> failPendingTasks = throwable -> { + synchronized (pendingAppendingTxnBufferTasks) { + PendingAppendingTxnBufferTask pendingTask = null; + while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) { + pendingTask.fail(throwable); + } + } + }; + + final Runnable flushPendingTasks = () -> { + PendingAppendingTxnBufferTask pendingTask = null; + try { + synchronized (pendingAppendingTxnBufferTasks) { + while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) { + final ByteBuf data = pendingTask.buffer; + final CompletableFuture<Position> pendingFuture = + pendingTask.pendingPublishFuture; + internalAppendBufferToTxn(pendingTask.txnId, pendingTask.buffer, + pendingTask.sequenceId) + .whenComplete((positionAdded, ex3) -> { + data.release(); + if (ex3 != null) { + pendingFuture.completeExceptionally(ex3); + return; + } + pendingFuture.complete(positionAdded); + }); + } + } + } catch (Exception e) { + // If there are some error when adding entries or caching entries, this log will be printed. + log.error("[{}] Failed to flush pending publishing requests after taking the first" + + " snapshot.", + topic.getName(), e); + if (pendingTask != null) { + pendingTask.fail(e); + } + failPendingTasks.accept(e); + } + }; + + // Trigger the first snapshot. + transactionBufferFuture.whenComplete((ignore1, ex1) -> { + if (ex1 != null) { + log.error("[{}] Transaction buffer recover failed", topic.getName(), ex1); + failPendingTasks.accept(ex1); + return; + } + if (changeToFirstSnapshotting()) { + log.info("[{}] Start to take the first snapshot", topic.getName()); + // Flush pending publishing after the first snapshot finished. + takeFirstSnapshot().whenComplete((ignore2, ex2) -> { + if (ex2 != null) { + log.error("[{}] Failed to take the first snapshot, flushing failed publishing requests", + topic.getName(), ex2); + failPendingTasks.accept(ex2); + return; + } + log.info("[{}] Finished to take the first snapshot, flushing publishing {} requests", + topic.getName(), pendingAppendingTxnBufferTasks.size()); + flushPendingTasks.run(); + }); + } else if (checkIfReady()) { + log.info("[{}] No need to take the first snapshot, flushing publishing {} requests", + topic.getName(), pendingAppendingTxnBufferTasks.size()); + flushPendingTasks.run(); + } else { + log.error("[{}] Transaction buffer recover failed, current state is {}", topic.getName(), + getState()); + failPendingTasks.accept(new BrokerServiceException.ServiceUnitNotReadyException( + "Transaction Buffer recover failed, the current state is: " + getState())); + } + }); + return res; + } + } + + private CompletableFuture<Void> takeFirstSnapshot() { + CompletableFuture<Void> firstSnapshottingFuture = new CompletableFuture<>(); + snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(() -> { + if (changeToReadyStateFromNoSnapshot()) { + timer.newTimeout(TopicTransactionBuffer.this, + takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); + firstSnapshottingFuture.complete(null); + } else { + log.error("[{}]Failed to change state of transaction buffer to Ready from NoSnapshot", + topic.getName()); + firstSnapshottingFuture.completeExceptionally(new BrokerServiceException + .ServiceUnitNotReadyException( + "Transaction Buffer take first snapshot failed, the current state is: " + getState())); + } + }).exceptionally(exception -> { + log.error("Topic {} failed to take snapshot", this.topic.getName()); + firstSnapshottingFuture.completeExceptionally(exception); + return null; + }); + return firstSnapshottingFuture; } - private CompletableFuture<Position> internalAppendBufferToTxn(TxnID txnId, ByteBuf buffer) { + @VisibleForTesting + protected CompletableFuture<Position> internalAppendBufferToTxn(TxnID txnId, ByteBuf buffer, long seq) { CompletableFuture<Position> completableFuture = new CompletableFuture<>(); Long lowWaterMark = lowWaterMarks.get(txnId.getMostSigBits()); if (lowWaterMark != null && lowWaterMark >= txnId.getLeastSigBits()) { @@ -550,7 +639,16 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen @Override public CompletableFuture<Void> closeAsync() { - changeToCloseState(); + synchronized (pendingAppendingTxnBufferTasks) { + if (!checkIfClosed()) { + PendingAppendingTxnBufferTask pendingTask = null; + Throwable t = new BrokerServiceException.ServiceUnitNotReadyException("Topic is closed"); + while ((pendingTask = pendingAppendingTxnBufferTasks.poll()) != null) { + pendingTask.fail(t); + } + } + changeToCloseState(); + } return this.snapshotAbortedTxnProcessor.closeAsync(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java index 92ab1d07b69..9a8f2041bf4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferState.java @@ -33,7 +33,8 @@ public abstract class TopicTransactionBufferState { Initializing, Ready, Close, - NoSnapshot + NoSnapshot, + FirstSnapshotting } private static final AtomicReferenceFieldUpdater<TopicTransactionBufferState, State> STATE_UPDATER = @@ -59,13 +60,25 @@ public abstract class TopicTransactionBufferState { } protected boolean changeToReadyStateFromNoSnapshot() { - return STATE_UPDATER.compareAndSet(this, State.NoSnapshot, State.Ready); + return STATE_UPDATER.compareAndSet(this, State.FirstSnapshotting, State.Ready); + } + + protected boolean changeToFirstSnapshotting() { + return STATE_UPDATER.compareAndSet(this, State.NoSnapshot, State.FirstSnapshotting); } protected void changeToCloseState() { STATE_UPDATER.set(this, State.Close); } + public boolean checkIfInitializing() { + return STATE_UPDATER.get(this) == State.Initializing; + } + + public boolean checkIfFirstSnapshotting() { + return STATE_UPDATER.get(this) == State.FirstSnapshotting; + } + public boolean checkIfReady() { return STATE_UPDATER.get(this) == State.Ready; } @@ -74,6 +87,10 @@ public abstract class TopicTransactionBufferState { return STATE_UPDATER.get(this) == State.NoSnapshot; } + public boolean checkIfClosed() { + return STATE_UPDATER.get(this) == State.Close; + } + public State getState() { return STATE_UPDATER.get(this); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java index a7e2aac5174..16ce35214dc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java @@ -19,7 +19,10 @@ package org.apache.pulsar.broker.transaction; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; @@ -28,18 +31,23 @@ import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -62,6 +70,7 @@ import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** @@ -417,4 +426,96 @@ public class TransactionConsumeTest extends TransactionTestBase { Assert.assertEquals(admin.topics().getStats(CONSUME_TOPIC).getSubscriptions().get(subName) .getUnackedMessages(), 0); } + + @DataProvider + public Object[][] doCommitTxn() { + return new Object[][] { + {true}, + {false} + }; + } + + @Test(dataProvider = "doCommitTxn", timeOut = 60_000, invocationCount = 3) + public void testFirstTnxBufferSnapshotAndRecoveryConcurrently(boolean doCommitTxn) throws Exception { + String topic = BrokerTestUtil.newUniqueName("persistent://public/txn/tp"); + // Create many clients and publish with transaction, which will trigger transaction buffer snapshot + // concurrently. + int producerCount = 10; + List<PulsarClient> clientList = new ArrayList<>(); + List<Producer<String>> producerList = new ArrayList<>(); + List<CompletableFuture<MessageId>> sendResults = new ArrayList<>(); + List<Transaction> pendingTnxList = new ArrayList<>(); + for (int i = 0; i < producerCount; i++) { + clientList.add(PulsarClient.builder() + .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .enableTransaction(true) + .build()); + } + for (int i = 0; i < producerCount; i++) { + producerList.add(clientList.get(i).newProducer(Schema.STRING).topic(topic).create()); + } + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionName("s1").subscribe(); + for (int i = 0; i < producerCount; i++) { + Transaction transaction = clientList.get(i).newTransaction() + .withTransactionTimeout(5, TimeUnit.HOURS) + .build().get(); + pendingTnxList.add(transaction); + final int index = i; + Producer<String> producer = producerList.get(i); + new Thread(() -> { + sendResults.add(producer.newMessage(transaction).value(index + "").sendAsync()); + }).start(); + } + + // Verify that the transaction buffer snapshot succeed. + AtomicReference<TopicTransactionBuffer> topicTransactionBuffer = new AtomicReference<>(); + for (PulsarService pulsar : pulsarServiceList) { + if (pulsar.getBrokerService().getTopics().containsKey(topic)) { + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopic(topic, false).get().get(); + topicTransactionBuffer.set((TopicTransactionBuffer) persistentTopic.getTransactionBuffer()); + break; + } + } + Awaitility.await().untilAsserted(() -> { + assertNotNull(topicTransactionBuffer.get()); + assertEquals(topicTransactionBuffer.get().getState().toString(), "Ready"); + assertTrue(topicTransactionBuffer.get().getTransactionBufferFuture().isDone()); + assertFalse(topicTransactionBuffer.get().getTransactionBufferFuture().isCompletedExceptionally()); + }); + + // Verify that all messages are sent successfully. + for (int i = 0; i < producerCount; i++) { + sendResults.get(i).get(); + if (doCommitTxn) { + pendingTnxList.get(i).commit(); + } else { + pendingTnxList.get(i).abort(); + } + } + Set<String> msgReceived = new HashSet<>(); + while (true) { + Message<String> msg = consumer.receive(2, TimeUnit.SECONDS); + if (msg == null) { + break; + } + msgReceived.add(msg.getValue()); + } + if (doCommitTxn) { + for (int i = 0; i < producerCount; i++) { + assertTrue(msgReceived.contains(i + "")); + } + } else { + assertTrue(msgReceived.isEmpty()); + } + + // cleanup. + consumer.close(); + for (int i = 0; i < producerCount; i++) { + producerList.get(i).close(); + clientList.get(i).close(); + } + admin.topics().delete(topic, false); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 5a54b37a637..d76a5a88dbd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -38,7 +38,6 @@ import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.commons.lang3.RandomUtils; @@ -69,7 +68,6 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; @@ -513,14 +511,6 @@ public class TopicTransactionBufferTest extends TransactionTestBase { .withTransactionTimeout(5, TimeUnit.HOURS) .build().get(); - // 2. Set a new future in transaction buffer as `transactionBufferFuture` to simulate whether the - // transaction buffer recover completely. - TransactionBufferTestImpl topicTransactionBuffer = (TransactionBufferTestImpl) persistentTopic - .getTransactionBuffer(); - CompletableFuture<Position> completableFuture = new CompletableFuture<>(); - CompletableFuture<Position> originalFuture = topicTransactionBuffer.getPublishFuture(); - topicTransactionBuffer.setPublishFuture(completableFuture); - topicTransactionBuffer.setState(TopicTransactionBufferState.State.Ready); // Register this topic to the transaction in advance to avoid the sending request pending here. ((TransactionImpl) transaction).registerProducedTopic(topic).get(5, TimeUnit.SECONDS); // 3. Test the messages sent before transaction buffer ready is in order. @@ -528,7 +518,6 @@ public class TopicTransactionBufferTest extends TransactionTestBase { producer.newMessage(transaction).value(i).sendAsync(); } // 4. Test the messages sent after transaction buffer ready is in order. - completableFuture.complete(originalFuture.get()); for (int i = 50; i < 100; i++) { producer.newMessage(transaction).value(i).sendAsync(); } @@ -569,16 +558,17 @@ public class TopicTransactionBufferTest extends TransactionTestBase { .get(5, TimeUnit.SECONDS); Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf2.refCnt(), 1)); // 2.3 Test sending message failed. - topicTransactionBuffer.setPublishFuture(FutureUtil.failedFuture(new Exception("fail"))); + topicTransactionBuffer.setFollowingInternalAppendBufferToTxnFail(true); ByteBuf byteBuf3 = Unpooled.buffer(); try { topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf1) .get(5, TimeUnit.SECONDS); - fail(); + fail("this appending should fail because we injected an error"); } catch (Exception e) { - assertEquals(e.getCause().getMessage(), "fail"); + assertEquals(e.getCause().getMessage(), "failed because an injected error for test"); } Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf3.refCnt(), 1)); + topicTransactionBuffer.setFollowingInternalAppendBufferToTxnFail(false); // 3. release resource byteBuf1.release(); byteBuf2.release(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java index b1168d08501..f1a003ff194 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java @@ -18,14 +18,21 @@ */ package org.apache.pulsar.broker.transaction.buffer.utils; +import io.netty.buffer.ByteBuf; +import java.util.concurrent.CompletableFuture; import lombok.Setter; +import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; +import org.apache.pulsar.client.api.transaction.TxnID; public class TransactionBufferTestImpl extends TopicTransactionBuffer { @Setter public State state = null; + @Setter + private boolean followingInternalAppendBufferToTxnFail; + public TransactionBufferTestImpl(PersistentTopic topic) { super(topic); } @@ -34,4 +41,12 @@ public class TransactionBufferTestImpl extends TopicTransactionBuffer { public State getState() { return state == null ? super.getState() : state; } + + @Override + protected CompletableFuture<Position> internalAppendBufferToTxn(TxnID txnId, ByteBuf buffer, long seq) { + if (followingInternalAppendBufferToTxnFail) { + return CompletableFuture.failedFuture(new RuntimeException("failed because an injected error for test")); + } + return super.internalAppendBufferToTxn(txnId, buffer, seq); + } }
