MKehayov commented on code in PR #1698:
URL: https://github.com/apache/systemds/pull/1698#discussion_r979786346


##########
src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java:
##########
@@ -105,61 +114,80 @@ private static synchronized void updateCachedWorkers(List<WorkerModel> workers,
                }
        }
 
-       private static Runnable syncWorkerStatisticsWithDB() {
-               return () -> {
-
-                       for(Map.Entry<Long, Pair<String, Boolean>> entry : cachedWorkers.entrySet()) {
-                               Long id = entry.getKey();
-                               String address = entry.getValue().getLeft();
+       private static synchronized void startStatsCollectionProcess(int threadCount, double frequencySeconds) {
+               if (executorService == null) {
+                       executorService = Executors.newScheduledThreadPool(threadCount);
+                       executorService.scheduleAtFixedRate(syncWorkerStatisticsRunnable(), 0, Math.round(frequencySeconds * 1000), TimeUnit.MILLISECONDS);
+               }
+       }
 
-                               var stats = StatisticsService.getWorkerStatistics(id, address);
+       public static void syncWorkerStatisticsWithDB(StatisticsModel stats, Long id) {
 
-                               if (stats != null) {
+               if (stats != null) {
 
-                                       cachedWorkers.get(id).setValue(true);
+                       cachedWorkers.get(id).setValue(true);
 
-                                       if (stats.utilization != null) {
-                                               entityRepository.createEntity(stats.utilization.get(0));
-                                       }
-                                       if (stats.traffic != null) {
-                                               for (var trafficEntity: stats.traffic) {
-                                                       if (trafficEntity.coordinatorId > 0) {
-                                                               entityRepository.createEntity(trafficEntity);
-                                                       }
+                       if (stats.utilization != null) {
+                               CompletableFuture.runAsync(() -> entityRepository.createEntity(stats.utilization.get(0)));

Review Comment:
   For this part of the code, I don't think so. This runs in a separate thread 
that collects, parses, and saves worker statistics in the backend. When the 
frontend requests statistics for visualization, they are read from the backend 
database, not from the workers themselves. There can be a short delay between 
collecting them (once the futures have all finished) and displaying them (when 
the frontend asks for them), but they should show up on the next request from 
the frontend. This was my initial reasoning; let me know if you agree or if we 
should change it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

