This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit d42a7f92cf42df6f9d771c200640a96ffb573566
Author: kaiyi.lk <[email protected]>
AuthorDate: Wed Jul 20 15:40:40 2022 +0800

    fix: use the last TransactionData for the same transactionId
---
 .../proxy/service/transaction/AbstractTransactionService.java      | 2 +-
 .../rocketmq/proxy/service/transaction/TransactionDataManager.java | 6 +++---
 .../proxy/service/transaction/TransactionDataManagerTest.java      | 7 ++++---
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
index 262e430c3..b55cc3905 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
@@ -57,7 +57,7 @@ public abstract class AbstractTransactionService implements 
TransactionService,
     @Override
     public EndTransactionRequestData genEndTransactionRequestHeader(String 
producerGroup, Integer commitOrRollback,
         boolean fromTransactionCheck, String msgId, String transactionId) {
-        TransactionData transactionData = 
this.transactionDataManager.pollFirstNoExpireTransactionData(producerGroup, 
transactionId);
+        TransactionData transactionData = 
this.transactionDataManager.pollNoExpireTransactionData(producerGroup, 
transactionId);
         if (transactionData == null) {
             return null;
         }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
index 740afab3a..2c19b858b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManager.java
@@ -58,13 +58,13 @@ public class TransactionDataManager implements 
StartAndShutdown {
         });
     }
 
-    public TransactionData pollFirstNoExpireTransactionData(String 
producerGroup, String transactionId) {
+    public TransactionData pollNoExpireTransactionData(String producerGroup, 
String transactionId) {
         AtomicReference<TransactionData> res = new AtomicReference<>();
         long currTimestamp = System.currentTimeMillis();
         this.transactionIdDataMap.computeIfPresent(buildKey(producerGroup, 
transactionId), (key, dataSet) -> {
-            TransactionData data = dataSet.pollFirst();
+            TransactionData data = dataSet.pollLast();
             while (data != null && data.getExpireTime() < currTimestamp) {
-                data = dataSet.pollFirst();
+                data = dataSet.pollLast();
             }
             if (data != null) {
                 res.set(data);
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
index 2d03ab6af..d9620740e 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/TransactionDataManagerTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
@@ -65,7 +66,7 @@ public class TransactionDataManagerTest extends 
InitConfigAndLoggerTest {
     }
 
     @Test
-    public void testPollFirst() {
+    public void testPoll() {
         String txId = MessageClientIDSetter.createUniqID();
         TransactionData transactionData1 = createTransactionData(txId, 
System.currentTimeMillis() - Duration.ofMinutes(2).toMillis());
         TransactionData transactionData2 = createTransactionData(txId);
@@ -73,9 +74,9 @@ public class TransactionDataManagerTest extends 
InitConfigAndLoggerTest {
         this.transactionDataManager.addTransactionData(PRODUCER_GROUP, txId, 
transactionData1);
         this.transactionDataManager.addTransactionData(PRODUCER_GROUP, txId, 
transactionData2);
 
-        TransactionData resTransactionData = 
this.transactionDataManager.pollFirstNoExpireTransactionData(PRODUCER_GROUP, 
txId);
+        TransactionData resTransactionData = 
this.transactionDataManager.pollNoExpireTransactionData(PRODUCER_GROUP, txId);
         assertSame(transactionData2, resTransactionData);
-        assertTrue(this.transactionDataManager.transactionIdDataMap.isEmpty());
+        
assertNull(this.transactionDataManager.pollNoExpireTransactionData(PRODUCER_GROUP,
 txId));
     }
 
     @Test

Reply via email to