github-actions[bot] commented on code in PR #60855:
URL: https://github.com/apache/doris/pull/60855#discussion_r3013068279


##########
fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java:
##########
@@ -515,98 +580,126 @@ public Cloud.RemoveDeleteBitmapUpdateLockResponse 
removeDeleteBitmapUpdateLock(
     @Deprecated
     public Cloud.AlterObjStoreInfoResponse 
alterObjStoreInfo(Cloud.AlterObjStoreInfoRequest request)
             throws RpcException {
-        return executeWithMetrics("alterObjStoreInfo", (client) -> 
client.alterObjStoreInfo(request));
+        return executeWithMetrics("alterObjStoreInfo", (client) -> 
client.alterObjStoreInfo(request), 1,
+                Cloud.AlterObjStoreInfoResponse::getStatus);
     }
 
     public Cloud.AlterObjStoreInfoResponse 
alterStorageVault(Cloud.AlterObjStoreInfoRequest request)
             throws RpcException {
-        return executeWithMetrics("alterStorageVault", (client) -> 
client.alterStorageVault(request));
+        return executeWithMetrics("alterStorageVault", (client) -> 
client.alterStorageVault(request), 1,
+                Cloud.AlterObjStoreInfoResponse::getStatus);
     }
 
     public Cloud.FinishTabletJobResponse 
finishTabletJob(Cloud.FinishTabletJobRequest request)
             throws RpcException {
-        return executeWithMetrics("finishTabletJob", (client) -> 
client.finishTabletJob(request));
+        return executeWithMetrics("finishTabletJob", (client) -> 
client.finishTabletJob(request), 1,
+                Cloud.FinishTabletJobResponse::getStatus);
     }
 
-    public Cloud.GetRLTaskCommitAttachResponse
-            getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request)
+    public Cloud.GetRLTaskCommitAttachResponse 
getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request)
             throws RpcException {
-        return executeWithMetrics("getRLTaskCommitAttach",
-                (client) -> client.getRLTaskCommitAttach(request));
+        return executeWithMetrics("getRLTaskCommitAttach", (client) -> 
client.getRLTaskCommitAttach(request), 1,
+                Cloud.GetRLTaskCommitAttachResponse::getStatus);
     }
 
     public Cloud.ResetRLProgressResponse 
resetRLProgress(Cloud.ResetRLProgressRequest request)
             throws RpcException {
-        return executeWithMetrics("resetRLProgress", (client) -> 
client.resetRLProgress(request));
+        return executeWithMetrics("resetRLProgress", (client) -> 
client.resetRLProgress(request), 1,
+                Cloud.ResetRLProgressResponse::getStatus);
     }
 
-    public Cloud.ResetStreamingJobOffsetResponse 
resetStreamingJobOffset(Cloud.ResetStreamingJobOffsetRequest request)
-            throws RpcException {
-        return executeWithMetrics("resetStreamingJobOffset",
-                (client) -> client.resetStreamingJobOffset(request));
+    public Cloud.ResetStreamingJobOffsetResponse resetStreamingJobOffset(
+            Cloud.ResetStreamingJobOffsetRequest request) throws RpcException {
+        return executeWithMetrics("resetStreamingJobOffset", (client) -> 
client.resetStreamingJobOffset(request), 1,
+                Cloud.ResetStreamingJobOffsetResponse::getStatus);
     }
 
-    public Cloud.GetObjStoreInfoResponse
-            getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws 
RpcException {
-        return executeWithMetrics("getObjStoreInfo", (client) -> 
client.getObjStoreInfo(request));
+    public Cloud.GetObjStoreInfoResponse 
getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws RpcException {
+        return executeWithMetrics("getObjStoreInfo", (client) -> 
client.getObjStoreInfo(request), 1,
+                Cloud.GetObjStoreInfoResponse::getStatus);
     }
 
-    public Cloud.AbortTxnWithCoordinatorResponse
-            abortTxnWithCoordinator(Cloud.AbortTxnWithCoordinatorRequest 
request) throws RpcException {
-        return executeWithMetrics("abortTxnWithCoordinator",
-                (client) -> client.abortTxnWithCoordinator(request));
+    public Cloud.AbortTxnWithCoordinatorResponse abortTxnWithCoordinator(
+            Cloud.AbortTxnWithCoordinatorRequest request) throws RpcException {
+        return executeWithMetrics("abortTxnWithCoordinator", (client) -> 
client.abortTxnWithCoordinator(request), 1,
+                Cloud.AbortTxnWithCoordinatorResponse::getStatus);
     }
 
-    public Cloud.GetPrepareTxnByCoordinatorResponse
-            getPrepareTxnByCoordinator(Cloud.GetPrepareTxnByCoordinatorRequest 
request) throws RpcException {
+    public Cloud.GetPrepareTxnByCoordinatorResponse getPrepareTxnByCoordinator(
+            Cloud.GetPrepareTxnByCoordinatorRequest request) throws 
RpcException {
         return executeWithMetrics("getPrepareTxnByCoordinator",
-                (client) -> client.getPrepareTxnByCoordinator(request));
+                (client) -> client.getPrepareTxnByCoordinator(request), 1,
+                Cloud.GetPrepareTxnByCoordinatorResponse::getStatus);
     }
 
     public Cloud.CreateInstanceResponse 
createInstance(Cloud.CreateInstanceRequest request) throws RpcException {
-        return executeWithMetrics("createInstance", (client) -> 
client.createInstance(request));
+        return executeWithMetrics("createInstance", (client) -> 
client.createInstance(request), 1,
+                Cloud.CreateInstanceResponse::getStatus);
     }
 
     public Cloud.GetStreamingTaskCommitAttachResponse 
getStreamingTaskCommitAttach(
             Cloud.GetStreamingTaskCommitAttachRequest request) throws 
RpcException {
         return executeWithMetrics("getStreamingTaskCommitAttach",
-                (client) -> client.getStreamingTaskCommitAttach(request));
+                (client) -> client.getStreamingTaskCommitAttach(request), 1,
+                Cloud.GetStreamingTaskCommitAttachResponse::getStatus);
     }
 
     public Cloud.DeleteStreamingJobResponse 
deleteStreamingJob(Cloud.DeleteStreamingJobRequest request)
             throws RpcException {
-        return executeWithMetrics("deleteStreamingJob", (client) -> 
client.deleteStreamingJob(request));
+        return executeWithMetrics("deleteStreamingJob", (client) -> 
client.deleteStreamingJob(request), 1,
+                Cloud.DeleteStreamingJobResponse::getStatus);
     }
 
     public Cloud.AlterInstanceResponse 
alterInstance(Cloud.AlterInstanceRequest request) throws RpcException {
-        return executeWithMetrics("alterInstance", (client) -> 
client.alterInstance(request));
+        return executeWithMetrics("alterInstance", (client) -> 
client.alterInstance(request), 1,
+                Cloud.AlterInstanceResponse::getStatus);
     }
 
     public Cloud.BeginSnapshotResponse 
beginSnapshot(Cloud.BeginSnapshotRequest request) throws RpcException {
-        return executeWithMetrics("beginSnapshot", (client) -> 
client.beginSnapshot(request));
+        return executeWithMetrics("beginSnapshot", (client) -> 
client.beginSnapshot(request), 1,
+                Cloud.BeginSnapshotResponse::getStatus);
     }
 
     public Cloud.UpdateSnapshotResponse 
updateSnapshot(Cloud.UpdateSnapshotRequest request) throws RpcException {
-        return executeWithMetrics("updateSnapshot", (client) -> 
client.updateSnapshot(request));
+        return executeWithMetrics("updateSnapshot", (client) -> 
client.updateSnapshot(request), 1,
+                Cloud.UpdateSnapshotResponse::getStatus);
     }
 
     public Cloud.CommitSnapshotResponse 
commitSnapshot(Cloud.CommitSnapshotRequest request) throws RpcException {
-        return executeWithMetrics("commitSnapshot", (client) -> 
client.commitSnapshot(request));
+        return executeWithMetrics("commitSnapshot", (client) -> 
client.commitSnapshot(request), 1,
+                Cloud.CommitSnapshotResponse::getStatus);
     }
 
     public Cloud.AbortSnapshotResponse 
abortSnapshot(Cloud.AbortSnapshotRequest request) throws RpcException {
-        return executeWithMetrics("abortSnapshot", (client) -> 
client.abortSnapshot(request));
+        return executeWithMetrics("abortSnapshot", (client) -> 
client.abortSnapshot(request), 1,
+                Cloud.AbortSnapshotResponse::getStatus);
     }
 
     public Cloud.ListSnapshotResponse listSnapshot(Cloud.ListSnapshotRequest 
request) throws RpcException {
-        return executeWithMetrics("listSnapshot", (client) -> 
client.listSnapshot(request));
+        return executeWithMetrics("listSnapshot", (client) -> 
client.listSnapshot(request), 1,
+                Cloud.ListSnapshotResponse::getStatus);
     }
 
     public Cloud.DropSnapshotResponse dropSnapshot(Cloud.DropSnapshotRequest 
request) throws RpcException {
-        return executeWithMetrics("dropSnapshot", (client) -> 
client.dropSnapshot(request));
+        return executeWithMetrics("dropSnapshot", (client) -> 
client.dropSnapshot(request), 1,
+                Cloud.DropSnapshotResponse::getStatus);
     }
 
     public Cloud.CloneInstanceResponse 
cloneInstance(Cloud.CloneInstanceRequest request) throws RpcException {
-        return executeWithMetrics("cloneInstance", (client) -> 
client.cloneInstance(request));
+        return executeWithMetrics("cloneInstance", (client) -> 
client.cloneInstance(request), 1,
+                Cloud.CloneInstanceResponse::getStatus);
+    }
+
+    private static void handleResponseStatus(Cloud.MetaServiceResponseStatus 
status) {
+        if (Config.meta_service_rpc_overload_throttle_enabled) {
+            MetaServiceOverloadThrottle.Signal signal = 
isMetaServiceBusy(status)

Review Comment:
   **[Missing Feature - Medium]** `handleResponseStatus()` only emits 
`Signal.SUCCESS` or `Signal.OVERLOAD`, but never `Signal.TIMEOUT`. gRPC 
`DEADLINE_EXCEEDED` errors caught in `executeRequest()` (line 237) do not reach 
this method because exceptions are thrown rather than responses returned. This 
means the overload throttle's `Signal.TIMEOUT` enum value is dead code - the 
throttle will never react to RPC timeouts, only to explicit `MAX_QPS_LIMIT` 
responses from the meta-service.
   
   If the intent is for the throttle to also react to timeouts, you should 
record `Signal.TIMEOUT` in the `StatusRuntimeException` catch block of 
`executeRequest()` when the status is `DEADLINE_EXCEEDED`.



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java:
##########
@@ -283,18 +302,58 @@ private <Response> Response executeWithMetrics(String 
methodName, Function<MetaS
         }
     }
 
-    public Future<Cloud.GetVersionResponse> 
getVisibleVersionAsync(Cloud.GetVersionRequest request)
-            throws RpcException {
+    public Cloud.GetVersionResponse getVisibleVersion(Cloud.GetVersionRequest 
request, int timeoutMs) {
         long startTime = System.currentTimeMillis();
-        String methodName = request.hasIsTableVersion() && 
request.getIsTableVersion() ? "getTableVersion"
-                : "getPartitionVersion";
-        MetaServiceClient client = null;
-
+        String methodName = request.hasIsTableVersion() && 
request.getIsTableVersion()
+                ? MetaServiceRpcLimiterManager.GET_TABLE_VERSION_METHOD :
+                MetaServiceRpcLimiterManager.GET_PARTITION_VERSION_METHOD;
+        int cost = 1;
+        if (request.hasBatchMode() && request.getBatchMode()) {
+            cost = 
MetaServiceRpcLimiterManager.getInstance().getClampedCost(methodName, 
request.getDbIdsCount());
+        }
+        boolean acquired = false;
         if (MetricRepo.isInit && Config.isCloudMode()) {
             CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.increase(1L);
             
CloudMetrics.META_SERVICE_RPC_TOTAL.getOrAdd(methodName).increase(1L);
         }
 
+        long deadline = System.currentTimeMillis() + timeoutMs;
+        Cloud.GetVersionResponse resp = null;
+        try {
+            acquired = 
MetaServiceRpcLimiterManager.getInstance().acquire(methodName, cost);
+            Future<Cloud.GetVersionResponse> future = 
getVisibleVersionAsync(request);
+            while (resp == null) {
+                try {
+                    resp = future.get(Math.max(0, deadline - 
System.currentTimeMillis()), TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    LOG.warn("get version from meta service: future get 
interrupted exception", e);
+                }

Review Comment:
   **[Bug - Medium]** `InterruptedException` is caught but the thread's 
interrupt flag is not restored. Per Java best practices, after catching 
`InterruptedException` without propagating it, you should call 
`Thread.currentThread().interrupt()` to preserve the interrupt status. Without 
this, callers up the stack that check the interrupt flag will not know the 
thread was interrupted.
   
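   Also, this catch-and-retry pattern keeps looping for as long as 
`future.get()` throws `InterruptedException` (possible if the thread keeps 
getting interrupted). Note that restoring the interrupt flag inside the loop 
would typically make the next `future.get()` throw immediately, turning the 
retry into a busy spin, so restore the flag only when leaving the loop. 
Consider either propagating the exception or breaking out after a few 
interrupted retries.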



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceRpcLimiterManager.java:
##########
@@ -0,0 +1,432 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.rpc;
+
+import org.apache.doris.cloud.rpc.RpcRateLimiter.CostLimiter;
+import org.apache.doris.cloud.rpc.RpcRateLimiter.OverloadQpsLimiter;
+import org.apache.doris.cloud.rpc.RpcRateLimiter.QpsLimiter;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.metric.CloudMetrics;
+import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+public class MetaServiceRpcLimiterManager {
+    private static final Logger LOG = 
LogManager.getLogger(MetaServiceRpcLimiterManager.class);
+    private static final String GET_VERSION_METHOD = "getVersion";
+    public static final String GET_TABLE_VERSION_METHOD = "getTableVersion";
+    public static final String GET_PARTITION_VERSION_METHOD = 
"getPartitionVersion";
+
+    private final int processorCount;
+    private static volatile MetaServiceRpcLimiterManager instance;
+
+    private volatile boolean lastEnabled = false;
+    private volatile int lastMaxWaitRequestNum = 0;
+    private volatile int lastDefaultQps = 0;
+    private volatile String lastQpsConfig = "";
+    private volatile String lastCostConfig = "";
+    private volatile boolean lastOverloadThrottleEnabled = false;
+    private volatile String lastOverloadThrottleMethodAllowlist = "";
+
+    private Map<String, Integer> methodQpsConfig = new ConcurrentHashMap<>();
+    private Map<String, Integer> methodCostConfig = new ConcurrentHashMap<>();
+    private Set<String> overloadThrottleMethods = 
ConcurrentHashMap.newKeySet();
+
+    private final Map<String, QpsLimiter> qpsLimiters = new 
ConcurrentHashMap<>();
+    private final Map<String, CostLimiter> costLimiters = new 
ConcurrentHashMap<>();
+    private final Map<String, OverloadQpsLimiter> overloadQpsLimiters = new 
ConcurrentHashMap<>();
+
+    public static MetaServiceRpcLimiterManager getInstance() {
+        if (instance == null) {
+            synchronized (MetaServiceRpcLimiterManager.class) {
+                if (instance == null) {
+                    instance = new 
MetaServiceRpcLimiterManager(Runtime.getRuntime().availableProcessors());
+                }
+            }
+        }
+        return instance;
+    }
+
+    @VisibleForTesting
+    MetaServiceRpcLimiterManager(int processorCount) {
+        this.processorCount = processorCount;
+        reloadConfig();
+        
MetaServiceOverloadThrottle.getInstance().setFactorChangeListener(this::setOverloadFactor);
+    }
+
+    @VisibleForTesting
+    boolean isConfigChanged() {
+        return Config.meta_service_rpc_rate_limit_enabled != lastEnabled
+                || Config.meta_service_rpc_rate_limit_default_qps_per_core != 
lastDefaultQps
+                || Config.meta_service_rpc_rate_limit_max_waiting_request_num 
!= lastMaxWaitRequestNum
+                || 
!Objects.equals(Config.meta_service_rpc_rate_limit_qps_per_core_config, 
lastQpsConfig)
+                || 
!Objects.equals(Config.meta_service_rpc_cost_limit_per_core_config, 
lastCostConfig)
+                || Config.meta_service_rpc_overload_throttle_enabled != 
lastOverloadThrottleEnabled
+                || 
!Objects.equals(Config.meta_service_rpc_overload_throttle_methods,
+                lastOverloadThrottleMethodAllowlist);
+    }
+
+    @VisibleForTesting
+    boolean reloadConfig() {
+        if (!isConfigChanged()) {
+            return false;
+        }
+        synchronized (this) {
+            if (!isConfigChanged()) {
+                return false;
+            }
+            reloadRateLimiterConfig();
+            reloadOverloadThrottleConfig();
+        }
+        return true;
+    }
+
+    private void reloadRateLimiterConfig() {
+        boolean enabled = Config.meta_service_rpc_rate_limit_enabled;
+        int maxWaitRequestNum = 
Config.meta_service_rpc_rate_limit_max_waiting_request_num;
+        int defaultQpsPerCore = 
Config.meta_service_rpc_rate_limit_default_qps_per_core;
+        String qpsConfig = 
Config.meta_service_rpc_rate_limit_qps_per_core_config;
+        String costConfig = Config.meta_service_rpc_cost_limit_per_core_config;
+        // Parse the qps and cost config
+        parseConfig(qpsConfig, "QPS", methodQpsConfig);
+        parseConfig(costConfig, "cost limit", methodCostConfig);
+        updateQpsLimiters(defaultQpsPerCore, maxWaitRequestNum);
+
+        // If disabled, clear all limiters
+        if (!enabled) {
+            methodCostConfig.clear();
+            qpsLimiters.clear();
+            costLimiters.clear();
+        } else {
+            updateCostLimiters();
+        }
+        // Update last config
+        lastEnabled = enabled;
+        lastMaxWaitRequestNum = maxWaitRequestNum;
+        lastDefaultQps = defaultQpsPerCore;
+        lastQpsConfig = qpsConfig;
+        lastCostConfig = costConfig;
+        LOG.info("Reload meta service rpc rate limit config. enabled: {}, 
maxWaitRequestNum: {}, defaultQps: {}, "
+                        + "qpsConfig: [{}], costConfig: [{}]", lastEnabled, 
lastMaxWaitRequestNum, lastDefaultQps,
+                lastQpsConfig, lastCostConfig);
+    }
+
+    private void reloadOverloadThrottleConfig() {
+        boolean overloadThrottleEnabled = 
Config.meta_service_rpc_overload_throttle_enabled;
+        String overloadThrottleMethods = 
Config.meta_service_rpc_overload_throttle_methods;
+        if (!overloadThrottleEnabled) {
+            this.overloadThrottleMethods.clear();
+            this.overloadQpsLimiters.clear();
+        } else {
+            Set<String> newOverloadThrottleMethods = new HashSet<>();
+            if (overloadThrottleMethods != null && 
!overloadThrottleMethods.isEmpty()) {
+                for (String method : overloadThrottleMethods.split(",")) {
+                    String trimmed = method.trim();
+                    if (!trimmed.isEmpty()) {
+                        if (trimmed.equalsIgnoreCase(GET_VERSION_METHOD)) {
+                            
newOverloadThrottleMethods.add(GET_TABLE_VERSION_METHOD);
+                            
newOverloadThrottleMethods.add(GET_PARTITION_VERSION_METHOD);
+                        } else {
+                            newOverloadThrottleMethods.add(trimmed);
+                        }
+                    }
+                }
+            }
+            Set<String> toRemove = new HashSet<>();
+            for (String method : this.overloadThrottleMethods) {
+                if (!newOverloadThrottleMethods.contains(method)) {
+                    toRemove.add(method);
+                }
+            }
+            this.overloadThrottleMethods.removeAll(toRemove);
+            this.overloadThrottleMethods.addAll(newOverloadThrottleMethods);
+            this.overloadQpsLimiters.keySet()
+                    .removeIf(method -> 
!this.overloadThrottleMethods.contains(method));
+        }
+        lastOverloadThrottleEnabled = overloadThrottleEnabled;
+        lastOverloadThrottleMethodAllowlist = overloadThrottleMethods;
+    }
+
+    private void updateQpsLimiters(int defaultQpsPerCore, int 
maxWaitRequestNum) {
+        List<String> toRemove = new ArrayList<>();
+        for (Entry<String, QpsLimiter> entry : qpsLimiters.entrySet()) {
+            String methodName = entry.getKey();
+            int qps = getMethodTotalQps(methodName, defaultQpsPerCore);
+            if (qps <= 0) {
+                toRemove.add(methodName);
+                continue;
+            }
+            QpsLimiter limiter = entry.getValue();
+            limiter.update(maxWaitRequestNum, qps);
+            LOG.info("Updated rate limiter for method: {}, maxWaitRequestNum: 
{}, qps: {}", methodName,
+                    maxWaitRequestNum, qps);
+        }
+        if (!toRemove.isEmpty()) {
+            LOG.info("Remove zero qps rate limiter for methods: {}", toRemove);
+            for (String methodName : toRemove) {
+                qpsLimiters.remove(methodName);
+            }
+        }
+    }
+
+    private void updateCostLimiters() {
+        List<String> toRemove = new ArrayList<>();
+        for (Entry<String, CostLimiter> entry : costLimiters.entrySet()) {
+            String methodName = entry.getKey();
+            int costLimit = getMethodTotalCostLimit(methodName);
+            if (costLimit <= 0) {
+                toRemove.add(methodName);
+                continue;
+            }
+            CostLimiter limiter = entry.getValue();
+            limiter.setLimit(costLimit);
+            LOG.info("Updated cost limiter for method: {}, cost: {}", 
methodName, costLimit);
+        }
+        if (!toRemove.isEmpty()) {
+            LOG.info("Remove cost limiter for methods: {}", toRemove);
+            for (String methodName : toRemove) {
+                costLimiters.remove(methodName);
+            }
+        }
+    }
+
+    private void parseConfig(String config, String configName, Map<String, 
Integer> map) {
+        if (config == null || config.isEmpty()) {
+            map.clear();
+            return;
+        }
+
+        Map<String, Integer> target = new HashMap<>();
+        String[] entries = config.split(";");
+        for (String entry : entries) {
+            if (entry.trim().isEmpty()) {
+                continue;
+            }
+            String[] parts = entry.trim().split(":");
+            if (parts.length == 2) {
+                try {
+                    String methodName = parts[0].trim();
+                    int limit = Integer.parseInt(parts[1].trim());
+                    if (methodName.equalsIgnoreCase(GET_VERSION_METHOD)) {
+                        target.put(GET_TABLE_VERSION_METHOD, limit);
+                        target.put(GET_PARTITION_VERSION_METHOD, limit);
+                    } else {
+                        target.put(methodName, limit);
+                    }
+                } catch (NumberFormatException e) {
+                    LOG.warn("Invalid {} config entry: {}", configName, entry);
+                }
+            } else {
+                LOG.warn("Invalid {} config entry: {}", configName, entry);
+            }
+        }
+        map.clear();
+        map.putAll(target);
+    }
+
+    private int getMethodTotalQps(String methodName, int defaultQpsPerCore) {
+        int qpsPerCore = methodQpsConfig.getOrDefault(methodName, 
defaultQpsPerCore);
+        if (qpsPerCore <= 0) {
+            return 0;
+        }
+        return qpsPerCore * processorCount;
+    }
+
+    protected int getClampedCost(String methodName, int cost) {
+        if (Config.meta_service_rpc_cost_clamped_to_limit_enabled) {
+            int limit = getMethodTotalCostLimit(methodName);
+            if (limit > 0 && cost > limit) {
+                LOG.info("Clamped cost: {} for method: {} to limit: {}", cost,
+                        methodName, limit);
+                cost = limit;
+            }
+        }
+        return cost;
+    }
+
+    private int getMethodTotalCostLimit(String methodName) {
+        int costPerCore = methodCostConfig.getOrDefault(methodName, 0);
+        if (costPerCore <= 0) {
+            return 0;
+        }
+        return costPerCore * processorCount;
+    }
+
+    private QpsLimiter getQpsLimiter(String methodName) {
+        return qpsLimiters.compute(methodName, (name, limiter) -> {
+            if (limiter != null) {
+                return limiter;
+            }
+            int qps = getMethodTotalQps(name, 
Config.meta_service_rpc_rate_limit_default_qps_per_core);
+            if (qps > 0) {
+                return new QpsLimiter(name, 
Config.meta_service_rpc_rate_limit_max_waiting_request_num, qps);
+            }
+            return null;
+        });
+    }
+
+    private CostLimiter getCostLimiter(String methodName) {
+        return costLimiters.compute(methodName, (name, limiter) -> {
+            if (limiter != null) {
+                return limiter;
+            }
+            int costLimit = getMethodTotalCostLimit(name);
+            if (costLimit > 0) {
+                return new CostLimiter(methodName, costLimit);
+            }
+            return null;
+        });
+    }
+
+    private OverloadQpsLimiter getOverloadQpsLimiter(String methodName, double 
factor) {
+        return overloadQpsLimiters.compute(methodName, (name, limiter) -> {
+            if (limiter != null) {
+                return limiter;

Review Comment:
   **[Race Condition - Medium]** `getOverloadQpsLimiter()` ignores the `factor` 
parameter when the limiter already exists (`if (limiter != null) return 
limiter`). The factor is only used at creation time. This creates a race:
   
   1. `setOverloadFactor(factor >= 1.0)` clears `overloadQpsLimiters` (line 
396).
   2. Concurrently, `acquire()` reads a stale `factor < 1.0` and calls 
`getOverloadQpsLimiter(methodName, staleFactor)`, re-populating the map.
   3. This orphaned limiter keeps its stale factor until the next 
`setOverloadFactor()` call.
   
   Consider either: (a) always applying the current factor to existing limiters 
in this method, or (b) re-reading the factor from 
`MetaServiceOverloadThrottle.getInstance().getFactor()` inside the `compute` 
lambda instead of using the stale parameter.



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceOverloadThrottle.java:
##########
@@ -0,0 +1,259 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.rpc;
+
+import org.apache.doris.common.Config;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+
+/**
+ * Overload throttle controller for meta-service RPCs.
+ *
+ * <p>Monitors RPC outcomes (success, timeout, overload) within a rolling time 
window
+ * and adjusts a global factor (0.1–1.0) that scales the configured QPS limits 
in
+ * {@link MetaServiceRpcLimiterManager}. Uses a state machine:
+ * NORMAL → FAST_DECREASE → COOLDOWN → SLOW_RECOVERY → NORMAL.
+ */
+public class MetaServiceOverloadThrottle {
+    private static final Logger LOG = 
LogManager.getLogger(MetaServiceOverloadThrottle.class);
+
+    public enum State {
+        NORMAL,
+        FAST_DECREASE,
+        COOLDOWN,
+        SLOW_RECOVERY
+    }
+
+    public enum Signal {
+        SUCCESS,
+        TIMEOUT,
+        OVERLOAD
+    }
+
+    private volatile State state = State.NORMAL;
+    private volatile double factor = 1.0;
+
+    private final LongAdder windowTotal = new LongAdder();
+    private final LongAdder windowBad = new LongAdder();
+    private final AtomicLong windowStartMs = new 
AtomicLong(System.currentTimeMillis());
+
+    private volatile long cooldownStartMs = 0;
+    private volatile long lastRecoveryMs = 0;
+
+    private static volatile MetaServiceOverloadThrottle instance;
+    private volatile Consumer<Double> factorChangeListener = null;
+
+    private MetaServiceOverloadThrottle() {
+    }
+
+    public static MetaServiceOverloadThrottle getInstance() {
+        if (instance == null) {
+            synchronized (MetaServiceOverloadThrottle.class) {
+                if (instance == null) {
+                    instance = new MetaServiceOverloadThrottle();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public void recordSignal(Signal signal) {
+        if (!Config.meta_service_rpc_overload_throttle_enabled) {
+            return;
+        }

Review Comment:
   **[Concurrency Bug - High]** `recordSignal()` is called from multiple RPC 
threads concurrently, but the entire state-machine logic (reading `state`, 
calling handle methods, transitioning state, updating `factor`, resetting 
window) uses only `volatile` without mutual exclusion. This causes several race 
conditions:
   
   1. **Lost factor updates:** In `decreaseFactor()`, two threads can both read 
`factor = 0.8`, both compute `0.8 * 0.7 = 0.56`, and both write `0.56`. The 
second decrease is lost (should be `0.8 * 0.7 * 0.7 = 0.392`).
   
   2. **Concurrent state transitions:** In `handleFastDecrease()`, one thread 
may see `isOverloaded()=false` and transition to COOLDOWN, while another sees 
`isOverloaded()=true` and calls `decreaseFactor()`. Both execute concurrently, 
violating the state machine.
   
   3. **Timestamp race:** `cooldownStartMs = now` is written *after* 
`transitionTo(State.COOLDOWN)`. A concurrent thread can enter 
`handleCooldown()` with `cooldownStartMs = 0`, causing `now - 0 >= cooldownMs` 
to be true, skipping the entire cooldown period.
   
   **Recommendation:** Synchronize `recordSignal()` (or at least the 
state-machine portion). The lock contention cost is negligible compared to RPC 
latency.



##########
regression-test/suites/cloud_p0/test_ms_rpc_rate_limiter.groovy:
##########
@@ -0,0 +1,116 @@
+import groovy.json.JsonSlurper
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_ms_rpc_rate_limiter") {
+    if (!isCloudMode()) {
+        return
+    }
+    def tableName = "test_ms_rpc_rate_limiter"
+
+    def getProfileList = {
+        def dst = 'http://' + context.config.feHttpAddress

Review Comment:
   **[Style]** Per Doris regression test standards: "for ordinary single test 
tables, do not use `def tableName` form; instead hardcode your table name in 
all SQL." Please inline the table name `test_ms_rpc_rate_limiter` directly in 
each SQL statement.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to