This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0af123f75fd [fix](suites) Fix syncer ingest binlog with multiple 
replicas #44444 (#44587)
0af123f75fd is described below

commit 0af123f75fd21009d347258d3ea3f30e771d74dc
Author: walter <[email protected]>
AuthorDate: Tue Nov 26 23:25:00 2024 +0800

    [fix](suites) Fix syncer ingest binlog with multiple replicas #44444 
(#44587)
    
    cherry pick from #44444
---
 .../apache/doris/regression/suite/Syncer.groovy    | 81 +++++++++++++---------
 .../doris/regression/suite/SyncerContext.groovy    | 27 +++++++-
 .../ccr_mow_syncer_p0/test_ingest_binlog.groovy    | 36 +++++-----
 3 files changed, 91 insertions(+), 53 deletions(-)

diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index 3a952b6edad..87abf2b151d 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -677,8 +677,13 @@ class Syncer {
                 // step 3.2: get partition/indexId/tabletId
                 partitionSQl += "/" + meta.indexId.toString()
                 sqlInfo = sendSql.call(partitionSQl, toSrc)
+                Map<Long, Long> replicaMap = Maps.newHashMap()
                 for (List<Object> row : sqlInfo) {
-                    meta.tabletMeta.put(row[0] as Long, row[2] as Long)
+                    Long tabletId = row[0] as Long
+                    if (!meta.tabletMeta.containsKey(tabletId)) {
+                        meta.tabletMeta.put(tabletId, new TabletMeta())
+                    }
+                    meta.tabletMeta[tabletId].replicas.put(row[1] as Long, 
row[2] as Long)
                 }
                 if (meta.tabletMeta.isEmpty()) {
                     logger.error("Target cluster get 
(partitionId/indexId)-(${info.key}/${meta.indexId}) tabletIds fault.")
@@ -774,47 +779,55 @@ class Syncer {
                     continue
                 }
 
+                long txnId = context.txnId
+                // step 2.3: ingest each tablet in the partition
                 Iterator srcTabletIter = 
srcPartition.value.tabletMeta.iterator()
                 Iterator tarTabletIter = 
tarPartition.value.tabletMeta.iterator()
-
-                // step 2.3: ingest each tablet in the partition
                 while (srcTabletIter.hasNext()) {
                     Entry srcTabletMap = srcTabletIter.next()
                     Entry tarTabletMap = tarTabletIter.next()
+                    TabletMeta srcTabletMeta = srcTabletMap.value
+                    TabletMeta tarTabletMeta = tarTabletMap.value
+
+                    Iterator srcReplicaIter = srcTabletMeta.replicas.iterator()
+                    Iterator tarReplicaIter = tarTabletMeta.replicas.iterator()
+                    while (srcReplicaIter.hasNext()) {
+                        Entry srcReplicaMap = srcReplicaIter.next()
+                        Entry tarReplicaMap = tarReplicaIter.next()
+                        BackendClientImpl srcClient = 
context.sourceBackendClients.get(srcReplicaMap.value)
+                        if (srcClient == null) {
+                            logger.error("Can't find src 
tabletId-${srcReplicaMap.key} -> beId-${srcReplicaMap.value}")
+                            return false
+                        }
+                        BackendClientImpl tarClient = 
context.targetBackendClients.get(tarReplicaMap.value)
+                        if (tarClient == null) {
+                            logger.error("Can't find target 
tabletId-${tarReplicaMap.key} -> beId-${tarReplicaMap.value}")
+                            return false
+                        }
 
-                    BackendClientImpl srcClient = 
context.sourceBackendClients.get(srcTabletMap.value)
-                    if (srcClient == null) {
-                        logger.error("Can't find src 
tabletId-${srcTabletMap.key} -> beId-${srcTabletMap.value}")
-                        return false
-                    }
-                    BackendClientImpl tarClient = 
context.targetBackendClients.get(tarTabletMap.value)
-                    if (tarClient == null) {
-                        logger.error("Can't find target 
tabletId-${tarTabletMap.key} -> beId-${tarTabletMap.value}")
-                        return false
-                    }
+                        tarPartition.value.version = srcPartition.value.version
+                        long partitionId = fakePartitionId == -1 ? 
tarPartition.key : fakePartitionId
+                        long version = fakeVersion == -1 ? 
partitionRecord.version : fakeVersion
+
+                        TIngestBinlogRequest request = new 
TIngestBinlogRequest()
+                        TUniqueId uid = new TUniqueId(-1, -1)
+                        request.setTxnId(txnId)
+                        request.setRemoteTabletId(srcTabletMap.key)
+                        request.setBinlogVersion(version)
+                        request.setRemoteHost(srcClient.address.hostname)
+                        request.setRemotePort(srcClient.httpPort.toString())
+                        request.setPartitionId(partitionId)
+                        request.setLocalTabletId(tarTabletMap.key)
+                        request.setLoadId(uid)
+                        logger.info("request -> ${request}")
+                        TIngestBinlogResult result = 
tarClient.client.ingestBinlog(request)
+                        if (!checkIngestBinlog(result)) {
+                            logger.error("Ingest binlog error! result: 
${result}")
+                            return false
+                        }
 
-                    tarPartition.value.version = srcPartition.value.version
-                    long partitionId = fakePartitionId == -1 ? 
tarPartition.key : fakePartitionId
-                    long version = fakeVersion == -1 ? 
srcPartition.value.version : fakeVersion
-
-                    TIngestBinlogRequest request = new TIngestBinlogRequest()
-                    TUniqueId uid = new TUniqueId(-1, -1)
-                    request.setTxnId(context.txnId)
-                    request.setRemoteTabletId(srcTabletMap.key)
-                    request.setBinlogVersion(version)
-                    request.setRemoteHost(srcClient.address.hostname)
-                    request.setRemotePort(srcClient.httpPort.toString())
-                    request.setPartitionId(partitionId)
-                    request.setLocalTabletId(tarTabletMap.key)
-                    request.setLoadId(uid)
-                    logger.info("request -> ${request}")
-                    TIngestBinlogResult result = 
tarClient.client.ingestBinlog(request)
-                    if (!checkIngestBinlog(result)) {
-                        logger.error("Ingest binlog error! result: ${result}")
-                        return false
+                        addCommitInfo(tarTabletMap.key, tarReplicaMap.value)
                     }
-
-                    addCommitInfo(tarTabletMap.key, tarTabletMap.value)
                 }
             }
         }
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
index 93108e67fc1..9102eb3b591 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SyncerContext.groovy
@@ -31,10 +31,22 @@ import groovy.util.logging.Slf4j
 
 import java.sql.Connection
 
+class TabletMeta {
+    public TreeMap<Long, Long> replicas
+
+    TabletMeta() {
+        this.replicas = new TreeMap<Long, Long>()
+    }
+
+    String toString() {
+        return "TabletMeta: { replicas: " + replicas.toString() + " }"
+    }
+}
+
 class PartitionMeta {
     public long version
     public long indexId
-    public TreeMap<Long, Long> tabletMeta
+    public TreeMap<Long, TabletMeta> tabletMeta
 
     PartitionMeta(long indexId, long version) {
         this.indexId = indexId
@@ -212,6 +224,19 @@ class SyncerContext {
                 } else if (srcTabletMeta.size() != tarTabletMeta.size()) {
                     return false
                 }
+
+                Iterator srcTabletIter = srcTabletMeta.iterator()
+                Iterator tarTabletIter = tarTabletMeta.iterator()
+                while (srcTabletIter.hasNext()) {
+                    Map srcReplicaMap = srcTabletIter.next().value.replicas
+                    Map tarReplicaMap = tarTabletIter.next().value.replicas
+
+                    if (srcReplicaMap.isEmpty() || tarReplicaMap.isEmpty()) {
+                        return false
+                    } else if (srcReplicaMap.size() != tarReplicaMap.size()) {
+                        return false
+                    }
+                }
             }
         })
 
diff --git a/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy 
b/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
index 233d57aa7b3..e07529718ee 100644
--- a/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
+++ b/regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
@@ -27,16 +27,16 @@ suite("test_mow_ingest_binlog") {
     def test_num = 0
     sql "DROP TABLE IF EXISTS ${tableName}"
     sql """
-           CREATE TABLE if NOT EXISTS ${tableName} 
+           CREATE TABLE if NOT EXISTS ${tableName}
            (
                `test` INT,
                `id` INT
            )
            ENGINE=OLAP
            UNIQUE KEY(`test`, `id`)
-           DISTRIBUTED BY HASH(id) BUCKETS 1 
-           PROPERTIES ( 
-                "enable_unique_key_merge_on_write" = "true",
+           DISTRIBUTED BY HASH(id) BUCKETS 1
+           PROPERTIES (
+               "enable_unique_key_merge_on_write" = "true",
                "replication_allocation" = "tag.location.default: 1"
            )
         """
@@ -44,19 +44,19 @@ suite("test_mow_ingest_binlog") {
 
     target_sql "DROP TABLE IF EXISTS ${tableName}"
     target_sql """
-                  CREATE TABLE if NOT EXISTS ${tableName} 
-                  (
-                      `test` INT,
-                      `id` INT
-                  )
-                  ENGINE=OLAP
-                  UNIQUE KEY(`test`, `id`)
-                  DISTRIBUTED BY HASH(id) BUCKETS 1 
-                  PROPERTIES ( 
-                        "enable_unique_key_merge_on_write" = "true",
-                      "replication_allocation" = "tag.location.default: 1"
-                  )
-              """
+          CREATE TABLE if NOT EXISTS ${tableName}
+          (
+              `test` INT,
+              `id` INT
+          )
+          ENGINE=OLAP
+          UNIQUE KEY(`test`, `id`)
+          DISTRIBUTED BY HASH(id) BUCKETS 1
+          PROPERTIES (
+               "enable_unique_key_merge_on_write" = "true",
+              "replication_allocation" = "tag.location.default: 1"
+          )
+        """
     assertTrue(syncer.getTargetMeta("${tableName}"))
 
 
@@ -124,4 +124,4 @@ suite("test_mow_ingest_binlog") {
 
     // End Test 2
     syncer.closeBackendClients()
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to