This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 15ec191a77 [Fix](CCR) Use tableId as the credential for CCR syncer instead of tableName (#21466)
15ec191a77 is described below
commit 15ec191a779a4b9440b5d54d55a63f62bfa1da36
Author: DeadlineFen <[email protected]>
AuthorDate: Wed Jul 5 10:16:09 2023 +0800
[Fix](CCR) Use tableId as the credential for CCR syncer instead of tableName (#21466)
---
.../apache/doris/service/FrontendServiceImpl.java | 43 +++--
gensrc/thrift/FrontendService.thrift | 9 +-
.../apache/doris/regression/json/BinlogData.groovy | 10 ++
.../apache/doris/regression/suite/Syncer.groovy | 179 ++++++++++++---------
.../doris/regression/suite/SyncerContext.groovy | 10 +-
.../doris/regression/util/SyncerUtils.groovy | 17 +-
6 files changed, 161 insertions(+), 107 deletions(-)
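
In short, both RPCs touched here now identify the table by id rather than by
name. A minimal caller-side sketch of the new begin-txn request against the
regenerated Thrift classes (the db name, table id, label, and "client" handle
below are illustrative, not taken from this commit):

    // Hypothetical sketch: values are made up; "client" is assumed to be a
    // connected FrontendService.Client.
    TBeginTxnRequest request = new TBeginTxnRequest();
    request.setUser("syncer");
    request.setPasswd("");
    request.setDb("ccr_db");
    request.addToTableIds(10054L);                   // was: request.addToTables("tbl")
    request.setLabel("ccr_sync_job:ccr_db:10054:0"); // label now embeds the table id
    TBeginTxnResult result = client.beginTxn(request);

On the FE side the ids are resolved back to names only for the privilege check
(see getTableNames below), so renaming a table between syncer runs no longer
breaks authentication.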
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d296663745..0c00615a9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -914,6 +914,24 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return result;
}
+ private List<String> getTableNames(String cluster, String dbName, List<Long> tableIds) throws UserException {
+ final String fullDbName = ClusterNamespace.getFullName(cluster, dbName);
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(fullDbName);
+ if (db == null) {
+ throw new UserException(String.format("can't find db named: %s", dbName));
+ }
+ List<String> tableNames = Lists.newArrayList();
+ for (Long id : tableIds) {
+ Table table = db.getTableNullable(id);
+ if (table == null) {
+ throw new UserException(String.format("can't find table id: %d in db: %s", id, dbName));
+ }
+ tableNames.add(table.getName());
+ }
+
+ return tableNames;
+ }
+
private void checkPasswordAndPrivs(String cluster, String user, String passwd, String db, String tbl,
String clientIp, PrivPredicate predicate) throws AuthenticationException {
checkPasswordAndPrivs(cluster, user, passwd, db, Lists.newArrayList(tbl), clientIp, predicate);
@@ -1070,8 +1088,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
if (!request.isSetDb()) {
throw new UserException("db is not set");
}
- if (!request.isSetTables()) {
- throw new UserException("tables is not set");
+ if (!request.isSetTableIds()) {
+ throw new UserException("table ids is not set");
}
if (!request.isSetLabel()) {
throw new UserException("label is not set");
@@ -1084,7 +1102,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// step 1: check auth
if (Strings.isNullOrEmpty(request.getToken())) {
- checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), request.getTables(),
+ // lookup table ids && convert into tableNameList
+ List<String> tableNameList = getTableNames(cluster, request.getDb(), request.getTableIds());
+ checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), tableNameList,
request.getUserIp(), PrivPredicate.LOAD);
}
@@ -1106,15 +1126,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
// step 4: fetch all tableIds
- // lookup tables && convert into tableIdList
- List<Long> tableIdList = Lists.newArrayList();
- for (String tblName : request.getTables()) {
- Table table = db.getTableOrMetaException(tblName, TableType.OLAP);
- if (table == null) {
- throw new UserException("unknown table, table=" + tblName);
- }
- tableIdList.add(table.getId());
- }
+ // table ids is checked at step 1
+ List<Long> tableIdList = request.getTableIds();
// step 5: get timeout
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
@@ -2320,8 +2333,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
// step 4: fetch all tableIds
// lookup tables && convert into tableIdList
long tableId = -1;
- String tableName = request.getTable();
- if (!Strings.isNullOrEmpty(tableName)) {
+ if (request.isSetTableId()) {
+ tableId = request.getTableId();
+ } else if (request.isSetTable()) {
+ String tableName = request.getTable();
Table table = db.getTableOrMetaException(tableName, TableType.OLAP);
if (table == null) {
throw new UserException("unknown table, table=" + tableName);
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 32e2e3b0a3..5cf08a4e38 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -528,7 +528,7 @@ struct TBeginTxnRequest {
2: optional string user
3: optional string passwd
4: optional string db
- 5: optional list<string> tables
+ 5: optional list<i64> table_ids
6: optional string user_ip
7: optional string label
8: optional i64 auth_code
@@ -947,9 +947,10 @@ struct TGetBinlogRequest {
3: optional string passwd
4: optional string db
5: optional string table
- 6: optional string user_ip
- 7: optional string token
- 8: optional i64 prev_commit_seq
+ 6: optional i64 table_id
+ 7: optional string user_ip
+ 8: optional string token
+ 9: optional i64 prev_commit_seq
}
enum TBinlogType {
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy
index 868a363828..8ccec9a666 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/json/BinlogData.groovy
@@ -29,6 +29,16 @@ class PartitionData {
class PartitionRecords {
public List<PartitionData> partitionRecords
+ Boolean contains(Long partitionId) {
+ for (PartitionData data : partitionRecords) {
+ if (data.partitionId == partitionId) {
+ return true
+ }
+ }
+
+ return false
+ }
+
String toString() {
return partitionRecords.toString()
}
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index 805093ef4f..9504f2a874 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -17,8 +17,10 @@
package org.apache.doris.regression.suite
+import com.google.common.collect.Maps
import com.google.gson.Gson
import org.apache.doris.regression.Config
+import org.apache.doris.regression.json.PartitionRecords
import org.apache.doris.regression.suite.client.BackendClientImpl
import org.apache.doris.regression.suite.client.FrontendClientImpl
import org.apache.doris.regression.util.SyncerUtils
@@ -61,7 +63,7 @@ class Syncer {
TARGET
}
- private Boolean checkBinlog(TBinlog binlog, Boolean update) {
+ private Boolean checkBinlog(TBinlog binlog, String table, Boolean update) {
// step 1: check binlog availability
if (binlog == null) {
@@ -103,8 +105,10 @@ class Syncer {
logger.info("binlog data is ${data}")
if (update) {
Gson gson = new Gson()
- getSourceMeta(gson.fromJson(data, BinlogData.class))
- logger.info("Source tableId: ${context.sourceTableMap}, ${context.sourceTableMap}")
+ context.lastBinlog = gson.fromJson(data, BinlogData.class)
+ logger.info("Source lastBinlog: ${context.lastBinlog}")
+
+ return getSourceMeta(table)
}
} else {
logger.error("binlog data is not contains data!")
@@ -156,7 +160,7 @@ class Syncer {
}
// step 2: check binlog
- return checkBinlog(binlog, update)
+ return checkBinlog(binlog, table, update)
}
private Boolean checkBeginTxn(TBeginTxnResult result) {
@@ -404,41 +408,6 @@ class Syncer {
return clientsMap
}
- void getSourceMeta(BinlogData binlogData) {
- logger.info("data struct: ${binlogData}")
- context.sourceTableIdToName.clear()
- context.sourceTableMap.clear()
-
- context.sourceDbId = binlogData.dbId
-
- String metaSQL = "SHOW PROC '/dbs/" + binlogData.dbId.toString()
- List<List<Object>> sqlInfo = suite.sql(metaSQL + "'")
-
- // Get table information
- for (List<Object> row : sqlInfo) {
- context.sourceTableIdToName.put(row[0] as Long, row[1] as String)
- }
-
- // Get table meta in binlog
- binlogData.tableRecords.forEach((tableId, partitionRecord) -> {
- String tableName = context.sourceTableIdToName.get(tableId)
- TableMeta tableMeta = new TableMeta(tableId)
-
- partitionRecord.partitionRecords.forEach {
- String partitionSQL = metaSQL + "/" + tableId.toString() + "/partitions/" + it.partitionId.toString()
- sqlInfo = suite.sql(partitionSQL + "'")
- PartitionMeta partitionMeta = new PartitionMeta(sqlInfo[0][0] as Long, it.version)
- partitionSQL += "/" + partitionMeta.indexId.toString()
- sqlInfo = suite.sql(partitionSQL + "'")
- sqlInfo.forEach(row -> {
- partitionMeta.tabletMeta.put((row[0] as Long), (row[2] as Long))
- })
- tableMeta.partitionMap.put(it.partitionId, partitionMeta)
- }
- context.sourceTableMap.put(tableName, tableMeta)
- })
- }
-
ArrayList<TTabletCommitInfo> copyCommitInfos() {
return new ArrayList<TTabletCommitInfo>(context.commitInfos)
}
@@ -518,86 +487,118 @@ class Syncer {
return checkGetSnapshot()
}
- Boolean getTargetMeta(String table = "") {
- logger.info("Get target cluster meta data.")
+ Boolean getSourceMeta(String table = "") {
+ logger.info("Get source cluster meta")
String baseSQL = "SHOW PROC '/dbs"
List<List<Object>> sqlInfo
+ if (context.sourceDbId == -1) {
+ sqlInfo = suite.sql(baseSQL + "'")
+ for (List<Object> row : sqlInfo) {
+ String[] dbName = (row[1] as String).split(":")
+ if (dbName[1] == context.db) {
+ context.sourceDbId = row[0] as Long
+ break
+ }
+ }
+ }
+ if (context.sourceDbId == -1) {
+ logger.error("Get ${context.db} db error.")
+ return false
+ }
+ baseSQL += "/" + context.sourceDbId.toString()
+ return getMeta(baseSQL, table, context.sourceTableMap, true)
+ }
- // step 1: get target dbId
- Long dbId = -1
- sqlInfo = suite.target_sql(baseSQL + "'")
- for (List<Object> row : sqlInfo) {
- String[] dbName = (row[1] as String).split(":")
- if (dbName[1] == ("TEST_" + context.db)) {
- dbId = row[0] as Long
- break
+ Boolean getTargetMeta(String table = "") {
+ logger.info("Get target cluster meta")
+ String baseSQL = "SHOW PROC '/dbs"
+ List<List<Object>> sqlInfo
+ if (context.targetDbId == -1) {
+ sqlInfo = suite.target_sql(baseSQL + "'")
+ for (List<Object> row : sqlInfo) {
+ String[] dbName = (row[1] as String).split(":")
+ if (dbName[1] == "TEST_" + context.db) {
+ context.targetDbId = row[0] as Long
+ break
+ }
}
}
- if (dbId == -1) {
- logger.error("Target cluster get ${context.db} db error.")
+ if (context.targetDbId == -1) {
+ logger.error("Get TEST_${context.db} db error.")
return false
}
- context.targetDbId = dbId
+ baseSQL += "/" + context.targetDbId.toString()
+ return getMeta(baseSQL, table, context.targetTableMap, false)
+ }
+
+ Boolean getMeta(String baseSql, String table, Map<String, TableMeta> metaMap, Boolean toSrc) {
+ def sendSql = { String sqlStmt, Boolean isToSrc -> List<List<Object>>
+ if (isToSrc) {
+ return suite.sql(sqlStmt + "'")
+ } else {
+ return suite.target_sql(sqlStmt + "'")
+ }
+ }
+
+ List<List<Object>> sqlInfo
- // step 2: get target dbId/tableId
- baseSQL += "/" + dbId.toString()
- sqlInfo = suite.target_sql(baseSQL + "'")
+ // step 1: get target dbId/tableId
+ sqlInfo = sendSql.call(baseSql, toSrc)
if (table == "") {
for (List<Object> row : sqlInfo) {
- context.targetTableMap.put(row[1] as String, new TableMeta(row[0] as long))
+ metaMap.put(row[1] as String, new TableMeta(row[0] as long))
}
} else {
for (List<Object> row : sqlInfo) {
if ((row[1] as String) == table) {
- context.targetTableMap.put(row[1] as String, new TableMeta(row[0] as long))
+ metaMap.put(row[1] as String, new TableMeta(row[0] as long))
break
}
}
}
-
- // step 3: get partitionIds
- context.targetTableMap.values().forEach {
- baseSQL += "/" + it.id.toString() + "/partitions"
- ArrayList<Long> partitionIds = new ArrayList<Long>()
- sqlInfo = suite.target_sql(baseSQL + "'")
+ // step 2: get partitionIds
+ metaMap.values().forEach {
+ baseSql += "/" + it.id.toString() + "/partitions"
+ Map<Long, Long> partitionInfo = Maps.newHashMap()
+ sqlInfo = sendSql.call(baseSql, toSrc)
for (List<Object> row : sqlInfo) {
- partitionIds.add(row[0] as Long)
+ partitionInfo.put(row[0] as Long, row[2] as Long)
}
- if (partitionIds.isEmpty()) {
+ if (partitionInfo.isEmpty()) {
logger.error("Target cluster get partitions fault.")
return false
}
- // step 4: get partitionMetas
- for (Long id : partitionIds) {
+ // step 3: get partitionMetas
+ for (Entry<Long, Long> info : partitionInfo) {
- // step 4.1: get partition/indexId
- String partitionSQl = baseSQL + "/" + id.toString()
- sqlInfo = suite.target_sql(partitionSQl + "'")
+ // step 3.1: get partition/indexId
+ String partitionSQl = baseSql + "/" + info.key.toString()
+ sqlInfo = sendSql.call(partitionSQl, toSrc)
if (sqlInfo.isEmpty()) {
- logger.error("Target cluster partition-${id} indexId fault.")
+ logger.error("Target cluster partition-${info.key} indexId fault.")
return false
}
- PartitionMeta meta = new PartitionMeta(sqlInfo[0][0] as Long, -1)
+ PartitionMeta meta = new PartitionMeta(sqlInfo[0][0] as Long, info.value)
- // step 4.2: get partition/indexId/tabletId
+ // step 3.2: get partition/indexId/tabletId
partitionSQl += "/" + meta.indexId.toString()
- sqlInfo = suite.target_sql(partitionSQl + "'")
+ sqlInfo = sendSql.call(partitionSQl, toSrc)
for (List<Object> row : sqlInfo) {
meta.tabletMeta.put(row[0] as Long, row[2] as Long)
}
if (meta.tabletMeta.isEmpty()) {
- logger.error("Target cluster get (partitionId/indexId)-(${id}/${meta.indexId}) tabletIds fault.")
+ logger.error("Target cluster get (partitionId/indexId)-(${info.key}/${meta.indexId}) tabletIds fault.")
return false
}
- it.partitionMap.put(id, meta)
+ it.partitionMap.put(info.key, meta)
}
}
- logger.info("Target cluster metadata: ${context.targetTableMap}")
+ logger.info("cluster metadata: ${metaMap}")
return true
}
@@ -627,14 +628,22 @@ class Syncer {
Boolean getBinlog(String table = "", Boolean update = true) {
logger.info("Get binlog from source cluster ${context.config.feSourceThriftNetworkAddress}, binlog seq: ${context.seq}")
FrontendClientImpl clientImpl = context.getSourceFrontClient()
- TGetBinlogResult result = SyncerUtils.getBinLog(clientImpl, context, table)
+ Long tableId = -1
+ if (!table.isEmpty() && context.sourceTableMap.containsKey(table)) {
+ tableId = context.sourceTableMap.get(table).id
+ }
+ TGetBinlogResult result = SyncerUtils.getBinLog(clientImpl, context, table, tableId)
return checkGetBinlog(table, result, update)
}
Boolean beginTxn(String table) {
logger.info("Begin transaction to target cluster ${context.config.feTargetThriftNetworkAddress}")
FrontendClientImpl clientImpl = context.getTargetFrontClient()
- TBeginTxnResult result = SyncerUtils.beginTxn(clientImpl, context, table)
+ Long tableId = -1
+ if (context.sourceTableMap.containsKey(table)) {
+ tableId = context.targetTableMap.get(table).id
+ }
+ TBeginTxnResult result = SyncerUtils.beginTxn(clientImpl, context, tableId)
return checkBeginTxn(result)
}
@@ -647,19 +656,31 @@ class Syncer {
return false
}
+ BinlogData binlogData = context.lastBinlog
+
// step 2: Begin ingest binlog
// step 2.1: ingest each table in meta
for (Entry<String, TableMeta> tableInfo : context.sourceTableMap) {
String tableName = tableInfo.key
TableMeta srcTableMeta = tableInfo.value
+ if (!binlogData.tableRecords.containsKey(srcTableMeta.id)) {
+ continue
+ }
+
+ PartitionRecords binlogRecords = binlogData.tableRecords.get(srcTableMeta.id)
+
TableMeta tarTableMeta = context.targetTableMap.get(tableName)
Iterator sourcePartitionIter = srcTableMeta.partitionMap.iterator()
Iterator targetPartitionIter = tarTableMeta.partitionMap.iterator()
+
// step 2.2: ingest each partition in the table
while (sourcePartitionIter.hasNext()) {
Entry srcPartition = sourcePartitionIter.next()
Entry tarPartition = targetPartitionIter.next()
+ if (!binlogRecords.contains(srcPartition.key)) {
+ continue
+ }
Iterator srcTabletIter = srcPartition.value.tabletMeta.iterator()
Iterator tarTabletIter = tarPartition.value.tabletMeta.iterator()
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
index cd5c3d5d11..42659c892f 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
@@ -18,6 +18,7 @@
package org.apache.doris.regression.suite
import org.apache.doris.regression.Config
+import org.apache.doris.regression.json.BinlogData
import org.apache.doris.regression.suite.client.BackendClientImpl
import org.apache.doris.regression.suite.client.FrontendClientImpl
import org.apache.doris.thrift.TTabletCommitInfo
@@ -96,10 +97,9 @@ class SyncerContext {
protected FrontendClientImpl sourceFrontendClient
protected FrontendClientImpl targetFrontendClient
- protected Long sourceDbId
- protected HashMap<Long, String> sourceTableIdToName = new HashMap<>()
+ protected long sourceDbId
protected HashMap<String, TableMeta> sourceTableMap = new HashMap<>()
- protected Long targetDbId
+ protected long targetDbId
protected HashMap<String, TableMeta> targetTableMap = new HashMap<>()
protected HashMap<Long, BackendClientImpl> sourceBackendClients = new HashMap<Long, BackendClientImpl>()
@@ -107,6 +107,8 @@ class SyncerContext {
public ArrayList<TTabletCommitInfo> commitInfos = new ArrayList<TTabletCommitInfo>()
+ public BinlogData lastBinlog
+
public String labelName
public String tableName
public TGetSnapshotResult getSnapshotResult
@@ -120,6 +122,8 @@ class SyncerContext {
public long seq
SyncerContext(String dbName, Config config) {
+ this.sourceDbId = -1
+ this.targetDbId = -1
this.db = dbName
this.config = config
this.user = config.feSyncerUser
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy
index 7801233000..dbcd1dba5d 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/SyncerUtils.groovy
@@ -44,29 +44,32 @@ class SyncerUtils {
request.setPasswd(context.passwd)
}
- private static String newLabel(SyncerContext context, String table) {
- return String.format("ccr_sync_job:%s:%s:%d", context.db, table, context.seq)
+ private static String newLabel(SyncerContext context, Long tableId) {
+ return String.format("ccr_sync_job:%s:%d:%d", context.db, tableId, context.seq)
}
- static TGetBinlogResult getBinLog(FrontendClientImpl clientImpl, SyncerContext context, String table) throws TException {
+ static TGetBinlogResult getBinLog(FrontendClientImpl clientImpl, SyncerContext context, String table, Long tableId) throws TException {
TGetBinlogRequest request = new TGetBinlogRequest()
setAuthorInformation(request, context)
request.setDb(context.db)
if (!table.isEmpty()) {
request.setTable(table)
}
+ if (tableId != -1) {
+ request.setTableId(tableId)
+ }
request.setPrevCommitSeq(context.seq)
return clientImpl.client.getBinlog(request)
}
- static TBeginTxnResult beginTxn(FrontendClientImpl clientImpl, SyncerContext context, String table) throws TException {
+ static TBeginTxnResult beginTxn(FrontendClientImpl clientImpl, SyncerContext context, Long tableId) throws TException {
TBeginTxnRequest request = new TBeginTxnRequest()
setAuthorInformation(request, context)
request.setDb("TEST_" + context.db)
- if (table != null) {
- request.addToTables(table)
+ if (tableId != -1) {
+ request.addToTableIds(tableId)
}
- request.setLabel(newLabel(context, table))
+ request.setLabel(newLabel(context, tableId))
return clientImpl.client.beginTxn(request)
}
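
A similarly hedged sketch of the id-based binlog fetch, mirroring the new
TGetBinlogRequest layout in the thrift hunk above (values again illustrative):

    // Hypothetical sketch mirroring SyncerUtils.getBinLog; "client" is again
    // assumed to be a connected FrontendService.Client.
    TGetBinlogRequest request = new TGetBinlogRequest();
    request.setUser("syncer");
    request.setPasswd("");
    request.setDb("ccr_db");
    request.setTableId(10054L);    // new field 6; the FE prefers it over setTable()
    request.setPrevCommitSeq(-1L); // return binlogs after this commit sequence
    TGetBinlogResult result = client.getBinlog(request);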
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]