klsince commented on code in PR #9449:
URL: https://github.com/apache/pinot/pull/9449#discussion_r978154220
##########
pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java:
##########
@@ -29,57 +34,100 @@
/**
* A minion event observer that can track task progress status in memory.
*/
+@ThreadSafe
public class MinionProgressObserver extends DefaultMinionEventObserver {
private static final Logger LOGGER =
LoggerFactory.getLogger(MinionProgressObserver.class);
+ // TODO: make this configurable
+ private static final int DEFAULT_MAX_NUM_STATUS_TO_TRACK = 128;
- private static volatile long _startTs;
- private static volatile Object _lastStatus;
+ private final int _maxNumStatusToTrack;
+ private final Deque<StatusEntry> _lastStatus = new LinkedList<>();
+ private long _startTs;
+
+ public MinionProgressObserver() {
+ this(DEFAULT_MAX_NUM_STATUS_TO_TRACK);
+ }
+
+ public MinionProgressObserver(int maxNumStatusToTrack) {
+ _maxNumStatusToTrack = maxNumStatusToTrack;
+ }
@Override
- public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
+ public synchronized void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
_startTs = System.currentTimeMillis();
+ addStatus(_startTs, "Task started");
super.notifyTaskStart(pinotTaskConfig);
}
- public void notifyProgress(PinotTaskConfig pinotTaskConfig, @Nullable Object
progress) {
+ public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig,
@Nullable Object progress) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Update progress: {} for task: {}", progress,
pinotTaskConfig.getTaskId());
}
- _lastStatus = progress;
+ addStatus(System.currentTimeMillis(), (progress == null) ? "" :
progress.toString());
super.notifyProgress(pinotTaskConfig, progress);
}
@Nullable
- public Object getProgress() {
- return _lastStatus;
+ public synchronized List<StatusEntry> getProgress() {
+ return new ArrayList<>(_lastStatus);
}
@Override
- public void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, @Nullable
Object executionResult) {
+ public synchronized void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig,
@Nullable Object executionResult) {
long endTs = System.currentTimeMillis();
- _lastStatus = "Task succeeded in " + (endTs - _startTs) + "ms";
+ addStatus(endTs, "Task succeeded in " + (endTs - _startTs) + "ms");
super.notifyTaskSuccess(pinotTaskConfig, executionResult);
}
@Override
- public void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) {
+ public synchronized void notifyTaskCancelled(PinotTaskConfig
pinotTaskConfig) {
long endTs = System.currentTimeMillis();
- _lastStatus = "Task got cancelled after " + (endTs - _startTs) + "ms";
+ addStatus(endTs, "Task got cancelled after " + (endTs - _startTs) + "ms");
super.notifyTaskCancelled(pinotTaskConfig);
}
@Override
- public void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception e) {
+ public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig,
Exception e) {
long endTs = System.currentTimeMillis();
- _lastStatus = "Task failed in " + (endTs - _startTs) + "ms, with error:\n"
+ makeStringFromException(e);
+ addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error:
" + makeStringFromException(e));
super.notifyTaskError(pinotTaskConfig, e);
}
+ private void addStatus(long ts, String progress) {
Review Comment:
feels not very necessary, as all public methods are sync'ed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]