Copilot commented on code in PR #17753:
URL: https://github.com/apache/pinot/pull/17753#discussion_r2844196289
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java:
##########
@@ -209,49 +220,62 @@ public void run() {
try {
//noinspection InfiniteLoopStatement
while (true) {
- try {
- runOnce();
- } finally {
+ int sleepTimeMs = runOnce();
+ if (sleepTimeMs > 0) {
//noinspection BusyWait
- Thread.sleep(_sleepTime);
+ Thread.sleep(sleepTimeMs);
}
}
} catch (InterruptedException e) {
LOGGER.warn("WatcherTask interrupted, exiting.");
+ } catch (Error e) {
+ LOGGER.error("Caught unexpected error in WatcherTask", e);
}
}
- private void runOnce() {
+ private int runOnce() {
+ boolean runQueryAggregate = false;
+ boolean runWorkloadAggregate = false;
+ int sleepTimeMs;
try {
- runPreAggregation();
- runAggregation();
- runPostAggregation();
- } catch (Exception e) {
+ if (_nextQueryAggregateTimeMs <= System.currentTimeMillis()) {
+ runQueryAggregate = true;
+ runAggregate(_queryResourceAggregator);
+ }
+ if (_workloadResourceAggregator != null &&
_nextWorkloadAggregateTimeMs <= System.currentTimeMillis()) {
+ runWorkloadAggregate = true;
+ runAggregate(_workloadResourceAggregator);
+ }
+ } catch (RuntimeException e) {
LOGGER.error("Caught exception while executing stats aggregation and
query kill", e);
// TODO: Add a metric to track the number of watcher task errors.
} finally {
LOGGER.debug("_threadTrackers size: {}", _threadTrackers.size());
- for (ResourceAggregator resourceAggregator :
_resourceAggregators.values()) {
- resourceAggregator.cleanUpPostAggregation();
+ if (runQueryAggregate) {
+ _queryResourceAggregator.cleanUpPostAggregation();
}
- // Get sleeptime from both resourceAggregators. Pick the minimum.
PerQuery Accountant modifies the sleep
- // time when condition is critical.
- int sleepTime = Integer.MAX_VALUE;
- for (ResourceAggregator resourceAggregator :
_resourceAggregators.values()) {
- sleepTime = Math.min(sleepTime,
resourceAggregator.getAggregationSleepTimeMs());
+ if (runWorkloadAggregate) {
+ _workloadResourceAggregator.cleanUpPostAggregation();
}
- _sleepTime = sleepTime;
- }
- }
-
- private void runPreAggregation() {
- // Call the pre-aggregation methods for each ResourceAggregator.
- for (ResourceAggregator resourceAggregator :
_resourceAggregators.values()) {
- resourceAggregator.preAggregate(_threadTrackers.values());
+ long currentTimeMs = System.currentTimeMillis();
+ if (runQueryAggregate) {
+ _nextQueryAggregateTimeMs = currentTimeMs +
_queryResourceAggregator.getAggregationSleepTimeMs();
+ }
+ if (runWorkloadAggregate) {
+ _nextWorkloadAggregateTimeMs = currentTimeMs +
_workloadResourceAggregator.getAggregationSleepTimeMs();
+ }
+ assert _nextQueryAggregateTimeMs > 0;
+ long nextAggregateTimeMs = _nextQueryAggregateTimeMs;
+ if (_nextWorkloadAggregateTimeMs > 0) {
+ nextAggregateTimeMs = Math.min(nextAggregateTimeMs,
_nextWorkloadAggregateTimeMs);
+ }
+ sleepTimeMs = (int) (nextAggregateTimeMs - currentTimeMs);
Review Comment:
The sleep time calculation on line 272 can produce a zero or negative value
when aggregation takes longer than the scheduled interval (i.e., when
currentTimeMs has already advanced past nextAggregateTimeMs by the time the
finally block computes it). The check on line 224 (`if (sleepTimeMs > 0)`)
prevents calling Thread.sleep() with a negative value. However, it also means
the watcher thread loops again immediately without sleeping, which can consume
excessive CPU if this happens repeatedly. Consider clamping sleepTimeMs to a
minimum value (e.g., 1ms) when it is computed as zero or negative, to prevent
tight loops.
```suggestion
long sleepTimeMsLong = nextAggregateTimeMs - currentTimeMs;
if (sleepTimeMsLong <= 0L) {
// Ensure a minimum sleep time to avoid tight loops when
processing overruns the interval.
sleepTimeMs = 1;
} else if (sleepTimeMsLong > Integer.MAX_VALUE) {
// Cap at Integer.MAX_VALUE to avoid overflow when casting to
int.
sleepTimeMs = Integer.MAX_VALUE;
} else {
sleepTimeMs = (int) sleepTimeMsLong;
}
```
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java:
##########
@@ -209,49 +220,62 @@ public void run() {
try {
//noinspection InfiniteLoopStatement
while (true) {
- try {
- runOnce();
- } finally {
+ int sleepTimeMs = runOnce();
+ if (sleepTimeMs > 0) {
//noinspection BusyWait
- Thread.sleep(_sleepTime);
+ Thread.sleep(sleepTimeMs);
}
}
} catch (InterruptedException e) {
LOGGER.warn("WatcherTask interrupted, exiting.");
+ } catch (Error e) {
+ LOGGER.error("Caught unexpected error in WatcherTask", e);
}
}
- private void runOnce() {
+ private int runOnce() {
+ boolean runQueryAggregate = false;
+ boolean runWorkloadAggregate = false;
+ int sleepTimeMs;
try {
- runPreAggregation();
- runAggregation();
- runPostAggregation();
- } catch (Exception e) {
+ if (_nextQueryAggregateTimeMs <= System.currentTimeMillis()) {
+ runQueryAggregate = true;
+ runAggregate(_queryResourceAggregator);
+ }
+ if (_workloadResourceAggregator != null &&
_nextWorkloadAggregateTimeMs <= System.currentTimeMillis()) {
+ runWorkloadAggregate = true;
+ runAggregate(_workloadResourceAggregator);
+ }
+ } catch (RuntimeException e) {
LOGGER.error("Caught exception while executing stats aggregation and
query kill", e);
// TODO: Add a metric to track the number of watcher task errors.
} finally {
LOGGER.debug("_threadTrackers size: {}", _threadTrackers.size());
- for (ResourceAggregator resourceAggregator :
_resourceAggregators.values()) {
- resourceAggregator.cleanUpPostAggregation();
+ if (runQueryAggregate) {
+ _queryResourceAggregator.cleanUpPostAggregation();
}
- // Get sleeptime from both resourceAggregators. Pick the minimum.
PerQuery Accountant modifies the sleep
- // time when condition is critical.
- int sleepTime = Integer.MAX_VALUE;
- for (ResourceAggregator resourceAggregator :
_resourceAggregators.values()) {
- sleepTime = Math.min(sleepTime,
resourceAggregator.getAggregationSleepTimeMs());
+ if (runWorkloadAggregate) {
+ _workloadResourceAggregator.cleanUpPostAggregation();
}
- _sleepTime = sleepTime;
- }
- }
-
- private void runPreAggregation() {
- // Call the pre-aggregation methods for each ResourceAggregator.
- for (ResourceAggregator resourceAggregator :
_resourceAggregators.values()) {
- resourceAggregator.preAggregate(_threadTrackers.values());
+ long currentTimeMs = System.currentTimeMillis();
+ if (runQueryAggregate) {
+ _nextQueryAggregateTimeMs = currentTimeMs +
_queryResourceAggregator.getAggregationSleepTimeMs();
+ }
+ if (runWorkloadAggregate) {
+ _nextWorkloadAggregateTimeMs = currentTimeMs +
_workloadResourceAggregator.getAggregationSleepTimeMs();
+ }
+ assert _nextQueryAggregateTimeMs > 0;
+ long nextAggregateTimeMs = _nextQueryAggregateTimeMs;
+ if (_nextWorkloadAggregateTimeMs > 0) {
+ nextAggregateTimeMs = Math.min(nextAggregateTimeMs,
_nextWorkloadAggregateTimeMs);
+ }
+ sleepTimeMs = (int) (nextAggregateTimeMs - currentTimeMs);
}
Review Comment:
The finally block contains operations that could throw exceptions
(cleanUpPostAggregation, getAggregationSleepTimeMs). Those exceptions would not
be caught by the RuntimeException catch block on line 249. If any of these
operations throws a RuntimeException, it will propagate out of run() uncaught:
run() only catches InterruptedException and Error (line 231), and
RuntimeException is not a subclass of Error. This would terminate the watcher
thread unexpectedly. Consider wrapping the finally block contents in a
try-catch, or moving the timestamp update logic so that it is always executed
even if cleanup fails.
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java:
##########
@@ -192,7 +202,8 @@ public Collection<? extends ThreadResourceTracker>
getThreadResources() {
private class WatcherTask implements Runnable,
PinotClusterConfigChangeListener {
final AtomicReference<QueryMonitorConfig> _queryMonitorConfig = new
AtomicReference<>();
- int _sleepTime;
+ long _nextQueryAggregateTimeMs;
+ long _nextWorkloadAggregateTimeMs;
Review Comment:
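The fields `_nextQueryAggregateTimeMs` and `_nextWorkloadAggregateTimeMs`
are not initialized in the constructor. They default to 0, so on the first call
to `runOnce()` both conditions on lines 241 and 245 will be true (since
0 <= currentTimeMs; the workload condition additionally requires
`_workloadResourceAggregator` to be non-null). As a result, both aggregations
run immediately. This may be intentional for the first run, but it would be
clearer to explicitly initialize these fields to 0 or to the current time in
the constructor to document the intended behavior.

One caveat: the sleep-time computation treats `_nextWorkloadAggregateTimeMs == 0`
as "workload aggregation not scheduled" (`if (_nextWorkloadAggregateTimeMs > 0)`).
If there is no workload aggregator and that field is initialized to the current
time, it is never advanced. Every computed sleep time would then be
non-positive, and the thread would busy-loop.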
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadResourceAggregator.java:
##########
@@ -56,13 +56,15 @@ public class WorkloadResourceAggregator implements
ResourceAggregator {
private Map<String, LongLongMutablePair> _currentCpuMemUsage = new
HashMap<>();
public WorkloadResourceAggregator(String instanceId, InstanceType
instanceType, boolean cpuSamplingEnabled,
- boolean memorySamplingEnabled, AtomicReference<QueryMonitorConfig>
queryMonitorConfig) {
+ boolean memorySamplingEnabled, AtomicReference<QueryMonitorConfig>
queryMonitorConfig,
+ WorkloadBudgetManager workloadBudgetManager) {
+ assert workloadBudgetManager.isEnabled();
Review Comment:
Using an assertion to validate that the workloadBudgetManager is enabled is
problematic. Java assertions are disabled by default and only run when the JVM
is started with `-ea` (they can also be turned off selectively with `-da`). If
assertions are disabled and this constructor is called with a disabled
WorkloadBudgetManager, construction proceeds and produces an incorrectly
initialized object. This could later cause NullPointerExceptions or incorrect
behavior. Consider replacing this assertion with an explicit check that throws
an IllegalArgumentException or IllegalStateException.
```suggestion
if (workloadBudgetManager == null || !workloadBudgetManager.isEnabled())
{
throw new IllegalArgumentException("workloadBudgetManager must be
non-null and enabled");
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]