This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1c02fdbbece branch-3.0: [fix](cloud) Fix `ConcurrentModificationException` in cloud rebalance #52013 (#52309)
1c02fdbbece is described below

commit 1c02fdbbecece77fad7fb8afffa5dc59c9f1eaed
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jun 26 10:06:25 2025 +0800

    branch-3.0: [fix](cloud) Fix `ConcurrentModificationException` in cloud rebalance #52013 (#52309)
    
    Cherry-picked from #52013
    
    Co-authored-by: deardeng <[email protected]>
---
 .../doris/cloud/catalog/CloudTabletRebalancer.java |  18 +-
 .../cloud_p0/multi_cluster/test_rebalance.groovy   | 292 ++++++++++++---------
 2 files changed, 191 insertions(+), 119 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index b93d4fe2cff..1b014caea82 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.common.util.MasterDaemon;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.system.Backend;
@@ -55,6 +56,7 @@ import org.apache.logging.log4j.Logger;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -418,11 +420,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
         for (Map.Entry<Long, List<InfightTask>> entry : 
beToInfightTasks.entrySet()) {
             LOG.info("before pre cache check dest be {} inflight task num {}", 
entry.getKey(), entry.getValue().size());
             Backend destBackend = 
cloudSystemInfoService.getBackend(entry.getKey());
+            if 
(DebugPointUtil.isEnable("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull"))
 {
+                LOG.info("debug point 
CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull, be {}", destBackend);
+                destBackend = null;
+            }
             if (destBackend == null || (!destBackend.isAlive() && 
destBackend.getLastUpdateMs() < needRehashDeadTime)) {
+                List<InfightTablet> toRemove = new LinkedList<>();
                 for (InfightTask task : entry.getValue()) {
                     for (InfightTablet key : tabletToInfightTask.keySet()) {
-                        tabletToInfightTask.remove(new 
InfightTablet(task.pickedTablet.getId(), key.clusterId));
+                        toRemove.add(new 
InfightTablet(task.pickedTablet.getId(), key.clusterId));
+                    }
+                }
+                for (InfightTablet key : toRemove) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("remove tablet {}-{}", key.getClusterId(), 
key.getTabletId());
                     }
+                    tabletToInfightTask.remove(key);
                 }
                 continue;
             }
@@ -447,6 +460,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
                         LOG.info("{} pre cache timeout, forced to change the 
mapping", result.getKey());
                     }
                     updateClusterToBeMap(task.pickedTablet, task.destBe, 
clusterId, infos);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("remove tablet {}-{}", clusterId, 
task.pickedTablet.getId());
+                    }
                     tabletToInfightTask.remove(new 
InfightTablet(task.pickedTablet.getId(), clusterId));
                 }
             }
diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy 
b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
index 7a76533b0a4..c15157308c4 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
@@ -21,142 +21,198 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
     if (!isCloudMode()) {
         return;
     }
-    def options = new ClusterOptions()
-    options.feConfigs += [
-        'cloud_cluster_check_interval_second=1',
-        'enable_cloud_warm_up_for_rebalance=false',
-        'cloud_tablet_rebalancer_interval_second=1',
-        'cloud_balance_tablet_percent_per_run=0.5',
-        'cloud_pre_heating_time_limit_sec=1',
-        'sys_log_verbose_modules=org',
+
+    def clusterOptions = [
+        new ClusterOptions(),
+        new ClusterOptions(),
     ]
-    options.setFeNum(3)
-    options.setBeNum(1)
-    options.cloudMode = true
-    options.connectToFollower = true
-    options.enableDebugPoints()
-
-    docker(options) {
-        sql """
-            CREATE TABLE table100 (
-            class INT,
-            id INT,
-            score INT SUM
-            )
-            AGGREGATE KEY(class, id)
-            DISTRIBUTED BY HASH(class) BUCKETS 48
-        """
-
-        sql """
-            CREATE TABLE table_p2 ( k1 int(11) NOT NULL, k2 varchar(20) NOT 
NULL, k3 int sum NOT NULL )
-            AGGREGATE KEY(k1, k2)
-            PARTITION BY RANGE(k1) (
-            PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
-            PARTITION p1993 VALUES [("19930101"), ("19940101")),
-            PARTITION p1994 VALUES [("19940101"), ("19950101")),
-            PARTITION p1995 VALUES [("19950101"), ("19960101")),
-            PARTITION p1996 VALUES [("19960101"), ("19970101")),
-            PARTITION p1997 VALUES [("19970101"), ("19980101")),
-            PARTITION p1998 VALUES [("19980101"), ("19990101")))
-            DISTRIBUTED BY HASH(k1) BUCKETS 3
-        """
-        
GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.primaryClusterToBackends");
-        sql """set global forward_to_master=false"""
-        
-        // add a be
-        cluster.addBackend(1, null)
-        
-        dockerAwaitUntil(30) {
-            def bes = sql """show backends"""
-            log.info("bes: {}", bes)
-            bes.size() == 2
-        }
+    for (options in clusterOptions) {
+        options.setFeNum(3)
+        options.setBeNum(1)
+        options.cloudMode = true
+        options.connectToFollower = true
+        options.enableDebugPoints()
+        options.feConfigs += [
+            'cloud_cluster_check_interval_second=1',
+            'cloud_tablet_rebalancer_interval_second=1',
+            'cloud_balance_tablet_percent_per_run=0.5',
 
-        dockerAwaitUntil(5) {
-            def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
-            log.info("replica distribution table100: {}", ret)
-            ret.size() == 2
-        }
+            'sys_log_verbose_modules=org',
+        ]
+    }
+    clusterOptions[0].feConfigs += ['enable_cloud_warm_up_for_rebalance=true', 
            'cloud_pre_heating_time_limit_sec=300']
+    clusterOptions[1].feConfigs += ['enable_cloud_warm_up_for_rebalance=false']
 
-        def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION 
FROM table100; """
-        assertEquals(2, result.size())
-        int replicaNum = 0
-
-        for (def row : result) {
-            log.info("replica distribution: ${row} ".toString())
-            replicaNum = Integer.valueOf((String) row.ReplicaNum)
-            if (replicaNum == 0) {
-                // due to debug point, observer not hash replica
-            } else {
-                assertTrue(replicaNum <= 25 && replicaNum >= 23)
+
+    for (int i = 0; i < clusterOptions.size(); i++) {
+        log.info("begin warm up {}", i == 0 ? "ON" : "OFF")
+        docker(clusterOptions[i]) {
+            sql """
+                CREATE TABLE table100 (
+                class INT,
+                id INT,
+                score INT SUM
+                )
+                AGGREGATE KEY(class, id)
+                DISTRIBUTED BY HASH(class) BUCKETS 48
+            """
+
+            sql """
+                CREATE TABLE table_p2 ( k1 int(11) NOT NULL, k2 varchar(20) 
NOT NULL, k3 int sum NOT NULL )
+                AGGREGATE KEY(k1, k2)
+                PARTITION BY RANGE(k1) (
+                PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
+                PARTITION p1993 VALUES [("19930101"), ("19940101")),
+                PARTITION p1994 VALUES [("19940101"), ("19950101")),
+                PARTITION p1995 VALUES [("19950101"), ("19960101")),
+                PARTITION p1996 VALUES [("19960101"), ("19970101")),
+                PARTITION p1997 VALUES [("19970101"), ("19980101")),
+                PARTITION p1998 VALUES [("19980101"), ("19990101")))
+                DISTRIBUTED BY HASH(k1) BUCKETS 3
+            """
+            
GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.primaryClusterToBackends");
+            sql """set global forward_to_master=false"""
+            
+            // add a be
+            cluster.addBackend(1, null)
+            
+            dockerAwaitUntil(30) {
+                def bes = sql """show backends"""
+                log.info("bes: {}", bes)
+                bes.size() == 2
             }
-        }
 
-        dockerAwaitUntil(5) {
-            def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 
PARTITION(p1992)"""
-            log.info("replica distribution table_p2: {}", ret)
-            ret.size() == 2
-        }
+            dockerAwaitUntil(5) {
+                def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM 
table100"""
+                log.info("replica distribution table100: {}", ret)
+                ret.size() == 2
+            }
 
+            def result = sql_return_maparray """ADMIN SHOW REPLICA 
DISTRIBUTION FROM table100; """
+            assertEquals(2, result.size())
+            int replicaNum = 0
 
-        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
table_p2 PARTITION(p1992) """
-        assertEquals(2, result.size())
-        for (def row : result) {
-            replicaNum = Integer.valueOf((String) row.ReplicaNum)
-            log.info("replica distribution: ${row} ".toString())
-            if (replicaNum != 0) {
-                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            for (def row : result) {
+                log.info("replica distribution: ${row} ".toString())
+                replicaNum = Integer.valueOf((String) row.ReplicaNum)
+                if (replicaNum == 0) {
+                    // due to debug point, observer not hash replica
+                } else {
+                    assertTrue(replicaNum <= 25 && replicaNum >= 23)
+                }
             }
-        }
 
-        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
table_p2 PARTITION(p1993) """
-        assertEquals(2, result.size())
-        for (def row : result) {
-            replicaNum = Integer.valueOf((String) row.ReplicaNum)
-            log.info("replica distribution: ${row} ".toString())
-            if (replicaNum != 0) {
-                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            dockerAwaitUntil(5) {
+                def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2 
PARTITION(p1992)"""
+                log.info("replica distribution table_p2: {}", ret)
+                ret.size() == 2
             }
-        }
 
-        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
table_p2 PARTITION(p1994) """
-        assertEquals(2, result.size())
-        for (def row : result) {
-            replicaNum = Integer.valueOf((String) row.ReplicaNum)
-            log.info("replica distribution: ${row} ".toString())
-            if (replicaNum != 0) {
-                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+
+            result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION 
FROM table_p2 PARTITION(p1992) """
+            assertEquals(2, result.size())
+            for (def row : result) {
+                replicaNum = Integer.valueOf((String) row.ReplicaNum)
+                log.info("replica distribution: ${row} ".toString())
+                if (replicaNum != 0) {
+                    assertTrue(replicaNum <= 2 && replicaNum >= 1)
+                }
             }
-        }
 
-        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
table_p2 PARTITION(p1995) """
-        assertEquals(2, result.size())
-        for (def row : result) {
-            replicaNum = Integer.valueOf((String) row.ReplicaNum)
-            log.info("replica distribution: ${row} ".toString())
-            if (replicaNum != 0) {
-                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION 
FROM table_p2 PARTITION(p1993) """
+            assertEquals(2, result.size())
+            for (def row : result) {
+                replicaNum = Integer.valueOf((String) row.ReplicaNum)
+                log.info("replica distribution: ${row} ".toString())
+                if (replicaNum != 0) {
+                    assertTrue(replicaNum <= 2 && replicaNum >= 1)
+                }
             }
-        }
 
-        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
table_p2 PARTITION(p1996) """
-        assertEquals(2, result.size())
-        for (def row : result) {
-            replicaNum = Integer.valueOf((String) row.ReplicaNum)
-            log.info("replica distribution: ${row} ".toString())
-            if (replicaNum != 0) {
-                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION 
FROM table_p2 PARTITION(p1994) """
+            assertEquals(2, result.size())
+            for (def row : result) {
+                replicaNum = Integer.valueOf((String) row.ReplicaNum)
+                log.info("replica distribution: ${row} ".toString())
+                if (replicaNum != 0) {
+                    assertTrue(replicaNum <= 2 && replicaNum >= 1)
+                }
             }
-        }
 
-        result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM 
table_p2 PARTITION(p1997) """
-        assertEquals(2, result.size())
-        for (def row : result) {
-            replicaNum = Integer.valueOf((String) row.ReplicaNum)
-            log.info("replica distribution: ${row} ".toString())
-            if (replicaNum != 0) {
-                assertTrue(replicaNum <= 2 && replicaNum >= 1)
+            result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION 
FROM table_p2 PARTITION(p1995) """
+            assertEquals(2, result.size())
+            for (def row : result) {
+                replicaNum = Integer.valueOf((String) row.ReplicaNum)
+                log.info("replica distribution: ${row} ".toString())
+                if (replicaNum != 0) {
+                    assertTrue(replicaNum <= 2 && replicaNum >= 1)
+                }
+            }
+
+            result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION 
FROM table_p2 PARTITION(p1996) """
+            assertEquals(2, result.size())
+            for (def row : result) {
+                replicaNum = Integer.valueOf((String) row.ReplicaNum)
+                log.info("replica distribution: ${row} ".toString())
+                if (replicaNum != 0) {
+                    assertTrue(replicaNum <= 2 && replicaNum >= 1)
+                }
+            }
+
+            result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION 
FROM table_p2 PARTITION(p1997) """
+            assertEquals(2, result.size())
+            for (def row : result) {
+                replicaNum = Integer.valueOf((String) row.ReplicaNum)
+                log.info("replica distribution: ${row} ".toString())
+                if (replicaNum != 0) {
+                    assertTrue(replicaNum <= 2 && replicaNum >= 1)
+                }
+            }
+
+            if (i == 1) {
+                // just test warm up
+                return
+            }
+
+            
GetDebugPoint().enableDebugPointForAllFEs("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull");
+            // add a be
+            cluster.addBackend(1, null)
+            // warm up
+            sql """admin set frontend 
config("enable_cloud_warm_up_for_rebalance"="true")"""
+
+            // test rebalance thread still work
+            dockerAwaitUntil(30) {
+                def bes = sql """show backends"""
+                log.info("bes: {}", bes)
+                bes.size() == 3
+            }
+
+            dockerAwaitUntil(5) {
+                def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM 
table100"""
+                log.info("replica distribution table100: {}", ret)
+                ret.size() == 3
+            }
+
+            result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION 
FROM table100; """
+            assertEquals(3, result.size())
+            log.info("replica distribution: ${result} ".toString())
+
+            // test 10s not balance, due to debug point
+            for (int j = 0; j < 10; j++) {
+                assertTrue(result.any { row ->  
+                    Integer.valueOf((String) row.ReplicaNum) == 0 
+                })
+                sleep(1 * 1000)
+            }
+            
GetDebugPoint().disableDebugPointForAllFEs("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull");
+            dockerAwaitUntil(10) {
+                def ret = sql_return_maparray """ADMIN SHOW REPLICA 
DISTRIBUTION FROM table100"""
+                log.info("replica distribution table100: {}", ret)
+                ret.any { row -> 
+                    Integer.valueOf((String) row.ReplicaNum) == 16 
+                }
             }
         }
+        logger.info("Successfully run {} times", i + 1)
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to