This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0e23c2d25bb [improvement](balance) don't balance tablet which has
unfinish alter job (#39121)
0e23c2d25bb is described below
commit 0e23c2d25bb50f12e9930103e2c4918a31a3f60c
Author: yujun <[email protected]>
AuthorDate: Fri Aug 9 21:34:14 2024 +0800
[improvement](balance) don't balance tablet which has unfinish alter job
(#39121)
Improvement: don't balance tablets that have unfinished alter jobs.
Also fix a bug where the partition rebalancer may balance colocate tablets.
---
.../main/java/org/apache/doris/alter/Alter.java | 23 +++++++++++++++++
.../apache/doris/alter/SchemaChangeHandler.java | 4 +++
.../org/apache/doris/clone/BeLoadRebalancer.java | 23 +----------------
.../org/apache/doris/clone/DiskRebalancer.java | 14 ++--------
.../apache/doris/clone/PartitionRebalancer.java | 2 +-
.../java/org/apache/doris/clone/Rebalancer.java | 30 +++++++++++++++++++++-
.../org/apache/doris/clone/TabletScheduler.java | 4 +++
7 files changed, 64 insertions(+), 36 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 8e8be7c567e..1fcb4fe65c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -81,6 +81,7 @@ import org.apache.doris.thrift.TTabletType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -89,6 +90,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
public class Alter {
private static final Logger LOG = LogManager.getLogger(Alter.class);
@@ -905,6 +907,27 @@ public class Alter {
}
}
+ public Set<Long> getUnfinishedAlterTableIds() {
+ Set<Long> unfinishedTableIds = Sets.newHashSet();
+ for (AlterJobV2 job : schemaChangeHandler.getAlterJobsV2().values()) {
+ if (!job.isDone()) {
+ unfinishedTableIds.add(job.getTableId());
+ }
+ }
+ for (IndexChangeJob job : ((SchemaChangeHandler)
schemaChangeHandler).getIndexChangeJobs().values()) {
+ if (!job.isDone()) {
+ unfinishedTableIds.add(job.getTableId());
+ }
+ }
+ for (AlterJobV2 job :
materializedViewHandler.getAlterJobsV2().values()) {
+ if (!job.isDone()) {
+ unfinishedTableIds.add(job.getTableId());
+ }
+ }
+
+ return unfinishedTableIds;
+ }
+
public AlterHandler getSchemaChangeHandler() {
return schemaChangeHandler;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 9bcfa1cde04..1a4900a3fd3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -1800,6 +1800,10 @@ public class SchemaChangeHandler extends AlterHandler {
}
}
+ public Map<Long, IndexChangeJob> getIndexChangeJobs() {
+ return indexChangeJobs;
+ }
+
public List<List<Comparable>> getAllIndexChangeJobInfos() {
List<List<Comparable>> indexChangeJobInfos = new LinkedList<>();
for (IndexChangeJob indexChangeJob :
ImmutableList.copyOf(indexChangeJobs.values())) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index 0da7428e422..78452000ca5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -17,9 +17,6 @@
package org.apache.doris.clone;
-import org.apache.doris.catalog.CatalogRecycleBin;
-import org.apache.doris.catalog.ColocateTableIndex;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
@@ -31,7 +28,6 @@ import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@@ -120,15 +116,7 @@ public class BeLoadRebalancer extends Rebalancer {
LOG.info("get number of low load paths: {}, with medium: {}",
numOfLowPaths, medium);
List<String> alternativeTabletInfos = Lists.newArrayList();
-
- // Clone ut mocked env, but CatalogRecycleBin is not mockable (it
extends from Thread)
- // so in clone ut recycleBin need to set to null.
- CatalogRecycleBin recycleBin = null;
- if (!FeConstants.runningUnitTest) {
- recycleBin = Env.getCurrentRecycleBin();
- }
int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
- ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
List<Set<Long>> lowBETablets = lowBEs.stream()
.map(beStat ->
Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
.collect(Collectors.toList());
@@ -230,11 +218,7 @@ public class BeLoadRebalancer extends Rebalancer {
long replicaDataSize = replica.getDataSize();
if (remainingPaths.containsKey(replicaPathHash)) {
TabletMeta tabletMeta =
invertedIndex.getTabletMeta(tabletId);
- if (tabletMeta == null) {
- continue;
- }
-
- if
(colocateTableIndex.isColocateTable(tabletMeta.getTableId())) {
+ if (!canBalanceTablet(tabletMeta)) {
continue;
}
@@ -245,11 +229,6 @@ public class BeLoadRebalancer extends Rebalancer {
continue;
}
- if (recycleBin != null &&
recycleBin.isRecyclePartition(tabletMeta.getDbId(),
- tabletMeta.getTableId(),
tabletMeta.getPartitionId())) {
- continue;
- }
-
boolean isFit = lowBEs.stream().anyMatch(be ->
be.isFit(replicaDataSize,
medium, null, false) == BalanceStatus.OK);
if (!isFit) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index 96eef52d597..a8448b8ffd2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -17,7 +17,6 @@
package org.apache.doris.clone;
-import org.apache.doris.catalog.CatalogRecycleBin;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
@@ -59,6 +58,7 @@ public class DiskRebalancer extends Rebalancer {
public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex
invertedIndex,
Map<Long, PathSlot> backendsWorkingSlots) {
super(infoService, invertedIndex, backendsWorkingSlots);
+ canBalanceColocateTable = true;
}
public List<BackendLoadStatistic>
filterByPrioBackends(List<BackendLoadStatistic> bes) {
@@ -163,12 +163,6 @@ public class DiskRebalancer extends Rebalancer {
return alternativeTablets;
}
- // Clone ut mocked env, but CatalogRecycleBin is not mockable (it
extends from Thread)
- // so in clone ut recycleBin need to set to null.
- CatalogRecycleBin recycleBin = null;
- if (!FeConstants.runningUnitTest) {
- recycleBin = Env.getCurrentRecycleBin();
- }
Set<Long> alternativeTabletIds = Sets.newHashSet();
Set<Long> unbalancedBEs = Sets.newHashSet();
// choose tablets from backends randomly.
@@ -243,11 +237,7 @@ public class DiskRebalancer extends Rebalancer {
long replicaPathHash = replica.getPathHash();
if (remainingPaths.containsKey(replicaPathHash)) {
TabletMeta tabletMeta =
invertedIndex.getTabletMeta(tabletId);
- if (tabletMeta == null) {
- continue;
- }
- if (recycleBin != null &&
recycleBin.isRecyclePartition(tabletMeta.getDbId(),
- tabletMeta.getTableId(),
tabletMeta.getPartitionId())) {
+ if (!canBalanceTablet(tabletMeta)) {
continue;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index 7095ad8dc54..5af920c74fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -138,7 +138,7 @@ public class PartitionRebalancer extends Rebalancer {
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium));
BiPredicate<Long, TabletMeta> canMoveTablet = (Long tabletId,
TabletMeta tabletMeta) -> {
- return tabletMeta != null
+ return canBalanceTablet(tabletMeta)
&& tabletMeta.getPartitionId() == move.partitionId
&& tabletMeta.getIndexId() == move.indexId
&& !invalidIds.contains(tabletId)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
index 682c2915989..af8bc6d67fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
@@ -17,9 +17,14 @@
package org.apache.doris.clone;
+import org.apache.doris.catalog.CatalogRecycleBin;
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@@ -29,13 +34,14 @@ import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
-
+import java.util.Set;
/*
* Rebalancer is responsible for
@@ -61,6 +67,9 @@ public abstract class Rebalancer {
// be id -> end time of prio
protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
+ protected boolean canBalanceColocateTable = false;
+ private Set<Long> alterTableIds = Sets.newHashSet();
+
// tag -> (medium, timestamp)
private Table<Tag, TStorageMedium, Long> lastPickTimeTable =
HashBasedTable.create();
@@ -106,6 +115,21 @@ public abstract class Rebalancer {
return lastPickTime == null || now - lastPickTime >=
Config.be_rebalancer_idle_seconds * 1000L;
}
+ protected boolean canBalanceTablet(TabletMeta tabletMeta) {
+ // Clone ut mocked env, but CatalogRecycleBin is not mockable (it
extends from Thread)
+ // so in clone ut recycleBin need to set to null.
+ ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
+ CatalogRecycleBin recycleBin = null;
+ if (!FeConstants.runningUnitTest) {
+ recycleBin = Env.getCurrentRecycleBin();
+ }
+ return tabletMeta != null
+ && !alterTableIds.contains(tabletMeta.getTableId())
+ && (canBalanceColocateTable ||
!colocateTableIndex.isColocateTable(tabletMeta.getTableId()))
+ && (recycleBin == null ||
!recycleBin.isRecyclePartition(tabletMeta.getDbId(),
+ tabletMeta.getTableId(), tabletMeta.getPartitionId()));
+ }
+
public AgentTask createBalanceTask(TabletSchedCtx tabletCtx)
throws SchedException {
completeSchedCtx(tabletCtx);
@@ -139,6 +163,10 @@ public abstract class Rebalancer {
this.statisticMap = statisticMap;
}
+ public void updateAlterTableIds(Set<Long> alterTableIds) {
+ this.alterTableIds = alterTableIds;
+ }
+
public void addPrioBackends(List<Backend> backends, long timeoutS) {
long currentTimeMillis = System.currentTimeMillis();
for (Backend backend : backends) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 7c58d6acc53..a83308a650b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -358,6 +358,10 @@ public class TabletScheduler extends MasterDaemon {
rebalancer.updateLoadStatistic(statisticMap);
diskRebalancer.updateLoadStatistic(statisticMap);
+ Set<Long> alterTableIds =
Env.getCurrentEnv().getAlterInstance().getUnfinishedAlterTableIds();
+ rebalancer.updateAlterTableIds(alterTableIds);
+ diskRebalancer.updateAlterTableIds(alterTableIds);
+
lastStatUpdateTime = System.currentTimeMillis();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]