This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 97b81bc869 [MINOR] Heavy hitters bug fix and count selection
97b81bc869 is described below
commit 97b81bc869ad172a3e845d983398ac94b1b0ec7b
Author: mito <[email protected]>
AuthorDate: Mon Oct 10 09:07:39 2022 +0200
[MINOR] Heavy hitters bug fix and count selection
This commit fixes some of the Monitoring UI to correctly reflect the
heavy hitters,
In this process, we have changed the federated workers to only aggregate
statistics and not clear statistics at clear commands, since this
breaks the statistics when multiple coordinators use the federated
backend at the same time.
Closes #1703
---
.../heavyHitter.model.ts} | 35 ++----------
.../monitoring/src/app/models/statistics.model.ts | 4 +-
.../modules/dashboard/worker/worker.component.html | 11 +++-
.../modules/dashboard/worker/worker.component.scss | 5 +-
.../modules/dashboard/worker/worker.component.ts | 38 +++++--------
.../src/app/services/service-mock-data.ts | 3 +-
.../federated/FederatedWorkerHandler.java | 2 +-
.../federated/monitoring/DB-diagram.svg | 2 +-
.../monitoring/models/HeavyHitterModel.java | 66 ++++++++++++++++++++++
.../monitoring/models/StatisticsModel.java | 18 ++++--
.../monitoring/models/StatisticsOptions.java | 1 +
.../monitoring/repositories/DerbyRepository.java | 3 +-
.../monitoring/services/StatisticsService.java | 19 ++++++-
.../monitoring/services/WorkerService.java | 17 ++++++
.../monitoring/FederatedWorkerStatisticsTest.java | 14 ++++-
15 files changed, 168 insertions(+), 70 deletions(-)
diff --git
a/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.scss
b/scripts/monitoring/src/app/models/heavyHitter.model.ts
similarity index 73%
copy from
scripts/monitoring/src/app/modules/dashboard/worker/worker.component.scss
copy to scripts/monitoring/src/app/models/heavyHitter.model.ts
index 1f71cf1422..4061541ce7 100644
--- a/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.scss
+++ b/scripts/monitoring/src/app/models/heavyHitter.model.ts
@@ -17,35 +17,8 @@
* under the License.
*/
-.worker-card {
- width: 31em;
- height: 38em;
-}
-
-.worker-chart {
- height: 100px;
-}
-
-table {
- width: 100%;
-}
-
-.online-worker {
- color: green;
-}
-
-.offline-worker {
- color: darkred;
-}
-.bold {
- font-weight: bold;
-}
-
-.info-row {
- display: inline;
- margin: auto;
- padding: 0.2em;
- > * {
- float: left;
- }
+export class HeavyHitter {
+ constructor(public operation: string = '',
+ public duration: number = 0,
+ public count: number = 0) { }
}
diff --git a/scripts/monitoring/src/app/models/statistics.model.ts
b/scripts/monitoring/src/app/models/statistics.model.ts
index ae2c19ef10..b8328f2abb 100644
--- a/scripts/monitoring/src/app/models/statistics.model.ts
+++ b/scripts/monitoring/src/app/models/statistics.model.ts
@@ -22,11 +22,13 @@ import { Traffic } from "./traffic.model";
import { Event } from "./event.model";
import { DataObject } from "./dataObject.model";
import { FedRequest } from "./fedRequest.model";
+import { HeavyHitter } from "./heavyHitter.model";
export class Statistics {
constructor(public utilization: Utilization[] = [],
public traffic: Traffic[] = [],
public events: Event[] = [],
public dataObjects: DataObject[] = [],
- public requests: FedRequest[] = []) { }
+ public requests: FedRequest[] = [],
+ public heavyHitters: HeavyHitter[] = []) { }
}
diff --git
a/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.html
b/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.html
index 9af92085b4..95bf15f97e 100644
--- a/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.html
+++ b/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.html
@@ -17,7 +17,7 @@
under the License.
-->
-<mat-card class="worker-card">
+<mat-card class="worker-card" [style.height.px]="450 + additionalCardHeight">
<mat-card-title *ngIf="model">{{ model.name }}</mat-card-title>
<mat-divider inset></mat-divider>
<mat-card-content *ngIf="model">
@@ -52,15 +52,20 @@
<mat-divider inset></mat-divider>
<mat-card-content>
+ <mat-form-field appearance="fill" class="full-width">
+ <mat-label>Select number of heavy hitters</mat-label>
+ <input matInput type="number" placeholder="3"
[(ngModel)]="heavyHittersCount">
+ </mat-form-field>
+
<table [dataSource]="dataSource" mat-table matSort>
<ng-container matColumnDef="instruction">
- <th *matHeaderCellDef mat-header-cell
mat-sort-header> Top 3 Inst.</th>
+ <th *matHeaderCellDef mat-header-cell
mat-sort-header> Top {{heavyHittersCount}} Heavy Hitters</th>
<td *matCellDef="let element" mat-cell>
{{element['instruction']}} </td>
</ng-container>
<ng-container matColumnDef="time">
- <th *matHeaderCellDef mat-header-cell
mat-sort-header> Time(ms)</th>
+ <th *matHeaderCellDef mat-header-cell
mat-sort-header> Time(Sec)</th>
<td *matCellDef="let element" mat-cell>
{{element['time']}} </td>
</ng-container>
diff --git
a/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.scss
b/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.scss
index 1f71cf1422..584ccbbff9 100644
--- a/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.scss
+++ b/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.scss
@@ -19,7 +19,10 @@
.worker-card {
width: 31em;
- height: 38em;
+}
+
+.full-width {
+ width: 100%;
}
.worker-chart {
diff --git
a/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.ts
b/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.ts
index 1d749c32fc..822390f589 100644
--- a/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.ts
+++ b/scripts/monitoring/src/app/modules/dashboard/worker/worker.component.ts
@@ -43,6 +43,9 @@ export class WorkerComponent {
public displayedColumns: string[] = ['instruction', 'time',
'frequency'];
public dataSource: MatTableDataSource<any> = new
MatTableDataSource<any>([]);
+ public heavyHittersCount: number = 3;
+ public additionalCardHeight: number = 0;
+
private stopPollingWorker = new Subject<any>();
private stopPollingStatistics = new Subject<any>();
@@ -111,33 +114,22 @@ export class WorkerComponent {
}
private parseInstructions(): any {
- let tmp = {};
- let result: any[] = [];
- this.statistics.events.forEach(e => {
- e.stages.forEach(s => {
- if (!tmp[s.operation]) {
- tmp[s.operation] = {
- frequency: 0,
- time: 0
- }
- }
-
- tmp[s.operation]['frequency'] += 1;
- tmp[s.operation]['time'] += (new
Date(s.endTime).getTime() - new Date(s.startTime).getTime());
- })
+ let result: any = this.statistics.heavyHitters.map(hh => {
+ return {
+ instruction: hh.operation,
+ time: hh.duration,
+ frequency: hh.count
+ }
});
- for (const [key, value] of Object.entries(tmp)) {
- result.push({
- instruction: key,
- // @ts-ignore
- time: value['time'],
- // @ts-ignore
- frequency: value['frequency']
- })
+ // 48 px is the height of one table row
+ this.additionalCardHeight = this.heavyHittersCount * 48;
+
+ if (result.length < this.heavyHittersCount) {
+ this.heavyHittersCount = result.length;
}
- return result.sort((a,b) => b['time']-a['time']).slice(0,3);
+ return result.sort((a,b) =>
b['time']-a['time']).slice(0,this.heavyHittersCount);
}
ngOnDestroy() {
diff --git a/scripts/monitoring/src/app/services/service-mock-data.ts
b/scripts/monitoring/src/app/services/service-mock-data.ts
index df1999b364..0695056c48 100644
--- a/scripts/monitoring/src/app/services/service-mock-data.ts
+++ b/scripts/monitoring/src/app/services/service-mock-data.ts
@@ -58,7 +58,8 @@ let statistics = {
"utilization": [],
"traffic": [],
"dataObjects": [],
- "requests": []
+ "requests": [],
+ "heavyHitters": []
}
export const serviceMockData = {
workers: workers,
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index b748fe8740..0e8df94ec6 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -277,7 +277,7 @@ public class FederatedWorkerHandler extends
ChannelInboundHandlerAdapter {
private static void printStatistics() {
if(DMLScript.STATISTICS && Statistics.allowWorkerStatistics) {
System.out.println("Federated Worker " +
Statistics.display());
- Statistics.reset();
+ // Statistics.reset();
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/DB-diagram.svg
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/DB-diagram.svg
index 8fd7a100a8..0d1667a987 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/DB-diagram.svg
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/DB-diagram.svg
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Do not edit this file with editors other than diagrams.net -->
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
-<svg xmlns="http://www.w3.org/2000/svg" style="background-color: rgb(255, 255,
255);" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="361px"
height="254px" viewBox="-0.5 -0.5 361 254" content="<mxfile
host="app.diagrams.net" modified="2022-06-24T15:02:11.930Z"
agent="5.0 (X11)" etag="xU0YAODbRBLnMw9GYvGD"
version="19.0.3" type="google"><diagram
id="C5RBs43oDa-KdzZeNtuy" name="Page-1"& [...]
\ No newline at end of file
+<svg xmlns="http://www.w3.org/2000/svg" style="background-color: rgb(255, 255,
255);" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="521px"
height="624px" viewBox="-0.5 -0.5 521 624" content="<mxfile
host="app.diagrams.net" modified="2022-10-09T13:43:10.880Z"
agent="5.0 (X11)" etag="m9Mr5a77XWx5CHXV3MOd"
version="20.3.6" type="google"><diagram
id="C5RBs43oDa-KdzZeNtuy" name="Page-1"& [...]
\ No newline at end of file
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/HeavyHitterModel.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/HeavyHitterModel.java
new file mode 100644
index 0000000000..03e91dc185
--- /dev/null
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/HeavyHitterModel.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.federated.monitoring.models;
+
+public class HeavyHitterModel extends BaseModel {
+ private static final long serialVersionUID = 1L;
+ public Long workerId;
+ public String operation;
+ public double duration;
+ public Long count;
+
+ private static final String JsonFormat = "{" +
+ "\"operation\": \"%s\"," +
+ "\"duration\": %.2f," +
+ "\"count\": %d" +
+ "}";
+
+ public HeavyHitterModel() {
+ this(-1L);
+ }
+
+ private HeavyHitterModel(final Long id) {
+ this.id = id;
+ }
+
+ public HeavyHitterModel(final Long workerId,
+ final String operation,
+ final double duration,
+ final Long count) {
+ this(-1L, workerId, operation, duration, count);
+ }
+
+ public HeavyHitterModel(final Long id,
+ final Long workerId,
+ final String operation,
+ final double duration,
+ final Long count) {
+ this.id = id;
+ this.workerId = workerId;
+ this.operation = operation;
+ this.duration = duration;
+ this.count = count;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(JsonFormat, this.operation, this.duration,
this.count);
+ }
+}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/StatisticsModel.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/StatisticsModel.java
index 613b286a27..a3527e3d72 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/StatisticsModel.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/StatisticsModel.java
@@ -29,13 +29,15 @@ public class StatisticsModel extends BaseModel {
public List<EventModel> events;
public List<DataObjectModel> dataObjects;
public List<RequestModel> requests;
+ public List<HeavyHitterModel> heavyHitters;
private static final String JsonFormat = "{" +
"\"utilization\": [%s]," +
"\"traffic\": [%s]," +
"\"events\": [%s]," +
"\"dataObjects\": [%s]," +
- "\"requests\": [%s]" +
+ "\"requests\": [%s]," +
+ "\"heavyHitters\": [%s]" +
"}";
public StatisticsModel() { }
@@ -44,18 +46,20 @@ public class StatisticsModel extends BaseModel {
List<TrafficModel> traffic,
List<EventModel> events,
List<DataObjectModel>
dataObjects,
- List<RequestModel> requests)
{
+ List<RequestModel> requests,
+ List<HeavyHitterModel>
heavyHitters) {
this.utilization = utilization;
this.traffic = traffic;
this.events = events;
this.dataObjects = dataObjects;
this.requests = requests;
+ this.heavyHitters = heavyHitters;
}
@Override
public String toString() {
- String utilizationStr = null, trafficStr = null, eventsStr =
null, dataObjectsStr = null, requestsStr = null;
+ String utilizationStr = null, trafficStr = null, eventsStr =
null, dataObjectsStr = null, requestsStr = null, heavyHittersStr = null;
if (utilization != null) {
utilizationStr = utilization.stream()
@@ -87,6 +91,12 @@ public class StatisticsModel extends BaseModel {
.collect(Collectors.joining(","));
}
- return String.format(JsonFormat, utilizationStr, trafficStr,
eventsStr, dataObjectsStr, requestsStr);
+ if (heavyHitters != null) {
+ heavyHittersStr = heavyHitters.stream()
+ .map(HeavyHitterModel::toString)
+ .collect(Collectors.joining(","));
+ }
+
+ return String.format(JsonFormat, utilizationStr, trafficStr,
eventsStr, dataObjectsStr, requestsStr, heavyHittersStr);
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/StatisticsOptions.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/StatisticsOptions.java
index cbbf4cd915..4c05559708 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/StatisticsOptions.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/StatisticsOptions.java
@@ -27,4 +27,5 @@ public class StatisticsOptions extends BaseModel {
public boolean events = true;
public boolean dataObjects = true;
public boolean requests = true;
+ public boolean heavyHitters = true;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/DerbyRepository.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/DerbyRepository.java
index 9c23478b40..718bcf867b 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/DerbyRepository.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/DerbyRepository.java
@@ -44,7 +44,8 @@ public class DerbyRepository implements IRepository {
new EventModel(),
new EventStageModel(),
new DataObjectModel(),
- new RequestModel()
+ new RequestModel(),
+ new HeavyHitterModel()
));
private static final String ENTITY_SCHEMA_CREATE_STMT = "CREATE TABLE
%s " +
"(id INTEGER PRIMARY KEY GENERATED ALWAYS AS IDENTITY
(START WITH 1, INCREMENT BY 1)";
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/StatisticsService.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/StatisticsService.java
index d99a13ea95..c72966ba58 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/StatisticsService.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/StatisticsService.java
@@ -39,6 +39,7 @@ import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.Coord
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.DataObjectModel;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.EventModel;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.EventStageModel;
+import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.HeavyHitterModel;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.RequestModel;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.StatisticsModel;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.StatisticsOptions;
@@ -58,6 +59,7 @@ public class StatisticsService {
CompletableFuture<Void> eventsFuture = null;
CompletableFuture<Void> dataObjFuture = null;
CompletableFuture<Void> requestsFuture = null;
+ CompletableFuture<Void> heavyHittersFuture = null;
var stats = new StatisticsModel();
@@ -101,7 +103,13 @@ public class StatisticsService {
.thenAcceptAsync(result ->
stats.requests = result);
}
- List<CompletableFuture<Void>> completableFutures =
Arrays.asList(utilizationFuture, trafficFuture, eventsFuture, dataObjFuture,
requestsFuture);
+ if (options.heavyHitters) {
+ heavyHittersFuture = CompletableFuture
+ .supplyAsync(() ->
entityRepository.getAllEntitiesByField(Constants.ENTITY_WORKER_ID_COL,
workerId, HeavyHitterModel.class))
+ .thenAcceptAsync(result ->
stats.heavyHitters = result);
+ }
+
+ List<CompletableFuture<Void>> completableFutures =
Arrays.asList(utilizationFuture, trafficFuture, eventsFuture, dataObjFuture,
requestsFuture, heavyHittersFuture);
completableFutures.forEach(cf -> {
try {
@@ -151,6 +159,8 @@ public class StatisticsService {
var dataObjects = aggFedStats.workerDataObjects;
var requests = aggFedStats.workerRequests;
+ List<HeavyHitterModel> heavyHitters = new ArrayList<>();
+
utilization.workerId = workerId;
traffic.forEach(t -> t.workerId = workerId);
dataObjects.forEach(o -> o.workerId = workerId);
@@ -173,7 +183,12 @@ public class StatisticsService {
setCoordinatorId(request);
}
- return new StatisticsModel(List.of(utilization), traffic,
events, dataObjects, requests);
+ for (var heavyHitterEntry: aggFedStats.heavyHitters.entrySet())
{
+ var newHH = new HeavyHitterModel(workerId,
heavyHitterEntry.getKey(), heavyHitterEntry.getValue().getValue(),
heavyHitterEntry.getValue().getLeft());
+ heavyHitters.add(newHH);
+ }
+
+ return new StatisticsModel(List.of(utilization), traffic,
events, dataObjects, requests, heavyHitters);
}
private static void setCoordinatorId(CoordinatorConnectionModel entity)
{
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java
index 2e8c663b79..2882c928d5 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java
@@ -32,6 +32,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.DataObjectModel;
+import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.HeavyHitterModel;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.RequestModel;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.StatisticsModel;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.WorkerModel;
@@ -174,6 +175,22 @@ public class WorkerService {
}
});
}
+ if (stats.heavyHitters != null) {
+ CompletableFuture.runAsync(() -> {
+ //
entityRepository.removeAllEntitiesByField(Constants.ENTITY_WORKER_ID_COL, id,
HeavyHitterModel.class);
+
+ // for (var heavyHitterEntity :
stats.heavyHitters) {
+ //
entityRepository.createEntity(heavyHitterEntity);
+ // }
+ if(!stats.heavyHitters.isEmpty()) {
+
entityRepository.removeAllEntitiesByField(Constants.ENTITY_WORKER_ID_COL, id,
HeavyHitterModel.class);
+
+ for(var heavyHitterEntity :
stats.heavyHitters) {
+
entityRepository.createEntity(heavyHitterEntity);
+ }
+ }
+ });
+ }
} else {
cachedWorkers.get(id).setValue(false);
}
diff --git
a/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedWorkerStatisticsTest.java
b/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedWorkerStatisticsTest.java
index db2403920f..1f9407826b 100644
---
a/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedWorkerStatisticsTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/federated/monitoring/FederatedWorkerStatisticsTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.DataObjectModel;
+import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.HeavyHitterModel;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.RequestModel;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.StatisticsModel;
import
org.apache.sysds.runtime.controlprogram.federated.monitoring.models.StatisticsOptions;
@@ -77,7 +78,7 @@ public class FederatedWorkerStatisticsTest extends
FederatedMonitoringTestBase {
int retry = 10;
while(model == null && retry > 0){
Thread.sleep(1000);
- model = (StatisticsModel)
StatisticsService.getWorkerStatistics(1L, "localhost:" + workerPorts[0]);
+ model = StatisticsService.getWorkerStatistics(1L,
"localhost:" + workerPorts[0]);
retry--;
}
@@ -216,6 +217,17 @@ public class FederatedWorkerStatisticsTest extends
FederatedMonitoringTestBase {
}
}
+ return true;
+ });
+ }
+ if (stats.heavyHitters != null) {
+ dataObjFuture =
CompletableFuture.supplyAsync(() -> {
+
entityRepository.removeAllEntitiesByField(Constants.ENTITY_WORKER_ID_COL, id,
HeavyHitterModel.class);
+
+ for (var heavyHitterEntity :
stats.heavyHitters) {
+
entityRepository.createEntity(heavyHitterEntity);
+ }
+
return true;
});
}