This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ac6bdef51fa Add QUERIES_THROTTLED metric for 
ThrottleOnCriticalHeapUsageExecutor (#16676)
ac6bdef51fa is described below

commit ac6bdef51fa2affe76d78750076c9983105d8dfd
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Sep 18 04:46:44 2025 +0800

    Add QUERIES_THROTTLED metric for ThrottleOnCriticalHeapUsageExecutor 
(#16676)
---
 .../java/org/apache/pinot/common/metrics/BrokerMeter.java     |  1 +
 .../java/org/apache/pinot/common/metrics/ServerMeter.java     |  1 +
 .../core/accounting/PerQueryCPUMemAccountantFactory.java      | 11 ++++++++++-
 3 files changed, 12 insertions(+), 1 deletion(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index bbe08624457..563fa98cdad 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -118,6 +118,7 @@ public class BrokerMeter implements AbstractMetrics.Meter {
   public static final BrokerMeter UNKNOWN_COLUMN_EXCEPTIONS = 
create("UNKNOWN_COLUMN_EXCEPTIONS", "exceptions", false);
   // Queries preempted by accountant
   public static final BrokerMeter QUERIES_KILLED = create("QUERIES_KILLED", 
"query", true);
+  public static final BrokerMeter QUERIES_THROTTLED = 
create("QUERIES_THROTTLED", "query", true);
   // Scatter phase.
   public static final BrokerMeter NO_SERVER_FOUND_EXCEPTIONS = create(
       "NO_SERVER_FOUND_EXCEPTIONS", "exceptions", false);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index de1678d98fe..fbf006ec3d0 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -111,6 +111,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   READINESS_CHECK_OK_CALLS("readinessCheck", true),
   READINESS_CHECK_BAD_CALLS("readinessCheck", true),
   QUERIES_KILLED("query", true),
+  QUERIES_THROTTLED("query", true),
   HEAP_CRITICAL_LEVEL_EXCEEDED("count", true),
   HEAP_PANIC_LEVEL_EXCEEDED("count", true),
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index a1897b360ed..fc7811784fb 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -268,7 +268,12 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
 
     @Override
     public boolean throttleQuerySubmission() {
-      return getWatcherTask().getHeapUsageBytes() > 
getWatcherTask().getQueryMonitorConfig().getAlarmingLevel();
+      WatcherTask watcherTask = getWatcherTask();
+      boolean shouldThrottle = watcherTask.getHeapUsageBytes() > 
watcherTask.getQueryMonitorConfig().getAlarmingLevel();
+      if (shouldThrottle) {
+        
watcherTask._metrics.addMeteredGlobalValue(watcherTask._queriesThrottledMeter, 
1);
+      }
+      return shouldThrottle;
     }
 
     @Override
@@ -643,6 +648,7 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       // metrics class
       private final AbstractMetrics _metrics;
       private final AbstractMetrics.Meter _queryKilledMeter;
+      private final AbstractMetrics.Meter _queriesThrottledMeter;
       private final AbstractMetrics.Meter _heapMemoryCriticalExceededMeter;
       private final AbstractMetrics.Meter _heapMemoryPanicExceededMeter;
       private final AbstractMetrics.Gauge _memoryUsageGauge;
@@ -655,6 +661,7 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
           case SERVER:
             _metrics = ServerMetrics.get();
             _queryKilledMeter = ServerMeter.QUERIES_KILLED;
+            _queriesThrottledMeter = ServerMeter.QUERIES_THROTTLED;
             _memoryUsageGauge = ServerGauge.JVM_HEAP_USED_BYTES;
             _heapMemoryCriticalExceededMeter = 
ServerMeter.HEAP_CRITICAL_LEVEL_EXCEEDED;
             _heapMemoryPanicExceededMeter = 
ServerMeter.HEAP_PANIC_LEVEL_EXCEEDED;
@@ -662,6 +669,7 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
           case BROKER:
             _metrics = BrokerMetrics.get();
             _queryKilledMeter = BrokerMeter.QUERIES_KILLED;
+            _queriesThrottledMeter = BrokerMeter.QUERIES_THROTTLED;
             _memoryUsageGauge = BrokerGauge.JVM_HEAP_USED_BYTES;
             _heapMemoryCriticalExceededMeter = 
BrokerMeter.HEAP_CRITICAL_LEVEL_EXCEEDED;
             _heapMemoryPanicExceededMeter = 
BrokerMeter.HEAP_PANIC_LEVEL_EXCEEDED;
@@ -670,6 +678,7 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
             LOGGER.error("instanceType: {} not supported, using server 
metrics", _instanceType);
             _metrics = new 
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
             _queryKilledMeter = ServerMeter.QUERIES_KILLED;
+            _queriesThrottledMeter = ServerMeter.QUERIES_THROTTLED;
             _memoryUsageGauge = ServerGauge.JVM_HEAP_USED_BYTES;
             _heapMemoryCriticalExceededMeter = 
ServerMeter.HEAP_CRITICAL_LEVEL_EXCEEDED;
             _heapMemoryPanicExceededMeter = 
ServerMeter.HEAP_PANIC_LEVEL_EXCEEDED;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to