This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 58bd9bc1035a74f9f6656a2f8d1048d705e7cece Author: Xiangying Meng <[email protected]> AuthorDate: Fri May 13 13:48:34 2022 +0800 [Fix][Txn] Fix transaction PendingAck lowWaterMark (#15530) ### Motivation Currently, when handling the low water mark, PendingAckHandle appends the abort mark using the ID of the transaction that is ending, which is wrong. Instead, it should check the first transaction in individualAckOfTransaction against the low water mark and, if that transaction is at or below it, append the abort mark for that first transaction. ### Modification Append the abort mark for the first transaction in individualAckOfTransaction once it has passed the low-water-mark check. (cherry picked from commit 498cde9ad3dd62142d73e024ea424bd76726dfaa) --- .../pendingack/impl/PendingAckHandleImpl.java | 22 ++-- .../pendingack/PendingAckPersistentTest.java | 117 +++++++++++++++++++++ 2 files changed, 124 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index f001b39972e..7e11592e063 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -600,21 +600,13 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi if (firstTxn.getMostSigBits() == txnID.getMostSigBits() && firstTxn.getLeastSigBits() <= lowWaterMark) { - this.pendingAckStoreFuture.whenComplete((pendingAckStore, throwable) -> { - if (throwable == null) { - pendingAckStore.appendAbortMark(txnID, AckType.Individual).thenAccept(v -> { - synchronized (PendingAckHandleImpl.this) { - log.warn("[{}] Transaction pending ack handle low water mark success! 
txnId : [{}], " - + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark); - individualAckOfTransaction.remove(firstTxn); - handleLowWaterMark(txnID, lowWaterMark); - } - }).exceptionally(e -> { - log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], " - + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark); - return null; - }); - } + abortTxn(firstTxn, null, lowWaterMark).thenRun(() -> { + log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], " + + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark); + }).exceptionally(e -> { + log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], " + + "lowWaterMark : [{}]", topicName, txnID, lowWaterMark); + return null; }); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java index 58408cdee10..7bc562ca62b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java @@ -23,7 +23,9 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -32,6 +34,7 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.collections4.map.LinkedMap; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import 
org.apache.pulsar.broker.transaction.TransactionTestBase; @@ -44,6 +47,8 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -345,4 +350,116 @@ public class PendingAckPersistentTest extends TransactionTestBase { assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName2))); assertFalse(topics.contains(topic)); } + + @Test + public void testPendingAckLowWaterMarkRemoveFirstTxn() throws Exception { + String topic = TopicName.get(TopicDomain.persistent.toString(), + NamespaceName.get(NAMESPACE1), "test").toString(); + + String subName = "subName"; + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Failover) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + + for (int i = 0; i < 5; i++) { + producer.newMessage().send(); + } + + Transaction transaction1 = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build() + .get(); + + Message<byte[]> message1 = consumer.receive(5, TimeUnit.SECONDS); + consumer.acknowledgeAsync(message1.getMessageId(), transaction1); + transaction1.commit().get(); + + + Transaction transaction2 = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build() + .get(); + while (transaction1.getTxnID().getMostSigBits() != transaction2.getTxnID().getMostSigBits()) { + 
transaction2 = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build() + .get(); + } + + Transaction transaction3 = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build() + .get(); + while (transaction1.getTxnID().getMostSigBits() != transaction3.getTxnID().getMostSigBits()) { + transaction3 = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build() + .get(); + } + + Message<byte[]> message3 = consumer.receive(5, TimeUnit.SECONDS); + consumer.acknowledgeAsync(message3.getMessageId(), transaction2); + transaction2.commit().get(); + + Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS); + + Field field = TransactionImpl.class.getDeclaredField("state"); + field.setAccessible(true); + field.set(transaction1, TransactionImpl.State.OPEN); + + consumer.acknowledgeAsync(message2.getMessageId(), transaction1).get(); + Message<byte[]> message4 = consumer.receive(5, TimeUnit.SECONDS); + field.set(transaction2, TransactionImpl.State.OPEN); + consumer.acknowledgeAsync(message4.getMessageId(), transaction2).get(); + + Message<byte[]> message5 = consumer.receive(5, TimeUnit.SECONDS); + consumer.acknowledgeAsync(message5.getMessageId(), transaction3); + transaction3.commit().get(); + + + PersistentTopic persistentTopic = + (PersistentTopic) getPulsarServiceList() + .get(0) + .getBrokerService() + .getTopic(topic, false) + .get() + .get(); + + PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName); + PendingAckHandleImpl pendingAckHandle = new PendingAckHandleImpl(persistentSubscription); + + Method method = PendingAckHandleImpl.class.getDeclaredMethod("initPendingAckStore"); + method.setAccessible(true); + method.invoke(pendingAckHandle); + + Field field1 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture"); + field1.setAccessible(true); + CompletableFuture<PendingAckStore> completableFuture = + 
(CompletableFuture<PendingAckStore>) field1.get(pendingAckHandle); + + Awaitility.await().until(() -> { + completableFuture.get(); + return true; + }); + + Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction"); + field2.setAccessible(true); + LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction = + (LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(pendingAckHandle); + + assertFalse(individualAckOfTransaction.containsKey(transaction1.getTxnID())); + assertFalse(individualAckOfTransaction.containsKey(transaction2.getTxnID())); + + } }
