This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9de1c1ceef0 [opt](cloud) Prioritize scheduling the most recently
active tablets in cloud (#59539)
9de1c1ceef0 is described below
commit 9de1c1ceef0b9fc04947e00a3dc38eb2833352e7
Author: deardeng <[email protected]>
AuthorDate: Sat Feb 14 10:51:02 2026 +0800
[opt](cloud) Prioritize scheduling the most recently active tablets in
cloud (#59539)
1. Starting from the replica's getBackendId access path, implement a
sliding-window counter (in TabletSlidingWindowAccessStats.java) that counts
read and write operations over the last hour. The count does not need to be
exact; its only purpose is to indicate relative activity levels. A condensed
wiring sketch follows this list.
2. Expose these statistics in the output of SHOW TABLETS FROM <table> and
SHOW TABLET {tabletId}, for example:
```
show tablet 1767773242851\G
***************************[ 1. row ]***************************
...
AccessCount1H | 1107
LastAccessTime | 1767782384995
...
```
3. Integrate with the existing balancing strategy:
- For each scheduling operation, identify active databases, tables, and
partitions from the active tablets. Schedule operations first on the
most active databases, tables, and partitions.
- When picking tablets from the source BE, replace the previous purely
random selection with one that prefers non-active (cold) tablets, so that
active tablets stay as stable as possible on the source.
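For orientation, here is a condensed sketch of how these pieces are wired
together. It is illustrative only: the calls match the API added in the diff
below (TabletSlidingWindowAccessStats, Config.cloud_active_partition_scheduling_topn),
but the call sites are simplified and the stream-based collection is not a
literal excerpt of the rebalancer code.
```
import org.apache.doris.catalog.TabletSlidingWindowAccessStats;
import org.apache.doris.common.Config;

import java.util.Set;
import java.util.stream.Collectors;

public class ActiveTabletSchedulingSketch {
    // (1) Read/write paths -- e.g. Tablet.getNormalReplicaBackendPathMap() --
    // record each tablet access asynchronously; dropping samples under load is acceptable.
    public static void onTabletAccess(long tabletId) {
        TabletSlidingWindowAccessStats.recordTablet(tabletId);
    }

    // (3) Each rebalancer run fetches the TopN most active tablets and uses the id set to
    // (a) order dbs/tables/partitions by activity and (b) prefer cold tablets on the source BE.
    public static Set<Long> activeTabletIds() {
        return TabletSlidingWindowAccessStats.getInstance()
                .getTopNActive(Config.cloud_active_partition_scheduling_topn)
                .stream()
                .map(r -> r.id)
                .collect(Collectors.toSet());
    }
}
```
The sliding window itself splits the hour into one-minute buckets (see
SlidingWindowCounter in the new file), so the reported AccessCount1H is
approximate by design.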
---
.../main/java/org/apache/doris/common/Config.java | 39 +-
.../main/java/org/apache/doris/catalog/Tablet.java | 2 +
.../catalog/TabletSlidingWindowAccessStats.java | 555 +++++++++++++++++++
.../apache/doris/cloud/catalog/CloudTablet.java | 4 +-
.../doris/cloud/catalog/CloudTabletRebalancer.java | 595 +++++++++++++++++----
.../apache/doris/common/proc/ReplicasProcNode.java | 16 +-
.../apache/doris/common/proc/TabletsProcDir.java | 16 +-
.../java/org/apache/doris/metric/MetricRepo.java | 37 ++
.../trees/plans/commands/ShowTabletIdCommand.java | 15 +-
.../org/apache/doris/planner/OlapScanNode.java | 2 +-
.../TabletSlidingWindowAccessStatsTest.java | 224 ++++++++
.../cloud/catalog/CloudTabletRebalancerTest.java | 241 +++++++++
.../test_active_tablet_priority_scheduling.groovy | 263 +++++++++
13 files changed, 1906 insertions(+), 103 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 36b703382af..7a0ce25a888 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3458,20 +3458,45 @@ public class Config extends ConfigBase {
options = {"without_warmup", "async_warmup", "sync_warmup",
"peer_read_async_warmup"})
public static String cloud_warm_up_for_rebalance_type = "async_warmup";
- @ConfField(mutable = true, masterOnly = true, description = {"云上 tablet
均衡时,"
- + "同一个 host 内预热批次的最大 tablet 个数,默认 10", "The max number of tablets
per host "
- + "when batching warm-up requests during cloud tablet rebalancing,
default 10"})
+ @ConfField(mutable = true, masterOnly = true, description =
{"存算分离模式下tablet均衡时,"
+ + "同一个host内预热批次的最大tablet个数,默认10", "The max number of tablets per
host "
+ + "when batching warm-up requests during tablet rebalancing in "
+ + "compute-storage separation mode, default 10"})
public static int cloud_warm_up_batch_size = 10;
- @ConfField(mutable = true, masterOnly = true, description = {"云上 tablet
均衡时,"
- + "预热批次最长等待时间,单位毫秒,默认 50ms", "Maximum wait time in milliseconds
before a "
+ @ConfField(mutable = true, masterOnly = true, description =
{"存算分离模式下tablet均衡时,"
+ + "预热批次最长等待时间,单位毫秒,默认50ms", "Maximum wait time in milliseconds
before a "
+ "pending warm-up batch is flushed, default 50ms"})
public static int cloud_warm_up_batch_flush_interval_ms = 50;
- @ConfField(mutable = true, masterOnly = true, description = {"云上 tablet
均衡预热 rpc 异步线程池大小,默认 4",
- "Thread pool size for asynchronous warm-up RPC dispatch during cloud
tablet rebalancing, default 4"})
+ @ConfField(mutable = true, masterOnly = true, description =
{"存算分离模式下tablet均衡预热rpc异步线程池大小,默认4",
+ "Thread pool size for asynchronous warm-up RPC dispatch during tablet "
+ + "rebalancing in compute-storage separation mode, default 4"})
public static int cloud_warm_up_rpc_async_pool_size = 4;
+ @ConfField(masterOnly = true, description = {"存算分离模式下tablet均衡时,是否开启活跃tablet优先调度策略,默认打开",
+ "Whether to enable the active tablet priority scheduling strategy during tablet "
+ + "rebalancing in compute-storage separation mode, default true"})
+ public static boolean enable_cloud_active_tablet_priority_scheduling = true;
+
+ @ConfField(masterOnly = true, description = {"是否启用活跃tablet滑动窗口访问统计功能,默认打开",
+ "Whether to enable active tablet sliding window access statistics
feature, default true"})
+ public static boolean enable_active_tablet_sliding_window_access_stats =
true;
+
+ @ConfField(mutable = true, masterOnly = true, description =
{"活跃tablet滑动窗口访问统计的时间窗口大小(秒),默认3600秒(1小时)",
+ "Time window size in seconds for active tablet sliding window
access statistics, "
+ + "default 3600 seconds (1 hour)"})
+ public static long active_tablet_sliding_window_time_window_second = 3600L;
+
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "活跃 tablet 优先调度开启时:partition 级调度将优先处理 TopN 的活跃 partition,"
+ + "再处理其余活跃 partition、非活跃 partition,最后处理 internal db。默认
10000,<=0 表示不做 TopN 分段。",
+ "When active tablet priority scheduling is enabled:
partition-level scheduling processes TopN active "
+ + "partitions first, then other active partitions,"
+ + "then inactive partitions, and internal db at last. "
+ + "Default 10000. <=0 disables TopN segmentation."})
+ public static int cloud_active_partition_scheduling_topn = 10000;
+
@ConfField(mutable = true, masterOnly = false)
public static String security_checker_class_name = "";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 45ae9e54456..462464e9182 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -206,6 +206,7 @@ public abstract class Tablet {
// return map of (BE id -> path hash) of normal replicas
// for load plan.
public Multimap<Long, Long> getNormalReplicaBackendPathMap() throws
UserException {
+ TabletSlidingWindowAccessStats.recordTablet(getId());
return getNormalReplicaBackendPathMapImpl(null, (rep, be) ->
rep.getBackendId());
}
@@ -222,6 +223,7 @@ public abstract class Tablet {
List<Replica> mayMissingVersionReplica =
Lists.newArrayListWithCapacity(replicaNum);
List<Replica> notCatchupReplica =
Lists.newArrayListWithCapacity(replicaNum);
List<Replica> userDropReplica =
Lists.newArrayListWithCapacity(replicaNum);
+ TabletSlidingWindowAccessStats.recordTablet(getId());
for (Replica replica : replicas) {
if (replica.isBad()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletSlidingWindowAccessStats.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletSlidingWindowAccessStats.java
new file mode 100644
index 00000000000..90da80dd1af
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletSlidingWindowAccessStats.java
@@ -0,0 +1,555 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+/**
+ * Sliding window access statistics utility class.
+ * Supports tracking access statistics for different types of IDs (tablet,
replica, backend, etc.)
+ */
+public class TabletSlidingWindowAccessStats {
+ private static final Logger LOG =
LogManager.getLogger(TabletSlidingWindowAccessStats.class);
+
+ private static volatile TabletSlidingWindowAccessStats instance;
+
+ private static final HashFunction SHARD_HASH = Hashing.murmur3_128();
+
+ // Sort active IDs by accessCount desc, then lastAccessTime desc
+ private static final Comparator<AccessStatsResult> TOPN_ACTIVE_COMPARATOR =
+ Comparator.comparingLong((AccessStatsResult r) ->
r.accessCount).reversed()
+ .thenComparing(Comparator.comparingLong((AccessStatsResult
r) -> r.lastAccessTime).reversed());
+
+ // Time window in milliseconds (default: 1 hour)
+ private final long timeWindowMs;
+
+ // Bucket size in milliseconds (default: 1 minute)
+ // The time window is divided into multiple buckets, each bucket stores
access count for a time period.
+ // For example: if timeWindowMs=1hour and bucketSizeMs=1minute, there will
be 60 buckets.
+ // Smaller bucket size = more accurate statistics but more memory usage.
+ private final long bucketSizeMs;
+
+ // Number of buckets in the sliding window
+ private final int numBuckets;
+
+ // Cleanup interval in milliseconds (default: 5 minutes)
+ private final long cleanupIntervalMs;
+
+ // Shard size to reduce lock contention
+ private static final int SHARD_SIZE = 1024;
+
+ /**
+ * Sliding window counter for a single replica/tablet
+ */
+ private static class SlidingWindowCounter {
+ // Each bucket stores count for a time period
+ private final AtomicLongArray buckets;
+ // Timestamp of each bucket (to detect expired buckets)
+ private final AtomicLongArray bucketTimestamps;
+ // Last access time (for TopN sorting)
+ private volatile long lastAccessTime = 0;
+ // Total count in current window (cached for performance)
+ private volatile long cachedTotalCount = 0;
+ private volatile long cachedCountTime = 0;
+
+ SlidingWindowCounter(int numBuckets) {
+ this.buckets = new AtomicLongArray(numBuckets);
+ this.bucketTimestamps = new AtomicLongArray(numBuckets);
+ }
+
+ /**
+ * Get current bucket index based on current time
+ */
+ private int getBucketIndex(long currentTimeMs, long bucketSizeMs, int
numBuckets) {
+ return (int) ((currentTimeMs / bucketSizeMs) % numBuckets);
+ }
+
+ /**
+ * Add an access count
+ */
+ void add(long currentTimeMs, long bucketSizeMs, int numBuckets) {
+ int bucketIndex = getBucketIndex(currentTimeMs, bucketSizeMs,
numBuckets);
+ long bucketStartTime = (currentTimeMs / bucketSizeMs) *
bucketSizeMs;
+
+ // Check if this bucket is expired (belongs to a different time
window)
+ long bucketTimestamp = bucketTimestamps.get(bucketIndex);
+ if (bucketTimestamp != bucketStartTime) {
+ // Reset expired bucket
+ buckets.set(bucketIndex, 0);
+ bucketTimestamps.set(bucketIndex, bucketStartTime);
+ }
+
+ // Increment count
+ buckets.addAndGet(bucketIndex, 1);
+ lastAccessTime = currentTimeMs;
+ cachedTotalCount = -1; // Invalidate cache
+ }
+
+ /**
+ * Get total count within the time window
+ */
+ long getCount(long currentTimeMs, long timeWindowMs, long
bucketSizeMs, int numBuckets) {
+ // Use cached value if still valid (within 1 second)
+ if (cachedTotalCount >= 0 && (currentTimeMs - cachedCountTime) <
1000) {
+ return cachedTotalCount;
+ }
+
+ long windowStart = currentTimeMs - timeWindowMs;
+ long count = 0;
+
+ for (int i = 0; i < numBuckets; i++) {
+ long bucketTimestamp = bucketTimestamps.get(i);
+ if (bucketTimestamp >= windowStart && bucketTimestamp > 0) {
+ count += buckets.get(i);
+ }
+ }
+
+ cachedTotalCount = count;
+ cachedCountTime = currentTimeMs;
+ return count;
+ }
+
+ long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ /**
+ * Clean up expired buckets
+ */
+ void cleanup(long currentTimeMs, long timeWindowMs) {
+ long windowStart = currentTimeMs - timeWindowMs;
+ for (int i = 0; i < buckets.length(); i++) {
+ long bucketTimestamp = bucketTimestamps.get(i);
+ if (bucketTimestamp > 0 && bucketTimestamp < windowStart) {
+ buckets.set(i, 0);
+ bucketTimestamps.set(i, 0);
+ }
+ }
+ cachedTotalCount = -1; // Invalidate cache
+ }
+
+ /**
+ * Check if this counter has any recent activity
+ */
+ boolean hasRecentActivity(long currentTimeMs, long timeWindowMs) {
+ return lastAccessTime >= (currentTimeMs - timeWindowMs);
+ }
+ }
+
+ /**
+ * Shard structure to reduce lock contention
+ */
+ private static class AccessStatsShard {
+ // ID counters: id -> SlidingWindowCounter
+ private final ConcurrentHashMap<Long, SlidingWindowCounter> idCounters
= new ConcurrentHashMap<>();
+ }
+
+ // Sharded access stats to reduce lock contention
+ private final AccessStatsShard[] shards = new AccessStatsShard[SHARD_SIZE];
+
+ // Access counter for monitoring
+ private final AtomicLong totalAccessCount = new AtomicLong(0);
+
+ // Aggregated stats cache for metrics/observability
+ // - recentAccessCountInWindow: sum of accessCount of all active IDs in
current time window
+ // - activeIdsInWindow: number of IDs that have recent activity in current
time window
+ // These are computed on-demand with a TTL to avoid expensive full scans
on every metric scrape.
+ private static final long AGGREGATE_REFRESH_INTERVAL_MS = 10_000L;
+ private final AtomicLong recentAccessCountInWindow = new AtomicLong(0);
+ private final AtomicLong activeIdsInWindow = new AtomicLong(0);
+ private final AtomicLong lastAggregateRefreshTimeMs = new AtomicLong(0);
+
+ // Cleanup daemon
+ private AccessStatsCleanupDaemon cleanupDaemon;
+
+ // Thread pool for async recordAccess execution
+ private ThreadPoolExecutor asyncExecutor;
+
+ // Default bucket size: 1 minute (60 buckets for 1 hour window)
+ private static final long DEFAULT_BUCKET_SIZE_SECOND = 60L;
+
+ // Default cleanup interval: 5 minutes
+ private static final long DEFAULT_CLEANUP_INTERVAL_SECOND = 300L;
+
+ TabletSlidingWindowAccessStats() {
+ long configuredWindowSecond =
Config.active_tablet_sliding_window_time_window_second;
+ long effectiveWindowSecond = Math.max(1L, configuredWindowSecond);
+ if (configuredWindowSecond < DEFAULT_BUCKET_SIZE_SECOND) {
+ LOG.warn("active_tablet_sliding_window_time_window_second={} is
less than default bucket size {}, "
+ + "using one bucket to avoid zero-bucket window",
+ configuredWindowSecond, DEFAULT_BUCKET_SIZE_SECOND);
+ }
+ this.timeWindowMs = effectiveWindowSecond * 1000L;
+ this.bucketSizeMs = DEFAULT_BUCKET_SIZE_SECOND * 1000L;
+ this.numBuckets = (int) Math.max(1L, effectiveWindowSecond /
DEFAULT_BUCKET_SIZE_SECOND);
+ this.cleanupIntervalMs = DEFAULT_CLEANUP_INTERVAL_SECOND * 1000L;
+
+ // Initialize shards
+ for (int i = 0; i < SHARD_SIZE; i++) {
+ shards[i] = new AccessStatsShard();
+ }
+
+ // Start cleanup daemon and async executor if enabled
+ if (Config.enable_active_tablet_sliding_window_access_stats) {
+ this.cleanupDaemon = new AccessStatsCleanupDaemon();
+ this.cleanupDaemon.start();
+ // Initialize async executor for recordAccess
+ // Use a small thread pool with bounded queue to avoid blocking
+ // If queue is full, discard the task (statistics can tolerate
some loss)
+ this.asyncExecutor = new ThreadPoolExecutor(
+ 2, // core pool size
+ 4, // maximum pool size
+ 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(1000), // queue capacity
+ r -> {
+ Thread t = new Thread(r,
"sliding-window-access-stats-async-tablet-record");
+ t.setDaemon(true);
+ return t;
+ },
+ new ThreadPoolExecutor.DiscardPolicy() // discard when
queue is full
+ );
+
+ LOG.info("SlidingWindowAccessStats initialized for type tablet:
timeWindow={}ms, bucketSize={}ms, "
+ + "numBuckets={}, shardSize={}, cleanupInterval={}ms",
+ timeWindowMs, bucketSizeMs, numBuckets, SHARD_SIZE,
cleanupIntervalMs);
+ }
+ }
+
+ /**
+ * Get shard index for a given ID
+ */
+ private int getShardIndex(long id) {
+ int hash = SHARD_HASH.hashLong(id).asInt();
+ return Math.floorMod(hash, SHARD_SIZE);
+ }
+
+ private void refreshAggregatesIfNeeded(long currentTimeMs) {
+ long last = lastAggregateRefreshTimeMs.get();
+ if (currentTimeMs - last < AGGREGATE_REFRESH_INTERVAL_MS) {
+ return;
+ }
+ if (!lastAggregateRefreshTimeMs.compareAndSet(last, currentTimeMs)) {
+ return;
+ }
+
+ int activeIds = 0;
+ long totalAccess = 0;
+ for (AccessStatsShard shard : shards) {
+ for (SlidingWindowCounter counter : shard.idCounters.values()) {
+ if (counter.hasRecentActivity(currentTimeMs, timeWindowMs)) {
+ activeIds++;
+ totalAccess += counter.getCount(currentTimeMs,
timeWindowMs, bucketSizeMs, numBuckets);
+ }
+ }
+ }
+ activeIdsInWindow.set(activeIds);
+ recentAccessCountInWindow.set(totalAccess);
+ }
+
+ /**
+ * Get total access count within current time window across all IDs
(cached).
+ */
+ public long getRecentAccessCountInWindow() {
+ if (!Config.enable_active_tablet_sliding_window_access_stats) {
+ return 0L;
+ }
+ long now = System.currentTimeMillis();
+ refreshAggregatesIfNeeded(now);
+ return recentAccessCountInWindow.get();
+ }
+
+ /**
+ * Get number of active IDs within current time window (cached).
+ */
+ public long getActiveIdsInWindow() {
+ if (!Config.enable_active_tablet_sliding_window_access_stats) {
+ return 0L;
+ }
+ long now = System.currentTimeMillis();
+ refreshAggregatesIfNeeded(now);
+ return activeIdsInWindow.get();
+ }
+
+ /**
+ * Get total access count since FE start (monotonic increasing while
enabled).
+ */
+ public long getTotalAccessCount() {
+ if (!Config.enable_active_tablet_sliding_window_access_stats) {
+ return 0L;
+ }
+ return totalAccessCount.get();
+ }
+
+ /**
+ * Record an access asynchronously
+ * This method is non-blocking and should be used in high-frequency call
paths
+ * to avoid blocking the caller thread.
+ */
+ public void recordAccessAsync(long id) {
+ if (asyncExecutor == null) {
+ return;
+ }
+
+ try {
+ asyncExecutor.execute(() -> {
+ try {
+ recordAccess(id);
+ } catch (Exception e) {
+ // Log but don't propagate exception to avoid affecting
caller
+ LOG.warn("Failed to record access asynchronously for
tablet id={}", id, e);
+ }
+ });
+ } catch (Exception e) {
+ // If executor is shutdown or queue is full, silently ignore
+ // Statistics can tolerate some loss
+ LOG.warn("Failed to submit async recordAccess task for tablet
id={}", id, e);
+ }
+ }
+
+ /**
+ * Record an access
+ */
+ public void recordAccess(long id) {
+ long currentTime = System.currentTimeMillis();
+ int shardIndex = getShardIndex(id);
+ AccessStatsShard shard = shards[shardIndex];
+
+ SlidingWindowCounter counter = shard.idCounters.computeIfAbsent(id,
+ k -> new SlidingWindowCounter(numBuckets));
+ counter.add(currentTime, bucketSizeMs, numBuckets);
+ totalAccessCount.incrementAndGet();
+ }
+
+ /**
+ * Get access count for an ID within the time window
+ */
+ public AccessStatsResult getAccessInfo(long id) {
+ if (!Config.enable_active_tablet_sliding_window_access_stats) {
+ return null;
+ }
+
+ int shardIndex = getShardIndex(id);
+ AccessStatsShard shard = shards[shardIndex];
+ SlidingWindowCounter counter = shard.idCounters.get(id);
+
+ if (counter == null) {
+ return null;
+ }
+
+ long currentTime = System.currentTimeMillis();
+ return new AccessStatsResult(
+ id,
+ counter.getCount(currentTime, timeWindowMs, bucketSizeMs,
numBuckets),
+ counter.getLastAccessTime());
+ }
+
+ /**
+ * Result for top N query
+ */
+ public static class AccessStatsResult {
+ public final long id;
+ public final long accessCount;
+ public final long lastAccessTime;
+
+ public AccessStatsResult(long id, long accessCount, long
lastAccessTime) {
+ this.id = id;
+ this.accessCount = accessCount;
+ this.lastAccessTime = lastAccessTime;
+ }
+
+ @Override
+ public String toString() {
+ return "AccessStatsResult{"
+ + "id=" + id
+ + ", accessCount=" + accessCount
+ + ", lastAccessTime=" + lastAccessTime
+ + '}';
+ }
+ }
+
+ /**
+ * Get top N most active IDs
+ * Uses a min-heap (PriorityQueue) to maintain TopN efficiently without
sorting all results.
+ */
+ public List<AccessStatsResult> getTopNActive(int topN) {
+ if (!Config.enable_active_tablet_sliding_window_access_stats) {
+ return Collections.emptyList();
+ }
+
+ if (topN <= 0) {
+ return Collections.emptyList();
+ }
+
+ long currentTime = System.currentTimeMillis();
+ // Use a min-heap with reversed comparator to maintain TopN
+ // The heap keeps the smallest element at the top, so we can
efficiently replace it
+ // when we find a larger element
+ PriorityQueue<AccessStatsResult> minHeap = new PriorityQueue<>(
+ topN + 1, // Initial capacity: topN + 1 to avoid resizing
+ Collections.reverseOrder(TOPN_ACTIVE_COMPARATOR) // Reversed:
min-heap for TopN
+ );
+
+ // Collect from all shards and maintain TopN using min-heap
+ for (AccessStatsShard shard : shards) {
+ for (Map.Entry<Long, SlidingWindowCounter> entry :
shard.idCounters.entrySet()) {
+ long id = entry.getKey();
+ SlidingWindowCounter counter = entry.getValue();
+
+ // Skip if no recent activity
+ if (!counter.hasRecentActivity(currentTime, timeWindowMs)) {
+ continue;
+ }
+
+ long accessCount = counter.getCount(currentTime, timeWindowMs,
bucketSizeMs, numBuckets);
+ if (accessCount > 0) {
+ AccessStatsResult result = new AccessStatsResult(id,
accessCount, counter.getLastAccessTime());
+
+ if (minHeap.size() < topN) {
+ // Heap not full, directly add
+ minHeap.offer(result);
+ } else {
+ // Heap is full; the head is the least active element kept so far.
+ // If the current element is more active (sorts before the head
+ // under TOPN_ACTIVE_COMPARATOR), replace the head.
+ if (TOPN_ACTIVE_COMPARATOR.compare(result, minHeap.peek()) < 0) {
+ minHeap.poll();
+ minHeap.offer(result);
+ }
+ }
+ }
+ }
+ }
+
+ // Convert heap to list and sort in descending order (TopN)
+ List<AccessStatsResult> results = new ArrayList<>(minHeap);
+ results.sort(TOPN_ACTIVE_COMPARATOR);
+ return results;
+ }
+
+ /**
+ * Clean up expired access records
+ */
+ private void cleanupExpiredRecords() {
+ if (!Config.enable_active_tablet_sliding_window_access_stats) {
+ return;
+ }
+
+ long currentTime = System.currentTimeMillis();
+ int cleaned = 0;
+
+ // Clean each shard
+ for (AccessStatsShard shard : shards) {
+ // Clean ID counters
+ for (Map.Entry<Long, SlidingWindowCounter> entry :
shard.idCounters.entrySet()) {
+ SlidingWindowCounter counter = entry.getValue();
+ counter.cleanup(currentTime, timeWindowMs);
+
+ if (!counter.hasRecentActivity(currentTime, timeWindowMs)) {
+ shard.idCounters.remove(entry.getKey());
+ cleaned++;
+ }
+ }
+ }
+
+ if (LOG.isDebugEnabled() && cleaned > 0) {
+ LOG.debug("Cleaned up {} expired access records for type tablet",
cleaned);
+ }
+ }
+
+ /**
+ * Get statistics summary
+ */
+ public String getStatsSummary() {
+ if (!Config.enable_active_tablet_sliding_window_access_stats) {
+ return String.format("Active tablet sliding window access stats is
disabled");
+ }
+
+ long currentTime = System.currentTimeMillis();
+ refreshAggregatesIfNeeded(currentTime);
+ int activeIds = (int) getActiveIdsInWindow();
+ long totalAccess = getRecentAccessCountInWindow();
+
+ return String.format(
+ "SlidingWindowAccessStats{type=tablet, timeWindow=%ds,
bucketSize=%ds, numBuckets=%d, "
+ + "shardSize=%d, activeIds=%d, "
+ + "totalAccess=%d, totalAccessCount=%d}",
+ timeWindowMs / 1000, bucketSizeMs / 1000, numBuckets, SHARD_SIZE,
+ activeIds, totalAccess, totalAccessCount.get());
+ }
+
+ /**
+ * Cleanup daemon for expired records
+ */
+ private class AccessStatsCleanupDaemon extends MasterDaemon {
+ public AccessStatsCleanupDaemon() {
+ super("sliding-window-access-stats-cleanup-tablet" +
cleanupIntervalMs);
+ }
+
+ @Override
+ protected void runAfterCatalogReady() {
+ if (!Env.getCurrentEnv().isMaster()) {
+ return;
+ }
+
+ try {
+ cleanupExpiredRecords();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("tablet stat = {}, top 10 active = {}",
+ getStatsSummary(), getTopNActive(10));
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to cleanup expired access records for type
tablet", e);
+ }
+ }
+ }
+
+ public static TabletSlidingWindowAccessStats getInstance() {
+ if (instance == null) {
+ synchronized (TabletSlidingWindowAccessStats.class) {
+ if (instance == null) {
+ instance = new TabletSlidingWindowAccessStats();
+ }
+ }
+ }
+ return instance;
+ }
+
+ // async record tablet instance access
+ public static void recordTablet(long id) {
+ TabletSlidingWindowAccessStats sas = getInstance();
+ sas.recordAccessAsync(id);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
index 5fdc53a6c55..c1a3ed71290 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletSlidingWindowAccessStats;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -78,7 +79,8 @@ public class CloudTablet extends Tablet implements
GsonPostProcessable {
}
public Multimap<Long, Long> getNormalReplicaBackendPathMap(String
beEndpoint) throws UserException {
- Multimap<Long, Long> pathMap =
getNormalReplicaBackendPathMapImpl(beEndpoint,
+ TabletSlidingWindowAccessStats.recordTablet(getId());
+ Multimap<Long, Long> pathMap =
super.getNormalReplicaBackendPathMapImpl(beEndpoint,
(rep, be) -> ((CloudReplica) rep).getBackendId(be));
return backendPathMapReprocess(pathMap);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index afe33d62c4e..8b1916eeb6a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletSlidingWindowAccessStats;
import org.apache.doris.cloud.persist.UpdateCloudReplicaInfo;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.qe.ComputeGroupException;
@@ -59,6 +60,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -114,6 +116,16 @@ public class CloudTabletRebalancer extends MasterDaemon {
private boolean tableBalanced = true;
+ // Scheduling phase for active-tablet priority scheduling.
+ // ACTIVE_ONLY: only schedule objects (partition/table) that have
activeCnt > 0 (non-internal).
+ // INACTIVE_ONLY: schedule objects that are not in ACTIVE_ONLY set, with
internal db objects always last.
+ // ALL: schedule all objects (keeps internal db last when priority
scheduling enabled).
+ private enum ActiveSchedulePhase {
+ ACTIVE_ONLY,
+ INACTIVE_ONLY,
+ ALL
+ }
+
private volatile boolean inited = false;
private LinkedBlockingQueue<Pair<Long, Long>> tabletsMigrateTasks = new
LinkedBlockingQueue<Pair<Long, Long>>();
@@ -136,6 +148,53 @@ public class CloudTabletRebalancer extends MasterDaemon {
private BalanceTypeEnum globalBalanceTypeEnum =
BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
+ private Set<Long> activeTabletIds = new HashSet<>();
+
+ // cache for scheduling order in one daemon run (rebuilt in statRouteInfo)
+ // table/partition active count is computed from activeTabletIds
+ private volatile Map<Long, Long> tableIdToActiveCount = new
ConcurrentHashMap<>();
+ private volatile Map<Long, Long> partitionIdToActiveCount = new
ConcurrentHashMap<>();
+ private volatile Map<Long, Long> dbIdToActiveCount = new
ConcurrentHashMap<>();
+ private volatile Map<Long, Long> tableIdToDbId = new ConcurrentHashMap<>();
+ private volatile Map<Long, Long> partitionIdToDbId = new
ConcurrentHashMap<>();
+ // run-level cache: dbId -> isInternalDb (rebuilt in statRouteInfo)
+ private volatile Map<Long, Boolean> dbIdToInternal = new
ConcurrentHashMap<>();
+ private static final Set<String> INTERNAL_DB_NAMES =
Sets.newHashSet("__internal_schema", "information_schema");
+
+ private static final class LocationKey {
+ private final long dbId;
+ private final long tableId;
+ private final long partitionId;
+ private final long indexId;
+
+ private LocationKey(long dbId, long tableId, long partitionId, long
indexId) {
+ this.dbId = dbId;
+ this.tableId = tableId;
+ this.partitionId = partitionId;
+ this.indexId = indexId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof LocationKey)) {
+ return false;
+ }
+ LocationKey that = (LocationKey) o;
+ return dbId == that.dbId
+ && tableId == that.tableId
+ && partitionId == that.partitionId
+ && indexId == that.indexId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dbId, tableId, partitionId, indexId);
+ }
+ }
+
/**
* Get the current balance type for a compute group, falling back to
global balance type if not found
*/
@@ -451,6 +510,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
LOG.info("cloud tablet rebalance begin");
long start = System.currentTimeMillis();
+ activeTabletIds = getActiveTabletIds();
globalBalanceTypeEnum =
BalanceTypeEnum.getCloudWarmUpForRebalanceTypeEnum();
buildClusterToBackendMap();
@@ -481,7 +541,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
balanceEnd += (Config.cloud_tablet_rebalancer_interval_second +
10L) * 1000L;
}
if (balanceEnd - start >
Config.cloud_tablet_rebalancer_interval_second * 1000L) {
- sleepSeconds = 0L;
+ sleepSeconds = 1L;
}
setInterval(sleepSeconds * 1000L);
LOG.info("finished to rebalancer. cost: {} ms, rebalancer sche
interval {} s",
@@ -524,103 +584,182 @@ public class CloudTabletRebalancer extends MasterDaemon
{
// lead to ineffective scheduling. Specifically, `global` scheduling
might place multiple tablets belonging
// to the same table or partition onto the same BE, while `partition`
scheduling later requires these tablets
// to be dispersed across different BEs, resulting in unnecessary
scheduling.
- if (Config.enable_cloud_partition_balance) {
- balanceAllPartitions();
- }
- if (Config.enable_cloud_table_balance && indexBalanced) {
- balanceAllTables();
- }
- if (Config.enable_cloud_global_balance && indexBalanced &&
tableBalanced) {
- globalBalance();
+ if (!Config.enable_cloud_active_tablet_priority_scheduling) {
+ // Legacy scheduling: schedule the full set.
+ if (Config.enable_cloud_partition_balance) {
+ balanceAllPartitionsByPhase(ActiveSchedulePhase.ALL);
+ }
+ if (Config.enable_cloud_table_balance && indexBalanced) {
+ balanceAllTablesByPhase(ActiveSchedulePhase.ALL);
+ }
+ if (Config.enable_cloud_global_balance && indexBalanced &&
tableBalanced) {
+ globalBalance();
+ }
+ } else {
+ // When enabled, run two-phase scheduling:
+ // Phase 1: schedule only active partitions/tables first.
+ // If all active objects are balanced in this run, enter Phase 2:
+ // schedule remaining (all - active) objects.
+ boolean activeBalanced = true;
+
+ // Phase 1: active-only
+ boolean activeIndexBalanced = true;
+ boolean activeTableBalanced = true;
+ if (Config.enable_cloud_partition_balance) {
+ activeIndexBalanced =
balanceAllPartitionsByPhase(ActiveSchedulePhase.ACTIVE_ONLY);
+ }
+ if (Config.enable_cloud_table_balance && activeIndexBalanced) {
+ activeTableBalanced =
balanceAllTablesByPhase(ActiveSchedulePhase.ACTIVE_ONLY);
+ }
+
+ activeBalanced = (!Config.enable_cloud_partition_balance ||
activeIndexBalanced)
+ && (!Config.enable_cloud_table_balance ||
activeTableBalanced);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("active scheduling phase done:
activeIndexBalanced={}, activeTableBalanced={}, "
+ + "activeBalanced={}, clusterNum={}",
+ activeIndexBalanced, activeTableBalanced,
activeBalanced, clusterToBes.size());
+ }
+
+ if (!activeBalanced) {
+ // Active objects are not balanced yet; skip phase2 to avoid
diluting scheduling budget.
+ return;
+ }
+
+ // Phase 2: inactive (all - active), then global if enabled and
ready.
+ boolean phase2IndexBalanced = true;
+ boolean phase2TableBalanced = true;
+ if (Config.enable_cloud_partition_balance) {
+ phase2IndexBalanced =
balanceAllPartitionsByPhase(ActiveSchedulePhase.INACTIVE_ONLY);
+ }
+ if (Config.enable_cloud_table_balance && phase2IndexBalanced) {
+ phase2TableBalanced =
balanceAllTablesByPhase(ActiveSchedulePhase.INACTIVE_ONLY);
+ }
+ if (Config.enable_cloud_global_balance && phase2IndexBalanced &&
phase2TableBalanced) {
+ globalBalance();
+ }
}
}
- public void balanceAllPartitions() {
- for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
- LOG.info("before partition balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
- }
+ private boolean balanceAllPartitionsByPhase(ActiveSchedulePhase phase) {
+ // Reuse existing "balanced" flags as a per-phase signal.
+ indexBalanced = true;
- for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
- LOG.info("before partition balance be {} tablet num(current + pre
heating inflight) {}",
- entry.getKey(), entry.getValue().size());
+ if (LOG.isDebugEnabled()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ LOG.debug("before partition balance({}) be {} tablet num {}",
+ phase, entry.getKey(), entry.getValue().size());
+ }
+ for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ LOG.debug("before partition balance({}) be {} tablet
num(current + pre heating inflight) {}",
+ phase, entry.getKey(), entry.getValue().size());
+ }
}
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
- // balance in partitions/index
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
- balanceInPartition(entry.getValue(), entry.getKey(), infos);
+ balanceInPartition(entry.getValue(), entry.getKey(), infos, phase);
+ }
+ // In warmup mode (ASYNC_WARMUP / SYNC_WARMUP), balanceImpl goes
through preheatAndUpdateTablet
+ // which only updates future maps and enqueues warmup tasks without
adding to infos.
+ // So infos can be empty even when balance work was done. Use
indexBalanced (set to false by
+ // updateBalanceStatus inside balanceImpl when warmup moves succeed)
to reflect the real state.
+ if (infos.isEmpty()) {
+ LOG.info("partition balance({}) done, infos empty (warmup or
already balanced), indexBalanced={}",
+ phase, indexBalanced);
+ return indexBalanced;
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.PARTITION);
- LOG.info("collect to editlog partitions before size={} after size={}
infos", oldSize, infos.size());
+ LOG.info("partition balance({}) collect to editlog before size={}
after size={} infos, indexBalanced={}",
+ phase, oldSize, infos.size(), indexBalanced);
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
- // edit log failed, try next time
- return;
- }
-
- for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
- LOG.info("after partition balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
+ return false;
}
- for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
- LOG.info("after partition balance be {} tablet num(current + pre
heating inflight) {}",
- entry.getKey(), entry.getValue().size());
+ if (LOG.isDebugEnabled()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ LOG.debug("after partition balance({}) be {} tablet num {}",
+ phase, entry.getKey(), entry.getValue().size());
+ }
+ for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ LOG.debug("after partition balance({}) be {} tablet
num(current + pre heating inflight) {}",
+ phase, entry.getKey(), entry.getValue().size());
+ }
}
+ return indexBalanced;
}
- public void balanceAllTables() {
- for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
- LOG.info("before table balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
- }
+ private boolean balanceAllTablesByPhase(ActiveSchedulePhase phase) {
+ tableBalanced = true;
- for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
- LOG.info("before table balance be {} tablet num(current + pre
heating inflight) {}",
- entry.getKey(), entry.getValue().size());
+ if (LOG.isDebugEnabled()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ LOG.debug("before table balance({}) be {} tablet num {}",
+ phase, entry.getKey(), entry.getValue().size());
+ }
+ for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ LOG.debug("before table balance({}) be {} tablet num(current +
pre heating inflight) {}",
+ phase, entry.getKey(), entry.getValue().size());
+ }
}
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
- // balance in partitions/index
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
- balanceInTable(entry.getValue(), entry.getKey(), infos);
+ balanceInTable(entry.getValue(), entry.getKey(), infos, phase);
+ }
+ // Same as balanceAllPartitionsByPhase: in warmup mode infos stays
empty even when
+ // warmup tasks were scheduled. Use tableBalanced to reflect the real
state.
+ if (infos.isEmpty()) {
+ LOG.info("table balance({}) done, infos empty (warmup or already
balanced), tableBalanced={}",
+ phase, tableBalanced);
+ return tableBalanced;
}
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.TABLE);
- LOG.info("collect to editlog table before size={} after size={}
infos", oldSize, infos.size());
+ LOG.info("table balance({}) collect to editlog before size={} after
size={} infos, tableBalanced={}",
+ phase, oldSize, infos.size(), tableBalanced);
try {
Env.getCurrentEnv().getEditLog().logUpdateCloudReplicas(infos);
} catch (Exception e) {
LOG.warn("failed to update cloud replicas", e);
- // edit log failed, try next time
- return;
- }
-
- for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
- LOG.info("after table balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
+ return false;
}
- for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
- LOG.info("after table balance be {} tablet num(current + pre
heating inflight) {}",
- entry.getKey(), entry.getValue().size());
+ if (LOG.isDebugEnabled()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ LOG.debug("after table balance({}) be {} tablet num {}",
+ phase, entry.getKey(), entry.getValue().size());
+ }
+ for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ LOG.debug("after table balance({}) be {} tablet num(current +
pre heating inflight) {}",
+ phase, entry.getKey(), entry.getValue().size());
+ }
}
+ return tableBalanced;
}
public void globalBalance() {
- for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
- LOG.info("before global balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
- }
-
- for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
- LOG.info("before global balance be {} tablet num(current + pre
heating inflight) {}",
- entry.getKey(), entry.getValue().size());
+ if (LOG.isDebugEnabled()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ LOG.debug("before global balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
+ }
+ for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ LOG.debug("before global balance be {} tablet num(current +
pre heating inflight) {}",
+ entry.getKey(), entry.getValue().size());
+ }
}
List<UpdateCloudReplicaInfo> infos = new ArrayList<>();
for (Map.Entry<String, List<Long>> entry : clusterToBes.entrySet()) {
balanceImpl(entry.getValue(), entry.getKey(),
futureBeToTabletsGlobal, BalanceType.GLOBAL, infos);
}
+ if (infos.isEmpty()) {
+ return;
+ }
long oldSize = infos.size();
infos = batchUpdateCloudReplicaInfoEditlogs(infos, StatType.GLOBAL);
LOG.info("collect to editlog global before size={} after size={}
infos", oldSize, infos.size());
@@ -632,13 +771,14 @@ public class CloudTabletRebalancer extends MasterDaemon {
return;
}
- for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
- LOG.info("after global balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
- }
-
- for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
- LOG.info("after global balance be {} tablet num(current + pre
heating inflight) {}",
- entry.getKey(), entry.getValue().size());
+ if (LOG.isDebugEnabled()) {
+ for (Map.Entry<Long, Set<Tablet>> entry :
beToTabletsGlobal.entrySet()) {
+ LOG.debug("after global balance be {} tablet num {}",
entry.getKey(), entry.getValue().size());
+ }
+ for (Map.Entry<Long, Set<Tablet>> entry :
futureBeToTabletsGlobal.entrySet()) {
+ LOG.debug("after global balance be {} tablet num(current + pre
heating inflight) {}",
+ entry.getKey(), entry.getValue().size());
+ }
}
}
@@ -1005,9 +1145,29 @@ public class CloudTabletRebalancer extends MasterDaemon {
beToTabletsInTable = new ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>();
futureBeToTabletsInTable = new ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>();
+ // rebuild scheduling caches for this run
+ Map<Long, Long> tmpTableActive = new HashMap<>();
+ Map<Long, Long> tmpPartitionActive = new HashMap<>();
+ Map<Long, Long> tmpDbActive = new HashMap<>();
+ Map<Long, Long> tmpTableToDb = new HashMap<>();
+ Map<Long, Long> tmpPartitionToDb = new HashMap<>();
+ Map<Long, Boolean> tmpDbInternal = new HashMap<>();
+
loopCloudReplica((Database db, Table table, Partition partition,
MaterializedIndex index, String cluster) -> {
boolean isColocated =
Env.getCurrentColocateIndex().isColocateTable(table.getId());
+ tmpTableToDb.put(table.getId(), db.getId());
+ tmpPartitionToDb.put(partition.getId(), db.getId());
+ tmpDbInternal.computeIfAbsent(db.getId(), k -> {
+ String name = db.getFullName();
+ return name != null && INTERNAL_DB_NAMES.contains(name);
+ });
for (Tablet tablet : index.getTablets()) {
+ // active tablet scoring (used for scheduling order)
+ if (activeTabletIds != null && !activeTabletIds.isEmpty() &&
activeTabletIds.contains(tablet.getId())) {
+ tmpTableActive.merge(table.getId(), 1L, Long::sum);
+ tmpPartitionActive.merge(partition.getId(), 1L, Long::sum);
+ tmpDbActive.merge(db.getId(), 1L, Long::sum);
+ }
for (Replica r : tablet.getReplicas()) {
CloudReplica replica = (CloudReplica) r;
if (isColocated) {
@@ -1055,6 +1215,13 @@ public class CloudTabletRebalancer extends MasterDaemon {
futureBeToTabletsGlobal = tmpFutureBeToTabletsGlobal;
beToTabletsGlobalInSecondary = tmpBeToTabletsGlobalInSecondary;
beToColocateTabletsGlobal = tmpBeToColocateTabletsGlobal;
+
+ tableIdToActiveCount = new ConcurrentHashMap<>(tmpTableActive);
+ partitionIdToActiveCount = new ConcurrentHashMap<>(tmpPartitionActive);
+ dbIdToActiveCount = new ConcurrentHashMap<>(tmpDbActive);
+ tableIdToDbId = new ConcurrentHashMap<>(tmpTableToDb);
+ partitionIdToDbId = new ConcurrentHashMap<>(tmpPartitionToDb);
+ dbIdToInternal = new ConcurrentHashMap<>(tmpDbInternal);
}
public void loopCloudReplica(Operator operator) {
@@ -1087,26 +1254,214 @@ public class CloudTabletRebalancer extends
MasterDaemon {
}
}
- public void balanceInPartition(List<Long> bes, String clusterId,
List<UpdateCloudReplicaInfo> infos) {
- // balance all partition
+
+ private void balanceInPartition(List<Long> bes, String clusterId,
List<UpdateCloudReplicaInfo> infos,
+ ActiveSchedulePhase phase) {
+ // balance all partitions (prefer active partitions/tables, schedule the
internal db last)
+ Iterable<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> partitions;
+ if (Config.enable_cloud_active_tablet_priority_scheduling) {
+ final Comparator<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> cmp =
+ partitionEntryComparator();
+ // Phase-aware filtering and ordering.
+ // - ACTIVE_ONLY: only non-internal partitions with activeCnt > 0
+ // - INACTIVE_ONLY: all remaining partitions (non-internal
inactive first, internal last)
+ // - ALL: active (TopN first if configured) -> inactive -> internal
+ List<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> nonInternalActive =
+ new ArrayList<>();
+ List<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> nonInternalInactive =
+ new ArrayList<>();
+ List<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> internalPartitions =
+ new ArrayList<>();
+
+ for (Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>> e
+ : futurePartitionToTablets.entrySet()) {
+ long partId = e.getKey();
+ boolean internal =
isInternalDbId(partitionIdToDbId.get(partId));
+ long activeCnt = partitionIdToActiveCount.getOrDefault(partId,
0L);
+
+ if (internal) {
+ // internal partitions are always handled at the end (not
in ACTIVE_ONLY).
+ internalPartitions.add(e);
+ continue;
+ }
+
+ if (activeCnt > 0) {
+ nonInternalActive.add(e);
+ } else {
+ nonInternalInactive.add(e);
+ }
+ }
+
+ nonInternalActive.sort(cmp);
+ nonInternalInactive.sort(cmp);
+ internalPartitions.sort(cmp);
+
+ List<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> ordered =
+ new ArrayList<>(futurePartitionToTablets.size());
+ if (phase == ActiveSchedulePhase.ACTIVE_ONLY) {
+ // In ACTIVE_ONLY phase, schedule all active partitions
(already sorted by cmp, most active first)
+ ordered.addAll(nonInternalActive);
+ } else if (phase == ActiveSchedulePhase.INACTIVE_ONLY) {
+ ordered.addAll(nonInternalInactive);
+ ordered.addAll(internalPartitions);
+ } else { // ALL
+ // All active (already sorted by cmp, most active first), then
inactive, then internal
+ ordered.addAll(nonInternalActive);
+ ordered.addAll(nonInternalInactive);
+ ordered.addAll(internalPartitions);
+ }
+
+ partitions = ordered;
+ } else {
+ partitions = futurePartitionToTablets.entrySet();
+ }
+
for (Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long,
Set<Tablet>>>> partitionEntry
- : futurePartitionToTablets.entrySet()) {
+ : partitions) {
Map<Long, ConcurrentHashMap<Long, Set<Tablet>>> indexToTablets =
partitionEntry.getValue();
// balance all index of a partition
- for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry :
indexToTablets.entrySet()) {
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
indexes =
+ new ArrayList<>(indexToTablets.entrySet());
+ // index-level ordering is not critical; keep stable by id
+ indexes.sort(Comparator.comparingLong(Map.Entry::getKey));
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry :
indexes) {
// balance a index
+ // Fast path: this index has no tablets in this cluster, skip
to avoid useless balanceImpl work.
+ if (calculateTotalTablets(bes, entry.getValue()) == 0) {
+ continue;
+ }
balanceImpl(bes, clusterId, entry.getValue(),
BalanceType.PARTITION, infos);
}
}
}
- public void balanceInTable(List<Long> bes, String clusterId,
List<UpdateCloudReplicaInfo> infos) {
- // balance all tables
- for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry :
futureBeToTabletsInTable.entrySet()) {
+ private void balanceInTable(List<Long> bes, String clusterId,
List<UpdateCloudReplicaInfo> infos,
+ ActiveSchedulePhase phase) {
+ // balance all tables (prefer active tables/dbs, put internal db at
tail)
+ Iterable<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> tables;
+ if (Config.enable_cloud_active_tablet_priority_scheduling) {
+ final Comparator<Map.Entry<Long, ConcurrentHashMap<Long,
Set<Tablet>>>> cmp = tableEntryComparator();
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
nonInternalActive = new ArrayList<>();
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
nonInternalInactive = new ArrayList<>();
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
internalTables = new ArrayList<>();
+
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> e :
futureBeToTabletsInTable.entrySet()) {
+ long tableId = e.getKey();
+ boolean internal = isInternalDbId(tableIdToDbId.get(tableId));
+ long activeCnt = tableIdToActiveCount.getOrDefault(tableId,
0L);
+ if (internal) {
+ internalTables.add(e);
+ continue;
+ }
+ if (activeCnt > 0) {
+ nonInternalActive.add(e);
+ } else {
+ nonInternalInactive.add(e);
+ }
+ }
+
+ nonInternalActive.sort(cmp);
+ nonInternalInactive.sort(cmp);
+ internalTables.sort(cmp);
+
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
ordered =
+ new ArrayList<>(futureBeToTabletsInTable.size());
+ if (phase == ActiveSchedulePhase.ACTIVE_ONLY) {
+ ordered.addAll(nonInternalActive);
+ } else if (phase == ActiveSchedulePhase.INACTIVE_ONLY) {
+ ordered.addAll(nonInternalInactive);
+ ordered.addAll(internalTables);
+ } else { // ALL
+ ordered.addAll(nonInternalActive);
+ ordered.addAll(nonInternalInactive);
+ ordered.addAll(internalTables);
+ }
+
+ tables = ordered;
+ } else {
+ tables = futureBeToTabletsInTable.entrySet();
+ }
+
+ for (Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>> entry :
tables) {
+ // Fast path: this table has no tablets in this cluster, skip.
+ if (calculateTotalTablets(bes, entry.getValue()) == 0) {
+ continue;
+ }
balanceImpl(bes, clusterId, entry.getValue(), BalanceType.TABLE,
infos);
}
}
+ // For unit test: override this method to avoid dependency on Env/internal
catalog.
+ protected boolean isInternalDbId(Long dbId) {
+ if (dbId == null || dbId <= 0) {
+ return false;
+ }
+ Boolean cached = dbIdToInternal.get(dbId);
+ if (cached != null) {
+ return cached;
+ }
+ // Fallback (should be rare): consult catalog and populate cache.
+ Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
+ boolean internal = false;
+ if (db != null) {
+ String name = db.getFullName();
+ internal = name != null && INTERNAL_DB_NAMES.contains(name);
+ }
+ dbIdToInternal.put(dbId, internal);
+ return internal;
+ }
+
+ private Comparator<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>>
tableEntryComparator() {
+ return (a, b) -> {
+ Long tableIdA = a.getKey();
+ Long tableIdB = b.getKey();
+ boolean internalA = isInternalDbId(tableIdToDbId.get(tableIdA));
+ boolean internalB = isInternalDbId(tableIdToDbId.get(tableIdB));
+ if (internalA != internalB) {
+ return internalA ? 1 : -1; // internal goes last
+ }
+ long dbActiveA =
dbIdToActiveCount.getOrDefault(tableIdToDbId.get(tableIdA), 0L);
+ long dbActiveB =
dbIdToActiveCount.getOrDefault(tableIdToDbId.get(tableIdB), 0L);
+ int cmpDb = Long.compare(dbActiveB, dbActiveA);
+ if (cmpDb != 0) {
+ return cmpDb;
+ }
+ long activeA = tableIdToActiveCount.getOrDefault(tableIdA, 0L);
+ long activeB = tableIdToActiveCount.getOrDefault(tableIdB, 0L);
+ int cmp = Long.compare(activeB, activeA); // more active first
+ if (cmp != 0) {
+ return cmp;
+ }
+ return Long.compare(tableIdB, tableIdA); // larger tableId (newer table)
first
+ };
+ }
+
+ private Comparator<Map.Entry<Long,
+ ConcurrentHashMap<Long, ConcurrentHashMap<Long, Set<Tablet>>>>>
partitionEntryComparator() {
+ return (a, b) -> {
+ Long partIdA = a.getKey();
+ Long partIdB = b.getKey();
+ boolean internalA = isInternalDbId(partitionIdToDbId.get(partIdA));
+ boolean internalB = isInternalDbId(partitionIdToDbId.get(partIdB));
+ if (internalA != internalB) {
+ return internalA ? 1 : -1; // internal goes last
+ }
+ long dbActiveA =
dbIdToActiveCount.getOrDefault(partitionIdToDbId.get(partIdA), 0L);
+ long dbActiveB =
dbIdToActiveCount.getOrDefault(partitionIdToDbId.get(partIdB), 0L);
+ int cmpDb = Long.compare(dbActiveB, dbActiveA);
+ if (cmpDb != 0) {
+ return cmpDb;
+ }
+ long activeA = partitionIdToActiveCount.getOrDefault(partIdA, 0L);
+ long activeB = partitionIdToActiveCount.getOrDefault(partIdB, 0L);
+ int cmp = Long.compare(activeB, activeA); // more active first
+ if (cmp != 0) {
+ return cmp;
+ }
+ return Long.compare(partIdB, partIdA); // larger partitionId (newer
partition) first
+ };
+ }
+
private void sendPreHeatingRpc(Tablet pickedTablet, long srcBe, long
destBe) throws Exception {
sendPreHeatingRpc(Collections.singletonList(pickedTablet.getId()),
srcBe, destBe);
}
@@ -1475,22 +1830,24 @@ public class CloudTabletRebalancer extends MasterDaemon
{
LOG.debug("balance type {}, be num {}, total tablets num {}, avg num
{}, transfer num {}",
currentBalanceType, beNum, totalTabletsNum, avgNum,
transferNum);
+ final Set<Long> pickedTabletIds = new HashSet<>();
+
for (int i = 0; i < transferNum; i++) {
TransferPairInfo pairInfo = new TransferPairInfo();
if (!getTransferPair(bes, beToTablets, avgNum, pairInfo)) {
break; // no need balance
}
- updateBalanceStatus(balanceType);
-
long srcBe = pairInfo.srcBe;
long destBe = pairInfo.destBe;
- Tablet pickedTablet = pickRandomTablet(beToTablets.get(srcBe));
+ Tablet pickedTablet = pickTabletPreferCold(srcBe,
beToTablets.get(srcBe),
+ this.activeTabletIds, pickedTabletIds);
if (pickedTablet == null) {
continue; // No tablet to pick
}
+ pickedTabletIds.add(pickedTablet.getId());
CloudReplica cloudReplica = ((CloudTablet)
pickedTablet).getCloudReplica();
Backend srcBackend = Env.getCurrentSystemInfo().getBackend(srcBe);
@@ -1501,7 +1858,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
if (isConflict(srcBe, destBe, cloudReplica, balanceType,
partitionToTablets, beToTabletsInTable)) {
continue;
}
- transferTablet(pickedTablet, srcBe, destBe, clusterId,
balanceType, infos);
+ boolean moved = transferTablet(pickedTablet, srcBe, destBe,
clusterId, balanceType, infos);
+ if (moved) {
+ updateBalanceStatus(balanceType);
+ }
if
(BalanceTypeEnum.PEER_READ_ASYNC_WARMUP.equals(currentBalanceType)) {
LOG.debug("directly switch {} from {} to {}, cluster {}",
pickedTablet.getId(), srcBe, destBe,
clusterId);
@@ -1521,7 +1881,11 @@ public class CloudTabletRebalancer extends MasterDaemon {
futurePartitionToTablets, futureBeToTabletsInTable)) {
continue;
}
- preheatAndUpdateTablet(pickedTablet, srcBe, destBe, clusterId,
balanceType, beToTablets);
+ boolean moved = preheatAndUpdateTablet(pickedTablet, srcBe,
destBe,
+ clusterId, balanceType, beToTablets);
+ if (moved) {
+ updateBalanceStatus(balanceType);
+ }
}
}
}
@@ -1553,22 +1917,77 @@ public class CloudTabletRebalancer extends MasterDaemon
{
}
}
- private Tablet pickRandomTablet(Set<Tablet> tablets) {
- if (tablets.isEmpty()) {
+ private Set<Long> getActiveTabletIds() {
+ try {
+ // get topN active tablets
+ List<TabletSlidingWindowAccessStats.AccessStatsResult> active =
+ TabletSlidingWindowAccessStats.getInstance()
+
.getTopNActive(Config.cloud_active_partition_scheduling_topn);
+ if (active == null || active.isEmpty()) {
+ return Collections.emptySet();
+ }
+ Set<Long> ids = new HashSet<>(active.size() * 2);
+ for (TabletSlidingWindowAccessStats.AccessStatsResult r : active) {
+ ids.add(r.id);
+ }
+ return ids;
+ } catch (Throwable t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to get active tablets from
CloudTabletAccessStats, fallback to random pick", t);
+ }
+ return Collections.emptySet();
+ }
+ }
+
+ // Prefer a non-active (cold) tablet when rebalancing, to reduce the impact
on hot tablets.
+ // Fall back to a random pick over all remaining tablets if no cold tablet
is available.
+ private Tablet pickTabletPreferCold(long srcBe, Set<Tablet> tablets,
Set<Long> activeTabletIds,
+ Set<Long> pickedTabletIds) {
+ if (tablets == null || tablets.isEmpty()) {
return null;
}
- int randomIndex = rand.nextInt(tablets.size());
- return tablets.stream().skip(randomIndex).findFirst().orElse(null);
+        // Prefer cold tablets first (when active stats are available)
+ boolean hasActiveStats = activeTabletIds != null &&
!activeTabletIds.isEmpty();
+ boolean preferCold =
Config.enable_cloud_active_tablet_priority_scheduling && hasActiveStats;
+
+ if (preferCold) {
+ Tablet cold = reservoirPick(tablets, pickedTabletIds,
activeTabletIds, true);
+ if (cold != null) {
+ return cold;
+ }
+ }
+ return reservoirPick(tablets, pickedTabletIds, activeTabletIds, false);
+ }
+
+ // Reservoir sampling to pick one element uniformly at random from
candidates,
+ // without allocating intermediate collections.
+ private Tablet reservoirPick(Set<Tablet> tablets, Set<Long>
pickedTabletIds,
+ Set<Long> activeTabletIds, boolean
requireCold) {
+ Tablet chosen = null;
+ int seen = 0;
+ for (Tablet t : tablets) {
+ if (pickedTabletIds.contains(t.getId())) {
+ continue;
+ }
+ if (requireCold && activeTabletIds != null &&
activeTabletIds.contains(t.getId())) {
+ continue;
+ }
+ seen++;
+ if (rand.nextInt(seen) == 0) {
+ chosen = t;
+ }
+ }
+ return chosen;
}
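reservoirPick above streams over the candidate set once and, after seeing the i-th eligible tablet, replaces the current choice with probability 1/i, which yields a uniformly random pick without building an intermediate list. A standalone sketch of the same single-element reservoir sampling (names are illustrative, not taken from the Doris code):

```
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Predicate;

// Single-element reservoir sampling: every eligible element ends up chosen with
// probability 1/N, where N is the number of eligible elements, in one pass.
public class ReservoirSketch {
    static <T> T pickUniform(Iterable<T> candidates, Predicate<T> eligible, Random rand) {
        T chosen = null;
        int seen = 0;
        for (T c : candidates) {
            if (!eligible.test(c)) {
                continue;
            }
            seen++;
            if (rand.nextInt(seen) == 0) { // replace the current choice with probability 1/seen
                chosen = c;
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<Long> tabletIds = Arrays.asList(100L, 200L, 300L, 400L);
        // e.g. treat 100 as "active" and only sample from the remaining cold ids
        Long pick = pickUniform(tabletIds, id -> id.longValue() != 100L, new Random());
        System.out.println(pick); // one of 200, 300, 400, uniformly at random
    }
}
```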
- private void preheatAndUpdateTablet(Tablet pickedTablet, long srcBe, long
destBe, String clusterId,
+ private boolean preheatAndUpdateTablet(Tablet pickedTablet, long srcBe,
long destBe, String clusterId,
BalanceType balanceType, Map<Long,
Set<Tablet>> beToTablets) {
Backend srcBackend = cloudSystemInfoService.getBackend(srcBe);
Backend destBackend = cloudSystemInfoService.getBackend(destBe);
if (srcBackend == null || destBackend == null) {
LOG.warn("backend missing when preheating tablet {} from {} to {},
cluster {}",
pickedTablet.getId(), srcBe, destBe, clusterId);
- return;
+ return false;
}
InfightTask task = new InfightTask();
@@ -1585,9 +2004,10 @@ public class CloudTabletRebalancer extends MasterDaemon {
futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
LOG.debug("pre cache {} from {} to {}, cluster {}",
pickedTablet.getId(), srcBe, destBe, clusterId);
enqueueWarmupTask(new WarmupTabletTask(pickedTablet, srcBe, destBe,
clusterId));
+ return true;
}
- private void transferTablet(Tablet pickedTablet, long srcBe, long destBe,
String clusterId,
+ private boolean transferTablet(Tablet pickedTablet, long srcBe, long
destBe, String clusterId,
BalanceType balanceType,
List<UpdateCloudReplicaInfo> infos) {
LOG.debug("transfer {} from {} to {}, cluster {}, type {}",
pickedTablet.getId(), srcBe, destBe, clusterId, balanceType);
@@ -1596,6 +2016,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
updateBeToTablets(pickedTablet, srcBe, destBe,
futureBeToTabletsGlobal, futureBeToTabletsInTable,
futurePartitionToTablets);
updateClusterToBeMap(pickedTablet, destBe, clusterId, infos);
+ return true;
}
public void addTabletMigrationTask(Long srcBe, Long dstBe) {
@@ -1708,11 +2129,11 @@ public class CloudTabletRebalancer extends MasterDaemon
{
if (!Strings.isNullOrEmpty(clusterName)) {
MetricRepo.updateClusterCloudBalanceNum(clusterName,
clusterId, type, infoList.size());
}
- Map<Long, List<UpdateCloudReplicaInfo>> sameLocationInfos =
infoList.stream()
+ Map<LocationKey, List<UpdateCloudReplicaInfo>> sameLocationInfos =
infoList.stream()
.collect(Collectors.groupingBy(
- info -> info.getDbId()
- + info.getTableId() + info.getPartitionId() +
info.getIndexId()));
- sameLocationInfos.forEach((location, locationInfos) -> {
+ info -> new LocationKey(info.getDbId(),
info.getTableId(),
+ info.getPartitionId(), info.getIndexId())));
+ sameLocationInfos.forEach((locationKey, locationInfos) -> {
UpdateCloudReplicaInfo newInfo = new UpdateCloudReplicaInfo();
long dbId = -1;
long tableId = -1;
@@ -1727,12 +2148,6 @@ public class CloudTabletRebalancer extends MasterDaemon {
partitionId = info.getPartitionId();
indexId = info.getIndexId();
- StringBuilder sb = new StringBuilder("impossible, some
locations do not match location");
- sb.append(", location=").append(location).append(",
dbId=").append(dbId)
- .append(", tableId=").append(tableId).append(",
partitionId=").append(partitionId)
- .append(", indexId=").append(indexId);
- Preconditions.checkState(location == dbId + tableId +
partitionId + indexId, sb.toString());
-
long tabletId = info.getTabletId();
long replicaId = info.getReplicaId();
long beId = info.getBeId();
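The grouping change above replaces the old key, the arithmetic sum dbId + tableId + partitionId + indexId, which can collide for different (db, table, partition, index) tuples, with a composite LocationKey, so the Preconditions check that guarded against such collisions is no longer needed. The LocationKey class itself is not shown in this excerpt; a minimal value-class sketch consistent with how it is used here (an assumption, not the actual implementation):

```
import java.util.Objects;

// Assumed shape of the composite grouping key: plain value semantics over the four ids.
final class LocationKey {
    private final long dbId;
    private final long tableId;
    private final long partitionId;
    private final long indexId;

    LocationKey(long dbId, long tableId, long partitionId, long indexId) {
        this.dbId = dbId;
        this.tableId = tableId;
        this.partitionId = partitionId;
        this.indexId = indexId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof LocationKey)) {
            return false;
        }
        LocationKey k = (LocationKey) o;
        return dbId == k.dbId && tableId == k.tableId
                && partitionId == k.partitionId && indexId == k.indexId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(dbId, tableId, partitionId, indexId);
    }
}
```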
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
index e6f6ab5bb67..0ef67d74383 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ReplicasProcNode.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.catalog.TabletSlidingWindowAccessStats;
import org.apache.doris.cloud.catalog.CloudReplica;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@@ -51,7 +52,7 @@ public class ReplicasProcNode implements ProcNodeInterface {
.add("IsUserDrop")
.add("VisibleVersionCount").add("VersionCount").add("PathHash").add("Path")
.add("MetaUrl").add("CompactionStatus").add("CooldownReplicaId")
- .add("CooldownMetaId").add("QueryHits");
+
.add("CooldownMetaId").add("QueryHits").add("WindowAccessCount").add("LastAccessTime");
if (Config.isCloudMode()) {
builder.add("PrimaryBackendId");
@@ -91,6 +92,14 @@ public class ReplicasProcNode implements ProcNodeInterface {
}
for (Replica replica : replicas) {
+ TabletSlidingWindowAccessStats.AccessStatsResult asr =
TabletSlidingWindowAccessStats.getInstance()
+ .getAccessInfo(tabletId);
+ long accessCount = 0;
+ long lastAccessTime = 0;
+ if (asr != null) {
+ accessCount = asr.accessCount;
+ lastAccessTime = asr.lastAccessTime;
+ }
long beId = replica.getBackendIdWithoutException();
Backend be = backendMap.get(beId);
String host = (be == null ? Backend.DUMMY_IP : be.getHost());
@@ -138,7 +147,10 @@ public class ReplicasProcNode implements ProcNodeInterface
{
compactionUrl,
String.valueOf(tablet.getCooldownReplicaId()),
cooldownMetaId,
- String.valueOf(queryHits));
+ String.valueOf(queryHits),
+ String.valueOf(accessCount),
+ String.valueOf(lastAccessTime)
+ );
if (Config.isCloudMode()) {
replicaInfo.add(String.valueOf(((CloudReplica)
replica).getPrimaryBackendId()));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
index 9a273e150ba..50fb5e98610 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletsProcDir.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletSlidingWindowAccessStats;
import org.apache.doris.cloud.catalog.CloudReplica;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@@ -56,7 +57,8 @@ public class TabletsProcDir implements ProcDirInterface {
.add("LstSuccessVersion").add("LstFailedVersion").add("LstFailedTime")
.add("LocalDataSize").add("RemoteDataSize").add("RowCount").add("State")
.add("LstConsistencyCheckTime").add("CheckVersion")
-
.add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("PathHash").add("Path")
+
.add("VisibleVersionCount").add("VersionCount").add("QueryHits").add("WindowAccessCount")
+ .add("LastAccessTime").add("PathHash").add("Path")
.add("MetaUrl").add("CompactionStatus")
.add("CooldownReplicaId").add("CooldownMetaId");
@@ -128,6 +130,8 @@ public class TabletsProcDir implements ProcDirInterface {
tabletInfo.add(-1); // visible version count
tabletInfo.add(-1); // total version count
tabletInfo.add(0L); // query hits
+                tabletInfo.add(0L); // WindowAccessCount
+                tabletInfo.add(0L); // LastAccessTime
tabletInfo.add(-1); // path hash
tabletInfo.add(FeConstants.null_string); // path
tabletInfo.add(FeConstants.null_string); // meta url
@@ -140,6 +144,14 @@ public class TabletsProcDir implements ProcDirInterface {
tabletInfos.add(tabletInfo);
} else {
+ TabletSlidingWindowAccessStats.AccessStatsResult asr =
TabletSlidingWindowAccessStats.getInstance()
+ .getAccessInfo(tabletId);
+ long accessCount = 0;
+ long lastAccessTime = 0;
+ if (asr != null) {
+ accessCount = asr.accessCount;
+ lastAccessTime = asr.lastAccessTime;
+ }
for (Replica replica : tablet.getReplicas()) {
long beId = replica.getBackendIdWithoutException();
if ((version > -1 && replica.getVersion() != version)
@@ -167,6 +179,8 @@ public class TabletsProcDir implements ProcDirInterface {
tabletInfo.add(replica.getVisibleVersionCount());
tabletInfo.add(replica.getTotalVersionCount());
tabletInfo.add(replicaIdToQueryHits.getOrDefault(replica.getId(), 0L));
+ tabletInfo.add(String.valueOf(accessCount));
+ tabletInfo.add(String.valueOf(lastAccessTime));
tabletInfo.add(replica.getPathHash());
tabletInfo.add(pathHashToRoot.getOrDefault(replica.getPathHash(), ""));
Backend be = backendMap.get(beId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 86ad7fe7dfa..cd0755fd9da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -21,6 +21,7 @@ import org.apache.doris.alter.Alter;
import org.apache.doris.alter.AlterJobV2.JobType;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TabletSlidingWindowAccessStats;
import org.apache.doris.cloud.catalog.CloudTabletRebalancer;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
@@ -81,6 +82,8 @@ public final class MetricRepo {
public static final String TABLET_NUM = "tablet_num";
public static final String TABLET_MAX_COMPACTION_SCORE =
"tablet_max_compaction_score";
+ public static final String TABLET_ACCESS_RECENT = "tablet_access_recent";
+ public static final String TABLET_ACCESS_TOTAL = "tablet_access_total";
public static final String CLOUD_TAG = "cloud";
public static LongCounterMetric COUNTER_REQUEST_ALL;
@@ -114,6 +117,9 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_UPDATE_TABLET_STAT_FAILED;
+ public static GaugeMetric<Long> GAUGE_TABLET_ACCESS_RECENT;
+ public static GaugeMetric<Long> GAUGE_TABLET_ACCESS_TOTAL;
+
public static LongCounterMetric COUNTER_EDIT_LOG_WRITE;
public static LongCounterMetric COUNTER_EDIT_LOG_READ;
public static LongCounterMetric COUNTER_EDIT_LOG_CURRENT;
@@ -346,6 +352,37 @@ public final class MetricRepo {
// capacity
generateBackendsTabletMetrics();
+ // tablet sliding window access stats
+ GAUGE_TABLET_ACCESS_RECENT = new
GaugeMetric<Long>(TABLET_ACCESS_RECENT, MetricUnit.REQUESTS,
+ "total tablet access count within sliding window") {
+ @Override
+ public Long getValue() {
+ if (!Env.getCurrentEnv().isMaster()) {
+ return 0L;
+ }
+ if (!Config.enable_active_tablet_sliding_window_access_stats) {
+ return 0L;
+ }
+ return
TabletSlidingWindowAccessStats.getInstance().getRecentAccessCountInWindow();
+ }
+ };
+ DORIS_METRIC_REGISTER.addMetrics(GAUGE_TABLET_ACCESS_RECENT);
+
+ GAUGE_TABLET_ACCESS_TOTAL = new GaugeMetric<Long>(TABLET_ACCESS_TOTAL,
MetricUnit.REQUESTS,
+ "total tablet access count since FE start") {
+ @Override
+ public Long getValue() {
+ if (!Env.getCurrentEnv().isMaster()) {
+ return 0L;
+ }
+ if (!Config.enable_active_tablet_sliding_window_access_stats) {
+ return 0L;
+ }
+ return
TabletSlidingWindowAccessStats.getInstance().getTotalAccessCount();
+ }
+ };
+ DORIS_METRIC_REGISTER.addMetrics(GAUGE_TABLET_ACCESS_TOTAL);
+
// connections
USER_GAUGE_CONNECTIONS = addLabeledMetrics("user", () ->
new GaugeMetricImpl<>("connection_total",
MetricUnit.CONNECTIONS,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowTabletIdCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowTabletIdCommand.java
index d8cf395f300..15d83237816 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowTabletIdCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowTabletIdCommand.java
@@ -30,6 +30,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.catalog.TabletSlidingWindowAccessStats;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
@@ -81,6 +82,8 @@ public class ShowTabletIdCommand extends ShowCommand {
builder.addColumn(new Column("IsSync", ScalarType.createVarchar(30)));
builder.addColumn(new Column("Order", ScalarType.createVarchar(30)));
builder.addColumn(new Column("QueryHits",
ScalarType.createVarchar(30)));
+ builder.addColumn(new Column("WindowAccessCount",
ScalarType.createVarchar(30)));
+ builder.addColumn(new Column("LastAccessTime",
ScalarType.createVarchar(30)));
builder.addColumn(new Column("DetailCmd",
ScalarType.createVarchar(30)));
return builder.build();
}
@@ -119,6 +122,14 @@ public class ShowTabletIdCommand extends ShowCommand {
int tabletIdx = -1;
// check real meta
+ TabletSlidingWindowAccessStats.AccessStatsResult asr =
TabletSlidingWindowAccessStats.getInstance()
+ .getAccessInfo(tabletId);
+ long accessCount = 0;
+ long lastAccessTime = 0;
+ if (asr != null) {
+ accessCount = asr.accessCount;
+ lastAccessTime = asr.lastAccessTime;
+ }
do {
Database db = env.getInternalCatalog().getDbNullable(dbId);
if (db == null) {
@@ -191,7 +202,9 @@ public class ShowTabletIdCommand extends ShowCommand {
rows.add(Lists.newArrayList(dbName, tableName, partitionName,
indexName,
dbId.toString(), tableId.toString(),
partitionId.toString(), indexId.toString(),
- isSync.toString(), String.valueOf(tabletIdx),
String.valueOf(queryHits), detailCmd));
+ isSync.toString(), String.valueOf(tabletIdx),
String.valueOf(queryHits),
+ String.valueOf(accessCount), String.valueOf(lastAccessTime),
+ detailCmd));
return new ShowResultSet(getMetaData(), rows);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index d5fddd65c38..1a20ecb3578 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -541,7 +541,7 @@ public class OlapScanNode extends ScanNode {
Replica replica = replicas.get(useFixReplica >=
replicas.size() ? replicas.size() - 1 : useFixReplica);
if
(context.getSessionVariable().fallbackOtherReplicaWhenFixedCorrupt) {
long beId = replica.getBackendId();
- Backend backend = allBackends.get(replica.getBackendId());
+ Backend backend = allBackends.get(beId);
// If the fixed replica is bad, then not clear the
replicas using random replica
if (backend == null || !backend.isAlive()) {
if (LOG.isDebugEnabled()) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletSlidingWindowAccessStatsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletSlidingWindowAccessStatsTest.java
new file mode 100644
index 00000000000..d29e7bda0bf
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletSlidingWindowAccessStatsTest.java
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.Config;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+
+/**
+ * Unit tests for the TabletSlidingWindowAccessStats sliding window logic.
+ *
+ * <p>We use reflection to test the private static inner class
SlidingWindowCounter,
+ * focusing on time-window counting, bucket expiration, cleanup and cache
invalidation.
+ */
+public class TabletSlidingWindowAccessStatsTest {
+
+ private static class CounterHarness {
+ private final Object counter;
+ private final Method add;
+ private final Method getCount;
+ private final Method cleanup;
+ private final Method hasRecentActivity;
+ private final Method getLastAccessTime;
+
+ CounterHarness(int numBuckets) throws Exception {
+ Class<?> clazz = Class.forName(
+
"org.apache.doris.catalog.TabletSlidingWindowAccessStats$SlidingWindowCounter");
+ Constructor<?> ctor = clazz.getDeclaredConstructor(int.class);
+ ctor.setAccessible(true);
+ this.counter = ctor.newInstance(numBuckets);
+
+ this.add = clazz.getDeclaredMethod("add", long.class, long.class,
int.class);
+ this.add.setAccessible(true);
+ this.getCount = clazz.getDeclaredMethod("getCount", long.class,
long.class, long.class, int.class);
+ this.getCount.setAccessible(true);
+ this.cleanup = clazz.getDeclaredMethod("cleanup", long.class,
long.class);
+ this.cleanup.setAccessible(true);
+ this.hasRecentActivity =
clazz.getDeclaredMethod("hasRecentActivity", long.class, long.class);
+ this.hasRecentActivity.setAccessible(true);
+ this.getLastAccessTime =
clazz.getDeclaredMethod("getLastAccessTime");
+ this.getLastAccessTime.setAccessible(true);
+ }
+
+ void add(long currentTimeMs, long bucketSizeMs, int numBuckets) throws
Exception {
+ add.invoke(counter, currentTimeMs, bucketSizeMs, numBuckets);
+ }
+
+ long getCount(long currentTimeMs, long timeWindowMs, long
bucketSizeMs, int numBuckets) throws Exception {
+ Object v = getCount.invoke(counter, currentTimeMs, timeWindowMs,
bucketSizeMs, numBuckets);
+ return (long) v;
+ }
+
+ void cleanup(long currentTimeMs, long timeWindowMs) throws Exception {
+ cleanup.invoke(counter, currentTimeMs, timeWindowMs);
+ }
+
+ boolean hasRecentActivity(long currentTimeMs, long timeWindowMs)
throws Exception {
+ Object v = hasRecentActivity.invoke(counter, currentTimeMs,
timeWindowMs);
+ return (boolean) v;
+ }
+
+ long getLastAccessTime() throws Exception {
+ Object v = getLastAccessTime.invoke(counter);
+ return (long) v;
+ }
+ }
+
+ @Test
+ public void testAddAndGetCountWithinWindow() throws Exception {
+ int numBuckets = 5;
+ long bucketSizeMs = 1000L;
+ long timeWindowMs = 5000L;
+ long base = 10_000L; // avoid 0 (bucket timestamp 0 is treated as
"unset" in getCount)
+
+ CounterHarness c = new CounterHarness(numBuckets);
+ c.add(base, bucketSizeMs, numBuckets);
+ c.add(base + 1000L, bucketSizeMs, numBuckets);
+ c.add(base + 2000L, bucketSizeMs, numBuckets);
+
+ Assertions.assertEquals(base + 2000L, c.getLastAccessTime());
+ Assertions.assertTrue(c.hasRecentActivity(base + 2000L, timeWindowMs));
+
+ long cnt = c.getCount(base + 2000L, timeWindowMs, bucketSizeMs,
numBuckets);
+ Assertions.assertEquals(3L, cnt);
+
+ // Cached result (same timestamp) should be consistent
+ long cnt2 = c.getCount(base + 2000L, timeWindowMs, bucketSizeMs,
numBuckets);
+ Assertions.assertEquals(3L, cnt2);
+
+ // add() should invalidate cachedTotalCount
+ c.add(base + 2500L, bucketSizeMs, numBuckets);
+ long cnt3 = c.getCount(base + 2500L, timeWindowMs, bucketSizeMs,
numBuckets);
+ Assertions.assertEquals(4L, cnt3);
+ }
+
+ @Test
+ public void testWindowExpiresOldBuckets() throws Exception {
+ int numBuckets = 5;
+ long bucketSizeMs = 1000L;
+ long timeWindowMs = 5000L;
+ long base = 20_000L;
+
+ CounterHarness c = new CounterHarness(numBuckets);
+ c.add(base, bucketSizeMs, numBuckets);
+ c.add(base + 1000L, bucketSizeMs, numBuckets);
+ c.add(base + 2000L, bucketSizeMs, numBuckets);
+
+ // At base + 6000, window start is base + 1000 => count should be 2
(1000 and 2000 buckets)
+ long cnt1 = c.getCount(base + 6000L, timeWindowMs, bucketSizeMs,
numBuckets);
+ Assertions.assertEquals(2L, cnt1);
+
+ // At base + 7000, window start is base + 2000 => count should be 1
(2000 bucket)
+ long cnt2 = c.getCount(base + 7000L, timeWindowMs, bucketSizeMs,
numBuckets);
+ Assertions.assertEquals(1L, cnt2);
+
+        // recent activity should still be true: lastAccessTime=base+2000 is exactly
+        // 5000ms before base+7000, which still counts as within the window
+ Assertions.assertTrue(c.hasRecentActivity(base + 7000L, timeWindowMs));
+ }
+
+ @Test
+ public void testCleanupRemovesExpiredBuckets() throws Exception {
+ int numBuckets = 5;
+ long bucketSizeMs = 1000L;
+ long timeWindowMs = 5000L;
+ long base = 30_000L;
+
+ CounterHarness c = new CounterHarness(numBuckets);
+ c.add(base, bucketSizeMs, numBuckets);
+ c.add(base + 1000L, bucketSizeMs, numBuckets);
+ c.add(base + 2000L, bucketSizeMs, numBuckets);
+
+ // cleanup at base + 7000: windowStart=base+2000, so base/base+1000
buckets should be cleared
+ c.cleanup(base + 7000L, timeWindowMs);
+ long cnt = c.getCount(base + 7000L, timeWindowMs, bucketSizeMs,
numBuckets);
+ Assertions.assertEquals(1L, cnt);
+ }
+
+ @Test
+ public void testBucketWrapAroundResetsExpiredBucket() throws Exception {
+ int numBuckets = 5;
+ long bucketSizeMs = 1000L;
+ long timeWindowMs = 5000L;
+ long base = 40_000L;
+
+ CounterHarness c = new CounterHarness(numBuckets);
+ // bucket index wraps every numBuckets * bucketSizeMs (5s here)
+ c.add(base, bucketSizeMs, numBuckets); // idx = 0
+ c.add(base + 5000L, bucketSizeMs, numBuckets); // idx = 0 again,
should reset old bucket
+
+ // With current implementation, the old "base" bucket is overwritten,
so only 1 remains.
+ long cnt = c.getCount(base + 5000L, timeWindowMs, bucketSizeMs,
numBuckets);
+ Assertions.assertEquals(1L, cnt);
+ }
+
+ @Test
+ public void testTimeWindowLessThanBucketSizeStillUsesValidBucketCount()
throws Exception {
+ long originWindowSecond =
Config.active_tablet_sliding_window_time_window_second;
+ boolean originEnabled =
Config.enable_active_tablet_sliding_window_access_stats;
+ try {
+ Config.active_tablet_sliding_window_time_window_second = 59L;
+ Config.enable_active_tablet_sliding_window_access_stats = false;
+
+ TabletSlidingWindowAccessStats stats = new
TabletSlidingWindowAccessStats();
+ Field numBucketsField =
TabletSlidingWindowAccessStats.class.getDeclaredField("numBuckets");
+ numBucketsField.setAccessible(true);
+ int numBuckets = (int) numBucketsField.get(stats);
+ Assertions.assertEquals(1, numBuckets);
+
+ // No ArithmeticException should be thrown by modulo numBuckets
path.
+ stats.recordAccess(123L);
+ } finally {
+ Config.active_tablet_sliding_window_time_window_second =
originWindowSecond;
+ Config.enable_active_tablet_sliding_window_access_stats =
originEnabled;
+ }
+ }
+
+ @Test
+ public void testTopNComparatorOrdersByAccessCountThenLastAccessTimeDesc()
throws Exception {
+ Field f =
TabletSlidingWindowAccessStats.class.getDeclaredField("TOPN_ACTIVE_COMPARATOR");
+ f.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ Comparator<TabletSlidingWindowAccessStats.AccessStatsResult> cmp =
+ (Comparator<TabletSlidingWindowAccessStats.AccessStatsResult>)
f.get(null);
+
+ List<TabletSlidingWindowAccessStats.AccessStatsResult> results = new
ArrayList<>();
+ results.add(new TabletSlidingWindowAccessStats.AccessStatsResult(1L,
1L, 200L));
+ results.add(new TabletSlidingWindowAccessStats.AccessStatsResult(2L,
10L, 100L));
+ results.add(new TabletSlidingWindowAccessStats.AccessStatsResult(3L,
10L, 300L));
+ results.add(new TabletSlidingWindowAccessStats.AccessStatsResult(4L,
2L, 400L));
+
+ results.sort(cmp);
+
+ // accessCount desc => 10 first, then 2, then 1
+ // tie-breaker lastAccessTime desc => id=3 (300) before id=2 (100)
+ Assertions.assertEquals(3L, results.get(0).id);
+ Assertions.assertEquals(2L, results.get(1).id);
+ Assertions.assertEquals(4L, results.get(2).id);
+ Assertions.assertEquals(1L, results.get(3).id);
+ }
+}
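The tests above pin down the expected behaviour of the (not shown) SlidingWindowCounter: accesses land in a fixed ring of buckets indexed by (timestamp / bucketSizeMs) % numBuckets, a bucket whose recorded start time is 0 or falls outside the window contributes nothing, and a wrap-around overwrites the stale bucket. A minimal sketch consistent with those expectations, omitting the cleanup and count-caching paths; this is an assumption, not the actual Doris implementation:

```
// Ring-buffer sliding-window counter sketch, consistent with the tests above.
public class SlidingWindowCounterSketch {
    private final long[] counts;
    private final long[] bucketStartMs; // 0 means "unset"
    private long lastAccessTime;

    SlidingWindowCounterSketch(int numBuckets) {
        counts = new long[numBuckets];
        bucketStartMs = new long[numBuckets];
    }

    void add(long nowMs, long bucketSizeMs, int numBuckets) {
        long start = (nowMs / bucketSizeMs) * bucketSizeMs;
        int idx = (int) ((nowMs / bucketSizeMs) % numBuckets);
        if (bucketStartMs[idx] != start) { // first use or wrap-around: reset the stale bucket
            bucketStartMs[idx] = start;
            counts[idx] = 0;
        }
        counts[idx]++;
        lastAccessTime = nowMs;
    }

    long getCount(long nowMs, long windowMs, long bucketSizeMs, int numBuckets) {
        long windowStart = nowMs - windowMs;
        long total = 0;
        for (int i = 0; i < numBuckets; i++) {
            // only buckets that are set and overlap the window contribute
            if (bucketStartMs[i] != 0 && bucketStartMs[i] + bucketSizeMs > windowStart) {
                total += counts[i];
            }
        }
        return total;
    }

    boolean hasRecentActivity(long nowMs, long windowMs) {
        return lastAccessTime != 0 && nowMs - lastAccessTime <= windowMs;
    }
}
```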
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudTabletRebalancerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudTabletRebalancerTest.java
new file mode 100644
index 00000000000..e18bcabd184
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudTabletRebalancerTest.java
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.Config;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CloudTabletRebalancerTest {
+
+ private boolean oldEnableActiveScheduling;
+
+ @BeforeEach
+ public void setUp() {
+ oldEnableActiveScheduling =
Config.enable_cloud_active_tablet_priority_scheduling;
+ Config.enable_cloud_active_tablet_priority_scheduling = true;
+ }
+
+ @AfterEach
+ public void tearDown() {
+ Config.enable_cloud_active_tablet_priority_scheduling =
oldEnableActiveScheduling;
+ }
+
+ private static class TestRebalancer extends CloudTabletRebalancer {
+ private final Set<Long> internalDbIds = new HashSet<>();
+
+ TestRebalancer() {
+ super(null);
+ }
+
+ void setInternalDbIds(Set<Long> ids) {
+ internalDbIds.clear();
+ internalDbIds.addAll(ids);
+ }
+
+ @Override
+ protected boolean isInternalDbId(Long dbId) {
+ return dbId != null && internalDbIds.contains(dbId);
+ }
+ }
+
+ private static void setField(Object obj, String name, Object value) throws
Exception {
+ Field f = CloudTabletRebalancer.class.getDeclaredField(name);
+ f.setAccessible(true);
+ f.set(obj, value);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T invokePrivate(Object obj, String method, Class<?>[]
types, Object[] args) throws Exception {
+ Method m = CloudTabletRebalancer.class.getDeclaredMethod(method,
types);
+ m.setAccessible(true);
+ return (T) m.invoke(obj, args);
+ }
+
+ @Test
+ public void testPickTabletPreferCold_picksColdWhenAvailable() throws
Exception {
+ TestRebalancer r = new TestRebalancer();
+ setField(r, "rand", new Random(1));
+
+ Tablet hot = Mockito.mock(Tablet.class);
+ Mockito.when(hot.getId()).thenReturn(100L);
+ Tablet cold = Mockito.mock(Tablet.class);
+ Mockito.when(cold.getId()).thenReturn(200L);
+
+ Set<Tablet> tablets = new HashSet<>();
+ tablets.add(hot);
+ tablets.add(cold);
+
+ Set<Long> activeIds = new HashSet<>();
+ activeIds.add(100L);
+
+ Set<Long> picked = new HashSet<>();
+
+ Tablet pickedTablet = invokePrivate(r, "pickTabletPreferCold",
+ new Class<?>[] {long.class, Set.class, Set.class, Set.class},
+ new Object[] {1L, tablets, activeIds, picked});
+
+ Assertions.assertNotNull(pickedTablet);
+ Assertions.assertEquals(200L, pickedTablet.getId(), "Should prefer
cold tablet when available");
+ }
+
+ @Test
+ public void testPickTabletPreferCold_fallbackRandomWhenStatsUnavailable()
throws Exception {
+ TestRebalancer r = new TestRebalancer();
+ setField(r, "rand", new Random(1));
+
+ Tablet only = Mockito.mock(Tablet.class);
+ Mockito.when(only.getId()).thenReturn(300L);
+ Set<Tablet> tablets = new HashSet<>();
+ tablets.add(only);
+
+ // active stats unavailable -> activeIds empty or cache null
+ Set<Long> activeIds = new HashSet<>();
+ Set<Long> picked = new HashSet<>();
+
+ Tablet pickedTablet = invokePrivate(r, "pickTabletPreferCold",
+ new Class<?>[] {long.class, Set.class, Set.class, Set.class},
+ new Object[] {1L, tablets, activeIds, picked});
+
+ Assertions.assertNotNull(pickedTablet);
+ Assertions.assertEquals(300L, pickedTablet.getId());
+ }
+
+ @Test
+ public void
testTableEntryComparator_ordersByDbActiveThenTableActiveThenIdDesc() throws
Exception {
+ TestRebalancer r = new TestRebalancer();
+ r.setInternalDbIds(Collections.emptySet()); // no internal db
+
+ // tableId -> dbId
+ Map<Long, Long> tableToDb = new HashMap<>();
+ tableToDb.put(10L, 1L);
+ tableToDb.put(11L, 1L);
+ tableToDb.put(20L, 2L);
+ setField(r, "tableIdToDbId", new ConcurrentHashMap<>(tableToDb));
+
+ // db active
+ Map<Long, Long> dbActive = new HashMap<>();
+ dbActive.put(1L, 5L);
+ dbActive.put(2L, 1L);
+ setField(r, "dbIdToActiveCount", new ConcurrentHashMap<>(dbActive));
+
+ // table active
+ Map<Long, Long> tableActive = new HashMap<>();
+ tableActive.put(10L, 2L);
+ tableActive.put(11L, 2L);
+ tableActive.put(20L, 100L); // should still lose because dbActive(2)=1
< dbActive(1)=5
+ setField(r, "tableIdToActiveCount", new
ConcurrentHashMap<>(tableActive));
+
+ Comparator<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> cmp =
+ invokePrivate(r, "tableEntryComparator", new Class<?>[] {},
new Object[] {});
+
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> list = new
ArrayList<>();
+ list.add(new AbstractMap.SimpleEntry<>(10L, new
ConcurrentHashMap<>()));
+ list.add(new AbstractMap.SimpleEntry<>(11L, new
ConcurrentHashMap<>()));
+ list.add(new AbstractMap.SimpleEntry<>(20L, new
ConcurrentHashMap<>()));
+
+ list.sort(cmp);
+
+ // dbId=1 entries first, and for tableId tie-breaker is desc (11
before 10)
+ Assertions.assertEquals(11L, list.get(0).getKey());
+ Assertions.assertEquals(10L, list.get(1).getKey());
+ Assertions.assertEquals(20L, list.get(2).getKey());
+ }
+
+ @Test
+ public void testTableEntryComparator_internalDbLast() throws Exception {
+ TestRebalancer r = new TestRebalancer();
+ r.setInternalDbIds(Collections.singleton(1L)); // dbId=1 is internal
+
+ Map<Long, Long> tableToDb = new HashMap<>();
+ tableToDb.put(10L, 1L);
+ tableToDb.put(20L, 2L);
+ setField(r, "tableIdToDbId", new ConcurrentHashMap<>(tableToDb));
+ setField(r, "dbIdToActiveCount", new ConcurrentHashMap<>());
+ setField(r, "tableIdToActiveCount", new ConcurrentHashMap<>());
+
+ Comparator<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> cmp =
+ invokePrivate(r, "tableEntryComparator", new Class<?>[] {},
new Object[] {});
+
+ List<Map.Entry<Long, ConcurrentHashMap<Long, Set<Tablet>>>> list = new
ArrayList<>();
+ list.add(new AbstractMap.SimpleEntry<>(10L, new
ConcurrentHashMap<>()));
+ list.add(new AbstractMap.SimpleEntry<>(20L, new
ConcurrentHashMap<>()));
+ list.sort(cmp);
+
+ Assertions.assertEquals(20L, list.get(0).getKey());
+ Assertions.assertEquals(10L, list.get(1).getKey(), "Internal db table
should be scheduled last");
+ }
+
+ @Test
+ public void testPartitionEntryComparator_internalDbLastAndIdDescTieBreak()
throws Exception {
+ TestRebalancer r = new TestRebalancer();
+ r.setInternalDbIds(Collections.singleton(1L)); // dbId=1 is internal
+
+ Map<Long, Long> partToDb = new HashMap<>();
+ partToDb.put(100L, 1L); // internal
+ partToDb.put(200L, 2L); // normal
+ partToDb.put(201L, 2L); // normal
+ setField(r, "partitionIdToDbId", new ConcurrentHashMap<>(partToDb));
+
+ Map<Long, Long> dbActive = new HashMap<>();
+ dbActive.put(1L, 100L);
+ dbActive.put(2L, 100L);
+ setField(r, "dbIdToActiveCount", new ConcurrentHashMap<>(dbActive));
+
+ Map<Long, Long> partActive = new HashMap<>();
+ partActive.put(200L, 1L);
+ partActive.put(201L, 1L);
+ setField(r, "partitionIdToActiveCount", new
ConcurrentHashMap<>(partActive));
+
+ @SuppressWarnings("unchecked")
+ Comparator<Map.Entry<Long, ConcurrentHashMap<Long,
ConcurrentHashMap<Long, Set<Tablet>>>>> cmp =
+ invokePrivate(r, "partitionEntryComparator", new Class<?>[]
{}, new Object[] {});
+
+ List<Map.Entry<Long, ConcurrentHashMap<Long, ConcurrentHashMap<Long,
Set<Tablet>>>>> list = new ArrayList<>();
+ list.add(new AbstractMap.SimpleEntry<>(100L, new
ConcurrentHashMap<>()));
+ list.add(new AbstractMap.SimpleEntry<>(200L, new
ConcurrentHashMap<>()));
+ list.add(new AbstractMap.SimpleEntry<>(201L, new
ConcurrentHashMap<>()));
+ list.sort(cmp);
+
+ // normal db first; for 200 vs 201 (same dbActive, same partActive)
tie-breaker is id desc => 201 first
+ Assertions.assertEquals(201L, list.get(0).getKey());
+ Assertions.assertEquals(200L, list.get(1).getKey());
+ Assertions.assertEquals(100L, list.get(2).getKey(), "Internal db
partition should be scheduled last");
+ }
+}
+
+
diff --git
a/regression-test/suites/cloud_p0/balance/test_active_tablet_priority_scheduling.groovy
b/regression-test/suites/cloud_p0/balance/test_active_tablet_priority_scheduling.groovy
new file mode 100644
index 00000000000..e1278160dfb
--- /dev/null
+++
b/regression-test/suites/cloud_p0/balance/test_active_tablet_priority_scheduling.groovy
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+import groovy.json.JsonSlurper
+import java.nio.charset.StandardCharsets
+
+suite('test_active_tablet_priority_scheduling', 'cloud_p0, docker') {
+ if (!isCloudMode()) {
+ return
+ }
+
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=1',
+ // enable the feature under test
+ 'enable_cloud_active_tablet_priority_scheduling=true',
+ // enable active tablet sliding window access stats (for metrics &
SHOW TABLET*)
+ 'enable_active_tablet_sliding_window_access_stats=true',
+ 'active_tablet_sliding_window_time_window_second=3600',
+ // make the scheduling signal deterministic: only run table balance
+ 'enable_cloud_partition_balance=false',
+ 'enable_cloud_table_balance=true',
+ 'enable_cloud_global_balance=false',
+ // print CloudTabletRebalancer DEBUG logs
+ 'sys_log_verbose_modules=org.apache.doris.cloud.catalog',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=3600',
+ 'cache_enable_sql_mode=false',
+ ]
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'sys_log_verbose_modules=*',
+ 'enable_packed_file=false',
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+
+ def tailFile = { String path, int maxBytes ->
+ def f = new File(path)
+ if (!f.exists()) {
+ return ""
+ }
+ def raf = new RandomAccessFile(f, "r")
+ try {
+ long len = raf.length()
+ long start = Math.max(0L, len - maxBytes)
+ raf.seek(start)
+ byte[] buf = new byte[(int) (len - start)]
+ raf.readFully(buf)
+ return new String(buf, StandardCharsets.UTF_8)
+ } finally {
+ raf.close()
+ }
+ }
+
+ def getTableIdByName = { String tblName ->
+ def tablets = sql_return_maparray """SHOW TABLETS FROM ${tblName}"""
+ assert tablets.size() > 0
+ def tabletId = tablets[0].TabletId
+ def meta = sql_return_maparray """SHOW TABLET ${tabletId}"""
+ assert meta.size() > 0
+ return meta[0].TableId.toLong()
+ }
+
+ docker(options) {
+ def getFeMetricValue = { String metricSuffix ->
+ def ret = null
+ String masterHttpAddress = getMasterIp() + ":" +
getMasterPort("http")
+ httpTest {
+ endpoint masterHttpAddress
+ uri "/metrics?type=json"
+ op "get"
+ check { code, body ->
+ def jsonSlurper = new JsonSlurper()
+ def result = jsonSlurper.parseText(body)
+ def entry = result.find {
it.tags?.metric?.toString()?.endsWith(metricSuffix) }
+ ret = entry ? entry.value : null
+ }
+ }
+ return ret
+ }
+
+ def hotTbl = "hot_tbl_active_sched"
+ def coldTbl = "cold_tbl_active_sched"
+
+ sql """DROP TABLE IF EXISTS ${hotTbl}"""
+ sql """DROP TABLE IF EXISTS ${coldTbl}"""
+
+ sql """
+ CREATE TABLE ${hotTbl} (
+ k INT,
+ v INT
+ )
+ DUPLICATE KEY(k)
+ DISTRIBUTED BY HASH(k) BUCKETS 50
+ PROPERTIES("replication_num"="1");
+ """
+
+ sql """
+ CREATE TABLE ${coldTbl} (
+ k INT,
+ v INT
+ )
+ DUPLICATE KEY(k)
+ DISTRIBUTED BY HASH(k) BUCKETS 50
+ PROPERTIES("replication_num"="1");
+ """
+
+ // load some data
+ sql """INSERT INTO ${hotTbl} VALUES
(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10)"""
+ sql """INSERT INTO ${coldTbl} VALUES
(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10)"""
+
+ // Mark hot table as "more active" but keep some tablets cold:
+ // Use point queries on several keys to make a subset of tablets
active (BUCKETS=50),
+ // so cold-first has cold tablets to choose from and should avoid
moving active tablets.
+ for (int round = 0; round < 50; round++) {
+ for (int k = 1; k <= 5; k++) {
+ sql """SELECT * FROM ${hotTbl} WHERE k = ${k}"""
+ }
+ }
+ // cold table: minimal access
+ sql """SELECT * FROM ${coldTbl} WHERE k = 1"""
+
+ // give async access stats a short time window
+ sleep(2 * 1000)
+
+ // Verify FE metrics: tablet_access_recent / tablet_access_total exist
and are > 0 after access
+ awaitUntil(60) {
+ def recent = getFeMetricValue("tablet_access_recent")
+ def total = getFeMetricValue("tablet_access_total")
+ if (recent == null || total == null) {
+ return false
+ }
+ long recentL = recent.toString().toLong()
+ long totalL = total.toString().toLong()
+ return recentL > 0 && totalL > 0 && totalL >= recentL
+ }
+
+ def hotTableId = getTableIdByName(hotTbl)
+ def coldTableId = getTableIdByName(coldTbl)
+ logger.info("hotTableId={}, coldTableId={}", hotTableId, coldTableId)
+
+ // Verify SHOW TABLETS FROM <table> exposes
WindowAccessCount/LastAccessTime and values are updated after access
+ def hotTablets = sql_return_maparray """SHOW TABLETS FROM ${hotTbl}"""
+ assert hotTablets.size() > 0
+ assert hotTablets[0].containsKey("WindowAccessCount")
+ assert hotTablets[0].containsKey("LastAccessTime")
+
+ def accessedTabletRow = null
+ awaitUntil(60) {
+ hotTablets = sql_return_maparray """SHOW TABLETS FROM ${hotTbl}"""
+ accessedTabletRow = hotTablets.find { row ->
+ try {
+ long c = row.WindowAccessCount.toString().toLong()
+ long t = row.LastAccessTime.toString().toLong()
+ return c > 0 && t > 0
+ } catch (Throwable ignored) {
+ return false
+ }
+ }
+ return accessedTabletRow != null
+ }
+
+ def accessedTabletId = accessedTabletRow.TabletId.toString().toLong()
+ logger.info("picked accessedTabletId={} from SHOW TABLETS, row={}",
accessedTabletId, accessedTabletRow)
+
+ // Verify SHOW TABLET <tabletId> exposes
WindowAccessCount/LastAccessTime and values are updated
+ def showTablet = sql_return_maparray """SHOW TABLET
${accessedTabletId}"""
+ assert showTablet.size() > 0
+ assert showTablet[0].containsKey("WindowAccessCount")
+ assert showTablet[0].containsKey("LastAccessTime")
+ assert showTablet[0].WindowAccessCount.toString().toLong() > 0
+ assert showTablet[0].LastAccessTime.toString().toLong() > 0
+
+ def fe = cluster.getFeByIndex(1)
+ def feLogPath = fe.getLogFilePath()
+ logger.info("fe log path={}", feLogPath)
+
+ // Capture "active" tablets for hot table before rebalance
(WindowAccessCount > 0)
+ def hotBefore = sql_return_maparray """SHOW TABLETS FROM ${hotTbl}"""
+ def hotBeforeByTabletId = [:]
+ hotBefore.each { row ->
+ hotBeforeByTabletId[row.TabletId.toString()] = row
+ }
+ def activeHotTabletIds = hotBefore.findAll { row ->
+ try {
+ row.WindowAccessCount.toString().toLong() > 0
+ } catch (Throwable ignored) {
+ false
+ }
+ }.collect { it.TabletId.toString() }
+ assert activeHotTabletIds.size() > 0 : "Expected some hot table
tablets to be active before rebalance"
+
+ // trigger rebalancing by adding a new backend
+ cluster.addBackend(1, "compute_cluster")
+
+ // Resolve new backend id from FE
+ def backends = sql_return_maparray("show backends")
+ assert backends.size() >= 2
+ def oldBeId = backends.get(0).BackendId.toString().toLong()
+ def newBeId = backends.get(1).BackendId.toString().toLong()
+ logger.info("oldBeId={}, newBeId={}", oldBeId, newBeId)
+
+ // Wait until hot table has any tablet moved to new backend (means it
was scheduled/processed)
+ def hotFirstMoveAt = 0L
+ awaitUntil(120) {
+ def hotNow = sql_return_maparray """SHOW TABLETS FROM ${hotTbl}"""
+ def moved = hotNow.findAll { it.BackendId.toString().toLong() ==
newBeId }
+ if (!moved.isEmpty()) {
+ hotFirstMoveAt = System.currentTimeMillis()
+ return true
+ }
+ return false
+ }
+
+ // Cold-first verification (SQL-based):
+ // At the moment the first move happens, all moved tablets should come
from cold subset (WindowAccessCount == 0 before move).
+ def hotAfterFirstMove = sql_return_maparray """SHOW TABLETS FROM
${hotTbl}"""
+ def movedNow = hotAfterFirstMove.findAll {
it.BackendId.toString().toLong() == newBeId }
+ assert movedNow.size() > 0
+        movedNow.each { row ->
+            def beforeRow = hotBeforeByTabletId[row.TabletId.toString()]
+            assert beforeRow != null
+            long beforeCnt = beforeRow.WindowAccessCount.toString().toLong()
+            assert beforeCnt == 0 : "tablet ${row.TabletId} was active (WindowAccessCount=${beforeCnt}) before it was moved"
+        }
+
+        // Best-effort timing check: the hot table should be processed no later than the cold table
+ def coldFirstMoveAt = 0L
+ awaitUntil(120) {
+ def coldNow = sql_return_maparray """SHOW TABLETS FROM
${coldTbl}"""
+ def moved = coldNow.findAll { it.BackendId.toString().toLong() ==
newBeId }
+ if (!moved.isEmpty()) {
+ coldFirstMoveAt = System.currentTimeMillis()
+ return true
+ }
+ return false
+ }
+ assert hotFirstMoveAt > 0 && coldFirstMoveAt > 0
+ assert hotFirstMoveAt <= coldFirstMoveAt : "Expected hot table to be
scheduled before cold table"
+ }
+}
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]