This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3132cfca90b Fix wrong interval for OOM protection (#17753)
3132cfca90b is described below
commit 3132cfca90bc06a860b233755fcacff195889a6c
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Feb 24 10:35:17 2026 -0800
Fix wrong interval for OOM protection (#17753)
---
.../accounting/ResourceUsageAccountantFactory.java | 111 ++++++++++++---------
.../accounting/WorkloadResourceAggregator.java | 6 +-
.../spi/accounting/WorkloadBudgetManager.java | 28 +++---
.../apache/pinot/spi/utils/CommonConstants.java | 11 +-
4 files changed, 88 insertions(+), 68 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
index 176262c8ef7..0ca20422122 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
@@ -19,7 +19,6 @@
package org.apache.pinot.core.accounting;
import java.util.Collection;
-import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -34,6 +33,7 @@ import
org.apache.pinot.spi.accounting.ThreadAccountantFactory;
import org.apache.pinot.spi.accounting.ThreadResourceTracker;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.accounting.TrackingScope;
+import org.apache.pinot.spi.accounting.WorkloadBudgetManager;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -80,7 +80,6 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
private final WatcherTask _watcherTask;
private final QueryResourceAggregator _queryResourceAggregator;
private final WorkloadResourceAggregator _workloadResourceAggregator;
- private final EnumMap<TrackingScope, ResourceAggregator>
_resourceAggregators = new EnumMap<>(TrackingScope.class);
public ResourceUsageAccountant(PinotConfiguration config, String
instanceId, InstanceType instanceType) {
LOGGER.info("Initializing ResourceUsageAccountant");
@@ -103,11 +102,14 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
_queryResourceAggregator =
new QueryResourceAggregator(instanceId, instanceType,
cpuSamplingEnabled, memorySamplingEnabled,
_watcherTask._queryMonitorConfig);
- _workloadResourceAggregator =
- new WorkloadResourceAggregator(instanceId, instanceType,
cpuSamplingEnabled, memorySamplingEnabled,
- _watcherTask._queryMonitorConfig);
- _resourceAggregators.put(TrackingScope.QUERY, _queryResourceAggregator);
- _resourceAggregators.put(TrackingScope.WORKLOAD,
_workloadResourceAggregator);
+ WorkloadBudgetManager workloadBudgetManager =
WorkloadBudgetManager.get();
+ if (workloadBudgetManager.isEnabled()) {
+ _workloadResourceAggregator =
+ new WorkloadResourceAggregator(instanceId, instanceType,
cpuSamplingEnabled, memorySamplingEnabled,
+ _watcherTask._queryMonitorConfig, workloadBudgetManager);
+ } else {
+ _workloadResourceAggregator = null;
+ }
LOGGER.info(
"Initialized ResourceUsageAccountant for {}: {} with
cpuSamplingEnabled: {}, memorySamplingEnabled: {}",
instanceType, instanceId, cpuSamplingEnabled, memorySamplingEnabled);
@@ -138,12 +140,13 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
ThreadResourceTrackerImpl threadTracker = _threadLocalEntry.get();
assert threadTracker.getThreadContext() != null;
QueryExecutionContext executionContext =
threadTracker.getThreadContext().getExecutionContext();
- String queryId = executionContext.getCid();
- String workloadName = executionContext.getWorkloadName();
long cpuTimeNs = threadTracker.getCpuTimeNs();
long allocatedBytes = threadTracker.getAllocatedBytes();
- _queryResourceAggregator.updateUntrackedResourceUsage(queryId,
cpuTimeNs, allocatedBytes);
- _workloadResourceAggregator.updateUntrackedResourceUsage(workloadName,
cpuTimeNs, allocatedBytes);
+
_queryResourceAggregator.updateUntrackedResourceUsage(executionContext.getCid(),
cpuTimeNs, allocatedBytes);
+ if (_workloadResourceAggregator != null) {
+
_workloadResourceAggregator.updateUntrackedResourceUsage(executionContext.getWorkloadName(),
cpuTimeNs,
+ allocatedBytes);
+ }
threadTracker.clear();
}
@@ -156,7 +159,14 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
if (!_memorySamplingEnabled) {
allocatedBytes = 0;
}
-
_resourceAggregators.get(trackingScope).updateUntrackedResourceUsage(identifier,
cpuTimeNs, allocatedBytes);
+ if (trackingScope == TrackingScope.QUERY) {
+ _queryResourceAggregator.updateUntrackedResourceUsage(identifier,
cpuTimeNs, allocatedBytes);
+ } else {
+ assert trackingScope == TrackingScope.WORKLOAD;
+ if (_workloadResourceAggregator != null) {
+ _workloadResourceAggregator.updateUntrackedResourceUsage(identifier,
cpuTimeNs, allocatedBytes);
+ }
+ }
}
@Override
@@ -192,7 +202,8 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
private class WatcherTask implements Runnable,
PinotClusterConfigChangeListener {
final AtomicReference<QueryMonitorConfig> _queryMonitorConfig = new
AtomicReference<>();
- int _sleepTime;
+ long _nextQueryAggregateTimeMs;
+ long _nextWorkloadAggregateTimeMs;
WatcherTask(PinotConfiguration config) {
QueryMonitorConfig queryMonitorConfig = new QueryMonitorConfig(config,
ResourceUsageUtils.getMaxHeapSize());
@@ -209,49 +220,62 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
try {
//noinspection InfiniteLoopStatement
while (true) {
- try {
- runOnce();
- } finally {
+ int sleepTimeMs = runOnce();
+ if (sleepTimeMs > 0) {
//noinspection BusyWait
- Thread.sleep(_sleepTime);
+ Thread.sleep(sleepTimeMs);
}
}
} catch (InterruptedException e) {
LOGGER.warn("WatcherTask interrupted, exiting.");
+ } catch (Error e) {
+ LOGGER.error("Caught unexpected error in WatcherTask", e);
}
}
- private void runOnce() {
+ private int runOnce() {
+ boolean runQueryAggregate = false;
+ boolean runWorkloadAggregate = false;
+ int sleepTimeMs;
try {
- runPreAggregation();
- runAggregation();
- runPostAggregation();
- } catch (Exception e) {
+ if (_nextQueryAggregateTimeMs <= System.currentTimeMillis()) {
+ runQueryAggregate = true;
+ runAggregate(_queryResourceAggregator);
+ }
+ if (_workloadResourceAggregator != null &&
_nextWorkloadAggregateTimeMs <= System.currentTimeMillis()) {
+ runWorkloadAggregate = true;
+ runAggregate(_workloadResourceAggregator);
+ }
+ } catch (RuntimeException e) {
LOGGER.error("Caught exception while executing stats aggregation and
query kill", e);
// TODO: Add a metric to track the number of watcher task errors.
} finally {
LOGGER.debug("_threadTrackers size: {}", _threadTrackers.size());
- for (ResourceAggregator resourceAggregator :
_resourceAggregators.values()) {
- resourceAggregator.cleanUpPostAggregation();
+ if (runQueryAggregate) {
+ _queryResourceAggregator.cleanUpPostAggregation();
}
- // Get sleeptime from both resourceAggregators. Pick the minimum.
PerQuery Accountant modifies the sleep
- // time when condition is critical.
- int sleepTime = Integer.MAX_VALUE;
- for (ResourceAggregator resourceAggregator :
_resourceAggregators.values()) {
- sleepTime = Math.min(sleepTime,
resourceAggregator.getAggregationSleepTimeMs());
+ if (runWorkloadAggregate) {
+ _workloadResourceAggregator.cleanUpPostAggregation();
}
- _sleepTime = sleepTime;
- }
- }
-
- private void runPreAggregation() {
- // Call the pre-aggregation methods for each ResourceAggregator.
- for (ResourceAggregator resourceAggregator :
_resourceAggregators.values()) {
- resourceAggregator.preAggregate(_threadTrackers.values());
+ long currentTimeMs = System.currentTimeMillis();
+ if (runQueryAggregate) {
+ _nextQueryAggregateTimeMs = currentTimeMs +
_queryResourceAggregator.getAggregationSleepTimeMs();
+ }
+ if (runWorkloadAggregate) {
+ _nextWorkloadAggregateTimeMs = currentTimeMs +
_workloadResourceAggregator.getAggregationSleepTimeMs();
+ }
+ assert _nextQueryAggregateTimeMs > 0;
+ long nextAggregateTimeMs = _nextQueryAggregateTimeMs;
+ if (_nextWorkloadAggregateTimeMs > 0) {
+ nextAggregateTimeMs = Math.min(nextAggregateTimeMs,
_nextWorkloadAggregateTimeMs);
+ }
+ sleepTimeMs = (int) (nextAggregateTimeMs - currentTimeMs);
}
+ return sleepTimeMs;
}
- private void runAggregation() {
+ private void runAggregate(ResourceAggregator resourceAggregator) {
+ resourceAggregator.preAggregate(_threadTrackers.values());
Iterator<Map.Entry<Thread, ThreadResourceTrackerImpl>> iterator =
_threadTrackers.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Thread, ThreadResourceTrackerImpl> entry = iterator.next();
@@ -260,17 +284,10 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
LOGGER.debug("Thread: {} is no longer alive, removing it from
_threadTrackers", thread.getName());
iterator.remove();
} else {
- for (ResourceAggregator resourceAggregator :
_resourceAggregators.values()) {
- resourceAggregator.aggregate(entry.getValue());
- }
+ resourceAggregator.aggregate(entry.getValue());
}
}
- }
-
- private void runPostAggregation() {
- for (ResourceAggregator resourceAggregator :
_resourceAggregators.values()) {
- resourceAggregator.postAggregate();
- }
+ resourceAggregator.postAggregate();
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadResourceAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadResourceAggregator.java
index 12dc6a818b6..80e15690cd5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadResourceAggregator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadResourceAggregator.java
@@ -56,13 +56,15 @@ public class WorkloadResourceAggregator implements
ResourceAggregator {
private Map<String, LongLongMutablePair> _currentCpuMemUsage = new
HashMap<>();
public WorkloadResourceAggregator(String instanceId, InstanceType
instanceType, boolean cpuSamplingEnabled,
- boolean memorySamplingEnabled, AtomicReference<QueryMonitorConfig>
queryMonitorConfig) {
+ boolean memorySamplingEnabled, AtomicReference<QueryMonitorConfig>
queryMonitorConfig,
+ WorkloadBudgetManager workloadBudgetManager) {
+ assert workloadBudgetManager.isEnabled();
_instanceId = instanceId;
_instanceType = instanceType;
_cpuSamplingEnabled = cpuSamplingEnabled;
_memorySamplingEnabled = memorySamplingEnabled;
_queryMonitorConfig = queryMonitorConfig;
- _workloadBudgetManager = WorkloadBudgetManager.get();
+ _workloadBudgetManager = workloadBudgetManager;
}
@Override
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
index 6f9ac882a37..d0da2c62166 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/WorkloadBudgetManager.java
@@ -47,13 +47,13 @@ public class WorkloadBudgetManager {
private long _enforcementWindowMs;
private ConcurrentHashMap<String, Budget> _workloadBudgets;
private final ScheduledExecutorService _resetScheduler =
Executors.newSingleThreadScheduledExecutor();
- private volatile boolean _isEnabled;
+ private volatile boolean _enabled;
public WorkloadBudgetManager(PinotConfiguration config) {
- _isEnabled =
config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION,
+ _enabled =
config.getProperty(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION,
CommonConstants.Accounting.DEFAULT_WORKLOAD_ENABLE_COST_COLLECTION);
// Return an object even if disabled. All functionalities of this class
will be noops.
- if (!_isEnabled) {
+ if (!_enabled) {
LOGGER.info("WorkloadBudgetManager is disabled. Creating a no-op
instance.");
return;
}
@@ -65,6 +65,10 @@ public class WorkloadBudgetManager {
LOGGER.info("WorkloadBudgetManager initialized with enforcement window:
{}ms", _enforcementWindowMs);
}
+ public boolean isEnabled() {
+ return _enabled;
+ }
+
/**
* This budget is primarily meant to be used for queries that need to be
issued in a low priority manner.
* This is fixed budget allocated during host startup and used across all
secondary queries.
@@ -96,10 +100,10 @@ public class WorkloadBudgetManager {
}
public void shutdown() {
- if (!_isEnabled) {
+ if (!_enabled) {
return;
}
- _isEnabled = false;
+ _enabled = false;
_resetScheduler.shutdownNow();
try {
if (!_resetScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
@@ -115,7 +119,7 @@ public class WorkloadBudgetManager {
* Adds or updates budget for a workload (Thread-Safe).
*/
public void addOrUpdateWorkload(String workload, long cpuBudgetNs, long
memoryBudgetBytes) {
- if (!_isEnabled) {
+ if (!_enabled) {
LOGGER.info("WorkloadBudgetManager is disabled. Not adding/updating
workload: {}", workload);
return;
}
@@ -126,7 +130,7 @@ public class WorkloadBudgetManager {
}
public void deleteWorkload(String workload) {
- if (!_isEnabled) {
+ if (!_enabled) {
LOGGER.info("WorkloadBudgetManager is disabled. Not deleting workload:
{}", workload);
return;
}
@@ -147,7 +151,7 @@ public class WorkloadBudgetManager {
* Returns the remaining budget for CPU and memory after charge.
*/
public BudgetStats tryCharge(String workload, long cpuUsedNs, long
memoryUsedBytes) {
- if (!_isEnabled) {
+ if (!_enabled) {
return new BudgetStats(Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE,
Long.MAX_VALUE);
}
@@ -163,7 +167,7 @@ public class WorkloadBudgetManager {
* Retrieves the initial and remaining budget for a workload.
*/
public BudgetStats getBudgetStats(String workload) {
- if (!_isEnabled) {
+ if (!_enabled) {
return null;
}
Budget budget = _workloadBudgets.get(workload);
@@ -174,7 +178,7 @@ public class WorkloadBudgetManager {
* Retrieves the total remaining budget across all workloads (Thread-Safe).
*/
public BudgetStats getRemainingBudgetAcrossAllWorkloads() {
- if (!_isEnabled) {
+ if (!_enabled) {
return new BudgetStats(Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE,
Long.MAX_VALUE);
}
long totalCpuBudget =
@@ -226,7 +230,7 @@ public class WorkloadBudgetManager {
*/
public boolean canAdmitQuery(String workload) {
// If disabled or no budget configured, always admit
- if (!_isEnabled) {
+ if (!_enabled) {
return true;
}
Budget budget = _workloadBudgets.get(workload);
@@ -239,7 +243,7 @@ public class WorkloadBudgetManager {
}
public Map<String, BudgetStats> getAllBudgetStats() {
- if (!_isEnabled) {
+ if (!_enabled) {
return null;
}
Map<String, BudgetStats> allStats = new ConcurrentHashMap<>();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index b76754e50b5..7b5e6cf3685 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1728,21 +1728,18 @@ public class CommonConstants {
* - Instance Config: enableThreadAllocatedBytesMeasurement = true
*/
- public static final String CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION =
- "accounting.workload.enable.cost.collection";
+ public static final String CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION =
"accounting.workload.enable.cost.collection";
public static final boolean DEFAULT_WORKLOAD_ENABLE_COST_COLLECTION =
false;
public static final String CONFIG_OF_WORKLOAD_ENABLE_COST_ENFORCEMENT =
"accounting.workload.enable.cost.enforcement";
public static final boolean DEFAULT_WORKLOAD_ENABLE_COST_ENFORCEMENT =
false;
- public static final String CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS =
- "accounting.workload.enforcement.window.ms";
+ public static final String CONFIG_OF_WORKLOAD_ENFORCEMENT_WINDOW_MS =
"accounting.workload.enforcement.window.ms";
public static final long DEFAULT_WORKLOAD_ENFORCEMENT_WINDOW_MS = 60_000L;
- public static final String CONFIG_OF_WORKLOAD_SLEEP_TIME_MS =
- "accounting.workload.sleep.time.ms";
- public static final int DEFAULT_WORKLOAD_SLEEP_TIME_MS = 1;
+ public static final String CONFIG_OF_WORKLOAD_SLEEP_TIME_MS =
"accounting.workload.sleep.time.ms";
+ public static final int DEFAULT_WORKLOAD_SLEEP_TIME_MS = 100;
public static final String DEFAULT_WORKLOAD_NAME = "default";
public static final String CONFIG_OF_SECONDARY_WORKLOAD_NAME =
"accounting.secondary.workload.name";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]