This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 2a018eda9f2 branch-4.1: [refactor](cloud) Replace Tablet references
with tabletId in CloudTabletRebalancer (#61233) (#63844)
2a018eda9f2 is described below
commit 2a018eda9f24135d969d5a9833248991bdecb059
Author: meiyi <[email protected]>
AuthorDate: Fri May 29 14:30:31 2026 +0800
branch-4.1: [refactor](cloud) Replace Tablet references with tabletId in
CloudTabletRebalancer (#61233) (#63844)
pick https://github.com/apache/doris/pull/61233
---------
Co-authored-by: Yongqiang YANG <[email protected]>
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../doris/cloud/catalog/CloudTabletRebalancer.java | 508 ++++++++++++---------
.../cloud/catalog/CloudTabletRebalancerTest.java | 48 +-
2 files changed, 299 insertions(+), 257 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 621aa916aac..6f0e4073dd1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.catalog.TabletSlidingWindowAccessStats;
import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
import org.apache.doris.cloud.proto.Cloud;
@@ -81,32 +82,32 @@ import java.util.stream.Collectors;
public class CloudTabletRebalancer extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(CloudTabletRebalancer.class);
- private volatile ConcurrentHashMap<Long, Set<Tablet>> beToTabletsGlobal =
- new ConcurrentHashMap<Long, Set<Tablet>>();
+ private volatile ConcurrentHashMap<Long, Set<Long>> beToTabletsGlobal =
+ new ConcurrentHashMap<Long, Set<Long>>();
- private volatile ConcurrentHashMap<Long, Set<Tablet>>
beToColocateTabletsGlobal =
- new ConcurrentHashMap<Long, Set<Tablet>>();
+ private volatile ConcurrentHashMap<Long, Set<Long>>
beToColocateTabletsGlobal =
+ new ConcurrentHashMap<Long, Set<Long>>();
// used for cloud tablet report
- private volatile ConcurrentHashMap<Long, Set<Tablet>>
beToTabletsGlobalInSecondary =
- new ConcurrentHashMap<Long, Set<Tablet>>();
+ private volatile ConcurrentHashMap<Long, Set<Long>>
beToTabletsGlobalInSecondary =
+ new ConcurrentHashMap<Long, Set<Long>>();
- private volatile ConcurrentHashMap<Long, Set<Tablet>>
futureBeToTabletsGlobal;
+ private volatile ConcurrentHashMap<Long, Set<Long>>
futureBeToTabletsGlobal;
private Map<String, List<Long>> clusterToBes;
private Set<Long> allBes;
- // partitionId -> indexId -> be -> tablet
- private ConcurrentHashMap<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>> partitionToTablets;
+ // partitionId -> indexId -> be -> tabletIds
+ private ConcurrentHashMap<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>> partitionToTablets;
- private ConcurrentHashMap<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>
+ private ConcurrentHashMap<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>
futurePartitionToTablets;
- // tableId -> be -> tablet
- private ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>
beToTabletsInTable;
+ // tableId -> be -> tabletIds
+ private ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Long>>>
beToTabletsInTable;
- private ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>
futureBeToTabletsInTable;
+ private ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Long>>>
futureBeToTabletsInTable;
private Map<Long, Long> beToDecommissionedTime = new HashMap<Long, Long>();
@@ -336,7 +337,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
private class InfightTask {
- public Tablet pickedTablet;
+ public long pickedTabletId;
public long srcBe;
public long destBe;
public long startTimestamp;
@@ -372,13 +373,13 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
private static class WarmupTabletTask {
- private final Tablet pickedTablet;
+ private final long pickedTabletId;
private final long srcBe;
private final long destBe;
private final String clusterId;
- WarmupTabletTask(Tablet pickedTablet, long srcBe, long destBe, String
clusterId) {
- this.pickedTablet = pickedTablet;
+ WarmupTabletTask(long pickedTabletId, long srcBe, long destBe, String
clusterId) {
+ this.pickedTabletId = pickedTabletId;
this.srcBe = srcBe;
this.destBe = destBe;
this.clusterId = clusterId;
@@ -434,20 +435,16 @@ public class CloudTabletRebalancer extends MasterDaemon {
public Set<Long> getSnapshotTabletsInPrimaryByBeId(Long beId) {
Set<Long> tabletIds = Sets.newHashSet();
- Set<Tablet> tablets = beToTabletsGlobal.get(beId);
+ Set<Long> tablets = beToTabletsGlobal.get(beId);
if (tablets != null) {
// Create a copy
- for (Tablet tablet : new HashSet<>(tablets)) {
- tabletIds.add(tablet.getId());
- }
+ tabletIds.addAll(new HashSet<>(tablets));
}
- Set<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
+ Set<Long> colocateTablets = beToColocateTabletsGlobal.get(beId);
if (colocateTablets != null) {
// Create a copy
- for (Tablet tablet : new HashSet<>(colocateTablets)) {
- tabletIds.add(tablet.getId());
- }
+ tabletIds.addAll(new HashSet<>(colocateTablets));
}
return tabletIds;
@@ -455,12 +452,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
public Set<Long> getSnapshotTabletsInSecondaryByBeId(Long beId) {
Set<Long> tabletIds = Sets.newHashSet();
- Set<Tablet> tablets = beToTabletsGlobalInSecondary.get(beId);
+ Set<Long> tablets = beToTabletsGlobalInSecondary.get(beId);
if (tablets != null) {
// Create a copy
- for (Tablet tablet : new HashSet<>(tablets)) {
- tabletIds.add(tablet.getId());
- }
+ tabletIds.addAll(new HashSet<>(tablets));
}
return tabletIds;
}
@@ -473,14 +468,14 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
public int getTabletNumByBackendId(long beId) {
- Map<Long, Set<Tablet>> sourceMap = beToTabletsGlobal;
- ConcurrentHashMap<Long, Set<Tablet>> futureMap =
futureBeToTabletsGlobal;
+ Map<Long, Set<Long>> sourceMap = beToTabletsGlobal;
+ ConcurrentHashMap<Long, Set<Long>> futureMap = futureBeToTabletsGlobal;
if (futureMap != null && !futureMap.isEmpty()) {
sourceMap = futureMap;
}
- Set<Tablet> tablets = sourceMap.get(beId);
- Set<Tablet> colocateTablets = beToColocateTabletsGlobal.get(beId);
+ Set<Long> tablets = sourceMap.get(beId);
+ Set<Long> colocateTablets = beToColocateTabletsGlobal.get(beId);
int tabletsSize = (tablets == null) ? 0 : tablets.size();
int colocateTabletsSize = (colocateTablets == null) ? 0 :
colocateTablets.size();
@@ -645,11 +640,11 @@ public class CloudTabletRebalancer extends MasterDaemon {
indexBalanced = true;
if (LOG.isDebugEnabled()) {
- for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
beToTabletsGlobal.entrySet()) {
LOG.debug("before partition balance({}) be {} tablet num {}",
phase, entry.getKey(), entry.getValue().size());
}
- for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
futureBeToTabletsGlobal.entrySet()) {
LOG.debug("before partition balance({}) be {} tablet
num(current + pre heating inflight) {}",
phase, entry.getKey(), entry.getValue().size());
}
@@ -681,11 +676,11 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
if (LOG.isDebugEnabled()) {
- for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
beToTabletsGlobal.entrySet()) {
LOG.debug("after partition balance({}) be {} tablet num {}",
phase, entry.getKey(), entry.getValue().size());
}
- for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
futureBeToTabletsGlobal.entrySet()) {
LOG.debug("after partition balance({}) be {} tablet
num(current + pre heating inflight) {}",
phase, entry.getKey(), entry.getValue().size());
}
@@ -697,11 +692,11 @@ public class CloudTabletRebalancer extends MasterDaemon {
tableBalanced = true;
if (LOG.isDebugEnabled()) {
- for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
beToTabletsGlobal.entrySet()) {
LOG.debug("before table balance({}) be {} tablet num {}",
phase, entry.getKey(), entry.getValue().size());
}
- for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
futureBeToTabletsGlobal.entrySet()) {
LOG.debug("before table balance({}) be {} tablet num(current +
pre heating inflight) {}",
phase, entry.getKey(), entry.getValue().size());
}
@@ -731,11 +726,11 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
if (LOG.isDebugEnabled()) {
- for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
beToTabletsGlobal.entrySet()) {
LOG.debug("after table balance({}) be {} tablet num {}",
phase, entry.getKey(), entry.getValue().size());
}
- for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
futureBeToTabletsGlobal.entrySet()) {
LOG.debug("after table balance({}) be {} tablet num(current +
pre heating inflight) {}",
phase, entry.getKey(), entry.getValue().size());
}
@@ -745,10 +740,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
public void globalBalance() {
if (LOG.isDebugEnabled()) {
- for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
beToTabletsGlobal.entrySet()) {
LOG.debug("before global balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
}
- for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
futureBeToTabletsGlobal.entrySet()) {
LOG.debug("before global balance be {} tablet num(current +
pre heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
@@ -774,10 +769,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
if (LOG.isDebugEnabled()) {
- for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
beToTabletsGlobal.entrySet()) {
LOG.debug("after global balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
}
- for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ for (Map.Entry<Long, Set<Long>> entry :
futureBeToTabletsGlobal.entrySet()) {
LOG.debug("after global balance be {} tablet num(current + pre
heating inflight) {}",
entry.getKey(), entry.getValue().size());
}
@@ -821,7 +816,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
List<InfightTablet> toRemove = new LinkedList<>();
for (InfightTask task : entry.getValue()) {
for (InfightTablet key : tabletToInfightTask.keySet()) {
- toRemove.add(new
InfightTablet(task.pickedTablet.getId(), key.clusterId));
+ toRemove.add(new InfightTablet(task.pickedTabletId,
key.clusterId));
}
}
for (InfightTablet key : toRemove) {
@@ -837,7 +832,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
continue;
}
List<Long> tablets = entry.getValue().stream()
- .map(task ->
task.pickedTablet.getId()).collect(Collectors.toList());
+ .map(task ->
task.pickedTabletId).collect(Collectors.toList());
// check dest backend whether warmup cache done
Map<Long, Boolean> taskDone =
sendCheckWarmUpCacheAsyncRpc(tablets, entry.getKey());
if (taskDone == null) {
@@ -879,7 +874,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
List<Long> beList = entry.getValue();
for (long beId : beList) {
- Set<Tablet> tablets = beToTabletsGlobal.get(beId);
+ Set<Long> tablets = beToTabletsGlobal.get(beId);
int tabletNum = tablets == null ? 0 : tablets.size();
Backend backend = cloudSystemInfoService.getBackend(beId);
if (backend == null) {
@@ -1033,28 +1028,28 @@ public class CloudTabletRebalancer extends MasterDaemon
{
return true;
}
- public void fillBeToTablets(long be, long tableId, long partId, long
indexId, Tablet tablet,
- ConcurrentHashMap<Long, Set<Tablet>>
globalBeToTablets,
- ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTable,
- ConcurrentHashMap<Long,
ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
+ public void fillBeToTablets(long be, long tableId, long partId, long
indexId, long tabletId,
+ ConcurrentHashMap<Long, Set<Long>>
globalBeToTablets,
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>> beToTabletsInTable,
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Long>>>>
partToTablets) {
// global
globalBeToTablets.putIfAbsent(be, ConcurrentHashMap.newKeySet());
- globalBeToTablets.get(be).add(tablet);
+ globalBeToTablets.get(be).add(tabletId);
// table
- beToTabletsInTable.putIfAbsent(tableId, new ConcurrentHashMap<Long,
Set<Tablet>>());
- ConcurrentHashMap<Long, Set<Tablet>> beToTabletsOfTable =
beToTabletsInTable.get(tableId);
+ beToTabletsInTable.putIfAbsent(tableId, new ConcurrentHashMap<Long,
Set<Long>>());
+ ConcurrentHashMap<Long, Set<Long>> beToTabletsOfTable =
beToTabletsInTable.get(tableId);
beToTabletsOfTable.putIfAbsent(be, ConcurrentHashMap.newKeySet());
- beToTabletsOfTable.get(be).add(tablet);
+ beToTabletsOfTable.get(be).add(tabletId);
// partition
- partToTablets.putIfAbsent(partId, new ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>());
- ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>
indexToTablets = partToTablets.get(partId);
- indexToTablets.putIfAbsent(indexId, new ConcurrentHashMap<Long,
Set<Tablet>>());
- ConcurrentHashMap<Long, Set<Tablet>> beToTabletsOfIndex =
indexToTablets.get(indexId);
+ partToTablets.putIfAbsent(partId, new ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>());
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Long>>>
indexToTablets = partToTablets.get(partId);
+ indexToTablets.putIfAbsent(indexId, new ConcurrentHashMap<Long,
Set<Long>>());
+ ConcurrentHashMap<Long, Set<Long>> beToTabletsOfIndex =
indexToTablets.get(indexId);
beToTabletsOfIndex.putIfAbsent(be, ConcurrentHashMap.newKeySet());
- beToTabletsOfIndex.get(be).add(tablet);
+ beToTabletsOfIndex.get(be).add(tabletId);
}
private void enqueueWarmupTask(WarmupTabletTask task) {
@@ -1086,7 +1081,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
String.format("backend missing or dead, src %s dest %s",
srcBackend, destBackend)));
return;
}
- List<Long> tabletIds = tasks.stream().map(task ->
task.pickedTablet.getId()).collect(Collectors.toList());
+ List<Long> tabletIds = tasks.stream().map(task ->
task.pickedTabletId).collect(Collectors.toList());
try {
sendPreHeatingRpc(tabletIds, key.getSrcBe(), key.getDestBe());
} catch (Exception e) {
@@ -1109,9 +1104,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
private void revertWarmupState(WarmupTabletTask task) {
- updateBeToTablets(task.pickedTablet, task.destBe, task.srcBe,
+ updateBeToTablets(task.pickedTabletId, task.destBe, task.srcBe,
futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
- tabletToInfightTask.remove(new
InfightTablet(task.pickedTablet.getId(), task.clusterId));
+ tabletToInfightTask.remove(new InfightTablet(task.pickedTabletId,
task.clusterId));
}
private void processFailedWarmupTasks() {
@@ -1132,20 +1127,20 @@ public class CloudTabletRebalancer extends MasterDaemon
{
}
public void statRouteInfo() {
- ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobal = new
ConcurrentHashMap<Long, Set<Tablet>>();
- ConcurrentHashMap<Long, Set<Tablet>> tmpFutureBeToTabletsGlobal = new
ConcurrentHashMap<Long, Set<Tablet>>();
- ConcurrentHashMap<Long, Set<Tablet>> tmpBeToTabletsGlobalInSecondary
- = new ConcurrentHashMap<Long, Set<Tablet>>();
- ConcurrentHashMap<Long, Set<Tablet>> tmpBeToColocateTabletsGlobal
- = new ConcurrentHashMap<Long, Set<Tablet>>();
+ ConcurrentHashMap<Long, Set<Long>> tmpBeToTabletsGlobal = new
ConcurrentHashMap<Long, Set<Long>>();
+ ConcurrentHashMap<Long, Set<Long>> tmpFutureBeToTabletsGlobal = new
ConcurrentHashMap<Long, Set<Long>>();
+ ConcurrentHashMap<Long, Set<Long>> tmpBeToTabletsGlobalInSecondary
+ = new ConcurrentHashMap<Long, Set<Long>>();
+ ConcurrentHashMap<Long, Set<Long>> tmpBeToColocateTabletsGlobal
+ = new ConcurrentHashMap<Long, Set<Long>>();
partitionToTablets = new ConcurrentHashMap<Long,
- ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>();
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Long>>>>();
futurePartitionToTablets =
- new ConcurrentHashMap<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>();
+ new ConcurrentHashMap<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>();
- beToTabletsInTable = new ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>();
- futureBeToTabletsInTable = new ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>();
+ beToTabletsInTable = new ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>();
+ futureBeToTabletsInTable = new ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>();
// rebuild scheduling caches for this run
Map<Long, Long> tmpTableActive = new HashMap<>();
@@ -1164,8 +1159,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
return name != null && INTERNAL_DB_NAMES.contains(name);
});
for (Tablet tablet : index.getTablets()) {
+ long tabletId = tablet.getId();
// active tablet scoring (used for scheduling order)
- if (activeTabletIds != null && !activeTabletIds.isEmpty() &&
activeTabletIds.contains(tablet.getId())) {
+ if (activeTabletIds != null && !activeTabletIds.isEmpty() &&
activeTabletIds.contains(tabletId)) {
tmpTableActive.merge(table.getId(), 1L, Long::sum);
tmpPartitionActive.merge(partition.getId(), 1L, Long::sum);
tmpDbActive.merge(db.getId(), 1L, Long::sum);
@@ -1180,9 +1176,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
continue;
}
if (allBes.contains(beId)) {
- Set<Tablet> colocateTablets =
+ Set<Long> colocateTablets =
tmpBeToColocateTabletsGlobal.computeIfAbsent(beId, k -> new HashSet<>());
- colocateTablets.add(tablet);
+ colocateTablets.add(tabletId);
}
continue;
}
@@ -1196,18 +1192,18 @@ public class CloudTabletRebalancer extends MasterDaemon
{
Backend secondaryBe = replica.getSecondaryBackend(cluster);
long secondaryBeId = secondaryBe == null ? -1L :
secondaryBe.getId();
if (allBes.contains(secondaryBeId)) {
- Set<Tablet> tablets = tmpBeToTabletsGlobalInSecondary
+ Set<Long> tablets = tmpBeToTabletsGlobalInSecondary
.computeIfAbsent(secondaryBeId, k -> new
HashSet<>());
- tablets.add(tablet);
+ tablets.add(tabletId);
}
- InfightTablet taskKey = new InfightTablet(tablet.getId(),
cluster);
+ InfightTablet taskKey = new InfightTablet(tabletId,
cluster);
InfightTask task = tabletToInfightTask.get(taskKey);
long futureBeId = task == null ? beId : task.destBe;
- fillBeToTablets(beId, table.getId(), partition.getId(),
index.getId(), tablet,
+ fillBeToTablets(beId, table.getId(), partition.getId(),
index.getId(), tabletId,
tmpBeToTabletsGlobal, beToTabletsInTable,
this.partitionToTablets);
- fillBeToTablets(futureBeId, table.getId(),
partition.getId(), index.getId(), tablet,
+ fillBeToTablets(futureBeId, table.getId(),
partition.getId(), index.getId(), tabletId,
tmpFutureBeToTabletsGlobal,
futureBeToTabletsInTable, futurePartitionToTablets);
}
}
@@ -1260,22 +1256,22 @@ public class CloudTabletRebalancer extends MasterDaemon
{
private void balanceInPartition(List<Long> bes, String clusterId,
List<UpdateCloudReplicaInfo> infos,
ActiveSchedulePhase phase) {
// balance all partition (prefer active partitions/tables, put
internal db at tail)
- Iterable<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> partitions;
+ Iterable<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>> partitions;
if (Config.enable_cloud_active_tablet_priority_scheduling) {
- final Comparator<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> cmp =
+ final Comparator<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>> cmp =
partitionEntryComparator();
// Phase-aware filtering and ordering.
// - ACTIVE_ONLY: only non-internal partitions with activeCnt > 0
// - INACTIVE_ONLY: all remaining partitions (non-internal
inactive first, internal last)
// - ALL: active (TopN first if configured) -> inactive -> internal
- List<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> nonInternalActive =
+ List<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>> nonInternalActive =
new ArrayList<>();
- List<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> nonInternalInactive =
+ List<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>> nonInternalInactive =
new ArrayList<>();
- List<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> internalPartitions =
+ List<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>> internalPartitions =
new ArrayList<>();
- for (Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>> e
+ for (Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>> e
: futurePartitionToTablets.entrySet()) {
long partId = e.getKey();
boolean internal =
isInternalDbId(partitionIdToDbId.get(partId));
@@ -1298,7 +1294,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
nonInternalInactive.sort(cmp);
internalPartitions.sort(cmp);
- List<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> ordered =
+ List<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>> ordered =
new ArrayList<>(futurePartitionToTablets.size());
if (phase == ActiveSchedulePhase.ACTIVE_ONLY) {
// In ACTIVE_ONLY phase, schedule all active partitions
(already sorted by cmp, most active first)
@@ -1318,15 +1314,15 @@ public class CloudTabletRebalancer extends MasterDaemon
{
partitions = futurePartitionToTablets.entrySet();
}
- for (Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long,
Set<Tablet>>>> partitionEntry
+ for (Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long,
Set<Long>>>> partitionEntry
: partitions) {
- Map<Long, ConcurrentHashMap<Long, Set<Tablet>>> indexToTablets =
partitionEntry.getValue();
+ Map<Long, ConcurrentHashMap<Long, Set<Long>>> indexToTablets =
partitionEntry.getValue();
// balance all index of a partition
- List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
indexes =
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>>> indexes =
new ArrayList<>(indexToTablets.entrySet());
// index-level ordering is not critical; keep stable by id
indexes.sort(Comparator.comparingLong(Map.Entry::getKey));
- for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry :
indexes) {
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>> entry :
indexes) {
// balance a index
// Fast path: this index has no tablets in this cluster, skip
to avoid useless balanceImpl work.
if (calculateTotalTablets(bes, entry.getValue()) == 0) {
@@ -1340,14 +1336,14 @@ public class CloudTabletRebalancer extends MasterDaemon
{
private void balanceInTable(List<Long> bes, String clusterId,
List<UpdateCloudReplicaInfo> infos,
ActiveSchedulePhase phase) {
// balance all tables (prefer active tables/dbs, put internal db at
tail)
- Iterable<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> tables;
+ Iterable<Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>>> tables;
if (Config.enable_cloud_active_tablet_priority_scheduling) {
- final Comparator<Map.Entry<Long, ConcurrentHashMap<Long,
Set<Tablet>>>> cmp = tableEntryComparator();
- List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
nonInternalActive = new ArrayList<>();
- List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
nonInternalInactive = new ArrayList<>();
- List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
internalTables = new ArrayList<>();
+ final Comparator<Map.Entry<Long, ConcurrentHashMap<Long,
Set<Long>>>> cmp = tableEntryComparator();
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>>>
nonInternalActive = new ArrayList<>();
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>>>
nonInternalInactive = new ArrayList<>();
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>>>
internalTables = new ArrayList<>();
- for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> e :
futureBeToTabletsInTable.entrySet()) {
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>> e :
futureBeToTabletsInTable.entrySet()) {
long tableId = e.getKey();
boolean internal = isInternalDbId(tableIdToDbId.get(tableId));
long activeCnt = tableIdToActiveCount.getOrDefault(tableId,
0L);
@@ -1366,7 +1362,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
nonInternalInactive.sort(cmp);
internalTables.sort(cmp);
- List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
ordered =
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>>> ordered =
new ArrayList<>(futureBeToTabletsInTable.size());
if (phase == ActiveSchedulePhase.ACTIVE_ONLY) {
ordered.addAll(nonInternalActive);
@@ -1384,7 +1380,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
tables = futureBeToTabletsInTable.entrySet();
}
- for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry :
tables) {
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>> entry :
tables) {
// Fast path: this table has no tablets in this cluster, skip.
if (calculateTotalTablets(bes, entry.getValue()) == 0) {
continue;
@@ -1413,7 +1409,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
return internal;
}
- private Comparator<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
tableEntryComparator() {
+ private Comparator<Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>>>
tableEntryComparator() {
return (a, b) -> {
Long tableIdA = a.getKey();
Long tableIdB = b.getKey();
@@ -1439,7 +1435,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
private Comparator<Map.Entry<Long,
- ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>>
partitionEntryComparator() {
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Long>>>>>
partitionEntryComparator() {
return (a, b) -> {
Long partIdA = a.getKey();
Long partIdB = b.getKey();
@@ -1464,10 +1460,6 @@ public class CloudTabletRebalancer extends MasterDaemon {
};
}
- private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long
destBe) throws Exception {
- sendPreHeatingRpc(Collections.singletonList(pickedTablet.getId()),
srcBe, destBe);
- }
-
private void sendPreHeatingRpc(List<Long> tabletIds, long srcBe, long
destBe) throws Exception {
BackendService.Client client = null;
TNetworkAddress address = null;
@@ -1576,51 +1568,81 @@ public class CloudTabletRebalancer extends MasterDaemon
{
return;
}
- updateClusterToBeMap(task.pickedTablet, task.destBe, clusterId, infos);
+ updateClusterToBeMap(task.pickedTabletId, task.destBe, clusterId,
infos);
if (LOG.isDebugEnabled()) {
- LOG.debug("remove tablet {}-{}", clusterId,
task.pickedTablet.getId());
+ LOG.debug("remove tablet {}-{}", clusterId, task.pickedTabletId);
}
- tabletToInfightTask.remove(new
InfightTablet(task.pickedTablet.getId(), clusterId));
+ tabletToInfightTask.remove(new InfightTablet(task.pickedTabletId,
clusterId));
if (BalanceTypeEnum.SYNC_WARMUP.equals(currentBalanceType)) {
try {
// send sync cache rpc again, ignore the result, the best
effort to sync some new data
- sendPreHeatingRpc(task.pickedTablet, task.srcBe, task.destBe);
+
sendPreHeatingRpc(Collections.singletonList(task.pickedTabletId), task.srcBe,
task.destBe);
} catch (Exception e) {
LOG.warn("Failed to preheat tablet {} from {} to {}, "
+ "help msg change fe config
cloud_warm_up_for_rebalance_type to without_warmup, ",
- task.pickedTablet.getId(), task.srcBe, task.destBe, e);
+ task.pickedTabletId, task.srcBe, task.destBe, e);
}
}
}
- private void updateBeToTablets(Tablet pickedTablet, long srcBe, long
destBe,
- ConcurrentHashMap<Long, Set<Tablet>>
globalBeToTablets,
- ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTable,
+ private void updateBeToTablets(long tabletId, long srcBe, long destBe,
+ ConcurrentHashMap<Long, Set<Long>>
globalBeToTablets,
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>> beToTabletsInTable,
ConcurrentHashMap<Long,
ConcurrentHashMap<Long, ConcurrentHashMap<Long,
- Set<Tablet>>>> partToTablets) {
- CloudReplica replica = ((CloudTablet) pickedTablet).getCloudReplica();
- long tableId = replica.getTableId();
- long partId = replica.getPartitionId();
- long indexId = replica.getIndexId();
+ Set<Long>>>> partToTablets) {
+ TabletMeta tabletMeta =
Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(tabletId);
+ if (tabletMeta == null) {
+ LOG.warn("tablet {} meta not found in inverted index, skip
updateBeToTablets", tabletId);
+ return;
+ }
+ long tableId = tabletMeta.getTableId();
+ long partId = tabletMeta.getPartitionId();
+ long indexId = tabletMeta.getIndexId();
- globalBeToTablets.get(srcBe).remove(pickedTablet);
- beToTabletsInTable.get(tableId).get(srcBe).remove(pickedTablet);
- partToTablets.get(partId).get(indexId).get(srcBe).remove(pickedTablet);
+ Set<Long> globalSrcTablets = globalBeToTablets.get(srcBe);
+ if (globalSrcTablets == null || !globalSrcTablets.remove(tabletId)) {
+ LOG.debug("skip updateBeToTablets for tablet {}: srcBe {} not in
globalBeToTablets", tabletId, srcBe);
+ return;
+ }
- fillBeToTablets(destBe, tableId, partId, indexId, pickedTablet,
globalBeToTablets, beToTabletsInTable,
+ ConcurrentHashMap<Long, Set<Long>> tableBeMap =
beToTabletsInTable.get(tableId);
+ if (tableBeMap == null) {
+ return;
+ }
+ Set<Long> tableSrcTablets = tableBeMap.get(srcBe);
+ if (tableSrcTablets != null) {
+ tableSrcTablets.remove(tabletId);
+ }
+
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Long>>> indexMap =
partToTablets.get(partId);
+ if (indexMap != null) {
+ ConcurrentHashMap<Long, Set<Long>> beMap = indexMap.get(indexId);
+ if (beMap != null) {
+ Set<Long> partSrcTablets = beMap.get(srcBe);
+ if (partSrcTablets != null) {
+ partSrcTablets.remove(tabletId);
+ }
+ }
+ }
+
+ fillBeToTablets(destBe, tableId, partId, indexId, tabletId,
globalBeToTablets, beToTabletsInTable,
partToTablets);
}
- private void updateClusterToBeMap(Tablet pickedTablet, long destBe, String
clusterId,
+ private void updateClusterToBeMap(long tabletId, long destBe, String
clusterId,
List<UpdateCloudReplicaInfo> infos) {
- CloudReplica cloudReplica = ((CloudTablet)
pickedTablet).getCloudReplica();
- Database db =
Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
+ TabletMeta tabletMeta =
Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(tabletId);
+ if (tabletMeta == null) {
+ LOG.warn("tablet {} meta not found in inverted index, skip
updateClusterToBeMap", tabletId);
+ return;
+ }
+ Database db =
Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId());
if (db == null) {
return;
}
- OlapTable table = (OlapTable)
db.getTableNullable(cloudReplica.getTableId());
+ OlapTable table = (OlapTable)
db.getTableNullable(tabletMeta.getTableId());
if (table == null) {
return;
}
@@ -1628,21 +1650,38 @@ public class CloudTabletRebalancer extends MasterDaemon
{
table.readLock();
try {
- if (db.getTableNullable(cloudReplica.getTableId()) == null) {
+ if (db.getTableNullable(tabletMeta.getTableId()) == null) {
+ return;
+ }
+
+ Partition partition =
table.getPartition(tabletMeta.getPartitionId());
+ if (partition == null) {
+ return;
+ }
+ MaterializedIndex index =
partition.getIndex(tabletMeta.getIndexId());
+ if (index == null) {
+ return;
+ }
+ Tablet tablet = index.getTablet(tabletId);
+ if (tablet == null) {
+ return;
+ }
+ CloudReplica cloudReplica = ((CloudTablet)
tablet).getCloudReplica();
+ if (cloudReplica == null) {
return;
}
cloudReplica.updateClusterToPrimaryBe(clusterId, destBe);
- UpdateCloudReplicaInfo info = new
UpdateCloudReplicaInfo(cloudReplica.getDbId(),
- cloudReplica.getTableId(), cloudReplica.getPartitionId(),
cloudReplica.getIndexId(),
- pickedTablet.getId(), cloudReplica.getId(), clusterId,
destBe);
+ UpdateCloudReplicaInfo info = new
UpdateCloudReplicaInfo(tabletMeta.getDbId(),
+ tabletMeta.getTableId(), tabletMeta.getPartitionId(),
tabletMeta.getIndexId(),
+ tabletId, cloudReplica.getId(), clusterId, destBe);
infos.add(info);
} finally {
table.readUnlock();
}
}
- private boolean getTransferPair(List<Long> bes, Map<Long, Set<Tablet>>
beToTablets, long avgNum,
+ private boolean getTransferPair(List<Long> bes, Map<Long, Set<Long>>
beToTablets, long avgNum,
TransferPairInfo pairInfo) {
long srcBe = findSourceBackend(bes, beToTablets);
long destBe = findDestinationBackend(bes, beToTablets, srcBe);
@@ -1665,7 +1704,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
return true;
}
- private long findSourceBackend(List<Long> bes, Map<Long, Set<Tablet>>
beToTablets) {
+ private long findSourceBackend(List<Long> bes, Map<Long, Set<Long>>
beToTablets) {
long srcBe = -1;
long maxTabletsNum = 0;
@@ -1690,7 +1729,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
return srcBe;
}
- private long findDestinationBackend(List<Long> bes, Map<Long, Set<Tablet>>
beToTablets, long srcBe) {
+ private long findDestinationBackend(List<Long> bes, Map<Long, Set<Long>>
beToTablets, long srcBe) {
long minTabletsNum = Long.MAX_VALUE;
List<Long> candidateBes = new ArrayList<>();
@@ -1734,82 +1773,87 @@ public class CloudTabletRebalancer extends MasterDaemon
{
return true;
}
- private boolean isConflict(long srcBe, long destBe, CloudReplica
cloudReplica, BalanceType balanceType,
- ConcurrentHashMap<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>
+ private boolean isConflict(long srcBe, long destBe, long tabletId,
BalanceType balanceType,
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>
beToTabletsInParts,
- ConcurrentHashMap<Long, ConcurrentHashMap<Long,
Set<Tablet>>> beToTabletsInTables) {
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long,
Set<Long>>> beToTabletsInTables) {
if (cloudSystemInfoService.getBackend(srcBe).isDecommissioning()
||
cloudSystemInfoService.getBackend(srcBe).isDecommissioned()) {
return false; // If source BE is decommissioned, no conflict
}
+ TabletMeta tabletMeta =
Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(tabletId);
+ if (tabletMeta == null) {
+ return true; // Cannot find metadata, treat as conflict
+ }
+
if (balanceType == BalanceType.GLOBAL) {
- return checkGlobalBalanceConflict(srcBe, destBe, cloudReplica,
beToTabletsInParts, beToTabletsInTables);
+ return checkGlobalBalanceConflict(srcBe, destBe, tabletMeta,
beToTabletsInParts, beToTabletsInTables);
} else if (balanceType == BalanceType.TABLE) {
- return checkTableBalanceConflict(srcBe, destBe, cloudReplica,
beToTabletsInParts);
+ return checkTableBalanceConflict(srcBe, destBe, tabletMeta,
beToTabletsInParts);
}
return false;
}
- private boolean checkGlobalBalanceConflict(long srcBe, long destBe,
CloudReplica cloudReplica,
+ private boolean checkGlobalBalanceConflict(long srcBe, long destBe,
TabletMeta tabletMeta,
ConcurrentHashMap<Long,
- ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>
beToTabletsInParts,
- ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>
beToTabletsInTables) {
- long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica,
beToTabletsInParts);
- long minBeSize = getTabletSizeInParts(destBe, cloudReplica,
beToTabletsInParts);
+ long maxBeSize = getTabletSizeInParts(srcBe, tabletMeta,
beToTabletsInParts);
+ long minBeSize = getTabletSizeInParts(destBe, tabletMeta,
beToTabletsInParts);
if (minBeSize >= maxBeSize) {
return true; // Conflict detected
}
- maxBeSize = getTabletSizeInBes(srcBe, cloudReplica,
beToTabletsInTables);
- minBeSize = getTabletSizeInBes(destBe, cloudReplica,
beToTabletsInTables);
+ maxBeSize = getTabletSizeInBes(srcBe, tabletMeta, beToTabletsInTables);
+ minBeSize = getTabletSizeInBes(destBe, tabletMeta,
beToTabletsInTables);
return minBeSize >= maxBeSize; // Conflict detected
}
- private boolean checkTableBalanceConflict(long srcBe, long destBe,
CloudReplica cloudReplica,
+ private boolean checkTableBalanceConflict(long srcBe, long destBe,
TabletMeta tabletMeta,
ConcurrentHashMap<Long,
- ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>
beToTabletsInParts) {
- long maxBeSize = getTabletSizeInParts(srcBe, cloudReplica,
beToTabletsInParts);
- long minBeSize = getTabletSizeInParts(destBe, cloudReplica,
beToTabletsInParts);
+ long maxBeSize = getTabletSizeInParts(srcBe, tabletMeta,
beToTabletsInParts);
+ long minBeSize = getTabletSizeInParts(destBe, tabletMeta,
beToTabletsInParts);
return minBeSize >= maxBeSize; // Conflict detected
}
- private long getTabletSizeInParts(long beId, CloudReplica cloudReplica,
+ private long getTabletSizeInParts(long beId, TabletMeta tabletMeta,
ConcurrentHashMap<Long,
- ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>
beToTabletsInParts) {
- ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>
indexToTablets
- = beToTabletsInParts.get(cloudReplica.getPartitionId());
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Long>>>
indexToTablets
+ = beToTabletsInParts.get(tabletMeta.getPartitionId());
if (indexToTablets == null) {
return 0;
}
- ConcurrentHashMap<Long, Set<Tablet>> beToTablets =
indexToTablets.get(cloudReplica.getIndexId());
+ ConcurrentHashMap<Long, Set<Long>> beToTablets =
indexToTablets.get(tabletMeta.getIndexId());
if (beToTablets == null) {
return 0;
}
- Set<Tablet> tablets = beToTablets.get(beId);
+ Set<Long> tablets = beToTablets.get(beId);
return tablets == null ? 0 : tablets.size();
}
- private long getTabletSizeInBes(long beId, CloudReplica cloudReplica,
- ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>> beToTabletsInTables) {
- ConcurrentHashMap<Long, Set<Tablet>> beToTablets =
beToTabletsInTables.get(cloudReplica.getTableId());
+ private long getTabletSizeInBes(long beId, TabletMeta tabletMeta,
+ ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>> beToTabletsInTables) {
+ ConcurrentHashMap<Long, Set<Long>> beToTablets =
beToTabletsInTables.get(tabletMeta.getTableId());
if (beToTablets == null) {
return 0;
}
- Set<Tablet> tablets = beToTablets.get(beId);
+ Set<Long> tablets = beToTablets.get(beId);
return tablets == null ? 0 : tablets.size();
}
- private void balanceImpl(List<Long> bes, String clusterId, Map<Long,
Set<Tablet>> beToTablets,
+ private void balanceImpl(List<Long> bes, String clusterId, Map<Long,
Set<Long>> beToTablets,
BalanceType balanceType, List<UpdateCloudReplicaInfo> infos) {
if (bes == null || bes.isEmpty() || beToTablets == null ||
beToTablets.isEmpty()) {
return;
@@ -1843,47 +1887,47 @@ public class CloudTabletRebalancer extends MasterDaemon
{
long srcBe = pairInfo.srcBe;
long destBe = pairInfo.destBe;
- Tablet pickedTablet = pickTabletPreferCold(srcBe,
beToTablets.get(srcBe),
+ Long pickedTabletId = pickTabletPreferCold(srcBe,
beToTablets.get(srcBe),
this.activeTabletIds, pickedTabletIds);
- if (pickedTablet == null) {
+ if (pickedTabletId == null) {
continue; // No tablet to pick
}
- pickedTabletIds.add(pickedTablet.getId());
- CloudReplica cloudReplica = (CloudReplica)
pickedTablet.getReplicas().get(0);
+ pickedTabletIds.add(pickedTabletId);
Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe);
if ((BalanceTypeEnum.WITHOUT_WARMUP.equals(currentBalanceType)
||
BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType))
&& srcBackend != null && srcBackend.isAlive()) {
// direct switch, update fe meta directly, not send preheating
task
- if (isConflict(srcBe, destBe, cloudReplica, balanceType,
partitionToTablets, beToTabletsInTable)) {
+ if (isConflict(srcBe, destBe, pickedTabletId, balanceType,
+ partitionToTablets, beToTabletsInTable)) {
continue;
}
- boolean moved = transferTablet(pickedTablet, srcBe, destBe,
clusterId, balanceType, infos);
+ boolean moved = transferTablet(pickedTabletId, srcBe, destBe,
clusterId, balanceType, infos);
if (moved) {
updateBalanceStatus(balanceType);
}
if
(BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType)) {
- LOG.debug("directly switch {} from {} to {}, cluster {}",
pickedTablet.getId(), srcBe, destBe,
+ LOG.debug("directly switch {} from {} to {}, cluster {}",
pickedTabletId, srcBe, destBe,
clusterId);
// send sync cache rpc, best effort
try {
- sendPreHeatingRpc(pickedTablet, srcBe, destBe);
+
sendPreHeatingRpc(Collections.singletonList(pickedTabletId), srcBe, destBe);
} catch (Exception e) {
LOG.debug("Failed to preheat tablet {} from {} to {}, "
+ "directly policy, just ignore the error",
- pickedTablet.getId(), srcBe, destBe, e);
+ pickedTabletId, srcBe, destBe, e);
return;
}
}
} else {
// cache warm up
- if (isConflict(srcBe, destBe, cloudReplica, balanceType,
+ if (isConflict(srcBe, destBe, pickedTabletId, balanceType,
futurePartitionToTablets, futureBeToTabletsInTable)) {
continue;
}
- boolean moved = preheatAndUpdateTablet(pickedTablet, srcBe,
destBe,
+ boolean moved = preheatAndUpdateTablet(pickedTabletId, srcBe,
destBe,
clusterId, balanceType, beToTablets);
if (moved) {
updateBalanceStatus(balanceType);
@@ -1892,7 +1936,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
}
}
- private long calculateTotalTablets(List<Long> bes, Map<Long, Set<Tablet>>
beToTablets) {
+ private long calculateTotalTablets(List<Long> bes, Map<Long, Set<Long>>
beToTablets) {
return bes.stream()
.mapToLong(be -> beToTablets.getOrDefault(be,
Collections.emptySet()).size())
.sum();
@@ -1943,9 +1987,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
// Choose non-active (cold) tablet first to re-balance, to reduce impact
on hot tablets.
// Fallback to active/random if no cold tablet is available.
- private Tablet pickTabletPreferCold(long srcBe, Set<Tablet> tablets,
Set<Long> activeTabletIds,
+ private Long pickTabletPreferCold(long srcBe, Set<Long> tabletIds,
Set<Long> activeTabletIds,
Set<Long> pickedTabletIds) {
- if (tablets == null || tablets.isEmpty()) {
+ if (tabletIds == null || tabletIds.isEmpty()) {
return null;
}
// Prefer cold tablets first (when active stats is available)
@@ -1953,70 +1997,70 @@ public class CloudTabletRebalancer extends MasterDaemon
{
boolean preferCold =
Config.enable_cloud_active_tablet_priority_scheduling && hasActiveStats;
if (preferCold) {
- Tablet cold = reservoirPick(tablets, pickedTabletIds,
activeTabletIds, true);
+ Long cold = reservoirPick(tabletIds, pickedTabletIds,
activeTabletIds, true);
if (cold != null) {
return cold;
}
}
- return reservoirPick(tablets, pickedTabletIds, activeTabletIds, false);
+ return reservoirPick(tabletIds, pickedTabletIds, activeTabletIds,
false);
}
// Reservoir sampling to pick one element uniformly at random from
candidates,
// without allocating intermediate collections.
- private Tablet reservoirPick(Set<Tablet> tablets, Set<Long>
pickedTabletIds,
+ private Long reservoirPick(Set<Long> tabletIds, Set<Long> pickedTabletIds,
Set<Long> activeTabletIds, boolean
requireCold) {
- Tablet chosen = null;
+ Long chosen = null;
int seen = 0;
- for (Tablet t : tablets) {
- if (pickedTabletIds.contains(t.getId())) {
+ for (Long tabletId : tabletIds) {
+ if (pickedTabletIds.contains(tabletId)) {
continue;
}
- if (requireCold && activeTabletIds != null &&
activeTabletIds.contains(t.getId())) {
+ if (requireCold && activeTabletIds != null &&
activeTabletIds.contains(tabletId)) {
continue;
}
seen++;
if (rand.nextInt(seen) == 0) {
- chosen = t;
+ chosen = tabletId;
}
}
return chosen;
}
- private boolean preheatAndUpdateTablet(Tablet pickedTablet, long srcBe,
long destBe, String clusterId,
- BalanceType balanceType, Map<Long,
Set<Tablet>> beToTablets) {
+ private boolean preheatAndUpdateTablet(long pickedTabletId, long srcBe,
long destBe, String clusterId,
+ BalanceType balanceType, Map<Long,
Set<Long>> beToTablets) {
Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
Backend destBackend = cloudSystemInfoService.getBackend(destBe);
if (srcBackend == null || destBackend == null) {
LOG.warn("backend missing when preheating tablet {} from {} to {},
cluster {}",
- pickedTablet.getId(), srcBe, destBe, clusterId);
+ pickedTabletId, srcBe, destBe, clusterId);
return false;
}
InfightTask task = new InfightTask();
- task.pickedTablet = pickedTablet;
+ task.pickedTabletId = pickedTabletId;
task.srcBe = srcBe;
task.destBe = destBe;
task.balanceType = balanceType;
task.startTimestamp = System.currentTimeMillis() / 1000;
- InfightTablet key = new InfightTablet(pickedTablet.getId(), clusterId);
+ InfightTablet key = new InfightTablet(pickedTabletId, clusterId);
tabletToInfightTask.put(key, task);
- updateBeToTablets(pickedTablet, srcBe, destBe,
+ updateBeToTablets(pickedTabletId, srcBe, destBe,
futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
- LOG.debug("pre cache {} from {} to {}, cluster {}",
pickedTablet.getId(), srcBe, destBe, clusterId);
- enqueueWarmupTask(new WarmupTabletTask(pickedTablet, srcBe, destBe,
clusterId));
+ LOG.debug("pre cache {} from {} to {}, cluster {}", pickedTabletId,
srcBe, destBe, clusterId);
+ enqueueWarmupTask(new WarmupTabletTask(pickedTabletId, srcBe, destBe,
clusterId));
return true;
}
- private boolean transferTablet(Tablet pickedTablet, long srcBe, long
destBe, String clusterId,
+ private boolean transferTablet(long pickedTabletId, long srcBe, long
destBe, String clusterId,
BalanceType balanceType,
List<UpdateCloudReplicaInfo> infos) {
LOG.debug("transfer {} from {} to {}, cluster {}, type {}",
- pickedTablet.getId(), srcBe, destBe, clusterId, balanceType);
- updateBeToTablets(pickedTablet, srcBe, destBe,
+ pickedTabletId, srcBe, destBe, clusterId, balanceType);
+ updateBeToTablets(pickedTabletId, srcBe, destBe,
beToTabletsGlobal, beToTabletsInTable, partitionToTablets);
- updateBeToTablets(pickedTablet, srcBe, destBe,
+ updateBeToTablets(pickedTabletId, srcBe, destBe,
futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
- updateClusterToBeMap(pickedTablet, destBe, clusterId, infos);
+ updateClusterToBeMap(pickedTabletId, destBe, clusterId, infos);
return true;
}
@@ -2028,49 +2072,57 @@ public class CloudTabletRebalancer extends MasterDaemon
{
* replica location info will be updated in both master and follower FEs.
*/
private void migrateTablets(Long srcBe, Long dstBe) {
- // get tablets
- Set<Tablet> tablets = beToTabletsGlobal.get(srcBe);
- if (tablets == null || tablets.isEmpty()) {
+ // get tabletIds
+ Set<Long> tabletIds = beToTabletsGlobal.get(srcBe);
+ if (tabletIds == null || tabletIds.isEmpty()) {
LOG.info("smooth upgrade srcBe={} does not have any tablets, set
inactive", srcBe);
((CloudEnv)
Env.getCurrentEnv()).getCloudUpgradeMgr().setBeStateInactive(srcBe);
return;
}
+ Backend be = cloudSystemInfoService.getBackend(srcBe);
+ if (be == null) {
+ LOG.info("src backend {} not found", srcBe);
+ return;
+ }
+ String clusterId = be.getCloudClusterId();
+ String clusterName = be.getCloudClusterName();
+
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
- for (Tablet tablet : tablets) {
- // get replica
- CloudReplica cloudReplica = ((CloudTablet)
tablet).getCloudReplica();
- Backend be = cloudSystemInfoService.getBackend(srcBe);
- if (be == null) {
- LOG.info("src backend {} not found", srcBe);
+ for (Long tabletId : tabletIds) {
+ TabletMeta tabletMeta =
Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(tabletId);
+ if (tabletMeta == null) {
+ LOG.warn("tablet {} meta not found in inverted index, skip
migration", tabletId);
continue;
}
- // populate to followers
- Database db =
Env.getCurrentInternalCatalog().getDbNullable(cloudReplica.getDbId());
+ Database db =
Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId());
if (db == null) {
- long beId;
- try {
- beId = cloudReplica.getBackendId();
- } catch (ComputeGroupException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("get backend failed cloudReplica {}",
cloudReplica, e);
- }
- beId = -1;
- }
- LOG.error("get null db from replica, tabletId={},
partitionId={}, beId={}",
- cloudReplica.getTableId(),
cloudReplica.getPartitionId(), beId);
+ LOG.error("get null db from tablet meta, tabletId={},
dbId={}", tabletId, tabletMeta.getDbId());
continue;
}
- OlapTable table = (OlapTable)
db.getTableNullable(cloudReplica.getTableId());
+ OlapTable table = (OlapTable)
db.getTableNullable(tabletMeta.getTableId());
if (table == null) {
continue;
}
- String clusterId = be.getCloudClusterId();
- String clusterName = be.getCloudClusterName();
-
table.readLock();
try {
- if (db.getTableNullable(cloudReplica.getTableId()) == null) {
+ if (db.getTableNullable(tabletMeta.getTableId()) == null) {
+ continue;
+ }
+ Partition partition =
table.getPartition(tabletMeta.getPartitionId());
+ if (partition == null) {
+ continue;
+ }
+ MaterializedIndex index =
partition.getIndex(tabletMeta.getIndexId());
+ if (index == null) {
+ continue;
+ }
+ Tablet tablet = index.getTablet(tabletId);
+ if (tablet == null) {
+ continue;
+ }
+ CloudReplica cloudReplica = ((CloudTablet)
tablet).getCloudReplica();
+ if (cloudReplica == null) {
continue;
}
// update replica location info: primary -> new BE (dstBe)
@@ -2080,9 +2132,9 @@ public class CloudTabletRebalancer extends MasterDaemon {
// hashReplicaToBe would exclude both: old BE
(isSmoothUpgradeSrc) and new BE
// (not alive), causing COMPUTE_GROUPS_NO_ALIVE_BE.
cloudReplica.updateClusterToSecondaryBe(clusterId, srcBe);
- UpdateCloudReplicaInfo info = new
UpdateCloudReplicaInfo(cloudReplica.getDbId(),
- cloudReplica.getTableId(),
cloudReplica.getPartitionId(), cloudReplica.getIndexId(),
- tablet.getId(), cloudReplica.getId(), clusterId,
dstBe);
+ UpdateCloudReplicaInfo info = new
UpdateCloudReplicaInfo(tabletMeta.getDbId(),
+ tabletMeta.getTableId(), tabletMeta.getPartitionId(),
tabletMeta.getIndexId(),
+ tabletId, cloudReplica.getId(), clusterId, dstBe);
infos.add(info);
} finally {
table.readUnlock();
@@ -2090,7 +2142,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
if (LOG.isDebugEnabled()) {
LOG.debug("cloud be migrate tablet {} from srcBe={} to
dstBe={}, clusterId={}, clusterName={}",
- tablet.getId(), srcBe, dstBe, clusterId, clusterName);
+ tabletId, srcBe, dstBe, clusterId, clusterName);
}
}
long oldSize = infos.size();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudTabletRebalancerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudTabletRebalancerTest.java
index d255a55887a..c9dcd1ef9c9 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudTabletRebalancerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudTabletRebalancerTest.java
@@ -17,7 +17,6 @@
package org.apache.doris.cloud.catalog;
-import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.metric.MetricRepo;
@@ -100,26 +99,21 @@ public class CloudTabletRebalancerTest {
TestRebalancer r = new TestRebalancer();
setField(r, "rand", new Random(1));
- Tablet hot = Mockito.mock(Tablet.class);
- Mockito.when(hot.getId()).thenReturn(100L);
- Tablet cold = Mockito.mock(Tablet.class);
- Mockito.when(cold.getId()).thenReturn(200L);
-
- Set<Tablet> tablets = new HashSet<>();
- tablets.add(hot);
- tablets.add(cold);
+ Set<Long> tabletIds = new HashSet<>();
+ tabletIds.add(100L); // hot
+ tabletIds.add(200L); // cold
Set<Long> activeIds = new HashSet<>();
activeIds.add(100L);
Set<Long> picked = new HashSet<>();
- Tablet pickedTablet = invokePrivate(r, "pickTabletPreferCold",
+ Long pickedTabletId = invokePrivate(r, "pickTabletPreferCold",
new Class<?>[] {long.class, Set.class, Set.class, Set.class},
- new Object[] {1L, tablets, activeIds, picked});
+ new Object[] {1L, tabletIds, activeIds, picked});
- Assertions.assertNotNull(pickedTablet);
- Assertions.assertEquals(200L, pickedTablet.getId(), "Should prefer
cold tablet when available");
+ Assertions.assertNotNull(pickedTabletId);
+ Assertions.assertEquals(200L, pickedTabletId.longValue(), "Should
prefer cold tablet when available");
}
@Test
@@ -127,21 +121,19 @@ public class CloudTabletRebalancerTest {
TestRebalancer r = new TestRebalancer();
setField(r, "rand", new Random(1));
- Tablet only = Mockito.mock(Tablet.class);
- Mockito.when(only.getId()).thenReturn(300L);
- Set<Tablet> tablets = new HashSet<>();
- tablets.add(only);
+ Set<Long> tabletIds = new HashSet<>();
+ tabletIds.add(300L);
// active stats unavailable -> activeIds empty or cache null
Set<Long> activeIds = new HashSet<>();
Set<Long> picked = new HashSet<>();
- Tablet pickedTablet = invokePrivate(r, "pickTabletPreferCold",
+ Long pickedTabletId = invokePrivate(r, "pickTabletPreferCold",
new Class<?>[] {long.class, Set.class, Set.class, Set.class},
- new Object[] {1L, tablets, activeIds, picked});
+ new Object[] {1L, tabletIds, activeIds, picked});
- Assertions.assertNotNull(pickedTablet);
- Assertions.assertEquals(300L, pickedTablet.getId());
+ Assertions.assertNotNull(pickedTabletId);
+ Assertions.assertEquals(300L, pickedTabletId.longValue());
}
@Test
@@ -169,10 +161,10 @@ public class CloudTabletRebalancerTest {
tableActive.put(20L, 100L); // should still lose because dbActive(2)=1
< dbActive(1)=5
setField(r, "tableIdToActiveCount", new
ConcurrentHashMap<>(tableActive));
- Comparator<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> cmp =
+ Comparator<Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>>> cmp =
invokePrivate(r, "tableEntryComparator", new Class<?>[] {},
new Object[] {});
- List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> list = new
ArrayList<>();
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>>> list = new
ArrayList<>();
list.add(new AbstractMap.SimpleEntry<>(10L, new
ConcurrentHashMap<>()));
list.add(new AbstractMap.SimpleEntry<>(11L, new
ConcurrentHashMap<>()));
list.add(new AbstractMap.SimpleEntry<>(20L, new
ConcurrentHashMap<>()));
@@ -197,10 +189,10 @@ public class CloudTabletRebalancerTest {
setField(r, "dbIdToActiveCount", new ConcurrentHashMap<>());
setField(r, "tableIdToActiveCount", new ConcurrentHashMap<>());
- Comparator<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> cmp =
+ Comparator<Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>>> cmp =
invokePrivate(r, "tableEntryComparator", new Class<?>[] {},
new Object[] {});
- List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> list = new
ArrayList<>();
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Long>>>> list = new
ArrayList<>();
list.add(new AbstractMap.SimpleEntry<>(10L, new
ConcurrentHashMap<>()));
list.add(new AbstractMap.SimpleEntry<>(20L, new
ConcurrentHashMap<>()));
list.sort(cmp);
@@ -231,10 +223,10 @@ public class CloudTabletRebalancerTest {
setField(r, "partitionIdToActiveCount", new
ConcurrentHashMap<>(partActive));
@SuppressWarnings("unchecked")
- Comparator<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> cmp =
+ Comparator<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Long>>>>> cmp =
invokePrivate(r, "partitionEntryComparator", new Class<?>[]
{}, new Object[] {});
- List<Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long,
Set<Tablet>>>>> list = new ArrayList<>();
+ List<Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long,
Set<Long>>>>> list = new ArrayList<>();
list.add(new AbstractMap.SimpleEntry<>(100L, new
ConcurrentHashMap<>()));
list.add(new AbstractMap.SimpleEntry<>(200L, new
ConcurrentHashMap<>()));
list.add(new AbstractMap.SimpleEntry<>(201L, new
ConcurrentHashMap<>()));
@@ -282,5 +274,3 @@ public class CloudTabletRebalancerTest {
}
}
}
-
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]