This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c2728865406 [fix](txn-insert) Txn insert support ccr (#36859)
c2728865406 is described below
commit c2728865406763d89252e706481807cc4235cff8
Author: meiyi <[email protected]>
AuthorDate: Tue Jul 9 09:25:39 2024 +0800
[fix](txn-insert) Txn insert support ccr (#36859)
## Proposed changes
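Transactional insert (`begin`, one or more DML statements, `commit`) commits a single transaction that carries one sub transaction per statement. To make such transactions replicable by CCR, this change:

- records each partition record's sub transaction id in the binlog `UpsertRecord` (JSON key `stid`, -1 for ordinary loads) together with the ordered id list (`stids`);
- lets `beginTxn` reserve a matching number of transaction ids on the target cluster (new `sub_txn_num` request field and `sub_txn_ids` result field);
- lets `commitTxn` commit per sub transaction (new `txn_insert` flag and `sub_txn_infos` list);
- teaches the regression-test Syncer to ingest each binlog partition record under the mapped target sub transaction id, and adds the suite `ccr_syncer_p0/test_txn_insert.groovy`.

For orientation, here is a minimal sketch of the new RPC flow as a CCR client would drive it. This is an illustration distilled from the diff below, not code from the patch: the Thrift client wiring and the `tableIdBySubTxn`/`commitInfosBySubTxn` maps are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TBeginTxnRequest;
import org.apache.doris.thrift.TBeginTxnResult;
import org.apache.doris.thrift.TCommitTxnRequest;
import org.apache.doris.thrift.TSubTxnInfo;
import org.apache.doris.thrift.TTabletCommitInfo;

class TxnInsertCcrSketch {
    // Replicate one txn-insert binlog: reserve sub txn ids, then commit per sub txn.
    static void replicate(FrontendService.Client client, String db, String label,
            long tableId, List<Long> sourceSubTxnIds,
            Map<Long, Long> tableIdBySubTxn,
            Map<Long, List<TTabletCommitInfo>> commitInfosBySubTxn) throws Exception {
        TBeginTxnRequest begin = new TBeginTxnRequest();
        begin.setDb(db);
        begin.addToTableIds(tableId);
        begin.setLabel(label);
        begin.setSubTxnNum(sourceSubTxnIds.size()); // new field 13
        TBeginTxnResult beginRes = client.beginTxn(begin);
        // sub_txn_ids[0] is the parent txn id itself (see beginTxnImpl below);
        // source sub txn ids map onto these positionally.
        List<Long> targetSubTxnIds = beginRes.getSubTxnIds();

        // ... ingest one binlog per (partition record, sub txn id), collecting
        // TTabletCommitInfo per target sub txn, as Syncer.ingestBinlog does ...

        TCommitTxnRequest commit = new TCommitTxnRequest();
        commit.setDb(db);
        commit.setTxnId(beginRes.getTxnId());
        commit.setTxnInsert(true); // routes the FE into the sub-txn commit path
        List<TSubTxnInfo> subTxnInfos = new ArrayList<>();
        for (long subTxnId : targetSubTxnIds) {
            subTxnInfos.add(new TSubTxnInfo()
                    .setSubTxnId(subTxnId)
                    .setTableId(tableIdBySubTxn.get(subTxnId))
                    .setTabletCommitInfos(commitInfosBySubTxn.get(subTxnId)));
        }
        commit.setSubTxnInfos(subTxnInfos);
        client.commitTxn(commit);
    }
}
```

The Syncer changes below implement this same flow in Groovy.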
---
.../java/org/apache/doris/binlog/UpsertRecord.java | 38 +++-
.../apache/doris/service/FrontendServiceImpl.java | 49 ++++-
gensrc/thrift/FrontendService.thrift | 7 +
.../apache/doris/regression/json/BinlogData.groovy | 17 +-
.../apache/doris/regression/suite/Syncer.groovy | 142 +++++++++----
.../doris/regression/suite/SyncerContext.groovy | 9 +-
.../doris/regression/util/SyncerUtils.groovy | 21 +-
.../suites/ccr_syncer_p0/test_txn_insert.groovy | 234 +++++++++++++++++++++
8 files changed, 447 insertions(+), 70 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
index cdfe8550d4f..320784922a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
@@ -45,6 +45,9 @@ public class UpsertRecord {
@SerializedName(value = "isTempPartition")
public boolean isTemp;
+
+ @SerializedName(value = "stid")
+ public long subTxnId;
}
@SerializedName(value = "partitionRecords")
@@ -58,8 +61,13 @@ public class UpsertRecord {
this.indexIds = indexIds;
}
- public void addPartitionRecord(PartitionCommitInfo partitionCommitInfo) {
+ private void addPartitionRecord(PartitionCommitInfo partitionCommitInfo) {
+ addPartitionRecord(-1, partitionCommitInfo);
+ }
+
+ private void addPartitionRecord(long subTxnId, PartitionCommitInfo partitionCommitInfo) {
PartitionRecord partitionRecord = new PartitionRecord();
+ partitionRecord.subTxnId = subTxnId;
partitionRecord.partitionId = partitionCommitInfo.getPartitionId();
partitionRecord.range = partitionCommitInfo.getPartitionRange();
partitionRecord.version = partitionCommitInfo.getVersion();
@@ -87,6 +95,8 @@ public class UpsertRecord {
// pair is (tableId, tableRecord)
@SerializedName(value = "tableRecords")
private Map<Long, TableRecord> tableRecords;
+ @SerializedName(value = "stids")
+ private List<Long> subTxnIds;
// construct from TransactionState
public UpsertRecord(long commitSeq, TransactionState state) {
@@ -98,13 +108,25 @@ public class UpsertRecord {
tableRecords = Maps.newHashMap();
Map<Long, Set<Long>> loadedTableIndexIds = state.getLoadedTblIndexes();
- for (TableCommitInfo info : state.getIdToTableCommitInfos().values()) {
- Set<Long> indexIds = loadedTableIndexIds.get(info.getTableId());
- TableRecord tableRecord = new TableRecord(indexIds);
- tableRecords.put(info.getTableId(), tableRecord);
-
- for (PartitionCommitInfo partitionCommitInfo : info.getIdToPartitionCommitInfo().values()) {
- tableRecord.addPartitionRecord(partitionCommitInfo);
+ if (state.getSubTxnIds() != null) {
+ state.getSubTxnIdToTableCommitInfo().forEach((subTxnId, tableCommitInfo) -> {
+ Set<Long> indexIds = loadedTableIndexIds.get(tableCommitInfo.getTableId());
+ TableRecord tableRecord = tableRecords.compute(tableCommitInfo.getTableId(),
+ (k, v) -> v == null ? new TableRecord(indexIds) : v);
+ for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) {
+ tableRecord.addPartitionRecord(subTxnId, partitionCommitInfo);
+ }
+ });
+ subTxnIds = state.getSubTxnIds();
+ } else {
+ for (TableCommitInfo info : state.getIdToTableCommitInfos().values()) {
+ Set<Long> indexIds = loadedTableIndexIds.get(info.getTableId());
+ TableRecord tableRecord = new TableRecord(indexIds);
+ tableRecords.put(info.getTableId(), tableRecord);
+
+ for (PartitionCommitInfo partitionCommitInfo : info.getIdToPartitionCommitInfo().values()) {
+ tableRecord.addPartitionRecord(partitionCommitInfo);
+ }
}
}
}
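The net effect on the binlog payload: every partition record now carries an `stid` key (-1 outside transactional insert) and the upsert record carries an `stids` list, which is exactly what `BinlogData.groovy` below parses. A hedged Gson sketch of the resulting JSON shape; the classes are an illustrative mirror of the annotated fields above, not the real `UpsertRecord`, and the sample values are made up:

```java
import java.util.List;
import java.util.Map;

import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

public class UpsertJsonSketch {
    static class PartitionRecord {
        long partitionId;
        long version;
        @SerializedName("stid")
        long subTxnId; // -1 when the load was not a transactional insert
    }

    static class TableRecord {
        List<PartitionRecord> partitionRecords;
    }

    long commitSeq;
    long txnId;
    Map<Long, TableRecord> tableRecords;
    @SerializedName("stids")
    List<Long> subTxnIds; // absent for ordinary loads

    public static void main(String[] args) {
        String json = "{\"commitSeq\":10,\"txnId\":1001,\"stids\":[1001,1002],"
                + "\"tableRecords\":{\"42\":{\"partitionRecords\":"
                + "[{\"partitionId\":7,\"version\":3,\"stid\":1002}]}}}";
        UpsertJsonSketch rec = new Gson().fromJson(json, UpsertJsonSketch.class);
        // Prints: [1001, 1002] / 1002
        System.out.println(rec.subTxnIds + " / "
                + rec.tableRecords.get(42L).partitionRecords.get(0).subTxnId);
    }
}
```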
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 132378f6511..ab92cbd0d63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -231,6 +231,7 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TStreamLoadPutResult;
+import org.apache.doris.thrift.TSubTxnInfo;
import org.apache.doris.thrift.TSyncQueryColumns;
import org.apache.doris.thrift.TTableIndexQueryStats;
import org.apache.doris.thrift.TTableMetadataNameIds;
@@ -245,6 +246,7 @@ import org.apache.doris.thrift.TUpdateFollowerPartitionStatsCacheRequest;
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
+import org.apache.doris.transaction.SubTransactionState;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
@@ -1265,6 +1267,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
try {
TBeginTxnResult tmpRes = beginTxnImpl(request, clientAddr);
result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
+ if (tmpRes.isSetSubTxnIds()) {
+ result.setSubTxnIds(tmpRes.getSubTxnIds());
+ }
} catch (DuplicatedRequestException e) {
// this is a duplicate request, just return previous txn id
LOG.warn("duplicate request for stream load. request id: {}, txn:
{}", e.getDuplicatedRequestId(),
@@ -1349,6 +1354,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// step 7: return result
TBeginTxnResult result = new TBeginTxnResult();
result.setTxnId(txnId).setDbId(db.getId());
+ if (request.isSetSubTxnNum() && request.getSubTxnNum() > 0) {
+ result.addToSubTxnIds(txnId);
+ for (int i = 0; i < request.getSubTxnNum() - 1; i++) {
+ result.addToSubTxnIds(Env.getCurrentGlobalTransactionMgr().getNextTransactionId());
+ }
+ }
return result;
}
@@ -1699,8 +1710,14 @@ public class FrontendServiceImpl implements FrontendService.Iface {
if (!request.isSetTxnId()) {
throw new UserException("txn_id is not set");
}
- if (!request.isSetCommitInfos()) {
- throw new UserException("commit_infos is not set");
+ if (request.isSetTxnInsert() && request.isTxnInsert()) {
+ if (!request.isSetSubTxnInfos()) {
+ throw new UserException("sub_txn_infos is not set");
+ }
+ } else {
+ if (!request.isSetCommitInfos()) {
+ throw new UserException("commit_infos is not set");
+ }
}
// Step 1: get && check database
@@ -1750,11 +1767,29 @@ public class FrontendServiceImpl implements FrontendService.Iface {
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;
// Step 5: commit and publish
- return Env.getCurrentGlobalTransactionMgr()
- .commitAndPublishTransaction(db, tableList,
- request.getTxnId(),
- TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
- TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ if (request.isSetTxnInsert() && request.isTxnInsert()) {
+ List<Long> subTxnIds = new ArrayList<>();
+ List<SubTransactionState> subTransactionStates = new ArrayList<>();
+ for (TSubTxnInfo subTxnInfo : request.getSubTxnInfos()) {
+ TableIf table = db.getTableNullable(subTxnInfo.getTableId());
+ if (table == null) {
+ continue;
+ }
+ subTxnIds.add(subTxnInfo.getSubTxnId());
+ subTransactionStates.add(
+ new SubTransactionState(subTxnInfo.getSubTxnId(), (Table) table,
+ subTxnInfo.getTabletCommitInfos(), null));
+ }
+ transactionState.setSubTxnIds(subTxnIds);
+ return Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(db, request.getTxnId(),
+ subTransactionStates, timeoutMs);
+ } else {
+ return Env.getCurrentGlobalTransactionMgr()
+ .commitAndPublishTransaction(db, tableList,
+ request.getTxnId(),
+ TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+ TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+ }
}
@Override
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index af13976d619..2867e15c3c1 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -654,6 +654,8 @@ struct TBeginTxnRequest {
10: optional Types.TUniqueId request_id
11: optional string token
12: optional i64 backend_id
+ // used for ccr
+ 13: optional i64 sub_txn_num = 0
}
struct TBeginTxnResult {
@@ -662,6 +664,8 @@ struct TBeginTxnResult {
3: optional string job_status // if label already used, set status of existing job
4: optional i64 db_id
5: optional Types.TNetworkAddress master_address
+ // used for ccr
+ 6: optional list<i64> sub_txn_ids
}
// StreamLoad request, used to load a streaming to engine
@@ -832,6 +836,9 @@ struct TCommitTxnRequest {
10: optional i64 thrift_rpc_timeout_ms
11: optional string token
12: optional i64 db_id
+ // used for ccr
+ 13: optional bool txn_insert
+ 14: optional list<TSubTxnInfo> sub_txn_infos
}
struct TCommitTxnResult {
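`TSubTxnInfo` itself is not part of this diff; judging from the accessors used elsewhere in the patch (`setSubTxnId`, `setTableId`, `setTabletCommitInfos`), it carries at least a sub transaction id, a table id, and a list of tablet commit infos.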
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy
index 8ccec9a6669..6acbe0dcb8e 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy
@@ -20,9 +20,10 @@ package org.apache.doris.regression.json
class PartitionData {
public Long partitionId
public Long version
+ public Long stid // sub txn id
String toString() {
- return "(" + partitionId.toString() + ", " + version.toString() + ")"
+ return "(" + partitionId.toString() + ", " + version.toString() + ", "
+ stid + ")"
}
}
@@ -51,13 +52,17 @@ class BinlogData {
public String label
public Long dbId
public Map<Long, PartitionRecords> tableRecords
+ public List<Long> stids
String toString() {
- return "(" + commitSeq.toString() + ", " +
- txnId.toString() + ", " +
- timeStamp + label + ", " +
- dbId.toString() + ", " +
- tableRecords.toString()
+ return "(commitSeq: " + commitSeq
+ + ", txnId: " + txnId
+ + ", timestamp: " + timeStamp
+ + ", label: " + label
+ + ", dbId: " + dbId
+ +", subTxnIds: " + stids
+ + ", tableRecords: " + tableRecords
+ +")"
}
}
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index 64ebeb03a0a..f17fff2aadd 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -20,6 +20,7 @@ package org.apache.doris.regression.suite
import com.google.common.collect.Maps
import com.google.gson.Gson
import org.apache.doris.regression.Config
+import org.apache.doris.regression.json.PartitionData
import org.apache.doris.regression.json.PartitionRecords
import org.apache.doris.regression.suite.client.BackendClientImpl
import org.apache.doris.regression.suite.client.FrontendClientImpl
@@ -38,6 +39,7 @@ import org.apache.doris.thrift.TNetworkAddress
import org.apache.doris.thrift.TRestoreSnapshotResult
import org.apache.doris.thrift.TStatus
import org.apache.doris.thrift.TStatusCode
+import org.apache.doris.thrift.TSubTxnInfo
import org.apache.doris.thrift.TTabletCommitInfo
import org.apache.doris.thrift.TUniqueId
import org.apache.thrift.transport.TTransportException
@@ -114,7 +116,14 @@ class Syncer {
Gson gson = new Gson()
context.lastBinlog = gson.fromJson(data, BinlogData.class)
logger.info("Source lastBinlog: ${context.lastBinlog}")
-
+ if (context.lastBinlog.stids != null) {
+ context.sourceSubTxnIds = context.lastBinlog.stids as List
+ }
+ logger.info("source subTxnIds: ${context.sourceSubTxnIds}")
+ context.txnInsert = context.sourceSubTxnIds.size() > 0
+ context.targetSubTxnIds.clear()
+ context.sourceToTargetSubTxnId.clear()
+ context.subTxnInfos.clear()
return getSourceMeta(table)
}
} else {
@@ -206,6 +215,20 @@ class Syncer {
if (isCheckedOK && result.isSetTxnId()) {
logger.info("Begin transaction id is ${result.getTxnId()}")
context.txnId = result.getTxnId()
+ if (result.getSubTxnIds() != null) {
+ context.targetSubTxnIds.addAll(result.getSubTxnIds())
+ }
+ if (context.targetSubTxnIds.size() != context.sourceSubTxnIds.size()) {
+ logger.error("source subTxnIds size is not equal to target subTxnIds size, " +
+ "source: ${context.sourceSubTxnIds}, target: ${context.targetSubTxnIds}")
+ isCheckedOK = false
+ }
+ if (isCheckedOK) {
+ for (int i = 0; i < context.sourceSubTxnIds.size(); ++i) {
+ context.sourceToTargetSubTxnId.put(context.sourceSubTxnIds[i], context.targetSubTxnIds[i])
+ }
+ }
+ logger.info("sourceToTargetSubTxnId:
${context.sourceToTargetSubTxnId}")
} else {
logger.error("Begin transaction txnId is unset!")
isCheckedOK = false
@@ -694,10 +717,10 @@ class Syncer {
Boolean checkTargetVersion() {
logger.info("Check target tablets version")
- context.targetTableMap.values().forEach {
+ return context.targetTableMap.values().every {
String baseSQL = "SHOW PROC '/dbs/" +
context.targetDbId.toString() + "/" +
it.id.toString() + "/partitions/"
- it.partitionMap.forEach((id, meta) -> {
+ return it.partitionMap.every((id, meta) -> {
String versionSQL = baseSQL + id.toString() + "/" + meta.indexId.toString()
List<List<Object>> sqlInfo = suite.target_sql(versionSQL + "'")
for (List<Object> row : sqlInfo) {
@@ -709,10 +732,9 @@ class Syncer {
return false
}
}
+ return true
})
}
-
- return true
}
Boolean getBinlog(String table = "", Boolean update = true) {
@@ -733,7 +755,8 @@ class Syncer {
if (context.sourceTableMap.containsKey(table)) {
tableId = context.targetTableMap.get(table).id
}
- TBeginTxnResult result = SyncerUtils.beginTxn(clientImpl, context, tableId)
+ TBeginTxnResult result = SyncerUtils.beginTxn(clientImpl, context, tableId, context.sourceSubTxnIds.size())
+ logger.info("begin txn result: ${result}")
return checkBeginTxn(result)
}
@@ -750,6 +773,12 @@ class Syncer {
// step 2: Begin ingest binlog
// step 2.1: ingest each table in meta
+
+ // sub txn id to tablet commit info
+ Map<Long, List<TTabletCommitInfo>> subTxnIdToTabletCommitInfos = new HashMap<Long, List<TTabletCommitInfo>>()
+ // sub txn id to table id
+ Map<Long, Long> subTxnIdToTableId = new HashMap<Long, Long>()
+
for (Entry<String, TableMeta> tableInfo : context.sourceTableMap) {
String tableName = tableInfo.key
TableMeta srcTableMeta = tableInfo.value
@@ -772,51 +801,76 @@ class Syncer {
continue
}
- Iterator srcTabletIter = srcPartition.value.tabletMeta.iterator()
- Iterator tarTabletIter = tarPartition.value.tabletMeta.iterator()
-
- // step 2.3: ingest each tablet in the partition
- while (srcTabletIter.hasNext()) {
- Entry srcTabletMap = srcTabletIter.next()
- Entry tarTabletMap = tarTabletIter.next()
-
- BackendClientImpl srcClient = context.sourceBackendClients.get(srcTabletMap.value)
- if (srcClient == null) {
- logger.error("Can't find src
tabletId-${srcTabletMap.key} -> beId-${srcTabletMap.value}")
- return false
- }
- BackendClientImpl tarClient = context.targetBackendClients.get(tarTabletMap.value)
- if (tarClient == null) {
- logger.error("Can't find target
tabletId-${tarTabletMap.key} -> beId-${tarTabletMap.value}")
- return false
+ logger.info("Partition records: ${binlogRecords}")
+ for (PartitionData partitionRecord : binlogRecords.partitionRecords) {
+ if (partitionRecord.partitionId != srcPartition.key) {
+ continue
}
+ logger.info("Partition record: ${partitionRecord}")
+ long txnId = partitionRecord.stid == -1 ? context.txnId : context.sourceToTargetSubTxnId.get(partitionRecord.stid)
+ // step 2.3: ingest each tablet in the partition
+ Iterator srcTabletIter = srcPartition.value.tabletMeta.iterator()
+ Iterator tarTabletIter = tarPartition.value.tabletMeta.iterator()
+ while (srcTabletIter.hasNext()) {
+ Entry srcTabletMap = srcTabletIter.next()
+ Entry tarTabletMap = tarTabletIter.next()
+
+ BackendClientImpl srcClient = context.sourceBackendClients.get(srcTabletMap.value)
+ if (srcClient == null) {
+ logger.error("Can't find src
tabletId-${srcTabletMap.key} -> beId-${srcTabletMap.value}")
+ return false
+ }
+ BackendClientImpl tarClient = context.targetBackendClients.get(tarTabletMap.value)
+ if (tarClient == null) {
+ logger.error("Can't find target
tabletId-${tarTabletMap.key} -> beId-${tarTabletMap.value}")
+ return false
+ }
- tarPartition.value.version = srcPartition.value.version
- long partitionId = fakePartitionId == -1 ? tarPartition.key : fakePartitionId
- long version = fakeVersion == -1 ? srcPartition.value.version : fakeVersion
-
- TIngestBinlogRequest request = new TIngestBinlogRequest()
- TUniqueId uid = new TUniqueId(-1, -1)
- request.setTxnId(context.txnId)
- request.setRemoteTabletId(srcTabletMap.key)
- request.setBinlogVersion(version)
- request.setRemoteHost(srcClient.address.hostname)
- request.setRemotePort(srcClient.httpPort.toString())
- request.setPartitionId(partitionId)
- request.setLocalTabletId(tarTabletMap.key)
- request.setLoadId(uid)
- logger.info("request -> ${request}")
- TIngestBinlogResult result = tarClient.client.ingestBinlog(request)
- if (!checkIngestBinlog(result)) {
- logger.error("Ingest binlog error! result: ${result}")
- return false
- }
+ tarPartition.value.version = srcPartition.value.version
+ long partitionId = fakePartitionId == -1 ? tarPartition.key : fakePartitionId
+ long version = fakeVersion == -1 ? partitionRecord.version : fakeVersion
+
+ TIngestBinlogRequest request = new TIngestBinlogRequest()
+ TUniqueId uid = new TUniqueId(-1, -1)
+ request.setTxnId(txnId)
+ request.setRemoteTabletId(srcTabletMap.key)
+ request.setBinlogVersion(version)
+ request.setRemoteHost(srcClient.address.hostname)
+ request.setRemotePort(srcClient.httpPort.toString())
+ request.setPartitionId(partitionId)
+ request.setLocalTabletId(tarTabletMap.key)
+ request.setLoadId(uid)
+ logger.info("request -> ${request}")
+ TIngestBinlogResult result = tarClient.client.ingestBinlog(request)
+ if (!checkIngestBinlog(result)) {
+ logger.error("Ingest binlog error! result:
${result}")
+ return false
+ }
- addCommitInfo(tarTabletMap.key, tarTabletMap.value)
+ if (context.txnInsert) {
+ List<TTabletCommitInfo> tabletCommitInfos = subTxnIdToTabletCommitInfos.get(txnId)
+ if (tabletCommitInfos == null) {
+ tabletCommitInfos = new ArrayList<TTabletCommitInfo>()
+ subTxnIdToTabletCommitInfos.put(txnId, tabletCommitInfos)
+ subTxnIdToTableId.put(txnId, tarTableMeta.id)
+ }
+ tabletCommitInfos.add(new TTabletCommitInfo(tarTabletMap.key, tarTabletMap.value))
+ } else {
+ addCommitInfo(tarTabletMap.key, tarTabletMap.value)
+ }
+ }
}
}
}
+ if (context.txnInsert) {
+ for (long sourceSubTxnId : context.sourceSubTxnIds) {
+ long subTxnId = context.sourceToTargetSubTxnId.get(sourceSubTxnId)
+ List<TTabletCommitInfo> tabletCommitInfos = subTxnIdToTabletCommitInfos.get(subTxnId)
+ TSubTxnInfo subTxnInfo = new TSubTxnInfo().setSubTxnId(subTxnId).setTableId(subTxnIdToTableId.get(subTxnId)).setTabletCommitInfos(tabletCommitInfos)
+ context.subTxnInfos.add(subTxnInfo)
+ }
+ }
return true
}
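The ingest loop above reduces to one routing rule: each partition record is ingested under the target txn id mapped from its `stid` (the parent txn id when `stid` is -1), and the resulting tablet commit infos are bucketed per target sub txn for the commit. A compact sketch of just that rule, with simplified stand-in types and a hypothetical `ingestOneTablet` in place of the `TIngestBinlogRequest` round trip:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the binlog PartitionData and TTabletCommitInfo.
record PartitionRec(long partitionId, long version, long stid) {}
record CommitInfo(long tabletId, long backendId) {}

class SubTxnRouting {
    // Bucket commit infos by the target sub txn id each partition record maps to.
    static Map<Long, List<CommitInfo>> route(List<PartitionRec> records,
            long parentTxnId, Map<Long, Long> sourceToTargetSubTxnId) {
        Map<Long, List<CommitInfo>> bySubTxn = new HashMap<>();
        for (PartitionRec rec : records) {
            // stid == -1 means the binlog did not come from a txn insert.
            long txnId = rec.stid() == -1 ? parentTxnId
                    : sourceToTargetSubTxnId.get(rec.stid());
            bySubTxn.computeIfAbsent(txnId, k -> new ArrayList<>())
                    .add(ingestOneTablet(rec, txnId));
        }
        return bySubTxn;
    }

    // Hypothetical: in the real Syncer this issues one TIngestBinlogRequest per
    // tablet and records (tabletId, backendId) for the later commit.
    static CommitInfo ingestOneTablet(PartitionRec rec, long txnId) {
        return new CommitInfo(rec.partitionId(), txnId); // placeholder values
    }
}
```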
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
index 93108e67fc1..b86f012aa87 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
@@ -23,7 +23,8 @@ import org.apache.doris.regression.suite.client.BackendClientImpl
import org.apache.doris.regression.suite.client.FrontendClientImpl
import org.apache.doris.thrift.TTabletCommitInfo
import org.apache.doris.thrift.TGetSnapshotResult
-import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TNetworkAddress
+import org.apache.doris.thrift.TSubTxnInfo
import com.google.gson.annotations.SerializedName
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -128,6 +129,12 @@ class SyncerContext {
public long txnId
public long seq
+ public boolean txnInsert = false
+ public List<Long> sourceSubTxnIds = new ArrayList<Long>()
+ public List<Long> targetSubTxnIds = new ArrayList<Long>()
+ public Map<Long, Long> sourceToTargetSubTxnId = new HashMap<Long, Long>()
+ public List<TSubTxnInfo> subTxnInfos = new ArrayList<TSubTxnInfo>()
+
SyncerContext(Suite suite, String dbName, Config config) {
this.suite = suite
this.sourceDbId = -1
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy
index 3bb7be40e58..b1b320238a3 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy
@@ -17,7 +17,6 @@
package org.apache.doris.regression.util
-
import org.apache.doris.regression.suite.SyncerContext
import org.apache.doris.regression.suite.client.BackendClientImpl
import org.apache.doris.regression.suite.client.FrontendClientImpl
@@ -34,11 +33,16 @@ import org.apache.doris.thrift.TIngestBinlogResult
import org.apache.doris.thrift.TRestoreSnapshotRequest
import org.apache.doris.thrift.TRestoreSnapshotResult
import org.apache.doris.thrift.TSnapshotType
+import org.apache.doris.thrift.TSubTxnInfo
import org.apache.thrift.TException
import org.apache.doris.thrift.TGetBinlogRequest
import org.apache.doris.thrift.TGetBinlogResult
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
class SyncerUtils {
+ private static final Logger logger = LoggerFactory.getLogger(SyncerUtils.class)
+
private static <T> void setAuthorInformation(T request, SyncerContext context) {
request.setUser(context.user)
request.setPasswd(context.passwd)
@@ -62,7 +66,7 @@ class SyncerUtils {
return clientImpl.client.getBinlog(request)
}
- static TBeginTxnResult beginTxn(FrontendClientImpl clientImpl, SyncerContext context, Long tableId) throws TException {
+ static TBeginTxnResult beginTxn(FrontendClientImpl clientImpl, SyncerContext context, Long tableId, Long subTxnNum) throws TException {
TBeginTxnRequest request = new TBeginTxnRequest()
setAuthorInformation(request, context)
request.setDb("TEST_" + context.db)
@@ -70,8 +74,11 @@ class SyncerUtils {
request.addToTableIds(tableId)
}
request.setLabel(newLabel(context, tableId))
+ if (subTxnNum > 0) {
+ request.setSubTxnNum(subTxnNum)
+ }
+ logger.info("begin txn request: ${request}")
return clientImpl.client.beginTxn(request)
-
}
static TIngestBinlogResult ingestBinlog(BackendClientImpl clientImpl, TIngestBinlogRequest request) throws TException {
@@ -82,8 +89,14 @@ class SyncerUtils {
TCommitTxnRequest request = new TCommitTxnRequest()
setAuthorInformation(request, context)
request.setDb("TEST_" + context.db)
- request.setCommitInfos(context.commitInfos)
request.setTxnId(context.txnId)
+ if (context.txnInsert) {
+ request.setTxnInsert(true)
+ request.setSubTxnInfos(context.subTxnInfos)
+ } else {
+ request.setCommitInfos(context.commitInfos)
+ }
+ logger.info("commit txn request: ${request}")
return clientImpl.client.commitTxn(request)
}
diff --git a/regression-test/suites/ccr_syncer_p0/test_txn_insert.groovy b/regression-test/suites/ccr_syncer_p0/test_txn_insert.groovy
new file mode 100644
index 00000000000..069cfc92a32
--- /dev/null
+++ b/regression-test/suites/ccr_syncer_p0/test_txn_insert.groovy
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_txn_insert") {
+ def syncer = getSyncer()
+ if (!syncer.checkEnableFeatureBinlog()) {
+ logger.info("fe enable_feature_binlog is false, skip case
test_txn_case")
+ return
+ }
+ def txnTableName = "test_txn_insert"
+ for (int i = 0; i < 3; i++) {
+ sql "DROP TABLE IF EXISTS ${txnTableName}_${i} force"
+ sql """
+ CREATE TABLE if NOT EXISTS ${txnTableName}_${i}
+ (
+ `test` INT,
+ `id` INT
+ )
+ ENGINE=OLAP
+ DUPLICATE KEY(`test`, `id`)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ )
+ """
+ sql """ALTER TABLE ${txnTableName}_${i} set ("binlog.enable" =
"true")"""
+ assertTrue(syncer.getSourceMeta("${txnTableName}_${i}"))
+
+ target_sql "DROP TABLE IF EXISTS ${txnTableName}_${i} force"
+ target_sql """
+ CREATE TABLE if NOT EXISTS ${txnTableName}_${i}
+ (
+ `test` INT,
+ `id` INT
+ )
+ ENGINE=OLAP
+ DUPLICATE KEY(`test`, `id`)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ )
+ """
+ assertTrue(syncer.getTargetMeta("${txnTableName}_${i}"))
+ }
+
+ def sync = { String tableName ->
+ assertTrue(syncer.getBinlog("${tableName}"))
+ assertTrue(syncer.getBackendClients())
+ assertTrue(syncer.beginTxn("${tableName}"))
+ assertTrue(syncer.ingestBinlog())
+ assertTrue(syncer.commitTxn())
+ assertTrue(syncer.checkTargetVersion())
+ target_sql " sync "
+ }
+
+ def check_row_count = { String tableName, int count ->
+ def res = target_sql """SELECT count() FROM ${tableName}"""
+ logger.info("target row count: ${res}")
+ assertEquals(count, res[0][0])
+ }
+
+ // test duplicate table
+ logger.info("=== Test 1: insert values ===")
+ sql """ INSERT INTO ${txnTableName}_0 VALUES (1, 0) """
+ sync("${txnTableName}_0")
+ def res = target_sql """SELECT * FROM ${txnTableName}_0 WHERE test=1 """
+ assertEquals(res.size(), 1)
+
+ logger.info("=== Test 2: txn insert values ===")
+ sql """ begin """
+ sql """ INSERT INTO ${txnTableName}_0 VALUES (20, 0), (30, 0) """
+ sql """ INSERT INTO ${txnTableName}_0 VALUES (40, 0) """
+ sql """ commit """
+ sync("${txnTableName}_0")
+ check_row_count("${txnTableName}_0", 4)
+
+ logger.info("=== Test 3: txn insert select into 1 table ===")
+ sql """ begin """
+ sql """ INSERT INTO ${txnTableName}_1 select * from ${txnTableName}_0 """
+ sql """ commit """
+ sync("${txnTableName}_1")
+ check_row_count("${txnTableName}_1", 4)
+
+ logger.info("=== Test 4: txn insert select into 1 table twice ===")
+ sql """ begin """
+ sql """ INSERT INTO ${txnTableName}_1 select * from ${txnTableName}_0 """
+ sql """ INSERT INTO ${txnTableName}_1 select * from ${txnTableName}_0 """
+ sql """ commit """
+ sync("${txnTableName}_1")
+ check_row_count("${txnTableName}_1", 12)
+
+ logger.info("=== Test 5: txn insert select into 2 tables ===")
+ sql """ begin """
+ sql """ INSERT INTO ${txnTableName}_1 select * from ${txnTableName}_0 """
+ sql """ INSERT INTO ${txnTableName}_2 select * from ${txnTableName}_0 """
+ sql """ INSERT INTO ${txnTableName}_1 select * from ${txnTableName}_0 """
+ sql """ commit """
+ sync("${txnTableName}_1")
+ check_row_count("${txnTableName}_1", 20)
+ check_row_count("${txnTableName}_2", 4)
+
+ // test multi partitions
+ logger.info("=== Test 6: table with multi partitions ===")
+ sql """ DROP TABLE IF EXISTS ${txnTableName}_3 force """
+ sql """
+ CREATE TABLE if NOT EXISTS ${txnTableName}_3 (`test` INT, `id` INT)
+ DUPLICATE KEY(`test`)
+ PARTITION BY RANGE(test) ( FROM (1) TO (50) INTERVAL 10 )
+ DISTRIBUTED BY HASH(id) BUCKETS 2
+ PROPERTIES ( "replication_num" = "1" )
+ """
+ sql """ALTER TABLE ${txnTableName}_3 set ("binlog.enable" = "true")"""
+ assertTrue(syncer.getSourceMeta("${txnTableName}_3"))
+ target_sql "DROP TABLE IF EXISTS ${txnTableName}_3 force"
+ target_sql """
+ CREATE TABLE if NOT EXISTS ${txnTableName}_3 (`test` INT, `id` INT)
+ DUPLICATE KEY(`test`)
+ PARTITION BY RANGE(test) ( FROM (1) TO (50) INTERVAL 10 )
+ DISTRIBUTED BY HASH(id) BUCKETS 2
+ PROPERTIES ( "replication_num" = "1" )
+ """
+ assertTrue(syncer.getTargetMeta("${txnTableName}_3"))
+ sql """ set enable_insert_strict = false """
+ sql """ begin """
+ sql """ INSERT INTO ${txnTableName}_3 select * from ${txnTableName}_0 """
+ sql """ INSERT INTO ${txnTableName}_3 PARTITION (p_1_11, p_11_21) select *
from ${txnTableName}_0 """
+ sql """ INSERT INTO ${txnTableName}_3 PARTITION (p_31_41) select * from
${txnTableName}_0 """
+ sql """ commit """
+ sync("${txnTableName}_3")
+ check_row_count("${txnTableName}_3", 7)
+ sql """ set enable_insert_strict = true """
+
+ // delete and insert
+ logger.info("=== Test 7: delete and insert ===")
+ sql """ begin """
+ sql """ delete from ${txnTableName}_2 where test < 30 """
+ sql """ insert into ${txnTableName}_2 select * from ${txnTableName}_0
where test < 30 """
+ sql """ commit """
+ sync("${txnTableName}_2")
+ check_row_count("${txnTableName}_2", 4)
+
+ // insert and delete
+ /*logger.info("=== Test 8: insert and delete ===")
+ sql """ begin """
+ sql """ insert into ${txnTableName}_2 select * from ${txnTableName}_0
where test < 30 """
+ sql """ delete from ${txnTableName}_2 where test < 30 """
+ sql """ commit """
+ sync("${txnTableName}_2")
+ check_row_count("${txnTableName}_2", 4)*/
+
+ // mow table
+ logger.info("=== Test 9: mow table insert ===")
+ sql """ DROP TABLE IF EXISTS ${txnTableName}_u0 force """
+ sql """
+ CREATE TABLE if NOT EXISTS ${txnTableName}_u0 (`test` INT, `id` INT)
+ UNIQUE KEY(`test`)
+ DISTRIBUTED BY HASH(test) BUCKETS 2
+ PROPERTIES ( "replication_num" = "1" )
+ """
+ sql """ALTER TABLE ${txnTableName}_u0 set ("binlog.enable" = "true")"""
+ assertTrue(syncer.getSourceMeta("${txnTableName}_u0"))
+ target_sql "DROP TABLE IF EXISTS ${txnTableName}_u0 force"
+ target_sql """
+ CREATE TABLE if NOT EXISTS ${txnTableName}_u0 (`test` INT, `id` INT)
+ UNIQUE KEY(`test`)
+ DISTRIBUTED BY HASH(test) BUCKETS 2
+ PROPERTIES ( "replication_num" = "1" )
+ """
+ assertTrue(syncer.getTargetMeta("${txnTableName}_u0"))
+ sql """ insert into ${txnTableName}_0 values (1, 1) """
+ // target_sql """ insert into ${txnTableName}_0 values (1, 1) """
+ sql """ begin """
+ sql """ insert into ${txnTableName}_u0 select * from ${txnTableName}_1 """
+ sql """ insert into ${txnTableName}_u0 select * from ${txnTableName}_0 """
+ sql """ commit """
+ sync("${txnTableName}_u0")
+ check_row_count("${txnTableName}_u0", 4)
+ res = target_sql """SELECT * FROM ${txnTableName}_u0 WHERE test=1 """
+ assertEquals(res.size(), 1)
+ assertEquals(res[0][1], 1)
+
+ logger.info("=== Test 10: mow table update ===")
+ sql """ begin """
+ sql """ insert into ${txnTableName}_u0 select * from ${txnTableName}_0 """
+ sql """ update ${txnTableName}_u0 set id = id + 10 where test = 1 """
+ sql """ commit """
+ sync("${txnTableName}_u0")
+ check_row_count("${txnTableName}_u0", 4)
+ res = target_sql """SELECT * FROM ${txnTableName}_u0 WHERE test=1 """
+ assertEquals(res.size(), 1)
+ assertEquals(res[0][1], 11)
+
+ logger.info("=== Test 11: mow table delete from using ===")
+
+ // test table with multi indexes
+ logger.info("=== Test 12: table with multi indexes ===")
+ target_sql """ create materialized view mv_${txnTableName}_3 as select id
from ${txnTableName}_3; """
+ createMV """ create materialized view mv_${txnTableName}_3 as select id
from ${txnTableName}_3; """
+ res = sql """ select id from ${txnTableName}_3 """
+ assertEquals(res.size(), 7)
+ res = target_sql """ select id from ${txnTableName}_3 """
+ assertEquals(res.size(), 7)
+ /*sql """ begin """
+ sql """ insert into ${txnTableName}_3 select * from ${txnTableName}_0 """
+ sql """ insert into ${txnTableName}_3 select * from ${txnTableName}_0 """
+ sql """ commit """
+ sync("${txnTableName}_3")
+ check_row_count("${txnTableName}_3", 12)
+ res = sql """ select id from ${txnTableName}_3 """
+ assertEquals(res.size(), 12)
+ res = target_sql """ select id from ${txnTableName}_3 """
+ assertEquals(res.size(), 12)*/
+
+ // test schema change
+ // test one sub txn is error
+ // test only enable one table binlog
+
+ // End Test
+ syncer.closeBackendClients()
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]