This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new d89e5de8159 [improvement](tablet scheduler) fix higher priority tablet
add failed due to pending queue full #41076 (#41268)
d89e5de8159 is described below
commit d89e5de81599d17a56c70ea05ec13d6baac052c9
Author: yujun <[email protected]>
AuthorDate: Thu Sep 26 22:31:20 2024 +0800
[improvement](tablet scheduler) fix higher priority tablet add failed due
to pending queue full #41076 (#41268)
cherry pick from #41076
---
.../clone/ColocateTableCheckerAndBalancer.java | 11 ++++---
.../java/org/apache/doris/clone/TabletChecker.java | 12 ++++++--
.../org/apache/doris/clone/TabletSchedCtx.java | 4 +--
.../org/apache/doris/clone/TabletScheduler.java | 33 +++++++++++++++-----
.../org/apache/doris/clone/TabletSchedCtxTest.java | 36 ++++++++++++++++++++--
5 files changed, 77 insertions(+), 19 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 4febec9e922..7727bc77e18 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -561,15 +561,17 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite);
AddResult res =
tabletScheduler.addTablet(tabletCtx, false /* not force */);
- if (res == AddResult.LIMIT_EXCEED || res
== AddResult.DISABLED) {
+ if (res == AddResult.DISABLED) {
// tablet in scheduler exceed limit,
or scheduler is disabled,
// skip this group and check next one.
LOG.info("tablet scheduler return: {}.
stop colocate table check", res.name());
break OUT;
} else if (res == AddResult.ADDED) {
counter.addToSchedulerTabletNum++;
- } else {
+ } else if (res == AddResult.ALREADY_IN) {
counter.tabletInScheduler++;
+ } else if (res == AddResult.REPLACE_ADDED
|| res == AddResult.LIMIT_EXCEED) {
+ counter.tabletExceedLimit++;
}
}
}
@@ -589,9 +591,10 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
} // end for groups
long cost = System.currentTimeMillis() - start;
- LOG.info("finished to check tablets.
unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms",
+ LOG.info("finished to check tablets.
unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{}, "
+ + "cost: {} ms",
counter.unhealthyTabletNum, counter.totalTabletNum,
counter.addToSchedulerTabletNum,
- counter.tabletInScheduler, counter.tabletNotReady, cost);
+ counter.tabletInScheduler, counter.tabletNotReady,
counter.tabletExceedLimit, cost);
}
private GlobalColocateStatistic buildGlobalColocateStatistic() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
index f35282d37b6..78795e54f95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletChecker.java
@@ -78,6 +78,7 @@ public class TabletChecker extends MasterDaemon {
put("added", new AtomicLong(0L));
put("in_sched", new AtomicLong(0L));
put("not_ready", new AtomicLong(0L));
+ put("exceed_limit", new AtomicLong(0L));
}
};
@@ -224,6 +225,7 @@ public class TabletChecker extends MasterDaemon {
public long addToSchedulerTabletNum = 0;
public long tabletInScheduler = 0;
public long tabletNotReady = 0;
+ public long tabletExceedLimit = 0;
}
private enum LoopControlStatus {
@@ -344,10 +346,12 @@ public class TabletChecker extends MasterDaemon {
tabletCountByStatus.get("added").set(counter.addToSchedulerTabletNum);
tabletCountByStatus.get("in_sched").set(counter.tabletInScheduler);
tabletCountByStatus.get("not_ready").set(counter.tabletNotReady);
+ tabletCountByStatus.get("exceed_limit").set(counter.tabletExceedLimit);
- LOG.info("finished to check tablets.
unhealth/total/added/in_sched/not_ready: {}/{}/{}/{}/{}, cost: {} ms",
+ LOG.info("finished to check tablets.
unhealth/total/added/in_sched/not_ready/exceed_limit: {}/{}/{}/{}/{}/{},"
+ + "cost: {} ms",
counter.unhealthyTabletNum, counter.totalTabletNum,
counter.addToSchedulerTabletNum,
- counter.tabletInScheduler, counter.tabletNotReady, cost);
+ counter.tabletInScheduler, counter.tabletNotReady,
counter.tabletExceedLimit, cost);
}
private LoopControlStatus handlePartitionTablet(Database db, OlapTable
tbl, Partition partition, boolean isInPrios,
@@ -404,11 +408,13 @@ public class TabletChecker extends MasterDaemon {
tabletCtx.setIsUniqKeyMergeOnWrite(isUniqKeyMergeOnWrite);
AddResult res = tabletScheduler.addTablet(tabletCtx, false /*
not force */);
- if (res == AddResult.LIMIT_EXCEED || res ==
AddResult.DISABLED) {
+ if (res == AddResult.DISABLED) {
LOG.info("tablet scheduler return: {}. stop tablet
checker", res.name());
return LoopControlStatus.BREAK_OUT;
} else if (res == AddResult.ADDED) {
counter.addToSchedulerTabletNum++;
+ } else if (res == AddResult.REPLACE_ADDED || res ==
AddResult.LIMIT_EXCEED) {
+ counter.tabletExceedLimit++;
}
}
} // indices
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index d004d21f79c..5e29adbb6da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -1334,13 +1334,13 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
if (tabletHealth.aliveAndVersionCompleteNum < (replicaNum / 2
+ 1)) {
value -= 3 * baseTime;
if (tabletHealth.hasRecentLoadFailed) {
- value -= 3 * baseTime;
+ value -= 4 * baseTime;
}
}
if (tabletHealth.hasAliveAndVersionIncomplete) {
value -= 1 * baseTime;
if (isUniqKeyMergeOnWrite) {
- value -= 1 * baseTime;
+ value -= 2 * baseTime;
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 9768cc764ef..97c9be0e887 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -71,6 +71,7 @@ import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Sets;
import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
@@ -81,7 +82,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
@@ -120,7 +120,7 @@ public class TabletScheduler extends MasterDaemon {
*
* pendingTablets, allTabletTypes, runningTablets and schedHistory are
protected by 'synchronized'
*/
- private PriorityQueue<TabletSchedCtx> pendingTablets = new
PriorityQueue<>();
+ private MinMaxPriorityQueue<TabletSchedCtx> pendingTablets =
MinMaxPriorityQueue.create();
private Map<Long, TabletSchedCtx.Type> allTabletTypes = Maps.newHashMap();
// contains all tabletCtxs which state are RUNNING
private Map<Long, TabletSchedCtx> runningTablets = Maps.newHashMap();
@@ -149,6 +149,7 @@ public class TabletScheduler extends MasterDaemon {
ADDED, // success to add
ALREADY_IN, // already added, skip
LIMIT_EXCEED, // number of pending tablets exceed the limit
+ REPLACE_ADDED, // added successfully, after evicting the lowest priority pending tablet
DISABLED // scheduler has been disabled.
}
@@ -268,12 +269,22 @@ public class TabletScheduler extends MasterDaemon {
return AddResult.ALREADY_IN;
}
+ AddResult addResult = AddResult.ADDED;
// if this is not a force add,
// and number of scheduling tablets exceed the limit,
// refuse to add.
- if (!force && (pendingTablets.size() > Config.max_scheduling_tablets
- || runningTablets.size() > Config.max_scheduling_tablets)) {
- return AddResult.LIMIT_EXCEED;
+ if (!force && (pendingTablets.size() >= Config.max_scheduling_tablets
+ || runningTablets.size() >= Config.max_scheduling_tablets)) {
+ // For a sched tablet, the bigger its compare value, the closer it is
to the queue's tail,
+ // and the lower its priority.
+ TabletSchedCtx lowestPriorityTablet = pendingTablets.peekLast();
+ if (lowestPriorityTablet == null ||
lowestPriorityTablet.compareTo(tablet) <= 0) {
+ return AddResult.LIMIT_EXCEED;
+ }
+ addResult = AddResult.REPLACE_ADDED;
+ pendingTablets.pollLast();
+ finalizeTabletCtx(lowestPriorityTablet,
TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,
+ "envit lower priority sched tablet because pending queue
is full");
}
if (!contains || tablet.getType() == TabletSchedCtx.Type.REPAIR) {
@@ -285,7 +296,7 @@ public class TabletScheduler extends MasterDaemon {
LOG.info("Add tablet to pending queue, {}", tablet);
}
- return AddResult.ADDED;
+ return addResult;
}
@@ -306,11 +317,12 @@ public class TabletScheduler extends MasterDaemon {
* Iterate current tablets, change their priority to VERY_HIGH if
necessary.
*/
public synchronized void changeTabletsPriorityToVeryHigh(long dbId, long
tblId, List<Long> partitionIds) {
- PriorityQueue<TabletSchedCtx> newPendingTablets = new
PriorityQueue<>();
+ MinMaxPriorityQueue<TabletSchedCtx> newPendingTablets =
MinMaxPriorityQueue.create();
for (TabletSchedCtx tabletCtx : pendingTablets) {
if (tabletCtx.getDbId() == dbId && tabletCtx.getTblId() == tblId
&& partitionIds.contains(tabletCtx.getPartitionId())) {
tabletCtx.setPriority(Priority.VERY_HIGH);
+ tabletCtx.setLastVisitedTime(1L);
}
newPendingTablets.add(tabletCtx);
}
@@ -1745,7 +1757,7 @@ public class TabletScheduler extends MasterDaemon {
slotNum = 1;
}
while (list.size() < Config.schedule_batch_size && slotNum > 0) {
- TabletSchedCtx tablet = pendingTablets.poll();
+ TabletSchedCtx tablet = pendingTablets.pollFirst();
if (tablet == null) {
// no more tablets
break;
@@ -1947,6 +1959,11 @@ public class TabletScheduler extends MasterDaemon {
});
}
+ // only used by FE unit tests
+ public MinMaxPriorityQueue<TabletSchedCtx> getPendingTabletQueue() {
+ return pendingTablets;
+ }
+
public List<List<String>> getPendingTabletsInfo(int limit) {
return collectTabletCtx(getPendingTablets(limit));
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
index 852f072eca1..a8f48949239 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletSchedCtxTest.java
@@ -17,25 +17,57 @@
package org.apache.doris.clone;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletSchedCtx.Type;
+import org.apache.doris.common.Config;
import com.google.common.collect.Lists;
+import com.google.common.collect.MinMaxPriorityQueue;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
-import java.util.PriorityQueue;
public class TabletSchedCtxTest {
+ @Test
+ public void testAddTablet() {
+ List<TabletSchedCtx> tablets = Lists.newArrayList();
+ ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
+ for (long i = 0; i < 20; i++) {
+ tablets.add(new TabletSchedCtx(Type.REPAIR, 1, 2, 3, 4,
+ i, replicaAlloc, i));
+ tablets.add(new TabletSchedCtx(Type.BALANCE, 1, 2, 3, 4,
+ 1000 + i, replicaAlloc, i));
+ }
+ Collections.shuffle(tablets);
+ Config.max_scheduling_tablets = 5;
+ TabletScheduler scheduler = Env.getCurrentEnv().getTabletScheduler();
+ for (TabletSchedCtx tablet : tablets) {
+ scheduler.addTablet(tablet, false);
+ }
+
+ MinMaxPriorityQueue<TabletSchedCtx> queue =
scheduler.getPendingTabletQueue();
+ List<TabletSchedCtx> gotTablets = Lists.newArrayList();
+ while (!queue.isEmpty()) {
+ gotTablets.add(queue.pollFirst());
+ }
+ Assert.assertEquals(Config.max_scheduling_tablets, gotTablets.size());
+ for (int i = 0; i < gotTablets.size(); i++) {
+ TabletSchedCtx tablet = gotTablets.get(i);
+ Assert.assertEquals(Type.REPAIR, tablet.getType());
+ Assert.assertEquals((long) i, tablet.getCreateTime());
+ }
+ }
+
@Test
public void testPriorityCompare() {
// equal priority, but info3's last visit time is earlier than info2
and info1, so info1 should ranks ahead
- PriorityQueue<TabletSchedCtx> pendingTablets = new PriorityQueue<>();
+ MinMaxPriorityQueue<TabletSchedCtx> pendingTablets =
MinMaxPriorityQueue.create();
ReplicaAllocation replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
TabletSchedCtx ctx1 = new TabletSchedCtx(Type.REPAIR,
1, 2, 3, 4, 1000, replicaAlloc, System.currentTimeMillis());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]