gavinchou commented on code in PR #59539:
URL: https://github.com/apache/doris/pull/59539#discussion_r2678824280


##########
fe/fe-core/src/main/java/org/apache/doris/catalog/TabletAccessStats.java:
##########
@@ -0,0 +1,469 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.stream.Collectors;
+
+public class TabletAccessStats {
+    private static final Logger LOG = 
LogManager.getLogger(TabletAccessStats.class);
+
+    private static volatile TabletAccessStats instance;
+
+    // Sort active tablets by accessCount desc, then lastAccessTime desc
+    private static final Comparator<AccessStatsResult> 
TOPN_ACTIVE_TABLET_COMPARATOR =
+            Comparator.comparingLong((AccessStatsResult r) -> 
r.accessCount).reversed()
+                    .thenComparing(Comparator.comparingLong((AccessStatsResult 
r) -> r.lastAccessTime).reversed());
+
+    // Time window in milliseconds (default: 1 hour)
+    private final long timeWindowMs;
+
+    // Bucket size in milliseconds (default: 1 minute)
+    // Smaller bucket = more accurate but more memory
+    private final long bucketSizeMs;
+
+    // Number of buckets in the sliding window; the constructor derives it as
+    // window length / bucket size (3600 s / 60 s = 60 by default)
+    private final int numBuckets;
+
+    // Cleanup interval in milliseconds (default: 5 minutes)
+    private final long cleanupIntervalMs;
+
+    // Shard size to reduce lock contention.
+    // Must stay a power of two: getShardIndex() selects the shard with id & (SHARD_SIZE - 1).
+    private static final int SHARD_SIZE = 1024;
+
+    /**
+     * Sliding-window access counter for a single replica/tablet.
+     *
+     * <p>The window is a ring of {@code numBuckets} slots, each covering {@code bucketSizeMs}. Every slot
+     * is tagged with the start time of the period it currently counts ({@code 0} = empty), so a slot that
+     * still holds an older period is recognised and either reset ({@link #add}) or skipped
+     * ({@link #getCount}).
+     *
+     * <p>Thread-safety: {@link #add}, {@link #getCount} and {@link #cleanup} synchronize on the counter.
+     * Resetting a slot touches two arrays and must not interleave with another reset, an increment or a
+     * cleanup of the same slot. The cached total must also not be republished after an add has
+     * invalidated it. {@link #getLastAccessTime} and {@link #hasRecentActivity} only read a volatile
+     * field and remain lock-free.
+     */
+    private static class SlidingWindowCounter {
+        // Access count of each slot for the period recorded in bucketTimestamps
+        private final AtomicLongArray buckets;
+        // Start time (ms) of the period each slot currently counts; 0 = empty slot
+        private final AtomicLongArray bucketTimestamps;
+        // Last access time (for TopN sorting)
+        private volatile long lastAccessTime = 0;
+        // Total count in the current window, cached for up to 1 second; -1 = invalidated
+        private volatile long cachedTotalCount = 0;
+        // currentTimeMs passed to the getCount call that filled cachedTotalCount
+        private volatile long cachedCountTime = 0;
+
+        SlidingWindowCounter(int numBuckets) {
+            this.buckets = new AtomicLongArray(numBuckets);
+            this.bucketTimestamps = new AtomicLongArray(numBuckets);
+        }
+
+        /**
+         * Returns the ring slot for the bucket-sized period that contains {@code currentTimeMs}.
+         */
+        private int getBucketIndex(long currentTimeMs, long bucketSizeMs, int numBuckets) {
+            return (int) ((currentTimeMs / bucketSizeMs) % numBuckets);
+        }
+
+        /**
+         * Records one access at {@code currentTimeMs}.
+         *
+         * <p>If the target slot still counts an older period, it is reset first. The reset and the
+         * increment happen under the counter's monitor. Without it, two threads crossing a bucket
+         * boundary can both reset the slot and drop each other's increments, and a concurrent
+         * {@link #cleanup} can zero the slot between the reset and the increment.
+         */
+        synchronized void add(long currentTimeMs, long bucketSizeMs, int numBuckets) {
+            int bucketIndex = getBucketIndex(currentTimeMs, bucketSizeMs, numBuckets);
+            long bucketStartTime = (currentTimeMs / bucketSizeMs) * bucketSizeMs;
+
+            // The slot belongs to a different period: start counting the new period from zero
+            if (bucketTimestamps.get(bucketIndex) != bucketStartTime) {
+                buckets.set(bucketIndex, 0);
+                bucketTimestamps.set(bucketIndex, bucketStartTime);
+            }
+
+            buckets.incrementAndGet(bucketIndex);
+            lastAccessTime = currentTimeMs;
+            cachedTotalCount = -1; // Invalidate cache
+        }
+
+        /**
+         * Returns the number of accesses recorded in slots whose period starts at or after
+         * {@code currentTimeMs - timeWindowMs}.
+         *
+         * <p>A total computed less than 1 second earlier is reused, unless an add or cleanup has
+         * invalidated it since. {@code bucketSizeMs} is not used by the computation.
+         */
+        synchronized long getCount(long currentTimeMs, long timeWindowMs, long bucketSizeMs, int numBuckets) {
+            long cacheAgeMs = currentTimeMs - cachedCountTime;
+            // A negative age (caller's clock behind the cached time) must not count as "fresh"
+            if (cachedTotalCount >= 0 && cacheAgeMs >= 0 && cacheAgeMs < 1000) {
+                return cachedTotalCount;
+            }
+
+            long windowStart = currentTimeMs - timeWindowMs;
+            long count = 0;
+            for (int i = 0; i < numBuckets; i++) {
+                long bucketTimestamp = bucketTimestamps.get(i);
+                // 0 marks an empty slot; a start time before windowStart has expired
+                if (bucketTimestamp > 0 && bucketTimestamp >= windowStart) {
+                    count += buckets.get(i);
+                }
+            }
+
+            cachedTotalCount = count;
+            cachedCountTime = currentTimeMs;
+            return count;
+        }
+
+        long getLastAccessTime() {
+            return lastAccessTime;
+        }
+
+        /**
+         * Empties every slot whose period started before {@code currentTimeMs - timeWindowMs} and
+         * invalidates the cached total.
+         */
+        synchronized void cleanup(long currentTimeMs, long timeWindowMs) {
+            long windowStart = currentTimeMs - timeWindowMs;
+            for (int i = 0; i < buckets.length(); i++) {
+                long bucketTimestamp = bucketTimestamps.get(i);
+                if (bucketTimestamp > 0 && bucketTimestamp < windowStart) {
+                    buckets.set(i, 0);
+                    bucketTimestamps.set(i, 0);
+                }
+            }
+            cachedTotalCount = -1; // Invalidate cache
+        }
+
+        /**
+         * Returns true if the last recorded access is not older than {@code timeWindowMs}.
+         */
+        boolean hasRecentActivity(long currentTimeMs, long timeWindowMs) {
+            return lastAccessTime >= (currentTimeMs - timeWindowMs);
+        }
+    }
+
+    /**
+     * Shard structure to reduce lock contention
+     */
+    private static class AccessStatsShard {
+        // Tablet counters: tabletId -> SlidingWindowCounter
+        private final ConcurrentHashMap<Long, SlidingWindowCounter> 
tabletCounters = new ConcurrentHashMap<>();
+    }
+
+    // Sharded access stats to reduce lock contention.
+    // The constructor fills every slot with an empty AccessStatsShard.
+    private final AccessStatsShard[] shards = new AccessStatsShard[SHARD_SIZE];
+
+    // Access counter for monitoring
+    private final AtomicLong totalAccessCount = new AtomicLong(0);
+
+    // Cleanup daemon. Only created and started by the constructor when
+    // Config.enable_cloud_active_tablet_priority_scheduling is true; otherwise it stays null.
+    private AccessStatsCleanupDaemon cleanupDaemon;
+
+    // Thread pool for async recordAccess execution. Null when the feature flag was off at
+    // construction time, in which case recordAccessAsync returns without recording anything.
+    private ThreadPoolExecutor asyncExecutor;
+
+    // Default time window: 1 hour
+    private static final long DEFAULT_TIME_WINDOW_SECOND = 3600L;
+
+    // Default bucket size: 1 minute (60 buckets for 1 hour window)
+    private static final long DEFAULT_BUCKET_SIZE_SECOND = 60L;
+
+    // Default cleanup interval: 5 minutes
+    private static final long DEFAULT_CLEANUP_INTERVAL_SECOND = 300L;
+
+    /**
+     * Builds the singleton with the default window (1 hour), bucket size (1 minute) and cleanup
+     * interval (5 minutes). When cloud active-tablet priority scheduling is enabled, it also creates
+     * the async recording pool and starts the cleanup daemon.
+     */
+    private TabletAccessStats() {
+        this.timeWindowMs = DEFAULT_TIME_WINDOW_SECOND * 1000L;
+        this.bucketSizeMs = DEFAULT_BUCKET_SIZE_SECOND * 1000L;
+        this.numBuckets = (int) (DEFAULT_TIME_WINDOW_SECOND / DEFAULT_BUCKET_SIZE_SECOND); // 60 buckets
+        this.cleanupIntervalMs = DEFAULT_CLEANUP_INTERVAL_SECOND * 1000L;
+
+        // Initialize shards
+        for (int i = 0; i < SHARD_SIZE; i++) {
+            shards[i] = new AccessStatsShard();
+        }
+
+        // NOTE(review): the feature flag is read only once, here. If it can be switched on at runtime,
+        // the executor and cleanup daemon are never created afterwards -- confirm whether that is intended.
+        if (Config.enable_cloud_active_tablet_priority_scheduling) {
+            // Small pool with a bounded queue so callers never block. When the queue is full the task is
+            // discarded, because access statistics can tolerate some loss.
+            AtomicLong threadSeq = new AtomicLong(0);
+            this.asyncExecutor = new ThreadPoolExecutor(
+                    2,  // core pool size
+                    4,  // maximum pool size
+                    60L, TimeUnit.SECONDS,
+                    new LinkedBlockingQueue<>(1000), // queue capacity
+                    r -> {
+                        // Number the threads so they can be told apart in thread dumps
+                        Thread t = new Thread(r, "cloud-tablet-access-stats-async-" + threadSeq.incrementAndGet());
+                        t.setDaemon(true);
+                        return t;
+                    },
+                    new ThreadPoolExecutor.DiscardPolicy() // discard when queue is full
+            );
+
+            // Start the cleanup daemon last, so no field of this object is still unassigned when its
+            // thread begins running (presumably it works on this instance; its definition is not shown).
+            this.cleanupDaemon = new AccessStatsCleanupDaemon();
+            this.cleanupDaemon.start();
+
+            LOG.info("TabletAccessStats initialized: timeWindow={}ms, bucketSize={}ms, "
+                    + "numBuckets={}, shardSize={}, cleanupInterval={}ms",
+                    timeWindowMs, bucketSizeMs, numBuckets, SHARD_SIZE, cleanupIntervalMs);
+        }
+    }
+
+    public static TabletAccessStats getInstance() {
+        if (instance == null) {
+            synchronized (TabletAccessStats.class) {
+                if (instance == null) {
+                    instance = new TabletAccessStats();
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Get shard index for a given ID
+     */
+    private int getShardIndex(long id) {
+        return (int) (id & (SHARD_SIZE - 1));
+    }
+
+    /**
+     * Record an access to a replica asynchronously
+     * This method is non-blocking and should be used in high-frequency call 
paths
+     * to avoid blocking the caller thread.
+     */
+    public void recordAccessAsync(long replicaId) {
+        if (!Config.enable_cloud_active_tablet_priority_scheduling || 
asyncExecutor == null) {
+            return;
+        }
+
+        try {
+            asyncExecutor.execute(() -> {
+                try {
+                    recordAccess(replicaId);
+                } catch (Exception e) {
+                    // Log but don't propagate exception to avoid affecting 
caller
+                    LOG.debug("Failed to record access asynchronously for 
replicaId={}", replicaId, e);
+                }
+            });
+        } catch (Exception e) {
+            // If executor is shutdown or queue is full, silently ignore
+            // Statistics can tolerate some loss
+            if (LOG.isDebugEnabled()) {

Review Comment:
   Log this at INFO level rather than DEBUG; an exception here is abnormal.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to