This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a52c2a25cc Reduce logs and improve logging when queries are terminated due to OOM. (#16172)
a52c2a25cc is described below
commit a52c2a25cc469d88072e38fdd75c7293cda0c9cb
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Wed Jun 25 12:00:42 2025 +0530
Reduce logs and improve logging when queries are terminated due to OOM. (#16172)
---
.../PerQueryCPUMemAccountantFactory.java | 39 +++++++++-------------
1 file changed, 16 insertions(+), 23 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index df110358cb..f733e81ba7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -109,7 +109,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
CPUMemThreadLevelAccountingObjects.ThreadEntry ret =
new CPUMemThreadLevelAccountingObjects.ThreadEntry();
_threadEntriesMap.put(Thread.currentThread(), ret);
- LOGGER.info("Adding thread to _threadLocalEntry: {}",
Thread.currentThread().getName());
+ LOGGER.debug("Adding thread to _threadLocalEntry: {}",
Thread.currentThread().getName());
return ret;
}
);
@@ -456,7 +456,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
if (!thread.isAlive()) {
_threadEntriesMap.remove(thread);
- LOGGER.info("Removing thread from _threadLocalEntry: {}",
thread.getName());
+ LOGGER.debug("Removing thread from _threadLocalEntry: {}",
thread.getName());
}
}
@@ -482,6 +482,16 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
public void postAggregation(Map<String, AggregatedStats>
aggregatedUsagePerActiveQuery) {
}
+ protected void logQueryResourceUsage(Map<String, ? extends
QueryResourceTracker> aggregatedUsagePerActiveQuery) {
+ LOGGER.warn("Query aggregation results {} for the previous kill.",
aggregatedUsagePerActiveQuery);
+ }
+
+ protected void logTerminatedQuery(QueryResourceTracker
queryResourceTracker, long totalHeapMemoryUsage) {
+ LOGGER.warn("Query {} terminated. Memory Usage: {}. Cpu Usage: {}. Total
Heap Usage: {}",
+ queryResourceTracker.getQueryId(),
queryResourceTracker.getAllocatedBytes(),
+ queryResourceTracker.getCpuTimeNs(), totalHeapMemoryUsage);
+ }
+
@Override
public Exception getErrorStatus() {
return _threadLocalEntry.get()._errorStatus.getAndSet(null);
@@ -892,9 +902,7 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
" Query %s got killed because using %d bytes of memory on
%s: %s, exceeding the quota",
maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(),
_instanceType, _instanceId)));
interruptRunnerThread(maxUsageTuple.getAnchorThread());
- LOGGER.error("Query {} got picked because using {} bytes of
memory, actual kill committed true}",
- maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
- LOGGER.error("Current task status recorded is {}",
_threadEntriesMap);
+ logTerminatedQuery(maxUsageTuple, _usedBytes);
} else if (!_oomKillQueryEnabled) {
LOGGER.warn("Query {} got picked because using {} bytes of memory,
actual kill committed false "
+ "because oomKillQueryEnabled is false",
@@ -902,25 +910,8 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
} else {
LOGGER.warn("But all queries are below quota, no query killed");
}
- } else {
- maxUsageTuple =
Collections.max(_aggregatedUsagePerActiveQuery.values(),
- Comparator.comparing(AggregatedStats::getCpuTimeNs));
- if (_oomKillQueryEnabled) {
- maxUsageTuple._exceptionAtomicReference
- .set(new RuntimeException(String.format(
- " Query %s got killed because memory pressure, using %d ns
of CPU time on %s: %s",
- maxUsageTuple._queryId, maxUsageTuple.getAllocatedBytes(),
_instanceType, _instanceId)));
- interruptRunnerThread(maxUsageTuple.getAnchorThread());
- LOGGER.error("Query {} got picked because using {} ns of cpu time,
actual kill committed true",
- maxUsageTuple._allocatedBytes, maxUsageTuple._queryId);
- LOGGER.error("Current task status recorded is {}",
_threadEntriesMap);
- } else {
- LOGGER.warn("Query {} got picked because using {} bytes of memory,
actual kill committed false "
- + "because oomKillQueryEnabled is false",
- maxUsageTuple._queryId, maxUsageTuple._allocatedBytes);
- }
}
- LOGGER.warn("Query aggregation results {} for the previous kill.",
_aggregatedUsagePerActiveQuery.toString());
+ logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
}
private void killCPUTimeExceedQueries() {
@@ -935,8 +926,10 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory
+ "CPU time exceeding limit of %d ns CPU time",
value._queryId, _instanceType, _instanceId,
value.getCpuTimeNs(), _cpuTimeBasedKillingThresholdNS)));
interruptRunnerThread(value.getAnchorThread());
+ logTerminatedQuery(value, _usedBytes);
}
}
+ logQueryResourceUsage(_aggregatedUsagePerActiveQuery);
}
private void interruptRunnerThread(Thread thread) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]