This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new beec0e9169 [Improvement](tablet clone) impr tablet sched speed and fix
tablet sched failed too many times (#21856)
beec0e9169 is described below
commit beec0e9169ece675bbaa17a83fda5bbacc7350d8
Author: yujun <[email protected]>
AuthorDate: Tue Jul 18 23:25:22 2023 +0800
[Improvement](tablet clone) impr tablet sched speed and fix tablet sched
failed too many times (#21856)
---
docker/runtime/doris-compose/database.py | 10 +-
.../main/java/org/apache/doris/common/Config.java | 16 +-
.../main/java/org/apache/doris/catalog/Tablet.java | 15 +-
.../org/apache/doris/clone/BeLoadRebalancer.java | 50 ++-
.../clone/ColocateTableCheckerAndBalancer.java | 5 +-
.../org/apache/doris/clone/DiskRebalancer.java | 4 +-
.../apache/doris/clone/PartitionRebalancer.java | 6 +-
.../org/apache/doris/clone/SchedException.java | 16 +
.../java/org/apache/doris/clone/TabletChecker.java | 6 +-
.../org/apache/doris/clone/TabletSchedCtx.java | 245 +++++------
.../org/apache/doris/clone/TabletScheduler.java | 461 +++++++++++++--------
.../common/proc/TabletSchedulerDetailProcDir.java | 10 +-
.../org/apache/doris/clone/TabletSchedCtxTest.java | 12 +-
.../doris/cluster/DecommissionBackendTest.java | 2 +-
.../apache/doris/utframe/TestWithFeService.java | 5 +-
15 files changed, 491 insertions(+), 372 deletions(-)
diff --git a/docker/runtime/doris-compose/database.py
b/docker/runtime/doris-compose/database.py
index 21aa400a47..57fc90275a 100644
--- a/docker/runtime/doris-compose/database.py
+++ b/docker/runtime/doris-compose/database.py
@@ -102,6 +102,7 @@ class DBManager(object):
def decommission_be(self, be_endpoint):
old_tablet_num = 0
id = CLUSTER.Node.get_id_from_ip(be_endpoint[:be_endpoint.find(":")])
+ start_ts = time.time()
if id not in self.be_states:
self._load_be_states()
if id in self.be_states:
@@ -132,8 +133,9 @@ class DBManager(object):
return
LOG.info(
"Decommission be {} status: alive {}, decommissioned {}. "
\
- "It is migrating its tablets, left {}/{} tablets."
- .format(be_endpoint, be.alive, be.decommissioned,
be.tablet_num, old_tablet_num))
+ "It is migrating its tablets, left {}/{} tablets. Time
elapse {} s."
+ .format(be_endpoint, be.alive, be.decommissioned,
be.tablet_num, old_tablet_num,
+ int(time.time() - start_ts)))
time.sleep(5)
@@ -189,7 +191,7 @@ class DBManager(object):
def _reset_conn(self):
self.conn = pymysql.connect(user="root",
host="127.0.0.1",
- read_timeout = 10,
+ read_timeout=10,
port=self.query_port)
@@ -234,6 +236,6 @@ def get_db_mgr(cluster_name, required_load_succ=True):
except Exception as e:
if required_load_succ:
raise e
- LOG.exception(e)
+ #LOG.exception(e)
return db_mgr
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d614d7f9bc..29df2c3740 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -904,7 +904,21 @@ public class Config extends ConfigBase {
* the default slot number per path in tablet scheduler
* TODO(cmy): remove this config and dynamically adjust it by clone task
statistic
*/
- @ConfField public static int schedule_slot_num_per_path = 2;
+ @ConfField(mutable = true, masterOnly = true)
+ public static int schedule_slot_num_per_path = 4;
+
+ /**
+ * the default slot number per path in tablet scheduler for decommission
backend
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int schedule_decommission_slot_num_per_path = 8;
+
+ /**
+ * the default batch size in tablet scheduler for a single schedule.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int schedule_batch_size = 50;
+
/**
* Deprecated after 0.10
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 74f1c31cbf..0e93a4dd0f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -703,11 +703,24 @@ public class Tablet extends MetaObject implements
Writable {
* NORMAL: delay Config.tablet_repair_delay_factor_second * 2;
* LOW: delay Config.tablet_repair_delay_factor_second * 3;
*/
- public boolean readyToBeRepaired(TabletSchedCtx.Priority priority) {
+ public boolean readyToBeRepaired(SystemInfoService infoService,
TabletSchedCtx.Priority priority) {
if (priority == Priority.VERY_HIGH) {
return true;
}
+ boolean allBeAliveOrDecommissioned = true;
+ for (Replica replica : replicas) {
+ Backend backend = infoService.getBackend(replica.getBackendId());
+ if (backend == null || (!backend.isAlive() &&
!backend.isDecommissioned())) {
+ allBeAliveOrDecommissioned = false;
+ break;
+ }
+ }
+
+ if (allBeAliveOrDecommissioned) {
+ return true;
+ }
+
long currentTime = System.currentTimeMillis();
// first check, wait for next round
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index ebbebe6806..5317725881 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
@@ -166,7 +167,7 @@ public class BeLoadRebalancer extends Rebalancer {
System.currentTimeMillis());
tabletCtx.setTag(clusterStat.getTag());
// balance task's priority is always LOW
- tabletCtx.setOrigPriority(Priority.LOW);
+ tabletCtx.setPriority(Priority.LOW);
alternativeTablets.add(tabletCtx);
if (--numOfLowPaths <= 0) {
@@ -262,7 +263,7 @@ public class BeLoadRebalancer extends Rebalancer {
}
// Select a low load backend as destination.
- boolean setDest = false;
+ List<BackendLoadStatistic> candidates = Lists.newArrayList();
for (BackendLoadStatistic beStat : lowBe) {
if (beStat.isAvailable() && replicas.stream().noneMatch(r ->
r.getBackendId() == beStat.getBeId())) {
// check if on same host.
@@ -296,27 +297,36 @@ public class BeLoadRebalancer extends Rebalancer {
continue;
}
- // classify the paths.
- // And we only select path from 'low' and 'mid' paths
- Set<Long> pathLow = Sets.newHashSet();
- Set<Long> pathMid = Sets.newHashSet();
- Set<Long> pathHigh = Sets.newHashSet();
- beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh,
tabletCtx.getStorageMedium());
- pathLow.addAll(pathMid);
-
- long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow);
- if (pathHash == -1) {
- LOG.debug("paths has no available balance slot: {}",
pathLow);
- } else {
- tabletCtx.setDest(beStat.getBeId(), pathHash);
- setDest = true;
- break;
- }
+ candidates.add(beStat);
}
}
- if (!setDest) {
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to find
low backend");
+ if (candidates.isEmpty()) {
+ throw new SchedException(Status.UNRECOVERABLE, "unable to find low
backend");
}
+
+ for (BackendLoadStatistic beStat : candidates) {
+ PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
+ if (slot == null) {
+ continue;
+ }
+
+ // classify the paths.
+ // And we only select path from 'low' and 'mid' paths
+ Set<Long> pathLow = Sets.newHashSet();
+ Set<Long> pathMid = Sets.newHashSet();
+ Set<Long> pathHigh = Sets.newHashSet();
+ beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh,
tabletCtx.getStorageMedium());
+ pathLow.addAll(pathMid);
+
+ long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow);
+ if (pathHash != -1) {
+ tabletCtx.setDest(beStat.getBeId(), pathHash);
+ return;
+ }
+ }
+
+ throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+ "unable to find low backend");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index b3fdc3ca71..b34bc926dd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -202,6 +202,7 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
*/
private void matchGroup() {
Env env = Env.getCurrentEnv();
+ SystemInfoService infoService = Env.getCurrentSystemInfo();
ColocateTableIndex colocateIndex = env.getColocateTableIndex();
TabletScheduler tabletScheduler = env.getTabletScheduler();
@@ -254,7 +255,7 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
+ " status: %s", tablet.getId(),
st);
LOG.debug(unstableReason);
- if
(!tablet.readyToBeRepaired(Priority.NORMAL)) {
+ if (!tablet.readyToBeRepaired(infoService,
Priority.NORMAL)) {
continue;
}
@@ -265,7 +266,7 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
System.currentTimeMillis());
// the tablet status will be set again
when being scheduled
tabletCtx.setTabletStatus(st);
- tabletCtx.setOrigPriority(Priority.NORMAL);
+ tabletCtx.setPriority(Priority.NORMAL);
tabletCtx.setTabletOrderIdx(idx);
AddResult res =
tabletScheduler.addTablet(tabletCtx, false /* not force */);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index 9d676d950c..abac0c2d1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -208,10 +208,10 @@ public class DiskRebalancer extends Rebalancer {
tabletCtx.setTag(clusterStat.getTag());
if (prioBackends.containsKey(beStat.getBeId())) {
// priority of balance task of prio BE is NORMAL
- tabletCtx.setOrigPriority(Priority.NORMAL);
+ tabletCtx.setPriority(Priority.NORMAL);
} else {
// balance task's default priority is LOW
- tabletCtx.setOrigPriority(Priority.LOW);
+ tabletCtx.setPriority(Priority.LOW);
}
// we must set balanceType to DISK_BALANCE for create
migration task
tabletCtx.setBalanceType(BalanceType.DISK_BALANCE);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index 9e3e37ef00..d9d3f27cc7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -151,7 +151,7 @@ public class PartitionRebalancer extends Rebalancer {
System.currentTimeMillis());
tabletCtx.setTag(clusterStat.getTag());
// Balance task's priority is always LOW
- tabletCtx.setOrigPriority(TabletSchedCtx.Priority.LOW);
+ tabletCtx.setPriority(TabletSchedCtx.Priority.LOW);
alternativeTablets.add(tabletCtx);
// Pair<Move, ToDeleteReplicaId>, ToDeleteReplicaId should be -1L
before scheduled successfully
movesInProgress.get().put(pickedTabletId,
@@ -251,7 +251,7 @@ public class PartitionRebalancer extends Rebalancer {
if (slot.takeBalanceSlot(srcReplica.getPathHash()) != -1) {
tabletCtx.setSrc(srcReplica);
} else {
- throw new SchedException(SchedException.Status.SCHEDULE_FAILED,
+ throw new
SchedException(SchedException.Status.SCHEDULE_FAILED,
SchedException.SubCode.WAITING_SLOT,
"no slot for src replica " + srcReplica + ", pathHash
" + srcReplica.getPathHash());
}
@@ -269,7 +269,7 @@ public class PartitionRebalancer extends Rebalancer {
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
if (pathHash == -1) {
- throw new SchedException(SchedException.Status.SCHEDULE_FAILED,
+ throw new
SchedException(SchedException.Status.SCHEDULE_FAILED,
SchedException.SubCode.WAITING_SLOT,
"paths has no available balance slot: " + availPath);
} else {
tabletCtx.setDest(beStat.getBeId(), pathHash);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java
index 0ad83e8909..a343e6543c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/SchedException.java
@@ -27,14 +27,30 @@ public class SchedException extends Exception {
FINISHED // schedule is done, remove the tablet from tablet scheduler
with status FINISHED
}
+ public enum SubCode {
+ NONE,
+ WAITING_DECOMMISSION,
+ WAITING_SLOT,
+ }
+
private Status status;
+ private SubCode subCode;
public SchedException(Status status, String errorMsg) {
+ this(status, SubCode.NONE, errorMsg);
+ }
+
+ public SchedException(Status status, SubCode subCode, String errorMsg) {
super(errorMsg);
this.status = status;
+ this.subCode = subCode;
}
public Status getStatus() {
return status;
}
+
+ public SubCode getSubCode() {
+ return subCode;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
index 67226eae67..d9330e0f16 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
@@ -372,9 +372,7 @@ public class TabletChecker extends MasterDaemon {
}
counter.unhealthyTabletNum++;
-
- if (!tablet.readyToBeRepaired(statusWithPrio.second)) {
- counter.tabletNotReady++;
+ if (!tablet.readyToBeRepaired(infoService,
statusWithPrio.second)) {
continue;
}
@@ -386,7 +384,7 @@ public class TabletChecker extends MasterDaemon {
System.currentTimeMillis());
// the tablet status will be set again when being scheduled
tabletCtx.setTabletStatus(statusWithPrio.first);
- tabletCtx.setOrigPriority(statusWithPrio.second);
+ tabletCtx.setPriority(statusWithPrio.second);
AddResult res = tabletScheduler.addTablet(tabletCtx, false /*
not force */);
if (res == AddResult.LIMIT_EXCEED || res ==
AddResult.DISABLED) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 58c411c40f..286b70f65e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
@@ -102,6 +103,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
*/
private static final int RUNNING_FAILED_COUNTER_THRESHOLD = 3;
+ public static final int FINISHED_COUNTER_THRESHOLD = 3;
+
private static VersionCountComparator VERSION_COUNTER_COMPARATOR = new
VersionCountComparator();
public enum Type {
@@ -117,22 +120,6 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
NORMAL,
HIGH,
VERY_HIGH;
-
- // VERY_HIGH can only be downgraded to NORMAL
- // LOW can only be upgraded to HIGH
- public Priority adjust(Priority origPriority, boolean isUp) {
- switch (this) {
- case VERY_HIGH:
- return isUp ? VERY_HIGH : HIGH;
- case HIGH:
- return isUp ? (origPriority == LOW ? HIGH : VERY_HIGH) :
NORMAL;
- case NORMAL:
- return isUp ? HIGH : (origPriority == Priority.VERY_HIGH ?
NORMAL : LOW);
- default:
- return isUp ? NORMAL : LOW;
- }
- }
-
}
public enum State {
@@ -147,23 +134,19 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
private Type type;
private BalanceType balanceType;
- /*
- * origPriority is the origin priority being set when this tablet being
added to scheduler.
- * dynamicPriority will be set during tablet schedule processing, it will
not be prior than origin priority.
- * And dynamic priority is also used in priority queue compare in tablet
scheduler.
- */
- private Priority origPriority;
- private Priority dynamicPriority;
+ private Priority priority;
// we change the dynamic priority based on how many times it fails to be
scheduled
private int failedSchedCounter = 0;
// clone task failed counter
private int failedRunningCounter = 0;
+ // When a tablet ctx is finished, the tablet's health status is checked.
+ // If the tablet is unhealthy, a new ctx is added for it.
+ // The new ctx's finishedCounter = old ctx's finishedCounter + 1.
+ private int finishedCounter = 0;
// last time this tablet being scheduled
private long lastSchedTime = 0;
- // last time the dynamic priority being adjusted
- private long lastAdjustPrioTime = 0;
// last time this tablet being visited.
// being visited means:
@@ -180,6 +163,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
private State state;
private TabletStatus tabletStatus;
+ private long decommissionTime = -1;
+
private long dbId;
private long tblId;
private long partitionId;
@@ -223,6 +208,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
// tag is only set for BALANCE task, used to identify which workload group
this Balance job is in
private Tag tag;
+ private SubCode schedFailedCode;
+
public TabletSchedCtx(Type type, long dbId, long tblId, long partId,
long idxId, long tabletId, ReplicaAllocation replicaAlloc, long
createTime) {
this.type = type;
@@ -236,12 +223,17 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
this.state = State.PENDING;
this.replicaAlloc = replicaAlloc;
this.balanceType = BalanceType.BE_BALANCE;
+ this.schedFailedCode = SubCode.NONE;
}
public ReplicaAllocation getReplicaAlloc() {
return replicaAlloc;
}
+ public void setReplicaAlloc(ReplicaAllocation replicaAlloc) {
+ this.replicaAlloc = replicaAlloc;
+ }
+
public void setTag(Tag tag) {
this.tag = tag;
}
@@ -266,21 +258,20 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
return balanceType;
}
- public Priority getOrigPriority() {
- return origPriority;
+ public Priority getPriority() {
+ return priority;
+ }
+
+ public void setPriority(Priority priority) {
+ this.priority = priority;
}
- public void setOrigPriority(Priority origPriority) {
- this.origPriority = origPriority;
- // reset dynamic priority along with the origin priority being set.
- this.dynamicPriority = origPriority;
- this.failedSchedCounter = 0;
- this.lastSchedTime = 0;
- this.lastAdjustPrioTime = 0;
+ public int getFinishedCounter() {
+ return finishedCounter;
}
- public Priority getDynamicPriority() {
- return dynamicPriority;
+ public void setFinishedCounter(int finishedCounter) {
+ this.finishedCounter = finishedCounter;
}
public void increaseFailedSchedCounter() {
@@ -295,8 +286,27 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
++failedRunningCounter;
}
- public int getFailedRunningCounter() {
- return failedRunningCounter;
+ public boolean isExceedFailedRunningLimit() {
+ return failedRunningCounter >= RUNNING_FAILED_COUNTER_THRESHOLD;
+ }
+
+ public boolean onSchedFailedAndCheckExceedLimit(SubCode code) {
+ schedFailedCode = code;
+ failedSchedCounter++;
+ if (code == SubCode.WAITING_DECOMMISSION) {
+ failedSchedCounter = 0;
+ if (decommissionTime < 0) {
+ decommissionTime = System.currentTimeMillis();
+ }
+ return System.currentTimeMillis() > decommissionTime + 10 * 60 *
1000L;
+ } else {
+ decommissionTime = -1;
+ if (code == SubCode.WAITING_SLOT && type != Type.BALANCE) {
+ return failedSchedCounter > 30 * 1000 /
TabletScheduler.SCHEDULE_INTERVAL_MS;
+ } else {
+ return failedSchedCounter > 10;
+ }
+ }
}
public void setLastSchedTime(long lastSchedTime) {
@@ -311,6 +321,10 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
this.finishedTime = finishedTime;
}
+ public void setDecommissionTime(long decommissionTime) {
+ this.decommissionTime = decommissionTime;
+ }
+
public State getState() {
return state;
}
@@ -615,7 +629,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
setSrc(srcReplica);
return;
}
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to find
source slot");
+ throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
+ "unable to find source slot");
}
/*
@@ -641,7 +656,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
*/
public void chooseDestReplicaForVersionIncomplete(Map<Long, PathSlot>
backendsWorkingSlots)
throws SchedException {
- Replica chosenReplica = null;
+ List<Replica> candidates = Lists.newArrayList();
for (Replica replica : tablet.getReplicas()) {
if (replica.isBad()) {
LOG.debug("replica {} is bad, skip. tablet: {}",
@@ -660,18 +675,37 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
// if the replica's state is DECOMMISSION, it may be chose as dest
replica,
// and its state will be set to NORMAL later.
if (replica.getLastFailedVersion() <= 0
- && ((replica.getVersion() == visibleVersion)
- || replica.getVersion() > visibleVersion) &&
replica.getState() != ReplicaState.DECOMMISSION) {
+ && replica.getVersion() >= visibleVersion
+ && replica.getState() != ReplicaState.DECOMMISSION) {
// skip healthy replica
LOG.debug("replica {} version {} is healthy, visible version
{}, replica state {}, skip. tablet: {}",
replica.getId(), replica.getVersion(), visibleVersion,
replica.getState(), tabletId);
continue;
}
+ candidates.add(replica);
+ }
+
+ if (candidates.isEmpty()) {
+ throw new SchedException(Status.UNRECOVERABLE, "unable to choose
dest replica");
+ }
+
+ Replica chosenReplica = null;
+ for (Replica replica : candidates) {
+ PathSlot slot = backendsWorkingSlots.get(replica.getBackendId());
+ if (slot == null || !slot.hasAvailableSlot(replica.getPathHash()))
{
+ if (!replica.needFurtherRepair()) {
+ throw new SchedException(Status.SCHEDULE_FAILED,
SubCode.WAITING_SLOT,
+ "replica " + replica + " has not slot");
+ }
+
+ continue;
+ }
+
if (replica.needFurtherRepair()) {
+ chosenReplica = replica;
LOG.debug("replica {} need further repair, choose it. tablet:
{}",
replica.getId(), tabletId);
- chosenReplica = replica;
break;
}
@@ -686,20 +720,19 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
}
}
- if (chosenReplica == null) {
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to choose
dest replica");
- }
-
// check if the dest replica has available slot
+ // this should not happen, because hasAvailableSlot was just checked above.
PathSlot slot = backendsWorkingSlots.get(chosenReplica.getBackendId());
if (slot == null) {
- throw new SchedException(Status.SCHEDULE_FAILED, "backend of dest
replica is missing");
+ throw new SchedException(Status.SCHEDULE_FAILED,
SubCode.WAITING_SLOT,
+ "backend of dest replica is missing");
}
-
long destPathHash = slot.takeSlot(chosenReplica.getPathHash());
if (destPathHash == -1) {
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to take
slot of dest path");
+ throw new SchedException(Status.SCHEDULE_FAILED,
SubCode.WAITING_SLOT,
+ "unable to take slot of dest path");
}
+
if (chosenReplica.getState() == ReplicaState.DECOMMISSION) {
// Since this replica is selected as the repair object of
VERSION_INCOMPLETE,
// it means that this replica needs to be able to accept loading
data.
@@ -717,6 +750,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
// forever, because the replica in the DECOMMISSION state will not
receive the load task.
chosenReplica.setWatermarkTxnId(-1);
chosenReplica.setState(ReplicaState.NORMAL);
+ setDecommissionTime(-1);
LOG.info("choose replica {} on backend {} of tablet {} as dest
replica for version incomplete,"
+ " and change state from DECOMMISSION to NORMAL",
chosenReplica.getId(), chosenReplica.getBackendId(),
tabletId);
@@ -941,7 +975,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
cloneTask.getDbId(), cloneTask.getTableId(),
cloneTask.getPartitionId(),
cloneTask.getIndexId(), cloneTask.getTabletId(),
cloneTask.getBackendId(),
dbId, tblId, partitionId, indexId, tablet.getId(),
destBackendId);
- throw new SchedException(Status.RUNNING_FAILED, msg);
+ throw new SchedException(Status.UNRECOVERABLE, msg);
}
// 1. check the tablet status first
@@ -1041,13 +1075,6 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
state = State.FINISHED;
LOG.info("clone finished: {}", this);
- } catch (SchedException e) {
- // if failed to too many times, remove this task
- ++failedRunningCounter;
- if (failedRunningCounter > RUNNING_FAILED_COUNTER_THRESHOLD) {
- throw new SchedException(Status.UNRECOVERABLE, e.getMessage());
- }
- throw e;
} finally {
olapTable.writeUnlock();
}
@@ -1061,73 +1088,6 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
}
}
- /*
- * we try to adjust the priority based on schedule history
- * 1. If failed counter is larger than FAILED_COUNTER_THRESHOLD, which
means this tablet is being scheduled
- * at least FAILED_TIME_THRESHOLD times and all are failed. So we
downgrade its priority.
- * Also reset the failedCounter, or it will be downgraded forever.
- *
- * 2. Else, if it has been a long time since last time the tablet being
scheduled, we upgrade its
- * priority to let it more available to be scheduled.
- *
- * The time gap between adjustment should be larger than
MIN_ADJUST_PRIORITY_INTERVAL_MS, to avoid
- * being downgraded too fast.
- *
- * eg:
- * A tablet has been scheduled for 5 times and all were failed. its
priority will be downgraded. And if it is
- * scheduled for 5 times and all are failed again, it will be
downgraded again, until to the LOW.
- * And than, because of LOW, this tablet can not be scheduled for a
long time, and it will be upgraded
- * to NORMAL, if still not being scheduled, it will be upgraded up to
VERY_HIGH.
- *
- * return true if dynamic priority changed
- */
- public boolean adjustPriority(TabletSchedulerStat stat) {
- long currentTime = System.currentTimeMillis();
- if (lastAdjustPrioTime == 0) {
- // skip the first time we adjust this priority
- lastAdjustPrioTime = currentTime;
- return false;
- } else {
- if (currentTime - lastAdjustPrioTime <
MIN_ADJUST_PRIORITY_INTERVAL_MS) {
- return false;
- }
- }
-
- boolean isDowngrade = false;
- boolean isUpgrade = false;
-
- if (failedSchedCounter > SCHED_FAILED_COUNTER_THRESHOLD) {
- isDowngrade = true;
- } else {
- long lastTime = lastSchedTime == 0 ? createTime : lastSchedTime;
- if (currentTime - lastTime > MAX_NOT_BEING_SCHEDULED_INTERVAL_MS) {
- isUpgrade = true;
- }
- }
-
- Priority originDynamicPriority = dynamicPriority;
- if (isDowngrade) {
- dynamicPriority = dynamicPriority.adjust(origPriority, false /*
downgrade */);
- failedSchedCounter = 0;
- if (originDynamicPriority != dynamicPriority) {
- LOG.debug("downgrade dynamic priority from {} to {}, origin:
{}, tablet: {}",
- originDynamicPriority.name(), dynamicPriority.name(),
origPriority.name(), tabletId);
- stat.counterTabletPrioDowngraded.incrementAndGet();
- return true;
- }
- } else if (isUpgrade) {
- dynamicPriority = dynamicPriority.adjust(origPriority, true /*
upgrade */);
- // no need to set lastSchedTime, lastSchedTime is set each time we
schedule this tablet
- if (originDynamicPriority != dynamicPriority) {
- LOG.debug("upgrade dynamic priority from {} to {}, origin: {},
tablet: {}",
- originDynamicPriority.name(), dynamicPriority.name(),
origPriority.name(), tabletId);
- stat.counterTabletPrioUpgraded.incrementAndGet();
- return true;
- }
- }
- return false;
- }
-
public boolean isTimeout() {
if (state != TabletSchedCtx.State.RUNNING) {
return false;
@@ -1144,8 +1104,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
result.add(storageMedium == null ? FeConstants.null_string :
storageMedium.name());
result.add(tabletStatus == null ? FeConstants.null_string :
tabletStatus.name());
result.add(state.name());
- result.add(origPriority.name());
- result.add(dynamicPriority.name());
+ result.add(schedFailedCode.name());
+ result.add(priority.name());
result.add(srcReplica == null ? "-1" :
String.valueOf(srcReplica.getBackendId()));
result.add(String.valueOf(srcPathHash));
result.add(String.valueOf(destBackendId));
@@ -1158,7 +1118,6 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
result.add(copyTimeMs > 0 ? String.valueOf(copySize / copyTimeMs /
1000.0) : FeConstants.null_string);
result.add(String.valueOf(failedSchedCounter));
result.add(String.valueOf(failedRunningCounter));
- result.add(TimeUtils.longToTimeString(lastAdjustPrioTime));
result.add(String.valueOf(visibleVersion));
result.add(String.valueOf(committedVersion));
result.add(Strings.nullToEmpty(errMsg));
@@ -1171,19 +1130,23 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
*/
@Override
public int compareTo(TabletSchedCtx o) {
- if (dynamicPriority.ordinal() < o.dynamicPriority.ordinal()) {
- return 1;
- } else if (dynamicPriority.ordinal() > o.dynamicPriority.ordinal()) {
- return -1;
- } else {
- if (lastVisitedTime < o.lastVisitedTime) {
- return -1;
- } else if (lastVisitedTime > o.lastVisitedTime) {
- return 1;
- } else {
- return 0;
- }
+ return Long.compare(getCompareValue(), o.getCompareValue());
+ }
+
+ private long getCompareValue() {
+ long value = createTime;
+ if (lastVisitedTime > 0) {
+ value = lastVisitedTime;
}
+
+ value += (Priority.VERY_HIGH.ordinal() - priority.ordinal() + 1) * 60
* 1000L;
+ value += 5000L * (failedSchedCounter / 10);
+
+ if (type == Type.BALANCE) {
+ value += 30 * 60 * 1000L;
+ }
+
+ return value;
}
@Override
@@ -1227,6 +1190,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
* call this when releaseTabletCtx()
*/
public void resetReplicaState() {
+ setDecommissionTime(-1);
if (tablet != null) {
for (Replica replica : tablet.getReplicas()) {
// To address issue:
https://github.com/apache/doris/issues/9422
@@ -1243,5 +1207,4 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
}
}
}
-
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index dbf3d46237..aaaac36a24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -32,10 +32,12 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletSchedCtx.Type;
import org.apache.doris.common.AnalysisException;
@@ -59,6 +61,7 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.TransactionState;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.ImmutableMap;
@@ -100,21 +103,21 @@ public class TabletScheduler extends MasterDaemon {
// the minimum interval of updating cluster statistics and priority of
tablet info
private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s
- private static final long SCHEDULE_INTERVAL_MS = 1000; // 1s
+ public static final long SCHEDULE_INTERVAL_MS = 100;
/*
- * Tablet is added to pendingTablets as well it's id in allTabletIds.
- * TabletScheduler will take tablet from pendingTablets but will not
remove it's id from allTabletIds when
+ * Tablet is added to pendingTablets, and its id is added to allTabletTypes.
+ * TabletScheduler will take tablet from pendingTablets but will not
remove its id from allTabletTypes when
* handling a tablet.
* Tablet' id can only be removed after the clone task or migration task
is done(timeout, cancelled or finished).
- * So if a tablet's id is still in allTabletIds, TabletChecker can not add
tablet to TabletScheduler.
+ * So if a tablet's id is still in allTabletTypes, TabletChecker can not
add tablet to TabletScheduler.
*
- * pendingTablets + runningTablets = allTabletIds
+ * pendingTablets + runningTablets = allTabletTypes
*
- * pendingTablets, allTabletIds, runningTablets and schedHistory are
protected by 'synchronized'
+ * pendingTablets, allTabletTypes, runningTablets and schedHistory are
protected by 'synchronized'
*/
private PriorityQueue<TabletSchedCtx> pendingTablets = new
PriorityQueue<>();
- private Set<Long> allTabletIds = Sets.newHashSet();
+ private Map<Long, TabletSchedCtx.Type> allTabletTypes = Maps.newHashMap();
// contains all tabletCtxs which state are RUNNING
private Map<Long, TabletSchedCtx> runningTablets = Maps.newHashMap();
// save the latest 1000 scheduled tablet info
@@ -128,6 +131,8 @@ public class TabletScheduler extends MasterDaemon {
private long lastSlotAdjustTime = 0;
+ private long lastCheckTimeoutTime = 0;
+
private Env env;
private SystemInfoService infoService;
private TabletInvertedIndex invertedIndex;
@@ -175,7 +180,8 @@ public class TabletScheduler extends MasterDaemon {
// when upgrading, backend may not get path info yet. so
return false and wait for next round.
// and we should check if backend is alive. If backend is dead
when upgrading, this backend
// will never report its path hash, and tablet scheduler is
blocked.
- LOG.info("not all backends have path info");
+ LOG.info("backend {}:{} with id {} doesn't have path info.",
backend.getHost(),
+ backend.getBePort(), backend.getId());
return false;
}
}
@@ -204,7 +210,7 @@ public class TabletScheduler extends MasterDaemon {
if (!backendsWorkingSlots.containsKey(be.getId())) {
List<Long> pathHashes = be.getDisks().values().stream()
.map(DiskInfo::getPathHash).collect(Collectors.toList());
- PathSlot slot = new PathSlot(pathHashes,
Config.schedule_slot_num_per_path);
+ PathSlot slot = new PathSlot(pathHashes, be.getId());
backendsWorkingSlots.put(be.getId(), slot);
LOG.info("add new backend {} with slots num: {}", be.getId(),
be.getDisks().size());
}
@@ -225,7 +231,18 @@ public class TabletScheduler extends MasterDaemon {
if (!force && Config.disable_tablet_scheduler) {
return AddResult.DISABLED;
}
- if (!force && containsTablet(tablet.getTabletId())) {
+
+ // REPAIR has higher priority than BALANCE.
+ // Suppose adding a BALANCE tablet successfully, then adding this
tablet's REPAIR ctx will fail.
+ // But we set allTabletTypes[tabletId] to REPAIR. Later at the
beginning of scheduling this tablet,
+ // it will reset its type as allTabletTypes[tabletId], so its type
will convert to REPAIR.
+
+ long tabletId = tablet.getTabletId();
+ boolean contains = allTabletTypes.containsKey(tabletId);
+ if (contains && !force) {
+ if (tablet.getType() == TabletSchedCtx.Type.REPAIR) {
+ allTabletTypes.put(tabletId, TabletSchedCtx.Type.REPAIR);
+ }
return AddResult.ALREADY_IN;
}
@@ -238,13 +255,22 @@ public class TabletScheduler extends MasterDaemon {
return AddResult.LIMIT_EXCEED;
}
- allTabletIds.add(tablet.getTabletId());
+ if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) {
+ allTabletTypes.put(tabletId, tablet.getType());
+ }
+
pendingTablets.offer(tablet);
+ if (!contains) {
+ LOG.info("Add tablet to pending queue, tablet id {}, type {},
status {}, priority {}",
+ tablet.getTabletId(), tablet.getType(),
tablet.getTabletStatus(),
+ tablet.getPriority());
+ }
+
return AddResult.ADDED;
}
public synchronized boolean containsTablet(long tabletId) {
- return allTabletIds.contains(tabletId);
+ return allTabletTypes.containsKey(tabletId);
}
public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) {
@@ -263,7 +289,7 @@ public class TabletScheduler extends MasterDaemon {
for (TabletSchedCtx tabletCtx : pendingTablets) {
if (tabletCtx.getDbId() == dbId && tabletCtx.getTblId() == tblId
&& partitionIds.contains(tabletCtx.getPartitionId())) {
- tabletCtx.setOrigPriority(Priority.VERY_HIGH);
+ tabletCtx.setPriority(Priority.VERY_HIGH);
}
newPendingTablets.add(tabletCtx);
}
@@ -293,14 +319,15 @@ public class TabletScheduler extends MasterDaemon {
return;
}
- updateLoadStatisticsAndPriorityIfNecessary();
+ if (System.currentTimeMillis() - lastCheckTimeoutTime >= 1000L) {
+ updateLoadStatisticsAndPriorityIfNecessary();
+ handleRunningTablets();
+ selectTabletsForBalance();
+ lastCheckTimeoutTime = System.currentTimeMillis();
+ }
schedulePendingTablets();
- handleRunningTablets();
-
- selectTabletsForBalance();
-
stat.counterTabletScheduleRound.incrementAndGet();
}
@@ -314,8 +341,6 @@ public class TabletScheduler extends MasterDaemon {
rebalancer.updateLoadStatistic(statisticMap);
diskRebalancer.updateLoadStatistic(statisticMap);
- adjustPriorities();
-
lastStatUpdateTime = System.currentTimeMillis();
}
@@ -332,7 +357,7 @@ public class TabletScheduler extends MasterDaemon {
LoadStatisticForTag loadStatistic = new LoadStatisticForTag(tag,
infoService, invertedIndex);
loadStatistic.init();
newStatisticMap.put(tag, loadStatistic);
- LOG.debug("update load statistic:\n{}", loadStatistic.getBrief());
+ LOG.debug("update load statistic for tag {}:\n{}", tag,
loadStatistic.getBrief());
}
this.statisticMap = newStatisticMap;
@@ -342,28 +367,6 @@ public class TabletScheduler extends MasterDaemon {
return statisticMap;
}
- /**
- * adjust priorities of all tablet infos
- */
- private synchronized void adjustPriorities() {
- int size = pendingTablets.size();
- int changedNum = 0;
- TabletSchedCtx tabletCtx;
- for (int i = 0; i < size; i++) {
- tabletCtx = pendingTablets.poll();
- if (tabletCtx == null) {
- break;
- }
-
- if (tabletCtx.adjustPriority(stat)) {
- changedNum++;
- }
- pendingTablets.add(tabletCtx);
- }
-
- LOG.debug("adjust priority for all tablets. changed: {}, total: {}",
changedNum, size);
- }
-
/**
* get at most BATCH_NUM tablets from queue, and try to schedule them.
* After handle, the tablet info should be
@@ -371,7 +374,7 @@ public class TabletScheduler extends MasterDaemon {
* 2. or in schedHistory with state CANCELLING, if some unrecoverable
error happens.
* 3. or in pendingTablets with state PENDING, if failed to be scheduled.
*
- * if in schedHistory, it should be removed from allTabletIds.
+ * if in schedHistory, it should be removed from allTabletTypes.
*/
private void schedulePendingTablets() {
long start = System.currentTimeMillis();
@@ -385,36 +388,25 @@ public class TabletScheduler extends MasterDaemon {
// do not schedule more tablet is tablet scheduler is
disabled.
throw new SchedException(Status.FINISHED, "tablet
scheduler is disabled");
}
+ if (Config.disable_balance && tabletCtx.getType() ==
Type.BALANCE) {
+ finalizeTabletCtx(tabletCtx,
TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
+ "config disable balance");
+ continue;
+ }
scheduleTablet(tabletCtx, batchTask);
} catch (SchedException e) {
- tabletCtx.increaseFailedSchedCounter();
tabletCtx.setErrMsg(e.getMessage());
-
if (e.getStatus() == Status.SCHEDULE_FAILED) {
- if (tabletCtx.getType() == Type.BALANCE) {
- // if balance is disabled, remove this tablet
- if (Config.disable_balance) {
- finalizeTabletCtx(tabletCtx,
TabletSchedCtx.State.CANCELLED, e.getStatus(),
- "disable balance and " + e.getMessage());
- } else {
- // remove the balance task if it fails to be
scheduled many times
- if (tabletCtx.getFailedSchedCounter() > 10) {
- finalizeTabletCtx(tabletCtx,
TabletSchedCtx.State.CANCELLED, e.getStatus(),
- "schedule failed too many times and "
+ e.getMessage());
- } else {
- // we must release resource it current hold,
and be scheduled again
- tabletCtx.releaseResource(this);
- // adjust priority to avoid some higher
priority always be the first in pendingTablets
-
stat.counterTabletScheduledFailed.incrementAndGet();
-
dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx, e.getMessage());
- }
- }
+ boolean isExceedLimit =
tabletCtx.onSchedFailedAndCheckExceedLimit(e.getSubCode());
+ if (isExceedLimit) {
+ finalizeTabletCtx(tabletCtx,
TabletSchedCtx.State.CANCELLED, e.getStatus(),
+ "schedule failed too many times and " +
e.getMessage());
} else {
// we must release resource it current hold, and be
scheduled again
tabletCtx.releaseResource(this);
// adjust priority to avoid some higher priority
always be the first in pendingTablets
stat.counterTabletScheduledFailed.incrementAndGet();
- dynamicAdjustPrioAndAddBackToPendingTablets(tabletCtx,
e.getMessage());
+ addBackToPendingTablets(tabletCtx);
}
} else if (e.getStatus() == Status.FINISHED) {
// schedule redundant tablet or scheduler disabled will
throw this exception
@@ -485,6 +477,8 @@ public class TabletScheduler extends MasterDaemon {
tbl.writeLockOrException(new SchedException(Status.UNRECOVERABLE,
"table "
+ tbl.getName() + " does not exist"));
try {
+ long tabletId = tabletCtx.getTabletId();
+
boolean isColocateTable =
colocateTableIndex.isColocateTable(tbl.getId());
OlapTableState tableState = tbl.getState();
@@ -499,8 +493,9 @@ public class TabletScheduler extends MasterDaemon {
throw new SchedException(Status.UNRECOVERABLE, "index does not
exist");
}
- Tablet tablet = idx.getTablet(tabletCtx.getTabletId());
+ Tablet tablet = idx.getTablet(tabletId);
Preconditions.checkNotNull(tablet);
+ ReplicaAllocation replicaAlloc =
tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
if (isColocateTable) {
GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
@@ -516,18 +511,26 @@ public class TabletScheduler extends MasterDaemon {
Set<Long> backendsSet =
colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
TabletStatus st = tablet.getColocateHealthStatus(
- partition.getVisibleVersion(),
-
tbl.getPartitionInfo().getReplicaAllocation(partition.getId()),
- backendsSet);
+ partition.getVisibleVersion(), replicaAlloc,
backendsSet);
statusPair = Pair.of(st, Priority.HIGH);
tabletCtx.setColocateGroupBackendIds(backendsSet);
} else {
List<Long> aliveBeIds = infoService.getAllBackendIds(true);
statusPair = tablet.getHealthStatusWithPriority(
- infoService,
- partition.getVisibleVersion(),
-
tbl.getPartitionInfo().getReplicaAllocation(partition.getId()),
- aliveBeIds);
+ infoService, partition.getVisibleVersion(),
replicaAlloc, aliveBeIds);
+ }
+
+ if (tabletCtx.getType() != allTabletTypes.get(tabletId)) {
+ TabletSchedCtx.Type curType = tabletCtx.getType();
+ TabletSchedCtx.Type newType = allTabletTypes.get(tabletId);
+ if (curType == TabletSchedCtx.Type.BALANCE && newType ==
TabletSchedCtx.Type.REPAIR) {
+ tabletCtx.setType(newType);
+ tabletCtx.setReplicaAlloc(replicaAlloc);
+ tabletCtx.setTag(null);
+ } else {
+ throw new SchedException(Status.UNRECOVERABLE, "can not
convert type of tablet "
+ + tabletId + " from " + curType.name() + " to " +
newType.name());
+ }
}
if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE &&
tableState != OlapTableState.NORMAL) {
@@ -732,11 +735,11 @@ public class TabletScheduler extends MasterDaemon {
private void handleReplicaVersionIncomplete(TabletSchedCtx tabletCtx,
AgentBatchTask batchTask)
throws SchedException {
stat.counterReplicaVersionMissingErr.incrementAndGet();
-
try {
tabletCtx.chooseDestReplicaForVersionIncomplete(backendsWorkingSlots);
} catch (SchedException e) {
- if (e.getMessage().equals("unable to choose dest replica")) {
+ // could not find a dest replica, try to add a missing replica instead.
+ if (e.getStatus() == Status.UNRECOVERABLE) {
// This situation may occur when the BE nodes
// where all replicas of a tablet are located are decommission,
// and this task is a VERSION_INCOMPLETE task.
@@ -779,25 +782,7 @@ public class TabletScheduler extends MasterDaemon {
private void handleReplicaRelocating(TabletSchedCtx tabletCtx,
AgentBatchTask batchTask)
throws SchedException {
stat.counterReplicaUnavailableErr.incrementAndGet();
- try {
- handleReplicaVersionIncomplete(tabletCtx, batchTask);
- LOG.debug("succeed to find version incomplete replica from tablet
relocating. tablet id: {}",
- tabletCtx.getTabletId());
- } catch (SchedException e) {
- if (e.getStatus() == Status.SCHEDULE_FAILED) {
- LOG.debug("failed to find version incomplete replica from
tablet relocating. tablet id: {}, "
- + "try to find a new backend",
tabletCtx.getTabletId());
- // the dest or src slot may be taken after calling
handleReplicaVersionIncomplete(),
- // so we need to release these slots first.
- // and reserve the tablet in TabletSchedCtx so that it can
continue to be scheduled.
- tabletCtx.releaseResource(this, true);
- tabletCtx.setTabletStatus(TabletStatus.REPLICA_MISSING);
- handleReplicaMissing(tabletCtx, batchTask);
- LOG.debug("succeed to find new backend for tablet relocating.
tablet id: {}", tabletCtx.getTabletId());
- } else {
- throw e;
- }
- }
+ handleReplicaVersionIncomplete(tabletCtx, batchTask);
}
/**
@@ -831,7 +816,7 @@ public class TabletScheduler extends MasterDaemon {
// to remove this tablet from the pendingTablets(consider it as
finished)
throw new SchedException(Status.FINISHED, "redundant replica is
deleted");
}
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to delete any
redundant replicas");
+ throw new SchedException(Status.UNRECOVERABLE, "unable to delete any
redundant replicas");
}
private boolean deleteBackendDropped(TabletSchedCtx tabletCtx, boolean
force) throws SchedException {
@@ -1035,7 +1020,7 @@ public class TabletScheduler extends MasterDaemon {
deleteReplicaInternal(tabletCtx, replica, "colocate redundant",
false);
throw new SchedException(Status.FINISHED, "colocate redundant
replica is deleted");
}
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to delete any
colocate redundant replicas");
+ throw new SchedException(Status.UNRECOVERABLE, "unable to delete any
colocate redundant replicas");
}
/**
@@ -1102,16 +1087,17 @@ public class TabletScheduler extends MasterDaemon {
replica.setWatermarkTxnId(nextTxnId);
replica.setState(ReplicaState.DECOMMISSION);
// set priority to normal because it may wait for a long time.
Remain it as VERY_HIGH may block other task.
- tabletCtx.setOrigPriority(Priority.NORMAL);
+ tabletCtx.setPriority(Priority.NORMAL);
LOG.info("set replica {} on backend {} of tablet {} state to
DECOMMISSION due to reason {}",
replica.getId(), replica.getBackendId(),
tabletCtx.getTabletId(), reason);
- throw new SchedException(Status.SCHEDULE_FAILED, "set watermark
txn " + nextTxnId);
+ throw new SchedException(Status.SCHEDULE_FAILED,
SubCode.WAITING_DECOMMISSION,
+ "set watermark txn " + nextTxnId);
} else if (replica.getState() == ReplicaState.DECOMMISSION &&
replica.getWatermarkTxnId() != -1) {
long watermarkTxnId = replica.getWatermarkTxnId();
try {
if
(!Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(watermarkTxnId,
tabletCtx.getDbId(),
Lists.newArrayList(tabletCtx.getTblId()))) {
- throw new SchedException(Status.SCHEDULE_FAILED,
+ throw new SchedException(Status.SCHEDULE_FAILED,
SubCode.WAITING_DECOMMISSION,
"wait txn before " + watermarkTxnId + " to be
finished");
}
} catch (AnalysisException e) {
@@ -1225,7 +1211,7 @@ public class TabletScheduler extends MasterDaemon {
}
for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
// add if task from prio backend or cluster is balanced
- if (alternativeTablets.isEmpty() || tabletCtx.getOrigPriority() ==
TabletSchedCtx.Priority.NORMAL) {
+ if (alternativeTablets.isEmpty() || tabletCtx.getPriority() ==
TabletSchedCtx.Priority.NORMAL) {
addTablet(tabletCtx, false);
}
}
@@ -1260,7 +1246,8 @@ public class TabletScheduler extends MasterDaemon {
LoadStatisticForTag statistic = statisticMap.get(tag);
if (statistic == null) {
throw new SchedException(Status.UNRECOVERABLE,
- String.format("tag %s does not exist.", tag));
+ String.format("tag %s does not exist. available tags:
%s", tag,
+
Joiner.on(",").join(statisticMap.keySet().stream().limit(5).toArray())));
}
beStatistics = statistic.getSortedBeLoadStats(null /* sorted
ignore medium */);
} else {
@@ -1336,7 +1323,7 @@ public class TabletScheduler extends MasterDaemon {
}
if (allFitPaths.isEmpty()) {
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to find
dest path for new replica");
+ throw new SchedException(Status.UNRECOVERABLE, "unable to find
dest path for new replica");
}
// all fit paths has already been sorted by load score in
'allFitPaths' in ascend order.
@@ -1390,25 +1377,109 @@ public class TabletScheduler extends MasterDaemon {
return rootPathLoadStatistic;
}
- throw new SchedException(Status.SCHEDULE_FAILED, "unable to find dest
path which can be fit in");
+ throw new SchedException(Status.UNRECOVERABLE, "unable to find dest
path which can be fit in");
}
- /**
- * For some reason, a tablet info failed to be scheduled this time,
- * So we dynamically change its priority and add back to queue, waiting
for next round.
- */
- private void dynamicAdjustPrioAndAddBackToPendingTablets(TabletSchedCtx
tabletCtx, String message) {
+
+ private void addBackToPendingTablets(TabletSchedCtx tabletCtx) {
Preconditions.checkState(tabletCtx.getState() ==
TabletSchedCtx.State.PENDING);
- tabletCtx.adjustPriority(stat);
addTablet(tabletCtx, true /* force */);
}
+
private void finalizeTabletCtx(TabletSchedCtx tabletCtx,
TabletSchedCtx.State state, Status status, String reason) {
// use 2 steps to avoid nested database lock and
synchronized.(releaseTabletCtx() may hold db lock)
// remove the tablet ctx, so that no other process can see it
removeTabletCtx(tabletCtx, reason);
// release resources taken by tablet ctx
releaseTabletCtx(tabletCtx, state, status == Status.UNRECOVERABLE);
+
+ // re-check the tablet right away, so there is no need to wait for TabletChecker's 20s round
+ if (state == TabletSchedCtx.State.FINISHED) {
+ tryAddAfterFinished(tabletCtx);
+ }
+ }
+
+ private void tryAddAfterFinished(TabletSchedCtx tabletCtx) {
+ int finishedCounter = tabletCtx.getFinishedCounter();
+ finishedCounter++;
+ tabletCtx.setFinishedCounter(finishedCounter);
+ if (finishedCounter >= TabletSchedCtx.FINISHED_COUNTER_THRESHOLD) {
+ return;
+ }
+
+ Database db =
Env.getCurrentInternalCatalog().getDbNullable(tabletCtx.getDbId());
+ if (db == null) {
+ return;
+ }
+ OlapTable tbl = (OlapTable) db.getTableNullable(tabletCtx.getTblId());
+ if (tbl == null) {
+ return;
+ }
+ Pair<TabletStatus, TabletSchedCtx.Priority> statusPair;
+ ReplicaAllocation replicaAlloc = null;
+ tbl.readLock();
+ try {
+ Partition partition = tbl.getPartition(tabletCtx.getPartitionId());
+ if (partition == null) {
+ return;
+ }
+
+ MaterializedIndex idx = partition.getIndex(tabletCtx.getIndexId());
+ if (idx == null) {
+ return;
+ }
+
+ Tablet tablet = idx.getTablet(tabletCtx.getTabletId());
+ if (tablet == null) {
+ return;
+ }
+
+ replicaAlloc =
tbl.getPartitionInfo().getReplicaAllocation(partition.getId());
+ boolean isColocateTable =
colocateTableIndex.isColocateTable(tbl.getId());
+ if (isColocateTable) {
+ GroupId groupId = colocateTableIndex.getGroup(tbl.getId());
+ if (groupId == null) {
+ return;
+ }
+
+ int tabletOrderIdx = tabletCtx.getTabletOrderIdx();
+ if (tabletOrderIdx == -1) {
+ tabletOrderIdx = idx.getTabletOrderIdx(tablet.getId());
+ }
+ Preconditions.checkState(tabletOrderIdx != -1);
+
+ Set<Long> backendsSet =
colocateTableIndex.getTabletBackendsByGroup(groupId, tabletOrderIdx);
+ TabletStatus st = tablet.getColocateHealthStatus(
+ partition.getVisibleVersion(), replicaAlloc,
backendsSet);
+ statusPair = Pair.of(st, Priority.HIGH);
+ } else {
+ List<Long> aliveBeIds = infoService.getAllBackendIds(true);
+ statusPair = tablet.getHealthStatusWithPriority(
+ infoService, partition.getVisibleVersion(),
replicaAlloc, aliveBeIds);
+
+ if (statusPair.second.ordinal() <
tabletCtx.getPriority().ordinal()) {
+ statusPair.second = tabletCtx.getPriority();
+ }
+ }
+ } finally {
+ tbl.readUnlock();
+ }
+
+ if (statusPair.first == TabletStatus.HEALTHY) {
+ return;
+ }
+
+ TabletSchedCtx newTabletCtx = new TabletSchedCtx(
+ TabletSchedCtx.Type.REPAIR, tabletCtx.getDbId(),
tabletCtx.getTblId(),
+ tabletCtx.getPartitionId(), tabletCtx.getIndexId(),
tabletCtx.getTabletId(),
+ replicaAlloc, System.currentTimeMillis());
+
+ newTabletCtx.setTabletStatus(statusPair.first);
+ newTabletCtx.setPriority(statusPair.second);
+ newTabletCtx.setFinishedCounter(finishedCounter);
+
+ addTablet(newTabletCtx, false);
}
private void releaseTabletCtx(TabletSchedCtx tabletCtx,
TabletSchedCtx.State state, boolean resetReplicaState) {
@@ -1422,7 +1493,7 @@ public class TabletScheduler extends MasterDaemon {
private synchronized void removeTabletCtx(TabletSchedCtx tabletCtx, String
reason) {
runningTablets.remove(tabletCtx.getTabletId());
- allTabletIds.remove(tabletCtx.getTabletId());
+ allTabletTypes.remove(tabletCtx.getTabletId());
schedHistory.add(tabletCtx);
LOG.info("remove the tablet {}. because: {}", tabletCtx.getTabletId(),
reason);
}
@@ -1430,15 +1501,27 @@ public class TabletScheduler extends MasterDaemon {
// get next batch of tablets from queue.
private synchronized List<TabletSchedCtx> getNextTabletCtxBatch() {
List<TabletSchedCtx> list = Lists.newArrayList();
- int count = Math.min(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
- while (count > 0) {
+ int slotNum = getCurrentAvailableSlotNum();
+ // Make slotNum >= 1 to ensure that it could return at least 1 ctx
+ // when the pending list is not empty.
+ if (slotNum < 1) {
+ slotNum = 1;
+ }
+ while (list.size() < Config.schedule_batch_size && slotNum > 0) {
TabletSchedCtx tablet = pendingTablets.poll();
if (tablet == null) {
// no more tablets
break;
}
list.add(tablet);
- count--;
+ TabletStatus status = tablet.getTabletStatus();
+ // for a clone, it will take 2 slots: src slot and dst slot.
+ if (!(status == TabletStatus.REDUNDANT
+ || status == TabletStatus.FORCE_REDUNDANT
+ || status == TabletStatus.COLOCATE_REDUNDANT
+ || status == TabletStatus.REPLICA_COMPACTION_TOO_SLOW)) {
+ slotNum -= 2;
+ }
}
return list;
}
@@ -1469,9 +1552,12 @@ public class TabletScheduler extends MasterDaemon {
// if we have a success task, then stat must be refreshed before
schedule a new task
updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(),
tabletCtx.getSrcPathHash());
updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(),
tabletCtx.getDestPathHash());
+ finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED,
Status.FINISHED, "finished");
+ } else {
+ finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED,
Status.UNRECOVERABLE,
+ request.getTaskStatus().getErrorMsgs().get(0));
}
- // we need this function to free slot for this migration task
- finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED,
Status.FINISHED, "finished");
+
return true;
}
@@ -1496,12 +1582,20 @@ public class TabletScheduler extends MasterDaemon {
try {
tabletCtx.finishCloneTask(cloneTask, request);
} catch (SchedException e) {
- tabletCtx.increaseFailedRunningCounter();
tabletCtx.setErrMsg(e.getMessage());
if (e.getStatus() == Status.RUNNING_FAILED) {
- stat.counterCloneTaskFailed.incrementAndGet();
- addToRunningTablets(tabletCtx);
- return false;
+ tabletCtx.increaseFailedRunningCounter();
+ if (!tabletCtx.isExceedFailedRunningLimit()) {
+ stat.counterCloneTaskFailed.incrementAndGet();
+ addToRunningTablets(tabletCtx);
+ return false;
+ } else {
+ // unrecoverable
+ stat.counterTabletScheduledDiscard.incrementAndGet();
+ finalizeTabletCtx(tabletCtx,
TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
+ e.getMessage());
+ return true;
+ }
} else if (e.getStatus() == Status.UNRECOVERABLE) {
// unrecoverable
stat.counterTabletScheduledDiscard.incrementAndGet();
@@ -1639,7 +1733,7 @@ public class TabletScheduler extends MasterDaemon {
}
public synchronized int getTotalNum() {
- return allTabletIds.size();
+ return allTabletTypes.size();
}
public synchronized long getBalanceTabletsNumber() {
@@ -1655,10 +1749,12 @@ public class TabletScheduler extends MasterDaemon {
public static class PathSlot {
// path hash -> slot num
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
+ private long beId;
- public PathSlot(List<Long> paths, int initSlotNum) {
+ public PathSlot(List<Long> paths, long beId) {
+ this.beId = beId;
for (Long pathHash : paths) {
- pathSlots.put(pathHash, new Slot(initSlotNum));
+ pathSlots.put(pathHash, new Slot(beId));
}
}
@@ -1670,22 +1766,8 @@ public class TabletScheduler extends MasterDaemon {
// add new path
for (Long pathHash : paths) {
if (!pathSlots.containsKey(pathHash)) {
- pathSlots.put(pathHash, new
Slot(Config.schedule_slot_num_per_path));
- }
- }
- }
-
- // Update the total slots num of specified paths, increase or decrease
- public synchronized void updateSlot(List<Long> pathHashs, int delta) {
- for (Long pathHash : pathHashs) {
- Slot slot = pathSlots.get(pathHash);
- if (slot == null) {
- continue;
+ pathSlots.put(pathHash, new Slot(beId));
}
-
- slot.total += delta;
- slot.rectify();
- LOG.debug("decrease path {} slots num to {}", pathHash,
pathSlots.get(pathHash).total);
}
}
@@ -1701,6 +1783,21 @@ public class TabletScheduler extends MasterDaemon {
slot.totalCopyTimeMs += copyTimeMs;
}
+ public synchronized boolean hasAvailableSlot(long pathHash) {
+ if (pathHash == -1) {
+ return false;
+ }
+
+ Slot slot = pathSlots.get(pathHash);
+ if (slot == null) {
+ return false;
+ }
+ if (slot.getAvailable() == 0) {
+ return false;
+ }
+ return true;
+ }
+
/**
* If the specified 'pathHash' has available slot, decrease the slot
number and return this path hash
*/
@@ -1709,7 +1806,8 @@ public class TabletScheduler extends MasterDaemon {
if (LOG.isDebugEnabled()) {
LOG.debug("path hash is not set.", new Exception());
}
- throw new SchedException(Status.SCHEDULE_FAILED, "path hash is
not set");
+ throw new SchedException(Status.SCHEDULE_FAILED,
SubCode.WAITING_SLOT,
+ "path hash is not set");
}
Slot slot = pathSlots.get(pathHash);
@@ -1717,12 +1815,11 @@ public class TabletScheduler extends MasterDaemon {
LOG.debug("path {} is not exist", pathHash);
return -1;
}
- slot.rectify();
- if (slot.available <= 0) {
+ if (slot.used >= slot.getTotal()) {
LOG.debug("path {} has no available slot", pathHash);
return -1;
}
- slot.available--;
+ slot.used++;
return pathHash;
}
@@ -1731,23 +1828,15 @@ public class TabletScheduler extends MasterDaemon {
if (slot == null) {
return;
}
- slot.available++;
- slot.rectify();
- }
-
- public synchronized int peekSlot(long pathHash) {
- Slot slot = pathSlots.get(pathHash);
- if (slot == null) {
- return -1;
+ if (slot.used > 0) {
+ slot.used--;
}
- slot.rectify();
- return slot.available;
}
public synchronized int getTotalAvailSlotNum() {
int total = 0;
for (Slot slot : pathSlots.values()) {
- total += slot.available;
+ total += slot.getAvailable();
}
return total;
}
@@ -1758,7 +1847,7 @@ public class TabletScheduler extends MasterDaemon {
public synchronized Set<Long> getAvailPathsForBalance() {
Set<Long> pathHashs = Sets.newHashSet();
for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
- if (entry.getValue().balanceSlot > 0) {
+ if (entry.getValue().getBalanceAvailable() > 0) {
pathHashs.add(entry.getKey());
}
}
@@ -1768,7 +1857,7 @@ public class TabletScheduler extends MasterDaemon {
public synchronized int getAvailBalanceSlotNum() {
int num = 0;
for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
- num += entry.getValue().balanceSlot;
+ num += entry.getValue().getBalanceAvailable();
}
return num;
}
@@ -1776,13 +1865,12 @@ public class TabletScheduler extends MasterDaemon {
public synchronized List<List<String>> getSlotInfo(long beId) {
List<List<String>> results = Lists.newArrayList();
pathSlots.forEach((key, value) -> {
- value.rectify();
List<String> result = Lists.newArrayList();
result.add(String.valueOf(beId));
result.add(String.valueOf(key));
- result.add(String.valueOf(value.available));
- result.add(String.valueOf(value.total));
- result.add(String.valueOf(value.balanceSlot));
+ result.add(String.valueOf(value.getAvailable()));
+ result.add(String.valueOf(value.getTotal()));
+ result.add(String.valueOf(value.getBalanceAvailable()));
result.add(String.valueOf(value.getAvgRate()));
results.add(result);
});
@@ -1794,8 +1882,8 @@ public class TabletScheduler extends MasterDaemon {
if (slot == null) {
return -1;
}
- if (slot.balanceSlot > 0) {
- slot.balanceSlot--;
+ if (slot.balanceUsed < slot.getBalanceTotal()) {
+ slot.balanceUsed++;
return pathHash;
}
return -1;
@@ -1807,8 +1895,8 @@ public class TabletScheduler extends MasterDaemon {
if (slot == null) {
continue;
}
- if (slot.balanceSlot > 0) {
- slot.balanceSlot--;
+ if (slot.balanceUsed < slot.getBalanceTotal()) {
+ slot.balanceUsed++;
return pathHash;
}
}
@@ -1820,8 +1908,9 @@ public class TabletScheduler extends MasterDaemon {
if (slot == null) {
return;
}
- slot.balanceSlot++;
- slot.rectify();
+ if (slot.balanceUsed > 0) {
+ slot.balanceUsed--;
+ }
}
public synchronized void updateDiskBalanceLastSuccTime(long pathHash) {
@@ -1851,10 +1940,8 @@ public class TabletScheduler extends MasterDaemon {
}
public static class Slot {
- public int total;
- public int available;
- // slot reserved for balance
- public int balanceSlot;
+ public int used;
+ public int balanceUsed;
public long totalCopySize = 0;
public long totalCopyTimeMs = 0;
@@ -1862,23 +1949,35 @@ public class TabletScheduler extends MasterDaemon {
// for disk balance
public long diskBalanceLastSuccTime = 0;
- public Slot(int total) {
- this.total = total;
- this.available = total;
- this.balanceSlot = Config.balance_slot_num_per_path;
+ private long beId;
+
+ public Slot(long beId) {
+ this.beId = beId;
+ this.used = 0;
+ this.balanceUsed = 0;
}
- public void rectify() {
- if (total <= 0) {
- total = 1;
- }
- if (available > total) {
- available = total;
- }
+ public int getAvailable() {
+ return Math.max(0, getTotal() - used);
+ }
+
+ public int getTotal() {
+ int total = Math.max(1, Config.schedule_slot_num_per_path);
- if (balanceSlot > Config.balance_slot_num_per_path) {
- balanceSlot = Config.balance_slot_num_per_path;
+ Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+ if (be != null && be.isDecommissioned()) {
+ total = Math.max(1,
Config.schedule_decommission_slot_num_per_path);
}
+
+ return total;
+ }
+
+ public int getBalanceAvailable() {
+ return Math.max(0, getBalanceTotal() - balanceUsed);
+ }
+
+ public int getBalanceTotal() {
+ return Math.max(1, Config.balance_slot_num_per_path);
}
// return avg rate, Bytes/S
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
index e2aada6f1f..4441a99431 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
@@ -29,15 +29,15 @@ import com.google.common.collect.Lists;
import java.util.List;
/*
- * show proc "/tablet_scheduler/pending_tablets";
- * show proc "/tablet_scheduler/running_tablets";
- * show proc "/tablet_scheduler/history_tablets";
+ * show proc "/cluster_balance/pending_tablets";
+ * show proc "/cluster_balance/running_tablets";
+ * show proc "/cluster_balance/history_tablets";
*/
public class TabletSchedulerDetailProcDir implements ProcDirInterface {
public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>().add("TabletId")
-
.add("Type").add("Medium").add("Status").add("State").add("OrigPrio").add("DynmPrio").add("SrcBe")
+
.add("Type").add("Medium").add("Status").add("State").add("SchedCode").add("Priority").add("SrcBe")
.add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit")
-
.add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("LstAdjPrio").add("VisibleVer")
+
.add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer")
.add("CmtVer").add("ErrMsg")
.build();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
index 41df080a75..d4578e17d7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
@@ -39,17 +39,17 @@ public class TabletSchedCtxTest {
ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
TabletSchedCtx ctx1 = new TabletSchedCtx(Type.REPAIR,
1, 2, 3, 4, 1000, replicaAlloc, System.currentTimeMillis());
- ctx1.setOrigPriority(Priority.NORMAL);
+ ctx1.setPriority(Priority.NORMAL);
ctx1.setLastVisitedTime(2);
TabletSchedCtx ctx2 = new TabletSchedCtx(Type.REPAIR,
1, 2, 3, 4, 1001, replicaAlloc, System.currentTimeMillis());
- ctx2.setOrigPriority(Priority.NORMAL);
+ ctx2.setPriority(Priority.NORMAL);
ctx2.setLastVisitedTime(3);
TabletSchedCtx ctx3 = new TabletSchedCtx(Type.REPAIR,
- 1, 2, 3, 4, 1001, replicaAlloc, System.currentTimeMillis());
- ctx3.setOrigPriority(Priority.NORMAL);
+ 1, 2, 3, 4, 1002, replicaAlloc, System.currentTimeMillis());
+ ctx3.setPriority(Priority.NORMAL);
ctx3.setLastVisitedTime(1);
pendingTablets.add(ctx1);
@@ -62,8 +62,8 @@ public class TabletSchedCtxTest {
// priority is not equal, info2 is HIGH, should ranks ahead
pendingTablets.clear();
- ctx1.setOrigPriority(Priority.NORMAL);
- ctx2.setOrigPriority(Priority.HIGH);
+ ctx1.setPriority(Priority.NORMAL);
+ ctx2.setPriority(Priority.HIGH);
ctx1.setLastVisitedTime(2);
ctx2.setLastVisitedTime(2);
pendingTablets.add(ctx2);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index 95fb22eac6..209f8a1127 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -33,7 +33,6 @@ import org.junit.jupiter.api.Test;
import java.util.List;
public class DecommissionBackendTest extends TestWithFeService {
-
@Override
protected int backendNum() {
return 3;
@@ -42,6 +41,7 @@ public class DecommissionBackendTest extends
TestWithFeService {
@Override
protected void beforeCluster() {
FeConstants.runningUnitTest = true;
+ needCleanDir = false;
}
@BeforeAll
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index ac2fe5978d..f2610738f3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -119,6 +119,7 @@ public abstract class TestWithFeService {
protected String dorisHome;
protected String runningDir = "fe/mocked/" + getClass().getSimpleName() +
"/" + UUID.randomUUID() + "/";
protected ConnectContext connectContext;
+ protected boolean needCleanDir = true;
protected static final String DEFAULT_CLUSTER_PREFIX = "default_cluster:";
@@ -140,7 +141,9 @@ public abstract class TestWithFeService {
runAfterAll();
Env.getCurrentEnv().clear();
StatementScopeIdGenerator.clear();
- cleanDorisFeDir();
+ if (needCleanDir) {
+ cleanDorisFeDir();
+ }
}
@BeforeEach
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]