This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c26e7d70750 [fix](txn insert) Fix txn insert connect to observer fe in 
cloud mode (#36615)
c26e7d70750 is described below

commit c26e7d70750c842642bde00c2e9d3d9339ff9697
Author: meiyi <[email protected]>
AuthorDate: Fri Jun 21 22:13:12 2024 +0800

    [fix](txn insert) Fix txn insert connect to observer fe in cloud mode 
(#36615)
    
    ## Proposed changes
    
    For txn insert, if the client connects to an observer FE, the SQL is
    forwarded to the master FE, but the txn context is lost. This PR fixes it.
---
 .../doris/transaction/SubTransactionState.java     |  23 +++++
 .../apache/doris/transaction/TransactionEntry.java | 115 ++++++++++++---------
 .../apache/doris/transaction/TransactionState.java |  19 +---
 .../transaction/DatabaseTransactionMgrTest.java    |   2 +-
 .../transaction/GlobalTransactionMgrTest.java      |  39 ++++---
 gensrc/thrift/FrontendService.thrift               |  14 +++
 6 files changed, 131 insertions(+), 81 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/SubTransactionState.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/SubTransactionState.java
index 2ee8c2df59b..43a12329f15 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/SubTransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/SubTransactionState.java
@@ -18,6 +18,7 @@
 package org.apache.doris.transaction;
 
 import org.apache.doris.catalog.Table;
+import org.apache.doris.thrift.TSubTxnType;
 import org.apache.doris.thrift.TTabletCommitInfo;
 
 import lombok.Getter;
@@ -47,6 +48,28 @@ public class SubTransactionState {
         this.subTransactionType = subTransactionType;
     }
 
+    public static SubTransactionType getSubTransactionType(TSubTxnType 
subTxnType) {
+        switch (subTxnType) {
+            case INSERT:
+                return SubTransactionType.INSERT;
+            case DELETE:
+                return SubTransactionType.DELETE;
+            default:
+                throw new IllegalArgumentException("Unknown sub txn type: " + 
subTxnType);
+        }
+    }
+
+    public static TSubTxnType getSubTransactionType(SubTransactionType 
subTxnType) {
+        switch (subTxnType) {
+            case INSERT:
+                return TSubTxnType.INSERT;
+            case DELETE:
+                return TSubTxnType.DELETE;
+            default:
+                throw new IllegalArgumentException("Unknown sub txn type: " + 
subTxnType);
+        }
+    }
+
     @Override
     public String toString() {
         return "SubTransactionState{"
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index df68685134c..e9db8e3f58f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -43,6 +43,7 @@ import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TLoadTxnBeginRequest;
 import org.apache.doris.thrift.TLoadTxnBeginResult;
+import org.apache.doris.thrift.TSubTxnInfo;
 import org.apache.doris.thrift.TTabletCommitInfo;
 import org.apache.doris.thrift.TTxnLoadInfo;
 import org.apache.doris.thrift.TTxnParams;
@@ -85,9 +86,6 @@ public class TransactionEntry {
     private long transactionId = -1;
     private TransactionState transactionState;
     private long timeoutTimestamp = -1;
-    // 1. For cloud mode, we keep subTransactionStates in TransactionEntry;
-    // 2. For doris, we keep subTransactionStates in TransactionState, because 
if executed in observer,
-    // the dml statements will be forwarded to master, so keep the 
subTransactionStates is in master.
     private List<SubTransactionState> subTransactionStates = new ArrayList<>();
     // Used for cloud mode, including all successful or failed sub 
transactions except the first one
     private long allSubTxnNum = 0;
@@ -222,7 +220,6 @@ public class TransactionEntry {
             this.dbId = this.database.getId();
             this.transactionState = Env.getCurrentGlobalTransactionMgr()
                     .getTransactionState(database.getId(), transactionId);
-            this.transactionState.resetSubTransactionStates();
             return this.transactionId;
         } else {
             if (this.database.getId() != database.getId()) {
@@ -253,13 +250,15 @@ public class TransactionEntry {
     public TransactionStatus commitTransaction() throws Exception {
         if (isTransactionBegan) {
             try {
-                if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
+                // cloud mode does not commit on observer fe because 
CloudGlobalTransactionMgr#afterCommitTxnResp
+                // will produce event
+                if (Env.getCurrentEnv().isMaster()) {
                     beforeFinishTransaction();
                     // the report_tablet_interval_seconds is default 60
                     long commitTimeout = Math.min(Config.commit_timeout_second 
* 1000L,
                             Math.max(timeoutTimestamp - 
System.currentTimeMillis(), 0));
                     if 
(Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(database, 
transactionId,
-                            transactionState.getSubTransactionStates(), 
commitTimeout)) {
+                            subTransactionStates, commitTimeout)) {
                         return TransactionStatus.VISIBLE;
                     } else {
                         return TransactionStatus.COMMITTED;
@@ -336,13 +335,20 @@ public class TransactionEntry {
         }
     }
 
-    private void beforeFinishTransaction() {
+    private void beforeFinishTransaction() throws DdlException {
         if (isTransactionBegan) {
-            List<Long> tableIds = 
transactionState.getTableIdList().stream().distinct().collect(Collectors.toList());
-            transactionState.setTableIdList(tableIds);
-            List<SubTransactionState> subTransactionStatesPtr = 
Config.isCloudMode() ? subTransactionStates
-                    : transactionState.getSubTransactionStates();
-            subTransactionStatesPtr.sort((s1, s2) -> {
+            if (Config.isCloudMode()) {
+                // null if this is observer
+                if (transactionState == null) {
+                    database = 
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
+                    transactionState = 
Env.getCurrentGlobalTransactionMgr().getTransactionState(dbId, transactionId);
+                }
+            } else {
+                List<Long> tableIds = 
transactionState.getTableIdList().stream().distinct()
+                        .collect(Collectors.toList());
+                transactionState.setTableIdList(tableIds);
+            }
+            subTransactionStates.sort((s1, s2) -> {
                 if (s1.getSubTransactionType() == SubTransactionType.INSERT
                         && s2.getSubTransactionType() == 
SubTransactionType.DELETE) {
                     return 1;
@@ -353,11 +359,9 @@ public class TransactionEntry {
                     return Long.compare(s1.getSubTransactionId(), 
s2.getSubTransactionId());
                 }
             });
-            if (Config.isCloudMode()) {
-                
transactionState.setSubTransactionStates(subTransactionStatesPtr);
-            }
-            LOG.info("subTransactionStates={}", 
transactionState.getSubTransactionStates());
-            transactionState.resetSubTxnIds();
+            LOG.info("subTransactionStates={}", subTransactionStates);
+            
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+                    .collect(Collectors.toList()));
         }
     }
 
@@ -395,10 +399,7 @@ public class TransactionEntry {
             LOG.debug("label={}, txn_id={}, sub_txn_id={}, table={}, 
commit_infos={}",
                     label, transactionId, subTxnId, table, commitInfos);
         }
-        List<SubTransactionState> subTransactionStatesPtr = 
Config.isCloudMode() ? subTransactionStates
-                : transactionState.getSubTransactionStates();
-        subTransactionStatesPtr
-                .add(new SubTransactionState(subTxnId, table, commitInfos, 
subTransactionType));
+        subTransactionStates.add(new SubTransactionState(subTxnId, table, 
commitInfos, subTransactionType));
     }
 
     public boolean isTransactionBegan() {
@@ -443,27 +444,22 @@ public class TransactionEntry {
         this.setTxnConf(new TTxnParams().setNeedTxn(true).setTxnId(-1));
         this.label = txnLoadInfo.getLabel();
         if (txnLoadInfo.isSetTxnId()) {
-            this.dbId = txnLoadInfo.getDbId();
-            this.database = 
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
-            this.transactionId = txnLoadInfo.getTxnId();
+            Preconditions.checkState(subTransactionStates.isEmpty(),
+                    "subTxnStates is not empty: " + subTransactionStates);
+            resetByTxnInfo(txnLoadInfo);
             this.transactionState = 
Env.getCurrentGlobalTransactionMgr().getTransactionState(dbId, transactionId);
             Preconditions.checkNotNull(this.transactionState,
                     "db_id" + dbId + " txn_id=" + transactionId + " not 
found");
             
Preconditions.checkState(this.label.equals(this.transactionState.getLabel()), 
"expected label="
                     + this.label + ", real label=" + 
this.transactionState.getLabel());
             this.isTransactionBegan = true;
-            this.timeoutTimestamp = txnLoadInfo.getTimeoutTimestamp();
         }
-        LOG.info("set txn info in master, label={}, txnId={}, dbId={}, 
timeoutTimestamp={}",
-                label, transactionId, dbId, timeoutTimestamp);
+        LOG.info("set txn info in master, label={}, txnId={}, dbId={}, 
timeoutTimestamp={}, allSubTxnNum={}, "
+                + "subTxnStates={}", label, transactionId, dbId, 
timeoutTimestamp, allSubTxnNum, subTransactionStates);
     }
 
     public TTxnLoadInfo getTxnInfoInMaster() {
-        TTxnLoadInfo txnLoadInfo = new TTxnLoadInfo();
-        txnLoadInfo.setLabel(label);
-        txnLoadInfo.setTxnId(transactionId);
-        txnLoadInfo.setDbId(dbId);
-        txnLoadInfo.setTimeoutTimestamp(timeoutTimestamp);
+        TTxnLoadInfo txnLoadInfo = getTxnLoadInfo();
         LOG.info("master return txn info: {}", txnLoadInfo);
         return txnLoadInfo;
     }
@@ -473,31 +469,56 @@ public class TransactionEntry {
             throw new AnalysisException(
                     "Transaction insert can not insert into values and insert 
into select at the same time");
         }
+        TTxnLoadInfo txnLoadInfo = getTxnLoadInfo();
+        LOG.info("get txn load info in observer: {}", txnLoadInfo);
+        return txnLoadInfo;
+    }
+
+    public void setTxnLoadInfoInObserver(TTxnLoadInfo txnLoadInfo) throws 
DdlException {
+        Preconditions.checkState(txnLoadInfo.getLabel().equals(this.label),
+                "expected label=" + this.label + ", real label=" + 
txnLoadInfo.getLabel());
+        subTransactionStates.clear();
+        resetByTxnInfo(txnLoadInfo);
+        this.isTransactionBegan = true;
+        LOG.info("set txn load info in observer, label={}, txnId={}, dbId={}, 
timeoutTimestamp={}, allSubTxnNum={}, "
+                + "subTxnStates={}", label, transactionId, dbId, 
timeoutTimestamp, allSubTxnNum, subTransactionStates);
+    }
+
+    private void resetByTxnInfo(TTxnLoadInfo txnLoadInfo) throws DdlException {
+        this.dbId = txnLoadInfo.getDbId();
+        this.database = 
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
+        this.transactionId = txnLoadInfo.getTxnId();
+        this.timeoutTimestamp = txnLoadInfo.getTimeoutTimestamp();
+        this.allSubTxnNum = txnLoadInfo.getAllSubTxnNum();
+        for (TSubTxnInfo subTxnInfo : txnLoadInfo.getSubTxnInfos()) {
+            TableIf table = 
database.getTableOrDdlException(subTxnInfo.getTableId());
+            subTransactionStates.add(
+                    new SubTransactionState(subTxnInfo.getSubTxnId(), (Table) 
table,
+                            subTxnInfo.getTabletCommitInfos(),
+                            
SubTransactionState.getSubTransactionType(subTxnInfo.getSubTxnType())));
+        }
+    }
+
+    private TTxnLoadInfo getTxnLoadInfo() {
         TTxnLoadInfo txnLoadInfo = new TTxnLoadInfo();
         txnLoadInfo.setLabel(label);
         if (this.isTransactionBegan) {
             txnLoadInfo.setTxnId(transactionId);
             txnLoadInfo.setDbId(dbId);
             txnLoadInfo.setTimeoutTimestamp(timeoutTimestamp);
+            txnLoadInfo.setAllSubTxnNum(allSubTxnNum);
+            for (SubTransactionState subTxnState : subTransactionStates) {
+                txnLoadInfo.addToSubTxnInfos(new TSubTxnInfo()
+                        .setSubTxnId(subTxnState.getSubTransactionId())
+                        .setTableId(subTxnState.getTable().getId())
+                        
.setTabletCommitInfos(subTxnState.getTabletCommitInfos())
+                        
.setSubTxnType(SubTransactionState.getSubTransactionType(subTxnState.getSubTransactionType())));
+            }
         }
-        LOG.info("get txn load info in observer: {}", txnLoadInfo);
         return txnLoadInfo;
     }
 
-    public void setTxnLoadInfoInObserver(TTxnLoadInfo txnLoadInfo) {
-        Preconditions.checkState(txnLoadInfo.getLabel().equals(this.label),
-                "expected label=" + this.label + ", real label=" + 
txnLoadInfo.getLabel());
-        this.isTransactionBegan = true;
-        this.transactionId = txnLoadInfo.txnId;
-        this.timeoutTimestamp = txnLoadInfo.timeoutTimestamp;
-        this.dbId = txnLoadInfo.dbId;
-        LOG.info("set txn load info in observer, label={}, txnId={}, dbId={}, 
timeoutTimestamp={}",
-                label, transactionId, dbId, timeoutTimestamp);
-    }
-
     private Set<Long> getTableIds() {
-        List<SubTransactionState> subTransactionStatesPtr = 
Config.isCloudMode() ? subTransactionStates
-                : transactionState.getSubTransactionStates();
-        return subTransactionStatesPtr.stream().map(s -> 
s.getTable().getId()).collect(Collectors.toSet());
+        return subTransactionStates.stream().map(s -> 
s.getTable().getId()).collect(Collectors.toSet());
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 0b30fc97262..c60081a4260 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -57,7 +57,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 public class TransactionState implements Writable {
     private static final Logger LOG = 
LogManager.getLogger(TransactionState.class);
@@ -309,8 +308,6 @@ public class TransactionState implements Writable {
     // table id -> schema info
     private Map<Long, SchemaInfo> txnSchemas = new HashMap<>();
 
-    @Getter
-    private List<SubTransactionState> subTransactionStates;
     @Getter
     @SerializedName(value = "sti")
     private List<Long> subTxnIds;
@@ -712,9 +709,6 @@ public class TransactionState implements Writable {
         if (idToTableCommitInfos != null && !idToTableCommitInfos.isEmpty()) {
             sb.append(", table commit info: ").append(idToTableCommitInfos);
         }
-        if (subTransactionStates != null) {
-            sb.append(", sub txn states: ").append(subTransactionStates);
-        }
         if (subTxnIds != null) {
             sb.append(", sub txn ids: ").append(subTxnIds);
         }
@@ -873,17 +867,8 @@ public class TransactionState implements Writable {
         return true;
     }
 
-    public void resetSubTransactionStates() {
-        this.subTransactionStates = new ArrayList<>();
-    }
-
-    public void setSubTransactionStates(List<SubTransactionState> 
subTransactionStates) {
-        this.subTransactionStates = subTransactionStates;
-    }
-
-    public void resetSubTxnIds() {
-        this.subTxnIds = 
subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
-                .collect(Collectors.toList());
+    public void setSubTxnIds(List<Long> subTxnIds) {
+        this.subTxnIds = subTxnIds;
     }
 
     public TableCommitInfo getTableCommitInfoBySubTxnId(long subTxnId) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index ac855012456..bb29a2a4dbd 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -354,7 +354,7 @@ public class DatabaseTransactionMgrTest {
         TransactionState transactionState6 = 
masterDbTransMgr.getTransactionState(
                 LabelToTxnId.get(CatalogTestUtil.testTxnLabel6));
         long transactionId6 = transactionState6.getTransactionId();
-        long subTransactionId3 = 
transactionState6.getSubTransactionStates().get(2).getSubTransactionId();
+        long subTransactionId3 = transactionState6.getSubTxnIds().get(2);
         TransactionState subTransactionState = 
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1,
                 subTransactionId3);
         Assert.assertEquals(transactionState6, subTransactionState);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 1ea73e5b31d..797507e47de 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -71,6 +71,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -650,6 +651,8 @@ public class GlobalTransactionMgrTest {
                 new SubTransactionInfo(table1, CatalogTestUtil.testTabletId1, 
allBackends));
         List<SubTransactionState> subTransactionStates = 
generateSubTransactionStates(transactionState,
                 subTransactionInfos);
+        
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+                .collect(Collectors.toList()));
         masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2), transactionId,
                 subTransactionStates, 300000);
         // check status is committed
@@ -718,6 +721,8 @@ public class GlobalTransactionMgrTest {
             List<SubTransactionState> subTransactionStates = 
generateSubTransactionStates(transactionState,
                     subTransactionInfos);
             // commit txn
+            
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+                    .collect(Collectors.toList()));
             masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2),
                     transactionId,
                     subTransactionStates, 300000);
@@ -759,6 +764,8 @@ public class GlobalTransactionMgrTest {
                     subTransactionInfos);
             // commit txn
             try {
+                
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+                        .collect(Collectors.toList()));
                 masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2),
                         transactionId, subTransactionStates, 300000);
                 Assert.fail();
@@ -793,6 +800,8 @@ public class GlobalTransactionMgrTest {
             List<SubTransactionState> subTransactionStates = 
generateSubTransactionStates(transactionState,
                     subTransactionInfos);
             // commit txn
+            
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+                    .collect(Collectors.toList()));
             masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2),
                     transactionId, subTransactionStates, 300000);
             Assert.assertEquals(TransactionStatus.COMMITTED, 
transactionState.getTransactionStatus());
@@ -907,6 +916,8 @@ public class GlobalTransactionMgrTest {
         List<SubTransactionState> subTransactionStates = 
generateSubTransactionStates(transactionState,
                 subTransactionInfos);
         // commit txn
+        
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+                .collect(Collectors.toList()));
         masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2), transactionId,
                 subTransactionStates, 300000);
         // check status is committed
@@ -928,15 +939,12 @@ public class GlobalTransactionMgrTest {
         // finish transaction
         Map<String, Map<Long, Long>> keyToSuccessTablets = new HashMap<>();
         DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets, 
allBackends,
-                
transactionState.getSubTransactionStates().get(0).getSubTransactionId(), 
CatalogTestUtil.testTabletId1,
-                14);
+                subTransactionStates.get(0).getSubTransactionId(), 
CatalogTestUtil.testTabletId1, 14);
         DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets, 
allBackends,
-                
transactionState.getSubTransactionStates().get(1).getSubTransactionId(), 
CatalogTestUtil.testTabletId2,
-                13);
+                subTransactionStates.get(1).getSubTransactionId(), 
CatalogTestUtil.testTabletId2, 13);
         DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets,
                 Lists.newArrayList(CatalogTestUtil.testBackendId2, 
CatalogTestUtil.testBackendId3),
-                
transactionState.getSubTransactionStates().get(2).getSubTransactionId(), 
CatalogTestUtil.testTabletId1,
-                15);
+                subTransactionStates.get(2).getSubTransactionId(), 
CatalogTestUtil.testTabletId1, 15);
         
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState, 
allBackends, keyToSuccessTablets);
         Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
         Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
@@ -1011,6 +1019,8 @@ public class GlobalTransactionMgrTest {
             List<SubTransactionState> subTransactionStates = 
generateSubTransactionStates(transactionState,
                     subTransactionInfos);
             // commit txn
+            
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+                    .collect(Collectors.toList()));
             masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, 
Lists.newArrayList(table1, table2),
                     transactionId, subTransactionStates, 300000);
             // check status is committed
@@ -1040,14 +1050,11 @@ public class GlobalTransactionMgrTest {
             // backend2, backend3 publish failed
             DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets,
                     Lists.newArrayList(CatalogTestUtil.testBackendId1),
-                    
transactionState.getSubTransactionStates().get(0).getSubTransactionId(),
-                    CatalogTestUtil.testTabletId1, 14);
+                    subTransactionStates.get(0).getSubTransactionId(), 
CatalogTestUtil.testTabletId1, 14);
             DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets, 
allBackends,
-                    
transactionState.getSubTransactionStates().get(1).getSubTransactionId(), 
CatalogTestUtil.testTabletId2,
-                    13);
+                    subTransactionStates.get(1).getSubTransactionId(), 
CatalogTestUtil.testTabletId2, 13);
             DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets, 
allBackends,
-                    
transactionState.getSubTransactionStates().get(2).getSubTransactionId(),
-                    CatalogTestUtil.testTabletId1, 15);
+                    subTransactionStates.get(2).getSubTransactionId(), 
CatalogTestUtil.testTabletId1, 15);
             
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState, 
allBackends, keyToSuccessTablets);
             LOG.info("publish tasks: {}", 
transactionState.getPublishVersionTasks());
             // finish transaction
@@ -1131,8 +1138,7 @@ public class GlobalTransactionMgrTest {
 
     protected static List<SubTransactionState> 
generateSubTransactionStates(GlobalTransactionMgr masterTransMgr,
             TransactionState transactionState, List<SubTransactionInfo> 
subTransactionInfos) {
-        transactionState.resetSubTransactionStates();
-        List<SubTransactionState> subTransactionStates = 
transactionState.getSubTransactionStates();
+        List<SubTransactionState> subTransactionStates = new ArrayList<>();
         for (int i = 0; i < subTransactionInfos.size(); i++) {
             SubTransactionInfo subTransactionInfo = subTransactionInfos.get(i);
             Table table = subTransactionInfo.table;
@@ -1145,8 +1151,9 @@ public class GlobalTransactionMgrTest {
             
subTransactionStates.add(generateSubTransactionState(transactionState, 
subTxnId, table,
                     tabletId, backends, addTableId));
         }
-        transactionState.resetSubTxnIds();
-        LOG.info("sub txn states={}", 
transactionState.getSubTransactionStates());
+        transactionState.setSubTxnIds(
+                subTransactionInfos.stream().map(sub -> 
sub.subTxnId).collect(Collectors.toList()));
+        LOG.info("sub txn states={}", subTransactionInfos);
         return subTransactionStates;
     }
 
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index ed0f333b958..96ec7222b44 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -517,11 +517,25 @@ struct TFeResult {
     1001: optional bool noAuth
 }
 
+enum TSubTxnType {
+    INSERT = 0,
+    DELETE = 1
+}
+
+struct TSubTxnInfo {
+    1: optional i64 sub_txn_id
+    2: optional i64 table_id
+    3: optional list<Types.TTabletCommitInfo> tablet_commit_infos
+    4: optional TSubTxnType sub_txn_type
+}
+
 struct TTxnLoadInfo {
     1: optional string label
     2: optional i64 dbId
     3: optional i64 txnId
     4: optional i64 timeoutTimestamp
+    5: optional i64 allSubTxnNum
+    6: optional list<TSubTxnInfo> subTxnInfos
 }
 
 struct TMasterOpRequest {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to