This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 793172a4a9de2990ed1481aad8e0e348d022deb4 Author: DeadlineFen <[email protected]> AuthorDate: Wed Jul 5 10:16:09 2023 +0800 [Fix](CCR) Use tableId as the credential for CCR syncer instead of tableName (#21466) --- .../apache/doris/service/FrontendServiceImpl.java | 43 +++-- gensrc/thrift/FrontendService.thrift | 9 +- .../apache/doris/regression/json/BinlogData.groovy | 10 ++ .../apache/doris/regression/suite/Syncer.groovy | 179 ++++++++++++--------- .../doris/regression/suite/SyncerContext.groovy | 10 +- .../doris/regression/util/SyncerUtils.groovy | 17 +- 6 files changed, 161 insertions(+), 107 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index d296663745..0c00615a9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -914,6 +914,24 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + private List<String> getTableNames(String cluster, String dbName, List<Long> tableIds) throws UserException { + final String fullDbName = ClusterNamespace.getFullName(cluster, dbName); + Database db = Env.getCurrentInternalCatalog().getDbNullable(fullDbName); + if (db == null) { + throw new UserException(String.format("can't find db named: %s", dbName)); + } + List<String> tableNames = Lists.newArrayList(); + for (Long id : tableIds) { + Table table = db.getTableNullable(id); + if (table == null) { + throw new UserException(String.format("can't find table id: %d in db: %s", id, dbName)); + } + tableNames.add(table.getName()); + } + + return tableNames; + } + private void checkPasswordAndPrivs(String cluster, String user, String passwd, String db, String tbl, String clientIp, PrivPredicate predicate) throws AuthenticationException { checkPasswordAndPrivs(cluster, user, passwd, db, Lists.newArrayList(tbl), clientIp, predicate); @@ -1070,8 +1088,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (!request.isSetDb()) { throw new UserException("db is not set"); } - if (!request.isSetTables()) { - throw new UserException("tables is not set"); + if (!request.isSetTableIds()) { + throw new UserException("table ids is not set"); } if (!request.isSetLabel()) { throw new UserException("label is not set"); @@ -1084,7 +1102,9 @@ public class FrontendServiceImpl implements FrontendService.Iface { // step 1: check auth if (Strings.isNullOrEmpty(request.getToken())) { - checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTables(), + // lookup table ids && convert into tableNameList + List<String> tableNameList = getTableNames(cluster, request.getDb(), request.getTableIds()); + checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), tableNameList, request.getUserIp(), PrivPredicate.LOAD); } @@ -1106,15 +1126,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { } // step 4: fetch all tableIds - // lookup tables && convert into tableIdList - List<Long> tableIdList = Lists.newArrayList(); - for (String tblName : request.getTables()) { - Table table = db.getTableOrMetaException(tblName, TableType.OLAP); - if (table == null) { - throw new UserException("unknown table, table=" + tblName); - } - tableIdList.add(table.getId()); - } + // table ids is checked at step 1 + List<Long> tableIdList = request.getTableIds(); // step 5: get timeout long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; @@ -2320,8 +2333,10 @@ public class FrontendServiceImpl implements FrontendService.Iface { // step 4: fetch all tableIds // lookup tables && convert into tableIdList long tableId = -1; - String tableName = request.getTable(); - if (!Strings.isNullOrEmpty(tableName)) { + if (request.isSetTableId()) { + tableId = request.getTableId(); + } else if (request.isSetTable()) { + String tableName = request.getTable(); Table table = db.getTableOrMetaException(tableName, TableType.OLAP); if (table == null) { throw new UserException("unknown table, table=" + tableName); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 32e2e3b0a3..5cf08a4e38 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -528,7 +528,7 @@ struct TBeginTxnRequest { 2: optional string user 3: optional string passwd 4: optional string db - 5: optional list<string> tables + 5: optional list<i64> table_ids 6: optional string user_ip 7: optional string label 8: optional i64 auth_code @@ -947,9 +947,10 @@ struct TGetBinlogRequest { 3: optional string passwd 4: optional string db 5: optional string table - 6: optional string user_ip - 7: optional string token - 8: optional i64 prev_commit_seq + 6: optional i64 table_id + 7: optional string user_ip + 8: optional string token + 9: optional i64 prev_commit_seq } enum TBinlogType { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy index 868a363828..8ccec9a666 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy @@ -29,6 +29,16 @@ class PartitionData { class PartitionRecords { public List<PartitionData> partitionRecords + Boolean contains(Long partitionId) { + for (PartitionData data : partitionRecords) { + if (data.partitionId == partitionId) { + return true + } + } + + return false + } + String toString() { return partitionRecords.toString() } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy index 630f158dae..28db08c589 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy @@ -17,8 +17,10 @@ package org.apache.doris.regression.suite +import com.google.common.collect.Maps import com.google.gson.Gson import org.apache.doris.regression.Config +import org.apache.doris.regression.json.PartitionRecords import org.apache.doris.regression.suite.client.BackendClientImpl import org.apache.doris.regression.suite.client.FrontendClientImpl import org.apache.doris.regression.util.SyncerUtils @@ -60,7 +62,7 @@ class Syncer { TARGET } - private Boolean checkBinlog(TBinlog binlog, Boolean update) { + private Boolean checkBinlog(TBinlog binlog, String table, Boolean update) { // step 1: check binlog availability if (binlog == null) { @@ -102,8 +104,10 @@ class Syncer { logger.info("binlog data is ${data}") if (update) { Gson gson = new Gson() - getSourceMeta(gson.fromJson(data, BinlogData.class)) - logger.info("Source tableId: ${context.sourceTableMap}, ${context.sourceTableMap}") + context.lastBinlog = gson.fromJson(data, BinlogData.class) + logger.info("Source lastBinlog: ${context.lastBinlog}") + + return getSourceMeta(table) } } else { logger.error("binlog data is not contains data!") @@ -155,7 +159,7 @@ class Syncer { } // step 2: check binlog - return checkBinlog(binlog, update) + return checkBinlog(binlog, table, update) } private Boolean checkBeginTxn(TBeginTxnResult result) { @@ -372,41 +376,6 @@ class Syncer { return clientsMap } - void getSourceMeta(BinlogData binlogData) { - logger.info("data struct: ${binlogData}") - context.sourceTableIdToName.clear() - context.sourceTableMap.clear() - - context.sourceDbId = binlogData.dbId - - String metaSQL = "SHOW PROC '/dbs/" + binlogData.dbId.toString() - List<List<Object>> sqlInfo = suite.sql(metaSQL + "'") - - // Get table information - for (List<Object> row : sqlInfo) { - context.sourceTableIdToName.put(row[0] as Long, row[1] as String) - } - - // Get table meta in binlog - binlogData.tableRecords.forEach((tableId, partitionRecord) -> { - String tableName = context.sourceTableIdToName.get(tableId) - TableMeta tableMeta = new TableMeta(tableId) - - partitionRecord.partitionRecords.forEach { - String partitionSQL = metaSQL + "/" + tableId.toString() + "/partitions/" + it.partitionId.toString() - sqlInfo = suite.sql(partitionSQL + "'") - PartitionMeta partitionMeta = new PartitionMeta(sqlInfo[0][0] as Long, it.version) - partitionSQL += "/" + partitionMeta.indexId.toString() - sqlInfo = suite.sql(partitionSQL + "'") - sqlInfo.forEach(row -> { - partitionMeta.tabletMeta.put((row[0] as Long), (row[2] as Long)) - }) - tableMeta.partitionMap.put(it.partitionId, partitionMeta) - } - context.sourceTableMap.put(tableName, tableMeta) - }) - } - ArrayList<TTabletCommitInfo> copyCommitInfos() { return new ArrayList<TTabletCommitInfo>(context.commitInfos) } @@ -472,86 +441,118 @@ class Syncer { return checkGetSnapshot() } - Boolean getTargetMeta(String table = "") { - logger.info("Get target cluster meta data.") + Boolean getSourceMeta(String table = "") { + logger.info("Get source cluster meta") String baseSQL = "SHOW PROC '/dbs" List<List<Object>> sqlInfo + if (context.sourceDbId == -1) { + sqlInfo = suite.sql(baseSQL + "'") + for (List<Object> row : sqlInfo) { + String[] dbName = (row[1] as String).split(":") + if (dbName[1] == context.db) { + context.sourceDbId = row[0] as Long + break + } + } + } + if (context.sourceDbId == -1) { + logger.error("Get ${context.db} db error.") + return false + } + baseSQL += "/" + context.sourceDbId.toString() + return getMeta(baseSQL, table, context.sourceTableMap, true) + } - // step 1: get target dbId - Long dbId = -1 - sqlInfo = suite.target_sql(baseSQL + "'") - for (List<Object> row : sqlInfo) { - String[] dbName = (row[1] as String).split(":") - if (dbName[1] == ("TEST_" + context.db)) { - dbId = row[0] as Long - break + Boolean getTargetMeta(String table = "") { + logger.info("Get target cluster meta") + String baseSQL = "SHOW PROC '/dbs" + List<List<Object>> sqlInfo + if (context.targetDbId == -1) { + sqlInfo = suite.target_sql(baseSQL + "'") + for (List<Object> row : sqlInfo) { + String[] dbName = (row[1] as String).split(":") + if (dbName[1] == "TEST_" + context.db) { + context.targetDbId = row[0] as Long + break + } } } - if (dbId == -1) { - logger.error("Target cluster get ${context.db} db error.") + if (context.targetDbId == -1) { + logger.error("Get TEST_${context.db} db error.") return false } - context.targetDbId = dbId + baseSQL += "/" + context.targetDbId.toString() + return getMeta(baseSQL, table, context.targetTableMap, false) + } - // step 2: get target dbId/tableId - baseSQL += "/" + dbId.toString() - sqlInfo = suite.target_sql(baseSQL + "'") + Boolean getMeta(String baseSql, String table, Map<String, TableMeta> metaMap, Boolean toSrc) { + def sendSql = { String sqlStmt, Boolean isToSrc -> List<List<Object>> + if (isToSrc) { + return suite.sql(sqlStmt + "'") + } else { + return suite.target_sql(sqlStmt + "'") + } + } + + List<List<Object>> sqlInfo + + // step 1: get target dbId/tableId + sqlInfo = sendSql.call(baseSql, toSrc) if (table == "") { for (List<Object> row : sqlInfo) { - context.targetTableMap.put(row[1] as String, new TableMeta(row[0] as long)) + metaMap.put(row[1] as String, new TableMeta(row[0] as long)) } } else { for (List<Object> row : sqlInfo) { if ((row[1] as String) == table) { - context.targetTableMap.put(row[1] as String, new TableMeta(row[0] as long)) + metaMap.put(row[1] as String, new TableMeta(row[0] as long)) break } } } - - // step 3: get partitionIds - context.targetTableMap.values().forEach { - baseSQL += "/" + it.id.toString() + "/partitions" - ArrayList<Long> partitionIds = new ArrayList<Long>() - sqlInfo = suite.target_sql(baseSQL + "'") + // step 2: get partitionIds + metaMap.values().forEach { + baseSql += "/" + it.id.toString() + "/partitions" + Map<Long, Long> partitionInfo = Maps.newHashMap() + sqlInfo = sendSql.call(baseSql, toSrc) for (List<Object> row : sqlInfo) { - partitionIds.add(row[0] as Long) + partitionInfo.put(row[0] as Long, row[2] as Long) } - if (partitionIds.isEmpty()) { + if (partitionInfo.isEmpty()) { logger.error("Target cluster get partitions fault.") return false } - // step 4: get partitionMetas - for (Long id : partitionIds) { + // step 3: get partitionMetas + for (Entry<Long, Long> info : partitionInfo) { - // step 4.1: get partition/indexId - String partitionSQl = baseSQL + "/" + id.toString() - sqlInfo = suite.target_sql(partitionSQl + "'") + // step 3.1: get partition/indexId + String partitionSQl = baseSql + "/" + info.key.toString() + sqlInfo = sendSql.call(partitionSQl, toSrc) if (sqlInfo.isEmpty()) { - logger.error("Target cluster partition-${id} indexId fault.") + logger.error("Target cluster partition-${info.key} indexId fault.") return false } - PartitionMeta meta = new PartitionMeta(sqlInfo[0][0] as Long, -1) + PartitionMeta meta = new PartitionMeta(sqlInfo[0][0] as Long, info.value) - // step 4.2: get partition/indexId/tabletId + // step 3.2: get partition/indexId/tabletId partitionSQl += "/" + meta.indexId.toString() - sqlInfo = suite.target_sql(partitionSQl + "'") + sqlInfo = sendSql.call(partitionSQl, toSrc) for (List<Object> row : sqlInfo) { meta.tabletMeta.put(row[0] as Long, row[2] as Long) } if (meta.tabletMeta.isEmpty()) { - logger.error("Target cluster get (partitionId/indexId)-(${id}/${meta.indexId}) tabletIds fault.") + logger.error("Target cluster get (partitionId/indexId)-(${info.key}/${meta.indexId}) tabletIds fault.") return false } - it.partitionMap.put(id, meta) + it.partitionMap.put(info.key, meta) } } - logger.info("Target cluster metadata: ${context.targetTableMap}") + logger.info("cluster metadata: ${metaMap}") return true } @@ -581,14 +582,22 @@ class Syncer { Boolean getBinlog(String table = "", Boolean update = true) { logger.info("Get binlog from source cluster ${context.config.feSourceThriftNetworkAddress}, binlog seq: ${context.seq}") FrontendClientImpl clientImpl = context.getSourceFrontClient() - TGetBinlogResult result = SyncerUtils.getBinLog(clientImpl, context, table) + Long tableId = -1 + if (!table.isEmpty() && context.sourceTableMap.containsKey(table)) { + tableId = context.sourceTableMap.get(table).id + } + TGetBinlogResult result = SyncerUtils.getBinLog(clientImpl, context, table, tableId) return checkGetBinlog(table, result, update) } Boolean beginTxn(String table) { logger.info("Begin transaction to target cluster ${context.config.feTargetThriftNetworkAddress}") FrontendClientImpl clientImpl = context.getTargetFrontClient() - TBeginTxnResult result = SyncerUtils.beginTxn(clientImpl, context, table) + Long tableId = -1 + if (context.sourceTableMap.containsKey(table)) { + tableId = context.targetTableMap.get(table).id + } + TBeginTxnResult result = SyncerUtils.beginTxn(clientImpl, context, tableId) return checkBeginTxn(result) } @@ -601,19 +610,31 @@ class Syncer { return false } + BinlogData binlogData = context.lastBinlog + // step 2: Begin ingest binlog // step 2.1: ingest each table in meta for (Entry<String, TableMeta> tableInfo : context.sourceTableMap) { String tableName = tableInfo.key TableMeta srcTableMeta = tableInfo.value + if (!binlogData.tableRecords.containsKey(srcTableMeta.id)) { + continue + } + + PartitionRecords binlogRecords = binlogData.tableRecords.get(srcTableMeta.id) + TableMeta tarTableMeta = context.targetTableMap.get(tableName) Iterator sourcePartitionIter = srcTableMeta.partitionMap.iterator() Iterator targetPartitionIter = tarTableMeta.partitionMap.iterator() + // step 2.2: ingest each partition in the table while (sourcePartitionIter.hasNext()) { Entry srcPartition = sourcePartitionIter.next() Entry tarPartition = targetPartitionIter.next() + if (!binlogRecords.contains(srcPartition.key)) { + continue + } Iterator srcTabletIter = srcPartition.value.tabletMeta.iterator() Iterator tarTabletIter = tarPartition.value.tabletMeta.iterator() diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy index 1b00c9caa6..fcc4d3116b 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy @@ -18,6 +18,7 @@ package org.apache.doris.regression.suite import org.apache.doris.regression.Config +import org.apache.doris.regression.json.BinlogData import org.apache.doris.regression.suite.client.BackendClientImpl import org.apache.doris.regression.suite.client.FrontendClientImpl import org.apache.doris.thrift.TTabletCommitInfo @@ -96,10 +97,9 @@ class SyncerContext { protected FrontendClientImpl sourceFrontendClient protected FrontendClientImpl targetFrontendClient - protected Long sourceDbId - protected HashMap<Long, String> sourceTableIdToName = new HashMap<>() + protected long sourceDbId protected HashMap<String, TableMeta> sourceTableMap = new HashMap<>() - protected Long targetDbId + protected long targetDbId protected HashMap<String, TableMeta> targetTableMap = new HashMap<>() protected HashMap<Long, BackendClientImpl> sourceBackendClients = new HashMap<Long, BackendClientImpl>() @@ -107,6 +107,8 @@ class SyncerContext { public ArrayList<TTabletCommitInfo> commitInfos = new ArrayList<TTabletCommitInfo>() + public BinlogData lastBinlog + public String labelName public String tableName public TGetSnapshotResult getSnapshotResult @@ -119,6 +121,8 @@ class SyncerContext { public long seq SyncerContext(String dbName, Config config) { + this.sourceDbId = -1 + this.targetDbId = -1 this.db = dbName this.config = config this.user = config.feSyncerUser diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy index d622923d35..3ff539966b 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy @@ -42,29 +42,32 @@ class SyncerUtils { request.setPasswd(context.passwd) } - private static String newLabel(SyncerContext context, String table) { - return String.format("ccr_sync_job:%s:%s:%d", context.db, table, context.seq) + private static String newLabel(SyncerContext context, Long tableId) { + return String.format("ccr_sync_job:%s:%d:%d", context.db, tableId, context.seq) } - static TGetBinlogResult getBinLog(FrontendClientImpl clientImpl, SyncerContext context, String table) throws TException { + static TGetBinlogResult getBinLog(FrontendClientImpl clientImpl, SyncerContext context, String table, Long tableId) throws TException { TGetBinlogRequest request = new TGetBinlogRequest() setAuthorInformation(request, context) request.setDb(context.db) if (!table.isEmpty()) { request.setTable(table) } + if (tableId != -1) { + request.setTableId(tableId) + } request.setPrevCommitSeq(context.seq) return clientImpl.client.getBinlog(request) } - static TBeginTxnResult beginTxn(FrontendClientImpl clientImpl, SyncerContext context, String table) throws TException { + static TBeginTxnResult beginTxn(FrontendClientImpl clientImpl, SyncerContext context, Long tableId) throws TException { TBeginTxnRequest request = new TBeginTxnRequest() setAuthorInformation(request, context) request.setDb("TEST_" + context.db) - if (table != null) { - request.addToTables(table) + if (tableId != -1) { + request.addToTableIds(tableId) } - request.setLabel(newLabel(context, table)) + request.setLabel(newLabel(context, tableId)) return clientImpl.client.beginTxn(request) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
