This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c26e7d70750 [fix](txn insert) Fix txn insert connect to observer fe in
cloud mode (#36615)
c26e7d70750 is described below
commit c26e7d70750c842642bde00c2e9d3d9339ff9697
Author: meiyi <[email protected]>
AuthorDate: Fri Jun 21 22:13:12 2024 +0800
[fix](txn insert) Fix txn insert connect to observer fe in cloud mode
(#36615)
## Proposed changes
For txn insert, if connect to observer fe, the sql is forward to master
fe, but the txn context is lost. This pr fix it.
---
.../doris/transaction/SubTransactionState.java | 23 +++++
.../apache/doris/transaction/TransactionEntry.java | 115 ++++++++++++---------
.../apache/doris/transaction/TransactionState.java | 19 +---
.../transaction/DatabaseTransactionMgrTest.java | 2 +-
.../transaction/GlobalTransactionMgrTest.java | 39 ++++---
gensrc/thrift/FrontendService.thrift | 14 +++
6 files changed, 131 insertions(+), 81 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/SubTransactionState.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/SubTransactionState.java
index 2ee8c2df59b..43a12329f15 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/SubTransactionState.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/SubTransactionState.java
@@ -18,6 +18,7 @@
package org.apache.doris.transaction;
import org.apache.doris.catalog.Table;
+import org.apache.doris.thrift.TSubTxnType;
import org.apache.doris.thrift.TTabletCommitInfo;
import lombok.Getter;
@@ -47,6 +48,28 @@ public class SubTransactionState {
this.subTransactionType = subTransactionType;
}
+ public static SubTransactionType getSubTransactionType(TSubTxnType
subTxnType) {
+ switch (subTxnType) {
+ case INSERT:
+ return SubTransactionType.INSERT;
+ case DELETE:
+ return SubTransactionType.DELETE;
+ default:
+ throw new IllegalArgumentException("Unknown sub txn type: " +
subTxnType);
+ }
+ }
+
+ public static TSubTxnType getSubTransactionType(SubTransactionType
subTxnType) {
+ switch (subTxnType) {
+ case INSERT:
+ return TSubTxnType.INSERT;
+ case DELETE:
+ return TSubTxnType.DELETE;
+ default:
+ throw new IllegalArgumentException("Unknown sub txn type: " +
subTxnType);
+ }
+ }
+
@Override
public String toString() {
return "SubTransactionState{"
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index df68685134c..e9db8e3f58f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -43,6 +43,7 @@ import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TLoadTxnBeginRequest;
import org.apache.doris.thrift.TLoadTxnBeginResult;
+import org.apache.doris.thrift.TSubTxnInfo;
import org.apache.doris.thrift.TTabletCommitInfo;
import org.apache.doris.thrift.TTxnLoadInfo;
import org.apache.doris.thrift.TTxnParams;
@@ -85,9 +86,6 @@ public class TransactionEntry {
private long transactionId = -1;
private TransactionState transactionState;
private long timeoutTimestamp = -1;
- // 1. For cloud mode, we keep subTransactionStates in TransactionEntry;
- // 2. For doris, we keep subTransactionStates in TransactionState, because
if executed in observer,
- // the dml statements will be forwarded to master, so keep the
subTransactionStates is in master.
private List<SubTransactionState> subTransactionStates = new ArrayList<>();
// Used for cloud mode, including all successful or failed sub
transactions except the first one
private long allSubTxnNum = 0;
@@ -222,7 +220,6 @@ public class TransactionEntry {
this.dbId = this.database.getId();
this.transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(database.getId(), transactionId);
- this.transactionState.resetSubTransactionStates();
return this.transactionId;
} else {
if (this.database.getId() != database.getId()) {
@@ -253,13 +250,15 @@ public class TransactionEntry {
public TransactionStatus commitTransaction() throws Exception {
if (isTransactionBegan) {
try {
- if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
+ // cloud mode does not commit on observer fe because
CloudGlobalTransactionMgr#afterCommitTxnResp
+ // will produce event
+ if (Env.getCurrentEnv().isMaster()) {
beforeFinishTransaction();
// the report_tablet_interval_seconds is default 60
long commitTimeout = Math.min(Config.commit_timeout_second
* 1000L,
Math.max(timeoutTimestamp -
System.currentTimeMillis(), 0));
if
(Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(database,
transactionId,
- transactionState.getSubTransactionStates(),
commitTimeout)) {
+ subTransactionStates, commitTimeout)) {
return TransactionStatus.VISIBLE;
} else {
return TransactionStatus.COMMITTED;
@@ -336,13 +335,20 @@ public class TransactionEntry {
}
}
- private void beforeFinishTransaction() {
+ private void beforeFinishTransaction() throws DdlException {
if (isTransactionBegan) {
- List<Long> tableIds =
transactionState.getTableIdList().stream().distinct().collect(Collectors.toList());
- transactionState.setTableIdList(tableIds);
- List<SubTransactionState> subTransactionStatesPtr =
Config.isCloudMode() ? subTransactionStates
- : transactionState.getSubTransactionStates();
- subTransactionStatesPtr.sort((s1, s2) -> {
+ if (Config.isCloudMode()) {
+ // null if this is observer
+ if (transactionState == null) {
+ database =
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
+ transactionState =
Env.getCurrentGlobalTransactionMgr().getTransactionState(dbId, transactionId);
+ }
+ } else {
+ List<Long> tableIds =
transactionState.getTableIdList().stream().distinct()
+ .collect(Collectors.toList());
+ transactionState.setTableIdList(tableIds);
+ }
+ subTransactionStates.sort((s1, s2) -> {
if (s1.getSubTransactionType() == SubTransactionType.INSERT
&& s2.getSubTransactionType() ==
SubTransactionType.DELETE) {
return 1;
@@ -353,11 +359,9 @@ public class TransactionEntry {
return Long.compare(s1.getSubTransactionId(),
s2.getSubTransactionId());
}
});
- if (Config.isCloudMode()) {
-
transactionState.setSubTransactionStates(subTransactionStatesPtr);
- }
- LOG.info("subTransactionStates={}",
transactionState.getSubTransactionStates());
- transactionState.resetSubTxnIds();
+ LOG.info("subTransactionStates={}", subTransactionStates);
+
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+ .collect(Collectors.toList()));
}
}
@@ -395,10 +399,7 @@ public class TransactionEntry {
LOG.debug("label={}, txn_id={}, sub_txn_id={}, table={},
commit_infos={}",
label, transactionId, subTxnId, table, commitInfos);
}
- List<SubTransactionState> subTransactionStatesPtr =
Config.isCloudMode() ? subTransactionStates
- : transactionState.getSubTransactionStates();
- subTransactionStatesPtr
- .add(new SubTransactionState(subTxnId, table, commitInfos,
subTransactionType));
+ subTransactionStates.add(new SubTransactionState(subTxnId, table,
commitInfos, subTransactionType));
}
public boolean isTransactionBegan() {
@@ -443,27 +444,22 @@ public class TransactionEntry {
this.setTxnConf(new TTxnParams().setNeedTxn(true).setTxnId(-1));
this.label = txnLoadInfo.getLabel();
if (txnLoadInfo.isSetTxnId()) {
- this.dbId = txnLoadInfo.getDbId();
- this.database =
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
- this.transactionId = txnLoadInfo.getTxnId();
+ Preconditions.checkState(subTransactionStates.isEmpty(),
+ "subTxnStates is not empty: " + subTransactionStates);
+ resetByTxnInfo(txnLoadInfo);
this.transactionState =
Env.getCurrentGlobalTransactionMgr().getTransactionState(dbId, transactionId);
Preconditions.checkNotNull(this.transactionState,
"db_id" + dbId + " txn_id=" + transactionId + " not
found");
Preconditions.checkState(this.label.equals(this.transactionState.getLabel()),
"expected label="
+ this.label + ", real label=" +
this.transactionState.getLabel());
this.isTransactionBegan = true;
- this.timeoutTimestamp = txnLoadInfo.getTimeoutTimestamp();
}
- LOG.info("set txn info in master, label={}, txnId={}, dbId={},
timeoutTimestamp={}",
- label, transactionId, dbId, timeoutTimestamp);
+ LOG.info("set txn info in master, label={}, txnId={}, dbId={},
timeoutTimestamp={}, allSubTxnNum={}, "
+ + "subTxnStates={}", label, transactionId, dbId,
timeoutTimestamp, allSubTxnNum, subTransactionStates);
}
public TTxnLoadInfo getTxnInfoInMaster() {
- TTxnLoadInfo txnLoadInfo = new TTxnLoadInfo();
- txnLoadInfo.setLabel(label);
- txnLoadInfo.setTxnId(transactionId);
- txnLoadInfo.setDbId(dbId);
- txnLoadInfo.setTimeoutTimestamp(timeoutTimestamp);
+ TTxnLoadInfo txnLoadInfo = getTxnLoadInfo();
LOG.info("master return txn info: {}", txnLoadInfo);
return txnLoadInfo;
}
@@ -473,31 +469,56 @@ public class TransactionEntry {
throw new AnalysisException(
"Transaction insert can not insert into values and insert
into select at the same time");
}
+ TTxnLoadInfo txnLoadInfo = getTxnLoadInfo();
+ LOG.info("get txn load info in observer: {}", txnLoadInfo);
+ return txnLoadInfo;
+ }
+
+ public void setTxnLoadInfoInObserver(TTxnLoadInfo txnLoadInfo) throws
DdlException {
+ Preconditions.checkState(txnLoadInfo.getLabel().equals(this.label),
+ "expected label=" + this.label + ", real label=" +
txnLoadInfo.getLabel());
+ subTransactionStates.clear();
+ resetByTxnInfo(txnLoadInfo);
+ this.isTransactionBegan = true;
+ LOG.info("set txn load info in observer, label={}, txnId={}, dbId={},
timeoutTimestamp={}, allSubTxnNum={}, "
+ + "subTxnStates={}", label, transactionId, dbId,
timeoutTimestamp, allSubTxnNum, subTransactionStates);
+ }
+
+ private void resetByTxnInfo(TTxnLoadInfo txnLoadInfo) throws DdlException {
+ this.dbId = txnLoadInfo.getDbId();
+ this.database =
Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
+ this.transactionId = txnLoadInfo.getTxnId();
+ this.timeoutTimestamp = txnLoadInfo.getTimeoutTimestamp();
+ this.allSubTxnNum = txnLoadInfo.getAllSubTxnNum();
+ for (TSubTxnInfo subTxnInfo : txnLoadInfo.getSubTxnInfos()) {
+ TableIf table =
database.getTableOrDdlException(subTxnInfo.getTableId());
+ subTransactionStates.add(
+ new SubTransactionState(subTxnInfo.getSubTxnId(), (Table)
table,
+ subTxnInfo.getTabletCommitInfos(),
+
SubTransactionState.getSubTransactionType(subTxnInfo.getSubTxnType())));
+ }
+ }
+
+ private TTxnLoadInfo getTxnLoadInfo() {
TTxnLoadInfo txnLoadInfo = new TTxnLoadInfo();
txnLoadInfo.setLabel(label);
if (this.isTransactionBegan) {
txnLoadInfo.setTxnId(transactionId);
txnLoadInfo.setDbId(dbId);
txnLoadInfo.setTimeoutTimestamp(timeoutTimestamp);
+ txnLoadInfo.setAllSubTxnNum(allSubTxnNum);
+ for (SubTransactionState subTxnState : subTransactionStates) {
+ txnLoadInfo.addToSubTxnInfos(new TSubTxnInfo()
+ .setSubTxnId(subTxnState.getSubTransactionId())
+ .setTableId(subTxnState.getTable().getId())
+
.setTabletCommitInfos(subTxnState.getTabletCommitInfos())
+
.setSubTxnType(SubTransactionState.getSubTransactionType(subTxnState.getSubTransactionType())));
+ }
}
- LOG.info("get txn load info in observer: {}", txnLoadInfo);
return txnLoadInfo;
}
- public void setTxnLoadInfoInObserver(TTxnLoadInfo txnLoadInfo) {
- Preconditions.checkState(txnLoadInfo.getLabel().equals(this.label),
- "expected label=" + this.label + ", real label=" +
txnLoadInfo.getLabel());
- this.isTransactionBegan = true;
- this.transactionId = txnLoadInfo.txnId;
- this.timeoutTimestamp = txnLoadInfo.timeoutTimestamp;
- this.dbId = txnLoadInfo.dbId;
- LOG.info("set txn load info in observer, label={}, txnId={}, dbId={},
timeoutTimestamp={}",
- label, transactionId, dbId, timeoutTimestamp);
- }
-
private Set<Long> getTableIds() {
- List<SubTransactionState> subTransactionStatesPtr =
Config.isCloudMode() ? subTransactionStates
- : transactionState.getSubTransactionStates();
- return subTransactionStatesPtr.stream().map(s ->
s.getTable().getId()).collect(Collectors.toSet());
+ return subTransactionStates.stream().map(s ->
s.getTable().getId()).collect(Collectors.toSet());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 0b30fc97262..c60081a4260 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -57,7 +57,6 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
public class TransactionState implements Writable {
private static final Logger LOG =
LogManager.getLogger(TransactionState.class);
@@ -309,8 +308,6 @@ public class TransactionState implements Writable {
// table id -> schema info
private Map<Long, SchemaInfo> txnSchemas = new HashMap<>();
- @Getter
- private List<SubTransactionState> subTransactionStates;
@Getter
@SerializedName(value = "sti")
private List<Long> subTxnIds;
@@ -712,9 +709,6 @@ public class TransactionState implements Writable {
if (idToTableCommitInfos != null && !idToTableCommitInfos.isEmpty()) {
sb.append(", table commit info: ").append(idToTableCommitInfos);
}
- if (subTransactionStates != null) {
- sb.append(", sub txn states: ").append(subTransactionStates);
- }
if (subTxnIds != null) {
sb.append(", sub txn ids: ").append(subTxnIds);
}
@@ -873,17 +867,8 @@ public class TransactionState implements Writable {
return true;
}
- public void resetSubTransactionStates() {
- this.subTransactionStates = new ArrayList<>();
- }
-
- public void setSubTransactionStates(List<SubTransactionState>
subTransactionStates) {
- this.subTransactionStates = subTransactionStates;
- }
-
- public void resetSubTxnIds() {
- this.subTxnIds =
subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
- .collect(Collectors.toList());
+ public void setSubTxnIds(List<Long> subTxnIds) {
+ this.subTxnIds = subTxnIds;
}
public TableCommitInfo getTableCommitInfoBySubTxnId(long subTxnId) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
index ac855012456..bb29a2a4dbd 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java
@@ -354,7 +354,7 @@ public class DatabaseTransactionMgrTest {
TransactionState transactionState6 =
masterDbTransMgr.getTransactionState(
LabelToTxnId.get(CatalogTestUtil.testTxnLabel6));
long transactionId6 = transactionState6.getTransactionId();
- long subTransactionId3 =
transactionState6.getSubTransactionStates().get(2).getSubTransactionId();
+ long subTransactionId3 = transactionState6.getSubTxnIds().get(2);
TransactionState subTransactionState =
masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1,
subTransactionId3);
Assert.assertEquals(transactionState6, subTransactionState);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 1ea73e5b31d..797507e47de 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -71,6 +71,7 @@ import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -650,6 +651,8 @@ public class GlobalTransactionMgrTest {
new SubTransactionInfo(table1, CatalogTestUtil.testTabletId1,
allBackends));
List<SubTransactionState> subTransactionStates =
generateSubTransactionStates(transactionState,
subTransactionInfos);
+
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+ .collect(Collectors.toList()));
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2), transactionId,
subTransactionStates, 300000);
// check status is committed
@@ -718,6 +721,8 @@ public class GlobalTransactionMgrTest {
List<SubTransactionState> subTransactionStates =
generateSubTransactionStates(transactionState,
subTransactionInfos);
// commit txn
+
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+ .collect(Collectors.toList()));
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2),
transactionId,
subTransactionStates, 300000);
@@ -759,6 +764,8 @@ public class GlobalTransactionMgrTest {
subTransactionInfos);
// commit txn
try {
+
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+ .collect(Collectors.toList()));
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2),
transactionId, subTransactionStates, 300000);
Assert.fail();
@@ -793,6 +800,8 @@ public class GlobalTransactionMgrTest {
List<SubTransactionState> subTransactionStates =
generateSubTransactionStates(transactionState,
subTransactionInfos);
// commit txn
+
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+ .collect(Collectors.toList()));
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2),
transactionId, subTransactionStates, 300000);
Assert.assertEquals(TransactionStatus.COMMITTED,
transactionState.getTransactionStatus());
@@ -907,6 +916,8 @@ public class GlobalTransactionMgrTest {
List<SubTransactionState> subTransactionStates =
generateSubTransactionStates(transactionState,
subTransactionInfos);
// commit txn
+
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+ .collect(Collectors.toList()));
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2), transactionId,
subTransactionStates, 300000);
// check status is committed
@@ -928,15 +939,12 @@ public class GlobalTransactionMgrTest {
// finish transaction
Map<String, Map<Long, Long>> keyToSuccessTablets = new HashMap<>();
DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets,
allBackends,
-
transactionState.getSubTransactionStates().get(0).getSubTransactionId(),
CatalogTestUtil.testTabletId1,
- 14);
+ subTransactionStates.get(0).getSubTransactionId(),
CatalogTestUtil.testTabletId1, 14);
DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets,
allBackends,
-
transactionState.getSubTransactionStates().get(1).getSubTransactionId(),
CatalogTestUtil.testTabletId2,
- 13);
+ subTransactionStates.get(1).getSubTransactionId(),
CatalogTestUtil.testTabletId2, 13);
DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets,
Lists.newArrayList(CatalogTestUtil.testBackendId2,
CatalogTestUtil.testBackendId3),
-
transactionState.getSubTransactionStates().get(2).getSubTransactionId(),
CatalogTestUtil.testTabletId1,
- 15);
+ subTransactionStates.get(2).getSubTransactionId(),
CatalogTestUtil.testTabletId1, 15);
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
allBackends, keyToSuccessTablets);
Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
@@ -1011,6 +1019,8 @@ public class GlobalTransactionMgrTest {
List<SubTransactionState> subTransactionStates =
generateSubTransactionStates(transactionState,
subTransactionInfos);
// commit txn
+
transactionState.setSubTxnIds(subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
+ .collect(Collectors.toList()));
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1,
Lists.newArrayList(table1, table2),
transactionId, subTransactionStates, 300000);
// check status is committed
@@ -1040,14 +1050,11 @@ public class GlobalTransactionMgrTest {
// backend2, backend3 publish failed
DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets,
Lists.newArrayList(CatalogTestUtil.testBackendId1),
-
transactionState.getSubTransactionStates().get(0).getSubTransactionId(),
- CatalogTestUtil.testTabletId1, 14);
+ subTransactionStates.get(0).getSubTransactionId(),
CatalogTestUtil.testTabletId1, 14);
DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets,
allBackends,
-
transactionState.getSubTransactionStates().get(1).getSubTransactionId(),
CatalogTestUtil.testTabletId2,
- 13);
+ subTransactionStates.get(1).getSubTransactionId(),
CatalogTestUtil.testTabletId2, 13);
DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets,
allBackends,
-
transactionState.getSubTransactionStates().get(2).getSubTransactionId(),
- CatalogTestUtil.testTabletId1, 15);
+ subTransactionStates.get(2).getSubTransactionId(),
CatalogTestUtil.testTabletId1, 15);
DatabaseTransactionMgrTest.setTransactionFinishPublish(transactionState,
allBackends, keyToSuccessTablets);
LOG.info("publish tasks: {}",
transactionState.getPublishVersionTasks());
// finish transaction
@@ -1131,8 +1138,7 @@ public class GlobalTransactionMgrTest {
protected static List<SubTransactionState>
generateSubTransactionStates(GlobalTransactionMgr masterTransMgr,
TransactionState transactionState, List<SubTransactionInfo>
subTransactionInfos) {
- transactionState.resetSubTransactionStates();
- List<SubTransactionState> subTransactionStates =
transactionState.getSubTransactionStates();
+ List<SubTransactionState> subTransactionStates = new ArrayList<>();
for (int i = 0; i < subTransactionInfos.size(); i++) {
SubTransactionInfo subTransactionInfo = subTransactionInfos.get(i);
Table table = subTransactionInfo.table;
@@ -1145,8 +1151,9 @@ public class GlobalTransactionMgrTest {
subTransactionStates.add(generateSubTransactionState(transactionState,
subTxnId, table,
tabletId, backends, addTableId));
}
- transactionState.resetSubTxnIds();
- LOG.info("sub txn states={}",
transactionState.getSubTransactionStates());
+ transactionState.setSubTxnIds(
+ subTransactionInfos.stream().map(sub ->
sub.subTxnId).collect(Collectors.toList()));
+ LOG.info("sub txn states={}", subTransactionInfos);
return subTransactionStates;
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index ed0f333b958..96ec7222b44 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -517,11 +517,25 @@ struct TFeResult {
1001: optional bool noAuth
}
+enum TSubTxnType {
+ INSERT = 0,
+ DELETE = 1
+}
+
+struct TSubTxnInfo {
+ 1: optional i64 sub_txn_id
+ 2: optional i64 table_id
+ 3: optional list<Types.TTabletCommitInfo> tablet_commit_infos
+ 4: optional TSubTxnType sub_txn_type
+}
+
struct TTxnLoadInfo {
1: optional string label
2: optional i64 dbId
3: optional i64 txnId
4: optional i64 timeoutTimestamp
+ 5: optional i64 allSubTxnNum
+ 6: optional list<TSubTxnInfo> subTxnInfos
}
struct TMasterOpRequest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]