This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 1c02fdbbece branch-3.0: [fix](cloud) Fix
`ConcurrentModificationException` in cloud rebalance #52013 (#52309)
1c02fdbbece is described below
commit 1c02fdbbecece77fad7fb8afffa5dc59c9f1eaed
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jun 26 10:06:25 2025 +0800
branch-3.0: [fix](cloud) Fix `ConcurrentModificationException` in cloud
rebalance #52013 (#52309)
Cherry-picked from #52013
Co-authored-by: deardeng <[email protected]>
---
.../doris/cloud/catalog/CloudTabletRebalancer.java | 18 +-
.../cloud_p0/multi_cluster/test_rebalance.groovy | 292 ++++++++++++---------
2 files changed, 191 insertions(+), 119 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index b93d4fe2cff..1b014caea82 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
@@ -55,6 +56,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -418,11 +420,22 @@ public class CloudTabletRebalancer extends MasterDaemon {
for (Map.Entry<Long, List<InfightTask>> entry :
beToInfightTasks.entrySet()) {
LOG.info("before pre cache check dest be {} inflight task num {}",
entry.getKey(), entry.getValue().size());
Backend destBackend =
cloudSystemInfoService.getBackend(entry.getKey());
+ if
(DebugPointUtil.isEnable("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull"))
{
+ LOG.info("debug point
CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull, be {}", destBackend);
+ destBackend = null;
+ }
if (destBackend == null || (!destBackend.isAlive() &&
destBackend.getLastUpdateMs() < needRehashDeadTime)) {
+ List<InfightTablet> toRemove = new LinkedList<>();
for (InfightTask task : entry.getValue()) {
for (InfightTablet key : tabletToInfightTask.keySet()) {
- tabletToInfightTask.remove(new
InfightTablet(task.pickedTablet.getId(), key.clusterId));
+ toRemove.add(new
InfightTablet(task.pickedTablet.getId(), key.clusterId));
+ }
+ }
+ for (InfightTablet key : toRemove) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remove tablet {}-{}", key.getClusterId(),
key.getTabletId());
}
+ tabletToInfightTask.remove(key);
}
continue;
}
@@ -447,6 +460,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
LOG.info("{} pre cache timeout, forced to change the
mapping", result.getKey());
}
updateClusterToBeMap(task.pickedTablet, task.destBe,
clusterId, infos);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remove tablet {}-{}", clusterId,
task.pickedTablet.getId());
+ }
tabletToInfightTask.remove(new
InfightTablet(task.pickedTablet.getId(), clusterId));
}
}
diff --git
a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
index 7a76533b0a4..c15157308c4 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
@@ -21,142 +21,198 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
if (!isCloudMode()) {
return;
}
- def options = new ClusterOptions()
- options.feConfigs += [
- 'cloud_cluster_check_interval_second=1',
- 'enable_cloud_warm_up_for_rebalance=false',
- 'cloud_tablet_rebalancer_interval_second=1',
- 'cloud_balance_tablet_percent_per_run=0.5',
- 'cloud_pre_heating_time_limit_sec=1',
- 'sys_log_verbose_modules=org',
+
+ def clusterOptions = [
+ new ClusterOptions(),
+ new ClusterOptions(),
]
- options.setFeNum(3)
- options.setBeNum(1)
- options.cloudMode = true
- options.connectToFollower = true
- options.enableDebugPoints()
-
- docker(options) {
- sql """
- CREATE TABLE table100 (
- class INT,
- id INT,
- score INT SUM
- )
- AGGREGATE KEY(class, id)
- DISTRIBUTED BY HASH(class) BUCKETS 48
- """
-
- sql """
- CREATE TABLE table_p2 ( k1 int(11) NOT NULL, k2 varchar(20) NOT
NULL, k3 int sum NOT NULL )
- AGGREGATE KEY(k1, k2)
- PARTITION BY RANGE(k1) (
- PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
- PARTITION p1993 VALUES [("19930101"), ("19940101")),
- PARTITION p1994 VALUES [("19940101"), ("19950101")),
- PARTITION p1995 VALUES [("19950101"), ("19960101")),
- PARTITION p1996 VALUES [("19960101"), ("19970101")),
- PARTITION p1997 VALUES [("19970101"), ("19980101")),
- PARTITION p1998 VALUES [("19980101"), ("19990101")))
- DISTRIBUTED BY HASH(k1) BUCKETS 3
- """
-
GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.primaryClusterToBackends");
- sql """set global forward_to_master=false"""
-
- // add a be
- cluster.addBackend(1, null)
-
- dockerAwaitUntil(30) {
- def bes = sql """show backends"""
- log.info("bes: {}", bes)
- bes.size() == 2
- }
+ for (options in clusterOptions) {
+ options.setFeNum(3)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.connectToFollower = true
+ options.enableDebugPoints()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'cloud_balance_tablet_percent_per_run=0.5',
- dockerAwaitUntil(5) {
- def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
- log.info("replica distribution table100: {}", ret)
- ret.size() == 2
- }
+ 'sys_log_verbose_modules=org',
+ ]
+ }
+ clusterOptions[0].feConfigs += ['enable_cloud_warm_up_for_rebalance=true',
'cloud_pre_heating_time_limit_sec=300']
+ clusterOptions[1].feConfigs += ['enable_cloud_warm_up_for_rebalance=false']
- def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION
FROM table100; """
- assertEquals(2, result.size())
- int replicaNum = 0
-
- for (def row : result) {
- log.info("replica distribution: ${row} ".toString())
- replicaNum = Integer.valueOf((String) row.ReplicaNum)
- if (replicaNum == 0) {
- // due to debug point, observer not hash replica
- } else {
- assertTrue(replicaNum <= 25 && replicaNum >= 23)
+
+ for (int i = 0; i < clusterOptions.size(); i++) {
+ log.info("begin warm up {}", i == 0 ? "ON" : "OFF")
+ docker(clusterOptions[i]) {
+ sql """
+ CREATE TABLE table100 (
+ class INT,
+ id INT,
+ score INT SUM
+ )
+ AGGREGATE KEY(class, id)
+ DISTRIBUTED BY HASH(class) BUCKETS 48
+ """
+
+ sql """
+ CREATE TABLE table_p2 ( k1 int(11) NOT NULL, k2 varchar(20)
NOT NULL, k3 int sum NOT NULL )
+ AGGREGATE KEY(k1, k2)
+ PARTITION BY RANGE(k1) (
+ PARTITION p1992 VALUES [("-2147483648"), ("19930101")),
+ PARTITION p1993 VALUES [("19930101"), ("19940101")),
+ PARTITION p1994 VALUES [("19940101"), ("19950101")),
+ PARTITION p1995 VALUES [("19950101"), ("19960101")),
+ PARTITION p1996 VALUES [("19960101"), ("19970101")),
+ PARTITION p1997 VALUES [("19970101"), ("19980101")),
+ PARTITION p1998 VALUES [("19980101"), ("19990101")))
+ DISTRIBUTED BY HASH(k1) BUCKETS 3
+ """
+
GetDebugPoint().enableDebugPointForAllFEs("CloudReplica.getBackendIdImpl.primaryClusterToBackends");
+ sql """set global forward_to_master=false"""
+
+ // add a be
+ cluster.addBackend(1, null)
+
+ dockerAwaitUntil(30) {
+ def bes = sql """show backends"""
+ log.info("bes: {}", bes)
+ bes.size() == 2
}
- }
- dockerAwaitUntil(5) {
- def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2
PARTITION(p1992)"""
- log.info("replica distribution table_p2: {}", ret)
- ret.size() == 2
- }
+ dockerAwaitUntil(5) {
+ def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM
table100"""
+ log.info("replica distribution table100: {}", ret)
+ ret.size() == 2
+ }
+ def result = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM table100; """
+ assertEquals(2, result.size())
+ int replicaNum = 0
- result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM
table_p2 PARTITION(p1992) """
- assertEquals(2, result.size())
- for (def row : result) {
- replicaNum = Integer.valueOf((String) row.ReplicaNum)
- log.info("replica distribution: ${row} ".toString())
- if (replicaNum != 0) {
- assertTrue(replicaNum <= 2 && replicaNum >= 1)
+ for (def row : result) {
+ log.info("replica distribution: ${row} ".toString())
+ replicaNum = Integer.valueOf((String) row.ReplicaNum)
+ if (replicaNum == 0) {
+ // due to debug point, observer not hash replica
+ } else {
+ assertTrue(replicaNum <= 25 && replicaNum >= 23)
+ }
}
- }
- result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM
table_p2 PARTITION(p1993) """
- assertEquals(2, result.size())
- for (def row : result) {
- replicaNum = Integer.valueOf((String) row.ReplicaNum)
- log.info("replica distribution: ${row} ".toString())
- if (replicaNum != 0) {
- assertTrue(replicaNum <= 2 && replicaNum >= 1)
+ dockerAwaitUntil(5) {
+ def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table_p2
PARTITION(p1992)"""
+ log.info("replica distribution table_p2: {}", ret)
+ ret.size() == 2
}
- }
- result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM
table_p2 PARTITION(p1994) """
- assertEquals(2, result.size())
- for (def row : result) {
- replicaNum = Integer.valueOf((String) row.ReplicaNum)
- log.info("replica distribution: ${row} ".toString())
- if (replicaNum != 0) {
- assertTrue(replicaNum <= 2 && replicaNum >= 1)
+
+ result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION
FROM table_p2 PARTITION(p1992) """
+ assertEquals(2, result.size())
+ for (def row : result) {
+ replicaNum = Integer.valueOf((String) row.ReplicaNum)
+ log.info("replica distribution: ${row} ".toString())
+ if (replicaNum != 0) {
+ assertTrue(replicaNum <= 2 && replicaNum >= 1)
+ }
}
- }
- result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM
table_p2 PARTITION(p1995) """
- assertEquals(2, result.size())
- for (def row : result) {
- replicaNum = Integer.valueOf((String) row.ReplicaNum)
- log.info("replica distribution: ${row} ".toString())
- if (replicaNum != 0) {
- assertTrue(replicaNum <= 2 && replicaNum >= 1)
+ result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION
FROM table_p2 PARTITION(p1993) """
+ assertEquals(2, result.size())
+ for (def row : result) {
+ replicaNum = Integer.valueOf((String) row.ReplicaNum)
+ log.info("replica distribution: ${row} ".toString())
+ if (replicaNum != 0) {
+ assertTrue(replicaNum <= 2 && replicaNum >= 1)
+ }
}
- }
- result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM
table_p2 PARTITION(p1996) """
- assertEquals(2, result.size())
- for (def row : result) {
- replicaNum = Integer.valueOf((String) row.ReplicaNum)
- log.info("replica distribution: ${row} ".toString())
- if (replicaNum != 0) {
- assertTrue(replicaNum <= 2 && replicaNum >= 1)
+ result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION
FROM table_p2 PARTITION(p1994) """
+ assertEquals(2, result.size())
+ for (def row : result) {
+ replicaNum = Integer.valueOf((String) row.ReplicaNum)
+ log.info("replica distribution: ${row} ".toString())
+ if (replicaNum != 0) {
+ assertTrue(replicaNum <= 2 && replicaNum >= 1)
+ }
}
- }
- result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION FROM
table_p2 PARTITION(p1997) """
- assertEquals(2, result.size())
- for (def row : result) {
- replicaNum = Integer.valueOf((String) row.ReplicaNum)
- log.info("replica distribution: ${row} ".toString())
- if (replicaNum != 0) {
- assertTrue(replicaNum <= 2 && replicaNum >= 1)
+ result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION
FROM table_p2 PARTITION(p1995) """
+ assertEquals(2, result.size())
+ for (def row : result) {
+ replicaNum = Integer.valueOf((String) row.ReplicaNum)
+ log.info("replica distribution: ${row} ".toString())
+ if (replicaNum != 0) {
+ assertTrue(replicaNum <= 2 && replicaNum >= 1)
+ }
+ }
+
+ result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION
FROM table_p2 PARTITION(p1996) """
+ assertEquals(2, result.size())
+ for (def row : result) {
+ replicaNum = Integer.valueOf((String) row.ReplicaNum)
+ log.info("replica distribution: ${row} ".toString())
+ if (replicaNum != 0) {
+ assertTrue(replicaNum <= 2 && replicaNum >= 1)
+ }
+ }
+
+ result = sql_return_maparray """ ADMIN SHOW REPLICA DISTRIBUTION
FROM table_p2 PARTITION(p1997) """
+ assertEquals(2, result.size())
+ for (def row : result) {
+ replicaNum = Integer.valueOf((String) row.ReplicaNum)
+ log.info("replica distribution: ${row} ".toString())
+ if (replicaNum != 0) {
+ assertTrue(replicaNum <= 2 && replicaNum >= 1)
+ }
+ }
+
+ if (i == 1) {
+ // just test warm up
+ return
+ }
+
+
GetDebugPoint().enableDebugPointForAllFEs("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull");
+ // add a be
+ cluster.addBackend(1, null)
+ // warm up
+ sql """admin set frontend
config("enable_cloud_warm_up_for_rebalance"="true")"""
+
+ // test rebalance thread still work
+ dockerAwaitUntil(30) {
+ def bes = sql """show backends"""
+ log.info("bes: {}", bes)
+ bes.size() == 3
+ }
+
+ dockerAwaitUntil(5) {
+ def ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM
table100"""
+ log.info("replica distribution table100: {}", ret)
+ ret.size() == 3
+ }
+
+ result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION
FROM table100; """
+ assertEquals(3, result.size())
+ log.info("replica distribution: ${result} ".toString())
+
+ // test 10s not balance, due to debug point
+ for (int j = 0; j < 10; j++) {
+ assertTrue(result.any { row ->
+ Integer.valueOf((String) row.ReplicaNum) == 0
+ })
+ sleep(1 * 1000)
+ }
+
GetDebugPoint().disableDebugPointForAllFEs("CloudTabletRebalancer.checkInflghtWarmUpCacheAsync.beNull");
+ dockerAwaitUntil(10) {
+ def ret = sql_return_maparray """ADMIN SHOW REPLICA
DISTRIBUTION FROM table100"""
+ log.info("replica distribution table100: {}", ret)
+ ret.any { row ->
+ Integer.valueOf((String) row.ReplicaNum) == 16
+ }
}
}
+ logger.info("Successfully run {} times", i + 1)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org