http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java deleted file mode 100644 index d5beb48..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ambari.metrics.alertservice.seriesgenerator; - -import java.util.Random; - -public class StepFunctionMetricSeries implements AbstractMetricSeries { - - double startValue = 0.0; - double steadyValueDeviationPercentage = 0.0; - double steadyPeriodSlope = 0.5; - int steadyPeriodMinSize = 10; - int steadyPeriodMaxSize = 20; - double stepChangePercentage = 0.0; - boolean upwardStep = true; - - Random random = new Random(); - - // y = mx + c - double y; - double m; - double x; - double c; - int currentStepSize; - int currentIndex; - - public StepFunctionMetricSeries(double startValue, - double steadyValueDeviationPercentage, - double steadyPeriodSlope, - int steadyPeriodMinSize, - int steadyPeriodMaxSize, - double stepChangePercentage, - boolean upwardStep) { - this.startValue = startValue; - this.steadyValueDeviationPercentage = steadyValueDeviationPercentage; - this.steadyPeriodSlope = steadyPeriodSlope; - this.steadyPeriodMinSize = steadyPeriodMinSize; - this.steadyPeriodMaxSize = steadyPeriodMaxSize; - this.stepChangePercentage = stepChangePercentage; - this.upwardStep = upwardStep; - init(); - } - - private void init() { - y = startValue; - m = steadyPeriodSlope; - x = 1; - c = y - (m * x); - - currentStepSize = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble()); - currentIndex = 0; - } - - @Override - public double nextValue() { - - double value = 0.0; - - if (currentIndex < currentStepSize) { - y = m * x + c; - double valueDeviationLowerLimit = y - steadyValueDeviationPercentage * y; - double valueDeviationHigherLimit = y + steadyValueDeviationPercentage * y; - value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble(); - x++; - currentIndex++; - } - - if (currentIndex == currentStepSize) { - currentIndex = 0; - currentStepSize = (int) (steadyPeriodMinSize + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble()); - if (upwardStep) 
{ - y = y + stepChangePercentage * y; - } else { - y = y - stepChangePercentage * y; - } - x = 1; - c = y - (m * x); - } - - return value; - } - - @Override - public double[] getSeries(int n) { - double[] series = new double[n]; - for (int i = 0; i < n; i++) { - series[i] = nextValue(); - } - return series; - } - -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java deleted file mode 100644 index a2b0eea..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ambari.metrics.alertservice.seriesgenerator; - -import java.util.Random; - -public class UniformMetricSeries implements AbstractMetricSeries { - - double value = 0.0; - double deviationPercentage = 0.0; - double outlierProbability = 0.0; - double outlierDeviationLowerPercentage = 0.0; - double outlierDeviationHigherPercentage = 0.0; - boolean outliersAboveValue= true; - - Random random = new Random(); - double valueDeviationLowerLimit; - double valueDeviationHigherLimit; - double outlierLeftLowerLimit; - double outlierLeftHigherLimit; - double outlierRightLowerLimit; - double outlierRightUpperLimit; - double nonOutlierProbability; - - - public UniformMetricSeries(double value, - double deviationPercentage, - double outlierProbability, - double outlierDeviationLowerPercentage, - double outlierDeviationHigherPercentage, - boolean outliersAboveValue) { - this.value = value; - this.deviationPercentage = deviationPercentage; - this.outlierProbability = outlierProbability; - this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage; - this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage; - this.outliersAboveValue = outliersAboveValue; - init(); - } - - private void init() { - valueDeviationLowerLimit = value - deviationPercentage * value; - valueDeviationHigherLimit = value + deviationPercentage * value; - - outlierLeftLowerLimit = value - outlierDeviationHigherPercentage * value; - outlierLeftHigherLimit = value - outlierDeviationLowerPercentage * value; - outlierRightLowerLimit = value + outlierDeviationLowerPercentage * value; - outlierRightUpperLimit = value + outlierDeviationHigherPercentage * value; - - nonOutlierProbability = 1.0 - outlierProbability; - } - - @Override - public double nextValue() { - - double value; - double probability = random.nextDouble(); - - if (probability <= nonOutlierProbability) { - value = valueDeviationLowerLimit + (valueDeviationHigherLimit - valueDeviationLowerLimit) * 
random.nextDouble(); - } else { - if (!outliersAboveValue) { - value = outlierLeftLowerLimit + (outlierLeftHigherLimit - outlierLeftLowerLimit) * random.nextDouble(); - } else { - value = outlierRightLowerLimit + (outlierRightUpperLimit - outlierRightLowerLimit) * random.nextDouble(); - } - } - return value; - } - - @Override - public double[] getSeries(int n) { - double[] series = new double[n]; - for (int i = 0; i < n; i++) { - series[i] = nextValue(); - } - return series; - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R deleted file mode 100644 index 0b66095..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R +++ /dev/null @@ -1,96 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -# EMA <- w * EMA + (1 - w) * x -# EMS <- sqrt( w * EMS^2 + (1 - w) * (x - EMA)^2 ) -# Alarm = abs(x - EMA) > n * EMS - -ema_global <- function(train_data, test_data, w, n) { - -# res <- get_data(url) -# data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) -# names(data) <- c("TS", res$metrics[[1]]$metricname) -# train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2] -# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ] - - anomalies <- data.frame() - ema <- 0 - ems <- 0 - - #Train Step - for (x in train_data) { - ema <- w*ema + (1-w)*x - ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2) - } - - for ( i in 1:length(test_data[,1])) { - x <- test_data[i,2] - if (abs(x - ema) > n*ems) { - anomaly <- c(as.numeric(test_data[i,1]), x) - # print (anomaly) - anomalies <- rbind(anomalies, anomaly) - } - ema <- w*ema + (1-w)*x - ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2) - } - - if(length(anomalies) > 0) { - names(anomalies) <- c("TS", "Value") - } - return (anomalies) -} - -ema_daily <- function(train_data, test_data, w, n) { - -# res <- get_data(url) -# data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) -# names(data) <- c("TS", res$metrics[[1]]$metricname) -# train_data <- data[which(data$TS >= train_start & data$TS <= train_end), ] -# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ] - - anomalies <- data.frame() - ema <- vector("numeric", 7) - ems <- vector("numeric", 7) - - #Train Step - for ( i in 1:length(train_data[,1])) { - x <- train_data[i,2] - time <- as.POSIXlt(as.numeric(train_data[i,1])/1000, origin = "1970-01-01" ,tz = "GMT") - index <- time$wday - ema[index] <- w*ema[index] + (1-w)*x - ems[index] <- sqrt(w* ems[index]^2 + (1 - w)*(x - ema[index])^2) - } - - for ( i in 1:length(test_data[,1])) { - x <- test_data[i,2] - time <- as.POSIXlt(as.numeric(test_data[i,1])/1000, origin = "1970-01-01" ,tz = 
"GMT") - index <- time$wday - - if (abs(x - ema[index+1]) > n*ems[index+1]) { - anomaly <- c(as.numeric(test_data[i,1]), x) - # print (anomaly) - anomalies <- rbind(anomalies, anomaly) - } - ema[index+1] <- w*ema[index+1] + (1-w)*x - ems[index+1] <- sqrt(w* ems[index+1]^2 + (1 - w)*(x - ema[index+1])^2) - } - - if(length(anomalies) > 0) { - names(anomalies) <- c("TS", "Value") - } - return(anomalies) -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r deleted file mode 100644 index bca3366..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r +++ /dev/null @@ -1,67 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval, period) { - - #res <- get_data(url) - #data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) - #names(data) <- c("TS", res$metrics[[1]]$metricname) - anomalies <- data.frame() - - granularity <- train_data[2,1] - train_data[1,1] - test_start <- test_data[1,1] - test_end <- test_data[length(test_data[1,]),1] - train_start <- test_start - num_historic_periods*period - # round to start of day - train_start <- train_start - (train_start %% interval) - - time <- as.POSIXlt(as.numeric(test_data[1,1])/1000, origin = "1970-01-01" ,tz = "GMT") - test_data_day <- time$wday - - h_data <- c() - for ( i in length(train_data[,1]):1) { - ts <- train_data[i,1] - if ( ts < train_start) { - break - } - time <- as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01" ,tz = "GMT") - if (time$wday == test_data_day) { - x <- train_data[i,2] - h_data <- c(h_data, x) - } - } - - if (length(h_data) < 2*length(test_data[,1])) { - cat ("\nNot enough training data") - return (anomalies) - } - - past_median <- median(h_data) - past_sd <- sd(h_data) - curr_median <- median(test_data[,2]) - - if (abs(curr_median - past_median) > n * past_sd) { - anomaly <- c(test_start, test_end, curr_median, past_median, past_sd) - anomalies <- rbind(anomalies, anomaly) - } - - if(length(anomalies) > 0) { - names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", "Past SD") - } - - return (anomalies) -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R deleted file mode 100644 index 8956400..0000000 --- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R +++ /dev/null @@ -1,52 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -ams_iforest <- function(url, train_start, train_end, test_start, test_end, threshold_score) { - - res <- get_data(url) - num_metrics <- length(res$metrics) - anomalies <- data.frame() - - metricname <- res$metrics[[1]]$metricname - data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) - names(data) <- c("TS", res$metrics[[1]]$metricname) - - for (i in 2:num_metrics) { - metricname <- res$metrics[[i]]$metricname - df <- data.frame(as.numeric(names(res$metrics[[i]]$metrics)), as.numeric(res$metrics[[i]]$metrics)) - names(df) <- c("TS", res$metrics[[i]]$metricname) - data <- merge(data, df) - } - - algo_data <- data[ which(df$TS >= train_start & df$TS <= train_end) , ][c(1:num_metrics+1)] - iForest <- IsolationTrees(algo_data) - test_data <- data[ which(df$TS >= test_start & df$TS <= test_end) , ] - - if_res <- AnomalyScore(test_data[c(1:num_metrics+1)], iForest) - for (i in 1:length(if_res$outF)) { - index <- test_start+i-1 - if (if_res$outF[i] > threshold_score) { - anomaly <- c(test_data[i,1], if_res$outF[i], 
if_res$pathLength[i]) - anomalies <- rbind(anomalies, anomaly) - } - } - - if(length(anomalies) > 0) { - names(anomalies) <- c("TS", "Anomaly Score", "Path length") - } - return (anomalies) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r deleted file mode 100644 index f22bc15..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r +++ /dev/null @@ -1,38 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -ams_ks <- function(train_data, test_data, p_value) { - -# res <- get_data(url) -# data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) -# names(data) <- c("TS", res$metrics[[1]]$metricname) -# train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2] -# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), 2] - - anomalies <- data.frame() - res <- ks.test(train_data[,2], test_data[,2]) - - if (res[2] < p_value) { - anomaly <- c(test_data[1,1], test_data[length(test_data),1], res[1], res[2]) - anomalies <- rbind(anomalies, anomaly) - } - - if(length(anomalies) > 0) { - names(anomalies) <- c("TS Start", "TS End", "D", "p-value") - } - return (anomalies) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R deleted file mode 100644 index 7650356..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R +++ /dev/null @@ -1,85 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -tukeys_anomalies <- data.frame() -ema_global_anomalies <- data.frame() -ema_daily_anomalies <- data.frame() -ks_anomalies <- data.frame() -hsdev_anomalies <- data.frame() - -init <- function() { - tukeys_anomalies <- data.frame() - ema_global_anomalies <- data.frame() - ema_daily_anomalies <- data.frame() - ks_anomalies <- data.frame() - hsdev_anomalies <- data.frame() -} - -test_methods <- function(data) { - - init() - #res <- get_data(url) - #data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) - #names(data) <- c("TS", res$metrics[[1]]$metricname) - - limit <- data[length(data[,1]),1] - step <- data[2,1] - data[1,1] - - train_start <- data[1,1] - train_end <- get_next_day_boundary(train_start, step, limit) - test_start <- train_end + step - test_end <- get_next_day_boundary(test_start, step, limit) - i <- 1 - day <- 24*60*60*1000 - - while (test_start < limit) { - - print (i) - i <- i + 1 - train_data <- data[which(data$TS >= train_start & data$TS <= train_end),] - test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ] - - #tukeys_anomalies <<- rbind(tukeys_anomalies, ams_tukeys(train_data, test_data, 3)) - #ema_global_anomalies <<- rbind(ema_global_anomalies, ema_global(train_data, test_data, 0.9, 3)) - #ema_daily_anomalies <<- rbind(ema_daily_anomalies, ema_daily(train_data, test_data, 0.9, 3)) - #ks_anomalies <<- rbind(ks_anomalies, ams_ks(train_data, test_data, 0.05)) - hsdev_train_data <- data[which(data$TS < test_start),] - hsdev_anomalies <<- rbind(hsdev_anomalies, 
hsdev_daily(hsdev_train_data, test_data, 3, 3, day, 7*day)) - - train_start <- test_start - train_end <- get_next_day_boundary(train_start, step, limit) - test_start <- train_end + step - test_end <- get_next_day_boundary(test_start, step, limit) - } - return (hsdev_anomalies) -} - -get_next_day_boundary <- function(start, step, limit) { - - if (start > limit) { - return (-1) - } - - while (start <= limit) { - if (((start %% (24*60*60*1000)) - 28800000) == 0) { - return (start) - } - start <- start + step - } - return (start) -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r deleted file mode 100644 index 0312226..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r +++ /dev/null @@ -1,51 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -ams_tukeys <- function(train_data, test_data, n) { - -# res <- get_data(url) -# data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics)) -# names(data) <- c("TS", res$metrics[[1]]$metricname) -# train_data <- data[which(data$TS >= train_start & data$TS <= train_end), 2] -# test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ] - - anomalies <- data.frame() - quantiles <- quantile(train_data[,2]) - iqr <- quantiles[4] - quantiles[2] - niqr <- 0 - - for ( i in 1:length(test_data[,1])) { - x <- test_data[i,2] - lb <- quantiles[2] - n*iqr - ub <- quantiles[4] + n*iqr - if ( (x < lb) || (x > ub) ) { - if (iqr != 0) { - if (x < lb) { - niqr <- (quantiles[2] - x) / iqr - } else { - niqr <- (x - quantiles[4]) / iqr - } - } - anomaly <- c(test_data[i,1], x, niqr) - anomalies <- rbind(anomalies, anomaly) - } - } - if(length(anomalies) > 0) { - names(anomalies) <- c("TS", "Value", "niqr") - } - return (anomalies) -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties deleted file mode 100644 index ab106c4..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties +++ /dev/null @@ -1,42 +0,0 @@ -# Copyright 2011 The Apache Software Foundation -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -appIds=HOST - -collectorHost=localhost -collectorPort=6188 -collectorProtocol=http - -zkQuorum=localhost:2181 - -ambariServerHost=localhost -clusterName=c1 - -emaW=0.8 -emaN=3 -tukeysN=3 -pointInTimeTestInterval=300000 -pointInTimeTrainInterval=900000 - -ksTestInterval=600000 -ksTrainInterval=600000 -hsdevNhp=3 -hsdevInterval=1800000; - -skipMetricPatterns=sdisk*,cpu_sintr*,proc*,disk*,boottime -hosts=avijayan-ad-1.openstacklocal \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java deleted file mode 100644 index d1e2b41..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.prototype; - -import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; -import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.List; -import java.util.TreeMap; - -import static org.apache.ambari.metrics.alertservice.prototype.TestRFunctionInvoker.getTS; - -public class TestEmaTechnique { - - private static double[] ts; - private static String fullFilePath; - - @BeforeClass - public static void init() throws URISyntaxException { - - Assume.assumeTrue(System.getenv("R_HOME") != null); - ts = getTS(1000); - URL url = ClassLoader.getSystemResource("R-scripts"); - fullFilePath = new File(url.toURI()).getAbsolutePath(); - RFunctionInvoker.setScriptsDir(fullFilePath); - } - - @Test - public void testEmaInitialization() { - - EmaTechnique ema = new EmaTechnique(0.5, 3); - Assert.assertTrue(ema.getTrackedEmas().isEmpty()); - Assert.assertTrue(ema.getStartingWeight() == 0.5); - Assert.assertTrue(ema.getStartTimesSdev() == 2); - } - - @Test - public void testEma() { - EmaTechnique ema = new 
EmaTechnique(0.5, 3); - - long now = System.currentTimeMillis(); - - TimelineMetric metric1 = new TimelineMetric(); - metric1.setMetricName("M1"); - metric1.setHostName("H1"); - metric1.setStartTime(now - 1000); - metric1.setAppId("A1"); - metric1.setInstanceId(null); - metric1.setType("Integer"); - - //Train - TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); - for (int i = 0; i < 50; i++) { - double metric = 20000 + Math.random(); - metricValues.put(now - i * 100, metric); - } - metric1.setMetricValues(metricValues); - List<MetricAnomaly> anomalyList = ema.test(metric1); -// Assert.assertTrue(anomalyList.isEmpty()); - - metricValues = new TreeMap<Long, Double>(); - for (int i = 0; i < 50; i++) { - double metric = 20000 + Math.random(); - metricValues.put(now - i * 100, metric); - } - metric1.setMetricValues(metricValues); - anomalyList = ema.test(metric1); - Assert.assertTrue(!anomalyList.isEmpty()); - int l1 = anomalyList.size(); - - Assert.assertTrue(ema.updateModel(metric1, false, 20)); - anomalyList = ema.test(metric1); - int l2 = anomalyList.size(); - Assert.assertTrue(l2 < l1); - - Assert.assertTrue(ema.updateModel(metric1, true, 50)); - anomalyList = ema.test(metric1); - int l3 = anomalyList.size(); - Assert.assertTrue(l3 > l2 && l3 > l1); - - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java deleted file mode 100644 index 9a102a0..0000000 --- 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.prototype; - -import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet; -import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries; -import org.apache.ambari.metrics.alertservice.seriesgenerator.UniformMetricSeries; -import org.apache.commons.lang.ArrayUtils; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -public class TestRFunctionInvoker { - - private static String metricName = "TestMetric"; - private static double[] ts; - private static String fullFilePath; - - @BeforeClass - public static void init() throws URISyntaxException { - - Assume.assumeTrue(System.getenv("R_HOME") != null); - ts = getTS(1000); - URL url = ClassLoader.getSystemResource("R-scripts"); - fullFilePath = new 
File(url.toURI()).getAbsolutePath(); - RFunctionInvoker.setScriptsDir(fullFilePath); - } - - @Test - public void testTukeys() throws URISyntaxException { - - double[] train_ts = ArrayUtils.subarray(ts, 0, 750); - double[] train_x = getRandomData(750); - DataSeries trainData = new DataSeries(metricName, train_ts, train_x); - - double[] test_ts = ArrayUtils.subarray(ts, 750, 1000); - double[] test_x = getRandomData(250); - test_x[50] = 5.5; //Anomaly - DataSeries testData = new DataSeries(metricName, test_ts, test_x); - Map<String, String> configs = new HashMap(); - configs.put("tukeys.n", "3"); - - ResultSet rs = RFunctionInvoker.tukeys(trainData, testData, configs); - Assert.assertEquals(rs.resultset.size(), 2); - Assert.assertEquals(rs.resultset.get(1)[0], 5.5, 0.1); - - } - - public static void main(String[] args) throws URISyntaxException { - - String metricName = "TestMetric"; - double[] ts = getTS(1000); - URL url = ClassLoader.getSystemResource("R-scripts"); - String fullFilePath = new File(url.toURI()).getAbsolutePath(); - RFunctionInvoker.setScriptsDir(fullFilePath); - - double[] train_ts = ArrayUtils.subarray(ts, 0, 750); - double[] train_x = getRandomData(750); - DataSeries trainData = new DataSeries(metricName, train_ts, train_x); - - double[] test_ts = ArrayUtils.subarray(ts, 750, 1000); - double[] test_x = getRandomData(250); - test_x[50] = 5.5; //Anomaly - DataSeries testData = new DataSeries(metricName, test_ts, test_x); - ResultSet rs; - - Map<String, String> configs = new HashMap(); - - System.out.println("TUKEYS"); - configs.put("tukeys.n", "3"); - rs = RFunctionInvoker.tukeys(trainData, testData, configs); - rs.print(); - System.out.println("--------------"); - -// System.out.println("EMA Global"); -// configs.put("ema.n", "3"); -// configs.put("ema.w", "0.8"); -// rs = RFunctionInvoker.ema_global(trainData, testData, configs); -// rs.print(); -// System.out.println("--------------"); -// -// System.out.println("EMA Daily"); -// rs = 
RFunctionInvoker.ema_daily(trainData, testData, configs); -// rs.print(); -// System.out.println("--------------"); -// -// configs.put("ks.p_value", "0.00005"); -// System.out.println("KS Test"); -// rs = RFunctionInvoker.ksTest(trainData, testData, configs); -// rs.print(); -// System.out.println("--------------"); -// - ts = getTS(5000); - train_ts = ArrayUtils.subarray(ts, 0, 4800); - train_x = getRandomData(4800); - trainData = new DataSeries(metricName, train_ts, train_x); - test_ts = ArrayUtils.subarray(ts, 4800, 5000); - test_x = getRandomData(200); - for (int i = 0; i < 200; i++) { - test_x[i] = test_x[i] * 5; - } - testData = new DataSeries(metricName, test_ts, test_x); - configs.put("hsdev.n", "3"); - configs.put("hsdev.nhp", "3"); - configs.put("hsdev.interval", "86400000"); - configs.put("hsdev.period", "604800000"); - System.out.println("HSdev"); - rs = RFunctionInvoker.hsdev(trainData, testData, configs); - rs.print(); - System.out.println("--------------"); - - } - - static double[] getTS(int n) { - long currentTime = System.currentTimeMillis(); - double[] ts = new double[n]; - currentTime = currentTime - (currentTime % (5 * 60 * 1000)); - - for (int i = 0, j = n - 1; i < n; i++, j--) { - ts[j] = currentTime; - currentTime = currentTime - (5 * 60 * 1000); - } - return ts; - } - - static double[] getRandomData(int n) { - - UniformMetricSeries metricSeries = new UniformMetricSeries(10, 0.1,0.05, 0.6, 0.8, true); - return metricSeries.getSeries(n); - -// double[] metrics = new double[n]; -// Random random = new Random(); -// for (int i = 0; i < n; i++) { -// metrics[i] = random.nextDouble(); -// } -// return metrics; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java deleted file mode 100644 index ef0125f..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.ambari.metrics.alertservice.prototype; - -import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; -import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; -import java.net.InetAddress; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.UnknownHostException; -import java.util.List; -import java.util.TreeMap; - -public class TestTukeys { - - @BeforeClass - public static void init() throws URISyntaxException { - Assume.assumeTrue(System.getenv("R_HOME") != null); - } - - @Test - public void testPointInTimeDetectionSystem() throws UnknownHostException, URISyntaxException { - - URL url = ClassLoader.getSystemResource("R-scripts"); - String fullFilePath = new File(url.toURI()).getAbsolutePath(); - RFunctionInvoker.setScriptsDir(fullFilePath); - - MetricsCollectorInterface metricsCollectorInterface = new MetricsCollectorInterface("avijayan-ams-1.openstacklocal","http", "6188"); - - EmaTechnique ema = new EmaTechnique(0.5, 3); - long now = System.currentTimeMillis(); - - TimelineMetric metric1 = new TimelineMetric(); - metric1.setMetricName("mm9"); - metric1.setHostName(MetricsCollectorInterface.getDefaultLocalHostName()); - metric1.setStartTime(now); - metric1.setAppId("aa9"); - metric1.setInstanceId(null); - metric1.setType("Integer"); - - //Train - TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); - - //2hr data. 
- for (int i = 0; i < 120; i++) { - double metric = 20000 + Math.random(); - metricValues.put(now - i * 60 * 1000, metric); - } - metric1.setMetricValues(metricValues); - TimelineMetrics timelineMetrics = new TimelineMetrics(); - timelineMetrics.addOrMergeTimelineMetric(metric1); - - metricsCollectorInterface.emitMetrics(timelineMetrics); - - List<MetricAnomaly> anomalyList = ema.test(metric1); - metricsCollectorInterface.publish(anomalyList); -// -// PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(ema, metricsCollectorInterface, 3, 5*60*1000, 15*60*1000); -// pointInTimeADSystem.runOnce(); -// -// List<MetricAnomaly> anomalyList2 = ema.test(metric1); -// -// pointInTimeADSystem.runOnce(); -// List<MetricAnomaly> anomalyList3 = ema.test(metric1); -// -// pointInTimeADSystem.runOnce(); -// List<MetricAnomaly> anomalyList4 = ema.test(metric1); -// -// pointInTimeADSystem.runOnce(); -// List<MetricAnomaly> anomalyList5 = ema.test(metric1); -// -// pointInTimeADSystem.runOnce(); -// List<MetricAnomaly> anomalyList6 = ema.test(metric1); -// -// Assert.assertTrue(anomalyList6.size() < anomalyList.size()); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java deleted file mode 100644 index 575ea8b..0000000 --- a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation 
(ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.metrics.alertservice.seriesgenerator; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.ambari.metrics.alertservice.prototype.MetricAnomalyDetectorTestInput; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -public class MetricSeriesGeneratorTest { - - @Test - public void testUniformSeries() { - - UniformMetricSeries metricSeries = new UniformMetricSeries(5, 0.2, 0, 0, 0, true); - Assert.assertTrue(metricSeries.nextValue() <= 6 && metricSeries.nextValue() >= 4); - - double[] uniformSeries = MetricSeriesGeneratorFactory.createUniformSeries(50, 10, 0.2, 0.1, 0.4, 0.5, true); - Assert.assertTrue(uniformSeries.length == 50); - - for (int i = 0; i < uniformSeries.length; i++) { - double value = uniformSeries[i]; - - if (value > 10 * 1.2) { - Assert.assertTrue(value >= 10 * 1.4 && value <= 10 * 1.6); - } else { - Assert.assertTrue(value >= 10 * 0.8 && value <= 10 * 1.2); - } - } - } - - @Test - public void testNormalSeries() { - NormalMetricSeries metricSeries = new NormalMetricSeries(0, 1, 0, 0, 0, true); - 
Assert.assertTrue(metricSeries.nextValue() <= 3 && metricSeries.nextValue() >= -3); - } - - @Test - public void testMonotonicSeries() { - - MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(0, 0.5, 0, 0, 0, 0, true); - Assert.assertTrue(metricSeries.nextValue() == 0); - Assert.assertTrue(metricSeries.nextValue() == 0.5); - - double[] incSeries = MetricSeriesGeneratorFactory.createMonotonicSeries(20, 0, 0.5, 0, 0, 0, 0, true); - Assert.assertTrue(incSeries.length == 20); - for (int i = 0; i < incSeries.length; i++) { - Assert.assertTrue(incSeries[i] == i * 0.5); - } - } - - @Test - public void testDualBandSeries() { - double[] dualBandSeries = MetricSeriesGeneratorFactory.getDualBandSeries(30, 5, 0.2, 5, 15, 0.3, 4); - Assert.assertTrue(dualBandSeries[0] >= 4 && dualBandSeries[0] <= 6); - Assert.assertTrue(dualBandSeries[4] >= 4 && dualBandSeries[4] <= 6); - Assert.assertTrue(dualBandSeries[5] >= 10.5 && dualBandSeries[5] <= 19.5); - Assert.assertTrue(dualBandSeries[8] >= 10.5 && dualBandSeries[8] <= 19.5); - Assert.assertTrue(dualBandSeries[9] >= 4 && dualBandSeries[9] <= 6); - } - - @Test - public void testStepSeries() { - double[] stepSeries = MetricSeriesGeneratorFactory.getStepFunctionSeries(30, 10, 0, 0, 5, 5, 0.5, true); - - Assert.assertTrue(stepSeries[0] == 10); - Assert.assertTrue(stepSeries[4] == 10); - - Assert.assertTrue(stepSeries[5] == 10*1.5); - Assert.assertTrue(stepSeries[9] == 10*1.5); - - Assert.assertTrue(stepSeries[10] == 10*1.5*1.5); - Assert.assertTrue(stepSeries[14] == 10*1.5*1.5); - } - - @Test - public void testSteadySeriesWithTurbulence() { - double[] steadySeriesWithTurbulence = MetricSeriesGeneratorFactory.getSteadySeriesWithTurbulentPeriod(30, 5, 0, 1, 1, 5, 1); - - int count = 0; - for (int i = 0; i < steadySeriesWithTurbulence.length; i++) { - if (steadySeriesWithTurbulence[i] == 10) { - count++; - } - } - Assert.assertTrue(count == 5); - } -} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detector/pom.xml new file mode 100644 index 0000000..e6e12f2 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/pom.xml @@ -0,0 +1,205 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>ambari-metrics</artifactId> + <groupId>org.apache.ambari</groupId> + <version>2.0.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>ambari-metrics-anomaly-detector</artifactId> + <version>2.0.0.0-SNAPSHOT</version> + <properties> + <scala.version>2.10.4</scala.version> + <scala.binary.version>2.11</scala.binary.version> + </properties> + + <repositories> + <repository> + <id>scala-tools.org</id> + <name>Scala-Tools Maven2 Repository</name> + <url>http://scala-tools.org/repo-releases</url> + </repository> + </repositories> + + <pluginRepositories> + <pluginRepository> + <id>scala-tools.org</id> + <name>Scala-Tools Maven2 Repository</name> + <url>http://scala-tools.org/repo-releases</url> + </pluginRepository> + </pluginRepositories> + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <scalaVersion>${scala.version}</scalaVersion> + <args> + <arg>-target:jvm-1.5</arg> + </args> + </configuration> + </plugin> + </plugins> + </build> + <name>Ambari Metrics Anomaly Detector</name> + <packaging>jar</packaging> + + <dependencies> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + <version>2.5</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.2</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + 
<artifactId>slf4j-log4j12</artifactId> + <version>1.7.2</version> + </dependency> + + <dependency> + <groupId>com.github.lucarosellini.rJava</groupId> + <artifactId>JRI</artifactId> + <version>0.9-7</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_2.11</artifactId> + <version>2.1.1</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>0.10.1.0</version> + <exclusions> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>javax.mail</groupId> + <artifactId>mail</artifactId> + </exclusion> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jmx</artifactId> + </exclusion> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.10.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>connect-json</artifactId> + <version>0.10.1.0</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka_2.10</artifactId> + <version>1.6.3</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.10</artifactId> + <version>1.6.3</version> + </dependency> + <dependency> + <groupId>org.apache.phoenix</groupId> + <artifactId>phoenix-spark</artifactId> + <version>4.10.0-HBase-1.1</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-mllib_2.10</artifactId> + <version>1.3.0</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + <version>4.10</version> + 
</dependency> + <dependency> + <groupId>org.apache.ambari</groupId> + <artifactId>ambari-metrics-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.2.5</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>2.1.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-mllib_${scala.binary.version}</artifactId> + <version>2.1.1</version> + <scope>provided</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java new file mode 100644 index 0000000..eb19857 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * A named series made of a timestamp array and a value array.
 *
 * <p>The arrays are stored by reference (no defensive copy) and are exposed
 * through public fields. Presumably {@code ts[i]} is the timestamp of
 * {@code values[i]} -- this class does not check that; confirm with callers.
 */
public class DataSeries {

  public String seriesName;
  public double[] ts;
  public double[] values;

  /**
   * @param seriesName label of the series
   * @param ts         timestamps, kept as the same array instance
   * @param values     observations, kept as the same array instance
   */
  public DataSeries(String seriesName, double[] ts, double[] values) {
    this.values = values;
    this.ts = ts;
    this.seriesName = seriesName;
  }

  /**
   * @return the series name immediately followed by the bracketed timestamp
   *         list and the bracketed value list, with no separator in between
   */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder();
    text.append(seriesName);
    text.append(Arrays.toString(ts));
    text.append(Arrays.toString(values));
    return text.toString();
  }
}
/**
 * Column-oriented table of numeric results: every {@code double[]} in
 * {@link #resultset} is one column, and {@link #print()} writes one output
 * line per row index.
 */
public class ResultSet {

  /** The columns. Starts as an empty list so it is non-null unless a caller assigns null. */
  public List<double[]> resultset = new ArrayList<>();

  /**
   * @param resultset the columns to wrap; when {@code null} the empty default
   *                  list is kept so that {@link #print()} stays usable
   */
  public ResultSet(List<double[]> resultset) {
    if (resultset != null) {
      this.resultset = resultset;
    }
  }

  /**
   * Prints the header line {@code "Result : "} followed by one line per row.
   * The row count is taken from the first column. A column that is
   * {@code null} or shorter than the first one contributes no cell to the
   * rows it does not cover instead of aborting the dump with an exception.
   */
  public void print() {
    System.out.println("Result : ");
    // The field is public, so it may have been reset to null after construction.
    if (resultset == null || resultset.isEmpty() || resultset.get(0) == null) {
      return;
    }
    int rowCount = resultset.get(0).length;
    for (int i = 0; i < rowCount; i++) {
      for (double[] entity : resultset) {
        if (entity != null && i < entity.length) {
          System.out.print(entity[i] + " ");
        }
      }
      System.out.println();
    }
  }
}
/**
 * Basic descriptive statistics over {@code double[]} samples.
 *
 * <p>None of the methods modify the array they are given. An empty array
 * yields {@code NaN} from {@link #mean}, {@link #sdev} and {@link #median};
 * a {@code null} array raises {@link NullPointerException}.
 */
public class StatisticUtils {

  /**
   * @param values the sample
   * @return the arithmetic mean; {@code NaN} for an empty array (0.0 / 0)
   */
  public static double mean(double[] values) {
    double sum = 0;
    for (double d : values) {
      sum += d;
    }
    return sum / values.length;
  }

  /**
   * Returns the sum of squared deviations from the mean.
   *
   * <p>Despite the name this value is NOT divided by the sample size;
   * {@link #sdev} performs that division itself. The behaviour is kept as-is
   * because callers of this method may rely on it.
   *
   * @param values the sample
   * @return sum over all elements of {@code (d - mean)^2}
   */
  public static double variance(double[] values) {
    double avg = mean(values);
    double variance = 0;
    for (double d : values) {
      variance += Math.pow(d - avg, 2.0);
    }
    return variance;
  }

  /**
   * @param values               the sample
   * @param useBesselsCorrection {@code true} to divide by {@code n - 1},
   *                             {@code false} to divide by {@code n}
   * @return the standard deviation; {@code NaN} when the divisor is zero
   *         (empty array, or a single element with Bessel's correction)
   */
  public static double sdev(double[] values, boolean useBesselsCorrection) {
    double variance = variance(values);
    int n = (useBesselsCorrection) ? values.length - 1 : values.length;
    return Math.sqrt(variance / n);
  }

  /**
   * @param values the sample; it is copied before sorting
   * @return the middle element for an odd-sized sample, the average of the
   *         two middle elements for an even-sized one, and {@code NaN} for an
   *         empty sample (previously an ArrayIndexOutOfBoundsException)
   */
  public static double median(double[] values) {
    int n = values.length;
    if (n == 0) {
      // Same "no data" answer that mean() and sdev() already give.
      return Double.NaN;
    }

    double[] clonedValues = Arrays.copyOf(values, n);
    Arrays.sort(clonedValues);

    if (n % 2 != 0) {
      return clonedValues[(n - 1) / 2];
    } else {
      return (clonedValues[(n - 1) / 2] + clonedValues[n / 2]) / 2;
    }
  }
}
+ */ + +package org.apache.ambari.metrics.alertservice.prototype.core; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +public class AmbariServerInterface implements Serializable{ + + private static final Log LOG = LogFactory.getLog(AmbariServerInterface.class); + + private String ambariServerHost; + private String clusterName; + + public AmbariServerInterface(String ambariServerHost, String clusterName) { + this.ambariServerHost = ambariServerHost; + this.clusterName = clusterName; + } + + public int getPointInTimeSensitivity() { + + String url = constructUri("http", ambariServerHost, "8080", "/api/v1/clusters/" + clusterName + "/alert_definitions?fields=*"); + + URL obj = null; + BufferedReader in = null; + + try { + obj = new URL(url); + HttpURLConnection con = (HttpURLConnection) obj.openConnection(); + con.setRequestMethod("GET"); + + String encoded = Base64.getEncoder().encodeToString(("admin:admin").getBytes(StandardCharsets.UTF_8)); + con.setRequestProperty("Authorization", "Basic "+encoded); + + int responseCode = con.getResponseCode(); + LOG.info("Sending 'GET' request to URL : " + url); + LOG.info("Response Code : " + responseCode); + + in = new BufferedReader( + new InputStreamReader(con.getInputStream())); + + StringBuilder responseJsonSb = new StringBuilder(); + String line; + while ((line = in.readLine()) != null) { + responseJsonSb.append(line); + } + + JSONObject jsonObject = new JSONObject(responseJsonSb.toString()); + JSONArray array = jsonObject.getJSONArray("items"); + for(int i = 0 ; i < array.length() ; i++){ + JSONObject alertDefn = 
array.getJSONObject(i).getJSONObject("AlertDefinition"); + if (alertDefn.get("name") != null && alertDefn.get("name").equals("point_in_time_metrics_anomalies")) { + JSONObject sourceNode = alertDefn.getJSONObject("source"); + JSONArray params = sourceNode.getJSONArray("parameters"); + for(int j = 0 ; j < params.length() ; j++){ + JSONObject param = params.getJSONObject(j); + if (param.get("name").equals("sensitivity")) { + return param.getInt("value"); + } + } + break; + } + } + + } catch (Exception e) { + LOG.error(e); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + LOG.warn(e); + } + } + } + + return -1; + } + + private String constructUri(String protocol, String host, String port, String path) { + StringBuilder sb = new StringBuilder(protocol); + sb.append("://"); + sb.append(host); + sb.append(":"); + sb.append(port); + sb.append(path); + return sb.toString(); + } + +// public static void main(String[] args) { +// AmbariServerInterface ambariServerInterface = new AmbariServerInterface(); +// ambariServerInterface.getPointInTimeSensitivity("avijayan-ams-1.openstacklocal","c1"); +// } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricKafkaProducer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricKafkaProducer.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricKafkaProducer.java new file mode 100644 index 0000000..2287ee3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricKafkaProducer.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype.core; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +
/**
 * Publishes {@link TimelineMetrics} objects, converted to a JSON tree, to the
 * {@code ambari-metrics-topic} Kafka topic.
 *
 * <p>Records are sent without a key. Each {@link #sendMetrics(TimelineMetrics)}
 * call blocks until the send has completed.
 */
public class MetricKafkaProducer implements AutoCloseable {

  private static final String TOPIC_NAME = "ambari-metrics-topic";

  // Built once and reused: constructing an ObjectMapper per send is needless work.
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  // Key type is byte[] to agree with the ByteArraySerializer configured below;
  // records are always sent with a null key.
  Producer<byte[], JsonNode> producer;

  /**
   * Creates a producer connected to the given brokers.
   *
   * @param kafkaServers value for {@code bootstrap.servers}, e.g. {@code "host:6667"}
   */
  public MetricKafkaProducer(String kafkaServers) {
    Properties configProperties = new Properties();
    configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.ByteArraySerializer");
    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.connect.json.JsonSerializer");
    producer = new KafkaProducer<>(configProperties);
  }

  /**
   * Converts the metrics to a JSON tree, sends them to the topic and waits for
   * the send to complete.
   *
   * @param timelineMetrics metrics to publish
   * @throws InterruptedException if the calling thread is interrupted while waiting
   * @throws ExecutionException   if the send failed
   */
  public void sendMetrics(TimelineMetrics timelineMetrics) throws InterruptedException, ExecutionException {
    JsonNode jsonNode = OBJECT_MAPPER.valueToTree(timelineMetrics);
    ProducerRecord<byte[], JsonNode> rec = new ProducerRecord<>(TOPIC_NAME, jsonNode);
    Future<RecordMetadata> kafkaFuture = producer.send(rec);

    // Wait for completion so that a failed send surfaces to the caller as an
    // ExecutionException instead of being lost.
    kafkaFuture.get();
  }

  /**
   * Closes the underlying Kafka producer.
   */
  @Override
  public void close() {
    producer.close();
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java new file mode 100644 index 0000000..706c69f --- /dev/null +++ b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.alertservice.prototype.core; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly; +import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.KafkaUtils; +import scala.Tuple2; + +import java.util.*; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +
/**
 * Spark Streaming entry point for metric anomaly detection.
 *
 * <p>The job reads JSON-encoded {@link TimelineMetrics} from the
 * {@code ambari-metrics-topic} Kafka topic, keeps only the configured appIds,
 * and for every metric that passes the host / name-pattern filter runs the EMA
 * test and publishes the resulting anomalies through the
 * {@code MetricsCollectorInterface}. While processing records it also triggers
 * the Tukeys, KS and HSdev runs whenever their configured intervals have elapsed.
 */
public class MetricSparkConsumer {

  private static final Log LOG = LogFactory.getLog(MetricSparkConsumer.class);

  // Built once and reused instead of once per Kafka message.
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  private static final String GROUP_ID = "ambari-metrics-group";
  private static final String TOPIC_NAME = "ambari-metrics-topic";
  private static final int NUM_THREADS = 1;

  // NOTE(review): the fields below are mutated from inside the rdd.foreach closure
  // in main(). Presumably that is fine in local mode; on a cluster the closure would
  // run in executor JVMs, each with its own copy of these statics - confirm.
  private static long pitStartTime = System.currentTimeMillis();
  private static long ksStartTime = pitStartTime;
  private static long hdevStartTime = ksStartTime;
  private static Set<Pattern> includeMetricPatterns = new HashSet<>();
  private static Set<String> includedHosts = new HashSet<>();
  private static Set<TrendMetric> trendMetrics = new HashSet<>();

  public MetricSparkConsumer() {
  }

  /**
   * Loads a properties file, looking first on the system classpath and then on
   * the file system.
   *
   * @param propertiesFile classpath resource name or file path
   * @return the loaded properties, or {@code null} if the file could not be read
   */
  public static Properties readProperties(String propertiesFile) {
    Properties properties = new Properties();
    // try-with-resources: the stream is closed on both the success and failure paths.
    try (InputStream inputStream = openPropertiesStream(propertiesFile)) {
      properties.load(inputStream);
      return properties;
    } catch (IOException ioEx) {
      LOG.error("Error reading properties file " + propertiesFile, ioEx);
      return null;
    }
  }

  /** Opens the named file from the system classpath, falling back to the file system. */
  private static InputStream openPropertiesStream(String propertiesFile) throws IOException {
    InputStream inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile);
    return inputStream != null ? inputStream : new FileInputStream(propertiesFile);
  }

  /** Returns the value for {@code key}, failing with a descriptive message when it is absent. */
  private static String requireProperty(Properties properties, String key) {
    String value = properties.getProperty(key);
    if (value == null) {
      throw new IllegalArgumentException("Missing required property: " + key);
    }
    return value;
  }

  /**
   * Decides whether a metric should be analysed: its host must be in {@code hosts}
   * and, when {@code patterns} is non-empty, its name must match at least one pattern.
   */
  private static boolean isIncluded(TimelineMetric timelineMetric, Set<String> hosts, Set<Pattern> patterns) {
    // NOTE(review): an empty host set excludes every metric, whereas an empty
    // pattern set includes every metric. Kept as-is; confirm this asymmetry is intended.
    if (!hosts.contains(timelineMetric.getHostName())) {
      return false;
    }
    if (patterns.isEmpty()) {
      return true;
    }
    for (Pattern p : patterns) {
      Matcher m = p.matcher(timelineMetric.getMetricName());
      if (m.find()) {
        return true;
      }
    }
    return false;
  }

  /**
   * Builds and starts the streaming job, then blocks until it terminates.
   *
   * @param args {@code args[0]} is the configuration file (classpath resource or path)
   * @throws InterruptedException if interrupted while awaiting termination
   */
  public static void main(String[] args) throws InterruptedException {

    if (args.length < 1) {
      System.err.println("Usage: MetricSparkConsumer <input-config-file>");
      System.exit(1);
    }

    Properties properties = readProperties(args[0]);
    if (properties == null) {
      // readProperties already logged the cause; stop here instead of failing with an NPE below.
      System.err.println("Unable to load configuration from " + args[0]);
      System.exit(1);
    }

    List<String> appIds = Arrays.asList(requireProperty(properties, "appIds").split(","));

    String collectorHost = properties.getProperty("collectorHost");
    String collectorPort = properties.getProperty("collectorPort");
    String collectorProtocol = properties.getProperty("collectorProtocol");

    String zkQuorum = properties.getProperty("zkQuorum");

    double emaW = Double.parseDouble(requireProperty(properties, "emaW"));
    double emaN = Double.parseDouble(requireProperty(properties, "emaN"));
    int emaThreshold = Integer.parseInt(requireProperty(properties, "emaThreshold"));
    double tukeysN = Double.parseDouble(requireProperty(properties, "tukeysN"));

    long pitTestInterval = Long.parseLong(requireProperty(properties, "pointInTimeTestInterval"));
    long pitTrainInterval = Long.parseLong(requireProperty(properties, "pointInTimeTrainInterval"));

    long ksTestInterval = Long.parseLong(requireProperty(properties, "ksTestInterval"));
    long ksTrainInterval = Long.parseLong(requireProperty(properties, "ksTrainInterval"));
    int hsdevNhp = Integer.parseInt(requireProperty(properties, "hsdevNhp"));
    long hsdevInterval = Long.parseLong(requireProperty(properties, "hsdevInterval"));

    String ambariServerHost = properties.getProperty("ambariServerHost");
    String clusterName = properties.getProperty("clusterName");

    String includeMetricPatternStrings = properties.getProperty("includeMetricPatterns");
    if (includeMetricPatternStrings != null && !includeMetricPatternStrings.isEmpty()) {
      String[] patterns = includeMetricPatternStrings.split(",");
      for (String p : patterns) {
        LOG.info("Included Pattern : " + p);
        includeMetricPatterns.add(Pattern.compile(p));
      }
    }

    String includedHostList = properties.getProperty("hosts");
    if (includedHostList != null && !includedHostList.isEmpty()) {
      String[] hosts = includedHostList.split(",");
      includedHosts.addAll(Arrays.asList(hosts));
    }

    MetricsCollectorInterface metricsCollectorInterface =
      new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);

    SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

    EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN, emaThreshold);
    PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface,
      tukeysN,
      pitTestInterval,
      pitTrainInterval,
      ambariServerHost,
      clusterName);

    TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
      ksTestInterval,
      ksTrainInterval,
      hsdevNhp);

    Broadcast<EmaTechnique> emaTechniqueBroadcast = jssc.sparkContext().broadcast(emaTechnique);
    Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = jssc.sparkContext().broadcast(pointInTimeADSystem);
    Broadcast<TrendADSystem> trendADSystemBroadcast = jssc.sparkContext().broadcast(trendADSystem);
    Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast =
      jssc.sparkContext().broadcast(metricsCollectorInterface);
    Broadcast<Set<Pattern>> includePatternBroadcast = jssc.sparkContext().broadcast(includeMetricPatterns);
    Broadcast<Set<String>> includedHostBroadcast = jssc.sparkContext().broadcast(includedHosts);

    JavaPairReceiverInputDStream<String, String> messages =
      KafkaUtils.createStream(jssc, zkQuorum, GROUP_ID, Collections.singletonMap(TOPIC_NAME, NUM_THREADS));

    //Convert JSON string to TimelineMetrics.
    JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new Function<Tuple2<String, String>, TimelineMetrics>() {
      @Override
      public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
        return OBJECT_MAPPER.readValue(message._2, TimelineMetrics.class);
      }
    });

    timelineMetricsStream.print();

    //Group TimelineMetric by AppId. Empty batches are keyed under the placeholder appId "TEST".
    JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
      timelineMetrics -> timelineMetrics.getMetrics().isEmpty()
        ? new Tuple2<>("TEST", new TimelineMetrics())
        : new Tuple2<String, TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics)
    );

    appMetricStream.print();

    //Filter AppIds that are not needed.
    JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
      @Override
      public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
        return appIds.contains(appMetricTuple._1);
      }
    });

    filteredAppMetricStream.print();

    filteredAppMetricStream.foreachRDD(rdd -> {
      rdd.foreach(
        tuple2 -> {
          long currentTime = System.currentTimeMillis();
          EmaTechnique ema = emaTechniqueBroadcast.getValue();

          // Each periodic test advances its own start marker by exactly one interval per run.
          if (currentTime > pitStartTime + pitTestInterval) {
            LOG.info("Running Tukeys....");
            pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime);
            pitStartTime = pitStartTime + pitTestInterval;
          }

          if (currentTime > ksStartTime + ksTestInterval) {
            LOG.info("Running KS Test....");
            trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics);
            ksStartTime = ksStartTime + ksTestInterval;
          }

          if (currentTime > hdevStartTime + hsdevInterval) {
            LOG.info("Running HSdev Test....");
            trendADSystemBroadcast.getValue().runHsdevMethod();
            hdevStartTime = hdevStartTime + hsdevInterval;
          }

          TimelineMetrics metrics = tuple2._2();
          for (TimelineMetric timelineMetric : metrics.getMetrics()) {
            if (isIncluded(timelineMetric, includedHostBroadcast.getValue(), includePatternBroadcast.getValue())) {
              trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(),
                timelineMetric.getHostName()));
              List<MetricAnomaly> anomalies = ema.test(timelineMetric);
              metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
            }
          }
        });
    });

    jssc.start();
    jssc.awaitTermination();
  }
}
+ + + +
