This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 74c366c7a9cf8a1ea053d2ce736a0c7f3a82f5e6
Author: Dream95 <[email protected]>
AuthorDate: Wed Oct 22 12:58:17 2025 +0800

    [fix][broker] Flaky-test: TopicTransactionBufferTest.testMessagePublishInOrder (#24826)

    Signed-off-by: Dream95 <[email protected]>
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit 88287345d3246fe0d6ea34e06389356ada516cfa)
---
 .../transaction/buffer/impl/TopicTransactionBuffer.java |  7 ++++++-
 .../buffer/utils/TransactionBufferTestImpl.java         | 16 ----------------
 2 files changed, 6 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index c43f0ed7fb9..45a2a80d811 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -229,6 +229,11 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         return CompletableFuture.completedFuture(null);
     }

+    @VisibleForTesting
+    public void setPublishFuture(CompletableFuture<Position> publishFuture) {
+        this.publishFuture = publishFuture;
+    }
+
     @VisibleForTesting
     public CompletableFuture<Position> getPublishFuture() {
         return publishFuture;
@@ -295,7 +300,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                                 "Transaction Buffer recover failed, the current state is: " + getState()));
             }
         }).whenComplete(((position, throwable) -> buffer.release()));
-        publishFuture = future;
+        setPublishFuture(future);
         return future;
     }

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
index 18724e13f67..b1168d08501 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
@@ -18,36 +18,20 @@
  */
 package org.apache.pulsar.broker.transaction.buffer.utils;

-import java.util.concurrent.CompletableFuture;
 import lombok.Setter;
-import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;

 public class TransactionBufferTestImpl extends TopicTransactionBuffer {
-    @Setter
-    public CompletableFuture<Void> transactionBufferFuture = null;
     @Setter
     public State state = null;
-    @Setter
-    public CompletableFuture<Position> publishFuture = null;

     public TransactionBufferTestImpl(PersistentTopic topic) {
         super(topic);
     }

-    @Override
-    public CompletableFuture<Void> getTransactionBufferFuture() {
-        return transactionBufferFuture == null ? super.getTransactionBufferFuture() : transactionBufferFuture;
-    }
-
     @Override
     public State getState() {
         return state == null ? super.getState() : state;
     }
-
-    @Override
-    public CompletableFuture<Position> getPublishFuture() {
-        return publishFuture == null ? super.getPublishFuture() : publishFuture;
-    }
 }
