This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new f32a8e8408 [SYSTEMDS-3588] Federated Remote Monitoring
f32a8e8408 is described below
commit f32a8e84085250785797c4de41f54da53f7af98a
Author: baunsgaard <[email protected]>
AuthorDate: Wed Jun 28 00:44:30 2023 +0200
[SYSTEMDS-3588] Federated Remote Monitoring
This commit fixes federated remote monitoring so that the event timeline
no longer breaks when the federated remote is tunneled through SSH via
localhost proxy ports.
Closes #1852
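
For context, a minimal sketch of the resulting chart configuration in the
events view (simplified and abbreviated; the helper name eventTimelineOptions
is illustrative, not the exact component code): the x-axis ticks are rendered
as milliseconds relative to the event start instead of absolute local times
derived from a computed timeframe, so the timeline no longer depends on
wall-clock alignment between the monitoring UI and tunneled workers.

    // Hypothetical, simplified Chart.js options fragment illustrating the new tick labels.
    function eventTimelineOptions(coordinatorName: string) {
        return {
            plugins: {
                title: {
                    display: true,
                    text: `Event timeline of worker with respect to coordinator ${coordinatorName}`
                }
            },
            scales: {
                x: {
                    min: 0,
                    stacked: true,
                    ticks: {
                        // before: new Date(minVal + value).toLocaleTimeString()
                        // after: a relative duration label, independent of clock offsets
                        callback: (value: number) => value + " ms"
                    }
                }
            }
        };
    }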
---
.../src/app/modules/events/view/view.component.ts | 47 +-----
scripts/tutorials/federated/code/exp/CNN.dml | 3 +-
.../federated/code/exp/{CNN.dml => CNNLong.dml} | 5 +-
.../federated/code/exp/{mLogReg.dml => adult.dml} | 29 ++--
.../tutorials/federated/code/exp/adult_spec1.json | 5 +
.../code/exp/{mLogReg.dml => aggRepeat.dml} | 15 +-
.../federated/code/exp/{mLogReg.dml => criteo.dml} | 27 +++-
.../tutorials/federated/code/exp/criteo_spec1.json | 71 +++++++++
scripts/tutorials/federated/code/exp/mLogReg.dml | 2 +-
scripts/tutorials/federated/code/network/CNN.dml | 3 +-
scripts/tutorials/federated/install.sh | 6 +-
scripts/tutorials/federated/parameters.sh | 21 ++-
scripts/tutorials/federated/portforward.sh | 10 +-
scripts/tutorials/federated/run.sh | 36 +++--
scripts/tutorials/federated/setup.sh | 12 ++
scripts/tutorials/federated/startAllWorkers.sh | 10 +-
scripts/tutorials/federated/stopAllWorkers.sh | 2 +
scripts/tutorials/federated/sync.sh | 1 +
.../federated/FederatedStatistics.java | 160 ++++++++++++++-------
.../federated/FederatedWorkerHandler.java | 2 +
.../controllers/StatisticsController.java | 6 +
.../monitoring/controllers/WorkerController.java | 18 ++-
.../federated/monitoring/models/EventModel.java | 19 +--
.../monitoring/models/StatisticsOptions.java | 2 +-
.../monitoring/repositories/DerbyRepository.java | 23 ++-
.../monitoring/services/StatisticsService.java | 37 +++--
.../monitoring/services/WorkerService.java | 15 +-
27 files changed, 402 insertions(+), 185 deletions(-)
diff --git a/scripts/monitoring/src/app/modules/events/view/view.component.ts b/scripts/monitoring/src/app/modules/events/view/view.component.ts
index 6329f62370..641bffbb6f 100644
--- a/scripts/monitoring/src/app/modules/events/view/view.component.ts
+++ b/scripts/monitoring/src/app/modules/events/view/view.component.ts
@@ -61,10 +61,7 @@ export class ViewWorkerEventsComponent {
		this.fedSiteService.getStatisticsPolling(id, this.stopPollingStatistics).subscribe(stats => {
this.statistics = stats;
-
- const timeframe = this.getTimeframe();
- const minVal = this.getLastSeconds(timeframe[1], 3);
-
+
const coordinatorNames = this.getCoordinatorNames();
for (const coordinatorName of coordinatorNames) {
@@ -73,7 +70,7 @@ export class ViewWorkerEventsComponent {
				const canvas: any = document.createElement("canvas");
canvas.width = 400;
eventSectionEle.appendChild(canvas);
-
+ Chart.defaults.font.size = 24;
this.eventTimelineChart[coordinatorName] = new Chart(canvas.getContext('2d'), {
type: 'bar',
data: {
@@ -115,14 +112,14 @@ export class ViewWorkerEventsComponent {
display: true,
						text: `Event timeline of worker with respect to coordinator ${coordinatorName}`
}
+
},
scales: {
x: {
min: 0,
ticks: {
callback: function(value, index, ticks) {
-							// @ts-ignore
-							return new Date(minVal + value).toLocaleTimeString();
+							return value + " ms";
}
},
stacked: true
@@ -140,42 +137,6 @@ export class ViewWorkerEventsComponent {
});
}
- private getLastSeconds(time: number, seconds: number): number {
- const benchmark = new Date(time);
-
- const back = new Date(time);
- back.setSeconds(benchmark.getSeconds() - seconds)
-
- return back.getTime();
- }
-
- private getTimeframe() {
- const coordinatorNames = this.getCoordinatorNames();
- let minTime = 0;
- let maxTime = 0;
-
- coordinatorNames.forEach(c => {
-			const coordinatorEvents = this.statistics.events.filter(e => e.coordinatorName === c);
-
- for (const event of coordinatorEvents) {
- for (const stage of event.stages) {
-					let startTime = new Date(stage.startTime).getTime();
-					let endTime = new Date(stage.endTime).getTime();
-
- if (startTime < minTime) {
- minTime = startTime;
- }
-
- if (endTime > maxTime) {
- maxTime = endTime;
- }
- }
- }
- })
-
- return [minTime, maxTime];
- }
-
private getCoordinatorNames() {
let names: string[] = [];
diff --git a/scripts/tutorials/federated/code/exp/CNN.dml b/scripts/tutorials/federated/code/exp/CNN.dml
index ec033210a0..ba41ad1fcf 100644
--- a/scripts/tutorials/federated/code/exp/CNN.dml
+++ b/scripts/tutorials/federated/code/exp/CNN.dml
@@ -28,7 +28,8 @@ Yt = read($4)
epochs = 2
learning_rate = 0.1
utype = "BSP"
-freq = "EPOCH"
+# freq = "EPOCH"
+freq= "NBATCHES"
# freq="BATCH"
batch_size = 128
scheme = "DISJOINT_CONTIGUOUS"
diff --git a/scripts/tutorials/federated/code/exp/CNN.dml b/scripts/tutorials/federated/code/exp/CNNLong.dml
similarity index 96%
copy from scripts/tutorials/federated/code/exp/CNN.dml
copy to scripts/tutorials/federated/code/exp/CNNLong.dml
index ec033210a0..a1be6164e7 100644
--- a/scripts/tutorials/federated/code/exp/CNN.dml
+++ b/scripts/tutorials/federated/code/exp/CNNLong.dml
@@ -25,10 +25,11 @@ Y = read($2)
Xt = read($3) / 255
Yt = read($4)
-epochs = 2
+epochs = 100
learning_rate = 0.1
utype = "BSP"
-freq = "EPOCH"
+# freq = "EPOCH"
+freq= "NBATCHES"
# freq="BATCH"
batch_size = 128
scheme = "DISJOINT_CONTIGUOUS"
diff --git a/scripts/tutorials/federated/code/exp/mLogReg.dml b/scripts/tutorials/federated/code/exp/adult.dml
similarity index 55%
copy from scripts/tutorials/federated/code/exp/mLogReg.dml
copy to scripts/tutorials/federated/code/exp/adult.dml
index 97b8bbf755..aa4084aa25 100644
--- a/scripts/tutorials/federated/code/exp/mLogReg.dml
+++ b/scripts/tutorials/federated/code/exp/adult.dml
@@ -19,15 +19,28 @@
#
#-------------------------------------------------------------
-X = read($1)
-Y = read($2) + 1
-Xt = read($3)
-Yt = read($4) + 1
+# create federated matrix
+N = 32561
+M = 15
+data = federated(type="frame",
+ addresses=list("localhost:8001/./data/adult.data",
+ "localhost:8002/./data/adult.data",
+ "localhost:8003/./data/adult.data"),
+ ranges= list(list(0,0),list(N/3,M),
+ list(N/3,0),list((N/3)*2,M),
+ list((N/3)*2,0),list(N, M))
+)
-beta = multiLogReg(X=X, Y=Y, verbose=$5, maxi=100)
+# transform encode
+jspec1 = read("code/exp/adult_spec1.json", data_type="scalar", value_type="string")
+[X0, M] = transformencode(target=data, spec=jspec1)
-[m, pred, acc] = multiLogRegPredict(X=Xt, B=beta, Y=Yt)
-
-[co, avg] = confusionMatrix(P=pred, Y=Yt)
+# model training
+y = X0[,ncol(X0)]
+X = X0[,2:(ncol(X0)-1)]
+B = multiLogReg(X=X, Y=y, icpt=2, verbose=TRUE)
+# Predicting
+[m, pred, acc] = multiLogRegPredict(X=X, B=B, Y=y, verbose=TRUE)
+[co, avg] = confusionMatrix(P=pred, Y=y)
print(toString(avg))
diff --git a/scripts/tutorials/federated/code/exp/adult_spec1.json b/scripts/tutorials/federated/code/exp/adult_spec1.json
new file mode 100644
index 0000000000..8e3ff45504
--- /dev/null
+++ b/scripts/tutorials/federated/code/exp/adult_spec1.json
@@ -0,0 +1,5 @@
+{
+ "ids":true,
+ "recode":[15],
+ "dummycode":[2,4,6,7,8,9,10,14]
+}
diff --git a/scripts/tutorials/federated/code/exp/mLogReg.dml b/scripts/tutorials/federated/code/exp/aggRepeat.dml
similarity index 80%
copy from scripts/tutorials/federated/code/exp/mLogReg.dml
copy to scripts/tutorials/federated/code/exp/aggRepeat.dml
index 97b8bbf755..1a5b690b6a 100644
--- a/scripts/tutorials/federated/code/exp/mLogReg.dml
+++ b/scripts/tutorials/federated/code/exp/aggRepeat.dml
@@ -20,14 +20,11 @@
#-------------------------------------------------------------
X = read($1)
-Y = read($2) + 1
-Xt = read($3)
-Yt = read($4) + 1
-beta = multiLogReg(X=X, Y=Y, verbose=$5, maxi=100)
+s = 0
+for(v in 1:$2){
+ X = X + v
+ s = s + sum(X)
+}
-[m, pred, acc] = multiLogRegPredict(X=Xt, B=beta, Y=Yt)
-
-[co, avg] = confusionMatrix(P=pred, Y=Yt)
-
-print(toString(avg))
+print("Sum of Data: " + s)
diff --git a/scripts/tutorials/federated/code/exp/mLogReg.dml b/scripts/tutorials/federated/code/exp/criteo.dml
similarity index 54%
copy from scripts/tutorials/federated/code/exp/mLogReg.dml
copy to scripts/tutorials/federated/code/exp/criteo.dml
index 97b8bbf755..a79edba4a2 100644
--- a/scripts/tutorials/federated/code/exp/mLogReg.dml
+++ b/scripts/tutorials/federated/code/exp/criteo.dml
@@ -19,15 +19,28 @@
#
#-------------------------------------------------------------
-X = read($1)
-Y = read($2) + 1
-Xt = read($3)
-Yt = read($4) + 1
+# create federated matrix
+N = 1000000;
+M = 40;
+data = federated(type="frame",
+ addresses=list("localhost:8001/./data/criteo_day21_1M_cleaned",
+ "localhost:8002/./data/criteo_day21_1M_cleaned",
+ "localhost:8003/./data/criteo_day21_1M_cleaned"),
+ ranges= list(list(0,0),list(N/3,M),
+ list(N/3,0),list((N/3)*2,M),
+ list((N/3)*2,0),list(N, M))
+)
-beta = multiLogReg(X=X, Y=Y, verbose=$5, maxi=100)
+# transform encode
+jspec1 = read("code/exp/criteo_spec1.json", data_type="scalar", value_type="string");
+[X0, M] = transformencode(target=data, spec=jspec1);
-[m, pred, acc] = multiLogRegPredict(X=Xt, B=beta, Y=Yt)
-[co, avg] = confusionMatrix(P=pred, Y=Yt)
+y = X0[,1] + 1;
+X = X0[,2:ncol(X0)]
+B = multiLogReg(X=X, Y=y, icpt=2, maxi=30, verbose=TRUE);
+## done
+[m, pred, acc] = multiLogRegPredict(X=X, B=B, Y=y, verbose=TRUE)
+[co, avg] = confusionMatrix(P=pred, Y=y)
print(toString(avg))
diff --git a/scripts/tutorials/federated/code/exp/criteo_spec1.json b/scripts/tutorials/federated/code/exp/criteo_spec1.json
new file mode 100644
index 0000000000..4d59bb52ab
--- /dev/null
+++ b/scripts/tutorials/federated/code/exp/criteo_spec1.json
@@ -0,0 +1,71 @@
+{
+ "ids":true,
+	"dummycode":[15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40],
+ "bin":[
+ {
+ "id":2,
+ "method":"equi-width",
+ "numbins":10
+ },
+ {
+ "id":3,
+ "method":"equi-width",
+ "numbins":10
+ },
+ {
+ "id":4,
+ "method":"equi-width",
+ "numbins":10
+ },
+ {
+ "id":5,
+ "method":"equi-width",
+ "numbins":10
+ },
+ {
+ "id":6,
+ "method":"equi-width",
+ "numbins":10
+ },
+ {
+ "id":7,
+ "method":"equi-width",
+ "numbins":10
+ },
+ {
+ "id":8,
+ "method":"equi-width",
+ "numbins":10
+ },
+ {
+ "id":9,
+ "method":"equi-width",
+ "numbins":10
+ },
+ {
+ "id":10,
+ "method":"equi-width",
+ "numbins":10
+ },
+ {
+ "id":11,
+ "method":"equi-width",
+ "numbins":10
+ },
+ {
+ "id":12,
+ "method":"equi-width",
+ "numbins":10
+ },
+ {
+ "id":13,
+ "method":"equi-width",
+ "numbins":10
+ },
+ {
+ "id":14,
+ "method":"equi-width",
+ "numbins":10
+ }
+ ]
+}
diff --git a/scripts/tutorials/federated/code/exp/mLogReg.dml b/scripts/tutorials/federated/code/exp/mLogReg.dml
index 97b8bbf755..c7d20b8f96 100644
--- a/scripts/tutorials/federated/code/exp/mLogReg.dml
+++ b/scripts/tutorials/federated/code/exp/mLogReg.dml
@@ -24,7 +24,7 @@ Y = read($2) + 1
Xt = read($3)
Yt = read($4) + 1
-beta = multiLogReg(X=X, Y=Y, verbose=$5, maxi=100)
+beta = multiLogReg(X=X, Y=Y, verbose=$5, maxi=10)
[m, pred, acc] = multiLogRegPredict(X=Xt, B=beta, Y=Yt)
diff --git a/scripts/tutorials/federated/code/network/CNN.dml b/scripts/tutorials/federated/code/network/CNN.dml
index 348e252c95..701ac489bb 100644
--- a/scripts/tutorials/federated/code/network/CNN.dml
+++ b/scripts/tutorials/federated/code/network/CNN.dml
@@ -86,7 +86,8 @@ train_paramserv = function(matrix[double] X, matrix[double] Y,
# Use paramserv function
model_trained = paramserv(model=model_list, features=X, labels=Y,
val_features=X_val,
val_labels=Y_val, upd="./code/network/CNN.dml::gradients",
agg="./code/network/CNN.dml::aggregation", mode=mode,
-	utype=utype, freq=freq, epochs=epochs, batchsize=batch_size, k=workers, hyperparams=params, checkpointing="NONE")
+	utype=utype, freq=freq, epochs=epochs, batchsize=batch_size, k=workers, hyperparams=params, checkpointing="NONE",
+ nbatches=32)
}
predict = function(matrix[double] X, int batch_size, list[unknown] model)
diff --git a/scripts/tutorials/federated/install.sh b/scripts/tutorials/federated/install.sh
index 9ddd94d1f9..7e112f6f38 100755
--- a/scripts/tutorials/federated/install.sh
+++ b/scripts/tutorials/federated/install.sh
@@ -38,8 +38,8 @@ fi
## Install remotes
for index in ${!address[*]}; do
+ echo "Installing for: ${address[$index]}"
if [ "${address[$index]}" != "localhost" ]; then
- echo "Installing for: ${address[$index]}"
# Install SystemDS on system.
ssh -T ${address[$index]} "
mkdir -p github;
@@ -48,11 +48,13 @@ for index in ${!address[*]}; do
cd systemds;
git reset --hard origin/master > /dev/null 2>&1;
git pull > /dev/null 2>&1;
- mvn clean package -P distribution > /dev/null 2>&1;
+ mvn clean package -P distribution | grep -E 'BUILD SUCCESS|BUILD FA';
echo 'Installed Systemds on' \$HOSTNAME;
cd \$HOME
mkdir -p ${remoteDir}
" &
+ # git merge FETCH_HEAD > /dev/null 2>&1;
+ # git fetch origin pull/1852/head > /dev/null 2>&1;
fi
done
diff --git a/scripts/tutorials/federated/parameters.sh b/scripts/tutorials/federated/parameters.sh
index 77abb3de3a..57dbd6af4b 100755
--- a/scripts/tutorials/federated/parameters.sh
+++ b/scripts/tutorials/federated/parameters.sh
@@ -21,7 +21,6 @@
#-------------------------------------------------------------
# Memory allowed to be used by each worker and coordinator
-export SYSTEMDS_STANDALONE_OPTS="-Xmx4g -Xms4g -Xmn400m"
# Path to the systemds clone of the repository.
export SYSTEMDS_ROOT="$HOME/github/systemds"
# Add SystemDS bin to path
@@ -37,18 +36,30 @@ export LOG4JPROP='conf/log4j-off.properties'
# Set the system to start up on quiet mode, to not print excessively on every execution.
export SYSDS_QUIET=1
+# export COMMAND='java -Xmx8g -Xms8g -cp "./lib/*;./SystemDS_old.jar" org.apache.sysds.api.DMLScript -f'
# Set the addresses of your federated workers.
-
-# address=("tango" "delta" "india" "echo")
-# address=("tango" "delta")
-
+# address=("so007" "so004" "so005" "so006")
address=("localhost" "localhost" "localhost" "localhost")
# We assume for the scripts to work that each worker have a unique port
ports=("8001" "8002" "8003" "8004")
numWorkers=${#address[@]}
+# Set memory usage:
+addressesString=${address// /|}
+## if distributed set memory higher!
+if [[ "$addressesString" == *"so0"* ]]; then
+ export SYSTEMDS_STANDALONE_OPTS="-Xmx16g -Xms16g -Xmn1600m"
+else
+ export SYSTEMDS_STANDALONE_OPTS="-Xmx4g -Xms4g -Xmn400m"
+fi
+
+if [[ $HOSTNAME == *"so0"* ]]; then
+ ## Set scale out nodes memory higher!
+ export SYSTEMDS_STANDALONE_OPTS="-Xmx230g -Xms230g -Xmn23000m"
+fi
+
# If remote workers are used make and use this directory on the sites.
# Note this is a directory relative to the $home on the sites.
remoteDir="github/federatedTutorial-v3/"
diff --git a/scripts/tutorials/federated/portforward.sh b/scripts/tutorials/federated/portforward.sh
index 5418c487df..217742e012 100755
--- a/scripts/tutorials/federated/portforward.sh
+++ b/scripts/tutorials/federated/portforward.sh
@@ -23,8 +23,14 @@
source parameters.sh
for index in ${!address[*]}; do
- echo ${ports[$index]}: ${address[$index]}:${ports[$index]}
-	ssh -f -N -L ${ports[$index]}:${address[$index]}:${ports[$index]} ${address[$index]} &
+ if [ "${address[$index]}" == "localhost" ]; then
+ echo No Port forward for localhost
+ else
+ echo ${ports[$index]}: ${address[$index]}:${ports[$index]}
+		ssh -f -N -L ${ports[$index]}:${address[$index]}:${ports[$index]} ${address[$index]} &
+ mkdir -p tmp/worker/
+ echo $! > tmp/worker/pF$HOSTNAME-${address[$index]}
+ fi
done
wait
diff --git a/scripts/tutorials/federated/run.sh b/scripts/tutorials/federated/run.sh
index 402dcab9b7..b4f64dc051 100755
--- a/scripts/tutorials/federated/run.sh
+++ b/scripts/tutorials/federated/run.sh
@@ -29,21 +29,20 @@ source parameters.sh
# Get execution explanation
# systemds code/exp/sum.dml -explain -args $x
-
# Execute a Linear model algorithm
# systemds code/exp/lm.dml \
# -config conf/$conf.xml \
# -stats 100 \
# -debug \
# -args $x $y_hot TRUE "results/fed_mnist_${numWorkers}.res" \
-# -fedMonitoringAddress "http://localhost:8080"
+# -fedMonitoringAddress "http://localhost:8080"
# Execute a Multi Log Regression model, do prediction and print confusion matrix
-systemds code/exp/mLogReg.dml \
- -config conf/$conf.xml \
- -stats 30 \
- -args $x $y $xt $yt TRUE \
- -fedMonitoringAddress "http://localhost:8080"
+# systemds code/exp/mLogReg.dml \
+# -config conf/$conf.xml \
+# -stats 30 \
+# -args $x $y $xt $yt TRUE \
+# -fedMonitoringAddress "http://localhost:8080"
# Execute locally to compare
# systemds code/exp/mLogReg.dml \
@@ -56,9 +55,30 @@ systemds code/exp/mLogReg.dml \
# -args $x $y_hot $xt $yt_hot \
# -fedMonitoringAddress "http://localhost:8080"
+systemds code/exp/CNNLong.dml \
+ -stats \
+ -args $x $y_hot $xt $yt_hot \
+ -fedMonitoringAddress "http://localhost:8080"
# systemds code/exp/sumRepeat.dml \
# -config conf/$conf.xml \
# -stats 30 \
# -args $x 100 \
-# -fedMonitoringAddress "http://localhost:8080"
+# -fedMonitoringAddress "http://localhost:8080"
+
+# systemds code/exp/aggRepeat.dml \
+# -config conf/$conf.xml \
+# -stats 30 \
+# -args $x 100 \
+# -fedMonitoringAddress "http://localhost:8080"
+
+# systemds code/exp/adult.dml \
+# -config conf/$conf.xml \
+# -stats 30 \
+# -debug \
+# -fedMonitoringAddress "http://localhost:8080"
+
+# systemds code/exp/criteo.dml \
+# -config conf/$conf.xml \
+# -stats 30 \
+# -fedMonitoringAddress "http://localhost:8080"
diff --git a/scripts/tutorials/federated/setup.sh b/scripts/tutorials/federated/setup.sh
index a3846080f7..db817d9e3b 100755
--- a/scripts/tutorials/federated/setup.sh
+++ b/scripts/tutorials/federated/setup.sh
@@ -70,6 +70,7 @@ datasets="mnist_features mnist_labels mnist_labels_hot mnist_test_features mnist
for name in $datasets; do
if [[ ! -f "data/${name}_${numWorkers}_1.data.mtd" ]]; then
echo "Generating data/${name}_${numWorkers}_1.data"
+ sleep 0.2
systemds code/dataGen/slice.dml \
-config conf/def.xml \
-args $name $numWorkers &
@@ -81,12 +82,23 @@ wait
# Distribute the slices to individual workers.
for index in ${!address[@]}; do
if [ "${address[$index]}" != "localhost" ]; then
+ sleep 0.2
		echo "Synchronize and distribute data partitions."
		## File ID is the federated Identification number
fileId=$((index + 1))
		# ssh -q ${address[$index]} [[ -f "${remoteDir}/data/mnist_features_${numWorkers}_${fileId}.data" ]] &&
		#	echo "Skipping transfer since ${address[$index]} already have the file" ||
		rsync -ah -e ssh --include="**_${numWorkers}_${fileId}.da**" --exclude='*' data/ ${address[$index]}:$remoteDir/data/ &
+
+ if [[ -f "data/adult.data" ]]; then
+ sleep 0.1
+			rsync -ah -e ssh --include="adult.dat**" data/ ${address[$index]}:$remoteDir/data/ &
+ fi
+
+ if [[ -f "data/criteo_day21_1M_cleaned" ]]; then
+ sleep 0.1
+			rsync -ah -e ssh --include="criteo_day21_1M_cleane**" data/ ${address[$index]}:$remoteDir/data/ &
+ fi
fi
done
diff --git a/scripts/tutorials/federated/startAllWorkers.sh b/scripts/tutorials/federated/startAllWorkers.sh
index 02176d0da2..68e2ce6091 100755
--- a/scripts/tutorials/federated/startAllWorkers.sh
+++ b/scripts/tutorials/federated/startAllWorkers.sh
@@ -26,6 +26,7 @@ source parameters.sh
echo "Starting Workers."
for index in ${!address[*]}; do
+ sleep 0.1
if [ "${address[$index]}" == "localhost" ]; then
./scripts/startWorker.sh ${ports[$index]} $conf &
else
@@ -38,9 +39,16 @@ done
./scripts/startMonitoring.sh
for index in ${!address[*]}; do
+ # curl \
+ # --header "Content-Type: application/json" \
+	#	--data "{\"name\":\"Worker - ${ports[$index]}\",\"address\":\"${address[$index]}:${ports[$index]}\"}" \
+ # http://localhost:8080/workers > /dev/null
+
+ ## Always localhost because we have firewall up.
+ sleep 0.1
curl \
--header "Content-Type: application/json" \
-		--data "{\"name\":\"Worker - ${ports[$index]}\",\"address\":\"${address[$index]}:${ports[$index]}\"}" \
+		--data "{\"name\":\"Worker - ${ports[$index]}\",\"address\":\"localhost:${ports[$index]}\"}" \
http://localhost:8080/workers > /dev/null
done
diff --git a/scripts/tutorials/federated/stopAllWorkers.sh b/scripts/tutorials/federated/stopAllWorkers.sh
index d9384eb130..b75990ce45 100755
--- a/scripts/tutorials/federated/stopAllWorkers.sh
+++ b/scripts/tutorials/federated/stopAllWorkers.sh
@@ -29,12 +29,14 @@ for index in ${!address[*]}; do
if [ "${address[$index]}" != "localhost" ]; then
# remote workers stop.
ssh ${address[$index]} " cd ${remoteDir}; ./scripts/stopWorker.sh" &
+ rsync -ah -e ssh ${address[$index]}:$remoteDir/tmp/ &
else
# stop all localhost workers.
./scripts/stopWorker.sh
fi
done
+./scripts/stopWorker.sh
./scripts/stopMonitoring.sh
wait
diff --git a/scripts/tutorials/federated/sync.sh b/scripts/tutorials/federated/sync.sh
index 51571b181c..0efefea870 100755
--- a/scripts/tutorials/federated/sync.sh
+++ b/scripts/tutorials/federated/sync.sh
@@ -24,6 +24,7 @@ source parameters.sh
# Synchronize code and setup.
for index in ${!address[*]}; do
+ sleep 0.2
# echo "${address[$index]}"
if [ "${address[$index]}" != "localhost" ]; then
# Make the required directiories
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
index d4813317e1..189d3c72bb 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedStatistics.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.log4j.Logger;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
@@ -68,6 +69,8 @@ import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.utils.Statistics;
public class FederatedStatistics {
+	protected static Logger LOG = Logger.getLogger(FederatedStatistics.class);
+
// stats of the federated worker on the coordinator site
	private static Set<Pair<String, Integer>> _fedWorkerAddresses = new HashSet<>();
private static final LongAdder readCount = new LongAdder();
@@ -88,7 +91,7 @@ public class FederatedStatistics {
// stats on the federated worker itself
private static final LongAdder fedLookupTableGetCount = new LongAdder();
-	private static final LongAdder fedLookupTableGetTime = new LongAdder(); // in milli sec
+	private static final LongAdder fedLookupTableGetTime = new LongAdder(); // msec
	private static final LongAdder fedLookupTableEntryCount = new LongAdder();
private static final LongAdder fedReuseReadHitCount = new LongAdder();
private static final LongAdder fedReuseReadBytesCount = new LongAdder();
@@ -477,17 +480,13 @@ public class FederatedStatistics {
public static List<TrafficModel> getCoordinatorsTrafficBytes() {
var result = new ArrayList<>(coordinatorsTrafficBytes);
-
coordinatorsTrafficBytes.clear();
-
return result;
}
public static List<EventModel> getWorkerEvents() {
var result = new ArrayList<>(workerEvents);
-
workerEvents.clear();
-
return result;
}
public static List<RequestModel> getWorkerRequests() {
@@ -501,6 +500,7 @@ public class FederatedStatistics {
public static void addEvent(EventModel event) {
workerEvents.add(event);
}
+
public static void addWorkerRequest(RequestModel request) {
if (!workerFederatedRequests.containsKey(request.type)) {
workerFederatedRequests.put(request.type, request);
@@ -508,9 +508,11 @@ public class FederatedStatistics {
workerFederatedRequests.get(request.type).count++;
}
+
public static void addDataObject(DataObjectModel dataObject) {
workerDataObjects.put(dataObject.varName, dataObject);
}
+
public static void removeDataObjects() {
workerDataObjects.clear();
}
@@ -674,8 +676,21 @@ public class FederatedStatistics {
}
public static class FedStatsCollection implements Serializable {
+ // TODO fix this class to use shallow pointers.
private static final long serialVersionUID = 1L;
+		private CacheStatsCollection cacheStats = new CacheStatsCollection();
+		public double jitCompileTime = 0;
+		public UtilizationModel utilization = new UtilizationModel(0.0, 0.0);
+		private GCStatsCollection gcStats = new GCStatsCollection();
+		private LineageCacheStatsCollection linCacheStats = new LineageCacheStatsCollection();
+		private MultiTenantStatsCollection mtStats = new MultiTenantStatsCollection();
+		public HashMap<String, Pair<Long, Double>> heavyHitters = new HashMap<>();
+		public List<TrafficModel> coordinatorsTrafficBytes = new ArrayList<>();
+		public List<EventModel> workerEvents = new ArrayList<>();
+		public List<DataObjectModel> workerDataObjects = new ArrayList<>();
+		public List<RequestModel> workerRequests = new ArrayList<>();
+
private void collectStats() {
cacheStats.collectStats();
			jitCompileTime = ((double)Statistics.getJITCompileTime()) / 1000; // in sec
@@ -710,6 +725,20 @@ public class FederatedStatistics {
		protected static class CacheStatsCollection implements Serializable {
private static final long serialVersionUID = 1L;
+ private long memHits = 0;
+ private long linHits = 0;
+ private long fsBuffHits = 0;
+ private long fsHits = 0;
+ private long hdfsHits = 0;
+ private long linWrites = 0;
+ private long fsBuffWrites = 0;
+ private long fsWrites = 0;
+ private long hdfsWrites = 0;
+ private double acqRTime = 0;
+ private double acqMTime = 0;
+ private double rlsTime = 0;
+ private double expTime = 0;
+
private void collectStats() {
memHits = CacheStatistics.getMemHits();
linHits = CacheStatistics.getLinHits();
@@ -720,10 +749,10 @@ public class FederatedStatistics {
				fsBuffWrites = CacheStatistics.getFSBuffWrites();
fsWrites = CacheStatistics.getFSWrites();
hdfsWrites = CacheStatistics.getHDFSWrites();
-				acqRTime = ((double)CacheStatistics.getAcquireRTime()) / 1000000000; // in sec
-				acqMTime = ((double)CacheStatistics.getAcquireMTime()) / 1000000000; // in sec
-				rlsTime = ((double)CacheStatistics.getReleaseTime()) / 1000000000; // in sec
-				expTime = ((double)CacheStatistics.getExportTime()) / 1000000000; // in sec
+				acqRTime = ((double) CacheStatistics.getAcquireRTime()) / 1000000000; // in sec
+				acqMTime = ((double) CacheStatistics.getAcquireMTime()) / 1000000000; // in sec
+				rlsTime = ((double) CacheStatistics.getReleaseTime()) / 1000000000; // in sec
+				expTime = ((double) CacheStatistics.getExportTime()) / 1000000000; // in sec
}
private void aggregate(CacheStatsCollection that) {
@@ -742,19 +771,25 @@ public class FederatedStatistics {
expTime += that.expTime;
}
- private long memHits = 0;
- private long linHits = 0;
- private long fsBuffHits = 0;
- private long fsHits = 0;
- private long hdfsHits = 0;
- private long linWrites = 0;
- private long fsBuffWrites = 0;
- private long fsWrites = 0;
- private long hdfsWrites = 0;
- private double acqRTime = 0;
- private double acqMTime = 0;
- private double rlsTime = 0;
- private double expTime = 0;
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("CacheStatsCollection:");
+ sb.append("\tmemHits:" + memHits);
+ sb.append("\tlinHits:" + linHits);
+ sb.append("\tfsBuffHits:" + fsBuffHits);
+ sb.append("\tfsHits:" + fsHits);
+ sb.append("\thdfsHits:" + hdfsHits);
+ sb.append("\tlinWrites:" + linWrites);
+ sb.append("\tfsBuffWrites:" + fsBuffWrites);
+ sb.append("\tfsWrites:" + fsWrites);
+ sb.append("\thdfsWrites:" + hdfsWrites);
+ sb.append("\tacqRTime:" + acqRTime);
+ sb.append("\tacqMTime:" + acqMTime);
+ sb.append("\trlsTime:" + rlsTime);
+ sb.append("\texpTime:" + expTime);
+ return sb.toString();
+ }
}
		protected static class GCStatsCollection implements Serializable {
@@ -777,6 +812,16 @@ public class FederatedStatistics {
		protected static class LineageCacheStatsCollection implements Serializable {
private static final long serialVersionUID = 1L;
+ private long numHitsMem = 0;
+ private long numHitsFS = 0;
+ private long numHitsDel = 0;
+ private long numHitsInst = 0;
+ private long numHitsSB = 0;
+ private long numHitsFunc = 0;
+ private long numWritesMem = 0;
+ private long numWritesFS = 0;
+ private long numMemDel = 0;
+
private void collectStats() {
				numHitsMem = LineageCacheStatistics.getMemHits();
numHitsFS = LineageCacheStatistics.getFSHits();
@@ -801,20 +846,35 @@ public class FederatedStatistics {
numMemDel += that.numMemDel;
}
- private long numHitsMem = 0;
- private long numHitsFS = 0;
- private long numHitsDel = 0;
- private long numHitsInst = 0;
- private long numHitsSB = 0;
- private long numHitsFunc = 0;
- private long numWritesMem = 0;
- private long numWritesFS = 0;
- private long numMemDel = 0;
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("numHitsMem: " + numHitsMem);
+ sb.append("\tnumHitsFS: " + numHitsFS);
+ sb.append("\tnumHitsDel: " + numHitsDel);
+ sb.append("\tnumHitsInst: " + numHitsInst);
+ sb.append("\tnumHitsSB: " + numHitsSB);
+ sb.append("\tnumHitsFunc: " + numHitsFunc);
+ sb.append("\tnumWritesMem: " + numWritesMem);
+ sb.append("\tnumWritesFS: " + numWritesFS);
+ sb.append("\tnumMemDel: " + numMemDel);
+ return sb.toString();
+ }
}
		protected static class MultiTenantStatsCollection implements Serializable {
private static final long serialVersionUID = 1L;
+ private long fLTGetCount = 0;
+ private double fLTGetTime = 0;
+ private long fLTEntryCount = 0;
+ private long reuseReadHits = 0;
+ private long reuseReadBytes = 0;
+ private long putLineageCount = 0;
+ private long putLineageItems = 0;
+ private long serializationReuseCount = 0;
+ private long serializationReuseBytes = 0;
+
private void collectStats() {
fLTGetCount = getFedLookupTableGetCount();
				fLTGetTime = ((double)getFedLookupTableGetTime()) / 1000000000; // in sec
@@ -839,28 +899,26 @@ public class FederatedStatistics {
				serializationReuseBytes += that.serializationReuseBytes;
}
- private long fLTGetCount = 0;
- private double fLTGetTime = 0;
- private long fLTEntryCount = 0;
- private long reuseReadHits = 0;
- private long reuseReadBytes = 0;
- private long putLineageCount = 0;
- private long putLineageItems = 0;
- private long serializationReuseCount = 0;
- private long serializationReuseBytes = 0;
}
-		private CacheStatsCollection cacheStats = new CacheStatsCollection();
-		public double jitCompileTime = 0;
-		public UtilizationModel utilization = new UtilizationModel(0.0, 0.0);
-		private GCStatsCollection gcStats = new GCStatsCollection();
-		private LineageCacheStatsCollection linCacheStats = new LineageCacheStatsCollection();
-		private MultiTenantStatsCollection mtStats = new MultiTenantStatsCollection();
-		public HashMap<String, Pair<Long, Double>> heavyHitters = new HashMap<>();
-		public List<TrafficModel> coordinatorsTrafficBytes = new ArrayList<>();
-		public List<EventModel> workerEvents = new ArrayList<>();
-		public List<DataObjectModel> workerDataObjects = new ArrayList<>();
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\nFedStatsCollection: ");
+ sb.append("\ncacheStats " + cacheStats);
+ sb.append("\njit " + jitCompileTime);
+ sb.append("\nutilization " + utilization);
+ sb.append("\ngcStats " + gcStats);
+ sb.append("\nlinCacheStats " + linCacheStats);
+ sb.append("\nmtStats " + mtStats);
+ sb.append("\nheavyHitters " + heavyHitters);
+			sb.append("\ncoordinatorsTrafficBytes " + coordinatorsTrafficBytes);
+ sb.append("\nworkerEvents " + workerEvents);
+ sb.append("\nworkerDataObjects " + workerDataObjects);
+ sb.append("\nworkerRequests " + workerRequests);
+ sb.append("\n\n");
+ return sb.toString();
+ }
- public List<RequestModel> workerRequests = new ArrayList<>();
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
index 9bfb32dbee..5dde65dab4 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorkerHandler.java
@@ -663,6 +663,8 @@ public class FederatedWorkerHandler extends ChannelInboundHandlerAdapter {
// get function and input parameters
try {
FederatedUDF udf = (FederatedUDF) request.getParam(0);
+ if(LOG.isDebugEnabled())
+ LOG.debug(udf);
eventStage.operation = udf.getClass().getSimpleName();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/StatisticsController.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/StatisticsController.java
index a2ec3c5de4..d2b7004be7 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/StatisticsController.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/StatisticsController.java
@@ -19,6 +19,8 @@
package org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.Request;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.Response;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.StatisticsOptions;
@@ -27,6 +29,7 @@ import org.apache.sysds.runtime.controlprogram.federated.monitoring.services.Sta
import io.netty.handler.codec.http.FullHttpResponse;
public class StatisticsController implements IController {
+	private static final Log LOG = LogFactory.getLog(StatisticsController.class.getName());
	private final StatisticsService statisticsService = new StatisticsService();
@Override
@@ -52,6 +55,9 @@ public class StatisticsController implements IController {
var result = statisticsService.getAll(objectId, options);
+ if(LOG.isDebugEnabled())
+ LOG.debug("Get -- events: " + result.events);
+
return Response.ok(result.toString());
}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/WorkerController.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/WorkerController.java
index e81834e035..c6a3af625e 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/WorkerController.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/controllers/WorkerController.java
@@ -19,6 +19,8 @@
package org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.Request;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.Response;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.WorkerModel;
@@ -28,6 +30,8 @@ import org.apache.sysds.runtime.controlprogram.federated.monitoring.services.Wor
import io.netty.handler.codec.http.FullHttpResponse;
public class WorkerController implements IController {
+	private static final Log LOG = LogFactory.getLog(WorkerController.class.getName());
+
private final WorkerService workerService = new WorkerService();
@Override
@@ -74,12 +78,12 @@ public class WorkerController implements IController {
public FullHttpResponse get(Request request, Long objectId) {
var result = workerService.get(objectId);
- if (result == null) {
+ if (result == null)
return Response.notFound(Constants.NOT_FOUND_MSG);
- }
-
+
result.setOnlineStatus(workerService.getWorkerOnlineStatus(result.id));
-
+ if(LOG.isDebugEnabled())
+ LOG.debug("Get: " + result);
return Response.ok(result.toString());
}
@@ -87,9 +91,11 @@ public class WorkerController implements IController {
public FullHttpResponse getAll(Request request) {
var workers = workerService.getAll();
- for (var worker: workers) {
+ for (var worker: workers)
worker.setOnlineStatus(workerService.getWorkerOnlineStatus(worker.id));
- }
+
+ if(LOG.isDebugEnabled())
+ LOG.debug("Get All: " + workers);
return Response.ok(workers.toString());
}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/EventModel.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/EventModel.java
index e3e4dc3398..7736ca970e 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/EventModel.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/EventModel.java
@@ -31,6 +31,7 @@ public class EventModel extends CoordinatorConnectionModel {
private static final String JsonFormat = "{" +
"\"coordinatorName\": \"%s\"," +
+ "\"coordinatorId\": %d," +
"\"stages\": [%s]" +
"}";
@@ -43,27 +44,13 @@ public class EventModel extends CoordinatorConnectionModel {
this.stages = new ArrayList<>();
}
- public EventModel(final Long workerId, final Long coordinatorId) {
- this(-1L, workerId, coordinatorId);
- }
-
-	public EventModel(final Long id, final Long workerId, final Long coordinatorId) {
- this.id = id;
- this.workerId = workerId;
- this.coordinatorId = coordinatorId;
- this.stages = new ArrayList<>();
- }
-
public void setCoordinatorName(String name) {
this.coordinatorName = name;
}
@Override
public String toString() {
- String stagesStr = this.stages.stream()
- .map(EventStageModel::toString)
- .collect(Collectors.joining(","));
-
-		return String.format(JsonFormat, this.coordinatorName, stagesStr);
+		String stagesStr = this.stages.stream().map(EventStageModel::toString).collect(Collectors.joining(","));
+		return String.format(JsonFormat, this.coordinatorName, this.coordinatorId, stagesStr);
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/StatisticsOptions.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/StatisticsOptions.java
index 4c05559708..d90b567671 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/StatisticsOptions.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/models/StatisticsOptions.java
@@ -21,7 +21,7 @@ package org.apache.sysds.runtime.controlprogram.federated.monitoring.models;
public class StatisticsOptions extends BaseModel {
private static final long serialVersionUID = 2524032122999491726L;
- public int rowCount = 20;
+ public int rowCount = 300;
public boolean utilization = true;
public boolean traffic = true;
public boolean events = true;
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/DerbyRepository.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/DerbyRepository.java
index 718bcf867b..760b9c51e8 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/DerbyRepository.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/repositories/DerbyRepository.java
@@ -19,9 +19,6 @@
package org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories;
-import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.*;
-import org.apache.sysds.runtime.controlprogram.federated.monitoring.services.MapperService;
-
import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -34,7 +31,23 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.BaseModel;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.CoordinatorModel;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.DataObjectModel;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.EventModel;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.EventStageModel;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.HeavyHitterModel;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.RequestModel;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.TrafficModel;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.UtilizationModel;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.WorkerModel;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.services.MapperService;
+
+
public class DerbyRepository implements IRepository {
+	protected static final Log LOG = LogFactory.getLog(DerbyRepository.class.getName());
private final static String DB_CONNECTION = "jdbc:derby:memory:derbyDB";
private final List<BaseModel> _allEntities = new ArrayList<>(List.of(
new WorkerModel(),
@@ -236,6 +249,7 @@ public class DerbyRepository implements IRepository {
return resultModels;
}
+
	public <T extends BaseModel> List<T> getAllEntitiesByField(String fieldName, Object value, Class<T> type) {
return getAllEntitiesByField(fieldName, value, type, -1);
}
@@ -247,7 +261,7 @@ public class DerbyRepository implements IRepository {
try (var db = DriverManager.getConnection(DB_CONNECTION)) {
			var entityName = type.getSimpleName().replace(Constants.ENTITY_CLASS_SUFFIX, "");
- if (rowCount < 0) {
+ if (rowCount < 0) { // take all.
				st = db.prepareStatement(String.format(GET_ENTITY_WITH_COL_STMT, entityName, fieldName));
			} else {
				st = db.prepareStatement(String.format(GET_ENTITY_WITH_COL_LIMIT_STMT, entityName, fieldName, rowCount));
@@ -266,7 +280,6 @@ public class DerbyRepository implements IRepository {
} catch (SQLException e) {
throw new RuntimeException(e);
}
-
return resultModels;
}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/StatisticsService.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/StatisticsService.java
index c72966ba58..432207819e 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/StatisticsService.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/StatisticsService.java
@@ -29,6 +29,8 @@ import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
@@ -50,6 +52,7 @@ import org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories
import org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories.IRepository;
public class StatisticsService {
+	protected static final Log LOG = LogFactory.getLog(StatisticsService.class.getName());
	private static final IRepository entityRepository = new DerbyRepository();
@@ -184,7 +187,10 @@ public class StatisticsService {
}
		for (var heavyHitterEntry: aggFedStats.heavyHitters.entrySet()) {
-			var newHH = new HeavyHitterModel(workerId, heavyHitterEntry.getKey(), heavyHitterEntry.getValue().getValue(), heavyHitterEntry.getValue().getLeft());
+ var newHH = new HeavyHitterModel(workerId, //
+ heavyHitterEntry.getKey(),
+ heavyHitterEntry.getValue().getValue(),//
+ heavyHitterEntry.getValue().getLeft());
heavyHitters.add(newHH);
}
@@ -193,11 +199,21 @@ public class StatisticsService {
	private static void setCoordinatorId(CoordinatorConnectionModel entity) {
List<CoordinatorModel> coordinators = new ArrayList<>();
- var monitoringKey = entity.getCoordinatorHostId();
+ String monitoringKey = entity.getCoordinatorHostId();
if (monitoringKey != null) {
			coordinators = entityRepository.getAllEntitiesByField(Constants.ENTITY_MONITORING_KEY_COL, monitoringKey, CoordinatorModel.class);
}
+ if(coordinators.isEmpty()){
+			int processID = Integer.parseInt(monitoringKey.split("-")[1]);
+			coordinators = entityRepository.getAllEntities(CoordinatorModel.class);
+ for(CoordinatorModel c : coordinators){
+ if(c.processId == processID){
+ entity.coordinatorId = c.id;
+ return;
+ }
+ }
+ }
if (!coordinators.isEmpty()) {
entity.coordinatorId = coordinators.get(0).id;
@@ -215,20 +231,21 @@ public class StatisticsService {
if (matcher.find()) {
String host = matcher.group(2);
String portStr = matcher.group(3);
- int port = 80;
-			if (portStr != null && !portStr.isBlank() && !portStr.isEmpty())
-				port = Integer.parseInt(portStr.replace(":", ""));
+ try {
+ // Force us to use the port specified.
+				int port = Integer.parseInt(portStr.replace(":", ""));
-			InetSocketAddress isa = new InetSocketAddress(host, port);
-			FederatedRequest frUDF = new FederatedRequest(FederatedRequest.RequestType.EXEC_UDF, -1,
+				InetSocketAddress isa = new InetSocketAddress(host, port);
+				FederatedRequest frUDF = new FederatedRequest(FederatedRequest.RequestType.EXEC_UDF, -1,
					new FederatedStatistics.FedStatsCollectFunction());
- try {
				result = FederatedData.executeFederatedOperation(isa, frUDF);
- } catch(DMLRuntimeException dre) {
+ }
+ catch(DMLRuntimeException dre) {
				throw dre; // caused by offline federated workers
- } catch (Exception e) {
+ }
+ catch(Exception e) {
throw new RuntimeException(e);
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java
index 2882c928d5..4c7989a9f7 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/monitoring/services/WorkerService.java
@@ -29,8 +29,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
+import org.apache.sysds.runtime.controlprogram.federated.monitoring.controllers.WorkerController;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.DataObjectModel;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.HeavyHitterModel;
import org.apache.sysds.runtime.controlprogram.federated.monitoring.models.RequestModel;
@@ -41,6 +44,7 @@ import org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories
import org.apache.sysds.runtime.controlprogram.federated.monitoring.repositories.IRepository;
public class WorkerService {
+	protected static final Log LOG = LogFactory.getLog(WorkerController.class.getName());
	private static final IRepository entityRepository = new DerbyRepository();
// { workerId, { workerAddress, workerStatus } }
	private static final Map<Long, Pair<String, Boolean>> cachedWorkers = new HashMap<>();
@@ -141,15 +145,14 @@ public class WorkerService {
});
}
if (stats.events != null) {
- for (var eventEntity: stats.events) {
- if (eventEntity.coordinatorId > 0) {
+ for(var eventEntity : stats.events) {
+ if(eventEntity.coordinatorId > 0) {
					CompletableFuture.runAsync(() -> {
						var eventId = entityRepository.createEntity(eventEntity);
-
-						for (var stageEntity : eventEntity.stages) {
+						for(var stageEntity : eventEntity.stages) {
							stageEntity.eventId = eventId;
-
-							entityRepository.createEntity(stageEntity);
+							if(stageEntity != null)
+								entityRepository.createEntity(stageEntity);
}
});
}