siddharthteotia commented on code in PR #10171:
URL: https://github.com/apache/pinot/pull/10171#discussion_r1092317746
##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java:
##########
@@ -32,47 +30,49 @@
*/
public class CPUMemThreadLevelAccountingObjects {
- public static class StatsDigest {
-
- // The current usage sampling for each thread
- final long[] _currentStatsSample;
- // The previous usage sampling for each thread
- final long[] _lastStatSample;
- // The aggregated usage sampling for the finished tasks of a (still)
running queries
- final HashMap<String, Long> _finishedTaskStatAggregator;
-
- StatsDigest(int numThreads) {
- _currentStatsSample = new long[numThreads];
- _lastStatSample = new long[numThreads];
- _finishedTaskStatAggregator = new HashMap<>();
- }
- }
-
/**
- * Entry to track the task execution status of a worker/runner given thread
+ * Entry to track the task execution status and usage stats of a Thread
*/
- public static class TaskEntryHolder {
- AtomicReference<TaskEntry> _threadTaskStatus = new AtomicReference<>(null);
+ public static class ThreadEntry {
+ // current query_id, task_id of the thread; this field is accessed by the
thread itself and the accountant
+ AtomicReference<TaskEntry> _currentThreadTaskStatus = new
AtomicReference<>(null);
+ // current sample of thread memory usage/cputime ; this field is accessed
by the thread itself and the accountant
+ volatile long _currentThreadCPUTimeSampleMS = 0;
+ volatile long _currentThreadMemoryAllocationSampleBytes = 0;
+
+ // previous query_id, task_id of the thread, this field should only be
accessed by the accountant
+ TaskEntry _previousThreadTaskStatus = null;
+ // previous query_id, task_id of the thread, this field should only be
accessed by the accountant
+ long _previousThreadCPUTimeSampleMS = 0;
+ long _previousThreadMemoryAllocationSampleBytes = 0;
+
+ // error message store per runner/worker thread,
+ // will put preemption reasons in this for the killed thread to pickup
+ AtomicReference<Exception> _errorStatus = new AtomicReference<>(null);
/**
- * set the thread tracking info to null
+ * set the thread tracking info to null and usage samples to zero
*/
public void setToIdle() {
- _threadTaskStatus.set(null);
+ // clear task info
+ _currentThreadTaskStatus.set(null);
+ // clear CPU time
+ _currentThreadCPUTimeSampleMS = 0;
+ // clear memory usage
+ _currentThreadMemoryAllocationSampleBytes = 0;
}
/**
*
* @return the current query id on the thread, {@code null} if idle
*/
@Nullable
- public TaskEntry getThreadTaskStatus() {
- return _threadTaskStatus.get();
+ public TaskEntry getCurrentThreadTaskStatus() {
+ return _currentThreadTaskStatus.get();
}
- public TaskEntryHolder setThreadTaskStatus(@Nonnull String queryId, int
taskId, @Nonnull Thread thread) {
- _threadTaskStatus.set(new TaskEntry(queryId, taskId, thread));
- return this;
+ public void setThreadTaskStatus(@Nonnull String queryId, int taskId,
@Nonnull Thread anchorThread) {
Review Comment:
(nit) for future readers, you might want to add a note here or in the
javadoc of `TaskEntry` class describing who are we referring to by
`anchorThread`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]