This is an automated email from the ASF dual-hosted git repository. avijayan pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
commit b1ec22e38d60ffdda5cb7a65f9779da9de8b8a3c Author: Aravindan Vijayan <[email protected]> AuthorDate: Tue May 30 13:35:54 2017 -0700 AMBARI-21106 : Ambari Metrics Anomaly detection prototype (Commit 3). (avijayan) --- .../metrics/alertservice/R/RFunctionInvoker.java | 15 ++-- .../src/main/resources/R-scripts/ema.R | 79 ++++++++++++++++++++++ .../src/main/resources/R-scripts/hsdev.r | 60 ++++++++++++++++ .../src/main/resources/R-scripts/iforest.R | 35 ++++++++++ .../src/main/resources/R-scripts/kstest.r | 21 ++++++ .../src/main/resources/R-scripts/test.R | 67 ++++++++++++++++++ .../src/main/resources/R-scripts/tukeys.r | 26 +++++++ .../src/main/resources/R-scripts/util.R | 19 ++++++ 8 files changed, 312 insertions(+), 10 deletions(-) diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java index 8d1e520..71ad66d 100644 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/R/RFunctionInvoker.java @@ -31,8 +31,7 @@ public class RFunctionInvoker { public static ResultSet tukeys(DataSet trainData, DataSet testData, Map<String, String> configs) { try { - r.eval("library(ambarimetricsAD)"); - r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/tukeys.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + r.eval("source('tukeys.r', echo=TRUE)"); int n = Integer.parseInt(configs.get("tukeys.n")); r.eval("n <- " + n); @@ -57,8 +56,7 @@ public class RFunctionInvoker { public static ResultSet ema_global(DataSet trainData, DataSet testData, Map<String, String> configs) { try { - r.eval("library(ambarimetricsAD)"); - r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/ema.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + r.eval("source('ema.R', echo=TRUE)"); int n = Integer.parseInt(configs.get("ema.n")); r.eval("n <- " + n); @@ -87,8 +85,7 @@ public class RFunctionInvoker { public static ResultSet ema_daily(DataSet trainData, DataSet testData, Map<String, String> configs) { try { - r.eval("library(ambarimetricsAD)"); - r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/ema.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + r.eval("source('ema.R', echo=TRUE)"); int n = Integer.parseInt(configs.get("ema.n")); r.eval("n <- " + n); @@ -117,8 +114,7 @@ public class RFunctionInvoker { public static ResultSet ksTest(DataSet trainData, DataSet testData, Map<String, String> configs) { try { - r.eval("library(ambarimetricsAD)"); - r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/kstest.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + r.eval("source('kstest.r', echo=TRUE)"); double p_value = Double.parseDouble(configs.get("ks.p_value")); r.eval("p_value <- " + p_value); @@ -144,8 +140,7 @@ public class RFunctionInvoker { public static ResultSet hsdev(DataSet trainData, DataSet testData, Map<String, String> configs) { try { - r.eval("library(ambarimetricsAD)"); - r.eval("source('~/dev/AMS/AD/ambarimetricsAD/org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R/hsdev.org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.alerting.R', echo=TRUE)"); + r.eval("source('hsdev.r', echo=TRUE)"); int n = Integer.parseInt(configs.get("hsdev.n")); r.eval("n <- " + n); diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R new file mode 100644 index 0000000..d3188f0 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R @@ -0,0 +1,79 @@ +# EMA <- w * EMA + (1 - w) * x +# EMS <- sqrt( w * EMS^2 + (1 - w) * (x - EMA)^2 ) +# Alarm = abs(x - EMA) > n * EMS + +ema_global <- function(train_data, test_data, w, n) { + +# res <- get_data(url) +# data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) +# names(data) <- c("TS", res$metrics[[1]]$metricname) +# train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2] +# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ] + + anomalies <- data.frame() + ema <- 0 + ems <- 0 + + #Train Step + for (x in train_data) { + ema <- w*ema + (1-w)*x + ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2) + } + + for ( i in 1:length(test_data[,1])) { + x <- test_data[i,2] + if (abs(x - ema) > n*ems) { + anomaly <- c(as.numeric(test_data[i,1]), x) + # print (anomaly) + anomalies <- rbind(anomalies, anomaly) + } + ema <- w*ema + (1-w)*x + ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2) + } + + if(length(anomalies) > 0) { + names(anomalies) <- c("TS", "Value") + } + return (anomalies) +} + +ema_daily <- function(train_data, test_data, w, n) { + +# res <- get_data(url) +# data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) +# names(data) <- c("TS", res$metrics[[1]]$metricname) +# train_data <- data[which(data$TS >= train_start & data$TS <= train_end), ] +# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ] + + anomalies <- data.frame() + ema <- vector("numeric", 7) + ems <- vector("numeric", 7) + + #Train Step + for ( i in 1:length(train_data[,1])) { + x <- train_data[i,2] + time <- as.POSIXlt(as.numeric(train_data[i,1])/1000, origin = "1970-01-01" ,tz = "GMT") + index <- time$wday + ema[index] <- w*ema[index] + (1-w)*x + ems[index] <- sqrt(w* ems[index]^2 + (1 - w)*(x - ema[index])^2) + } + + for ( i in 1:length(test_data[,1])) { + x <- test_data[i,2] + time <- as.POSIXlt(as.numeric(test_data[i,1])/1000, origin = "1970-01-01" ,tz = "GMT") + index <- time$wday + + if (abs(x - ema[index+1]) > n*ems[index+1]) { + anomaly <- c(as.numeric(test_data[i,1]), x) + # print (anomaly) + anomalies <- rbind(anomalies, anomaly) + } + ema[index+1] <- w*ema[index+1] + (1-w)*x + ems[index+1] <- sqrt(w* ems[index+1]^2 + (1 - w)*(x - ema[index+1])^2) + } + + if(length(anomalies) > 0) { + names(anomalies) <- c("TS", "Value") + } + return(anomalies) +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r new file mode 100644 index 0000000..ff8a8f7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r @@ -0,0 +1,60 @@ +hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval, period) { + + #res <- get_data(url) + #data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) + #names(data) <- c("TS", res$metrics[[1]]$metricname) + anomalies <- data.frame() + + granularity <- train_data[2,1] - train_data[1,1] + test_start <- test_data[1,1] + test_end <- test_data[length(test_data[1,]),1] + cat ("\n test_start : ", as.numeric(test_start)) + train_start <- test_start - num_historic_periods*period + cat ("\n train_start : ", as.numeric(train_start)) + # round to start of day + train_start <- train_start - (train_start %% interval) + cat ("\n train_start after rounding: ", as.numeric(train_start)) + + time <- as.POSIXlt(as.numeric(test_data[1,1])/1000, origin = "1970-01-01" ,tz = "GMT") + test_data_day <- time$wday + + h_data <- c() + for ( i in length(train_data[,1]):1) { + ts <- train_data[i,1] + if ( ts < train_start) { + cat ("\n Breaking out of loop : ", ts) + break + } + time <- as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01" ,tz = "GMT") + if (time$wday == test_data_day) { + x <- train_data[i,2] + h_data <- c(h_data, x) + } + } + + cat ("\n Train data length : ", length(train_data[,1])) + cat ("\n Test data length : ", length(test_data[,1])) + cat ("\n Historic data length : ", length(h_data)) + if (length(h_data) < 2*length(test_data[,1])) { + cat ("\nNot enough training data") + return (anomalies) + } + + past_median <- median(h_data) + cat ("\npast_median : ", past_median) + past_sd <- sd(h_data) + cat ("\npast_sd : ", past_sd) + curr_median <- median(test_data[,2]) + cat ("\ncurr_median : ", curr_median) + + if (abs(curr_median - past_median) > n * past_sd) { + anomaly <- c(test_start, test_end, curr_median, past_median, past_sd) + anomalies <- rbind(anomalies, anomaly) + } + + if(length(anomalies) > 0) { + names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", " Past SD") + } + + return (anomalies) +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R new file mode 100644 index 0000000..1e0c534 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R @@ -0,0 +1,35 @@ +ams_iforest <- function(url, train_start, train_end, test_start, test_end, threshold_score) { + + res <- get_data(url) + num_metrics <- length(res$metrics) + anomalies <- data.frame() + + metricname <- res$metrics[[1]]$metricname + data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) + names(data) <- c("TS", res$metrics[[1]]$metricname) + + for (i in 2:num_metrics) { + metricname <- res$metrics[[i]]$metricname + df <- data.frame(as.numeric(names(res$metrics[[i]]$metrics)), as.numeric(res$metrics[[i]]$metrics)) + names(df) <- c("TS", res$metrics[[i]]$metricname) + data <- merge(data, df) + } + + algo_data <- data[ which(df$TS >= train_start & df$TS <= train_end) , ][c(1:num_metrics+1)] + iForest <- IsolationTrees(algo_data) + test_data <- data[ which(df$TS >= test_start & df$TS <= test_end) , ] + + if_res <- AnomalyScore(test_data[c(1:num_metrics+1)], iForest) + for (i in 1:length(if_res$outF)) { + index <- test_start+i-1 + if (if_res$outF[i] > threshold_score) { + anomaly <- c(test_data[i,1], if_res$outF[i], if_res$pathLength[i]) + anomalies <- rbind(anomalies, anomaly) + } + } + + if(length(anomalies) > 0) { + names(anomalies) <- c("TS", "Anomaly Score", "Path length") + } + return (anomalies) +} \ No newline at end of file diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r new file mode 100644 index 0000000..af21038 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r @@ -0,0 +1,21 @@ +ams_ks <- function(train_data, test_data, p_value) { + +# res <- get_data(url) +# data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) +# names(data) <- c("TS", res$metrics[[1]]$metricname) +# train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2] +# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), 2] + + anomalies <- data.frame() + res <- ks.test(train_data, test_data[,2]) + + if (res[2] < p_value) { + anomaly <- c(test_data[1,1], test_data[length(test_data),1], res[1], res[2]) + anomalies <- rbind(anomalies, anomaly) + } + + if(length(anomalies) > 0) { + names(anomalies) <- c("TS Start", "TS End", "D", "p-value") + } + return (anomalies) +} \ No newline at end of file diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R new file mode 100644 index 0000000..e66049f --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R @@ -0,0 +1,67 @@ +tukeys_anomalies <- data.frame() +ema_global_anomalies <- data.frame() +ema_daily_anomalies <- data.frame() +ks_anomalies <- data.frame() +hsdev_anomalies <- data.frame() + +init <- function() { + tukeys_anomalies <- data.frame() + ema_global_anomalies <- data.frame() + ema_daily_anomalies <- data.frame() + ks_anomalies <- data.frame() + hsdev_anomalies <- data.frame() +} + +test_methods <- function(data) { + + init() + #res <- get_data(url) + #data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) + #names(data) <- c("TS", res$metrics[[1]]$metricname) + + limit <- data[length(data[,1]),1] + step <- data[2,1] - data[1,1] + + train_start <- data[1,1] + train_end <- get_next_day_boundary(train_start, step, limit) + test_start <- train_end + step + test_end <- get_next_day_boundary(test_start, step, limit) + i <- 1 + day <- 24*60*60*1000 + + while (test_start < limit) { + + print (i) + i <- i + 1 + train_data <- data[which(data$TS >= train_start & data$TS <= train_end),] + test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ] + + #tukeys_anomalies <<- rbind(tukeys_anomalies, ams_tukeys(train_data, test_data, 3)) + #ema_global_anomalies <<- rbind(ema_global_anomalies, ema_global(train_data, test_data, 0.9, 3)) + #ema_daily_anomalies <<- rbind(ema_daily_anomalies, ema_daily(train_data, test_data, 0.9, 3)) + #ks_anomalies <<- rbind(ks_anomalies, ams_ks(train_data, test_data, 0.05)) + hsdev_train_data <- data[which(data$TS < test_start),] + hsdev_anomalies <<- rbind(hsdev_anomalies, hsdev_daily(hsdev_train_data, test_data, 3, 3, day, 7*day)) + + train_start <- test_start + train_end <- get_next_day_boundary(train_start, step, limit) + test_start <- train_end + step + test_end <- get_next_day_boundary(test_start, step, limit) + } + return (hsdev_anomalies) +} + +get_next_day_boundary <- function(start, step, limit) { + + if (start > limit) { + return (-1) + } + + while (start <= limit) { + if (((start %% (24*60*60*1000)) - 28800000) == 0) { + return (start) + } + start <- start + step + } + return (start) +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r new file mode 100644 index 0000000..38f71f2 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r @@ -0,0 +1,26 @@ +ams_tukeys <- function(train_data, test_data, n) { + +# res <- get_data(url) +# data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) +# names(data) <- c("TS", res$metrics[[1]]$metricname) +# train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2] +# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ] + + anomalies <- data.frame() + quantiles <- quantile(train_data[,2]) + iqr <- quantiles[4] - quantiles[2] + + for ( i in 1:length(test_data[,1])) { + x <- test_data[i,2] + lb <- quantiles[2] - n*iqr + ub <- quantiles[4] + n*iqr + if ( (x < lb) || (x > ub) ) { + anomaly <- c(test_data[i,1], x) + anomalies <- rbind(anomalies, anomaly) + } + } + if(length(anomalies) > 0) { + names(anomalies) <- c("TS", "Value") + } + return (anomalies) +} diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R new file mode 100644 index 0000000..eb19d37 --- /dev/null +++ b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/util.R @@ -0,0 +1,19 @@ +#url_prefix = 'http://104.196.95.78:3000/api/datasources/proxy/1/ws/v1/timeline/metrics?' +#url_suffix = '&startTime=1459972944&endTime=1491508944&precision=MINUTES' +#data_url <- paste(url_prefix, query, sep ="") +#data_url <- paste(data_url, url_suffix, sep="") + +get_data <- function(url) { + library(rjson) + res <- fromJSON(readLines(url)[1]) + return (res) +} + +find_index <- function(data, ts) { + for (i in 1:length(data)) { + if (as.numeric(ts) == as.numeric(data[i])) { + return (i) + } + } + return (-1) +} \ No newline at end of file -- To stop receiving notification emails like this one, please contact [email protected].
