This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 090b19fef03 [fix](cloud) Fix migrate tablets between backends back and
forth (#39792)
090b19fef03 is described below
commit 090b19fef03b166f81c39e75575bfd7a1e05cbf0
Author: deardeng <[email protected]>
AuthorDate: Mon Sep 2 00:29:06 2024 +0800
[fix](cloud) Fix migrate tablets between backends back and forth (#39792)
BUG: the cloud rebalancer migrated tablets back and forth between backends:
from A to B, then from B to A, then from A to B again, and so on.
The cause was that the tabletToInfightTask map, which tracks in-flight
tasks, ignored the multi-cluster scenario, and the statRouteInfo function
lost the cluster information, which led to inaccurate tablet
statistics.
---
be/src/cloud/cloud_backend_service.cpp | 5 +
.../doris/cloud/catalog/CloudTabletRebalancer.java | 95 +++++++++-----
.../multi_cluster/test_warmup_rebalance.groovy | 137 +++++++++++++++++++++
3 files changed, 205 insertions(+), 32 deletions(-)
diff --git a/be/src/cloud/cloud_backend_service.cpp
b/be/src/cloud/cloud_backend_service.cpp
index d91e9e416b8..2dc6d03ebf6 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -180,6 +180,11 @@ void
CloudBackendService::check_warm_up_cache_async(TCheckWarmUpCacheAsyncRespon
const
TCheckWarmUpCacheAsyncRequest& request) {
std::map<int64_t, bool> task_done;
_engine.file_cache_block_downloader().check_download_task(request.tablets,
&task_done);
+
DBUG_EXECUTE_IF("CloudBackendService.check_warm_up_cache_async.return_task_false",
{
+ for (auto& it : task_done) {
+ it.second = false;
+ }
+ });
response.__set_task_done(task_done);
Status st = Status::OK();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 73ddbe4c455..fc580c4fc7e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -46,6 +46,7 @@ import org.apache.doris.thrift.TWarmUpCacheAsyncRequest;
import org.apache.doris.thrift.TWarmUpCacheAsyncResponse;
import com.google.common.base.Preconditions;
+import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -54,6 +55,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -94,7 +96,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
private LinkedBlockingQueue<Pair<Long, Long>> tabletsMigrateTasks = new
LinkedBlockingQueue<Pair<Long, Long>>();
- private Map<Long, InfightTask> tabletToInfightTask = new HashMap<Long,
InfightTask>();
+ private Map<InfightTablet, InfightTask> tabletToInfightTask = new
HashMap<>();
private long assignedErrNum = 0;
@@ -115,12 +117,39 @@ public class CloudTabletRebalancer extends MasterDaemon {
PARTITION
}
+ @Getter
+ private class InfightTablet {
+ private final Long tabletId;
+ private final String clusterId;
+
+ public InfightTablet(Long tabletId, String clusterId) {
+ this.tabletId = tabletId;
+ this.clusterId = clusterId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ InfightTablet that = (InfightTablet) o;
+ return tabletId.equals(that.tabletId) &&
clusterId.equals(that.clusterId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tabletId, clusterId);
+ }
+ }
+
private class InfightTask {
public Tablet pickedTablet;
public long srcBe;
public long destBe;
public boolean isGlobal;
- public String clusterId;
public Map<Long, List<Tablet>> beToTablets;
public long startTimestamp;
BalanceType balanceType;
@@ -343,41 +372,44 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
public void checkInflghtWarmUpCacheAsync() {
- Map<Long, List<Long>> beToTabletIds = new HashMap<Long, List<Long>>();
+ Map<Long, List<InfightTask>> beToInfightTasks = new HashMap<Long,
List<InfightTask>>();
- for (Map.Entry<Long, InfightTask> entry :
tabletToInfightTask.entrySet()) {
- beToTabletIds.putIfAbsent(entry.getValue().destBe, new
ArrayList<Long>());
-
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
+ for (Map.Entry<InfightTablet, InfightTask> entry :
tabletToInfightTask.entrySet()) {
+ beToInfightTasks.putIfAbsent(entry.getValue().destBe, new
ArrayList<>());
+
beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue());
}
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
- for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
+ for (Map.Entry<Long, List<InfightTask>> entry :
beToInfightTasks.entrySet()) {
LOG.info("before pre cache check dest be {} inflight task num {}",
entry.getKey(), entry.getValue().size());
Backend destBackend =
cloudSystemInfoService.getBackend(entry.getKey());
if (destBackend == null) {
- for (long tabletId : entry.getValue()) {
- tabletToInfightTask.remove(tabletId);
+ for (InfightTask task : entry.getValue()) {
+ for (InfightTablet key : tabletToInfightTask.keySet()) {
+ tabletToInfightTask.remove(new
InfightTablet(task.pickedTablet.getId(), key.clusterId));
+ }
}
continue;
}
-
- Map<Long, Boolean> taskDone =
sendCheckWarmUpCacheAsyncRpc(entry.getValue(), entry.getKey());
+ List<Long> tablets = entry.getValue().stream()
+ .map(task ->
task.pickedTablet.getId()).collect(Collectors.toList());
+ Map<Long, Boolean> taskDone =
sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey());
if (taskDone == null) {
LOG.warn("sendCheckWarmUpCacheAsyncRpc return null be {},
inFight tasks {}",
entry.getKey(), entry.getValue());
continue;
}
-
+ String clusterId =
cloudSystemInfoService.getBackend(entry.getKey()).getCloudClusterId();
for (Map.Entry<Long, Boolean> result : taskDone.entrySet()) {
- InfightTask task = tabletToInfightTask.get(result.getKey());
- if (result.getValue()
- || System.currentTimeMillis() / 1000 -
task.startTimestamp
- > Config.cloud_pre_heating_time_limit_sec) {
+ InfightTask task = tabletToInfightTask
+ .getOrDefault(new InfightTablet(result.getKey(),
clusterId), null);
+ if (task != null && (result.getValue() ||
System.currentTimeMillis() / 1000 - task.startTimestamp
+ > Config.cloud_pre_heating_time_limit_sec)) {
if (!result.getValue()) {
LOG.info("{} pre cache timeout, forced to change the
mapping", result.getKey());
}
- updateClusterToBeMap(task.pickedTablet, task.destBe,
task.clusterId, infos);
- tabletToInfightTask.remove(result.getKey());
+ updateClusterToBeMap(task.pickedTablet, task.destBe,
clusterId, infos);
+ tabletToInfightTask.remove(new
InfightTablet(task.pickedTablet.getId(), clusterId));
}
}
}
@@ -393,13 +425,13 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
// recalculate inflight beToTablets, just for print the log
- beToTabletIds = new HashMap<Long, List<Long>>();
- for (Map.Entry<Long, InfightTask> entry :
tabletToInfightTask.entrySet()) {
- beToTabletIds.putIfAbsent(entry.getValue().destBe, new
ArrayList<Long>());
-
beToTabletIds.get(entry.getValue().destBe).add(entry.getValue().pickedTablet.getId());
+ beToInfightTasks.clear();
+ for (Map.Entry<InfightTablet, InfightTask> entry :
tabletToInfightTask.entrySet()) {
+ beToInfightTasks.putIfAbsent(entry.getValue().destBe, new
ArrayList<>());
+
beToInfightTasks.get(entry.getValue().destBe).add(entry.getValue());
}
- for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
+ for (Map.Entry<Long, List<InfightTask>> entry :
beToInfightTasks.entrySet()) {
LOG.info("after pre cache check dest be {} inflight task num {}",
entry.getKey(), entry.getValue().size());
}
}
@@ -449,7 +481,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
LOG.info("notify decommission response: {} ",
response);
} catch (RpcException e) {
- LOG.info("failed to notify decommission {}", e);
+ LOG.info("failed to notify decommission", e);
return;
}
beToDecommissionedTime.put(beId,
System.currentTimeMillis() / 1000);
@@ -552,8 +584,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
fillBeToTablets(bes.get(0), table.getId(),
partition.getId(), index.getId(), tablet,
tmpBeToTabletsGlobal, beToTabletsInTable,
this.partitionToTablets);
- if (tabletToInfightTask.containsKey(tablet.getId())) {
- InfightTask task =
tabletToInfightTask.get(tablet.getId());
+ InfightTask task = tabletToInfightTask
+ .getOrDefault(new
InfightTablet(tablet.getId(), cluster), null);
+
+ if (task != null) {
fillBeToTablets(task.destBe, table.getId(),
partition.getId(), index.getId(), tablet,
futureBeToTabletsGlobal,
futureBeToTabletsInTable, futurePartitionToTablets);
} else {
@@ -808,9 +842,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
List<Tablet> destBeTablets =
beToTabletsInParts.get(cloudReplica.getPartitionId())
.get(cloudReplica.getIndexId()).get(destBe);
long minBeSize = destBeTablets == null ? 0 : destBeTablets.size();
- if (minBeSize >= maxBeSize) {
- return true;
- }
+ return minBeSize >= maxBeSize;
}
return false;
@@ -881,10 +913,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
task.srcBe = srcBe;
task.destBe = destBe;
task.balanceType = balanceType;
- task.clusterId = clusterId;
task.beToTablets = beToTablets;
task.startTimestamp = System.currentTimeMillis() / 1000;
- tabletToInfightTask.put(pickedTablet.getId(), task);
+ tabletToInfightTask.put(new
InfightTablet(pickedTablet.getId(), clusterId), task);
LOG.info("pre cache {} from {} to {}, cluster {} minNum {}
maxNum {} beNum {} tabletsNum {}, part {}",
pickedTablet.getId(), srcBe, destBe, clusterId,
@@ -936,7 +967,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
CloudReplica cloudReplica = (CloudReplica)
tablet.getReplicas().get(0);
Backend be = cloudSystemInfoService.getBackend(srcBe);
if (be == null) {
- LOG.info("backend {} not found", be);
+ LOG.info("src backend {} not found", srcBe);
continue;
}
String clusterId = be.getCloudClusterId();
diff --git
a/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
b/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
new file mode 100644
index 00000000000..7e12d9f81f8
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_warmup_rebalance.groovy
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import org.awaitility.Awaitility;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite('test_warmup_rebalance_in_cloud', 'multi_cluster') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'enable_cloud_warm_up_for_rebalance=true',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ 'cloud_balance_tablet_percent_per_run=0.5',
+ 'sys_log_verbose_modules=org',
+ 'cloud_pre_heating_time_limit_sec=600'
+ ]
+ options.setFeNum(2)
+ options.setBeNum(3)
+ options.cloudMode = true
+ options.enableDebugPoints()
+ def check = { String feLogPath ->
+ log.info("search fe log path: {}", feLogPath)
+ Map<String, List<String>> circularRebalanceMap = [:]
+ boolean isCircularRebalanceDetected = false
+
+ new File(feLogPath).text.tokenize('\n')
+ .findAll { it =~ /pre cache ([0-9]+) from ([0-9]+) to ([0-9]+),
cluster ([a-zA-Z0-9_]+)/ }
+ .each { line ->
+ def (tabletId, fromBe, toBe, clusterId) = (line =~ /pre cache
([0-9]+) from ([0-9]+) to ([0-9]+), cluster ([a-zA-Z0-9_]+)/)[0][1..-1]
+
+ String clusterPreCacheKey = "$clusterId-$tabletId"
+
+ if (!circularRebalanceMap.containsKey(clusterPreCacheKey)) {
+ circularRebalanceMap[clusterPreCacheKey] = new ArrayList<>()
+ }
+
+ List<String> paths = circularRebalanceMap[clusterPreCacheKey]
+
+ if (paths.contains(toBe)) {
+ isCircularRebalanceDetected = true
+ log.info("Circular rebalance detected for tabletId: {},
clusterId: {}", tabletId, clusterId)
+ assertFalse(true)
+ }
+
+ paths << fromBe
+ circularRebalanceMap[clusterPreCacheKey] = paths
+
+ if (!paths.contains(toBe)) {
+ paths << (toBe as String)
+ }
+ }
+
+ if (!isCircularRebalanceDetected) {
+ log.info("No circular rebalance detected.")
+ }
+ }
+
+ docker(options) {
+ def clusterName = "newcluster1"
+ // 添加一个新的cluster add_new_cluster
+ cluster.addBackend(2, clusterName)
+
+ def ret = sql_return_maparray """show clusters"""
+ log.info("show clusters: {}", ret)
+ assertEquals(2, ret.size())
+
+
GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
+ sql """set global forward_to_master=false"""
+
+ sql """
+ CREATE TABLE table100 (
+ class INT,
+ id INT,
+ score INT SUM
+ )
+ AGGREGATE KEY(class, id)
+ DISTRIBUTED BY HASH(class) BUCKETS 48
+ """
+
+ sql """
+ INSERT INTO table100 VALUES (1, 1, 100);
+ """
+
+ dockerAwaitUntil(5) {
+ ret = sql """ADMIN SHOW REPLICA DISTRIBUTION FROM table100"""
+ log.info("replica distribution table100: {}", ret)
+ ret.size() == 5
+ }
+
+ sql """use @newcluster1"""
+ def result = sql_return_maparray """ADMIN SHOW REPLICA DISTRIBUTION
FROM table100; """
+ assertEquals(5, result.size())
+ int replicaNum = 0
+
+ for (def row : result) {
+ log.info("replica distribution: ${row} ".toString())
+ if (row.CloudClusterName == "newcluster1") {
+ replicaNum = Integer.valueOf((String) row.ReplicaNum)
+ assertTrue(replicaNum <= 25 && replicaNum >= 23)
+ }
+ }
+ def fe1 = cluster.getFeByIndex(1)
+ String feLogPath = fe1.getLogFilePath()
+ // stop be id 1, 4
+ cluster.stopBackends(1, 4)
+ // check log
+ sleep(10 * 1000)
+ check feLogPath
+
+ // start be id 1, 4
+ cluster.startBackends(1, 4)
+
GetDebugPoint().enableDebugPointForAllBEs("CloudBackendService.check_warm_up_cache_async.return_task_false")
+ // check log
+ sleep(10 * 1000)
+ check feLogPath
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]