Copilot commented on code in PR #17753:
URL: https://github.com/apache/pinot/pull/17753#discussion_r2844196289


##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java:
##########
@@ -209,49 +220,62 @@ public void run() {
         try {
           //noinspection InfiniteLoopStatement
           while (true) {
-            try {
-              runOnce();
-            } finally {
+            int sleepTimeMs = runOnce();
+            if (sleepTimeMs > 0) {
               //noinspection BusyWait
-              Thread.sleep(_sleepTime);
+              Thread.sleep(sleepTimeMs);
             }
           }
         } catch (InterruptedException e) {
           LOGGER.warn("WatcherTask interrupted, exiting.");
+        } catch (Error e) {
+          LOGGER.error("Caught unexpected error in WatcherTask", e);
         }
       }
 
-      private void runOnce() {
+      private int runOnce() {
+        boolean runQueryAggregate = false;
+        boolean runWorkloadAggregate = false;
+        int sleepTimeMs;
         try {
-          runPreAggregation();
-          runAggregation();
-          runPostAggregation();
-        } catch (Exception e) {
+          if (_nextQueryAggregateTimeMs <= System.currentTimeMillis()) {
+            runQueryAggregate = true;
+            runAggregate(_queryResourceAggregator);
+          }
+          if (_workloadResourceAggregator != null && _nextWorkloadAggregateTimeMs <= System.currentTimeMillis()) {
+            runWorkloadAggregate = true;
+            runAggregate(_workloadResourceAggregator);
+          }
+        } catch (RuntimeException e) {
           LOGGER.error("Caught exception while executing stats aggregation and query kill", e);
           // TODO: Add a metric to track the number of watcher task errors.
         } finally {
           LOGGER.debug("_threadTrackers size: {}", _threadTrackers.size());
-          for (ResourceAggregator resourceAggregator : _resourceAggregators.values()) {
-            resourceAggregator.cleanUpPostAggregation();
+          if (runQueryAggregate) {
+            _queryResourceAggregator.cleanUpPostAggregation();
           }
-          // Get sleeptime from both resourceAggregators. Pick the minimum. PerQuery Accountant modifies the sleep
-          // time when condition is critical.
-          int sleepTime = Integer.MAX_VALUE;
-          for (ResourceAggregator resourceAggregator : _resourceAggregators.values()) {
-            sleepTime = Math.min(sleepTime, resourceAggregator.getAggregationSleepTimeMs());
+          if (runWorkloadAggregate) {
+            _workloadResourceAggregator.cleanUpPostAggregation();
           }
-          _sleepTime = sleepTime;
-        }
-      }
-
-      private void runPreAggregation() {
-        // Call the pre-aggregation methods for each ResourceAggregator.
-        for (ResourceAggregator resourceAggregator : _resourceAggregators.values()) {
-          resourceAggregator.preAggregate(_threadTrackers.values());
+          long currentTimeMs = System.currentTimeMillis();
+          if (runQueryAggregate) {
+            _nextQueryAggregateTimeMs = currentTimeMs + _queryResourceAggregator.getAggregationSleepTimeMs();
+          }
+          if (runWorkloadAggregate) {
+            _nextWorkloadAggregateTimeMs = currentTimeMs + _workloadResourceAggregator.getAggregationSleepTimeMs();
+          }
+          assert _nextQueryAggregateTimeMs > 0;
+          long nextAggregateTimeMs = _nextQueryAggregateTimeMs;
+          if (_nextWorkloadAggregateTimeMs > 0) {
+            nextAggregateTimeMs = Math.min(nextAggregateTimeMs, _nextWorkloadAggregateTimeMs);
+          }
+          sleepTimeMs = (int) (nextAggregateTimeMs - currentTimeMs);

Review Comment:
   The sleep time calculation on line 272 can produce a zero or negative 
value if aggregation takes longer than the scheduled interval (i.e., if 
currentTimeMs advances past nextAggregateTimeMs during processing). The check 
on line 224 (`if (sleepTimeMs > 0)`) avoids passing a negative value to 
`Thread.sleep()`, which would throw an IllegalArgumentException, but in that 
case the watcher thread loops again immediately without sleeping, potentially 
consuming excessive CPU. Consider clamping sleepTimeMs to a minimum value 
(e.g., 1 ms) when it is calculated as zero or negative, to prevent tight 
loops, and capping it at Integer.MAX_VALUE before the narrowing cast to int.
   ```suggestion
             long sleepTimeMsLong = nextAggregateTimeMs - currentTimeMs;
             if (sleepTimeMsLong <= 0L) {
               // Ensure a minimum sleep time to avoid tight loops when processing overruns the interval.
               sleepTimeMs = 1;
             } else if (sleepTimeMsLong > Integer.MAX_VALUE) {
               // Cap at Integer.MAX_VALUE to avoid overflow when casting to int.
               sleepTimeMs = Integer.MAX_VALUE;
             } else {
               sleepTimeMs = (int) sleepTimeMsLong;
             }
   ```
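   To check the clamping outside the watcher loop, the suggested logic can be 
factored into a pure function. This is a minimal sketch only; the class 
`SleepTimeClamp` and method `clampSleepTimeMs` are hypothetical names, not 
part of Pinot.

```java
/**
 * Hypothetical helper (not part of Pinot) that isolates the clamping logic from
 * the suggestion so it can be unit-tested without running the watcher thread.
 */
final class SleepTimeClamp {
  private SleepTimeClamp() {
  }

  /**
   * Returns how long the watcher should sleep before the next aggregation is due.
   * Never returns less than 1 ms (avoids a tight loop when processing overruns the
   * interval) and never more than Integer.MAX_VALUE (avoids overflow on the int cast).
   */
  static int clampSleepTimeMs(long nextAggregateTimeMs, long currentTimeMs) {
    long sleepTimeMsLong = nextAggregateTimeMs - currentTimeMs;
    if (sleepTimeMsLong <= 0L) {
      return 1;
    }
    if (sleepTimeMsLong > Integer.MAX_VALUE) {
      return Integer.MAX_VALUE;
    }
    return (int) sleepTimeMsLong;
  }

  public static void main(String[] args) {
    long nowMs = 1_000L;
    System.out.println(clampSleepTimeMs(nowMs + 30L, nowMs));             // 30: next run is 30 ms away
    System.out.println(clampSleepTimeMs(nowMs - 5L, nowMs));              // 1: deadline already passed
    System.out.println(clampSleepTimeMs(nowMs + 5_000_000_000L, nowMs));  // 2147483647 (Integer.MAX_VALUE)
  }
}
```

   One trade-off of the 1 ms floor: an aggregation whose deadline has already 
passed is deferred by one millisecond instead of running on the very next 
loop iteration.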



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java:
##########
@@ -209,49 +220,62 @@ public void run() {
         try {
           //noinspection InfiniteLoopStatement
           while (true) {
-            try {
-              runOnce();
-            } finally {
+            int sleepTimeMs = runOnce();
+            if (sleepTimeMs > 0) {
               //noinspection BusyWait
-              Thread.sleep(_sleepTime);
+              Thread.sleep(sleepTimeMs);
             }
           }
         } catch (InterruptedException e) {
           LOGGER.warn("WatcherTask interrupted, exiting.");
+        } catch (Error e) {
+          LOGGER.error("Caught unexpected error in WatcherTask", e);
         }
       }
 
-      private void runOnce() {
+      private int runOnce() {
+        boolean runQueryAggregate = false;
+        boolean runWorkloadAggregate = false;
+        int sleepTimeMs;
         try {
-          runPreAggregation();
-          runAggregation();
-          runPostAggregation();
-        } catch (Exception e) {
+          if (_nextQueryAggregateTimeMs <= System.currentTimeMillis()) {
+            runQueryAggregate = true;
+            runAggregate(_queryResourceAggregator);
+          }
+          if (_workloadResourceAggregator != null && _nextWorkloadAggregateTimeMs <= System.currentTimeMillis()) {
+            runWorkloadAggregate = true;
+            runAggregate(_workloadResourceAggregator);
+          }
+        } catch (RuntimeException e) {
           LOGGER.error("Caught exception while executing stats aggregation and query kill", e);
           // TODO: Add a metric to track the number of watcher task errors.
         } finally {
           LOGGER.debug("_threadTrackers size: {}", _threadTrackers.size());
-          for (ResourceAggregator resourceAggregator : _resourceAggregators.values()) {
-            resourceAggregator.cleanUpPostAggregation();
+          if (runQueryAggregate) {
+            _queryResourceAggregator.cleanUpPostAggregation();
           }
-          // Get sleeptime from both resourceAggregators. Pick the minimum. PerQuery Accountant modifies the sleep
-          // time when condition is critical.
-          int sleepTime = Integer.MAX_VALUE;
-          for (ResourceAggregator resourceAggregator : _resourceAggregators.values()) {
-            sleepTime = Math.min(sleepTime, resourceAggregator.getAggregationSleepTimeMs());
+          if (runWorkloadAggregate) {
+            _workloadResourceAggregator.cleanUpPostAggregation();
           }
-          _sleepTime = sleepTime;
-        }
-      }
-
-      private void runPreAggregation() {
-        // Call the pre-aggregation methods for each ResourceAggregator.
-        for (ResourceAggregator resourceAggregator : _resourceAggregators.values()) {
-          resourceAggregator.preAggregate(_threadTrackers.values());
+          long currentTimeMs = System.currentTimeMillis();
+          if (runQueryAggregate) {
+            _nextQueryAggregateTimeMs = currentTimeMs + _queryResourceAggregator.getAggregationSleepTimeMs();
+          }
+          if (runWorkloadAggregate) {
+            _nextWorkloadAggregateTimeMs = currentTimeMs + _workloadResourceAggregator.getAggregationSleepTimeMs();
+          }
+          assert _nextQueryAggregateTimeMs > 0;
+          long nextAggregateTimeMs = _nextQueryAggregateTimeMs;
+          if (_nextWorkloadAggregateTimeMs > 0) {
+            nextAggregateTimeMs = Math.min(nextAggregateTimeMs, _nextWorkloadAggregateTimeMs);
+          }
+          sleepTimeMs = (int) (nextAggregateTimeMs - currentTimeMs);
         }

Review Comment:
   The finally block contains operations that could throw exceptions 
(cleanUpPostAggregation, getAggregationSleepTimeMs), and these would not be 
caught by the RuntimeException catch block on line 249, because a catch clause 
only handles exceptions thrown from its own try block, not from the finally 
block. Such an exception would propagate up to the run() method, where neither 
the InterruptedException handler nor the Error handler on line 231 would catch 
a RuntimeException, so the watcher thread would terminate unexpectedly. 
Consider wrapping the finally block contents in a try-catch, or moving the 
timestamp update logic so that it always executes, 
even if cleanup fails.
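   The control-flow point can be reproduced in isolation. The sketch below is 
not Pinot code; `FinallyPitfallDemo`, `doAggregation`, `cleanUp`, and the 
fixed 100 ms sleep are hypothetical stand-ins. It shows that a 
RuntimeException thrown from a finally block bypasses the catch of the same 
try statement, and that guarding the cleanup with its own try-catch lets the 
sleep-time computation still run.

```java
/** Hypothetical demo (not Pinot code) of exceptions thrown from a finally block. */
final class FinallyPitfallDemo {
  /** Counts how often the outer catch clause actually ran. */
  static int caughtByCatch = 0;

  /** No-op stand-in for the aggregation work. */
  static void doAggregation() {
  }

  /** Stand-in for a failing cleanUpPostAggregation(). */
  static void cleanUp() {
    throw new IllegalStateException("cleanup failed");
  }

  /** Mirrors the reviewed structure: the catch clause does not cover the finally block. */
  static int unguardedRunOnce() {
    int sleepTimeMs;
    try {
      doAggregation();
    } catch (RuntimeException e) {
      caughtByCatch++;
    } finally {
      cleanUp();          // throws, and the catch clause above never sees it
      sleepTimeMs = 100;  // hypothetical sleep; not reached when cleanUp() throws
    }
    return sleepTimeMs;
  }

  /** Guarded variant: a cleanup failure is logged and the sleep time is still computed. */
  static int guardedRunOnce() {
    int sleepTimeMs;
    try {
      doAggregation();
    } catch (RuntimeException e) {
      caughtByCatch++;
    } finally {
      try {
        cleanUp();
      } catch (RuntimeException e) {
        System.err.println("cleanup failed: " + e.getMessage());
      }
      sleepTimeMs = 100;  // always reached
    }
    return sleepTimeMs;
  }

  public static void main(String[] args) {
    try {
      unguardedRunOnce();
    } catch (IllegalStateException e) {
      // In the watcher, this exception would escape run() and end the thread.
      System.out.println("escaped: " + e.getMessage() + ", caughtByCatch=" + caughtByCatch);
    }
    System.out.println("guarded sleepTimeMs=" + guardedRunOnce());
  }
}
```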



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java:
##########
@@ -192,7 +202,8 @@ public Collection<? extends ThreadResourceTracker> getThreadResources() {
     private class WatcherTask implements Runnable, PinotClusterConfigChangeListener {
       final AtomicReference<QueryMonitorConfig> _queryMonitorConfig = new AtomicReference<>();
 
-      int _sleepTime;
+      long _nextQueryAggregateTimeMs;
+      long _nextWorkloadAggregateTimeMs;

Review Comment:
   The fields `_nextQueryAggregateTimeMs` and `_nextWorkloadAggregateTimeMs` 
are not initialized in the constructor. They default to 0, so on the first 
call to `runOnce()` the conditions on lines 241 and 245 are both true (the 
second whenever `_workloadResourceAggregator` is non-null), because 
0 <= currentTimeMs, and both aggregations run immediately. If this is 
intentional for the first run, explicitly initializing these fields to 0 (or 
to the current time, which has the same effect given the `<=` comparison) in 
the constructor would make the behavior obvious and 
document the intended behavior.
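   A minimal sketch of what explicit initialization could look like, assuming 
the watcher's deadline checks are pulled into a small class. `AggregationSchedule`, 
`RUN_ON_FIRST_PASS_MS`, and the `workloadAggregationEnabled` flag (standing in 
for `_workloadResourceAggregator != null`) are hypothetical, not Pinot code.

```java
/**
 * Hypothetical sketch (not Pinot code): explicit initialization of the two
 * aggregation deadlines, so the "run immediately on the first pass" intent is visible.
 */
final class AggregationSchedule {
  // 0 is <= any System.currentTimeMillis() value, so both aggregations are due on the first pass.
  static final long RUN_ON_FIRST_PASS_MS = 0L;

  private final boolean _workloadAggregationEnabled;
  long _nextQueryAggregateTimeMs;
  long _nextWorkloadAggregateTimeMs;

  AggregationSchedule(boolean workloadAggregationEnabled) {
    _workloadAggregationEnabled = workloadAggregationEnabled;
    _nextQueryAggregateTimeMs = RUN_ON_FIRST_PASS_MS;
    _nextWorkloadAggregateTimeMs = RUN_ON_FIRST_PASS_MS;
  }

  /** Same comparison as the query branch in runOnce(). */
  boolean isQueryAggregateDue(long nowMs) {
    return _nextQueryAggregateTimeMs <= nowMs;
  }

  /** Same comparison as the workload branch, skipped when there is no workload aggregator. */
  boolean isWorkloadAggregateDue(long nowMs) {
    return _workloadAggregationEnabled && _nextWorkloadAggregateTimeMs <= nowMs;
  }

  public static void main(String[] args) {
    long nowMs = System.currentTimeMillis();
    AggregationSchedule withWorkload = new AggregationSchedule(true);
    System.out.println(withWorkload.isQueryAggregateDue(nowMs));     // true
    System.out.println(withWorkload.isWorkloadAggregateDue(nowMs));  // true
    AggregationSchedule queryOnly = new AggregationSchedule(false);
    System.out.println(queryOnly.isWorkloadAggregateDue(nowMs));     // false
  }
}
```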



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/WorkloadResourceAggregator.java:
##########
@@ -56,13 +56,15 @@ public class WorkloadResourceAggregator implements ResourceAggregator {
   private Map<String, LongLongMutablePair> _currentCpuMemUsage = new HashMap<>();
 
   public WorkloadResourceAggregator(String instanceId, InstanceType instanceType, boolean cpuSamplingEnabled,
-      boolean memorySamplingEnabled, AtomicReference<QueryMonitorConfig> queryMonitorConfig) {
+      boolean memorySamplingEnabled, AtomicReference<QueryMonitorConfig> queryMonitorConfig,
+      WorkloadBudgetManager workloadBudgetManager) {
+    assert workloadBudgetManager.isEnabled();

Review Comment:
   Using an assertion to validate that the workloadBudgetManager is enabled is 
problematic because Java assertions are disabled at runtime unless the JVM is 
started with `-ea` (and they can be turned off explicitly with `-da`). With 
assertions disabled, if this constructor is called with a disabled 
WorkloadBudgetManager, construction proceeds and yields an incorrectly 
initialized object, potentially causing NullPointerExceptions or incorrect 
behavior later. Consider replacing this assertion with an explicit check that 
throws an IllegalArgumentException or IllegalStateException.
   ```suggestion
       if (workloadBudgetManager == null || !workloadBudgetManager.isEnabled()) {
         throw new IllegalArgumentException("workloadBudgetManager must be non-null and enabled");
       }
   ```
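   The difference between the two approaches can be shown with a small 
self-contained sketch. `WorkloadAggregatorSketch` and its nested 
`BudgetManager` interface are hypothetical stand-ins (only `isEnabled()` is 
modeled after WorkloadBudgetManager); this is not Pinot code. The explicit 
check runs regardless of whether the JVM was started with `-ea`.

```java
import java.util.Objects;

/**
 * Hypothetical sketch (not Pinot code): validate a constructor argument with an
 * explicit check instead of an assert, so it is enforced even without -ea.
 */
final class WorkloadAggregatorSketch {
  /** Hypothetical stand-in for WorkloadBudgetManager; only isEnabled() is modeled. */
  interface BudgetManager {
    boolean isEnabled();
  }

  private final BudgetManager _budgetManager;

  WorkloadAggregatorSketch(BudgetManager budgetManager) {
    // Throws NullPointerException with a clear message for a null argument.
    Objects.requireNonNull(budgetManager, "budgetManager must be non-null");
    if (!budgetManager.isEnabled()) {
      throw new IllegalArgumentException("budgetManager must be enabled");
    }
    _budgetManager = budgetManager;
  }

  boolean isBudgetEnabled() {
    return _budgetManager.isEnabled();
  }

  public static void main(String[] args) {
    System.out.println(new WorkloadAggregatorSketch(() -> true).isBudgetEnabled());  // true
    try {
      new WorkloadAggregatorSketch(() -> false);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());  // rejected: budgetManager must be enabled
    }
  }
}
```

   This sketch uses `Objects.requireNonNull` for the null case, a common JDK 
convention; the suggestion's single IllegalArgumentException for both cases 
is an equally valid choice.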



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

