vrajat commented on code in PR #16299:
URL: https://github.com/apache/pinot/pull/16299#discussion_r2200346666
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -715,46 +728,56 @@ private void logQueryMonitorConfig() {
@Override
public void run() {
while (true) {
- QueryMonitorConfig config = _queryMonitorConfig.get();
-
- LOGGER.debug("Running timed task for PerQueryCPUMemAccountant.");
- _triggeringLevel = TriggeringLevel.Normal;
- _sleepTime = config.getNormalSleepTime();
- _aggregatedUsagePerActiveQuery = null;
try {
- // Get the metrics used for triggering the kill
- collectTriggerMetrics();
- // Prioritize the panic check, kill ALL QUERIES immediately if triggered
- if (outOfMemoryPanicTrigger()) {
- continue;
- }
- // Check for other triggers
- evalTriggers();
- // Refresh thread usage and aggregate to per query usage if triggered
- _aggregatedUsagePerActiveQuery = aggregate(_triggeringLevel.ordinal() > TriggeringLevel.Normal.ordinal());
- // post aggregation function
- postAggregation(_aggregatedUsagePerActiveQuery);
- // Act on one triggered actions
- triggeredActions();
- } catch (Exception e) {
- LOGGER.error("Caught exception while executing stats aggregation and query kill", e);
+ runOnce();
} finally {
- LOGGER.debug(_aggregatedUsagePerActiveQuery == null ? "_aggregatedUsagePerActiveQuery : null"
- : _aggregatedUsagePerActiveQuery.toString());
- LOGGER.debug("_threadEntriesMap size: {}", _threadEntriesMap.size());
-
- // Publish server heap usage metrics
- if (config.isPublishHeapUsageMetric()) {
- _metrics.setValueOfGlobalGauge(_memoryUsageGauge, _usedBytes);
- }
- // Clean inactive query stats
- cleanInactive();
// Sleep for sometime
reschedule();
}
}
}
+ public void runOnce() {
+ QueryMonitorConfig config = _queryMonitorConfig.get();
+
+ LOGGER.debug("Running timed task for PerQueryCPUMemAccountant.");
+ _triggeringLevel = TriggeringLevel.Normal;
+ _sleepTime = config.getNormalSleepTime();
+ _aggregatedUsagePerActiveQuery = null;
+ try {
+ // Get the metrics used for triggering the kill
+ collectTriggerMetrics();
+ // Prioritize the panic check, kill ALL QUERIES immediately if triggered
+ if (outOfMemoryPanicTrigger()) {
+ return;
+ }
+ // Check for other triggers
+ evalTriggers();
+ // Refresh thread usage and aggregate to per query usage if triggered
+ reapFinishedTasks();
+ if (_triggeringLevel.ordinal() > TriggeringLevel.Normal.ordinal()) {
+ _aggregatedUsagePerActiveQuery = getQueryResourcesImpl();
Review Comment:
This is partly a screw-up on my side and partly a workaround for uninformed
decisions I made last year.
* A long time ago I added a couple of debug APIs to get `queryResourceUsage`
and `threadResourceUsage`.
* I added two interfaces, `QueryResourceTracker` and `ThreadResourceTracker`,
because these functions were required in the `ThreadResourceAccountant`
interface, including the `getQueryResources` function.
* `getQueryResources` returned `Map<String, ? extends QueryResourceTracker>`.
* I also used them for system logging in StarTree Pinot.
After I split the functionality in this PR, I could not use
`Map<String, ? extends QueryResourceTracker>` within this class, because the
class calls functions specific to `AggregatedStats`, such as
`AggregatedStats.merge`. I had three choices:
* Add a merge function to the interface. This didn't seem like the right option.
* Keep two copies of the function.
* Create an Impl that returns `Map<String, AggregatedStats>` and is used
internally, so the internal functions can call methods specific to
`AggregatedStats`. `getQueryResourcesImpl` is that third option.
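To make the third option concrete, here is a minimal sketch of the split. It is not the code in this PR: the class name `QueryResourcesSketch`, the trimmed-down `QueryResourceTracker` and `AggregatedStats` types, and `recordFinishedTask` are hypothetical stand-ins, and the public return type is assumed to be a map keyed by query id.

```java
import java.util.HashMap;
import java.util.Map;

public class QueryResourcesSketch {
  // Hypothetical, trimmed-down stand-in for the public tracker interface.
  interface QueryResourceTracker {
    String getQueryId();
    long getAllocatedBytes();
  }

  // Hypothetical stand-in for AggregatedStats; merge() is intentionally not on the interface.
  static final class AggregatedStats implements QueryResourceTracker {
    private final String _queryId;
    private long _allocatedBytes;

    AggregatedStats(String queryId, long allocatedBytes) {
      _queryId = queryId;
      _allocatedBytes = allocatedBytes;
    }

    @Override
    public String getQueryId() {
      return _queryId;
    }

    @Override
    public long getAllocatedBytes() {
      return _allocatedBytes;
    }

    // Adds the other entry's bytes into this one and returns this entry.
    AggregatedStats merge(AggregatedStats other) {
      _allocatedBytes += other._allocatedBytes;
      return this;
    }
  }

  private final Map<String, AggregatedStats> _finishedTasksOfQuery = new HashMap<>();

  // Hypothetical helper: folds a finished task's usage into the per-query entry.
  void recordFinishedTask(String queryId, long allocatedBytes) {
    _finishedTasksOfQuery.merge(queryId, new AggregatedStats(queryId, allocatedBytes), AggregatedStats::merge);
  }

  // Internal variant: the concrete value type lets callers use merge().
  Map<String, AggregatedStats> getQueryResourcesImpl() {
    Map<String, AggregatedStats> copy = new HashMap<>();
    for (Map.Entry<String, AggregatedStats> entry : _finishedTasksOfQuery.entrySet()) {
      AggregatedStats stats = entry.getValue();
      copy.put(entry.getKey(), new AggregatedStats(stats.getQueryId(), stats.getAllocatedBytes()));
    }
    return copy;
  }

  // Public variant: exposes only the interface type and delegates to the Impl.
  public Map<String, ? extends QueryResourceTracker> getQueryResources() {
    return getQueryResourcesImpl();
  }
}
```

The public method stays a thin wrapper, so there is one copy of the aggregation logic and the interface does not need a merge function.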
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -868,8 +891,12 @@ void killAllQueries() {
* use XX:+ExplicitGCInvokesConcurrent to avoid a full gc when system.gc is triggered
*/
private void killMostExpensiveQuery() {
+ if (!_isThreadMemorySamplingEnabled) {
+ LOGGER.warn("But unable to kill query memory tracking is enabled");
Review Comment:
I've fixed the log message. I had glossed over how wrong it was.
Re: the history of this function, see
https://github.com/apache/pinot/pull/16172 . There I removed a block of code
that did the following: if OOM-based killing is enabled, memory sampling is
disabled, and CPU sampling is enabled, kill queries based on CPU usage.
I removed that block because it only reacted to a wrong configuration. I
had forgotten to remove the check for CPU sampling in that PR, so I cleaned it
up here.
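As a rough illustration of the behaviour after that cleanup, the decision reduces to a single guard on memory sampling. This is only a sketch: `KillGuardSketch`, `Decision`, and `decide` are hypothetical names, not code from either PR.

```java
public class KillGuardSketch {
  // Hypothetical outcomes of the guard at the top of the kill path.
  enum Decision { SKIP_MISCONFIGURED, KILL_BY_MEMORY }

  // cpuSamplingEnabled is accepted only to show that it no longer affects the outcome.
  static Decision decide(boolean memorySamplingEnabled, boolean cpuSamplingEnabled) {
    if (!memorySamplingEnabled) {
      // Previously a CPU-based fallback could run here; now the kill is skipped and a warning logged.
      return Decision.SKIP_MISCONFIGURED;
    }
    return Decision.KILL_BY_MEMORY;
  }
}
```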
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -870,23 +894,29 @@ private void killMostExpensiveQuery() {
}
AggregatedStats maxUsageTuple;
if (_isThreadMemorySamplingEnabled) {
- maxUsageTuple = Collections.max(_aggregatedUsagePerActiveQuery.values(),
- Comparator.comparing(AggregatedStats::getAllocatedBytes));
- boolean shouldKill = config.isOomKillQueryEnabled()
- && maxUsageTuple._allocatedBytes > config.getMinMemoryFootprintForKill();
- if (shouldKill) {
- maxUsageTuple._exceptionAtomicReference
- .set(new RuntimeException(String.format(
- " Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota",
- maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
- interruptRunnerThread(maxUsageTuple.getAnchorThread());
- logTerminatedQuery(maxUsageTuple, _usedBytes);
- } else if (!config.isOomKillQueryEnabled()) {
- LOGGER.warn("Query {} got picked because using {} bytes of memory, actual kill committed false "
- + "because oomKillQueryEnabled is false",
- maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
+ maxUsageTuple = _aggregatedUsagePerActiveQuery.values().stream()
+ .filter(stats -> !_cancelSentQueries.contains(stats.getQueryId()))
+ .max(Comparator.comparing(AggregatedStats::getAllocatedBytes))
+ .orElse(null);
+ if (maxUsageTuple != null) {
+ boolean shouldKill = config.isOomKillQueryEnabled()
+ && maxUsageTuple._allocatedBytes > config.getMinMemoryFootprintForKill();
+ if (shouldKill) {
+ maxUsageTuple._exceptionAtomicReference
+ .set(new RuntimeException(String.format(
+ " Query %s got killed because using %d bytes of memory on %s: %s, exceeding the quota",
+ maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(), _instanceType, _instanceId)));
+ interruptRunnerThread(maxUsageTuple);
+ logTerminatedQuery(maxUsageTuple, _usedBytes);
+ } else if (!config.isOomKillQueryEnabled()) {
+ LOGGER.warn("Query {} got picked because using {} bytes of memory, actual kill committed false "
+ + "because oomKillQueryEnabled is false",
+ maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
+ } else {
+ LOGGER.warn("But all queries are below quota, no query killed");
+ }
} else {
- LOGGER.warn("But all queries are below quota, no query killed");
+ LOGGER.warn("No query found to kill based on memory usage");
Review Comment:
Removed line.
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java:
##########
@@ -132,6 +132,8 @@ public static class PerQueryCPUMemResourceUsageAccountant implements ThreadResou
protected final Set<String> _inactiveQuery;
+ protected Set<String> _cancelSentQueries;
Review Comment:
It's not possible to use it. It's also not the best name; `seenQueries` is
probably better. Stepping back, this is what it is used for:
* The feature has to keep track of the resource usage of finished tasks. This is
tracked in `Map<String, AggregatedStats> _finishedTasksOfQuery`, where the key is
the query id.
* Running tasks are tracked indirectly in `Map<Thread, TaskEntry>
_threadEntriesMap`. `TaskEntry` has the query id.
* `_finishedTasksOfQuery` has to be cleared regularly. Otherwise it will grow
unbounded.
* The class clears it when there are no active threads for that query id,
that is, no entries in `_threadEntriesMap`.
If a query id is in `_finishedTasksOfQuery` but not in `_threadEntriesMap`,
then that query is done, and all records of that query id should be cleared.
(This can be wrong when the query is not done but none of its threads are
running.) That's what `_inactiveQuery` does: it maintains the set of query ids
seen in one map and not in the other.
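The bookkeeping described above amounts to a set difference followed by a cleanup. Here is a minimal sketch under simplifying assumptions: `InactiveQuerySketch` and its methods are hypothetical, threads are represented by their names, and the per-query stats are reduced to a byte count.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class InactiveQuerySketch {
  // Query ids that have finished-task records but no running thread.
  static Set<String> findInactiveQueries(Set<String> finishedQueryIds, Collection<String> runningQueryIds) {
    Set<String> inactive = new HashSet<>(finishedQueryIds);
    inactive.removeAll(runningQueryIds);
    return inactive;
  }

  // Drops finished-task records for inactive queries so the map does not grow unbounded.
  // finishedTasksOfQuery: query id -> allocated bytes (simplified stand-in for AggregatedStats).
  // threadToQueryId: thread name -> query id (simplified stand-in for the thread entries map).
  static Set<String> cleanInactive(Map<String, Long> finishedTasksOfQuery, Map<String, String> threadToQueryId) {
    Set<String> inactive = findInactiveQueries(finishedTasksOfQuery.keySet(), threadToQueryId.values());
    finishedTasksOfQuery.keySet().removeAll(inactive);
    return inactive;
  }
}
```

As noted above, the difference is only a heuristic: a query whose threads are momentarily idle looks the same as a finished one.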