http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
deleted file mode 100644
index d5beb48..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/StepFunctionMetricSeries.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.seriesgenerator;
-
-import java.util.Random;
-
-/**
- * Synthetic metric series shaped like a step function.
- *
- * <p>The series is made of consecutive "steady periods". Inside a period the
- * underlying value follows the line {@code y = m * x + c} and every emitted
- * point is drawn uniformly from
- * {@code [y - steadyValueDeviationPercentage * y, y + steadyValueDeviationPercentage * y]}.
- * When a period ends, the underlying value jumps up or down by
- * {@code stepChangePercentage * y} and a new period of random length starts.
- *
- * <p>Both "percentage" parameters are used as plain multipliers of the current
- * value (for example {@code 0.05} yields a band of +/- 5%).
- */
-public class StepFunctionMetricSeries implements AbstractMetricSeries {
-
-  double startValue = 0.0;
-  double steadyValueDeviationPercentage = 0.0;
-  double steadyPeriodSlope = 0.5;
-  int steadyPeriodMinSize = 10;
-  int steadyPeriodMaxSize = 20;
-  double stepChangePercentage = 0.0;
-  boolean upwardStep = true;
-
-  Random random = new Random();
-
-  // y = mx + c
-  double y;
-  double m;
-  double x;
-  double c;
-  int currentStepSize;
-  int currentIndex;
-
-  /**
-   * @param startValue underlying value at the start of the first steady period
-   * @param steadyValueDeviationPercentage half-width of the noise band, as a multiplier of the value
-   * @param steadyPeriodSlope slope of the underlying line inside a steady period
-   * @param steadyPeriodMinSize lower bound (inclusive) for the length of a steady period
-   * @param steadyPeriodMaxSize upper bound (exclusive) for the length of a steady period
-   * @param stepChangePercentage size of the jump between periods, as a multiplier of the value
-   * @param upwardStep {@code true} to jump upwards, {@code false} to jump downwards
-   */
-  public StepFunctionMetricSeries(double startValue,
-                                  double steadyValueDeviationPercentage,
-                                  double steadyPeriodSlope,
-                                  int steadyPeriodMinSize,
-                                  int steadyPeriodMaxSize,
-                                  double stepChangePercentage,
-                                  boolean upwardStep) {
-    this.startValue = startValue;
-    this.steadyValueDeviationPercentage = steadyValueDeviationPercentage;
-    this.steadyPeriodSlope = steadyPeriodSlope;
-    this.steadyPeriodMinSize = steadyPeriodMinSize;
-    this.steadyPeriodMaxSize = steadyPeriodMaxSize;
-    this.stepChangePercentage = stepChangePercentage;
-    this.upwardStep = upwardStep;
-    init();
-  }
-
-  private void init() {
-    y = startValue;
-    m = steadyPeriodSlope;
-    x = 1;
-    c = y - (m * x);
-
-    currentStepSize = nextStepSize();
-    currentIndex = 0;
-  }
-
-  /**
-   * Draws the length of the next steady period.
-   *
-   * <p>The result is clamped to at least 1: a period of length 0 (or a negative
-   * one, when max is below min) would emit no point at all, which previously
-   * made {@link #nextValue()} return a bogus {@code 0.0}.
-   */
-  private int nextStepSize() {
-    int size = (int) (steadyPeriodMinSize
-        + (steadyPeriodMaxSize - steadyPeriodMinSize) * random.nextDouble());
-    return Math.max(1, size);
-  }
-
-  /** Applies the step to the underlying value and restarts the line at x = 1. */
-  private void startNextPeriod() {
-    currentIndex = 0;
-    currentStepSize = nextStepSize();
-    double change = stepChangePercentage * y;
-    y = upwardStep ? y + change : y - change;
-    x = 1;
-    c = y - (m * x);
-  }
-
-  /**
-   * Emits the next point of the series and advances the generator.
-   *
-   * @return a value drawn uniformly from the noise band around the current
-   *         underlying value
-   */
-  @Override
-  public double nextValue() {
-    y = m * x + c;
-    double valueDeviationLowerLimit = y - steadyValueDeviationPercentage * y;
-    double valueDeviationHigherLimit = y + steadyValueDeviationPercentage * y;
-    double value = valueDeviationLowerLimit
-        + (valueDeviationHigherLimit - valueDeviationLowerLimit) * random.nextDouble();
-    x++;
-    currentIndex++;
-
-    // The period length is always >= 1, so a point has been produced before
-    // the step is applied.
-    if (currentIndex >= currentStepSize) {
-      startNextPeriod();
-    }
-
-    return value;
-  }
-
-  /**
-   * Generates the next {@code n} points of the series.
-   *
-   * @param n number of points to generate
-   * @return array of length {@code n} filled by successive {@link #nextValue()} calls
-   */
-  @Override
-  public double[] getSeries(int n) {
-    double[] series = new double[n];
-    for (int i = 0; i < n; i++) {
-      series[i] = nextValue();
-    }
-    return series;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
deleted file mode 100644
index a2b0eea..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/java/org/apache/ambari/metrics/alertservice/seriesgenerator/UniformMetricSeries.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.seriesgenerator;
-
-import java.util.Random;
-
-/**
- * Synthetic metric series whose points are drawn uniformly from a band around
- * a fixed value, with an occasional outlier drawn from a second band that lies
- * either above or below that value.
- *
- * <p>All "percentage" parameters are used as plain multipliers of the value.
- */
-public class UniformMetricSeries implements AbstractMetricSeries {
-
-  double value = 0.0;
-  double deviationPercentage = 0.0;
-  double outlierProbability = 0.0;
-  double outlierDeviationLowerPercentage = 0.0;
-  double outlierDeviationHigherPercentage = 0.0;
-  boolean outliersAboveValue= true;
-
-  Random random = new Random();
-  double valueDeviationLowerLimit;
-  double valueDeviationHigherLimit;
-  double outlierLeftLowerLimit;
-  double outlierLeftHigherLimit;
-  double outlierRightLowerLimit;
-  double outlierRightUpperLimit;
-  double nonOutlierProbability;
-
-
-  /**
-   * @param value centre of the series
-   * @param deviationPercentage half-width of the normal band, as a multiplier of {@code value}
-   * @param outlierProbability probability that a point is an outlier
-   * @param outlierDeviationLowerPercentage nearest edge of the outlier band, as a multiplier of {@code value}
-   * @param outlierDeviationHigherPercentage farthest edge of the outlier band, as a multiplier of {@code value}
-   * @param outliersAboveValue {@code true} to place outliers above the value, {@code false} below it
-   */
-  public UniformMetricSeries(double value,
-                             double deviationPercentage,
-                             double outlierProbability,
-                             double outlierDeviationLowerPercentage,
-                             double outlierDeviationHigherPercentage,
-                             boolean outliersAboveValue) {
-    this.value = value;
-    this.deviationPercentage = deviationPercentage;
-    this.outlierProbability = outlierProbability;
-    this.outlierDeviationLowerPercentage = outlierDeviationLowerPercentage;
-    this.outlierDeviationHigherPercentage = outlierDeviationHigherPercentage;
-    this.outliersAboveValue = outliersAboveValue;
-    init();
-  }
-
-  /** Pre-computes the band limits so that sampling is a single interpolation. */
-  private void init() {
-    nonOutlierProbability = 1.0 - outlierProbability;
-
-    // Normal band, centred on the value.
-    valueDeviationLowerLimit = value - deviationPercentage * value;
-    valueDeviationHigherLimit = value + deviationPercentage * value;
-
-    // Outlier band below the value ("left") ...
-    outlierLeftLowerLimit = value - outlierDeviationHigherPercentage * value;
-    outlierLeftHigherLimit = value - outlierDeviationLowerPercentage * value;
-
-    // ... and its mirror image above the value ("right").
-    outlierRightLowerLimit = value + outlierDeviationLowerPercentage * value;
-    outlierRightUpperLimit = value + outlierDeviationHigherPercentage * value;
-  }
-
-  /** Draws one point uniformly from {@code [low, high)}. */
-  private double sampleBetween(double low, double high) {
-    return low + (high - low) * random.nextDouble();
-  }
-
-  /**
-   * Emits the next point: first decides whether it is an outlier, then samples
-   * it from the matching band.
-   */
-  @Override
-  public double nextValue() {
-
-    double roll = random.nextDouble();
-
-    if (roll <= nonOutlierProbability) {
-      return sampleBetween(valueDeviationLowerLimit, valueDeviationHigherLimit);
-    }
-    if (outliersAboveValue) {
-      return sampleBetween(outlierRightLowerLimit, outlierRightUpperLimit);
-    }
-    return sampleBetween(outlierLeftLowerLimit, outlierLeftHigherLimit);
-  }
-
-  /**
-   * Generates the next {@code n} points of the series.
-   *
-   * @param n number of points to generate
-   * @return array of length {@code n} filled by successive {@link #nextValue()} calls
-   */
-  @Override
-  public double[] getSeries(int n) {
-    double[] points = new double[n];
-    int filled = 0;
-    while (filled < n) {
-      points[filled] = nextValue();
-      filled++;
-    }
-    return points;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R 
b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R
deleted file mode 100644
index 0b66095..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/ema.R
+++ /dev/null
@@ -1,96 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#  EMA <- w * EMA + (1 - w) * x
-# EMS <- sqrt( w * EMS^2 + (1 - w) * (x - EMA)^2 )
-# Alarm = abs(x - EMA) > n * EMS
-
-# Flags the points of test_data that deviate from a running exponential moving
-# average (EMA) by more than n exponential moving standard deviations (EMS).
-# A single EMA/EMS pair is kept for the whole series.
-#
-# Args:
-#   train_data: numeric vector of training values, or a data frame whose
-#               second column holds them.
-#   test_data:  data frame; column 1 is the timestamp, column 2 the value.
-#   w:          weight kept from the previous EMA/EMS (a new point gets 1 - w).
-#   n:          alarm threshold, in multiples of the EMS.
-#
-# Returns:
-#   Data frame with columns "TS" and "Value", one row per anomaly; an empty
-#   data frame when nothing was flagged.
-ema_global <- function(train_data, test_data, w, n) {
-
-  # Iterating a data frame walks its columns, not its rows, so pull the value
-  # column out first when a (TS, value) frame is supplied.
-  train_values <- if (is.data.frame(train_data)) train_data[, 2] else train_data
-
-  anomalies <- data.frame()
-  ema <- 0
-  ems <- 0
-
-  #Train Step
-  for (x in train_values) {
-    ema <- w*ema + (1-w)*x
-    ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2)
-  }
-
-  # seq_len() yields an empty sequence for zero rows, unlike 1:0.
-  for (i in seq_len(nrow(test_data))) {
-    x <- test_data[i,2]
-    if (abs(x - ema) > n*ems) {
-      anomaly <- c(as.numeric(test_data[i,1]), x)
-      anomalies <- rbind(anomalies, anomaly)
-    }
-    # The model keeps learning from test points, anomalous or not.
-    ema <- w*ema + (1-w)*x
-    ems <- sqrt(w* ems^2 + (1 - w)*(x - ema)^2)
-  }
-
-  if(length(anomalies) > 0) {
-    names(anomalies) <- c("TS", "Value")
-  }
-  return (anomalies)
-}
-
-# Same detector as the global EMA/EMS check, but with one EMA/EMS pair per day
-# of the week, so that a point is only compared with history from the same
-# weekday.
-#
-# Args:
-#   train_data: data frame; column 1 is the timestamp in epoch milliseconds,
-#               column 2 the value.
-#   test_data:  data frame with the same layout.
-#   w:          weight kept from the previous EMA/EMS (a new point gets 1 - w).
-#   n:          alarm threshold, in multiples of the EMS.
-#
-# Returns:
-#   Data frame with columns "TS" and "Value", one row per anomaly; an empty
-#   data frame when nothing was flagged.
-ema_daily <- function(train_data, test_data, w, n) {
-
-  anomalies <- data.frame()
-  ema <- vector("numeric", 7)
-  ems <- vector("numeric", 7)
-
-  # POSIXlt$wday runs 0 (Sunday) .. 6 while R vectors are 1-based, so the slot
-  # is wday + 1. Training and testing must use the same mapping; indexing with
-  # the raw wday silently drops Sunday (slot 0) and shifts every other day.
-  weekday_slot <- function(ts) {
-    as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01", tz = "GMT")$wday + 1
-  }
-
-  #Train Step
-  for (i in seq_len(nrow(train_data))) {
-    x <- train_data[i,2]
-    index <- weekday_slot(train_data[i,1])
-    ema[index] <- w*ema[index] + (1-w)*x
-    ems[index] <- sqrt(w* ems[index]^2 + (1 - w)*(x - ema[index])^2)
-  }
-
-  for (i in seq_len(nrow(test_data))) {
-    x <- test_data[i,2]
-    index <- weekday_slot(test_data[i,1])
-
-    if (abs(x - ema[index]) > n*ems[index]) {
-      anomaly <- c(as.numeric(test_data[i,1]), x)
-      anomalies <- rbind(anomalies, anomaly)
-    }
-    # The model keeps learning from test points, anomalous or not.
-    ema[index] <- w*ema[index] + (1-w)*x
-    ems[index] <- sqrt(w* ems[index]^2 + (1 - w)*(x - ema[index])^2)
-  }
-
-  if(length(anomalies) > 0) {
-    names(anomalies) <- c("TS", "Value")
-  }
-  return(anomalies)
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
deleted file mode 100644
index bca3366..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/hsdev.r
+++ /dev/null
@@ -1,67 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Historical standard-deviation check: compares the median of the test window
-# with the median of past values observed on the same day of the week, and
-# reports the window when the two differ by more than n historical standard
-# deviations.
-#
-# Args:
-#   train_data:           data frame; column 1 is the timestamp in epoch
-#                         milliseconds, column 2 the value. Walked from the
-#                         last row backwards.
-#   test_data:            data frame with the same layout.
-#   n:                    threshold, in multiples of the historical sd.
-#   num_historic_periods: how many periods of history to look back.
-#   interval:             the history start is rounded down to a multiple of
-#                         this many milliseconds.
-#   period:               length of one historic period in milliseconds.
-#
-# Returns:
-#   Data frame with columns "TS Start", "TS End", "Current Median",
-#   "Past Median" and "Past SD" (at most one row); an empty data frame when the
-#   window is normal or there is not enough history.
-hsdev_daily <- function(train_data, test_data, n, num_historic_periods, interval, period) {
-
-  anomalies <- data.frame()
-
-  num_test_points <- nrow(test_data)
-  if (num_test_points == 0) {
-    # Nothing to evaluate.
-    return (anomalies)
-  }
-
-  test_start <- test_data[1,1]
-  # Timestamp of the last row; length(test_data[1,]) would be the column count.
-  test_end <- test_data[num_test_points,1]
-  train_start <- test_start - num_historic_periods*period
-  # round to start of day
-  train_start <- train_start - (train_start %% interval)
-
-  time <- as.POSIXlt(as.numeric(test_data[1,1])/1000, origin = "1970-01-01", tz = "GMT")
-  test_data_day <- time$wday
-
-  # Collect history from newest to oldest, keeping only points that fall on the
-  # same weekday as the start of the test window.
-  h_data <- c()
-  for (i in rev(seq_len(nrow(train_data)))) {
-    ts <- train_data[i,1]
-    if ( ts < train_start) {
-      break
-    }
-    time <- as.POSIXlt(as.numeric(ts)/1000, origin = "1970-01-01" ,tz = "GMT")
-    if (time$wday == test_data_day) {
-      x <- train_data[i,2]
-      h_data <- c(h_data, x)
-    }
-  }
-
-  if (length(h_data) < 2*num_test_points) {
-    cat ("\nNot enough training data")
-    return (anomalies)
-  }
-
-  past_median <- median(h_data)
-  past_sd <- sd(h_data)
-  curr_median <- median(test_data[,2])
-
-  if (abs(curr_median - past_median) > n * past_sd) {
-    anomaly <- c(test_start, test_end, curr_median, past_median, past_sd)
-    anomalies <- rbind(anomalies, anomaly)
-  }
-
-  if(length(anomalies) > 0) {
-    names(anomalies) <- c("TS Start", "TS End", "Current Median", "Past Median", "Past SD")
-  }
-
-  return (anomalies)
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R
deleted file mode 100644
index 8956400..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/iforest.R
+++ /dev/null
@@ -1,52 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Multi-metric anomaly detection with an isolation forest.
-#
-# Fetches every metric behind `url`, joins them on timestamp into one frame
-# (column "TS" followed by one column per metric), trains a forest on the rows
-# inside [train_start, train_end] and scores the rows inside
-# [test_start, test_end].
-#
-# Args:
-#   url:                         passed to get_data(); the result is read as
-#                                res$metrics[[i]]$metricname / $metrics.
-#   train_start, train_end:      inclusive timestamp bounds of the training rows.
-#   test_start, test_end:        inclusive timestamp bounds of the scored rows.
-#   threshold_score:             rows scoring above this value are reported.
-#
-# Returns:
-#   Data frame with columns "TS", "Anomaly Score" and "Path length", one row
-#   per anomaly; an empty data frame when nothing was flagged.
-ams_iforest <- function(url, train_start, train_end, test_start, test_end, threshold_score) {
-
-  res <- get_data(url)
-  num_metrics <- length(res$metrics)
-  anomalies <- data.frame()
-
-  data <- data.frame(as.numeric(names(res$metrics[[1]]$metrics)), as.numeric(res$metrics[[1]]$metrics))
-  names(data) <- c("TS", res$metrics[[1]]$metricname)
-
-  # seq_len(n)[-1] is empty for a single metric, whereas 2:1 would count down
-  # and index a metric that does not exist.
-  for (i in seq_len(num_metrics)[-1]) {
-    df <- data.frame(as.numeric(names(res$metrics[[i]]$metrics)), as.numeric(res$metrics[[i]]$metrics))
-    names(df) <- c("TS", res$metrics[[i]]$metricname)
-    data <- merge(data, df)
-  }
-
-  # Every column except the leading TS column.
-  metric_cols <- seq_len(num_metrics) + 1
-
-  # Select rows by the timestamps of the merged frame itself; the per-metric
-  # frame `df` can have different rows than the merge result and does not exist
-  # at all when there is a single metric.
-  algo_data <- data[which(data$TS >= train_start & data$TS <= train_end), metric_cols, drop = FALSE]
-  iForest <- IsolationTrees(algo_data)
-  test_data <- data[which(data$TS >= test_start & data$TS <= test_end), ]
-
-  if_res <- AnomalyScore(test_data[metric_cols], iForest)
-  for (i in seq_along(if_res$outF)) {
-    if (if_res$outF[i] > threshold_score) {
-      anomaly <- c(test_data[i,1], if_res$outF[i], if_res$pathLength[i])
-      anomalies <- rbind(anomalies, anomaly)
-    }
-  }
-
-  if(length(anomalies) > 0) {
-    names(anomalies) <- c("TS", "Anomaly Score", "Path length")
-  }
-  return (anomalies)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
deleted file mode 100644
index f22bc15..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/kstest.r
+++ /dev/null
@@ -1,38 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Two-sample Kolmogorov-Smirnov check: reports the test window when its value
-# distribution differs from the training distribution with a p-value below
-# p_value.
-#
-# Args:
-#   train_data: data frame; column 1 is the timestamp, column 2 the value.
-#   test_data:  data frame with the same layout.
-#   p_value:    significance threshold.
-#
-# Returns:
-#   Data frame with columns "TS Start", "TS End", "D" and "p-value" (at most
-#   one row); an empty data frame when the distributions are not
-#   significantly different.
-ams_ks <- function(train_data, test_data, p_value) {
-
-  anomalies <- data.frame()
-  res <- ks.test(train_data[,2], test_data[,2])
-
-  if (res$p.value < p_value) {
-    # nrow() addresses the last row; length() of a data frame is its column
-    # count and would pick the wrong timestamp for "TS End".
-    anomaly <- c(test_data[1,1], test_data[nrow(test_data),1], unname(res$statistic), res$p.value)
-    anomalies <- rbind(anomalies, anomaly)
-  }
-
-  if(length(anomalies) > 0) {
-    names(anomalies) <- c("TS Start", "TS End", "D", "p-value")
-  }
-  return (anomalies)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R
deleted file mode 100644
index 7650356..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/test.R
+++ /dev/null
@@ -1,85 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-
-# Accumulators for the anomalies reported by each detection method. They live
-# at the top level so that results survive across function calls.
-tukeys_anomalies <- data.frame()
-ema_global_anomalies <- data.frame()
-ema_daily_anomalies <- data.frame()
-ks_anomalies <- data.frame()
-hsdev_anomalies <- data.frame()
-
-# Resets every accumulator to an empty data frame.
-#
-# `<<-` is required here: a plain `<-` would only create variables local to
-# this function and leave the top-level accumulators untouched.
-init <- function() {
-  tukeys_anomalies <<- data.frame()
-  ema_global_anomalies <<- data.frame()
-  ema_daily_anomalies <<- data.frame()
-  ks_anomalies <<- data.frame()
-  hsdev_anomalies <<- data.frame()
-}
-
-# Driver that replays `data` through the detectors window by window.
-#
-# The series is cut into consecutive windows using get_next_day_boundary();
-# each test window is checked by hsdev_daily() against all rows that precede
-# it. The other detectors are kept below as disabled calls.
-#
-# Args:
-#   data: data frame with a "TS" column (column 1) and a value column
-#         (column 2); the spacing of the first two rows is used as the step.
-#
-# Returns:
-#   The accumulated hsdev_anomalies data frame.
-test_methods <- function(data) {
-
-  init()
-
-  last_ts <- data[nrow(data),1]
-  step <- data[2,1] - data[1,1]
-  day <- 24*60*60*1000
-  run <- 1
-
-  train_start <- data[1,1]
-  train_end <- get_next_day_boundary(train_start, step, last_ts)
-  test_start <- train_end + step
-  test_end <- get_next_day_boundary(test_start, step, last_ts)
-
-  while (test_start < last_ts) {
-
-    # Progress indicator: ordinal of the window being processed.
-    print (run)
-    run <- run + 1
-
-    in_train <- which(data$TS >= train_start & data$TS <= train_end)
-    in_test <- which(data$TS >= test_start & data$TS <= test_end)
-    train_data <- data[in_train,]
-    test_data <- data[in_test, ]
-
-    #tukeys_anomalies <<- rbind(tukeys_anomalies, ams_tukeys(train_data, test_data, 3))
-    #ema_global_anomalies <<- rbind(ema_global_anomalies, ema_global(train_data, test_data, 0.9, 3))
-    #ema_daily_anomalies <<- rbind(ema_daily_anomalies, ema_daily(train_data, test_data, 0.9, 3))
-    #ks_anomalies <<- rbind(ks_anomalies, ams_ks(train_data, test_data, 0.05))
-
-    # hsdev looks at everything before the test window, not just one window.
-    hsdev_train_data <- data[which(data$TS < test_start),]
-    window_anomalies <- hsdev_daily(hsdev_train_data, test_data, 3, 3, day, 7*day)
-    hsdev_anomalies <<- rbind(hsdev_anomalies, window_anomalies)
-
-    # Slide forward: the old test window start becomes the next training start.
-    train_start <- test_start
-    train_end <- get_next_day_boundary(train_start, step, last_ts)
-    test_start <- train_end + step
-    test_end <- get_next_day_boundary(test_start, step, last_ts)
-  }
-  return (hsdev_anomalies)
-}
-
-# Walks forward from `start` in increments of `step` until a timestamp lands
-# exactly 28800000 ms (8 hours) into a 24-hour period.
-#
-# Args:
-#   start: first candidate timestamp, in milliseconds.
-#   step:  increment between candidates, in milliseconds.
-#   limit: last timestamp that is still examined.
-#
-# Returns:
-#   -1 when start is already past limit; the matching timestamp when one is
-#   found; otherwise the first candidate that is greater than limit.
-get_next_day_boundary <- function(start, step, limit) {
-
-  day_ms <- 24*60*60*1000
-  boundary_offset_ms <- 28800000
-
-  if (start > limit) {
-    return (-1)
-  }
-
-  candidate <- start
-  while (candidate <= limit && (candidate %% day_ms) != boundary_offset_ms) {
-    candidate <- candidate + step
-  }
-  return (candidate)
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
deleted file mode 100644
index 0312226..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/R-scripts/tukeys.r
+++ /dev/null
@@ -1,51 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-# Tukey's fences: flags the points of test_data that fall more than n
-# inter-quartile ranges (IQR) outside the first/third quartile of the
-# training values.
-#
-# Args:
-#   train_data: data frame; column 1 is the timestamp, column 2 the value.
-#   test_data:  data frame with the same layout.
-#   n:          fence distance, in multiples of the IQR.
-#
-# Returns:
-#   Data frame with columns "TS", "Value" and "niqr" (how many IQRs the point
-#   lies beyond the nearer quartile; stays 0 when the IQR itself is 0), one
-#   row per anomaly; an empty data frame when nothing was flagged.
-ams_tukeys <- function(train_data, test_data, n) {
-
-  anomalies <- data.frame()
-
-  # unname() keeps the "25%" / "75%" labels out of the anomaly rows.
-  quantiles <- unname(quantile(train_data[,2]))
-  q1 <- quantiles[2]
-  q3 <- quantiles[4]
-  iqr <- q3 - q1
-
-  # The fences depend only on the training data, so compute them once.
-  lb <- q1 - n*iqr
-  ub <- q3 + n*iqr
-  niqr <- 0
-
-  # seq_len() yields an empty sequence for zero rows, unlike 1:0.
-  for (i in seq_len(nrow(test_data))) {
-    x <- test_data[i,2]
-    if ( (x < lb)  || (x > ub) ) {
-      if (iqr != 0) {
-        if (x < lb) {
-          niqr <- (q1 - x) / iqr
-        } else {
-          niqr <- (x - q3) / iqr
-        }
-      }
-      anomaly <- c(test_data[i,1], x, niqr)
-      anomalies <- rbind(anomalies, anomaly)
-    }
-  }
-  if(length(anomalies) > 0) {
-    names(anomalies) <- c("TS", "Value", "niqr")
-  }
-  return (anomalies)
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties
 
b/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties
deleted file mode 100644
index ab106c4..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/main/resources/input-config.properties
+++ /dev/null
@@ -1,42 +0,0 @@
-# Copyright 2011 The Apache Software Foundation
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-appIds=HOST
-
-collectorHost=localhost
-collectorPort=6188
-collectorProtocol=http
-
-zkQuorum=localhost:2181
-
-ambariServerHost=localhost
-clusterName=c1
-
-emaW=0.8
-emaN=3
-tukeysN=3
-pointInTimeTestInterval=300000
-pointInTimeTrainInterval=900000
-
-ksTestInterval=600000
-ksTrainInterval=600000
-hsdevNhp=3
-hsdevInterval=1800000
-
-skipMetricPatterns=sdisk*,cpu_sintr*,proc*,disk*,boottime
-hosts=avijayan-ad-1.openstacklocal
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
deleted file mode 100644
index d1e2b41..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestEmaTechnique.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype;
-
-import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
-import 
org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.List;
-import java.util.TreeMap;
-
-import static 
org.apache.ambari.metrics.alertservice.prototype.TestRFunctionInvoker.getTS;
-
-/**
- * Tests for {@link EmaTechnique}. The whole class is skipped unless the
- * {@code R_HOME} environment variable is set.
- */
-public class TestEmaTechnique {
-
-  private static double[] ts;
-  private static String fullFilePath;
-
-  /** Points the R function invoker at the bundled {@code R-scripts} directory. */
-  @BeforeClass
-  public static void init() throws URISyntaxException {
-
-    boolean rHomeConfigured = System.getenv("R_HOME") != null;
-    Assume.assumeTrue(rHomeConfigured);
-
-    ts = getTS(1000);
-    URL scriptsUrl = ClassLoader.getSystemResource("R-scripts");
-    File scriptsDir = new File(scriptsUrl.toURI());
-    fullFilePath = scriptsDir.getAbsolutePath();
-    RFunctionInvoker.setScriptsDir(fullFilePath);
-  }
-
-  /** A freshly built technique tracks nothing and exposes its start parameters. */
-  @Test
-  public void testEmaInitialization() {
-
-    EmaTechnique technique = new EmaTechnique(0.5, 3);
-    Assert.assertTrue(technique.getTrackedEmas().isEmpty());
-    Assert.assertTrue(0.5 == technique.getStartingWeight());
-    // NOTE(review): the technique is built with 3 but 2 is expected here;
-    // confirm against EmaTechnique whether this is intentional.
-    Assert.assertTrue(2 == technique.getStartTimesSdev());
-  }
-
-  /**
-   * Feeds two batches of values close to 20000 and checks how the number of
-   * reported anomalies reacts to {@code updateModel} calls.
-   */
-  @Test
-  public void testEma() {
-    EmaTechnique technique = new EmaTechnique(0.5, 3);
-
-    long now = System.currentTimeMillis();
-
-    TimelineMetric metric = new TimelineMetric();
-    metric.setMetricName("M1");
-    metric.setHostName("H1");
-    metric.setStartTime(now - 1000);
-    metric.setAppId("A1");
-    metric.setInstanceId(null);
-    metric.setType("Integer");
-
-    //Train
-    metric.setMetricValues(randomValuesEndingAt(now));
-    List<MetricAnomaly> anomalies = technique.test(metric);
-//    Assert.assertTrue(anomalies.isEmpty());
-
-    metric.setMetricValues(randomValuesEndingAt(now));
-    anomalies = technique.test(metric);
-    Assert.assertFalse(anomalies.isEmpty());
-    int baselineCount = anomalies.size();
-
-    Assert.assertTrue(technique.updateModel(metric, false, 20));
-    anomalies = technique.test(metric);
-    int afterFirstUpdate = anomalies.size();
-    Assert.assertTrue(afterFirstUpdate < baselineCount);
-
-    Assert.assertTrue(technique.updateModel(metric, true, 50));
-    anomalies = technique.test(metric);
-    int afterSecondUpdate = anomalies.size();
-    Assert.assertTrue(afterSecondUpdate > afterFirstUpdate && afterSecondUpdate > baselineCount);
-
-  }
-
-  /**
-   * Builds 50 points spaced 100 ms apart, the newest at {@code now}, each
-   * valued {@code 20000 + Math.random()}.
-   */
-  private static TreeMap<Long, Double> randomValuesEndingAt(long now) {
-    TreeMap<Long, Double> values = new TreeMap<Long, Double>();
-    for (int age = 0; age < 50; age++) {
-      double value = 20000 + Math.random();
-      values.put(now - age * 100, value);
-    }
-    return values;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java
deleted file mode 100644
index 9a102a0..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype;
-
-import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
-import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
-import 
org.apache.ambari.metrics.alertservice.seriesgenerator.UniformMetricSeries;
-import org.apache.commons.lang.ArrayUtils;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-public class TestRFunctionInvoker {
-
-  private static String metricName = "TestMetric";
-  private static double[] ts;
-  private static String fullFilePath;
-
-  @BeforeClass
-  public static void init() throws URISyntaxException {
-
-    Assume.assumeTrue(System.getenv("R_HOME") != null);
-    ts = getTS(1000);
-    URL url = ClassLoader.getSystemResource("R-scripts");
-    fullFilePath = new File(url.toURI()).getAbsolutePath();
-    RFunctionInvoker.setScriptsDir(fullFilePath);
-  }
-
-  @Test
-  public void testTukeys() throws URISyntaxException {
-
-    double[] train_ts = ArrayUtils.subarray(ts, 0, 750);
-    double[] train_x = getRandomData(750);
-    DataSeries trainData = new DataSeries(metricName, train_ts, train_x);
-
-    double[] test_ts = ArrayUtils.subarray(ts, 750, 1000);
-    double[] test_x = getRandomData(250);
-    test_x[50] = 5.5; //Anomaly
-    DataSeries testData = new DataSeries(metricName, test_ts, test_x);
-    Map<String, String> configs = new HashMap();
-    configs.put("tukeys.n", "3");
-
-    ResultSet rs = RFunctionInvoker.tukeys(trainData, testData, configs);
-    Assert.assertEquals(rs.resultset.size(), 2);
-    Assert.assertEquals(rs.resultset.get(1)[0], 5.5, 0.1);
-
-  }
-
-  public static void main(String[] args) throws URISyntaxException {
-
-    String metricName = "TestMetric";
-    double[] ts = getTS(1000);
-    URL url = ClassLoader.getSystemResource("R-scripts");
-    String fullFilePath = new File(url.toURI()).getAbsolutePath();
-    RFunctionInvoker.setScriptsDir(fullFilePath);
-
-    double[] train_ts = ArrayUtils.subarray(ts, 0, 750);
-    double[] train_x = getRandomData(750);
-    DataSeries trainData = new DataSeries(metricName, train_ts, train_x);
-
-    double[] test_ts = ArrayUtils.subarray(ts, 750, 1000);
-    double[] test_x = getRandomData(250);
-    test_x[50] = 5.5; //Anomaly
-    DataSeries testData = new DataSeries(metricName, test_ts, test_x);
-    ResultSet rs;
-
-    Map<String, String> configs = new HashMap();
-
-    System.out.println("TUKEYS");
-    configs.put("tukeys.n", "3");
-    rs = RFunctionInvoker.tukeys(trainData, testData, configs);
-    rs.print();
-    System.out.println("--------------");
-
-//    System.out.println("EMA Global");
-//    configs.put("ema.n", "3");
-//    configs.put("ema.w", "0.8");
-//    rs = RFunctionInvoker.ema_global(trainData, testData, configs);
-//    rs.print();
-//    System.out.println("--------------");
-//
-//    System.out.println("EMA Daily");
-//    rs = RFunctionInvoker.ema_daily(trainData, testData, configs);
-//    rs.print();
-//    System.out.println("--------------");
-//
-//    configs.put("ks.p_value", "0.00005");
-//    System.out.println("KS Test");
-//    rs = RFunctionInvoker.ksTest(trainData, testData, configs);
-//    rs.print();
-//    System.out.println("--------------");
-//
-    ts = getTS(5000);
-    train_ts = ArrayUtils.subarray(ts, 0, 4800);
-    train_x = getRandomData(4800);
-    trainData = new DataSeries(metricName, train_ts, train_x);
-    test_ts = ArrayUtils.subarray(ts, 4800, 5000);
-    test_x = getRandomData(200);
-    for (int i = 0; i < 200; i++) {
-      test_x[i] = test_x[i] * 5;
-    }
-    testData = new DataSeries(metricName, test_ts, test_x);
-    configs.put("hsdev.n", "3");
-    configs.put("hsdev.nhp", "3");
-    configs.put("hsdev.interval", "86400000");
-    configs.put("hsdev.period", "604800000");
-    System.out.println("HSdev");
-    rs = RFunctionInvoker.hsdev(trainData, testData, configs);
-    rs.print();
-    System.out.println("--------------");
-
-  }
-
-  static double[] getTS(int n) {
-    long currentTime = System.currentTimeMillis();
-    double[] ts = new double[n];
-    currentTime = currentTime - (currentTime % (5 * 60 * 1000));
-
-    for (int i = 0, j = n - 1; i < n; i++, j--) {
-      ts[j] = currentTime;
-      currentTime = currentTime - (5 * 60 * 1000);
-    }
-    return ts;
-  }
-
-  static double[] getRandomData(int n) {
-
-    UniformMetricSeries metricSeries =  new UniformMetricSeries(10, 0.1,0.05, 
0.6, 0.8, true);
-    return metricSeries.getSeries(n);
-
-//    double[] metrics = new double[n];
-//    Random random = new Random();
-//    for (int i = 0; i < n; i++) {
-//      metrics[i] = random.nextDouble();
-//    }
-//    return metrics;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
deleted file mode 100644
index ef0125f..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype;
-
-import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
-import 
org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.junit.Assume;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.net.InetAddress;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.List;
-import java.util.TreeMap;
-
-public class TestTukeys {
-
-  @BeforeClass
-  public static void init() throws URISyntaxException {
-    Assume.assumeTrue(System.getenv("R_HOME") != null);
-  }
-
-  @Test
-  public void testPointInTimeDetectionSystem() throws UnknownHostException, 
URISyntaxException {
-
-    URL url = ClassLoader.getSystemResource("R-scripts");
-    String fullFilePath = new File(url.toURI()).getAbsolutePath();
-    RFunctionInvoker.setScriptsDir(fullFilePath);
-
-    MetricsCollectorInterface metricsCollectorInterface = new 
MetricsCollectorInterface("avijayan-ams-1.openstacklocal","http", "6188");
-
-    EmaTechnique ema = new EmaTechnique(0.5, 3);
-    long now = System.currentTimeMillis();
-
-    TimelineMetric metric1 = new TimelineMetric();
-    metric1.setMetricName("mm9");
-    metric1.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
-    metric1.setStartTime(now);
-    metric1.setAppId("aa9");
-    metric1.setInstanceId(null);
-    metric1.setType("Integer");
-
-    //Train
-    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
-
-    //2hr data.
-    for (int i = 0; i < 120; i++) {
-      double metric = 20000 + Math.random();
-      metricValues.put(now - i * 60 * 1000, metric);
-    }
-    metric1.setMetricValues(metricValues);
-    TimelineMetrics timelineMetrics = new TimelineMetrics();
-    timelineMetrics.addOrMergeTimelineMetric(metric1);
-
-    metricsCollectorInterface.emitMetrics(timelineMetrics);
-
-    List<MetricAnomaly> anomalyList = ema.test(metric1);
-    metricsCollectorInterface.publish(anomalyList);
-//
-//    PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(ema, 
metricsCollectorInterface, 3, 5*60*1000, 15*60*1000);
-//    pointInTimeADSystem.runOnce();
-//
-//    List<MetricAnomaly> anomalyList2 = ema.test(metric1);
-//
-//    pointInTimeADSystem.runOnce();
-//    List<MetricAnomaly> anomalyList3 = ema.test(metric1);
-//
-//    pointInTimeADSystem.runOnce();
-//    List<MetricAnomaly> anomalyList4 = ema.test(metric1);
-//
-//    pointInTimeADSystem.runOnce();
-//    List<MetricAnomaly> anomalyList5 = ema.test(metric1);
-//
-//    pointInTimeADSystem.runOnce();
-//    List<MetricAnomaly> anomalyList6 = ema.test(metric1);
-//
-//    Assert.assertTrue(anomalyList6.size() < anomalyList.size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java
deleted file mode 100644
index 575ea8b..0000000
--- 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.seriesgenerator;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import 
org.apache.ambari.metrics.alertservice.prototype.MetricAnomalyDetectorTestInput;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MetricSeriesGeneratorTest {
-
-  @Test
-  public void testUniformSeries() {
-
-    UniformMetricSeries metricSeries = new UniformMetricSeries(5, 0.2, 0, 0, 
0, true);
-    Assert.assertTrue(metricSeries.nextValue() <= 6 && 
metricSeries.nextValue() >= 4);
-
-    double[] uniformSeries = 
MetricSeriesGeneratorFactory.createUniformSeries(50, 10, 0.2, 0.1, 0.4, 0.5, 
true);
-    Assert.assertTrue(uniformSeries.length == 50);
-
-    for (int i = 0; i < uniformSeries.length; i++) {
-      double value = uniformSeries[i];
-
-      if (value > 10 * 1.2) {
-        Assert.assertTrue(value >= 10 * 1.4 && value <= 10 * 1.6);
-      } else {
-        Assert.assertTrue(value >= 10 * 0.8 && value <= 10 * 1.2);
-      }
-    }
-  }
-
-  @Test
-  public void testNormalSeries() {
-    NormalMetricSeries metricSeries = new NormalMetricSeries(0, 1, 0, 0, 0, 
true);
-    Assert.assertTrue(metricSeries.nextValue() <= 3 && 
metricSeries.nextValue() >= -3);
-  }
-
-  @Test
-  public void testMonotonicSeries() {
-
-    MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(0, 0.5, 0, 
0, 0, 0, true);
-    Assert.assertTrue(metricSeries.nextValue() == 0);
-    Assert.assertTrue(metricSeries.nextValue() == 0.5);
-
-    double[] incSeries = 
MetricSeriesGeneratorFactory.createMonotonicSeries(20, 0, 0.5, 0, 0, 0, 0, 
true);
-    Assert.assertTrue(incSeries.length == 20);
-    for (int i = 0; i < incSeries.length; i++) {
-      Assert.assertTrue(incSeries[i] == i * 0.5);
-    }
-  }
-
-  @Test
-  public void testDualBandSeries() {
-    double[] dualBandSeries = 
MetricSeriesGeneratorFactory.getDualBandSeries(30, 5, 0.2, 5, 15, 0.3, 4);
-    Assert.assertTrue(dualBandSeries[0] >= 4 && dualBandSeries[0] <= 6);
-    Assert.assertTrue(dualBandSeries[4] >= 4 && dualBandSeries[4] <= 6);
-    Assert.assertTrue(dualBandSeries[5] >= 10.5 && dualBandSeries[5] <= 19.5);
-    Assert.assertTrue(dualBandSeries[8] >= 10.5 && dualBandSeries[8] <= 19.5);
-    Assert.assertTrue(dualBandSeries[9] >= 4 && dualBandSeries[9] <= 6);
-  }
-
-  @Test
-  public void testStepSeries() {
-    double[] stepSeries = 
MetricSeriesGeneratorFactory.getStepFunctionSeries(30, 10, 0, 0, 5, 5, 0.5, 
true);
-
-    Assert.assertTrue(stepSeries[0] == 10);
-    Assert.assertTrue(stepSeries[4] == 10);
-
-    Assert.assertTrue(stepSeries[5] == 10*1.5);
-    Assert.assertTrue(stepSeries[9] == 10*1.5);
-
-    Assert.assertTrue(stepSeries[10] == 10*1.5*1.5);
-    Assert.assertTrue(stepSeries[14] == 10*1.5*1.5);
-  }
-
-  @Test
-  public void testSteadySeriesWithTurbulence() {
-    double[] steadySeriesWithTurbulence = 
MetricSeriesGeneratorFactory.getSteadySeriesWithTurbulentPeriod(30, 5, 0, 1, 1, 
5, 1);
-
-    int count = 0;
-    for (int i = 0; i < steadySeriesWithTurbulence.length; i++) {
-      if (steadySeriesWithTurbulence[i] == 10) {
-        count++;
-      }
-    }
-    Assert.assertTrue(count == 5);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detector/pom.xml 
b/ambari-metrics/ambari-metrics-anomaly-detector/pom.xml
new file mode 100644
index 0000000..e6e12f2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detector/pom.xml
@@ -0,0 +1,205 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>ambari-metrics</artifactId>
+        <groupId>org.apache.ambari</groupId>
+        <version>2.0.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>ambari-metrics-anomaly-detector</artifactId>
+    <version>2.0.0.0-SNAPSHOT</version>
+    <properties>
+        <!-- NOTE(review): scala.version is a 2.10.x release while scala.binary.version
+             is 2.11, and the dependencies in this file mix _2.10 and _2.11 artifacts.
+             Confirm which Scala line this module is meant to target. -->
+        <scala.version>2.10.4</scala.version>
+        <scala.binary.version>2.11</scala.binary.version>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>scala-tools.org</id>
+            <name>Scala-Tools Maven2 Repository</name>
+            <url>http://scala-tools.org/repo-releases</url>
+        </repository>
+    </repositories>
+
+    <pluginRepositories>
+        <pluginRepository>
+            <id>scala-tools.org</id>
+            <name>Scala-Tools Maven2 Repository</name>
+            <url>http://scala-tools.org/repo-releases</url>
+        </pluginRepository>
+    </pluginRepositories>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <args>
+                        <arg>-target:jvm-1.5</arg>
+                    </args>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <name>Ambari Metrics Anomaly Detector</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>2.5</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.lucarosellini.rJava</groupId>
+            <artifactId>JRI</artifactId>
+            <version>0.9-7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.11</artifactId>
+            <version>2.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>0.10.1.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.mail</groupId>
+                    <artifactId>mail</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jmx</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.10.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-json</artifactId>
+            <version>0.10.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-kafka_2.10</artifactId>
+            <version>1.6.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
+            <version>1.6.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.phoenix</groupId>
+            <artifactId>phoenix-spark</artifactId>
+            <version>4.10.0-HBase-1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.10</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+            <version>4.10</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ambari</groupId>
+            <artifactId>ambari-metrics-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>2.1.1</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
+            <version>2.1.1</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
new file mode 100644
index 0000000..eb19857
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/DataSeries.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.common;
+
+import java.util.Arrays;
+
+/**
+ * Plain holder for a named series made of two {@code double} arrays.
+ *
+ * <p>All three fields are public and mutable, and the arrays are stored by
+ * reference (no defensive copy), so changes made by the caller to the arrays
+ * are visible through this object.
+ *
+ * <p>NOTE(review): {@code ts} presumably carries timestamps aligned
+ * index-by-index with {@code values}; nothing in this class enforces equal
+ * lengths - confirm against callers.
+ */
+public class DataSeries {
+
+    /** Name identifying the series. */
+    public String seriesName;
+    /** First array of the series (see the class note on its presumed meaning). */
+    public double[] ts;
+    /** Second array of the series. */
+    public double[] values;
+
+    /**
+     * Stores the given name and arrays as-is.
+     *
+     * @param seriesName name of the series
+     * @param ts         array kept in {@link #ts}
+     * @param values     array kept in {@link #values}
+     */
+    public DataSeries(String seriesName, double[] ts, double[] values) {
+        this.values = values;
+        this.ts = ts;
+        this.seriesName = seriesName;
+    }
+
+    /**
+     * Returns the series name immediately followed by the
+     * {@link Arrays#toString(double[])} rendering of {@code ts} and then of
+     * {@code values}, with no separators in between.
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append(seriesName);
+        text.append(Arrays.toString(ts));
+        text.append(Arrays.toString(values));
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
new file mode 100644
index 0000000..101b0e9
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/ResultSet.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.common;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Wrapper around a list of {@code double} arrays that can dump itself to
+ * standard output.
+ */
+public class ResultSet {
+
+    /** The wrapped arrays; replaced by whatever list the constructor receives. */
+    public List<double[]> resultset = new ArrayList<>();
+
+    /**
+     * @param resultset list of arrays to wrap; stored by reference
+     */
+    public ResultSet(List<double[]> resultset) {
+        this.resultset = resultset;
+    }
+
+    /**
+     * Prints the header {@code "Result : "} and then treats every array as a
+     * column: output line {@code i} holds element {@code i} of each array,
+     * each followed by a single space. The number of lines equals the length
+     * of the first array, so a shorter later array triggers an
+     * {@link ArrayIndexOutOfBoundsException}.
+     */
+    public void print() {
+        System.out.println("Result : ");
+        if (resultset.isEmpty()) {
+            return;
+        }
+
+        // The first array alone decides how many lines are written.
+        int rowCount = resultset.get(0).length;
+        for (int row = 0; row < rowCount; row++) {
+            for (double[] column : resultset) {
+                System.out.print(column[row] + " ");
+            }
+            System.out.println();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
new file mode 100644
index 0000000..4ea4ac5
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/common/StatisticUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.common;
+
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Static helpers for basic descriptive statistics over {@code double} arrays.
+ *
+ * <p>Every method returns {@link Double#NaN} when the statistic is undefined
+ * for the given input (for example an empty array) instead of throwing.
+ */
+public class StatisticUtils {
+
+  /**
+   * Arithmetic mean of the given values.
+   *
+   * @param values input samples
+   * @return sum of the samples divided by their count; NaN for an empty array
+   */
+  public static double mean(double[] values) {
+    double sum = 0;
+    for (double d : values) {
+      sum += d;
+    }
+    return sum / values.length;
+  }
+
+  /**
+   * Sum of squared deviations from the mean.
+   *
+   * <p>Despite its name this method does NOT divide by the sample count;
+   * {@link #sdev(double[], boolean)} performs that division. The behaviour is
+   * kept as-is because callers rely on it.
+   *
+   * @param values input samples
+   * @return the sum over all samples of {@code (sample - mean)^2}; 0 for an empty array
+   */
+  public static double variance(double[] values) {
+    double avg =  mean(values);
+    double variance = 0;
+    for (double d : values) {
+      variance += Math.pow(d - avg, 2.0);
+    }
+    return variance;
+  }
+
+  /**
+   * Standard deviation of the given values.
+   *
+   * @param values               input samples
+   * @param useBesselsCorrection when true divide by {@code n - 1}, otherwise by {@code n}
+   * @return the standard deviation, or NaN when the divisor would not be positive
+   *         (empty input, or a single sample with Bessel's correction)
+   */
+  public static double sdev(double[]  values, boolean useBesselsCorrection) {
+    int n = (useBesselsCorrection) ? values.length - 1 : values.length;
+    if (n <= 0) {
+      // Without this guard an empty array with Bessel's correction divides by -1
+      // and yields -0.0, while the other undefined cases already yield NaN.
+      return Double.NaN;
+    }
+    return Math.sqrt(variance(values) / n);
+  }
+
+  /**
+   * Median of the given values. The input array is not modified; a sorted copy
+   * is used.
+   *
+   * @param values input samples
+   * @return the middle element for an odd count, the average of the two middle
+   *         elements for an even count, or NaN for an empty array
+   */
+  public static double median(double[] values) {
+    int n = values.length;
+    if (n == 0) {
+      // (0 - 1) / 2 is 0 in integer arithmetic, so indexing would go out of bounds.
+      return Double.NaN;
+    }
+
+    double[] clonedValues = Arrays.copyOf(values, n);
+    Arrays.sort(clonedValues);
+
+    if (n % 2 != 0) {
+      return clonedValues[(n-1)/2];
+    } else {
+      return ( clonedValues[(n-1)/2] + clonedValues[n/2] ) / 2;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/AmbariServerInterface.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/AmbariServerInterface.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/AmbariServerInterface.java
new file mode 100644
index 0000000..b6b1bf5
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/AmbariServerInterface.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.alertservice.prototype.core;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+/**
+ * Minimal client for the Ambari server REST API, used to look up alert
+ * definition settings of the cluster it was constructed for.
+ */
+public class AmbariServerInterface implements Serializable {
+
+  private static final Log LOG = LogFactory.getLog(AmbariServerInterface.class);
+
+  // NOTE(review): port and credentials are hard-coded; presumably they should
+  // come from configuration before this leaves the prototype stage - confirm.
+  private static final String AMBARI_SERVER_PORT = "8080";
+  private static final String AMBARI_CREDENTIALS = "admin:admin";
+
+  private static final String POINT_IN_TIME_ALERT_NAME = "point_in_time_metrics_anomalies";
+  private static final String SENSITIVITY_PARAM_NAME = "sensitivity";
+
+  private String ambariServerHost;
+  private String clusterName;
+
+  /**
+   * @param ambariServerHost host name of the Ambari server to query
+   * @param clusterName name of the cluster whose alert definitions are read
+   */
+  public AmbariServerInterface(String ambariServerHost, String clusterName) {
+    this.ambariServerHost = ambariServerHost;
+    this.clusterName = clusterName;
+  }
+
+  /**
+   * Fetches the cluster's alert definitions and extracts the value of the
+   * {@code sensitivity} parameter of the
+   * {@code point_in_time_metrics_anomalies} alert.
+   *
+   * @return the configured sensitivity, or {@code -1} when the alert or the
+   *         parameter is not present, or when the request or the parsing of
+   *         the response fails (the failure is logged)
+   */
+  public int getPointInTimeSensitivity() {
+
+    String url = constructUri("http", ambariServerHost, AMBARI_SERVER_PORT,
+      "/api/v1/clusters/" + clusterName + "/alert_definitions?fields=*");
+
+    try {
+      HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection();
+      con.setRequestMethod("GET");
+
+      String encoded =
+        Base64.getEncoder().encodeToString(AMBARI_CREDENTIALS.getBytes(StandardCharsets.UTF_8));
+      con.setRequestProperty("Authorization", "Basic " + encoded);
+
+      // getResponseCode() is what actually issues the request, so log first.
+      LOG.info("Sending 'GET' request to URL : " + url);
+      int responseCode = con.getResponseCode();
+      LOG.info("Response Code : " + responseCode);
+
+      JSONObject jsonObject = new JSONObject(readResponse(con));
+      JSONArray array = jsonObject.getJSONArray("items");
+      for (int i = 0; i < array.length(); i++) {
+        JSONObject alertDefn = array.getJSONObject(i).getJSONObject("AlertDefinition");
+        if (POINT_IN_TIME_ALERT_NAME.equals(alertDefn.get("name"))) {
+          JSONObject sourceNode = alertDefn.getJSONObject("source");
+          JSONArray params = sourceNode.getJSONArray("parameters");
+          for (int j = 0; j < params.length(); j++) {
+            JSONObject param = params.getJSONObject(j);
+            if (SENSITIVITY_PARAM_NAME.equals(param.get("name"))) {
+              return param.getInt("value");
+            }
+          }
+          // Only the first matching alert definition is inspected.
+          break;
+        }
+      }
+
+    } catch (Exception e) {
+      // Pass the throwable separately so the stack trace is not lost.
+      LOG.error("Unable to read point in time sensitivity from " + url, e);
+    }
+
+    return -1;
+  }
+
+  /**
+   * Reads the whole response body of the connection into a string; line
+   * terminators are dropped.
+   *
+   * @param con connection whose input stream is consumed and closed
+   * @return the concatenated lines of the response body
+   * @throws IOException if the body cannot be opened or read
+   */
+  private static String readResponse(HttpURLConnection con) throws IOException {
+    StringBuilder responseJsonSb = new StringBuilder();
+    // Decode explicitly as UTF-8 instead of the platform default charset.
+    try (BufferedReader in = new BufferedReader(
+      new InputStreamReader(con.getInputStream(), StandardCharsets.UTF_8))) {
+      String line;
+      while ((line = in.readLine()) != null) {
+        responseJsonSb.append(line);
+      }
+    }
+    return responseJsonSb.toString();
+  }
+
+  /**
+   * Assembles {@code protocol://host:port/path}; {@code path} is appended
+   * verbatim and is expected to carry its own leading slash.
+   */
+  private String constructUri(String protocol, String host, String port, String path) {
+    StringBuilder sb = new StringBuilder(protocol);
+    sb.append("://");
+    sb.append(host);
+    sb.append(":");
+    sb.append(port);
+    sb.append(path);
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricKafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricKafkaProducer.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricKafkaProducer.java
new file mode 100644
index 0000000..2287ee3
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricKafkaProducer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.core;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Publishes {@link TimelineMetrics} payloads, converted to JSON trees, to the
+ * {@code ambari-metrics-topic} Kafka topic.
+ */
+public class MetricKafkaProducer {
+
+    private static final String topicName = "ambari-metrics-topic";
+
+    // Shared instead of allocated per send; ObjectMapper is documented as safe
+    // to share across threads once configured, and it is never reconfigured here.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    // Records are always sent without a key, so the byte-array key serializer
+    // configured in the constructor never receives a String.
+    Producer<String, JsonNode> producer;
+
+    /**
+     * Creates a producer connected to the given brokers.
+     *
+     * @param kafkaServers value for {@code bootstrap.servers}, e.g. {@code "host:6667"}
+     */
+    public MetricKafkaProducer(String kafkaServers) {
+        Properties configProperties = new Properties();
+        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
+        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+            "org.apache.kafka.common.serialization.ByteArraySerializer");
+        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+            "org.apache.kafka.connect.json.JsonSerializer");
+        producer = new KafkaProducer<>(configProperties);
+    }
+
+    /**
+     * Sends the metrics as a single JSON record and blocks until the broker
+     * has acknowledged it.
+     *
+     * @param timelineMetrics metrics to publish
+     * @throws InterruptedException if the thread is interrupted while waiting
+     *         for the acknowledgement
+     * @throws ExecutionException if the send fails
+     */
+    public void sendMetrics(TimelineMetrics timelineMetrics)
+        throws InterruptedException, ExecutionException {
+
+        JsonNode jsonNode = OBJECT_MAPPER.valueToTree(timelineMetrics);
+        ProducerRecord<String, JsonNode> rec = new ProducerRecord<>(topicName, jsonNode);
+        Future<RecordMetadata> kafkaFuture = producer.send(rec);
+
+        // NOTE(review): console diagnostics kept as-is; get() is what makes
+        // this call synchronous and surfaces send failures to the caller.
+        System.out.println(kafkaFuture.isDone());
+        System.out.println(kafkaFuture.get().topic());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e33b5455/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java
new file mode 100644
index 0000000..706c69f
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype.core;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import 
org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import scala.Tuple2;
+
+import java.util.*;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Spark Streaming driver that consumes {@link TimelineMetrics} JSON messages
+ * from Kafka, keeps only the configured application ids, hosts and metric
+ * name patterns, runs the EMA test on every retained metric and triggers the
+ * Tukeys, KS and HSdev methods whenever their configured interval has elapsed.
+ */
+public class MetricSparkConsumer {
+
+  private static final Log LOG = LogFactory.getLog(MetricSparkConsumer.class);
+  private static String groupId = "ambari-metrics-group";
+  private static String topicName = "ambari-metrics-topic";
+  private static int numThreads = 1;
+
+  // Start of the current test window of each periodic method.
+  // NOTE(review): these statics (and trendMetrics below) are mutated inside
+  // rdd.foreach; presumably every JVM running that closure keeps its own
+  // copy - confirm this is the intended behaviour.
+  private static long pitStartTime = System.currentTimeMillis();
+  private static long ksStartTime = pitStartTime;
+  private static long hdevStartTime = ksStartTime;
+
+  private static Set<Pattern> includeMetricPatterns = new HashSet<>();
+  private static Set<String> includedHosts = new HashSet<>();
+  private static Set<TrendMetric> trendMetrics = new HashSet<>();
+
+  public MetricSparkConsumer() {
+  }
+
+  /**
+   * Loads a properties file, looking it up on the system class path first
+   * and falling back to the file system.
+   *
+   * @param propertiesFile class path resource name or file system path
+   * @return the loaded properties, or {@code null} when the file cannot be
+   *         opened or read (the failure is logged)
+   */
+  public static Properties readProperties(String propertiesFile) {
+    Properties properties = new Properties();
+    // try-with-resources: the stream used to be left open after loading.
+    try (InputStream inputStream = openPropertiesStream(propertiesFile)) {
+      properties.load(inputStream);
+      return properties;
+    } catch (IOException ioEx) {
+      LOG.error("Error reading properties file " + propertiesFile, ioEx);
+      return null;
+    }
+  }
+
+  /** Opens the class path resource if one exists, otherwise the file of that name. */
+  private static InputStream openPropertiesStream(String propertiesFile) throws IOException {
+    InputStream inputStream = ClassLoader.getSystemResourceAsStream(propertiesFile);
+    if (inputStream == null) {
+      inputStream = new FileInputStream(propertiesFile);
+    }
+    return inputStream;
+  }
+
+  /**
+   * Entry point. Expects one argument, the configuration file, from which it
+   * reads: {@code appIds}, {@code collectorHost}, {@code collectorPort},
+   * {@code collectorProtocol}, {@code zkQuorum}, {@code emaW}, {@code emaN},
+   * {@code emaThreshold}, {@code tukeysN}, {@code pointInTimeTestInterval},
+   * {@code pointInTimeTrainInterval}, {@code ksTestInterval},
+   * {@code ksTrainInterval}, {@code hsdevNhp}, {@code hsdevInterval},
+   * {@code ambariServerHost}, {@code clusterName} and the optional
+   * comma-separated lists {@code includeMetricPatterns} and {@code hosts}.
+   *
+   * @param args command line; {@code args[0]} is the configuration file
+   * @throws InterruptedException if interrupted while awaiting termination
+   *         of the streaming context
+   */
+  public static void main(String[] args) throws InterruptedException {
+
+    if (args.length < 1) {
+      System.err.println("Usage: MetricSparkConsumer <input-config-file>");
+      System.exit(1);
+    }
+
+    Properties properties = readProperties(args[0]);
+    if (properties == null) {
+      // readProperties already logged the cause; fail with a clear message
+      // instead of a NullPointerException on the first lookup below.
+      System.err.println("Unable to load configuration from " + args[0]);
+      System.exit(1);
+    }
+
+    List<String> appIds = Arrays.asList(properties.getProperty("appIds").split(","));
+
+    String collectorHost = properties.getProperty("collectorHost");
+    String collectorPort = properties.getProperty("collectorPort");
+    String collectorProtocol = properties.getProperty("collectorProtocol");
+
+    String zkQuorum = properties.getProperty("zkQuorum");
+
+    double emaW = Double.parseDouble(properties.getProperty("emaW"));
+    double emaN = Double.parseDouble(properties.getProperty("emaN"));
+    int emaThreshold = Integer.parseInt(properties.getProperty("emaThreshold"));
+    double tukeysN = Double.parseDouble(properties.getProperty("tukeysN"));
+
+    long pitTestInterval = Long.parseLong(properties.getProperty("pointInTimeTestInterval"));
+    long pitTrainInterval = Long.parseLong(properties.getProperty("pointInTimeTrainInterval"));
+
+    long ksTestInterval = Long.parseLong(properties.getProperty("ksTestInterval"));
+    long ksTrainInterval = Long.parseLong(properties.getProperty("ksTrainInterval"));
+    int hsdevNhp = Integer.parseInt(properties.getProperty("hsdevNhp"));
+    long hsdevInterval = Long.parseLong(properties.getProperty("hsdevInterval"));
+
+    String ambariServerHost = properties.getProperty("ambariServerHost");
+    String clusterName = properties.getProperty("clusterName");
+
+    String includeMetricPatternStrings = properties.getProperty("includeMetricPatterns");
+    if (includeMetricPatternStrings != null && !includeMetricPatternStrings.isEmpty()) {
+      String[] patterns = includeMetricPatternStrings.split(",");
+      for (String p : patterns) {
+        LOG.info("Included Pattern : " + p);
+        includeMetricPatterns.add(Pattern.compile(p));
+      }
+    }
+
+    String includedHostList = properties.getProperty("hosts");
+    if (includedHostList != null && !includedHostList.isEmpty()) {
+      String[] hosts = includedHostList.split(",");
+      includedHosts.addAll(Arrays.asList(hosts));
+    }
+
+    MetricsCollectorInterface metricsCollectorInterface =
+      new MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);
+
+    SparkConf sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector");
+
+    // Micro-batch interval of 10 seconds.
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
+
+    EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN, emaThreshold);
+    PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(metricsCollectorInterface,
+      tukeysN,
+      pitTestInterval,
+      pitTrainInterval,
+      ambariServerHost,
+      clusterName);
+
+    TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
+      ksTestInterval,
+      ksTrainInterval,
+      hsdevNhp);
+
+    Broadcast<EmaTechnique> emaTechniqueBroadcast =
+      jssc.sparkContext().broadcast(emaTechnique);
+    Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast =
+      jssc.sparkContext().broadcast(pointInTimeADSystem);
+    Broadcast<TrendADSystem> trendADSystemBroadcast =
+      jssc.sparkContext().broadcast(trendADSystem);
+    Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast =
+      jssc.sparkContext().broadcast(metricsCollectorInterface);
+    Broadcast<Set<Pattern>> includePatternBroadcast =
+      jssc.sparkContext().broadcast(includeMetricPatterns);
+    Broadcast<Set<String>> includedHostBroadcast =
+      jssc.sparkContext().broadcast(includedHosts);
+
+    JavaPairReceiverInputDStream<String, String> messages =
+      KafkaUtils.createStream(jssc, zkQuorum, groupId,
+        Collections.singletonMap(topicName, numThreads));
+
+    //Convert JSON string to TimelineMetrics.
+    JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(
+      new Function<Tuple2<String, String>, TimelineMetrics>() {
+      @Override
+      public TimelineMetrics call(Tuple2<String, String> message) throws Exception {
+        ObjectMapper mapper = new ObjectMapper();
+        TimelineMetrics metrics = mapper.readValue(message._2, TimelineMetrics.class);
+        return metrics;
+      }
+    });
+
+    timelineMetricsStream.print();
+
+    //Group TimelineMetric by AppId.
+    // The app id is taken from the first metric of the message; an empty
+    // message is keyed as "TEST" and paired with an empty TimelineMetrics.
+    JavaPairDStream<String, TimelineMetrics> appMetricStream = timelineMetricsStream.mapToPair(
+      timelineMetrics -> timelineMetrics.getMetrics().isEmpty()
+        ? new Tuple2<>("TEST", new TimelineMetrics())
+        : new Tuple2<String, TimelineMetrics>(
+            timelineMetrics.getMetrics().get(0).getAppId(), timelineMetrics)
+    );
+
+    appMetricStream.print();
+
+    //Filter AppIds that are not needed.
+    JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = appMetricStream.filter(
+      new Function<Tuple2<String, TimelineMetrics>, Boolean>() {
+      @Override
+      public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) throws Exception {
+        return appIds.contains(appMetricTuple._1);
+      }
+    });
+
+    filteredAppMetricStream.print();
+
+    filteredAppMetricStream.foreachRDD(rdd -> {
+      rdd.foreach(
+        tuple2 -> {
+          long currentTime = System.currentTimeMillis();
+          EmaTechnique ema = emaTechniqueBroadcast.getValue();
+
+          // Each periodic method runs once its interval has elapsed, after
+          // which its window start advances by exactly one interval.
+          if (currentTime > pitStartTime + pitTestInterval) {
+            LOG.info("Running Tukeys....");
+            pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, currentTime);
+            pitStartTime = pitStartTime + pitTestInterval;
+          }
+
+          if (currentTime > ksStartTime + ksTestInterval) {
+            LOG.info("Running KS Test....");
+            trendADSystemBroadcast.getValue().runKSTest(currentTime, trendMetrics);
+            ksStartTime = ksStartTime + ksTestInterval;
+          }
+
+          if (currentTime > hdevStartTime + hsdevInterval) {
+            LOG.info("Running HSdev Test....");
+            trendADSystemBroadcast.getValue().runHsdevMethod();
+            hdevStartTime = hdevStartTime + hsdevInterval;
+          }
+
+          TimelineMetrics metrics = tuple2._2();
+          for (TimelineMetric timelineMetric : metrics.getMetrics()) {
+
+            // A metric is processed only when its host is in the included
+            // host set and either no patterns are configured or at least
+            // one pattern is found in the metric name.
+            boolean includeHost =
+              includedHostBroadcast.getValue().contains(timelineMetric.getHostName());
+            boolean includeMetric = false;
+            if (includeHost) {
+              if (includePatternBroadcast.getValue().isEmpty()) {
+                includeMetric = true;
+              }
+              for (Pattern p : includePatternBroadcast.getValue()) {
+                Matcher m = p.matcher(timelineMetric.getMetricName());
+                if (m.find()) {
+                  includeMetric = true;
+                  // One match is enough; no need to try the other patterns.
+                  break;
+                }
+              }
+            }
+
+            if (includeMetric) {
+              trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(),
+                timelineMetric.getAppId(),
+                timelineMetric.getHostName()));
+              List<MetricAnomaly> anomalies = ema.test(timelineMetric);
+              metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
+            }
+          }
+        });
+    });
+
+    jssc.start();
+    jssc.awaitTermination();
+  }
+}
+
+
+
+

Reply via email to