This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c2728865406 [fix](txn-insert) Txn insert support ccr (#36859)
c2728865406 is described below

commit c2728865406763d89252e706481807cc4235cff8
Author: meiyi <[email protected]>
AuthorDate: Tue Jul 9 09:25:39 2024 +0800

    [fix](txn-insert) Txn insert support ccr (#36859)
    
    ## Proposed changes
---
 .../java/org/apache/doris/binlog/UpsertRecord.java |  38 +++-
 .../apache/doris/service/FrontendServiceImpl.java  |  49 ++++-
 gensrc/thrift/FrontendService.thrift               |   7 +
 .../apache/doris/regression/json/BinlogData.groovy |  17 +-
 .../apache/doris/regression/suite/Syncer.groovy    | 142 +++++++++----
 .../doris/regression/suite/SyncerContext.groovy    |   9 +-
 .../doris/regression/util/SyncerUtils.groovy       |  21 +-
 .../suites/ccr_syncer_p0/test_txn_insert.groovy    | 234 +++++++++++++++++++++
 8 files changed, 447 insertions(+), 70 deletions(-)
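
As context for the diff below: this change lets the CCR syncer replicate transactional (multi-statement) inserts by carrying sub-transaction ids through the binlog, beginTxn, and commitTxn. On the target cluster, the begin step asks the FE to pre-allocate one sub-transaction id per source sub-transaction (sub_txn_num) and maps source ids to target ids by position. The Java sketch below is illustrative only and is not part of this commit; it assumes an already connected FrontendService.Client and omits the auth fields (user/passwd/token) that SyncerUtils sets on the request.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TBeginTxnRequest;
import org.apache.doris.thrift.TBeginTxnResult;

public class CcrBeginTxnSketch {
    // Begin a transaction on the target cluster, asking the FE to pre-allocate
    // one sub-transaction id per source sub-transaction, then map source
    // sub-txn ids to the returned target sub-txn ids by position.
    static Map<Long, Long> beginWithSubTxns(FrontendService.Client client,
            String db, long tableId, String label,
            List<Long> sourceSubTxnIds) throws Exception {
        TBeginTxnRequest request = new TBeginTxnRequest();
        // auth fields (user/passwd/token) omitted for brevity
        request.setDb(db);
        request.addToTableIds(tableId);
        request.setLabel(label);
        if (!sourceSubTxnIds.isEmpty()) {
            request.setSubTxnNum(sourceSubTxnIds.size()); // new field 13 in TBeginTxnRequest
        }
        TBeginTxnResult result = client.beginTxn(request);

        Map<Long, Long> sourceToTarget = new HashMap<>();
        List<Long> targetSubTxnIds = result.getSubTxnIds(); // new field 6 in TBeginTxnResult
        for (int i = 0; i < sourceSubTxnIds.size(); i++) {
            sourceToTarget.put(sourceSubTxnIds.get(i), targetSubTxnIds.get(i));
        }
        return sourceToTarget;
    }
}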

diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
index cdfe8550d4f..320784922a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
@@ -45,6 +45,9 @@ public class UpsertRecord {
 
             @SerializedName(value = "isTempPartition")
             public boolean isTemp;
+
+            @SerializedName(value = "stid")
+            public long subTxnId;
         }
 
         @SerializedName(value = "partitionRecords")
@@ -58,8 +61,13 @@ public class UpsertRecord {
             this.indexIds = indexIds;
         }
 
-        public void addPartitionRecord(PartitionCommitInfo partitionCommitInfo) {
+        private void addPartitionRecord(PartitionCommitInfo partitionCommitInfo) {
+            addPartitionRecord(-1, partitionCommitInfo);
+        }
+
+        private void addPartitionRecord(long subTxnId, PartitionCommitInfo partitionCommitInfo) {
             PartitionRecord partitionRecord = new PartitionRecord();
+            partitionRecord.subTxnId = subTxnId;
             partitionRecord.partitionId = partitionCommitInfo.getPartitionId();
             partitionRecord.range = partitionCommitInfo.getPartitionRange();
             partitionRecord.version = partitionCommitInfo.getVersion();
@@ -87,6 +95,8 @@ public class UpsertRecord {
     // pair is (tableId, tableRecord)
     @SerializedName(value = "tableRecords")
     private Map<Long, TableRecord> tableRecords;
+    @SerializedName(value = "stids")
+    private List<Long> subTxnIds;
 
     // construct from TransactionState
     public UpsertRecord(long commitSeq, TransactionState state) {
@@ -98,13 +108,25 @@ public class UpsertRecord {
         tableRecords = Maps.newHashMap();
 
         Map<Long, Set<Long>> loadedTableIndexIds = state.getLoadedTblIndexes();
-        for (TableCommitInfo info : state.getIdToTableCommitInfos().values()) {
-            Set<Long> indexIds = loadedTableIndexIds.get(info.getTableId());
-            TableRecord tableRecord = new TableRecord(indexIds);
-            tableRecords.put(info.getTableId(), tableRecord);
-
-            for (PartitionCommitInfo partitionCommitInfo : info.getIdToPartitionCommitInfo().values()) {
-                tableRecord.addPartitionRecord(partitionCommitInfo);
+        if (state.getSubTxnIds() != null) {
+            state.getSubTxnIdToTableCommitInfo().forEach((subTxnId, tableCommitInfo) -> {
+                Set<Long> indexIds = loadedTableIndexIds.get(tableCommitInfo.getTableId());
+                TableRecord tableRecord = tableRecords.compute(tableCommitInfo.getTableId(),
+                        (k, v) -> v == null ? new TableRecord(indexIds) : v);
+                for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) {
+                    tableRecord.addPartitionRecord(subTxnId, partitionCommitInfo);
+                }
+            });
+            subTxnIds = state.getSubTxnIds();
+        } else {
+            for (TableCommitInfo info : state.getIdToTableCommitInfos().values()) {
+                Set<Long> indexIds = loadedTableIndexIds.get(info.getTableId());
+                TableRecord tableRecord = new TableRecord(indexIds);
+                tableRecords.put(info.getTableId(), tableRecord);
+
+                for (PartitionCommitInfo partitionCommitInfo : info.getIdToPartitionCommitInfo().values()) {
+                    tableRecord.addPartitionRecord(partitionCommitInfo);
+                }
             }
         }
     }
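
As a hedged illustration of what a binlog consumer sees after the UpsertRecord change above: the record is serialized with Gson, so a transactional insert's upsert binlog carries a top-level "stids" list and a per-partition "stid" (the non-transactional path writes -1). The classes and id values below are hypothetical mirrors for demonstration, not the project's actual BinlogData types.

import com.google.gson.Gson;
import java.util.List;
import java.util.Map;

public class UpsertBinlogSketch {
    // Hypothetical mirror of the serialized UpsertRecord shape; field names
    // follow the @SerializedName values in UpsertRecord.java above.
    static class PartitionRecord {
        long partitionId;
        long version;
        long stid;          // -1 when the txn had no sub-transactions
    }

    static class TableRecord {
        List<PartitionRecord> partitionRecords;
    }

    static class UpsertData {
        Map<Long, TableRecord> tableRecords;
        List<Long> stids;   // sub-transaction ids in commit order; absent for plain loads
    }

    public static void main(String[] args) {
        // Example ids (10015, 10014, 20001) are made up for illustration.
        String json = "{\"tableRecords\":{\"10015\":{\"partitionRecords\":"
                + "[{\"partitionId\":10014,\"version\":2,\"stid\":20001}]}},\"stids\":[20001]}";
        UpsertData data = new Gson().fromJson(json, UpsertData.class);
        System.out.println(data.stids + " -> " + data.tableRecords.keySet());
    }
}
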
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 132378f6511..ab92cbd0d63 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -231,6 +231,7 @@ import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
 import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TStreamLoadPutResult;
+import org.apache.doris.thrift.TSubTxnInfo;
 import org.apache.doris.thrift.TSyncQueryColumns;
 import org.apache.doris.thrift.TTableIndexQueryStats;
 import org.apache.doris.thrift.TTableMetadataNameIds;
@@ -245,6 +246,7 @@ import org.apache.doris.thrift.TUpdateFollowerPartitionStatsCacheRequest;
 import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
 import org.apache.doris.thrift.TWaitingTxnStatusRequest;
 import org.apache.doris.thrift.TWaitingTxnStatusResult;
+import org.apache.doris.transaction.SubTransactionState;
 import org.apache.doris.transaction.TabletCommitInfo;
 import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
@@ -1265,6 +1267,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         try {
             TBeginTxnResult tmpRes = beginTxnImpl(request, clientAddr);
             result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
+            if (tmpRes.isSetSubTxnIds()) {
+                result.setSubTxnIds(tmpRes.getSubTxnIds());
+            }
         } catch (DuplicatedRequestException e) {
             // this is a duplicate request, just return previous txn id
             LOG.warn("duplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(),
@@ -1349,6 +1354,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         // step 7: return result
         TBeginTxnResult result = new TBeginTxnResult();
         result.setTxnId(txnId).setDbId(db.getId());
+        if (request.isSetSubTxnNum() && request.getSubTxnNum() > 0) {
+            result.addToSubTxnIds(txnId);
+            for (int i = 0; i < request.getSubTxnNum() - 1; i++) {
+                result.addToSubTxnIds(Env.getCurrentGlobalTransactionMgr().getNextTransactionId());
+            }
+        }
         return result;
     }
 
@@ -1699,8 +1710,14 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         if (!request.isSetTxnId()) {
             throw new UserException("txn_id is not set");
         }
-        if (!request.isSetCommitInfos()) {
-            throw new UserException("commit_infos is not set");
+        if (request.isSetTxnInsert() && request.isTxnInsert()) {
+            if (!request.isSetSubTxnInfos()) {
+                throw new UserException("sub_txn_infos is not set");
+            }
+        } else {
+            if (!request.isSetCommitInfos()) {
+                throw new UserException("commit_infos is not set");
+            }
         }
 
         // Step 1: get && check database
@@ -1750,11 +1767,29 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;
 
         // Step 5: commit and publish
-        return Env.getCurrentGlobalTransactionMgr()
-                .commitAndPublishTransaction(db, tableList,
-                        request.getTxnId(),
-                        TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
-                        TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+        if (request.isSetTxnInsert() && request.isTxnInsert()) {
+            List<Long> subTxnIds = new ArrayList<>();
+            List<SubTransactionState> subTransactionStates = new ArrayList<>();
+            for (TSubTxnInfo subTxnInfo : request.getSubTxnInfos()) {
+                TableIf table = db.getTableNullable(subTxnInfo.getTableId());
+                if (table == null) {
+                    continue;
+                }
+                subTxnIds.add(subTxnInfo.getSubTxnId());
+                subTransactionStates.add(
+                        new SubTransactionState(subTxnInfo.getSubTxnId(), (Table) table,
+                                subTxnInfo.getTabletCommitInfos(), null));
+            }
+            transactionState.setSubTxnIds(subTxnIds);
+            return Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(db, request.getTxnId(),
+                    subTransactionStates, timeoutMs);
+        } else {
+            return Env.getCurrentGlobalTransactionMgr()
+                    .commitAndPublishTransaction(db, tableList,
+                            request.getTxnId(),
+                            TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
+                            TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
+        }
     }
 
     @Override
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index af13976d619..2867e15c3c1 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -654,6 +654,8 @@ struct TBeginTxnRequest {
     10: optional Types.TUniqueId request_id
     11: optional string token
     12: optional i64 backend_id
+    // used for ccr
+    13: optional i64 sub_txn_num = 0
 }
 
 struct TBeginTxnResult {
@@ -662,6 +664,8 @@ struct TBeginTxnResult {
     3: optional string job_status // if label already used, set status of existing job
     4: optional i64 db_id
     5: optional Types.TNetworkAddress master_address
+    // used for ccr
+    6: optional list<i64> sub_txn_ids
 }
 
 // StreamLoad request, used to load a streaming to engine
@@ -832,6 +836,9 @@ struct TCommitTxnRequest {
     10: optional i64 thrift_rpc_timeout_ms
     11: optional string token
     12: optional i64 db_id
+    // used for ccr
+    13: optional bool txn_insert
+    14: optional list<TSubTxnInfo> sub_txn_infos
 }
 
 struct TCommitTxnResult {
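
With the new Thrift fields above in place, the commit step on the target side groups tablet commit infos per target sub-transaction and sends them in a single commitTxn call with txn_insert set. The Java sketch below is illustrative only (mirroring what SyncerUtils.commitTxn and the FrontendServiceImpl handling do); it assumes a connected FrontendService.Client and omits auth fields and error handling.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TCommitTxnRequest;
import org.apache.doris.thrift.TCommitTxnResult;
import org.apache.doris.thrift.TSubTxnInfo;
import org.apache.doris.thrift.TTabletCommitInfo;

public class CcrCommitTxnSketch {
    // Commit a replicated transactional insert: one TSubTxnInfo per target
    // sub-transaction, in the same order as the source sub-transactions.
    static TCommitTxnResult commitTxnInsert(FrontendService.Client client, String db, long txnId,
            List<Long> targetSubTxnIds,                        // ordered target sub-txn ids
            Map<Long, Long> subTxnIdToTableId,                 // target sub-txn id -> table id
            Map<Long, List<TTabletCommitInfo>> subTxnIdToTablets) throws Exception {
        TCommitTxnRequest request = new TCommitTxnRequest();
        // auth fields (user/passwd/token) omitted for brevity
        request.setDb(db);
        request.setTxnId(txnId);
        request.setTxnInsert(true);                            // new field 13 in TCommitTxnRequest
        List<TSubTxnInfo> subTxnInfos = new ArrayList<>();
        for (long subTxnId : targetSubTxnIds) {
            subTxnInfos.add(new TSubTxnInfo()
                    .setSubTxnId(subTxnId)
                    .setTableId(subTxnIdToTableId.get(subTxnId))
                    .setTabletCommitInfos(subTxnIdToTablets.get(subTxnId)));
        }
        request.setSubTxnInfos(subTxnInfos);                   // new field 14 in TCommitTxnRequest
        return client.commitTxn(request);
    }
}
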
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy
index 8ccec9a6669..6acbe0dcb8e 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy
@@ -20,9 +20,10 @@ package org.apache.doris.regression.json
 class PartitionData {
     public Long partitionId
     public Long version
+    public Long stid // sub txn id
 
     String toString() {
-        return "(" + partitionId.toString() + ", " + version.toString() + ")"
+        return "(" + partitionId.toString() + ", " + version.toString() + ", " + stid + ")"
     }
 }
 
@@ -51,13 +52,17 @@ class BinlogData {
     public String label
     public Long dbId
     public Map<Long, PartitionRecords> tableRecords
+    public List<Long> stids
 
     String toString() {
-        return "(" + commitSeq.toString() + ", " +
-                     txnId.toString() + ", " +
-                     timeStamp + label + ", " +
-                     dbId.toString() + ", " +
-                     tableRecords.toString()
+        return "(commitSeq: " + commitSeq
+                    + ", txnId: " + txnId
+                    + ", timestamp: " + timeStamp
+                    + ", label: " + label
+                    + ", dbId: " + dbId
+                    +", subTxnIds: " + stids
+                    + ", tableRecords: " + tableRecords
+                    +")"
     }
 }
 
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index 64ebeb03a0a..f17fff2aadd 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -20,6 +20,7 @@ package org.apache.doris.regression.suite
 import com.google.common.collect.Maps
 import com.google.gson.Gson
 import org.apache.doris.regression.Config
+import org.apache.doris.regression.json.PartitionData
 import org.apache.doris.regression.json.PartitionRecords
 import org.apache.doris.regression.suite.client.BackendClientImpl
 import org.apache.doris.regression.suite.client.FrontendClientImpl
@@ -38,6 +39,7 @@ import org.apache.doris.thrift.TNetworkAddress
 import org.apache.doris.thrift.TRestoreSnapshotResult
 import org.apache.doris.thrift.TStatus
 import org.apache.doris.thrift.TStatusCode
+import org.apache.doris.thrift.TSubTxnInfo
 import org.apache.doris.thrift.TTabletCommitInfo
 import org.apache.doris.thrift.TUniqueId
 import org.apache.thrift.transport.TTransportException
@@ -114,7 +116,14 @@ class Syncer {
                 Gson gson = new Gson()
                 context.lastBinlog = gson.fromJson(data, BinlogData.class)
                 logger.info("Source lastBinlog: ${context.lastBinlog}")
-
+                if (context.lastBinlog.stids != null) {
+                    context.sourceSubTxnIds = context.lastBinlog.stids as List
+                }
+                logger.info("source subTxnIds: ${context.sourceSubTxnIds}")
+                context.txnInsert = context.sourceSubTxnIds.size() > 0
+                context.targetSubTxnIds.clear()
+                context.sourceToTargetSubTxnId.clear()
+                context.subTxnInfos.clear()
                 return getSourceMeta(table)
             }
         } else {
@@ -206,6 +215,20 @@ class Syncer {
         if (isCheckedOK && result.isSetTxnId()) {
             logger.info("Begin transaction id is ${result.getTxnId()}")
             context.txnId = result.getTxnId()
+            if (result.getSubTxnIds() != null) {
+                context.targetSubTxnIds.addAll(result.getSubTxnIds())
+            }
+            if (context.targetSubTxnIds.size() != context.sourceSubTxnIds.size()) {
+                logger.error("source subTxnIds size is not equal to target subTxnIds size, " +
+                        "source: ${context.sourceSubTxnIds}, target: ${context.targetSubTxnIds}")
+                isCheckedOK = false
+            }
+            if (isCheckedOK) {
+                for (int i = 0; i < context.sourceSubTxnIds.size(); ++i) {
+                    context.sourceToTargetSubTxnId.put(context.sourceSubTxnIds[i], context.targetSubTxnIds[i])
+                }
+            }
+            logger.info("sourceToTargetSubTxnId: ${context.sourceToTargetSubTxnId}")
         } else {
             logger.error("Begin transaction txnId is unset!")
             isCheckedOK = false
@@ -694,10 +717,10 @@ class Syncer {
 
     Boolean checkTargetVersion() {
         logger.info("Check target tablets version")
-        context.targetTableMap.values().forEach {
+        return context.targetTableMap.values().every {
             String baseSQL = "SHOW PROC '/dbs/" + context.targetDbId.toString() + "/" +
                     it.id.toString() + "/partitions/"
-            it.partitionMap.forEach((id, meta) -> {
+            return it.partitionMap.every((id, meta) -> {
                 String versionSQL = baseSQL + id.toString() + "/" + meta.indexId.toString()
                 List<List<Object>> sqlInfo = suite.target_sql(versionSQL + "'")
                 for (List<Object> row : sqlInfo) {
@@ -709,10 +732,9 @@ class Syncer {
                         return false
                     }
                 }
+                return true
             })
         }
-
-        return true
     }
 
     Boolean getBinlog(String table = "", Boolean update = true) {
@@ -733,7 +755,8 @@ class Syncer {
         if (context.sourceTableMap.containsKey(table)) {
             tableId = context.targetTableMap.get(table).id
         }
-        TBeginTxnResult result = SyncerUtils.beginTxn(clientImpl, context, tableId)
+        TBeginTxnResult result = SyncerUtils.beginTxn(clientImpl, context, tableId, context.sourceSubTxnIds.size())
+        logger.info("begin txn result: ${result}")
         return checkBeginTxn(result)
     }
 
@@ -750,6 +773,12 @@ class Syncer {
 
         // step 2: Begin ingest binlog
         // step 2.1: ingest each table in meta
+
+        // sub txn id to tablet commit info
+        Map<Long, List<TTabletCommitInfo>> subTxnIdToTabletCommitInfos = new HashMap<Long, List<TTabletCommitInfo>>()
+        // sub txn id to table id
+        Map<Long, Long> subTxnIdToTableId = new HashMap<Long, Long>()
+
         for (Entry<String, TableMeta> tableInfo : context.sourceTableMap) {
             String tableName = tableInfo.key
             TableMeta srcTableMeta = tableInfo.value
@@ -772,51 +801,76 @@ class Syncer {
                     continue
                 }
 
-                Iterator srcTabletIter = srcPartition.value.tabletMeta.iterator()
-                Iterator tarTabletIter = tarPartition.value.tabletMeta.iterator()
-
-                // step 2.3: ingest each tablet in the partition
-                while (srcTabletIter.hasNext()) {
-                    Entry srcTabletMap = srcTabletIter.next()
-                    Entry tarTabletMap = tarTabletIter.next()
-
-                    BackendClientImpl srcClient = context.sourceBackendClients.get(srcTabletMap.value)
-                    if (srcClient == null) {
-                        logger.error("Can't find src tabletId-${srcTabletMap.key} -> beId-${srcTabletMap.value}")
-                        return false
-                    }
-                    BackendClientImpl tarClient = context.targetBackendClients.get(tarTabletMap.value)
-                    if (tarClient == null) {
-                        logger.error("Can't find target tabletId-${tarTabletMap.key} -> beId-${tarTabletMap.value}")
-                        return false
+                logger.info("Partition records: ${binlogRecords}")
+                for (PartitionData partitionRecord : binlogRecords.partitionRecords) {
+                    if (partitionRecord.partitionId != srcPartition.key) {
+                        continue
                     }
+                    logger.info("Partition record: ${partitionRecord}")
+                    long txnId = partitionRecord.stid == -1 ? context.txnId : context.sourceToTargetSubTxnId.get(partitionRecord.stid)
+                    // step 2.3: ingest each tablet in the partition
+                    Iterator srcTabletIter = srcPartition.value.tabletMeta.iterator()
+                    Iterator tarTabletIter = tarPartition.value.tabletMeta.iterator()
+                    while (srcTabletIter.hasNext()) {
+                        Entry srcTabletMap = srcTabletIter.next()
+                        Entry tarTabletMap = tarTabletIter.next()
+
+                        BackendClientImpl srcClient = context.sourceBackendClients.get(srcTabletMap.value)
+                        if (srcClient == null) {
+                            logger.error("Can't find src tabletId-${srcTabletMap.key} -> beId-${srcTabletMap.value}")
+                            return false
+                        }
+                        BackendClientImpl tarClient = context.targetBackendClients.get(tarTabletMap.value)
+                        if (tarClient == null) {
+                            logger.error("Can't find target tabletId-${tarTabletMap.key} -> beId-${tarTabletMap.value}")
+                            return false
+                        }
 
-                    tarPartition.value.version = srcPartition.value.version
-                    long partitionId = fakePartitionId == -1 ? tarPartition.key : fakePartitionId
-                    long version = fakeVersion == -1 ? srcPartition.value.version : fakeVersion
-
-                    TIngestBinlogRequest request = new TIngestBinlogRequest()
-                    TUniqueId uid = new TUniqueId(-1, -1)
-                    request.setTxnId(context.txnId)
-                    request.setRemoteTabletId(srcTabletMap.key)
-                    request.setBinlogVersion(version)
-                    request.setRemoteHost(srcClient.address.hostname)
-                    request.setRemotePort(srcClient.httpPort.toString())
-                    request.setPartitionId(partitionId)
-                    request.setLocalTabletId(tarTabletMap.key)
-                    request.setLoadId(uid)
-                    logger.info("request -> ${request}")
-                    TIngestBinlogResult result = tarClient.client.ingestBinlog(request)
-                    if (!checkIngestBinlog(result)) {
-                        logger.error("Ingest binlog error! result: ${result}")
-                        return false
-                    }
+                        tarPartition.value.version = srcPartition.value.version
+                        long partitionId = fakePartitionId == -1 ? tarPartition.key : fakePartitionId
+                        long version = fakeVersion == -1 ? partitionRecord.version : fakeVersion
+
+                        TIngestBinlogRequest request = new TIngestBinlogRequest()
+                        TUniqueId uid = new TUniqueId(-1, -1)
+                        request.setTxnId(txnId)
+                        request.setRemoteTabletId(srcTabletMap.key)
+                        request.setBinlogVersion(version)
+                        request.setRemoteHost(srcClient.address.hostname)
+                        request.setRemotePort(srcClient.httpPort.toString())
+                        request.setPartitionId(partitionId)
+                        request.setLocalTabletId(tarTabletMap.key)
+                        request.setLoadId(uid)
+                        logger.info("request -> ${request}")
+                        TIngestBinlogResult result = tarClient.client.ingestBinlog(request)
+                        if (!checkIngestBinlog(result)) {
+                            logger.error("Ingest binlog error! result: ${result}")
+                            return false
+                        }
 
-                    addCommitInfo(tarTabletMap.key, tarTabletMap.value)
+                        if (context.txnInsert) {
+                            List<TTabletCommitInfo> tabletCommitInfos = subTxnIdToTabletCommitInfos.get(txnId)
+                            if (tabletCommitInfos == null) {
+                                tabletCommitInfos = new ArrayList<TTabletCommitInfo>()
+                                subTxnIdToTabletCommitInfos.put(txnId, tabletCommitInfos)
+                                subTxnIdToTableId.put(txnId, tarTableMeta.id)
+                            }
+                            tabletCommitInfos.add(new TTabletCommitInfo(tarTabletMap.key, tarTabletMap.value))
+                        } else {
+                            addCommitInfo(tarTabletMap.key, tarTabletMap.value)
+                        }
+                    }
                 }
             }
         }
 
+        if (context.txnInsert) {
+            for (long sourceSubTxnId : context.sourceSubTxnIds) {
+                long subTxnId = context.sourceToTargetSubTxnId.get(sourceSubTxnId)
+                List<TTabletCommitInfo> tabletCommitInfos = subTxnIdToTabletCommitInfos.get(subTxnId)
+                TSubTxnInfo subTxnInfo = new TSubTxnInfo().setSubTxnId(subTxnId).setTableId(subTxnIdToTableId.get(subTxnId)).setTabletCommitInfos(tabletCommitInfos)
+                context.subTxnInfos.add(subTxnInfo)
+            }
+        }
         return true
     }
 
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
index 93108e67fc1..b86f012aa87 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
@@ -23,7 +23,8 @@ import org.apache.doris.regression.suite.client.BackendClientImpl
 import org.apache.doris.regression.suite.client.FrontendClientImpl
 import org.apache.doris.thrift.TTabletCommitInfo
 import org.apache.doris.thrift.TGetSnapshotResult
-import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TNetworkAddress
+import org.apache.doris.thrift.TSubTxnInfo
 import com.google.gson.annotations.SerializedName
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
@@ -128,6 +129,12 @@ class SyncerContext {
     public long txnId
     public long seq
 
+    public boolean txnInsert = false
+    public List<Long> sourceSubTxnIds = new ArrayList<Long>()
+    public List<Long> targetSubTxnIds = new ArrayList<Long>()
+    public Map<Long, Long> sourceToTargetSubTxnId = new HashMap<Long, Long>()
+    public List<TSubTxnInfo> subTxnInfos = new ArrayList<TSubTxnInfo>()
+
     SyncerContext(Suite suite, String dbName, Config config) {
         this.suite = suite
         this.sourceDbId = -1
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy
index 3bb7be40e58..b1b320238a3 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy
@@ -17,7 +17,6 @@
 
 package org.apache.doris.regression.util
 
-
 import org.apache.doris.regression.suite.SyncerContext
 import org.apache.doris.regression.suite.client.BackendClientImpl
 import org.apache.doris.regression.suite.client.FrontendClientImpl
@@ -34,11 +33,16 @@ import org.apache.doris.thrift.TIngestBinlogResult
 import org.apache.doris.thrift.TRestoreSnapshotRequest
 import org.apache.doris.thrift.TRestoreSnapshotResult
 import org.apache.doris.thrift.TSnapshotType
+import org.apache.doris.thrift.TSubTxnInfo
 import org.apache.thrift.TException
 import org.apache.doris.thrift.TGetBinlogRequest
 import org.apache.doris.thrift.TGetBinlogResult
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 
 class SyncerUtils {
+    private static final Logger logger = LoggerFactory.getLogger(SyncerUtils.class)
+
     private static <T> void setAuthorInformation(T request, SyncerContext context) {
         request.setUser(context.user)
         request.setPasswd(context.passwd)
@@ -62,7 +66,7 @@ class SyncerUtils {
         return clientImpl.client.getBinlog(request)
     }
 
-    static TBeginTxnResult beginTxn(FrontendClientImpl clientImpl, SyncerContext context, Long tableId) throws TException {
+    static TBeginTxnResult beginTxn(FrontendClientImpl clientImpl, SyncerContext context, Long tableId, Long subTxnNum) throws TException {
         TBeginTxnRequest request = new TBeginTxnRequest()
         setAuthorInformation(request, context)
         request.setDb("TEST_" + context.db)
@@ -70,8 +74,11 @@ class SyncerUtils {
             request.addToTableIds(tableId)
         }
         request.setLabel(newLabel(context, tableId))
+        if (subTxnNum > 0) {
+            request.setSubTxnNum(subTxnNum)
+        }
+        logger.info("begin txn request: ${request}")
         return clientImpl.client.beginTxn(request)
-
     }
 
     static TIngestBinlogResult ingestBinlog(BackendClientImpl clientImpl, TIngestBinlogRequest request) throws TException {
@@ -82,8 +89,14 @@ class SyncerUtils {
         TCommitTxnRequest request = new TCommitTxnRequest()
         setAuthorInformation(request, context)
         request.setDb("TEST_" + context.db)
-        request.setCommitInfos(context.commitInfos)
         request.setTxnId(context.txnId)
+        if (context.txnInsert) {
+            request.setTxnInsert(true)
+            request.setSubTxnInfos(context.subTxnInfos)
+        } else {
+            request.setCommitInfos(context.commitInfos)
+        }
+        logger.info("commit txn request: ${request}")
         return clientImpl.client.commitTxn(request)
     }
 
diff --git a/regression-test/suites/ccr_syncer_p0/test_txn_insert.groovy b/regression-test/suites/ccr_syncer_p0/test_txn_insert.groovy
new file mode 100644
index 00000000000..069cfc92a32
--- /dev/null
+++ b/regression-test/suites/ccr_syncer_p0/test_txn_insert.groovy
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_txn_insert") {
+    def syncer = getSyncer()
+    if (!syncer.checkEnableFeatureBinlog()) {
+        logger.info("fe enable_feature_binlog is false, skip case test_txn_case")
+        return
+    }
+    def txnTableName = "test_txn_insert"
+    for (int i = 0; i < 3; i++) {
+        sql "DROP TABLE IF EXISTS ${txnTableName}_${i} force"
+        sql """
+           CREATE TABLE if NOT EXISTS ${txnTableName}_${i} 
+           (
+               `test` INT,
+               `id` INT
+           )
+           ENGINE=OLAP
+           DUPLICATE KEY(`test`, `id`)
+           DISTRIBUTED BY HASH(id) BUCKETS 1 
+           PROPERTIES ( 
+               "replication_allocation" = "tag.location.default: 1"
+           )
+        """
+        sql """ALTER TABLE ${txnTableName}_${i} set ("binlog.enable" = "true")"""
+        assertTrue(syncer.getSourceMeta("${txnTableName}_${i}"))
+
+        target_sql "DROP TABLE IF EXISTS ${txnTableName}_${i} force"
+        target_sql """
+                  CREATE TABLE if NOT EXISTS ${txnTableName}_${i} 
+                  (
+                      `test` INT,
+                      `id` INT
+                  )
+                  ENGINE=OLAP
+                  DUPLICATE KEY(`test`, `id`)
+                  DISTRIBUTED BY HASH(id) BUCKETS 1 
+                  PROPERTIES ( 
+                      "replication_allocation" = "tag.location.default: 1"
+                  )
+              """
+        assertTrue(syncer.getTargetMeta("${txnTableName}_${i}"))
+    }
+
+    def sync = { String tableName ->
+        assertTrue(syncer.getBinlog("${tableName}"))
+        assertTrue(syncer.getBackendClients())
+        assertTrue(syncer.beginTxn("${tableName}"))
+        assertTrue(syncer.ingestBinlog())
+        assertTrue(syncer.commitTxn())
+        assertTrue(syncer.checkTargetVersion())
+        target_sql " sync "
+    }
+
+    def check_row_count = { String tableName, int count ->
+        def res = target_sql """SELECT count() FROM ${tableName}"""
+        logger.info("target row count: ${res}")
+        assertEquals(count, res[0][0])
+    }
+
+    // test duplicate table
+    logger.info("=== Test 1: insert values ===")
+    sql """ INSERT INTO ${txnTableName}_0 VALUES (1, 0) """
+    sync("${txnTableName}_0")
+    def res = target_sql """SELECT * FROM ${txnTableName}_0 WHERE test=1 """
+    assertEquals(res.size(), 1)
+
+    logger.info("=== Test 2: txn insert values ===")
+    sql """ begin """
+    sql """ INSERT INTO ${txnTableName}_0 VALUES (20, 0), (30, 0) """
+    sql """ INSERT INTO ${txnTableName}_0 VALUES (40, 0) """
+    sql """ commit """
+    sync("${txnTableName}_0")
+    check_row_count("${txnTableName}_0", 4)
+
+    logger.info("=== Test 3: txn insert select into 1 table ===")
+    sql """ begin """
+    sql """ INSERT INTO ${txnTableName}_1 select * from ${txnTableName}_0 """
+    sql """ commit """
+    sync("${txnTableName}_1")
+    check_row_count("${txnTableName}_1", 4)
+
+    logger.info("=== Test 4: txn insert select into 1 table twice ===")
+    sql """ begin """
+    sql """ INSERT INTO ${txnTableName}_1 select * from ${txnTableName}_0 """
+    sql """ INSERT INTO ${txnTableName}_1 select * from ${txnTableName}_0 """
+    sql """ commit """
+    sync("${txnTableName}_1")
+    check_row_count("${txnTableName}_1", 12)
+
+    logger.info("=== Test 5: txn insert select into 2 tables ===")
+    sql """ begin """
+    sql """ INSERT INTO ${txnTableName}_1 select * from ${txnTableName}_0 """
+    sql """ INSERT INTO ${txnTableName}_2 select * from ${txnTableName}_0 """
+    sql """ INSERT INTO ${txnTableName}_1 select * from ${txnTableName}_0 """
+    sql """ commit """
+    sync("${txnTableName}_1")
+    check_row_count("${txnTableName}_1", 20)
+    check_row_count("${txnTableName}_2", 4)
+
+    // test multi partitions
+    logger.info("=== Test 6: table with multi partitions ===")
+    sql """ DROP TABLE IF EXISTS ${txnTableName}_3 force """
+    sql """
+        CREATE TABLE if NOT EXISTS ${txnTableName}_3 (`test` INT, `id` INT)
+        DUPLICATE KEY(`test`)
+        PARTITION BY RANGE(test) ( FROM (1) TO (50) INTERVAL 10 )
+        DISTRIBUTED BY HASH(id) BUCKETS 2 
+        PROPERTIES ( "replication_num" = "1" )
+    """
+    sql """ALTER TABLE ${txnTableName}_3 set ("binlog.enable" = "true")"""
+    assertTrue(syncer.getSourceMeta("${txnTableName}_3"))
+    target_sql "DROP TABLE IF EXISTS ${txnTableName}_3 force"
+    target_sql """
+        CREATE TABLE if NOT EXISTS ${txnTableName}_3 (`test` INT, `id` INT)
+        DUPLICATE KEY(`test`)
+        PARTITION BY RANGE(test) ( FROM (1) TO (50) INTERVAL 10 )
+        DISTRIBUTED BY HASH(id) BUCKETS 2 
+        PROPERTIES ( "replication_num" = "1" )
+    """
+    assertTrue(syncer.getTargetMeta("${txnTableName}_3"))
+    sql """ set enable_insert_strict = false """
+    sql """ begin """
+    sql """ INSERT INTO ${txnTableName}_3 select * from ${txnTableName}_0 """
+    sql """ INSERT INTO ${txnTableName}_3 PARTITION (p_1_11, p_11_21) select * from ${txnTableName}_0 """
+    sql """ INSERT INTO ${txnTableName}_3 PARTITION (p_31_41) select * from ${txnTableName}_0 """
+    sql """ commit """
+    sync("${txnTableName}_3")
+    check_row_count("${txnTableName}_3", 7)
+    sql """ set enable_insert_strict = true """
+
+    // delete and insert
+    logger.info("=== Test 7: delete and insert ===")
+    sql """ begin """
+    sql """ delete from ${txnTableName}_2 where test < 30 """
+    sql """ insert into ${txnTableName}_2 select * from ${txnTableName}_0 where test < 30 """
+    sql """ commit """
+    sync("${txnTableName}_2")
+    check_row_count("${txnTableName}_2", 4)
+
+    // insert and delete
+    /*logger.info("=== Test 8: insert and delete ===")
+    sql """ begin """
+    sql """ insert into ${txnTableName}_2 select * from ${txnTableName}_0 where test < 30 """
+    sql """ delete from ${txnTableName}_2 where test < 30 """
+    sql """ commit """
+    sync("${txnTableName}_2")
+    check_row_count("${txnTableName}_2", 4)*/
+
+    // mow table
+    logger.info("=== Test 9: mow table insert ===")
+    sql """ DROP TABLE IF EXISTS ${txnTableName}_u0 force """
+    sql """
+        CREATE TABLE if NOT EXISTS ${txnTableName}_u0 (`test` INT, `id` INT)
+        UNIQUE KEY(`test`)
+        DISTRIBUTED BY HASH(test) BUCKETS 2 
+        PROPERTIES ( "replication_num" = "1" )
+    """
+    sql """ALTER TABLE ${txnTableName}_u0 set ("binlog.enable" = "true")"""
+    assertTrue(syncer.getSourceMeta("${txnTableName}_u0"))
+    target_sql "DROP TABLE IF EXISTS ${txnTableName}_u0 force"
+    target_sql """
+        CREATE TABLE if NOT EXISTS ${txnTableName}_u0 (`test` INT, `id` INT)
+        UNIQUE KEY(`test`)
+        DISTRIBUTED BY HASH(test) BUCKETS 2 
+        PROPERTIES ( "replication_num" = "1" )
+    """
+    assertTrue(syncer.getTargetMeta("${txnTableName}_u0"))
+    sql """ insert into ${txnTableName}_0 values (1, 1) """
+    // target_sql """ insert into ${txnTableName}_0 values (1, 1) """
+    sql """ begin """
+    sql """ insert into ${txnTableName}_u0 select * from ${txnTableName}_1 """
+    sql """ insert into ${txnTableName}_u0 select * from ${txnTableName}_0 """
+    sql """ commit """
+    sync("${txnTableName}_u0")
+    check_row_count("${txnTableName}_u0", 4)
+    res = target_sql """SELECT * FROM ${txnTableName}_u0 WHERE test=1 """
+    assertEquals(res.size(), 1)
+    assertEquals(res[0][1], 1)
+
+    logger.info("=== Test 10: mow table update ===")
+    sql """ begin """
+    sql """ insert into ${txnTableName}_u0 select * from ${txnTableName}_0 """
+    sql """ update ${txnTableName}_u0 set id = id + 10 where test = 1 """
+    sql """ commit """
+    sync("${txnTableName}_u0")
+    check_row_count("${txnTableName}_u0", 4)
+    res = target_sql """SELECT * FROM ${txnTableName}_u0 WHERE test=1 """
+    assertEquals(res.size(), 1)
+    assertEquals(res[0][1], 11)
+
+    logger.info("=== Test 11: mow table delete from using ===")
+
+    // test table with multi indexes
+    logger.info("=== Test 12: table with multi indexes ===")
+    target_sql """ create materialized view mv_${txnTableName}_3 as select id from ${txnTableName}_3; """
+    createMV """ create materialized view mv_${txnTableName}_3 as select id from ${txnTableName}_3; """
+    res = sql """ select id from ${txnTableName}_3 """
+    assertEquals(res.size(), 7)
+    res = target_sql """ select id from ${txnTableName}_3 """
+    assertEquals(res.size(), 7)
+    /*sql """ begin """
+    sql """ insert into ${txnTableName}_3 select * from ${txnTableName}_0 """
+    sql """ insert into ${txnTableName}_3 select * from ${txnTableName}_0 """
+    sql """ commit """
+    sync("${txnTableName}_3")
+    check_row_count("${txnTableName}_3", 12)
+    res = sql """ select id from ${txnTableName}_3 """
+    assertEquals(res.size(), 12)
+    res = target_sql """ select id from ${txnTableName}_3 """
+    assertEquals(res.size(), 12)*/
+
+    // test schema change
+    // test one sub txn is error
+    // test only enable one table binlog
+
+    // End Test
+    syncer.closeBackendClients()
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

