This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e2bbb4b7cbc [improve][txn] Take first snapshot before persisting the
first transactional message (#21406)
e2bbb4b7cbc is described below
commit e2bbb4b7cbc5eb6196e8a11f1d7cdbdad20ce4b4
Author: Xiangying Meng <[email protected]>
AuthorDate: Thu Aug 29 10:58:38 2024 +0800
[improve][txn] Take first snapshot before persisting the first
transactional message (#21406)
### Motivation
The decision to write a snapshot before the first transactional message,
instead of before building the producer, is based on the fact that only the act
of writing transactional messages signifies the use of the transaction buffer.
Furthermore, it is only appropriate to schedule snapshot updates after this
point.
Taking the snapshot when the producer is built adds a lot of unnecessary I/O
read and write operations and increases the latency of topic loading.
---
**Scenario**
* 1000 topics under namespace1.
* Client1 enables transactions and sends transactional messages to topic 1.
* Client1 sends normal messages to topics 2~500.
* Client2 disables transactions and sends messages to topics 501~1000.
**Internal Behavior**
* Topics 1~500 will start 500 tasks to write snapshots into the same
system topic, e.g., system topic 1.
* All the topics (1~1000) will read this system topic 1 when they are loaded.
### Modifications
This pull request removes the unnecessary write operations: the first snapshot
is now written when the first transactional message is sent, instead of when
the producer is built.
---
.../apache/pulsar/broker/service/ServerCnx.java | 89 ++++------
.../org/apache/pulsar/broker/service/Topic.java | 9 +-
.../service/nonpersistent/NonPersistentTopic.java | 2 +-
.../broker/service/persistent/PersistentTopic.java | 4 +-
.../transaction/buffer/TransactionBuffer.java | 11 +-
.../buffer/impl/InMemTransactionBuffer.java | 2 +-
.../buffer/impl/TopicTransactionBuffer.java | 98 +++++++----
.../buffer/impl/TransactionBufferDisable.java | 2 +-
.../pulsar/broker/transaction/TransactionTest.java | 13 +-
.../buffer/TopicTransactionBufferTest.java | 193 ++++++++++++++++++++-
.../buffer/TransactionStablePositionTest.java | 9 +
.../buffer/utils/TransactionBufferTestImpl.java | 54 ++++++
.../utils/TransactionBufferTestProvider.java | 33 ++++
13 files changed, 404 insertions(+), 115 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d1fe9776e07..a5c09d28923 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1616,66 +1616,53 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
});
schemaVersionFuture.thenAccept(schemaVersion -> {
-
topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future
-> {
- CompletionStage<Subscription> createInitSubFuture;
- if (!Strings.isNullOrEmpty(initialSubscriptionName)
- && topic.isPersistent()
- &&
!topic.getSubscriptions().containsKey(initialSubscriptionName)) {
- createInitSubFuture =
service.isAllowAutoSubscriptionCreationAsync(topicName)
-
.thenCompose(isAllowAutoSubscriptionCreation -> {
- if
(!isAllowAutoSubscriptionCreation) {
- return
CompletableFuture.failedFuture(
- new
BrokerServiceException.NotAllowedException(
- "Could not create the
initial subscription due to"
- + " the auto
subscription creation is not allowed."));
- }
- return
topic.createSubscription(initialSubscriptionName,
- InitialPosition.Earliest,
false, null);
- });
- } else {
- createInitSubFuture =
CompletableFuture.completedFuture(null);
- }
-
- createInitSubFuture.whenComplete((sub, ex) -> {
- if (ex != null) {
- final Throwable rc =
FutureUtil.unwrapCompletionException(ex);
- if (rc instanceof
BrokerServiceException.NotAllowedException) {
- log.warn("[{}] {}
initialSubscriptionName: {}, topic: {}",
- remoteAddress,
rc.getMessage(), initialSubscriptionName, topicName);
- if
(producerFuture.completeExceptionally(rc)) {
-
commandSender.sendErrorResponse(requestId,
-
ServerError.NotAllowedError, rc.getMessage());
+ CompletionStage<Subscription> createInitSubFuture;
+ if (!Strings.isNullOrEmpty(initialSubscriptionName)
+ && topic.isPersistent()
+ &&
!topic.getSubscriptions().containsKey(initialSubscriptionName)) {
+ createInitSubFuture =
service.isAllowAutoSubscriptionCreationAsync(topicName)
+
.thenCompose(isAllowAutoSubscriptionCreation -> {
+ if (!isAllowAutoSubscriptionCreation) {
+ return
CompletableFuture.failedFuture(
+ new
BrokerServiceException.NotAllowedException(
+ "Could not create
the initial subscription due to the "
+ + "auto
subscription creation is not allowed."));
}
- producers.remove(producerId,
producerFuture);
- return;
- }
- String msg =
- "Failed to create the initial
subscription: " + ex.getCause().getMessage();
+ return
topic.createSubscription(initialSubscriptionName,
+ InitialPosition.Earliest,
false, null);
+ });
+ } else {
+ createInitSubFuture =
CompletableFuture.completedFuture(null);
+ }
+
+ createInitSubFuture.whenComplete((sub, ex) -> {
+ if (ex != null) {
+ final Throwable rc =
FutureUtil.unwrapCompletionException(ex);
+ if (rc instanceof
BrokerServiceException.NotAllowedException) {
log.warn("[{}] {} initialSubscriptionName:
{}, topic: {}",
- remoteAddress, msg,
initialSubscriptionName, topicName);
- if
(producerFuture.completeExceptionally(ex)) {
+ remoteAddress, rc.getMessage(),
initialSubscriptionName, topicName);
+ if
(producerFuture.completeExceptionally(rc)) {
commandSender.sendErrorResponse(requestId,
-
BrokerServiceException.getClientErrorCode(ex), msg);
+ ServerError.NotAllowedError,
rc.getMessage());
}
producers.remove(producerId,
producerFuture);
return;
}
+ String msg =
+ "Failed to create the initial
subscription: " + ex.getCause().getMessage();
+ log.warn("[{}] {} initialSubscriptionName: {},
topic: {}",
+ remoteAddress, msg,
initialSubscriptionName, topicName);
+ if (producerFuture.completeExceptionally(ex)) {
+ commandSender.sendErrorResponse(requestId,
+
BrokerServiceException.getClientErrorCode(ex), msg);
+ }
+ producers.remove(producerId, producerFuture);
+ return;
+ }
- buildProducerAndAddTopic(topic, producerId,
producerName, requestId, isEncrypted,
+ buildProducerAndAddTopic(topic, producerId,
producerName, requestId, isEncrypted,
metadata, schemaVersion, epoch,
userProvidedProducerName, topicName,
producerAccessMode, topicEpoch,
supportsPartialProducer, producerFuture);
- });
- }).exceptionally(exception -> {
- Throwable cause = exception.getCause();
- log.error("producerId {}, requestId {} :
TransactionBuffer recover failed",
- producerId, requestId, exception);
- if
(producerFuture.completeExceptionally(exception)) {
- commandSender.sendErrorResponse(requestId,
-
ServiceUnitNotReadyException.getClientErrorCode(cause),
- cause.getMessage());
- }
- producers.remove(producerId, producerFuture);
- return null;
});
});
});
@@ -2249,7 +2236,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
long requestId = getLastMessageId.getRequestId();
Topic topic = consumer.getSubscription().getTopic();
- topic.checkIfTransactionBufferRecoverCompletely(true)
+ topic.checkIfTransactionBufferRecoverCompletely()
.thenCompose(__ -> topic.getLastDispatchablePosition())
.thenApply(lastPosition -> {
int partitionIndex =
TopicName.getPartitionIndex(topic.getName());
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 50a28c79792..3ec09e9bfcd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -146,12 +146,11 @@ public interface Topic {
void removeProducer(Producer producer);
/**
- * Wait TransactionBuffer Recovers completely.
- * Take snapshot after TB Recovers completely.
- * @param isTxnEnabled isTxnEnabled
- * @return a future which has completely if isTxn = false. Or a future
return by takeSnapshot.
+ * Wait TransactionBuffer recovers completely.
+ *
+ * @return a future that will be completed after the transaction buffer
recover completely.
*/
- CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely(boolean
isTxnEnabled);
+ CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely();
/**
* record add-latency.
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 94568705891..1b98ee2f830 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -258,7 +258,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
}
@Override
- public CompletableFuture<Void>
checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
+ public CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely()
{
return CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 146ac05d695..d814e7ce115 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -836,8 +836,8 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
@Override
- public CompletableFuture<Void>
checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
- return getTransactionBuffer().checkIfTBRecoverCompletely(isTxnEnabled);
+ public CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely()
{
+ return getTransactionBuffer().checkIfTBRecoverCompletely();
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
index b379c4d1db1..874f4c1c28a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java
@@ -187,14 +187,11 @@ public interface TransactionBuffer {
TransactionBufferStats getStats(boolean lowWaterMarks);
/**
- * Wait TransactionBuffer Recovers completely.
- * Take snapshot after TB Recovers completely.
- * @param isTxn
- * @return a future which has completely if isTxn = false. Or a future
return by takeSnapshot.
+ * Wait TransactionBuffer recovers completely.
+ *
+ * @return a future that will be completed after the transaction buffer
recover completely.
*/
- CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn);
-
-
+ CompletableFuture<Void> checkIfTBRecoverCompletely();
long getOngoingTxnCount();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
index ae755f0715e..4da7a48e96c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -411,7 +411,7 @@ class InMemTransactionBuffer implements TransactionBuffer {
@Override
- public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
+ public CompletableFuture<Void> checkIfTBRecoverCompletely() {
return CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 7561457d11f..2f90ff8922a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -57,6 +57,7 @@ import
org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
@@ -89,8 +90,12 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
private final int takeSnapshotIntervalTime;
+
private final CompletableFuture<Void> transactionBufferFuture = new
CompletableFuture<>();
+ private CompletableFuture<Position> publishFuture =
getTransactionBufferFuture()
+ .thenApply(__ -> PositionFactory.EARLIEST);
+
/**
* The map is used to store the lowWaterMarks which key is TC ID and value
is lowWaterMark of the TC.
*/
@@ -138,14 +143,14 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
if (!changeToReadyState()) {
log.error("[{}]Transaction buffer recover
fail, current state: {}",
topic.getName(), getState());
- transactionBufferFuture.completeExceptionally
+
getTransactionBufferFuture().completeExceptionally
(new
BrokerServiceException.ServiceUnitNotReadyException(
"Transaction buffer recover
failed to change the status to Ready,"
+ "current state is: "
+ getState()));
} else {
timer.newTimeout(TopicTransactionBuffer.this,
takeSnapshotIntervalTime,
TimeUnit.MILLISECONDS);
- transactionBufferFuture.complete(null);
+ getTransactionBufferFuture().complete(null);
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
@@ -158,7 +163,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
if (!changeToNoSnapshotState()) {
log.error("[{}]Transaction buffer recover
fail", topic.getName());
} else {
- transactionBufferFuture.complete(null);
+ getTransactionBufferFuture().complete(null);
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
@@ -196,10 +201,10 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
// if transaction buffer recover fail throw
PulsarClientException,
// we need to change the PulsarClientException to
ServiceUnitNotReadyException,
// the tc do op will retry
- transactionBufferFuture.completeExceptionally
+ getTransactionBufferFuture().completeExceptionally
(new
BrokerServiceException.ServiceUnitNotReadyException(e.getMessage(), e));
} else {
- transactionBufferFuture.completeExceptionally(e);
+
getTransactionBufferFuture().completeExceptionally(e);
}
recoverTime.setRecoverEndTime(System.currentTimeMillis());
topic.close(true);
@@ -212,35 +217,19 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
return CompletableFuture.completedFuture(null);
}
+ @VisibleForTesting
+ public CompletableFuture<Position> getPublishFuture() {
+ return publishFuture;
+ }
+
+ @VisibleForTesting
+ public CompletableFuture<Void> getTransactionBufferFuture() {
+ return transactionBufferFuture;
+ }
+
@Override
- public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean
isTxnEnabled) {
- if (!isTxnEnabled) {
- return CompletableFuture.completedFuture(null);
- } else {
- CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- transactionBufferFuture.thenRun(() -> {
- if (checkIfNoSnapshot()) {
-
snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(()
-> {
- if (changeToReadyStateFromNoSnapshot()) {
- timer.newTimeout(TopicTransactionBuffer.this,
- takeSnapshotIntervalTime,
TimeUnit.MILLISECONDS);
- }
- completableFuture.complete(null);
- }).exceptionally(exception -> {
- log.error("Topic {} failed to take snapshot",
this.topic.getName());
- completableFuture.completeExceptionally(exception);
- return null;
- });
- } else {
- completableFuture.complete(null);
- }
- }).exceptionally(exception -> {
- log.error("Topic {}: TransactionBuffer recover failed",
this.topic.getName(), exception.getCause());
- completableFuture.completeExceptionally(exception.getCause());
- return null;
- });
- return completableFuture;
- }
+ public CompletableFuture<Void> checkIfTBRecoverCompletely() {
+ return getTransactionBufferFuture();
}
@Override
@@ -260,6 +249,45 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
@Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long
sequenceId, ByteBuf buffer) {
+ // Method `takeAbortedTxnsSnapshot` will be executed in the different
thread.
+ // So we need to retain the buffer in this thread. It will be released
after message persistent.
+ buffer.retain();
+ CompletableFuture<Position> future =
getPublishFuture().thenCompose(ignore -> {
+ if (checkIfNoSnapshot()) {
+ CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
+ // `publishFuture` will be completed after message persistent,
so there will not be two threads
+ // writing snapshots at the same time.
+
snapshotAbortedTxnProcessor.takeAbortedTxnsSnapshot(maxReadPosition).thenRun(()
-> {
+ if (changeToReadyStateFromNoSnapshot()) {
+ timer.newTimeout(TopicTransactionBuffer.this,
+ takeSnapshotIntervalTime,
TimeUnit.MILLISECONDS);
+ completableFuture.complete(null);
+ } else {
+ log.error("[{}]Failed to change state of transaction
buffer to Ready from NoSnapshot",
+ topic.getName());
+ completableFuture.completeExceptionally(new
BrokerServiceException.ServiceUnitNotReadyException(
+ "Transaction Buffer take first snapshot
failed, the current state is: " + getState()));
+ }
+ }).exceptionally(exception -> {
+ log.error("Topic {} failed to take snapshot",
this.topic.getName());
+ completableFuture.completeExceptionally(exception);
+ return null;
+ });
+ return completableFuture.thenCompose(__ ->
internalAppendBufferToTxn(txnId, buffer));
+ } else if (checkIfReady()) {
+ return internalAppendBufferToTxn(txnId, buffer);
+ } else {
+ // `publishFuture` will be completed after transaction buffer
recover completely
+ // during initializing, so this case should not happen.
+ return FutureUtil.failedFuture(new
BrokerServiceException.ServiceUnitNotReadyException(
+ "Transaction Buffer recover failed, the current state
is: " + getState()));
+ }
+ }).whenComplete(((position, throwable) -> buffer.release()));
+ publishFuture = future;
+ return future;
+ }
+
+ private CompletableFuture<Position> internalAppendBufferToTxn(TxnID txnId,
ByteBuf buffer) {
CompletableFuture<Position> completableFuture = new
CompletableFuture<>();
Long lowWaterMark = lowWaterMarks.get(txnId.getMostSigBits());
if (lowWaterMark != null && lowWaterMark >= txnId.getLeastSigBits()) {
@@ -314,7 +342,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
//Wait TB recover completely.
- transactionBufferFuture.thenRun(() -> {
+ getTransactionBufferFuture().thenRun(() -> {
ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L,
txnID.getMostSigBits(),
txnID.getLeastSigBits());
try {
@@ -356,7 +384,7 @@ public class TopicTransactionBuffer extends
TopicTransactionBufferState implemen
}
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
//Wait TB recover completely.
- transactionBufferFuture.thenRun(() -> {
+ getTransactionBufferFuture().thenRun(() -> {
//no message sent, need not to add abort mark by txn timeout.
if (!checkIfReady()) {
completableFuture.complete(null);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
index d0efc47c495..d4fd071fef8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java
@@ -132,7 +132,7 @@ public class TransactionBufferDisable implements
TransactionBuffer {
}
@Override
- public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
+ public CompletableFuture<Void> checkIfTBRecoverCompletely() {
return CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 246ab5ef26a..3b3eaf7bb22 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -595,6 +595,11 @@ public class TransactionTest extends TransactionTestBase {
.topic(topic).enableBatching(true)
.create();
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
+ producer.newMessage(transaction).send();
+ transaction.abort().get();
+
Awaitility.await().untilAsserted(() -> {
Message<TransactionBufferSnapshot> message1 = reader.readNext();
TransactionBufferSnapshot snapshot1 = message1.getValue();
@@ -608,7 +613,7 @@ public class TransactionTest extends TransactionTestBase {
Awaitility.await().untilAsserted(() -> {
Message<TransactionBufferSnapshot> message1 = reader.readNext();
TransactionBufferSnapshot snapshot1 = message1.getValue();
- Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), 1);
+ Assert.assertEquals(snapshot1.getMaxReadPositionEntryId(), 3);
});
}
@@ -716,7 +721,7 @@ public class TransactionTest extends TransactionTestBase {
.sendTimeout(0, TimeUnit.SECONDS)
.create();
- Awaitility.await().untilAsserted(() ->
Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
+ Awaitility.await().untilAsserted(() ->
Assert.assertTrue(topicTransactionBuffer.checkIfNoSnapshot()));
//test publishing txn messages will not change maxReadPosition if
don`t commit or abort.
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
@@ -1657,7 +1662,7 @@ public class TransactionTest extends TransactionTestBase {
persistentTopic.set(new PersistentTopic("topic-a", managedLedger,
brokerService));
try {
// Do check.
-
persistentTopic.get().checkIfTransactionBufferRecoverCompletely(true).get(5,
TimeUnit.SECONDS);
+
persistentTopic.get().checkIfTransactionBufferRecoverCompletely().get(5,
TimeUnit.SECONDS);
fail("Expect failure by TB closed, but it is finished.");
} catch (ExecutionException executionException){
Throwable t = executionException.getCause();
@@ -1815,8 +1820,6 @@ public class TransactionTest extends TransactionTestBase {
.createAsync();
getTopic("persistent://" + topic + "-partition-0");
Thread.sleep(3000);
- // the producer shouldn't be created, because the transaction buffer
snapshot writer future didn't finish.
- assertFalse(producerFuture.isDone());
// The topic will be closed, because the transaction buffer snapshot
writer future is failed,
// the failed writer future will be removed, the producer will be
reconnected and work well.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index dea79f391e3..1ab97eb457a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -18,13 +18,15 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.Cleanup;
import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.when;
-import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
-import static org.testng.AssertJUnit.fail;
import io.opentelemetry.api.common.Attributes;
import java.time.Duration;
import java.util.Collections;
@@ -35,11 +37,13 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import lombok.Cleanup;
+
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
@@ -50,16 +54,23 @@ import
org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
+import
org.apache.pulsar.broker.transaction.buffer.utils.TransactionBufferTestImpl;
+import
org.apache.pulsar.broker.transaction.buffer.utils.TransactionBufferTestProvider;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -228,9 +239,7 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
String topic = "persistent://" + NAMESPACE1 +
"/testGetMaxReadyPositionAfterTBReady";
// 1.1 Mock component.
TransactionBuffer transactionBuffer =
Mockito.spy(TransactionBuffer.class);
- when(transactionBuffer.checkIfTBRecoverCompletely(anyBoolean()))
- // Handle producer will check transaction buffer recover
completely.
- .thenReturn(CompletableFuture.completedFuture(null))
+ when(transactionBuffer.checkIfTBRecoverCompletely())
// If the Transaction buffer failed to recover, we can not get
the correct last max read id.
.thenReturn(CompletableFuture.failedFuture(new Throwable("Mock
fail")))
// If the transaction buffer recover successfully, the max
read position can be acquired successfully.
@@ -405,4 +414,174 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
assertEquals(expected.getLedgerId(), actual.getLedgerId());
}
+    /**
+     * Verifies that the transaction buffer of a topic changes state only when the first transactional
+     * message is published.
+     * <p>
+     * The buffer is expected to be in the NoSnapshot state once the topic, a producer and a consumer exist,
+     * to stay in NoSnapshot after a normal message is sent, and to be Ready after a transactional message is
+     * sent. The topic is then unloaded to check that the buffer comes back in the Ready state and that the
+     * message of the aborted transaction is not delivered.
+     */
+    @Test
+    public void testWriteSnapshotWhenFirstTxnMessageSend() throws Exception {
+        // 1. Prepare test environment.
+        String topic = "persistent://" + NAMESPACE1 + "/testWriteSnapshotWhenFirstTxnMessageSend";
+        String txnMsg = "transaction message";
+        String normalMsg = "normal message";
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .subscribe();
+        // 2. Test the state of transaction buffer after building producer with no new messages.
+        // The TransactionBuffer should be in NoSnapshot state before transaction message sent.
+        TopicTransactionBuffer topicTransactionBuffer =
+                (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot);
+        });
+        // 3. Test the state of transaction buffer after sending normal messages.
+        // The TransactionBuffer should still be in NoSnapshot state after a normal message is sent.
+        producer.newMessage().value(normalMsg).send();
+        Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.NoSnapshot);
+        // 4. Test the state of transaction buffer after sending transaction messages.
+        // The transaction buffer should be in Ready state at this time.
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.HOURS)
+                .build()
+                .get();
+        producer.newMessage(transaction).value(txnMsg).send();
+        Assert.assertEquals(topicTransactionBuffer.getState(), TopicTransactionBufferState.State.Ready);
+        // 5. Test transaction buffer can be recovered correctly.
+        // By the time the topic is unloaded the producer has published three messages: two normal messages
+        // and one transactional message that belongs to the aborted transaction.
+        // After unloading the topic and redelivering unacked messages, only the normal messages can be received.
+        transaction.abort().get(5, TimeUnit.SECONDS);
+        producer.newMessage().value(normalMsg).send();
+        admin.topics().unload(topic);
+        PersistentTopic persistentTopic2 = (PersistentTopic) pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+        TopicTransactionBuffer topicTransactionBuffer2 = (TopicTransactionBuffer) persistentTopic2
+                .getTransactionBuffer();
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(topicTransactionBuffer2.getState(), TopicTransactionBufferState.State.Ready);
+        });
+        consumer.redeliverUnacknowledgedMessages();
+        for (int i = 0; i < 2; i++) {
+            Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+            // receive() with a timeout may return null; fail with an assertion instead of a NullPointerException.
+            Assert.assertNotNull(message);
+            Assert.assertEquals(message.getValue(), normalMsg);
+        }
+        // No further message is expected: the transactional message was aborted.
+        Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+        Assert.assertNull(message);
+    }
+
+    /**
+     * Verifies that transactional messages are delivered in publish order when some of them are sent while
+     * the transaction buffer's publish future is still pending and the rest are sent after it completes.
+     * <p>
+     * Messages 0-49 are sent while the publish future is replaced by an incomplete future, messages 50-99
+     * are sent after that future is completed, and the consumer must then receive 0-99 in order.
+     */
+    @Test
+    public void testMessagePublishInOrder() throws Exception {
+        // 1. Prepare test environment.
+        this.pulsarServiceList.forEach(pulsarService -> {
+            pulsarService.setTransactionBufferProvider(new TransactionBufferTestProvider());
+        });
+        String topic = "persistent://" + NAMESPACE1 + "/testMessagePublishInOrder" + RandomUtils.nextLong();
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .create();
+        @Cleanup
+        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.HOURS)
+                .build().get();
+
+        // 2. Replace the publish future of the transaction buffer with a future controlled by this test,
+        // so that publishing stays pending until the test completes it.
+        TransactionBufferTestImpl topicTransactionBuffer = (TransactionBufferTestImpl) persistentTopic
+                .getTransactionBuffer();
+        CompletableFuture<Position> completableFuture = new CompletableFuture<>();
+        CompletableFuture<Position> originalFuture = topicTransactionBuffer.getPublishFuture();
+        topicTransactionBuffer.setPublishFuture(completableFuture);
+        // Make the test buffer report the Ready state from getState().
+        topicTransactionBuffer.setState(TopicTransactionBufferState.State.Ready);
+        // Register this topic to the transaction in advance to avoid the sending request pending here.
+        ((TransactionImpl) transaction).registerProducedTopic(topic).get(5, TimeUnit.SECONDS);
+        // 3. Test the messages sent before transaction buffer ready is in order.
+        for (int i = 0; i < 50; i++) {
+            producer.newMessage(transaction).value(i).sendAsync();
+        }
+        // 4. Test the messages sent after transaction buffer ready is in order.
+        completableFuture.complete(originalFuture.get());
+        for (int i = 50; i < 100; i++) {
+            producer.newMessage(transaction).value(i).sendAsync();
+        }
+        transaction.commit().get();
+        for (int i = 0; i < 100; i++) {
+            Message<Integer> message = consumer.receive(5, TimeUnit.SECONDS);
+            // receive() with a timeout may return null; fail with an assertion instead of a NullPointerException.
+            Assert.assertNotNull(message);
+            Assert.assertEquals(message.getValue(), i);
+        }
+    }
+
+    /**
+     * Verifies that {@code appendBufferToTxn} leaves the reference count of the buffer it is given unchanged,
+     * both when the append succeeds and when it fails, so that the caller's buffer is not leaked.
+     * <p>
+     * Each step uses its own buffer and asserts that its reference count is still 1 afterwards.
+     */
+    @Test
+    public void testRefCountWhenAppendBufferToTxn() throws Exception {
+        // 1. Prepare test resource
+        this.pulsarServiceList.forEach(pulsarService -> {
+            pulsarService.setTransactionBufferProvider(new TransactionBufferTestProvider());
+        });
+        String topic = "persistent://" + NAMESPACE1 + "/testRefCountWhenAppendBufferToTxn";
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) pulsarServiceList.get(0).getBrokerService()
+                .getTopic(topic, false)
+                .get()
+                .get();
+        TransactionBufferTestImpl topicTransactionBuffer = (TransactionBufferTestImpl) persistentTopic
+                .getTransactionBuffer();
+        // 2. Test reference count does not change in the method `appendBufferToTxn`.
+        // 2.1 Test sending first transaction message, this will take a snapshot.
+        ByteBuf byteBuf1 = Unpooled.buffer();
+        topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf1)
+                .get(5, TimeUnit.SECONDS);
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf1.refCnt(), 1));
+        // 2.2 Test send the second transaction message, this will not take snapshots.
+        // The buffer under test must be the one that is appended, otherwise the assertion below is vacuous.
+        ByteBuf byteBuf2 = Unpooled.buffer();
+        topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf2)
+                .get(5, TimeUnit.SECONDS);
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf2.refCnt(), 1));
+        // 2.3 Test sending message failed.
+        topicTransactionBuffer.setPublishFuture(FutureUtil.failedFuture(new Exception("fail")));
+        ByteBuf byteBuf3 = Unpooled.buffer();
+        try {
+            topicTransactionBuffer.appendBufferToTxn(new TxnID(1, 1), 1L, byteBuf3)
+                    .get(5, TimeUnit.SECONDS);
+            fail();
+        } catch (Exception e) {
+            assertEquals(e.getCause().getMessage(), "fail");
+        }
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(byteBuf3.refCnt(), 1));
+        // 3. release resource
+        byteBuf1.release();
+        byteBuf2.release();
+        byteBuf3.release();
+    }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
index eb7b24c7326..0b50f91fd40 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
@@ -195,6 +195,15 @@ public class TransactionStablePositionTest extends
TransactionTestBase {
.topic(topicName)
.create();
+ if (clientEnableTransaction) {
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.HOURS)
+ .build()
+ .get();
+ producer.newMessage(transaction).send();
+ transaction.commit().get();
+ }
+
PersistentTopic persistentTopic = (PersistentTopic)
getPulsarServiceList().get(0).getBrokerService()
.getTopic(TopicName.get(topicName).toString(),
false).get().get();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
new file mode 100644
index 00000000000..7ee14ffc337
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.utils;
+
+import lombok.Setter;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Test-only {@link TopicTransactionBuffer} whose transaction buffer future, state and publish future can be
+ * overridden through setters.
+ * <p>
+ * Each getter returns the injected value when one has been set and otherwise delegates to the parent class.
+ */
+public class TransactionBufferTestImpl extends TopicTransactionBuffer {
+    /** Override returned by {@link #getTransactionBufferFuture()}; {@code null} means "use the parent's value". */
+    @Setter
+    public CompletableFuture<Void> transactionBufferFuture = null;
+    /** Override returned by {@link #getState()}; {@code null} means "use the parent's value". */
+    @Setter
+    public State state = null;
+    /** Override returned by {@link #getPublishFuture()}; {@code null} means "use the parent's value". */
+    @Setter
+    public CompletableFuture<Position> publishFuture = null;
+
+    public TransactionBufferTestImpl(PersistentTopic topic) {
+        super(topic);
+    }
+
+    @Override
+    public CompletableFuture<Void> getTransactionBufferFuture() {
+        if (transactionBufferFuture != null) {
+            return transactionBufferFuture;
+        }
+        return super.getTransactionBufferFuture();
+    }
+
+    @Override
+    public State getState() {
+        if (state != null) {
+            return state;
+        }
+        return super.getState();
+    }
+
+    @Override
+    public CompletableFuture<Position> getPublishFuture() {
+        if (publishFuture != null) {
+            return publishFuture;
+        }
+        return super.getPublishFuture();
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestProvider.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestProvider.java
new file mode 100644
index 00000000000..7bc93c0e7cf
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/utils/TransactionBufferTestProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.utils;
+
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+
+/**
+ * {@link TransactionBufferProvider} for tests that hands out {@link TransactionBufferTestImpl} instances.
+ */
+public class TransactionBufferTestProvider implements TransactionBufferProvider {
+
+    /**
+     * Builds the test transaction buffer for the given topic.
+     *
+     * @param originTopic the topic that owns the buffer; it is cast to {@link PersistentTopic}
+     * @return a new {@link TransactionBufferTestImpl} created for {@code originTopic}
+     */
+    @Override
+    public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+        PersistentTopic persistentTopic = (PersistentTopic) originTopic;
+        return new TransactionBufferTestImpl(persistentTopic);
+    }
+}
+