This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3132cfca90b Fix wrong interval for OOM protection (#17753)
3132cfca90b is described below

commit 3132cfca90bc06a860b233755fcacff195889a6c
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Feb 24 10:35:17 2026 -0800

    Fix wrong interval for OOM protection (#17753)
---
 .../accounting/ResourceUsageAccountantFactory.java | 111 ++++++++++++---------
 .../accounting/WorkloadResourceAggregator.java     |   6 +-
 .../spi/accounting/WorkloadBudgetManager.java      |  28 +++---
 .../apache/pinot/spi/utils/CommonConstants.java    |  11 +-
 4 files changed, 88 insertions(+), 68 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
index 176262c8ef7..0ca20422122 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.core.accounting;
 
 import java.util.Collection;
-import java.util.EnumMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -34,6 +33,7 @@ import 
org.apache.pinot.spi.accounting.ThreadAccountantFactory;
 import org.apache.pinot.spi.accounting.ThreadResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.accounting.TrackingScope;
+import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
 import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -80,7 +80,6 @@ public class ResourceUsageAccountantFactory implements 
ThreadAccountantFactory {
     private final WatcherTask _watcherTask;
     private final QueryResourceAggregator _queryResourceAggregator;
     private final WorkloadResourceAggregator _workloadResourceAggregator;
-    private final EnumMap<TrackingScope, ResourceAggregator> 
_resourceAggregators = new EnumMap<>(TrackingScope.class);
 
     public ResourceUsageAccountant(PinotConfiguration config, String 
instanceId, InstanceType instanceType) {
       LOGGER.info("Initializing ResourceUsageAccountant");
@@ -103,11 +102,14 @@ public class ResourceUsageAccountantFactory implements 
ThreadAccountantFactory {
       _queryResourceAggregator =
           new QueryResourceAggregator(instanceId, instanceType, 
cpuSamplingEnabled, memorySamplingEnabled,
               _watcherTask._queryMonitorConfig);
-      _workloadResourceAggregator =
-          new WorkloadResourceAggregator(instanceId, instanceType, 
cpuSamplingEnabled, memorySamplingEnabled,
-              _watcherTask._queryMonitorConfig);
-      _resourceAggregators.put(TrackingScope.QUERY, _queryResourceAggregator);
-      _resourceAggregators.put(TrackingScope.WORKLOAD, 
_workloadResourceAggregator);
+      WorkloadBudgetManager workloadBudgetManager = 
WorkloadBudgetManager.get();
+      if (workloadBudgetManager.isEnabled()) {
+        _workloadResourceAggregator =
+            new WorkloadResourceAggregator(instanceId, instanceType, 
cpuSamplingEnabled, memorySamplingEnabled,
+                _watcherTask._queryMonitorConfig, workloadBudgetManager);
+      } else {
+        _workloadResourceAggregator = null;
+      }
       LOGGER.info(
           "Initialized ResourceUsageAccountant for {}: {} with 
cpuSamplingEnabled: {}, memorySamplingEnabled: {}",
           instanceType, instanceId, cpuSamplingEnabled, memorySamplingEnabled);
@@ -138,12 +140,13 @@ public class ResourceUsageAccountantFactory implements 
ThreadAccountantFactory {
       ThreadResourceTrackerImpl threadTracker = _threadLocalEntry.get();
       assert threadTracker.getThreadContext() != null;
       QueryExecutionContext executionContext = 
threadTracker.getThreadContext().getExecutionContext();
-      String queryId = executionContext.getCid();
-      String workloadName = executionContext.getWorkloadName();
       long cpuTimeNs = threadTracker.getCpuTimeNs();
       long allocatedBytes = threadTracker.getAllocatedBytes();
-      _queryResourceAggregator.updateUntrackedResourceUsage(queryId, 
cpuTimeNs, allocatedBytes);
-      _workloadResourceAggregator.updateUntrackedResourceUsage(workloadName, 
cpuTimeNs, allocatedBytes);
+      
_queryResourceAggregator.updateUntrackedResourceUsage(executionContext.getCid(),
 cpuTimeNs, allocatedBytes);
+      if (_workloadResourceAggregator != null) {
+        
_workloadResourceAggregator.updateUntrackedResourceUsage(executionContext.getWorkloadName(),
 cpuTimeNs,
+            allocatedBytes);
+      }
       threadTracker.clear();
     }
 
@@ -156,7 +159,14 @@ public class ResourceUsageAccountantFactory implements 
ThreadAccountantFactory {
       if (!_memorySamplingEnabled) {
         allocatedBytes = 0;
       }
-      
_resourceAggregators.get(trackingScope).updateUntrackedResourceUsage(identifier,
 cpuTimeNs, allocatedBytes);
+      if (trackingScope == TrackingScope.QUERY) {
+        _queryResourceAggregator.updateUntrackedResourceUsage(identifier, 
cpuTimeNs, allocatedBytes);
+      } else {
+        assert trackingScope == TrackingScope.WORKLOAD;
+        if (_workloadResourceAggregator != null) {
+          _workloadResourceAggregator.updateUntrackedResourceUsage(identifier, 
cpuTimeNs, allocatedBytes);
+        }
+      }
     }
 
     @Override
@@ -192,7 +202,8 @@ public class ResourceUsageAccountantFactory implements 
ThreadAccountantFactory {
     private class WatcherTask implements Runnable, 
PinotClusterConfigChangeListener {
       final AtomicReference<QueryMonitorConfig> _queryMonitorConfig = new 
AtomicReference<>();
 
-      int _sleepTime;
+      long _nextQueryAggregateTimeMs;
+      long _nextWorkloadAggregateTimeMs;
 
       WatcherTask(PinotConfiguration config) {
         QueryMonitorConfig queryMonitorConfig = new QueryMonitorConfig(config, 
ResourceUsageUtils.getMaxHeapSize());
@@ -209,49 +220,62 @@ public class ResourceUsageAccountantFactory implements 
ThreadAccountantFactory {
         try {
           //noinspection InfiniteLoopStatement
           while (true) {
-            try {
-              runOnce();
-            } finally {
+            int sleepTimeMs = runOnce();
+            if (sleepTimeMs > 0) {
               //noinspection BusyWait
-              Thread.sleep(_sleepTime);
+              Thread.sleep(sleepTimeMs);
             }
           }
         } catch (InterruptedException e) {
           LOGGER.warn("WatcherTask interrupted, exiting.");
+        } catch (Error e) {
+          LOGGER.error("Caught unexpected error in WatcherTask", e);
         }
       }
 
-      private void runOnce() {
+      private int runOnce() {
+        boolean runQueryAggregate = false;
+        boolean runWorkloadAggregate = false;
+        int sleepTimeMs;
         try {
-          runPreAggregation();
-          runAggregation();
-          runPostAggregation();
-        } catch (Exception e) {
+          if (_nextQueryAggregateTimeMs <= System.currentTimeMillis()) {
+            runQueryAggregate = true;
+            runAggregate(_queryResourceAggregator);
+          }
+          if (_workloadResourceAggregator != null && 
_nextWorkloadAggregateTimeMs <= System.currentTimeMillis()) {
+            runWorkloadAggregate = true;
+            runAggregate(_workloadResourceAggregator);
+          }
+        } catch (RuntimeException e) {
           LOGGER.error("Caught exception while executing stats aggregation and 
query kill", e);
           // TODO: Add a metric to track the number of watcher task errors.
         } finally {
           LOGGER.debug("_threadTrackers size: {}", _threadTrackers.size());
-          for (ResourceAggregator resourceAggregator : 
_resourceAggregators.values()) {
-            resourceAggregator.cleanUpPostAggregation();
+          if (runQueryAggregate) {
+            _queryResourceAggregator.cleanUpPostAggregation();
           }
-          // Get sleeptime from both resourceAggregators. Pick the minimum. 
PerQuery Accountant modifies the sleep
-          // time when condition is critical.
-          int sleepTime = Integer.MAX_VALUE;
-          for (ResourceAggregator resourceAggregator : 
_resourceAggregators.values()) {
-            sleepTime = Math.min(sleepTime, 
resourceAggregator.getAggregationSleepTimeMs());
+          if (runWorkloadAggregate) {
+            _workloadResourceAggregator.cleanUpPostAggregation();
           }
-          _sleepTime = sleepTime;
-        }
-      }
-
-      private void runPreAggregation() {
-        // Call the pre-aggregation methods for each ResourceAggregator.
-        for (ResourceAggregator resourceAggregator : 
_resourceAggregators.values()) {
-          resourceAggregator.preAggregate(_threadTrackers.values());
+          long currentTimeMs = System.currentTimeMillis();
+          if (runQueryAggregate) {
+            _nextQueryAggregateTimeMs = currentTimeMs + 
_queryResourceAggregator.getAggregationSleepTimeMs();
+          }
+          if (runWorkloadAggregate) {
+            _nextWorkloadAggregateTimeMs = currentTimeMs + 
_workloadResourceAggregator.getAggregationSleepTimeMs();
+          }
+          assert _nextQueryAggregateTimeMs > 0;
+          long nextAggregateTimeMs = _nextQueryAggregateTimeMs;
+          if (_nextWorkloadAggregateTimeMs > 0) {
+            nextAggregateTimeMs = Math.min(nextAggregateTimeMs, 
_nextWorkloadAggregateTimeMs);
+          }
+          sleepTimeMs = (int) (nextAggregateTimeMs - currentTimeMs);
         }
+        return sleepTimeMs;
       }
 
-      private void runAggregation() {
+      private void runAggregate(ResourceAggregator resourceAggregator) {
+        resourceAggregator.preAggregate(_threadTrackers.values());
         Iterator<Map.Entry<Thread, ThreadResourceTrackerImpl>> iterator = 
_threadTrackers.entrySet().iterator();
         while (iterator.hasNext()) {
           Map.Entry<Thread, ThreadResourceTrackerImpl> entry = iterator.next();
@@ -260,17 +284,10 @@ public class ResourceUsageAccountantFactory implements 
ThreadAccountantFactory {
             LOGGER.debug("Thread: {} is no longer alive, removing it from 
_threadTrackers", thread.getName());
             iterator.remove();
           } else {
-            for (ResourceAggregator resourceAggregator : 
_resourceAggregators.values()) {
-              resourceAggregator.aggregate(entry.getValue());
-            }
+            resourceAggregator.aggregate(entry.getValue());
           }
         }
-      }
-
-      private void runPostAggregation() {
-        for (ResourceAggregator resourceAggregator : 
_resourceAggregators.values()) {
-          resourceAggregator.postAggregate();
-        }
+        resourceAggregator.postAggregate();
       }
 
       @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadResourceAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadResourceAggregator.java
index 12dc6a818b6..80e15690cd5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadResourceAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadResourceAggregator.java
@@ -56,13 +56,15 @@ public class WorkloadResourceAggregator implements 
ResourceAggregator {
   private Map<String, LongLongMutablePair> _currentCpuMemUsage = new 
HashMap<>();
 
   public WorkloadResourceAggregator(String instanceId, InstanceType 
instanceType, boolean cpuSamplingEnabled,
-      boolean memorySamplingEnabled, AtomicReference<QueryMonitorConfig> 
queryMonitorConfig) {
+      boolean memorySamplingEnabled, AtomicReference<QueryMonitorConfig> 
queryMonitorConfig,
+      WorkloadBudgetManager workloadBudgetManager) {
+    assert workloadBudgetManager.isEnabled();
     _instanceId = instanceId;
     _instanceType = instanceType;
     _cpuSamplingEnabled = cpuSamplingEnabled;
     _memorySamplingEnabled = memorySamplingEnabled;
     _queryMonitorConfig = queryMonitorConfig;
-    _workloadBudgetManager = WorkloadBudgetManager.get();
+    _workloadBudgetManager = workloadBudgetManager;
   }
 
   @Override
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
index 6f9ac882a37..d0da2c62166 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
@@ -47,13 +47,13 @@ public class WorkloadBudgetManager {
   private long _enforcementWindowMs;
   private ConcurrentHashMap<String, Budget> _workloadBudgets;
   private final ScheduledExecutorService _resetScheduler = 
Executors.newSingleThreadScheduledExecutor();
-  private volatile boolean _isEnabled;
+  private volatile boolean _enabled;
 
   public WorkloadBudgetManager(PinotConfiguration config) {
-    _isEnabled = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION,
+    _enabled = 
config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION,
         CommonConstants.Accounting.DEFAULT_WORKLOAD_ENABLE_COST_COLLECTION);
     // Return an object even if disabled. All functionalities of this class 
will be noops.
-    if (!_isEnabled) {
+    if (!_enabled) {
       LOGGER.info("WorkloadBudgetManager is disabled. Creating a no-op 
instance.");
       return;
     }
@@ -65,6 +65,10 @@ public class WorkloadBudgetManager {
     LOGGER.info("WorkloadBudgetManager initialized with enforcement window: 
{}ms", _enforcementWindowMs);
   }
 
+  public boolean isEnabled() {
+    return _enabled;
+  }
+
   /**
    * This budget is primarily meant to be used for queries that need to be 
issued in a low priority manner.
    * This is fixed budget allocated during host startup and used across all 
secondary queries.
@@ -96,10 +100,10 @@ public class WorkloadBudgetManager {
   }
 
   public void shutdown() {
-    if (!_isEnabled) {
+    if (!_enabled) {
       return;
     }
-    _isEnabled = false;
+    _enabled = false;
     _resetScheduler.shutdownNow();
     try {
       if (!_resetScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -115,7 +119,7 @@ public class WorkloadBudgetManager {
    * Adds or updates budget for a workload (Thread-Safe).
    */
   public void addOrUpdateWorkload(String workload, long cpuBudgetNs, long 
memoryBudgetBytes) {
-    if (!_isEnabled) {
+    if (!_enabled) {
       LOGGER.info("WorkloadBudgetManager is disabled. Not adding/updating 
workload: {}", workload);
       return;
     }
@@ -126,7 +130,7 @@ public class WorkloadBudgetManager {
   }
 
   public void deleteWorkload(String workload) {
-    if (!_isEnabled) {
+    if (!_enabled) {
       LOGGER.info("WorkloadBudgetManager is disabled. Not deleting workload: 
{}", workload);
       return;
     }
@@ -147,7 +151,7 @@ public class WorkloadBudgetManager {
    * Returns the remaining budget for CPU and memory after charge.
    */
   public BudgetStats tryCharge(String workload, long cpuUsedNs, long 
memoryUsedBytes) {
-    if (!_isEnabled) {
+    if (!_enabled) {
       return new BudgetStats(Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE, 
Long.MAX_VALUE);
     }
 
@@ -163,7 +167,7 @@ public class WorkloadBudgetManager {
    * Retrieves the initial and remaining budget for a workload.
    */
   public BudgetStats getBudgetStats(String workload) {
-    if (!_isEnabled) {
+    if (!_enabled) {
       return null;
     }
     Budget budget = _workloadBudgets.get(workload);
@@ -174,7 +178,7 @@ public class WorkloadBudgetManager {
    * Retrieves the total remaining budget across all workloads (Thread-Safe).
    */
   public BudgetStats getRemainingBudgetAcrossAllWorkloads() {
-    if (!_isEnabled) {
+    if (!_enabled) {
       return new BudgetStats(Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE, 
Long.MAX_VALUE);
     }
     long totalCpuBudget =
@@ -226,7 +230,7 @@ public class WorkloadBudgetManager {
    */
   public boolean canAdmitQuery(String workload) {
     // If disabled or no budget configured, always admit
-    if (!_isEnabled) {
+    if (!_enabled) {
       return true;
     }
     Budget budget = _workloadBudgets.get(workload);
@@ -239,7 +243,7 @@ public class WorkloadBudgetManager {
   }
 
   public Map<String, BudgetStats> getAllBudgetStats() {
-    if (!_isEnabled) {
+    if (!_enabled) {
       return null;
     }
     Map<String, BudgetStats> allStats = new ConcurrentHashMap<>();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index b76754e50b5..7b5e6cf3685 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1728,21 +1728,18 @@ public class CommonConstants {
      *  - Instance Config: enableThreadAllocatedBytesMeasurement = true
      */
 
-    public static final String CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION =
-        "accounting.workload.enable.cost.collection";
+    public static final String CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION = 
"accounting.workload.enable.cost.collection";
     public static final boolean DEFAULT_WORKLOAD_ENABLE_COST_COLLECTION = 
false;
 
     public static final String CONFIG_OF_WORKLOAD_ENABLE_COST_ENFORCEMENT =
         "accounting.workload.enable.cost.enforcement";
     public static final boolean DEFAULT_WORKLOAD_ENABLE_COST_ENFORCEMENT = 
false;
 
-    public static final String CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS =
-        "accounting.workload.enforcement.window.ms";
+    public static final String CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS = 
"accounting.workload.enforcement.window.ms";
     public static final long DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS = 60_000L;
 
-    public static final String CONFIG_OF_WORKLOAD_SLEEP_TIME_MS =
-        "accounting.workload.sleep.time.ms";
-    public static final int DEFAULT_WORKLOAD_SLEEP_TIME_MS = 1;
+    public static final String CONFIG_OF_WORKLOAD_SLEEP_TIME_MS = 
"accounting.workload.sleep.time.ms";
+    public static final int DEFAULT_WORKLOAD_SLEEP_TIME_MS = 100;
 
     public static final String DEFAULT_WORKLOAD_NAME = "default";
     public static final String CONFIG_OF_SECONDARY_WORKLOAD_NAME = 
"accounting.secondary.workload.name";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to