This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit d42a7f92cf42df6f9d771c200640a96ffb573566
Author: kaiyi.lk <[email protected]>
AuthorDate: Wed Jul 20 15:40:40 2022 +0800

    fix: use the last TransactionData for the same transactionId
---
 .../proxy/service/transaction/AbstractTransactionService.java      | 2 +-
 .../rocketmq/proxy/service/transaction/TransactionDataManager.java | 6 +++---
 .../proxy/service/transaction/TransactionDataManagerTest.java      | 7 ++++---
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
index 262e430c3..b55cc3905 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
@@ -57,7 +57,7 @@ public abstract class AbstractTransactionService implements TransactionService,
     @Override
     public EndTransactionRequestData genEndTransactionRequestHeader(String producerGroup, Integer commitOrRollback,
         boolean fromTransactionCheck, String msgId, String transactionId) {
-        TransactionData transactionData = this.transactionDataManager.pollFirstNoExpireTransactionData(producerGroup, transactionId);
+        TransactionData transactionData = this.transactionDataManager.pollNoExpireTransactionData(producerGroup, transactionId);
         if (transactionData == null) {
             return null;
         }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
index 740afab3a..2c19b858b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
@@ -58,13 +58,13 @@ public class TransactionDataManager implements StartAndShutdown {
         });
     }
 
-    public TransactionData pollFirstNoExpireTransactionData(String producerGroup, String transactionId) {
+    public TransactionData pollNoExpireTransactionData(String producerGroup, String transactionId) {
         AtomicReference<TransactionData> res = new AtomicReference<>();
         long currTimestamp = System.currentTimeMillis();
         this.transactionIdDataMap.computeIfPresent(buildKey(producerGroup, transactionId), (key, dataSet) -> {
-            TransactionData data = dataSet.pollFirst();
+            TransactionData data = dataSet.pollLast();
             while (data != null && data.getExpireTime() < currTimestamp) {
-                data = dataSet.pollFirst();
+                data = dataSet.pollLast();
             }
             if (data != null) {
                 res.set(data);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
index 2d03ab6af..d9620740e 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
@@ -65,7 +66,7 @@ public class TransactionDataManagerTest extends InitConfigAndLoggerTest {
     }
 
     @Test
-    public void testPollFirst() {
+    public void testPoll() {
         String txId = MessageClientIDSetter.createUniqID();
         TransactionData transactionData1 = createTransactionData(txId, System.currentTimeMillis() - Duration.ofMinutes(2).toMillis());
         TransactionData transactionData2 = createTransactionData(txId);
@@ -73,9 +74,9 @@ public class TransactionDataManagerTest extends InitConfigAndLoggerTest {
         this.transactionDataManager.addTransactionData(PRODUCER_GROUP, txId, transactionData1);
         this.transactionDataManager.addTransactionData(PRODUCER_GROUP, txId, transactionData2);
 
-        TransactionData resTransactionData = this.transactionDataManager.pollFirstNoExpireTransactionData(PRODUCER_GROUP, txId);
+        TransactionData resTransactionData = this.transactionDataManager.pollNoExpireTransactionData(PRODUCER_GROUP, txId);
         assertSame(transactionData2, resTransactionData);
-        assertTrue(this.transactionDataManager.transactionIdDataMap.isEmpty());
+        assertNull(this.transactionDataManager.pollNoExpireTransactionData(PRODUCER_GROUP, txId));
     }
 
     @Test
