This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 498cde9ad3d [Fix][Txn] Fix transaction PendingAck lowWaterMark (#15530)
498cde9ad3d is described below
commit 498cde9ad3dd62142d73e024ea424bd76726dfaa
Author: Xiangying Meng <[email protected]>
AuthorDate: Fri May 13 13:48:34 2022 +0800
[Fix][Txn] Fix transaction PendingAck lowWaterMark (#15530)
### Motivation
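Currently, when handling the low water mark, PendingAckHandle appends the
abort mark using the ID of the transaction that is ending, which is wrong.
The abort mark should be appended for the first transaction in
individualAckOfTransaction, once that transaction has been checked and found
to be at or below the low water mark.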
### Modification
Append the abort mark for the first transaction in individualAckOfTransaction
(instead of the ending transaction) after it passes the low-water-mark check.
---
.../pendingack/impl/PendingAckHandleImpl.java | 22 ++--
.../pendingack/PendingAckPersistentTest.java | 117 +++++++++++++++++++++
2 files changed, 124 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index dc4f89d5e11..98b6700ca20 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -600,21 +600,13 @@ public class PendingAckHandleImpl extends
PendingAckHandleState implements Pendi
if (firstTxn.getMostSigBits() == txnID.getMostSigBits()
&& firstTxn.getLeastSigBits() <= lowWaterMark) {
- this.pendingAckStoreFuture.whenComplete((pendingAckStore,
throwable) -> {
- if (throwable == null) {
- pendingAckStore.appendAbortMark(txnID,
AckType.Individual).thenAccept(v -> {
- synchronized (PendingAckHandleImpl.this) {
- log.warn("[{}] Transaction pending ack handle
low water mark success! txnId : [{}], "
- + "lowWaterMark : [{}]", topicName,
txnID, lowWaterMark);
- individualAckOfTransaction.remove(firstTxn);
- handleLowWaterMark(txnID, lowWaterMark);
- }
- }).exceptionally(e -> {
- log.warn("[{}] Transaction pending ack handle low
water mark fail! txnId : [{}], "
- + "lowWaterMark : [{}]", topicName, txnID,
lowWaterMark);
- return null;
- });
- }
+ abortTxn(firstTxn, null, lowWaterMark).thenRun(() -> {
+ log.warn("[{}] Transaction pending ack handle low water
mark success! txnId : [{}], "
+ + "lowWaterMark : [{}]", topicName, txnID,
lowWaterMark);
+ }).exceptionally(e -> {
+ log.warn("[{}] Transaction pending ack handle low water
mark fail! txnId : [{}], "
+ + "lowWaterMark : [{}]", topicName, txnID,
lowWaterMark);
+ return null;
});
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index 58408cdee10..7bc562ca62b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -23,7 +23,9 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -32,6 +34,7 @@ import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -44,6 +47,8 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
@@ -345,4 +350,116 @@ public class PendingAckPersistentTest extends
TransactionTestBase {
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic,
subName2)));
assertFalse(topics.contains(topic));
}
+
+ @Test
+ public void testPendingAckLowWaterMarkRemoveFirstTxn() throws Exception {
+ String topic = TopicName.get(TopicDomain.persistent.toString(),
+ NamespaceName.get(NAMESPACE1), "test").toString();
+
+ String subName = "subName";
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Failover)
+ .enableBatchIndexAcknowledgment(true)
+ .subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().send();
+ }
+
+ Transaction transaction1 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+
+ Message<byte[]> message1 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message1.getMessageId(), transaction1);
+ transaction1.commit().get();
+
+
+ Transaction transaction2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ while (transaction1.getTxnID().getMostSigBits() !=
transaction2.getTxnID().getMostSigBits()) {
+ transaction2 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ }
+
+ Transaction transaction3 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ while (transaction1.getTxnID().getMostSigBits() !=
transaction3.getTxnID().getMostSigBits()) {
+ transaction3 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+ }
+
+ Message<byte[]> message3 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message3.getMessageId(), transaction2);
+ transaction2.commit().get();
+
+ Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);
+
+ Field field = TransactionImpl.class.getDeclaredField("state");
+ field.setAccessible(true);
+ field.set(transaction1, TransactionImpl.State.OPEN);
+
+ consumer.acknowledgeAsync(message2.getMessageId(), transaction1).get();
+ Message<byte[]> message4 = consumer.receive(5, TimeUnit.SECONDS);
+ field.set(transaction2, TransactionImpl.State.OPEN);
+ consumer.acknowledgeAsync(message4.getMessageId(), transaction2).get();
+
+ Message<byte[]> message5 = consumer.receive(5, TimeUnit.SECONDS);
+ consumer.acknowledgeAsync(message5.getMessageId(), transaction3);
+ transaction3.commit().get();
+
+
+ PersistentTopic persistentTopic =
+ (PersistentTopic) getPulsarServiceList()
+ .get(0)
+ .getBrokerService()
+ .getTopic(topic, false)
+ .get()
+ .get();
+
+ PersistentSubscription persistentSubscription =
persistentTopic.getSubscription(subName);
+ PendingAckHandleImpl pendingAckHandle = new
PendingAckHandleImpl(persistentSubscription);
+
+ Method method =
PendingAckHandleImpl.class.getDeclaredMethod("initPendingAckStore");
+ method.setAccessible(true);
+ method.invoke(pendingAckHandle);
+
+ Field field1 =
PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
+ field1.setAccessible(true);
+ CompletableFuture<PendingAckStore> completableFuture =
+ (CompletableFuture<PendingAckStore>)
field1.get(pendingAckHandle);
+
+ Awaitility.await().until(() -> {
+ completableFuture.get();
+ return true;
+ });
+
+ Field field2 =
PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
+ field2.setAccessible(true);
+ LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>
individualAckOfTransaction =
+ (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>)
field2.get(pendingAckHandle);
+
+
assertFalse(individualAckOfTransaction.containsKey(transaction1.getTxnID()));
+
assertFalse(individualAckOfTransaction.containsKey(transaction2.getTxnID()));
+
+ }
}