This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5b20207d1eb [fix](suites) Fix syncer ingest binlog with multiple replicas (#44444)
5b20207d1eb is described below
commit 5b20207d1eb9ca7ed1fb3669e3621bbf1e166c6c
Author: walter <[email protected]>
AuthorDate: Fri Nov 22 17:57:29 2024 +0800
[fix](suites) Fix syncer ingest binlog with multiple replicas (#44444)
---
.../apache/doris/regression/suite/Syncer.groovy | 95 ++++++++++++----------
.../doris/regression/suite/SyncerContext.groovy | 27 +++++-
.../ccr_mow_syncer_p0/test_ingest_binlog.groovy | 36 ++++----
3 files changed, 98 insertions(+), 60 deletions(-)
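
For context before the diff: the fix changes PartitionMeta.tabletMeta from a flat tabletId -> backendId map into a tabletId -> TabletMeta map, where each TabletMeta carries a replicaId -> backendId map, so the syncer can ingest the binlog into every replica of a tablet instead of a single one. A minimal Groovy sketch of the new shape, using the TabletMeta class added in SyncerContext.groovy below (the numeric ids are made-up example values):

    // Old: TreeMap<Long /*tabletId*/, Long /*backendId*/>
    // New: TreeMap<Long /*tabletId*/, TabletMeta>, where TabletMeta.replicas
    //      maps replicaId -> backendId, one entry per replica.
    TreeMap<Long, TabletMeta> tabletMeta = new TreeMap<Long, TabletMeta>()
    TabletMeta tablet = new TabletMeta()     // constructor creates the empty replicas map
    tablet.replicas.put(20001L, 10002L)      // replicaId -> backendId (example ids)
    tablet.replicas.put(20002L, 10003L)
    tabletMeta.put(30001L, tablet)           // tabletId -> its per-replica backend map
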
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index 894b42824ef..2195e7e745a 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -700,8 +700,13 @@ class Syncer {
// step 3.2: get partition/indexId/tabletId
partitionSQl += "/" + meta.indexId.toString()
sqlInfo = sendSql.call(partitionSQl, toSrc)
+ Map<Long, Long> replicaMap = Maps.newHashMap()
for (List<Object> row : sqlInfo) {
- meta.tabletMeta.put(row[0] as Long, row[2] as Long)
+ Long tabletId = row[0] as Long
+ if (!meta.tabletMeta.containsKey(tabletId)) {
+ meta.tabletMeta.put(tabletId, new TabletMeta())
+ }
+ meta.tabletMeta[tabletId].replicas.put(row[1] as Long, row[2] as Long)
}
if (meta.tabletMeta.isEmpty()) {
logger.error("Target cluster get
(partitionId/indexId)-(${info.key}/${meta.indexId}) tabletIds fault.")
@@ -816,49 +821,57 @@ class Syncer {
while (srcTabletIter.hasNext()) {
Entry srcTabletMap = srcTabletIter.next()
Entry tarTabletMap = tarTabletIter.next()
+ TabletMeta srcTabletMeta = srcTabletMap.value
+ TabletMeta tarTabletMeta = tarTabletMap.value
+
+ Iterator srcReplicaIter = srcTabletMeta.replicas.iterator()
+ Iterator tarReplicaIter = tarTabletMeta.replicas.iterator()
+ while (srcReplicaIter.hasNext()) {
+ Entry srcReplicaMap = srcReplicaIter.next()
+ Entry tarReplicaMap = tarReplicaIter.next()
+ BackendClientImpl srcClient = context.sourceBackendClients.get(srcReplicaMap.value)
+ if (srcClient == null) {
+ logger.error("Can't find src tabletId-${srcReplicaMap.key} -> beId-${srcReplicaMap.value}")
+ return false
+ }
+ BackendClientImpl tarClient = context.targetBackendClients.get(tarReplicaMap.value)
+ if (tarClient == null) {
+ logger.error("Can't find target tabletId-${tarReplicaMap.key} -> beId-${tarReplicaMap.value}")
+ return false
+ }
- BackendClientImpl srcClient = context.sourceBackendClients.get(srcTabletMap.value)
- if (srcClient == null) {
- logger.error("Can't find src tabletId-${srcTabletMap.key} -> beId-${srcTabletMap.value}")
- return false
- }
- BackendClientImpl tarClient = context.targetBackendClients.get(tarTabletMap.value)
- if (tarClient == null) {
- logger.error("Can't find target tabletId-${tarTabletMap.key} -> beId-${tarTabletMap.value}")
- return false
- }
-
- tarPartition.value.version = srcPartition.value.version
- long partitionId = fakePartitionId == -1 ? tarPartition.key : fakePartitionId
- long version = fakeVersion == -1 ? partitionRecord.version : fakeVersion
-
- TIngestBinlogRequest request = new TIngestBinlogRequest()
- TUniqueId uid = new TUniqueId(-1, -1)
- request.setTxnId(txnId)
- request.setRemoteTabletId(srcTabletMap.key)
- request.setBinlogVersion(version)
- request.setRemoteHost(srcClient.address.hostname)
- request.setRemotePort(srcClient.httpPort.toString())
- request.setPartitionId(partitionId)
- request.setLocalTabletId(tarTabletMap.key)
- request.setLoadId(uid)
- logger.info("request -> ${request}")
- TIngestBinlogResult result = tarClient.client.ingestBinlog(request)
- if (!checkIngestBinlog(result)) {
- logger.error("Ingest binlog error! result: ${result}")
- return false
- }
+ tarPartition.value.version = srcPartition.value.version
+ long partitionId = fakePartitionId == -1 ? tarPartition.key : fakePartitionId
+ long version = fakeVersion == -1 ? partitionRecord.version : fakeVersion
+
+ TIngestBinlogRequest request = new TIngestBinlogRequest()
+ TUniqueId uid = new TUniqueId(-1, -1)
+ request.setTxnId(txnId)
+ request.setRemoteTabletId(srcTabletMap.key)
+ request.setBinlogVersion(version)
+ request.setRemoteHost(srcClient.address.hostname)
+ request.setRemotePort(srcClient.httpPort.toString())
+ request.setPartitionId(partitionId)
+ request.setLocalTabletId(tarTabletMap.key)
+ request.setLoadId(uid)
+ logger.info("request -> ${request}")
+ TIngestBinlogResult result = tarClient.client.ingestBinlog(request)
+ if (!checkIngestBinlog(result)) {
+ logger.error("Ingest binlog error! result: ${result}")
+ return false
+ }
- if (context.txnInsert) {
- List<TTabletCommitInfo> tabletCommitInfos = subTxnIdToTabletCommitInfos.get(txnId)
- if (tabletCommitInfos == null) {
- tabletCommitInfos = new ArrayList<TTabletCommitInfo>()
- subTxnIdToTabletCommitInfos.put(txnId, tabletCommitInfos)
- subTxnIdToTableId.put(txnId, tarTableMeta.id)
+ if (context.txnInsert) {
+ List<TTabletCommitInfo> tabletCommitInfos = subTxnIdToTabletCommitInfos.get(txnId)
+ if (tabletCommitInfos == null) {
+ tabletCommitInfos = new ArrayList<TTabletCommitInfo>()
+ subTxnIdToTabletCommitInfos.put(txnId, tabletCommitInfos)
+ subTxnIdToTableId.put(txnId, tarTableMeta.id)
+ }
+ tabletCommitInfos.add(new TTabletCommitInfo(tarTabletMap.key, tarReplicaMap.value))
+ } else {
+ addCommitInfo(tarTabletMap.key, tarReplicaMap.value)
}
- tabletCommitInfos.add(new TTabletCommitInfo(tarTabletMap.key, tarTabletMap.value))
- } else {
- addCommitInfo(tarTabletMap.key, tarTabletMap.value)
}
}
}
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
index b86f012aa87..3202db4011f 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
@@ -32,10 +32,22 @@ import groovy.util.logging.Slf4j
import java.sql.Connection
+class TabletMeta {
+ public TreeMap<Long, Long> replicas
+
+ TabletMeta() {
+ this.replicas = new TreeMap<Long, Long>()
+ }
+
+ String toString() {
+ return "TabletMeta: { replicas: " + replicas.toString() + " }"
+ }
+}
+
class PartitionMeta {
public long version
public long indexId
- public TreeMap<Long, Long> tabletMeta
+ public TreeMap<Long, TabletMeta> tabletMeta
PartitionMeta(long indexId, long version) {
this.indexId = indexId
@@ -219,6 +231,19 @@ class SyncerContext {
} else if (srcTabletMeta.size() != tarTabletMeta.size()) {
return false
}
+
+ Iterator srcTabletIter = srcTabletMeta.iterator()
+ Iterator tarTabletIter = tarTabletMeta.iterator()
+ while (srcTabletIter.hasNext()) {
+ Map srcReplicaMap = srcTabletIter.next().value.replicas
+ Map tarReplicaMap = tarTabletIter.next().value.replicas
+
+ if (srcReplicaMap.isEmpty() || tarReplicaMap.isEmpty()) {
+ return false
+ } else if (srcReplicaMap.size() != tarReplicaMap.size()) {
+ return false
+ }
+ }
}
})
diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy b/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
index 233d57aa7b3..e07529718ee 100644
--- a/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
+++ b/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
@@ -27,16 +27,16 @@ suite("test_mow_ingest_binlog") {
def test_num = 0
sql "DROP TABLE IF EXISTS ${tableName}"
sql """
- CREATE TABLE if NOT EXISTS ${tableName}
+ CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
- DISTRIBUTED BY HASH(id) BUCKETS 1
- PROPERTIES (
- "enable_unique_key_merge_on_write" = "true",
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
"replication_allocation" = "tag.location.default: 1"
)
"""
@@ -44,19 +44,19 @@ suite("test_mow_ingest_binlog") {
target_sql "DROP TABLE IF EXISTS ${tableName}"
target_sql """
- CREATE TABLE if NOT EXISTS ${tableName}
- (
- `test` INT,
- `id` INT
- )
- ENGINE=OLAP
- UNIQUE KEY(`test`, `id`)
- DISTRIBUTED BY HASH(id) BUCKETS 1
- PROPERTIES (
- "enable_unique_key_merge_on_write" = "true",
- "replication_allocation" = "tag.location.default: 1"
- )
- """
+ CREATE TABLE if NOT EXISTS ${tableName}
+ (
+ `test` INT,
+ `id` INT
+ )
+ ENGINE=OLAP
+ UNIQUE KEY(`test`, `id`)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "replication_allocation" = "tag.location.default: 1"
+ )
+ """
assertTrue(syncer.getTargetMeta("${tableName}"))
@@ -124,4 +124,4 @@ suite("test_mow_ingest_binlog") {
// End Test 2
syncer.closeBackendClients()
-}
\ No newline at end of file
+}
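
A usage note on the Syncer.groovy change above (a sketch of the flow, not the committed code): the ingest loop now walks the source and target replica maps of each tablet in lock-step, sends one TIngestBinlogRequest per replica pair, and records the commit info against the target replica's backend id, so the commit covers all replicas instead of only one. The lock-step pairing assumes both clusters report the same number of replicas per tablet, which the replica-count check added in SyncerContext.groovy enforces. Roughly:

    // srcTabletMeta / tarTabletMeta: the TabletMeta of one tablet on each cluster
    Iterator srcReplicaIter = srcTabletMeta.replicas.iterator()
    Iterator tarReplicaIter = tarTabletMeta.replicas.iterator()
    while (srcReplicaIter.hasNext()) {
        def src = srcReplicaIter.next()          // replicaId -> source backendId
        def tar = tarReplicaIter.next()          // replicaId -> target backendId
        // resolve backend clients by backend id and ingest this replica pair
        // (as in Syncer.groovy above), then commit against the target backend:
        addCommitInfo(tarTabletId, tar.value)    // tarTabletId: stand-in for the target
                                                 // tablet id (tarTabletMap.key in the diff)
    }
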
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]