MKehayov commented on code in PR #1698:
URL: https://github.com/apache/systemds/pull/1698#discussion_r981236652


##########
src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedWorkerStatisticsTest.java:
##########
@@ -58,6 +87,54 @@ public void testWorkerStatisticsParsedCorrectly() {
                Assert.assertNotEquals("Utilization stats parsed correctly", 0, 
model.utilization.size());
        }
 
+       @Test
+       public void testWorkerStatisticsPerformance() throws 
InterruptedException {
+               ExecutorService executor = 
Executors.newFixedThreadPool(workerPorts.length);
+
+               double meanExecTime = 0.f;
+               double numRepetitionsExperiment = 100.f;
+
+               for (int j = -10; j < numRepetitionsExperiment; j++) {
+
+                       Collection<Callable<StatisticsModel>> collect = new 
ArrayList<>();
+                       Collection<Callable<Boolean>> parse = new ArrayList<>();
+
+                       for (int i = 1; i <= workerPorts.length; i++) {
+                               long id = i;
+                               String address = "localhost:" + workerPorts[i - 
1];
+                               workerMonitoringService.create(new 
WorkerModel(id, "Worker", address));
+                               collect.add(() -> 
StatisticsService.getWorkerStatistics(id, address));
+                       }
+
+                       long start = System.currentTimeMillis();
+
+                       // Returns a list of Futures holding their status and 
results when all complete.
+                       // Future.isDone() is true for each element of the 
returned list
+                       // 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#invokeAll(java.util.Collection)
+                       List<Future<StatisticsModel>> taskFutures = 
executor.invokeAll(collect);
+
+                       taskFutures.forEach(res -> parse.add(() -> 
syncWorkerStats(res.get(), res.get().traffic.get(0).workerId)));
+
+                       executor.invokeAll(parse);
+
+                       long finish = System.currentTimeMillis();
+                       long elapsedTime = (finish - start);
+
+                       if (j >= 0) {
+                               meanExecTime += elapsedTime;
+                       }
+               }
+
+               executor.shutdown();
+
+               // Wait until all threads are finish
+               // Returns true if all tasks have completed following shut down.
+               // Note that isTerminated is never true unless either shutdown 
or shutdownNow was called first.
+               while (!executor.isTerminated());
+
+               System.out.println(String.format(PERFORMANCE_FORMAT, 
workerPorts.length, Math.round(meanExecTime / numRepetitionsExperiment)));

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to