This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 15ec191a77 [Fix](CCR) Use tableId as the credential for CCR syncer instead of tableName (#21466)
15ec191a77 is described below

commit 15ec191a779a4b9440b5d54d55a63f62bfa1da36
Author: DeadlineFen <[email protected]>
AuthorDate: Wed Jul 5 10:16:09 2023 +0800

    [Fix](CCR) Use tableId as the credential for CCR syncer instead of tableName (#21466)
---
 .../apache/doris/service/FrontendServiceImpl.java  |  43 +++--
 gensrc/thrift/FrontendService.thrift               |   9 +-
 .../apache/doris/regression/json/BinlogData.groovy |  10 ++
 .../apache/doris/regression/suite/Syncer.groovy    | 179 ++++++++++++---------
 .../doris/regression/suite/SyncerContext.groovy    |  10 +-
 .../doris/regression/util/SyncerUtils.groovy       |  17 +-
 6 files changed, 161 insertions(+), 107 deletions(-)
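
The gist of the change: a CCR syncer can hold a table name across a long-running job, but a name is not a stable key, whereas a table id is. A minimal standalone sketch of the distinction (assuming rename-under-sync is the failure mode this guards against; the Map stands in for a Doris database's table map, and all values are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class IdVsName {
        public static void main(String[] args) {
            Map<Long, String> tables = new HashMap<>();
            tables.put(10054L, "orders");       // id -> name

            long trackedId = 10054L;            // what the syncer stores now
            String trackedName = "orders";      // what it stored before

            // a concurrent ALTER TABLE ... RENAME lands mid-sync
            tables.put(10054L, "orders_v2");

            // the name-based key is now stale; the id still resolves
            System.out.println(tables.containsValue(trackedName)); // false
            System.out.println(tables.get(trackedId));             // orders_v2
        }
    }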

diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d296663745..0c00615a9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -914,6 +914,24 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return result;
     }
 
+    private List<String> getTableNames(String cluster, String dbName, List<Long> tableIds) throws UserException {
+        final String fullDbName = ClusterNamespace.getFullName(cluster, dbName);
+        Database db = Env.getCurrentInternalCatalog().getDbNullable(fullDbName);
+        if (db == null) {
+            throw new UserException(String.format("can't find db named: %s", dbName));
+        }
+        List<String> tableNames = Lists.newArrayList();
+        for (Long id : tableIds) {
+            Table table = db.getTableNullable(id);
+            if (table == null) {
+                throw new UserException(String.format("can't find table id: %d in db: %s", id, dbName));
+            }
+            tableNames.add(table.getName());
+        }
+
+        return tableNames;
+    }
+
     private void checkPasswordAndPrivs(String cluster, String user, String passwd, String db, String tbl,
                                        String clientIp, PrivPredicate predicate) throws AuthenticationException {
         checkPasswordAndPrivs(cluster, user, passwd, db, Lists.newArrayList(tbl), clientIp, predicate);
@@ -1070,8 +1088,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         if (!request.isSetDb()) {
             throw new UserException("db is not set");
         }
-        if (!request.isSetTables()) {
-            throw new UserException("tables is not set");
+        if (!request.isSetTableIds()) {
+            throw new UserException("table ids is not set");
         }
         if (!request.isSetLabel()) {
             throw new UserException("label is not set");
@@ -1084,7 +1102,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
         // step 1: check auth
         if (Strings.isNullOrEmpty(request.getToken())) {
-            checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTables(),
+            // lookup table ids && convert into tableNameList
+            List<String> tableNameList = getTableNames(cluster, request.getDb(), request.getTableIds());
+            checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), tableNameList,
                     request.getUserIp(), PrivPredicate.LOAD);
         }
 
@@ -1106,15 +1126,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         }
 
         // step 4: fetch all tableIds
-        // lookup tables && convert into tableIdList
-        List<Long> tableIdList = Lists.newArrayList();
-        for (String tblName : request.getTables()) {
-            Table table = db.getTableOrMetaException(tblName, TableType.OLAP);
-            if (table == null) {
-                throw new UserException("unknown table, table=" + tblName);
-            }
-            tableIdList.add(table.getId());
-        }
+        // table ids is checked at step 1
+        List<Long> tableIdList = request.getTableIds();
 
         // step 5: get timeout
         long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
@@ -2320,8 +2333,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         // step 4: fetch all tableIds
         // lookup tables && convert into tableIdList
         long tableId = -1;
-        String tableName = request.getTable();
-        if (!Strings.isNullOrEmpty(tableName)) {
+        if (request.isSetTableId()) {
+            tableId = request.getTableId();
+        } else if (request.isSetTable()) {
+            String tableName = request.getTable();
             Table table = db.getTableOrMetaException(tableName, TableType.OLAP);
             if (table == null) {
                 throw new UserException("unknown table, table=" + tableName);
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 32e2e3b0a3..5cf08a4e38 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -528,7 +528,7 @@ struct TBeginTxnRequest {
     2: optional string user
     3: optional string passwd
     4: optional string db
-    5: optional list<string> tables
+    5: optional list<i64> table_ids
     6: optional string user_ip
     7: optional string label
     8: optional i64 auth_code
@@ -947,9 +947,10 @@ struct TGetBinlogRequest {
     3: optional string passwd
     4: optional string db
     5: optional string table
-    6: optional string user_ip
-    7: optional string token
-    8: optional i64 prev_commit_seq
+    6: optional i64 table_id
+    7: optional string user_ip
+    8: optional string token
+    9: optional i64 prev_commit_seq
 }
 
 enum TBinlogType {
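
With the revised structs, a syncer client sends ids where it used to send names. A sketch of how the Thrift-generated Java classes would be populated (setter names follow the Thrift codegen convention seen elsewhere in this diff; the ids, label, and sequence values are illustrative):

    import org.apache.doris.thrift.TBeginTxnRequest;
    import org.apache.doris.thrift.TGetBinlogRequest;

    public class RequestSketch {
        public static void main(String[] args) {
            // TBeginTxnRequest: field 5 is now list<i64> table_ids
            TBeginTxnRequest begin = new TBeginTxnRequest();
            begin.setDb("TEST_db1");
            begin.addToTableIds(10054L);            // was addToTables("tbl_a")
            begin.setLabel("ccr_sync_job:db1:10054:3");

            // TGetBinlogRequest: optional table_id inserted as field 6,
            // shifting user_ip/token/prev_commit_seq down by one
            TGetBinlogRequest getBinlog = new TGetBinlogRequest();
            getBinlog.setDb("db1");
            getBinlog.setTable("tbl_a");            // the name is still accepted,
            getBinlog.setTableId(10054L);           // but the id takes precedence
            getBinlog.setPrevCommitSeq(3L);
        }
    }
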
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy
index 868a363828..8ccec9a666 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy
@@ -29,6 +29,16 @@ class PartitionData {
 class PartitionRecords {
     public List<PartitionData> partitionRecords
 
+    Boolean contains(Long partitionId) {
+        for (PartitionData data : partitionRecords) {
+            if (data.partitionId == partitionId) {
+                return true
+            }
+        }
+
+        return false
+    }
+
     String toString() {
         return partitionRecords.toString()
     }
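
The new contains() gives ingestBinlog (in Syncer.groovy, below) a cheap way to skip partitions a given binlog never touched. The filtering idea as a self-contained Java sketch (the collections stand in for PartitionRecords and the partition map from cluster meta; ids are illustrative):

    import java.util.List;
    import java.util.Map;

    public class BinlogPartitionFilter {
        public static void main(String[] args) {
            List<Long> binlogPartitions = List.of(101L, 103L);  // recorded in the binlog
            Map<Long, String> metaPartitions = Map.of(101L, "p0", 102L, "p1", 103L, "p2");

            for (Long partitionId : metaPartitions.keySet()) {
                if (!binlogPartitions.contains(partitionId)) {
                    continue;   // this binlog did not touch the partition
                }
                System.out.println("ingest partition " + partitionId);
            }
        }
    }
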
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index 805093ef4f..9504f2a874 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -17,8 +17,10 @@
 
 package org.apache.doris.regression.suite
 
+import com.google.common.collect.Maps
 import com.google.gson.Gson
 import org.apache.doris.regression.Config
+import org.apache.doris.regression.json.PartitionRecords
 import org.apache.doris.regression.suite.client.BackendClientImpl
 import org.apache.doris.regression.suite.client.FrontendClientImpl
 import org.apache.doris.regression.util.SyncerUtils
@@ -61,7 +63,7 @@ class Syncer {
         TARGET
     }
 
-    private Boolean checkBinlog(TBinlog binlog, Boolean update) {
+    private Boolean checkBinlog(TBinlog binlog, String table, Boolean update) {
 
         // step 1: check binlog availability
         if (binlog == null) {
@@ -103,8 +105,10 @@ class Syncer {
             logger.info("binlog data is ${data}")
             if (update) {
                 Gson gson = new Gson()
-                getSourceMeta(gson.fromJson(data, BinlogData.class))
-                logger.info("Source tableId: ${context.sourceTableMap}, 
${context.sourceTableMap}")
+                context.lastBinlog = gson.fromJson(data, BinlogData.class)
+                logger.info("Source lastBinlog: ${context.lastBinlog}")
+
+                return getSourceMeta(table)
             }
         } else {
             logger.error("binlog data is not contains data!")
@@ -156,7 +160,7 @@ class Syncer {
         }
 
         // step 2: check binlog
-        return checkBinlog(binlog, update)
+        return checkBinlog(binlog, table, update)
     }
 
     private Boolean checkBeginTxn(TBeginTxnResult result) {
@@ -404,41 +408,6 @@ class Syncer {
         return clientsMap
     }
 
-    void getSourceMeta(BinlogData binlogData) {
-        logger.info("data struct: ${binlogData}")
-        context.sourceTableIdToName.clear()
-        context.sourceTableMap.clear()
-
-        context.sourceDbId = binlogData.dbId
-
-        String metaSQL = "SHOW PROC '/dbs/" + binlogData.dbId.toString()
-        List<List<Object>> sqlInfo = suite.sql(metaSQL + "'")
-
-        // Get table information
-        for (List<Object> row : sqlInfo) {
-            context.sourceTableIdToName.put(row[0] as Long, row[1] as String)
-        }
-
-        // Get table meta in binlog
-        binlogData.tableRecords.forEach((tableId, partitionRecord) -> {
-            String tableName = context.sourceTableIdToName.get(tableId)
-            TableMeta tableMeta = new TableMeta(tableId)
-
-            partitionRecord.partitionRecords.forEach {
-                String partitionSQL = metaSQL + "/" + tableId.toString() + "/partitions/" + it.partitionId.toString()
-                sqlInfo = suite.sql(partitionSQL + "'")
-                PartitionMeta partitionMeta = new PartitionMeta(sqlInfo[0][0] as Long, it.version)
-                partitionSQL += "/" + partitionMeta.indexId.toString()
-                sqlInfo = suite.sql(partitionSQL + "'")
-                sqlInfo.forEach(row -> {
-                    partitionMeta.tabletMeta.put((row[0] as Long), (row[2] as Long))
-                })
-                tableMeta.partitionMap.put(it.partitionId, partitionMeta)
-            }
-            context.sourceTableMap.put(tableName, tableMeta)
-        })
-    }
-
     ArrayList<TTabletCommitInfo> copyCommitInfos() {
         return new ArrayList<TTabletCommitInfo>(context.commitInfos)
     }
@@ -518,86 +487,118 @@ class Syncer {
         return checkGetSnapshot()
     }
 
-    Boolean getTargetMeta(String table = "") {
-        logger.info("Get target cluster meta data.")
+    Boolean getSourceMeta(String table = "") {
+        logger.info("Get source cluster meta")
         String baseSQL = "SHOW PROC '/dbs"
         List<List<Object>> sqlInfo
+        if (context.sourceDbId == -1) {
+            sqlInfo = suite.sql(baseSQL + "'")
+            for (List<Object> row : sqlInfo) {
+                String[] dbName = (row[1] as String).split(":")
+                if (dbName[1] == context.db) {
+                    context.sourceDbId = row[0] as Long
+                    break
+                }
+            }
+        }
+        if (context.sourceDbId == -1) {
+            logger.error("Get ${context.db} db error.")
+            return false
+        }
+        baseSQL += "/" + context.sourceDbId.toString()
+        return getMeta(baseSQL, table, context.sourceTableMap, true)
+    }
 
-        // step 1: get target dbId
-        Long dbId = -1
-        sqlInfo = suite.target_sql(baseSQL + "'")
-        for (List<Object> row : sqlInfo) {
-            String[] dbName = (row[1] as String).split(":")
-            if (dbName[1] == ("TEST_" + context.db)) {
-                dbId = row[0] as Long
-                break
+    Boolean getTargetMeta(String table = "") {
+        logger.info("Get target cluster meta")
+        String baseSQL = "SHOW PROC '/dbs"
+        List<List<Object>> sqlInfo
+        if (context.targetDbId == -1) {
+            sqlInfo = suite.target_sql(baseSQL + "'")
+            for (List<Object> row : sqlInfo) {
+                String[] dbName = (row[1] as String).split(":")
+                if (dbName[1] == "TEST_" + context.db) {
+                    context.targetDbId = row[0] as Long
+                    break
+                }
             }
         }
-        if (dbId == -1) {
-            logger.error("Target cluster get ${context.db} db error.")
+        if (context.targetDbId == -1) {
+            logger.error("Get TEST_${context.db} db error.")
             return false
         }
-        context.targetDbId = dbId
+        baseSQL += "/" + context.targetDbId.toString()
+        return getMeta(baseSQL, table, context.targetTableMap, false)
+    }
+
+    Boolean getMeta(String baseSql, String table, Map<String, TableMeta> metaMap, Boolean toSrc) {
+        def sendSql = { String sqlStmt, Boolean isToSrc -> List<List<Object>>
+            if (isToSrc) {
+                return suite.sql(sqlStmt + "'")
+            } else {
+                return suite.target_sql(sqlStmt + "'")
+            }
+        }
+
+        List<List<Object>> sqlInfo
 
-        // step 2: get target dbId/tableId
-        baseSQL += "/" + dbId.toString()
-        sqlInfo = suite.target_sql(baseSQL + "'")
+        // step 1: get target dbId/tableId
+        sqlInfo = sendSql.call(baseSql, toSrc)
         if (table == "") {
             for (List<Object> row : sqlInfo) {
-                context.targetTableMap.put(row[1] as String, new TableMeta(row[0] as long))
+                metaMap.put(row[1] as String, new TableMeta(row[0] as long))
             }
         } else {
             for (List<Object> row : sqlInfo) {
                 if ((row[1] as String) == table) {
-                    context.targetTableMap.put(row[1] as String, new TableMeta(row[0] as long))
+                    metaMap.put(row[1] as String, new TableMeta(row[0] as long))
                     break
                 }
             }
         }
 
-
-        // step 3: get partitionIds
-        context.targetTableMap.values().forEach {
-            baseSQL += "/" + it.id.toString() + "/partitions"
-            ArrayList<Long> partitionIds = new ArrayList<Long>()
-            sqlInfo = suite.target_sql(baseSQL + "'")
+        // step 2: get partitionIds
+        metaMap.values().forEach {
+            baseSql += "/" + it.id.toString() + "/partitions"
+            Map<Long, Long> partitionInfo = Maps.newHashMap()
+            sqlInfo = sendSql.call(baseSql, toSrc)
             for (List<Object> row : sqlInfo) {
-                partitionIds.add(row[0] as Long)
+                partitionInfo.put(row[0] as Long, row[2] as Long)
             }
-            if (partitionIds.isEmpty()) {
+            if (partitionInfo.isEmpty()) {
                 logger.error("Target cluster get partitions fault.")
                 return false
             }
 
-            // step 4: get partitionMetas
-            for (Long id : partitionIds) {
+            // step 3: get partitionMetas
+            for (Entry<Long, Long> info : partitionInfo) {
 
-                // step 4.1: get partition/indexId
-                String partitionSQl = baseSQL + "/" + id.toString()
-                sqlInfo = suite.target_sql(partitionSQl + "'")
+                // step 3.1: get partition/indexId
+                String partitionSQl = baseSql + "/" + info.key.toString()
+                sqlInfo = sendSql.call(partitionSQl, toSrc)
                 if (sqlInfo.isEmpty()) {
-                    logger.error("Target cluster partition-${id} indexId 
fault.")
+                    logger.error("Target cluster partition-${info.key} indexId 
fault.")
                     return false
                 }
-                PartitionMeta meta = new PartitionMeta(sqlInfo[0][0] as Long, 
-1)
+                PartitionMeta meta = new PartitionMeta(sqlInfo[0][0] as Long, 
info.value)
 
-                // step 4.2: get partition/indexId/tabletId
+                // step 3.2: get partition/indexId/tabletId
                 partitionSQl += "/" + meta.indexId.toString()
-                sqlInfo = suite.target_sql(partitionSQl + "'")
+                sqlInfo = sendSql.call(partitionSQl, toSrc)
                 for (List<Object> row : sqlInfo) {
                     meta.tabletMeta.put(row[0] as Long, row[2] as Long)
                 }
                 if (meta.tabletMeta.isEmpty()) {
-                    logger.error("Target cluster get 
(partitionId/indexId)-(${id}/${meta.indexId}) tabletIds fault.")
+                    logger.error("Target cluster get 
(partitionId/indexId)-(${info.key}/${meta.indexId}) tabletIds fault.")
                     return false
                 }
 
-                it.partitionMap.put(id, meta)
+                it.partitionMap.put(info.key, meta)
             }
         }
 
 
-        logger.info("Target cluster metadata: ${context.targetTableMap}")
+        logger.info("cluster metadata: ${metaMap}")
         return true
     }
 
@@ -627,14 +628,22 @@ class Syncer {
     Boolean getBinlog(String table = "", Boolean update = true) {
         logger.info("Get binlog from source cluster 
${context.config.feSourceThriftNetworkAddress}, binlog seq: ${context.seq}")
         FrontendClientImpl clientImpl = context.getSourceFrontClient()
-        TGetBinlogResult result = SyncerUtils.getBinLog(clientImpl, context, 
table)
+        Long tableId = -1
+        if (!table.isEmpty() && context.sourceTableMap.containsKey(table)) {
+            tableId = context.sourceTableMap.get(table).id
+        }
+        TGetBinlogResult result = SyncerUtils.getBinLog(clientImpl, context, table, tableId)
         return checkGetBinlog(table, result, update)
     }
 
     Boolean beginTxn(String table) {
         logger.info("Begin transaction to target cluster 
${context.config.feTargetThriftNetworkAddress}")
         FrontendClientImpl clientImpl = context.getTargetFrontClient()
-        TBeginTxnResult result = SyncerUtils.beginTxn(clientImpl, context, 
table)
+        Long tableId = -1
+        if (context.sourceTableMap.containsKey(table)) {
+            tableId = context.targetTableMap.get(table).id
+        }
+        TBeginTxnResult result = SyncerUtils.beginTxn(clientImpl, context, tableId)
         return checkBeginTxn(result)
     }
 
@@ -647,19 +656,31 @@ class Syncer {
             return false
         }
 
+        BinlogData binlogData = context.lastBinlog
+
         // step 2: Begin ingest binlog
         // step 2.1: ingest each table in meta
         for (Entry<String, TableMeta> tableInfo : context.sourceTableMap) {
             String tableName = tableInfo.key
             TableMeta srcTableMeta = tableInfo.value
+            if (!binlogData.tableRecords.containsKey(srcTableMeta.id)) {
+                continue
+            }
+
+            PartitionRecords binlogRecords = binlogData.tableRecords.get(srcTableMeta.id)
+
             TableMeta tarTableMeta = context.targetTableMap.get(tableName)
 
             Iterator sourcePartitionIter = srcTableMeta.partitionMap.iterator()
             Iterator targetPartitionIter = tarTableMeta.partitionMap.iterator()
+
             // step 2.2: ingest each partition in the table
             while (sourcePartitionIter.hasNext()) {
                 Entry srcPartition = sourcePartitionIter.next()
                 Entry tarPartition = targetPartitionIter.next()
+                if (!binlogRecords.contains(srcPartition.key)) {
+                    continue
+                }
                 Iterator srcTabletIter = srcPartition.value.tabletMeta.iterator()
                 Iterator tarTabletIter = tarPartition.value.tabletMeta.iterator()
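
getSourceMeta() and getTargetMeta() now share a single getMeta() walk; the only difference is which cluster receives the SHOW PROC statements. The proc paths it descends look like this (a sketch; the ids are illustrative, and the column meanings are inferred from how getMeta() consumes each result set):

    public class ProcPaths {
        public static void main(String[] args) {
            long dbId = 10002L, tableId = 10054L, partitionId = 10053L, indexId = 10055L;
            // step 1: tables of a db -> (tableId, tableName, ...) rows
            System.out.println("SHOW PROC '/dbs/" + dbId + "'");
            // step 2: partitions of a table -> (partitionId, ..., version, ...) rows
            System.out.println("SHOW PROC '/dbs/" + dbId + "/" + tableId + "/partitions'");
            // step 3.1: the first row of a partition carries its indexId
            System.out.println("SHOW PROC '/dbs/" + dbId + "/" + tableId
                    + "/partitions/" + partitionId + "'");
            // step 3.2: tablets of an index -> (tabletId, ..., backendId, ...) rows
            System.out.println("SHOW PROC '/dbs/" + dbId + "/" + tableId
                    + "/partitions/" + partitionId + "/" + indexId + "'");
        }
    }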
 
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
index cd5c3d5d11..42659c892f 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
@@ -18,6 +18,7 @@
 package org.apache.doris.regression.suite
 
 import org.apache.doris.regression.Config
+import org.apache.doris.regression.json.BinlogData
 import org.apache.doris.regression.suite.client.BackendClientImpl
 import org.apache.doris.regression.suite.client.FrontendClientImpl
 import org.apache.doris.thrift.TTabletCommitInfo
@@ -96,10 +97,9 @@ class SyncerContext {
     protected FrontendClientImpl sourceFrontendClient
     protected FrontendClientImpl targetFrontendClient
 
-    protected Long sourceDbId
-    protected HashMap<Long, String> sourceTableIdToName = new HashMap<>()
+    protected long sourceDbId
     protected HashMap<String, TableMeta> sourceTableMap = new HashMap<>()
-    protected Long targetDbId
+    protected long targetDbId
     protected HashMap<String, TableMeta> targetTableMap = new HashMap<>()
 
     protected HashMap<Long, BackendClientImpl> sourceBackendClients = new HashMap<Long, BackendClientImpl>()
@@ -107,6 +107,8 @@ class SyncerContext {
 
     public ArrayList<TTabletCommitInfo> commitInfos = new ArrayList<TTabletCommitInfo>()
 
+    public BinlogData lastBinlog
+
     public String labelName
     public String tableName
     public TGetSnapshotResult getSnapshotResult
@@ -120,6 +122,8 @@ class SyncerContext {
     public long seq
 
     SyncerContext(String dbName, Config config) {
+        this.sourceDbId = -1
+        this.targetDbId = -1
         this.db = dbName
         this.config = config
         this.user = config.feSyncerUser
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy
index 7801233000..dbcd1dba5d 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy
@@ -44,29 +44,32 @@ class SyncerUtils {
         request.setPasswd(context.passwd)
     }
 
-    private static String newLabel(SyncerContext context, String table) {
-        return String.format("ccr_sync_job:%s:%s:%d", context.db, table, 
context.seq)
+    private static String newLabel(SyncerContext context, Long tableId) {
+        return String.format("ccr_sync_job:%s:%d:%d", context.db, tableId, 
context.seq)
     }
 
-    static TGetBinlogResult getBinLog(FrontendClientImpl clientImpl, SyncerContext context, String table) throws TException {
+    static TGetBinlogResult getBinLog(FrontendClientImpl clientImpl, SyncerContext context, String table, Long tableId) throws TException {
         TGetBinlogRequest request = new TGetBinlogRequest()
         setAuthorInformation(request, context)
         request.setDb(context.db)
         if (!table.isEmpty()) {
             request.setTable(table)
         }
+        if (tableId != -1) {
+            request.setTableId(tableId)
+        }
         request.setPrevCommitSeq(context.seq)
         return clientImpl.client.getBinlog(request)
     }
 
-    static TBeginTxnResult beginTxn(FrontendClientImpl clientImpl, SyncerContext context, String table) throws TException {
+    static TBeginTxnResult beginTxn(FrontendClientImpl clientImpl, SyncerContext context, Long tableId) throws TException {
         TBeginTxnRequest request = new TBeginTxnRequest()
         setAuthorInformation(request, context)
         request.setDb("TEST_" + context.db)
-        if (table != null) {
-            request.addToTables(table)
+        if (tableId != -1) {
+            request.addToTableIds(tableId)
         }
-        request.setLabel(newLabel(context, table))
+        request.setLabel(newLabel(context, tableId))
         return clientImpl.client.beginTxn(request)
 
     }
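
The label's middle field flips from %s (table name) to %d (table id), so a label minted at the start of a job stays valid even if the table is renamed while the job runs (that is the presumed motivation; the values below are illustrative):

    public class LabelSketch {
        public static void main(String[] args) {
            String db = "db1";
            long tableId = 10054L;
            long seq = 3L;
            // old format: ccr_sync_job:db1:tbl_a:3
            // new format: ccr_sync_job:db1:10054:3
            System.out.println(String.format("ccr_sync_job:%s:%d:%d", db, tableId, seq));
        }
    }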

