zhtaoxiang commented on code in PR #9449:
URL: https://github.com/apache/pinot/pull/9449#discussion_r978120634


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java:
##########
@@ -111,6 +111,8 @@ public void preProcess(PinotTaskConfig pinotTaskConfig) {
   protected List<SegmentConversionResult> convert(PinotTaskConfig 
pinotTaskConfig, List<File> segmentDirs,
       File workingDir)
       throws Exception {
+    int numInputSegments = segmentDirs.size();
+    _eventObserver.notifyProgress(pinotTaskConfig, "Converting segments: " + 
numInputSegments);

Review Comment:
   maybe we can also include segment names?



##########
pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java:
##########
@@ -29,57 +34,100 @@
 /**
  * A minion event observer that can track task progress status in memory.
  */
+@ThreadSafe
 public class MinionProgressObserver extends DefaultMinionEventObserver {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MinionProgressObserver.class);
+  // TODO: make this configurable
+  private static final int DEFAULT_MAX_NUM_STATUS_TO_TRACK = 128;
 
-  private static volatile long _startTs;
-  private static volatile Object _lastStatus;
+  private final int _maxNumStatusToTrack;
+  private final Deque<StatusEntry> _lastStatus = new LinkedList<>();
+  private long _startTs;
+
+  public MinionProgressObserver() {
+    this(DEFAULT_MAX_NUM_STATUS_TO_TRACK);
+  }
+
+  public MinionProgressObserver(int maxNumStatusToTrack) {
+    _maxNumStatusToTrack = maxNumStatusToTrack;
+  }
 
   @Override
-  public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
+  public synchronized void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
     _startTs = System.currentTimeMillis();
+    addStatus(_startTs, "Task started");
     super.notifyTaskStart(pinotTaskConfig);
   }
 
-  public void notifyProgress(PinotTaskConfig pinotTaskConfig, @Nullable Object 
progress) {
+  public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, 
@Nullable Object progress) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Update progress: {} for task: {}", progress, 
pinotTaskConfig.getTaskId());
     }
-    _lastStatus = progress;
+    addStatus(System.currentTimeMillis(), (progress == null) ? "" : 
progress.toString());

Review Comment:
   This assumes that `progress.toString()` can always produce meaningful 
string, is this a valid assumption?



##########
pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java:
##########
@@ -57,23 +60,24 @@ public class PinotTaskProgressResource {
 
   @GET
   @Path("/tasks/subtask/progress")
+  @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Get finer grained task progress tracked in memory for the 
given subtasks")
   @ApiResponses(value = {
       @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, 
message = "Internal server error")
   })
-  public Object getSubtaskProgress(
+  public String getSubtaskProgress(

Review Comment:
   ditto:this is not backward compatible, is it okay to keep the old one or add 
a new one?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -410,8 +410,9 @@ public Map<String, PinotTaskConfig> getSubtaskConfigs(
 
   @GET
   @Path("/tasks/subtask/{taskName}/progress")
+  @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Get progress of specified sub tasks for the given task 
tracked by worker in memory")
-  public Map<String, String> getSubtaskProgress(@Context HttpHeaders 
httpHeaders,
+  public String getSubtaskProgress(@Context HttpHeaders httpHeaders,

Review Comment:
   this is not backward compatible, is it okay to keep the old one or add a new 
one?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java:
##########
@@ -55,6 +55,8 @@ public MergeRollupTaskExecutor(MinionConf minionConf) {
   protected List<SegmentConversionResult> convert(PinotTaskConfig 
pinotTaskConfig, List<File> segmentDirs,
       File workingDir)
       throws Exception {
+    int numInputSegments = segmentDirs.size();
+    _eventObserver.notifyProgress(pinotTaskConfig, "Converting segments: " + 
numInputSegments);

Review Comment:
   should we include segment names?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -463,7 +463,7 @@ public synchronized Map<String, PinotTaskConfig> 
getSubtaskConfigs(String taskNa
     return taskConfigs;
   }
 
-  public synchronized Map<String, String> getSubtaskProgress(String taskName, 
@Nullable String subtaskNames,
+  public synchronized Map<String, Object> getSubtaskProgress(String taskName, 
@Nullable String subtaskNames,

Review Comment:
   This public method is not backward compatible and may affect other systems 
if they are using it, although in practice this might not be the case.



##########
pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java:
##########
@@ -29,57 +34,100 @@
 /**
  * A minion event observer that can track task progress status in memory.
  */
+@ThreadSafe
 public class MinionProgressObserver extends DefaultMinionEventObserver {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MinionProgressObserver.class);
+  // TODO: make this configurable
+  private static final int DEFAULT_MAX_NUM_STATUS_TO_TRACK = 128;
 
-  private static volatile long _startTs;
-  private static volatile Object _lastStatus;
+  private final int _maxNumStatusToTrack;
+  private final Deque<StatusEntry> _lastStatus = new LinkedList<>();
+  private long _startTs;
+
+  public MinionProgressObserver() {
+    this(DEFAULT_MAX_NUM_STATUS_TO_TRACK);
+  }
+
+  public MinionProgressObserver(int maxNumStatusToTrack) {
+    _maxNumStatusToTrack = maxNumStatusToTrack;
+  }
 
   @Override
-  public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
+  public synchronized void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
     _startTs = System.currentTimeMillis();
+    addStatus(_startTs, "Task started");
     super.notifyTaskStart(pinotTaskConfig);
   }
 
-  public void notifyProgress(PinotTaskConfig pinotTaskConfig, @Nullable Object 
progress) {
+  public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, 
@Nullable Object progress) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Update progress: {} for task: {}", progress, 
pinotTaskConfig.getTaskId());
     }
-    _lastStatus = progress;
+    addStatus(System.currentTimeMillis(), (progress == null) ? "" : 
progress.toString());
     super.notifyProgress(pinotTaskConfig, progress);
   }
 
   @Nullable
-  public Object getProgress() {
-    return _lastStatus;
+  public synchronized List<StatusEntry> getProgress() {
+    return new ArrayList<>(_lastStatus);
   }
 
   @Override
-  public void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, @Nullable 
Object executionResult) {
+  public synchronized void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, 
@Nullable Object executionResult) {
     long endTs = System.currentTimeMillis();
-    _lastStatus = "Task succeeded in " + (endTs - _startTs) + "ms";
+    addStatus(endTs, "Task succeeded in " + (endTs - _startTs) + "ms");
     super.notifyTaskSuccess(pinotTaskConfig, executionResult);
   }
 
   @Override
-  public void notifyTaskCancelled(PinotTaskConfig pinotTaskConfig) {
+  public synchronized void notifyTaskCancelled(PinotTaskConfig 
pinotTaskConfig) {
     long endTs = System.currentTimeMillis();
-    _lastStatus = "Task got cancelled after " + (endTs - _startTs) + "ms";
+    addStatus(endTs, "Task got cancelled after " + (endTs - _startTs) + "ms");
     super.notifyTaskCancelled(pinotTaskConfig);
   }
 
   @Override
-  public void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception e) {
+  public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig, 
Exception e) {
     long endTs = System.currentTimeMillis();
-    _lastStatus = "Task failed in " + (endTs - _startTs) + "ms, with error:\n" 
+ makeStringFromException(e);
+    addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error: 
" + makeStringFromException(e));
     super.notifyTaskError(pinotTaskConfig, e);
   }
 
+  private void addStatus(long ts, String progress) {

Review Comment:
   nit: maybe we can also add `synchronized` keyword here?



##########
pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java:
##########
@@ -29,57 +34,100 @@
 /**
  * A minion event observer that can track task progress status in memory.
  */
+@ThreadSafe
 public class MinionProgressObserver extends DefaultMinionEventObserver {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MinionProgressObserver.class);
+  // TODO: make this configurable
+  private static final int DEFAULT_MAX_NUM_STATUS_TO_TRACK = 128;
 
-  private static volatile long _startTs;
-  private static volatile Object _lastStatus;
+  private final int _maxNumStatusToTrack;
+  private final Deque<StatusEntry> _lastStatus = new LinkedList<>();
+  private long _startTs;
+
+  public MinionProgressObserver() {
+    this(DEFAULT_MAX_NUM_STATUS_TO_TRACK);
+  }
+
+  public MinionProgressObserver(int maxNumStatusToTrack) {
+    _maxNumStatusToTrack = maxNumStatusToTrack;
+  }
 
   @Override
-  public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
+  public synchronized void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
     _startTs = System.currentTimeMillis();
+    addStatus(_startTs, "Task started");
     super.notifyTaskStart(pinotTaskConfig);
   }
 
-  public void notifyProgress(PinotTaskConfig pinotTaskConfig, @Nullable Object 
progress) {
+  public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, 
@Nullable Object progress) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Update progress: {} for task: {}", progress, 
pinotTaskConfig.getTaskId());
     }
-    _lastStatus = progress;
+    addStatus(System.currentTimeMillis(), (progress == null) ? "" : 
progress.toString());
     super.notifyProgress(pinotTaskConfig, progress);
   }
 
   @Nullable
-  public Object getProgress() {
-    return _lastStatus;
+  public synchronized List<StatusEntry> getProgress() {
+    return new ArrayList<>(_lastStatus);

Review Comment:
   how about using immutableList?



##########
pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java:
##########
@@ -29,57 +34,100 @@
 /**
  * A minion event observer that can track task progress status in memory.
  */
+@ThreadSafe
 public class MinionProgressObserver extends DefaultMinionEventObserver {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MinionProgressObserver.class);
+  // TODO: make this configurable
+  private static final int DEFAULT_MAX_NUM_STATUS_TO_TRACK = 128;
 
-  private static volatile long _startTs;
-  private static volatile Object _lastStatus;
+  private final int _maxNumStatusToTrack;
+  private final Deque<StatusEntry> _lastStatus = new LinkedList<>();
+  private long _startTs;
+
+  public MinionProgressObserver() {
+    this(DEFAULT_MAX_NUM_STATUS_TO_TRACK);
+  }
+
+  public MinionProgressObserver(int maxNumStatusToTrack) {
+    _maxNumStatusToTrack = maxNumStatusToTrack;
+  }
 
   @Override
-  public void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
+  public synchronized void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
     _startTs = System.currentTimeMillis();
+    addStatus(_startTs, "Task started");
     super.notifyTaskStart(pinotTaskConfig);
   }
 
-  public void notifyProgress(PinotTaskConfig pinotTaskConfig, @Nullable Object 
progress) {
+  public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, 
@Nullable Object progress) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Update progress: {} for task: {}", progress, 
pinotTaskConfig.getTaskId());
     }
-    _lastStatus = progress;
+    addStatus(System.currentTimeMillis(), (progress == null) ? "" : 
progress.toString());
     super.notifyProgress(pinotTaskConfig, progress);
   }
 
   @Nullable
-  public Object getProgress() {
-    return _lastStatus;
+  public synchronized List<StatusEntry> getProgress() {

Review Comment:
   ditto: not backward compatible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to