http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java
deleted file mode 100644
index 706c69f..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricSparkConsumer.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype.core;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
-import 
org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-import scala.Tuple2;
-
-import java.util.*;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class MetricSparkConsumer {
-
-  private static final Log LOG = LogFactory.getLog(MetricSparkConsumer.class);
-  private static String groupId = "ambari-metrics-group";
-  private static String topicName = "ambari-metrics-topic";
-  private static int numThreads = 1;
-  private static long pitStartTime = System.currentTimeMillis();
-  private static long ksStartTime = pitStartTime;
-  private static long hdevStartTime = ksStartTime;
-  private static Set<Pattern> includeMetricPatterns = new HashSet<>();
-  private static Set<String> includedHosts = new HashSet<>();
-  private static Set<TrendMetric> trendMetrics = new HashSet<>();
-
-  public MetricSparkConsumer() {
-  }
-
-  public static Properties readProperties(String propertiesFile) {
-    try {
-      Properties properties = new Properties();
-      InputStream inputStream = 
ClassLoader.getSystemResourceAsStream(propertiesFile);
-      if (inputStream == null) {
-        inputStream = new FileInputStream(propertiesFile);
-      }
-      properties.load(inputStream);
-      return properties;
-    } catch (IOException ioEx) {
-      LOG.error("Error reading properties file for jmeter");
-      return null;
-    }
-  }
-
-  public static void main(String[] args) throws InterruptedException {
-
-    if (args.length < 1) {
-      System.err.println("Usage: MetricSparkConsumer <input-config-file>");
-      System.exit(1);
-    }
-
-    Properties properties = readProperties(args[0]);
-
-    List<String> appIds = 
Arrays.asList(properties.getProperty("appIds").split(","));
-
-    String collectorHost = properties.getProperty("collectorHost");
-    String collectorPort = properties.getProperty("collectorPort");
-    String collectorProtocol = properties.getProperty("collectorProtocol");
-
-    String zkQuorum = properties.getProperty("zkQuorum");
-
-    double emaW = Double.parseDouble(properties.getProperty("emaW"));
-    double emaN = Double.parseDouble(properties.getProperty("emaN"));
-    int emaThreshold = 
Integer.parseInt(properties.getProperty("emaThreshold"));
-    double tukeysN = Double.parseDouble(properties.getProperty("tukeysN"));
-
-    long pitTestInterval = 
Long.parseLong(properties.getProperty("pointInTimeTestInterval"));
-    long pitTrainInterval = 
Long.parseLong(properties.getProperty("pointInTimeTrainInterval"));
-
-    long ksTestInterval = 
Long.parseLong(properties.getProperty("ksTestInterval"));
-    long ksTrainInterval = 
Long.parseLong(properties.getProperty("ksTrainInterval"));
-    int hsdevNhp = Integer.parseInt(properties.getProperty("hsdevNhp"));
-    long hsdevInterval = 
Long.parseLong(properties.getProperty("hsdevInterval"));
-
-    String ambariServerHost = properties.getProperty("ambariServerHost");
-    String clusterName = properties.getProperty("clusterName");
-
-    String includeMetricPatternStrings = 
properties.getProperty("includeMetricPatterns");
-    if (includeMetricPatternStrings != null && 
!includeMetricPatternStrings.isEmpty()) {
-      String[] patterns = includeMetricPatternStrings.split(",");
-      for (String p : patterns) {
-        LOG.info("Included Pattern : " + p);
-        includeMetricPatterns.add(Pattern.compile(p));
-      }
-    }
-
-    String includedHostList = properties.getProperty("hosts");
-    if (includedHostList != null && !includedHostList.isEmpty()) {
-      String[] hosts = includedHostList.split(",");
-      includedHosts.addAll(Arrays.asList(hosts));
-    }
-
-    MetricsCollectorInterface metricsCollectorInterface = new 
MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort);
-
-    SparkConf sparkConf = new 
SparkConf().setAppName("AmbariMetricsAnomalyDetector");
-
-    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new 
Duration(10000));
-
-    EmaTechnique emaTechnique = new EmaTechnique(emaW, emaN, emaThreshold);
-    PointInTimeADSystem pointInTimeADSystem = new 
PointInTimeADSystem(metricsCollectorInterface,
-      tukeysN,
-      pitTestInterval,
-      pitTrainInterval,
-      ambariServerHost,
-      clusterName);
-
-    TrendADSystem trendADSystem = new TrendADSystem(metricsCollectorInterface,
-      ksTestInterval,
-      ksTrainInterval,
-      hsdevNhp);
-
-    Broadcast<EmaTechnique> emaTechniqueBroadcast = 
jssc.sparkContext().broadcast(emaTechnique);
-    Broadcast<PointInTimeADSystem> pointInTimeADSystemBroadcast = 
jssc.sparkContext().broadcast(pointInTimeADSystem);
-    Broadcast<TrendADSystem> trendADSystemBroadcast = 
jssc.sparkContext().broadcast(trendADSystem);
-    Broadcast<MetricsCollectorInterface> metricsCollectorInterfaceBroadcast = 
jssc.sparkContext().broadcast(metricsCollectorInterface);
-    Broadcast<Set<Pattern>> includePatternBroadcast = 
jssc.sparkContext().broadcast(includeMetricPatterns);
-    Broadcast<Set<String>> includedHostBroadcast = 
jssc.sparkContext().broadcast(includedHosts);
-
-    JavaPairReceiverInputDStream<String, String> messages =
-      KafkaUtils.createStream(jssc, zkQuorum, groupId, 
Collections.singletonMap(topicName, numThreads));
-
-    //Convert JSON string to TimelineMetrics.
-    JavaDStream<TimelineMetrics> timelineMetricsStream = messages.map(new 
Function<Tuple2<String, String>, TimelineMetrics>() {
-      @Override
-      public TimelineMetrics call(Tuple2<String, String> message) throws 
Exception {
-        ObjectMapper mapper = new ObjectMapper();
-        TimelineMetrics metrics = mapper.readValue(message._2, 
TimelineMetrics.class);
-        return metrics;
-      }
-    });
-
-    timelineMetricsStream.print();
-
-    //Group TimelineMetric by AppId.
-    JavaPairDStream<String, TimelineMetrics> appMetricStream = 
timelineMetricsStream.mapToPair(
-      timelineMetrics -> timelineMetrics.getMetrics().isEmpty()  ?  new 
Tuple2<>("TEST", new TimelineMetrics()) : new Tuple2<String, 
TimelineMetrics>(timelineMetrics.getMetrics().get(0).getAppId(), 
timelineMetrics)
-    );
-
-    appMetricStream.print();
-
-    //Filter AppIds that are not needed.
-    JavaPairDStream<String, TimelineMetrics> filteredAppMetricStream = 
appMetricStream.filter(new Function<Tuple2<String, TimelineMetrics>, Boolean>() 
{
-      @Override
-      public Boolean call(Tuple2<String, TimelineMetrics> appMetricTuple) 
throws Exception {
-        return appIds.contains(appMetricTuple._1);
-      }
-    });
-
-    filteredAppMetricStream.print();
-
-    filteredAppMetricStream.foreachRDD(rdd -> {
-      rdd.foreach(
-        tuple2 -> {
-          long currentTime = System.currentTimeMillis();
-          EmaTechnique ema = emaTechniqueBroadcast.getValue();
-          if (currentTime > pitStartTime + pitTestInterval) {
-            LOG.info("Running Tukeys....");
-            pointInTimeADSystemBroadcast.getValue().runTukeysAndRefineEma(ema, 
currentTime);
-            pitStartTime = pitStartTime + pitTestInterval;
-          }
-
-          if (currentTime > ksStartTime + ksTestInterval) {
-            LOG.info("Running KS Test....");
-            trendADSystemBroadcast.getValue().runKSTest(currentTime, 
trendMetrics);
-            ksStartTime = ksStartTime + ksTestInterval;
-          }
-
-          if (currentTime > hdevStartTime + hsdevInterval) {
-            LOG.info("Running HSdev Test....");
-            trendADSystemBroadcast.getValue().runHsdevMethod();
-            hdevStartTime = hdevStartTime + hsdevInterval;
-          }
-
-          TimelineMetrics metrics = tuple2._2();
-          for (TimelineMetric timelineMetric : metrics.getMetrics()) {
-
-            boolean includeHost = 
includedHostBroadcast.getValue().contains(timelineMetric.getHostName());
-            boolean includeMetric = false;
-            if (includeHost) {
-              if (includePatternBroadcast.getValue().isEmpty()) {
-                includeMetric = true;
-              }
-              for (Pattern p : includePatternBroadcast.getValue()) {
-                Matcher m = p.matcher(timelineMetric.getMetricName());
-                if (m.find()) {
-                  includeMetric = true;
-                }
-              }
-            }
-
-            if (includeMetric) {
-              trendMetrics.add(new TrendMetric(timelineMetric.getMetricName(), 
timelineMetric.getAppId(),
-                timelineMetric.getHostName()));
-              List<MetricAnomaly> anomalies = ema.test(timelineMetric);
-              metricsCollectorInterfaceBroadcast.getValue().publish(anomalies);
-            }
-          }
-        });
-    });
-
-    jssc.start();
-    jssc.awaitTermination();
-  }
-}
-
-
-
-

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java
deleted file mode 100644
index 246565d..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/MetricsCollectorInterface.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype.core;
-
-import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.codehaus.jackson.map.AnnotationIntrospector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.HttpURLConnection;
-import java.net.InetAddress;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TreeMap;
-
-public class MetricsCollectorInterface implements Serializable {
-
-  private static String hostName = null;
-  private String instanceId = null;
-  public final static String serviceName = "anomaly-engine";
-  private String collectorHost;
-  private String protocol;
-  private String port;
-  private static final String WS_V1_TIMELINE_METRICS = 
"/ws/v1/timeline/metrics";
-  private static final Log LOG = 
LogFactory.getLog(MetricsCollectorInterface.class);
-  private static ObjectMapper mapper;
-  private final static ObjectReader timelineObjectReader;
-
-  static {
-    mapper = new ObjectMapper();
-    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
-    mapper.setAnnotationIntrospector(introspector);
-    mapper.getSerializationConfig()
-      .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
-    timelineObjectReader = mapper.reader(TimelineMetrics.class);
-  }
-
-  public MetricsCollectorInterface(String collectorHost, String protocol, 
String port) {
-    this.collectorHost = collectorHost;
-    this.protocol = protocol;
-    this.port = port;
-    this.hostName = getDefaultLocalHostName();
-  }
-
-  public static String getDefaultLocalHostName() {
-
-    if (hostName != null) {
-      return hostName;
-    }
-
-    try {
-      return InetAddress.getLocalHost().getCanonicalHostName();
-    } catch (UnknownHostException e) {
-      LOG.info("Error getting host address");
-    }
-    return null;
-  }
-
-  public void publish(List<MetricAnomaly> metricAnomalies) {
-    if (CollectionUtils.isNotEmpty(metricAnomalies)) {
-      LOG.info("Sending metric anomalies of size : " + metricAnomalies.size());
-      List<TimelineMetric> metricList = getTimelineMetricList(metricAnomalies);
-      if (!metricList.isEmpty()) {
-        TimelineMetrics timelineMetrics = new TimelineMetrics();
-        timelineMetrics.setMetrics(metricList);
-        emitMetrics(timelineMetrics);
-      }
-    } else {
-      LOG.debug("No anomalies to send.");
-    }
-  }
-
-  private List<TimelineMetric> getTimelineMetricList(List<MetricAnomaly> 
metricAnomalies) {
-    List<TimelineMetric> metrics = new ArrayList<>();
-
-    if (metricAnomalies.isEmpty()) {
-      return metrics;
-    }
-
-    for (MetricAnomaly anomaly : metricAnomalies) {
-      TimelineMetric timelineMetric = new TimelineMetric();
-      timelineMetric.setMetricName(anomaly.getMetricKey());
-      timelineMetric.setAppId(serviceName + "-" + anomaly.getMethodType());
-      timelineMetric.setInstanceId(null);
-      timelineMetric.setHostName(getDefaultLocalHostName());
-      timelineMetric.setStartTime(anomaly.getTimestamp());
-      HashMap<String, String> metadata = new HashMap<>();
-      metadata.put("method", anomaly.getMethodType());
-      metadata.put("anomaly-score", String.valueOf(anomaly.getAnomalyScore()));
-      timelineMetric.setMetadata(metadata);
-      TreeMap<Long,Double> metricValues = new TreeMap<>();
-      metricValues.put(anomaly.getTimestamp(), anomaly.getMetricValue());
-      timelineMetric.setMetricValues(metricValues);
-
-      metrics.add(timelineMetric);
-    }
-    return metrics;
-  }
-
-  public boolean emitMetrics(TimelineMetrics metrics) {
-    String connectUrl = constructTimelineMetricUri();
-    String jsonData = null;
-    LOG.debug("EmitMetrics connectUrl = " + connectUrl);
-    try {
-      jsonData = mapper.writeValueAsString(metrics);
-      LOG.info(jsonData);
-    } catch (IOException e) {
-      LOG.error("Unable to parse metrics", e);
-    }
-    if (jsonData != null) {
-      return emitMetricsJson(connectUrl, jsonData);
-    }
-    return false;
-  }
-
-  private HttpURLConnection getConnection(String spec) throws IOException {
-    return (HttpURLConnection) new URL(spec).openConnection();
-  }
-
-  private boolean emitMetricsJson(String connectUrl, String jsonData) {
-    int timeout = 10000;
-    HttpURLConnection connection = null;
-    try {
-      if (connectUrl == null) {
-        throw new IOException("Unknown URL. Unable to connect to metrics 
collector.");
-      }
-      connection = getConnection(connectUrl);
-
-      connection.setRequestMethod("POST");
-      connection.setRequestProperty("Content-Type", "application/json");
-      connection.setRequestProperty("Connection", "Keep-Alive");
-      connection.setConnectTimeout(timeout);
-      connection.setReadTimeout(timeout);
-      connection.setDoOutput(true);
-
-      if (jsonData != null) {
-        try (OutputStream os = connection.getOutputStream()) {
-          os.write(jsonData.getBytes("UTF-8"));
-        }
-      }
-
-      int statusCode = connection.getResponseCode();
-
-      if (statusCode != 200) {
-        LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
-          "statusCode = " + statusCode);
-      } else {
-        LOG.info("Metrics posted to Collector " + connectUrl);
-      }
-      return true;
-    } catch (IOException ioe) {
-      LOG.error(ioe.getMessage());
-    }
-    return false;
-  }
-
-  private String constructTimelineMetricUri() {
-    StringBuilder sb = new StringBuilder(protocol);
-    sb.append("://");
-    sb.append(collectorHost);
-    sb.append(":");
-    sb.append(port);
-    sb.append(WS_V1_TIMELINE_METRICS);
-    return sb.toString();
-  }
-
-  public TimelineMetrics fetchMetrics(String metricName,
-                                      String appId,
-                                      String hostname,
-                                      long startime,
-                                      long endtime) {
-
-    String url = constructTimelineMetricUri() + "?metricNames=" + metricName + 
"&appId=" + appId +
-      "&hostname=" + hostname + "&startTime=" + startime + "&endTime=" + 
endtime;
-    LOG.debug("Fetch metrics URL : " + url);
-
-    URL obj = null;
-    BufferedReader in = null;
-    TimelineMetrics timelineMetrics = new TimelineMetrics();
-
-    try {
-      obj = new URL(url);
-      HttpURLConnection con = (HttpURLConnection) obj.openConnection();
-      con.setRequestMethod("GET");
-      int responseCode = con.getResponseCode();
-      LOG.debug("Sending 'GET' request to URL : " + url);
-      LOG.debug("Response Code : " + responseCode);
-
-      in = new BufferedReader(
-        new InputStreamReader(con.getInputStream()));
-      timelineMetrics = timelineObjectReader.readValue(in);
-    } catch (Exception e) {
-      LOG.error(e);
-    } finally {
-      if (in != null) {
-        try {
-          in.close();
-        } catch (IOException e) {
-          LOG.warn(e);
-        }
-      }
-    }
-
-    LOG.info("Fetched " + timelineMetrics.getMetrics().size() + " metrics.");
-    return timelineMetrics;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java
deleted file mode 100644
index c579515..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/PointInTimeADSystem.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype.core;
-
-import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
-import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
-import org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaModel;
-import 
org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class PointInTimeADSystem implements Serializable {
-
-  //private EmaTechnique emaTechnique;
-  private MetricsCollectorInterface metricsCollectorInterface;
-  private Map<String, Double> tukeysNMap;
-  private double defaultTukeysN = 3;
-
-  private long testIntervalMillis = 5*60*1000; //10mins
-  private long trainIntervalMillis = 15*60*1000; //1hour
-
-  private static final Log LOG = LogFactory.getLog(PointInTimeADSystem.class);
-
-  private AmbariServerInterface ambariServerInterface;
-  private int sensitivity = 50;
-  private int minSensitivity = 0;
-  private int maxSensitivity = 100;
-
-  public PointInTimeADSystem(MetricsCollectorInterface 
metricsCollectorInterface, double defaultTukeysN,
-                             long testIntervalMillis, long 
trainIntervalMillis, String ambariServerHost, String clusterName) {
-    this.metricsCollectorInterface = metricsCollectorInterface;
-    this.defaultTukeysN = defaultTukeysN;
-    this.tukeysNMap = new HashMap<>();
-    this.testIntervalMillis = testIntervalMillis;
-    this.trainIntervalMillis = trainIntervalMillis;
-    this.ambariServerInterface = new AmbariServerInterface(ambariServerHost, 
clusterName);
-    LOG.info("Starting PointInTimeADSystem...");
-  }
-
-  public void runTukeysAndRefineEma(EmaTechnique emaTechnique, long startTime) 
{
-    LOG.info("Running Tukeys for test data interval [" + new Date(startTime - 
testIntervalMillis) + " : " + new Date(startTime) + "], with train data period 
[" + new Date(startTime  - testIntervalMillis - trainIntervalMillis) + " : " + 
new Date(startTime - testIntervalMillis) + "]");
-
-    int requiredSensivity = ambariServerInterface.getPointInTimeSensitivity();
-    if (requiredSensivity == -1 || requiredSensivity == sensitivity) {
-      LOG.info("No change in sensitivity needed.");
-    } else {
-      LOG.info("Current tukey's N value = " + defaultTukeysN);
-      if (requiredSensivity > sensitivity) {
-        int targetSensitivity = Math.min(maxSensitivity, requiredSensivity);
-        while (sensitivity < targetSensitivity) {
-          defaultTukeysN = defaultTukeysN + defaultTukeysN * 0.05;
-          sensitivity++;
-        }
-      } else {
-        int targetSensitivity = Math.max(minSensitivity, requiredSensivity);
-        while (sensitivity > targetSensitivity) {
-          defaultTukeysN = defaultTukeysN - defaultTukeysN * 0.05;
-          sensitivity--;
-        }
-      }
-      LOG.info("New tukey's N value = " + defaultTukeysN);
-    }
-
-    TimelineMetrics timelineMetrics = new TimelineMetrics();
-    for (String metricKey : emaTechnique.getTrackedEmas().keySet()) {
-      LOG.info("EMA key = " + metricKey);
-      EmaModel emaModel = emaTechnique.getTrackedEmas().get(metricKey);
-      String metricName = emaModel.getMetricName();
-      String appId = emaModel.getAppId();
-      String hostname = emaModel.getHostname();
-
-      TimelineMetrics tukeysData = 
metricsCollectorInterface.fetchMetrics(metricName, appId, hostname, startTime - 
(testIntervalMillis + trainIntervalMillis),
-        startTime);
-
-      if (tukeysData.getMetrics().isEmpty()) {
-        LOG.info("No metrics fetched for Tukeys, metricKey = " + metricKey);
-        continue;
-      }
-
-      List<Double> trainTsList = new ArrayList<>();
-      List<Double> trainDataList = new ArrayList<>();
-      List<Double> testTsList = new ArrayList<>();
-      List<Double> testDataList = new ArrayList<>();
-
-      for (TimelineMetric metric : tukeysData.getMetrics()) {
-        for (Long timestamp : metric.getMetricValues().keySet()) {
-          if (timestamp <= (startTime - testIntervalMillis)) {
-            trainDataList.add(metric.getMetricValues().get(timestamp));
-            trainTsList.add((double)timestamp);
-          } else {
-            testDataList.add(metric.getMetricValues().get(timestamp));
-            testTsList.add((double)timestamp);
-          }
-        }
-      }
-
-      if (trainDataList.isEmpty() || testDataList.isEmpty() || 
trainDataList.size() < testDataList.size()) {
-        LOG.info("Not enough train/test data to perform analysis.");
-        continue;
-      }
-
-      String tukeysTrainSeries = "tukeysTrainSeries";
-      double[] trainTs = new double[trainTsList.size()];
-      double[] trainData = new double[trainTsList.size()];
-      for (int i = 0; i < trainTs.length; i++) {
-        trainTs[i] = trainTsList.get(i);
-        trainData[i] = trainDataList.get(i);
-      }
-
-      String tukeysTestSeries = "tukeysTestSeries";
-      double[] testTs = new double[testTsList.size()];
-      double[] testData = new double[testTsList.size()];
-      for (int i = 0; i < testTs.length; i++) {
-        testTs[i] = testTsList.get(i);
-        testData[i] = testDataList.get(i);
-      }
-
-      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + 
testTs.length);
-
-      DataSeries tukeysTrainData = new DataSeries(tukeysTrainSeries, trainTs, 
trainData);
-      DataSeries tukeysTestData = new DataSeries(tukeysTestSeries, testTs, 
testData);
-
-      if (!tukeysNMap.containsKey(metricKey)) {
-        tukeysNMap.put(metricKey, defaultTukeysN);
-      }
-
-      Map<String, String> configs = new HashMap<>();
-      configs.put("tukeys.n", String.valueOf(tukeysNMap.get(metricKey)));
-
-      ResultSet rs = RFunctionInvoker.tukeys(tukeysTrainData, tukeysTestData, 
configs);
-
-      List<TimelineMetric> tukeysMetrics = getAsTimelineMetric(rs, metricName, 
appId, hostname);
-      LOG.info("Tukeys anomalies size : " + tukeysMetrics.size());
-      TreeMap<Long, Double> tukeysMetricValues = new TreeMap<>();
-
-      for (TimelineMetric tukeysMetric : tukeysMetrics) {
-        tukeysMetricValues.putAll(tukeysMetric.getMetricValues());
-        timelineMetrics.addOrMergeTimelineMetric(tukeysMetric);
-      }
-
-      TimelineMetrics emaData = 
metricsCollectorInterface.fetchMetrics(metricKey, 
MetricsCollectorInterface.serviceName+"-ema", 
MetricsCollectorInterface.getDefaultLocalHostName(), startTime - 
testIntervalMillis, startTime);
-      TreeMap<Long, Double> emaMetricValues = new TreeMap();
-      if (!emaData.getMetrics().isEmpty()) {
-        emaMetricValues = emaData.getMetrics().get(0).getMetricValues();
-      }
-
-      LOG.info("Ema anomalies size : " + emaMetricValues.size());
-      int tp = 0;
-      int tn = 0;
-      int fp = 0;
-      int fn = 0;
-
-      for (double ts : testTs) {
-        long timestamp = (long) ts;
-        if (tukeysMetricValues.containsKey(timestamp)) {
-          if (emaMetricValues.containsKey(timestamp)) {
-            tp++;
-          } else {
-            fn++;
-          }
-        } else {
-          if (emaMetricValues.containsKey(timestamp)) {
-            fp++;
-          } else {
-            tn++;
-          }
-        }
-      }
-
-      double recall = (double) tp / (double) (tp + fn);
-      double precision = (double) tp / (double) (tp + fp);
-      LOG.info("----------------------------");
-      LOG.info("Precision Recall values for " + metricKey);
-      LOG.info("tp=" + tp + ", fp=" + fp + ", tn=" + tn + ", fn=" + fn);
-      LOG.info("----------------------------");
-
-      if (recall < 0.5) {
-        LOG.info("Increasing EMA sensitivity by 10%");
-        emaModel.updateModel(true, 5);
-      } else if (precision < 0.5) {
-        LOG.info("Decreasing EMA sensitivity by 10%");
-        emaModel.updateModel(false, 5);
-      }
-
-    }
-
-    if (emaTechnique.getTrackedEmas().isEmpty()){
-      LOG.info("No EMA Technique keys tracked!!!!");
-    }
-
-    if (!timelineMetrics.getMetrics().isEmpty()) {
-      metricsCollectorInterface.emitMetrics(timelineMetrics);
-    }
-  }
-
-  private static List<TimelineMetric> getAsTimelineMetric(ResultSet result, 
String metricName, String appId, String hostname) {
-
-    List<TimelineMetric> timelineMetrics = new ArrayList<>();
-
-    if (result == null) {
-      LOG.info("ResultSet from R call is null!!");
-      return null;
-    }
-
-    if (result.resultset.size() > 0) {
-      double[] ts = result.resultset.get(0);
-      double[] metrics = result.resultset.get(1);
-      double[] anomalyScore = result.resultset.get(2);
-      for (int i = 0; i < ts.length; i++) {
-        TimelineMetric timelineMetric = new TimelineMetric();
-        timelineMetric.setMetricName(metricName + ":" + appId + ":" + 
hostname);
-        
timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
-        timelineMetric.setAppId(MetricsCollectorInterface.serviceName + 
"-tukeys");
-        timelineMetric.setInstanceId(null);
-        timelineMetric.setStartTime((long) ts[i]);
-        TreeMap<Long, Double> metricValues = new TreeMap<>();
-        metricValues.put((long) ts[i], metrics[i]);
-
-        HashMap<String, String> metadata = new HashMap<>();
-        metadata.put("method", "tukeys");
-        if (String.valueOf(anomalyScore[i]).equals("infinity")) {
-          LOG.info("Got anomalyScore = infinity for " + metricName + ":" + 
appId + ":" + hostname);
-        } else {
-          metadata.put("anomaly-score", String.valueOf(anomalyScore[i]));
-        }
-        timelineMetric.setMetadata(metadata);
-
-        timelineMetric.setMetricValues(metricValues);
-        timelineMetrics.add(timelineMetric);
-      }
-    }
-
-    return timelineMetrics;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java
deleted file mode 100644
index 4538f0b..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/RFunctionInvoker.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype.core;
-
-
-import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
-import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.rosuda.JRI.REXP;
-import org.rosuda.JRI.RVector;
-import org.rosuda.JRI.Rengine;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class RFunctionInvoker {
-
-  static final Log LOG = LogFactory.getLog(RFunctionInvoker.class);
-  public static Rengine r = new Rengine(new String[]{"--no-save"}, false, 
null);
-  private static String rScriptDir = 
"/usr/lib/ambari-metrics-collector/R-scripts";
-
-  private static void loadDataSets(Rengine r, DataSeries trainData, DataSeries 
testData) {
-    r.assign("train_ts", trainData.ts);
-    r.assign("train_x", trainData.values);
-    r.eval("train_data <- data.frame(train_ts,train_x)");
-    r.eval("names(train_data) <- c(\"TS\", " + trainData.seriesName + ")");
-
-    r.assign("test_ts", testData.ts);
-    r.assign("test_x", testData.values);
-    r.eval("test_data <- data.frame(test_ts,test_x)");
-    r.eval("names(test_data) <- c(\"TS\", " + testData.seriesName + ")");
-  }
-
-  public static void setScriptsDir(String dir) {
-    rScriptDir = dir;
-  }
-
-  public static ResultSet executeMethod(String methodType, DataSeries 
trainData, DataSeries testData, Map<String, String> configs) {
-
-    ResultSet result;
-    switch (methodType) {
-      case "tukeys":
-        result = tukeys(trainData, testData, configs);
-        break;
-      case "ema":
-        result = ema_global(trainData, testData, configs);
-        break;
-      case "ks":
-        result = ksTest(trainData, testData, configs);
-        break;
-      case "hsdev":
-        result = hsdev(trainData, testData, configs);
-        break;
-      default:
-        result = tukeys(trainData, testData, configs);
-        break;
-    }
-    return result;
-  }
-
-  public static ResultSet tukeys(DataSeries trainData, DataSeries testData, 
Map<String, String> configs) {
-    try {
-
-      REXP exp1 = r.eval("source('" + rScriptDir + "/tukeys.r" + "')");
-
-      double n = Double.parseDouble(configs.get("tukeys.n"));
-      r.eval("n <- " + n);
-
-      loadDataSets(r, trainData, testData);
-
-      r.eval("an <- ams_tukeys(train_data, test_data, n)");
-      REXP exp = r.eval("an");
-      RVector cont = (RVector) exp.getContent();
-      List<double[]> result = new ArrayList();
-      for (int i = 0; i < cont.size(); i++) {
-        result.add(cont.at(i).asDoubleArray());
-      }
-      return new ResultSet(result);
-    } catch (Exception e) {
-      LOG.error(e);
-    } finally {
-      r.end();
-    }
-    return null;
-  }
-
-  public static ResultSet ema_global(DataSeries trainData, DataSeries 
testData, Map<String, String> configs) {
-    try {
-      r.eval("source('" + rScriptDir + "/ema.r" + "')");
-
-      int n = Integer.parseInt(configs.get("ema.n"));
-      r.eval("n <- " + n);
-
-      double w = Double.parseDouble(configs.get("ema.w"));
-      r.eval("w <- " + w);
-
-      loadDataSets(r, trainData, testData);
-
-      r.eval("an <- ema_global(train_data, test_data, w, n)");
-      REXP exp = r.eval("an");
-      RVector cont = (RVector) exp.getContent();
-      List<double[]> result = new ArrayList();
-      for (int i = 0; i < cont.size(); i++) {
-        result.add(cont.at(i).asDoubleArray());
-      }
-      return new ResultSet(result);
-
-    } catch (Exception e) {
-      LOG.error(e);
-    } finally {
-      r.end();
-    }
-    return null;
-  }
-
-  public static ResultSet ema_daily(DataSeries trainData, DataSeries testData, 
Map<String, String> configs) {
-    try {
-      r.eval("source('" + rScriptDir + "/ema.r" + "')");
-
-      int n = Integer.parseInt(configs.get("ema.n"));
-      r.eval("n <- " + n);
-
-      double w = Double.parseDouble(configs.get("ema.w"));
-      r.eval("w <- " + w);
-
-      loadDataSets(r, trainData, testData);
-
-      r.eval("an <- ema_daily(train_data, test_data, w, n)");
-      REXP exp = r.eval("an");
-      RVector cont = (RVector) exp.getContent();
-      List<double[]> result = new ArrayList();
-      for (int i = 0; i < cont.size(); i++) {
-        result.add(cont.at(i).asDoubleArray());
-      }
-      return new ResultSet(result);
-
-    } catch (Exception e) {
-      LOG.error(e);
-    } finally {
-      r.end();
-    }
-    return null;
-  }
-
-  public static ResultSet ksTest(DataSeries trainData, DataSeries testData, 
Map<String, String> configs) {
-    try {
-      r.eval("source('" + rScriptDir + "/kstest.r" + "')");
-
-      double p_value = Double.parseDouble(configs.get("ks.p_value"));
-      r.eval("p_value <- " + p_value);
-
-      loadDataSets(r, trainData, testData);
-
-      r.eval("an <- ams_ks(train_data, test_data, p_value)");
-      REXP exp = r.eval("an");
-      RVector cont = (RVector) exp.getContent();
-      List<double[]> result = new ArrayList();
-      for (int i = 0; i < cont.size(); i++) {
-        result.add(cont.at(i).asDoubleArray());
-      }
-      return new ResultSet(result);
-
-    } catch (Exception e) {
-      LOG.error(e);
-    } finally {
-      r.end();
-    }
-    return null;
-  }
-
-  public static ResultSet hsdev(DataSeries trainData, DataSeries testData, 
Map<String, String> configs) {
-    try {
-      r.eval("source('" + rScriptDir + "/hsdev.r" + "')");
-
-      int n = Integer.parseInt(configs.get("hsdev.n"));
-      r.eval("n <- " + n);
-
-      int nhp = Integer.parseInt(configs.get("hsdev.nhp"));
-      r.eval("nhp <- " + nhp);
-
-      long interval = Long.parseLong(configs.get("hsdev.interval"));
-      r.eval("interval <- " + interval);
-
-      long period = Long.parseLong(configs.get("hsdev.period"));
-      r.eval("period <- " + period);
-
-      loadDataSets(r, trainData, testData);
-
-      r.eval("an2 <- hsdev_daily(train_data, test_data, n, nhp, interval, 
period)");
-      REXP exp = r.eval("an2");
-      RVector cont = (RVector) exp.getContent();
-
-      List<double[]> result = new ArrayList();
-      for (int i = 0; i < cont.size(); i++) {
-        result.add(cont.at(i).asDoubleArray());
-      }
-      return new ResultSet(result);
-    } catch (Exception e) {
-      LOG.error(e);
-    } finally {
-      r.end();
-    }
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java
deleted file mode 100644
index 2a205d1..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendADSystem.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype.core;
-
-import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
-import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
-import 
org.apache.ambari.metrics.alertservice.prototype.methods.hsdev.HsdevTechnique;
-import 
org.apache.ambari.metrics.alertservice.prototype.methods.kstest.KSTechnique;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
-/**
- * Two stage trend anomaly detection: a KS test over a recent test interval, followed by an
- * Hsdev test over the intervals in which the KS test reported an anomaly.
- * <p>
- * {@link #runKSTest(long, Set)} fetches data through the collector interface, emits one
- * anomaly metric per detected anomaly and remembers the run. {@link #runHsdevMethod()}
- * re-examines every remembered run, emits the Hsdev anomalies in one batch and clears the
- * remembered runs.
- */
-public class TrendADSystem implements Serializable {
-
-  private MetricsCollectorInterface metricsCollectorInterface;
-  private List<TrendMetric> trendMetrics;
-
-  private long ksTestIntervalMillis = 10 * 60 * 1000;
-  private long ksTrainIntervalMillis = 10 * 60 * 1000;
-  private KSTechnique ksTechnique;
-
-  private HsdevTechnique hsdevTechnique;
-  private int hsdevNumHistoricalPeriods = 3;
-
-  private Map<KsSingleRunKey, MetricAnomaly> trackedKsAnomalies = new HashMap<>();
-  private static final Log LOG = LogFactory.getLog(TrendADSystem.class);
-  private String inputFile = "";
-
-  /**
-   * @param metricsCollectorInterface used to fetch input metrics and to emit anomaly metrics
-   * @param ksTestIntervalMillis      length of the KS test window
-   * @param ksTrainIntervalMillis     length of the KS train window that precedes the test window
-   * @param hsdevNumHistoricalPeriods number of test-window lengths used as Hsdev train data
-   */
-  public TrendADSystem(MetricsCollectorInterface metricsCollectorInterface,
-                       long ksTestIntervalMillis,
-                       long ksTrainIntervalMillis,
-                       int hsdevNumHistoricalPeriods) {
-
-    this.metricsCollectorInterface = metricsCollectorInterface;
-    this.ksTestIntervalMillis = ksTestIntervalMillis;
-    this.ksTrainIntervalMillis = ksTrainIntervalMillis;
-    this.hsdevNumHistoricalPeriods = hsdevNumHistoricalPeriods;
-
-    this.ksTechnique = new KSTechnique();
-    this.hsdevTechnique = new HsdevTechnique();
-
-    trendMetrics = new ArrayList<>();
-  }
-
-  /**
-   * Runs the KS test for every metric in {@code trendMetrics} over the window
-   * {@code [currentEndTime - ksTestIntervalMillis, currentEndTime]}, trained on the
-   * {@code ksTrainIntervalMillis} that precede it.
-   *
-   * @param currentEndTime end of the test window
-   * @param trendMetrics   metrics to test
-   */
-  public void runKSTest(long currentEndTime, Set<TrendMetric> trendMetrics) {
-    // NOTE(review): this refreshes the field trendMetrics, which the parameter of the same
-    // name shadows for the rest of this method - confirm which of the two is meant to drive
-    // the loop below.
-    readInputFile(inputFile);
-
-    long ksTestIntervalStartTime = currentEndTime - ksTestIntervalMillis;
-    long ksTrainIntervalStartTime = ksTestIntervalStartTime - ksTrainIntervalMillis;
-    LOG.info("Running KS Test for test data interval [" + new Date(ksTestIntervalStartTime) + " : " +
-      new Date(currentEndTime) + "], with train data period [" + new Date(ksTrainIntervalStartTime)
-      + " : " + new Date(ksTestIntervalStartTime) + "]");
-
-    for (TrendMetric metric : trendMetrics) {
-      String metricName = metric.metricName;
-      String appId = metric.appId;
-      String hostname = metric.hostname;
-      String key = metricName + ":" + appId + ":" + hostname;
-
-      TimelineMetrics ksData = metricsCollectorInterface.fetchMetrics(metricName, appId, hostname,
-        ksTrainIntervalStartTime, currentEndTime);
-
-      if (ksData.getMetrics().isEmpty()) {
-        LOG.info("No metrics fetched for KS, metricKey = " + key);
-        continue;
-      }
-
-      List<Double> trainTsList = new ArrayList<>();
-      List<Double> trainDataList = new ArrayList<>();
-      List<Double> testTsList = new ArrayList<>();
-      List<Double> testDataList = new ArrayList<>();
-      splitByTimestamp(ksData, ksTestIntervalStartTime, trainTsList, trainDataList, testTsList, testDataList);
-
-      LOG.info("Train Data size : " + trainDataList.size() + ", Test Data Size : " + testDataList.size());
-      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
-        LOG.info("Not enough train/test data to perform KS analysis.");
-        continue;
-      }
-
-      double[] trainTs = toPrimitiveArray(trainTsList);
-      double[] trainData = toPrimitiveArray(trainDataList);
-      double[] testTs = toPrimitiveArray(testTsList);
-      double[] testData = toPrimitiveArray(testDataList);
-
-      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
-
-      DataSeries ksTrainData = new DataSeries("KSTrainSeries", trainTs, trainData);
-      DataSeries ksTestData = new DataSeries("KSTestSeries", testTs, testData);
-
-      MetricAnomaly metricAnomaly = ksTechnique.runKsTest(key, ksTrainData, ksTestData);
-      if (metricAnomaly == null) {
-        LOG.info("No anomaly from KS test.");
-      } else {
-        LOG.info("Found Anomaly in KS Test. Publishing KS Anomaly metric....");
-        TimelineMetric timelineMetric = getAsTimelineMetric(metricAnomaly,
-          ksTestIntervalStartTime, currentEndTime, ksTrainIntervalStartTime, ksTestIntervalStartTime);
-        TimelineMetrics timelineMetrics = new TimelineMetrics();
-        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
-        metricsCollectorInterface.emitMetrics(timelineMetrics);
-
-        // Remember the run so that runHsdevMethod() can re-examine it.
-        trackedKsAnomalies.put(new KsSingleRunKey(ksTestIntervalStartTime, currentEndTime, metricName, appId, hostname), metricAnomaly);
-      }
-    }
-
-    if (trendMetrics.isEmpty()) {
-      LOG.info("No Trend metrics tracked!!!!");
-    }
-
-  }
-
-  /**
-   * Converts an anomaly into the metric that gets emitted: a single data point at
-   * {@code testEnd} carrying the anomalous value, with the method, the anomaly score and the
-   * test-start / train-start / train-end times as metadata.
-   */
-  private TimelineMetric getAsTimelineMetric(MetricAnomaly metricAnomaly,
-                                   long testStart,
-                                   long testEnd,
-                                   long trainStart,
-                                   long trainEnd) {
-
-    TimelineMetric timelineMetric = new TimelineMetric();
-    timelineMetric.setMetricName(metricAnomaly.getMetricKey());
-    timelineMetric.setAppId(MetricsCollectorInterface.serviceName + "-" + metricAnomaly.getMethodType());
-    timelineMetric.setInstanceId(null);
-    timelineMetric.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
-    timelineMetric.setStartTime(testEnd);
-
-    HashMap<String, String> metadata = new HashMap<>();
-    metadata.put("method", metricAnomaly.getMethodType());
-    metadata.put("anomaly-score", String.valueOf(metricAnomaly.getAnomalyScore()));
-    metadata.put("test-start-time", String.valueOf(testStart));
-    metadata.put("train-start-time", String.valueOf(trainStart));
-    metadata.put("train-end-time", String.valueOf(trainEnd));
-    timelineMetric.setMetadata(metadata);
-
-    TreeMap<Long,Double> metricValues = new TreeMap<>();
-    metricValues.put(testEnd, metricAnomaly.getMetricValue());
-    timelineMetric.setMetricValues(metricValues);
-    return timelineMetric;
-
-  }
-
-  /**
-   * Runs the Hsdev test for every KS anomaly remembered since the last call. The test window
-   * is the KS test window, the train window spans {@code hsdevNumHistoricalPeriods} window
-   * lengths before it. Confirmed anomalies are emitted in one batch; for unconfirmed ones the
-   * KS model is updated. The remembered KS runs are cleared afterwards.
-   */
-  public void runHsdevMethod() {
-
-    List<TimelineMetric> hsdevMetricAnomalies = new ArrayList<>();
-
-    for (KsSingleRunKey ksSingleRunKey : trackedKsAnomalies.keySet()) {
-
-      long hsdevTestEnd = ksSingleRunKey.endTime;
-      long hsdevTestStart = ksSingleRunKey.startTime;
-
-      long period = hsdevTestEnd - hsdevTestStart;
-
-      long hsdevTrainStart = hsdevTestStart - (hsdevNumHistoricalPeriods) * period;
-      long hsdevTrainEnd = hsdevTestStart;
-
-      LOG.info("Running HSdev Test for test data interval [" + new Date(hsdevTestStart) + " : " +
-        new Date(hsdevTestEnd) + "], with train data period [" + new Date(hsdevTrainStart)
-        + " : " + new Date(hsdevTrainEnd) + "]");
-
-      String metricName = ksSingleRunKey.metricName;
-      String appId = ksSingleRunKey.appId;
-      String hostname = ksSingleRunKey.hostname;
-      // NOTE(review): this key is joined with "_" while runKSTest joins with ":" - confirm
-      // that ksTechnique.updateModel below is meant to receive this form of the key.
-      String key = metricName + "_" + appId + "_" + hostname;
-
-      TimelineMetrics hsdevData = metricsCollectorInterface.fetchMetrics(
-        metricName,
-        appId,
-        hostname,
-        hsdevTrainStart,
-        hsdevTestEnd);
-
-      if (hsdevData.getMetrics().isEmpty()) {
-        LOG.info("No metrics fetched for HSDev, metricKey = " + key);
-        continue;
-      }
-
-      List<Double> trainTsList = new ArrayList<>();
-      List<Double> trainDataList = new ArrayList<>();
-      List<Double> testTsList = new ArrayList<>();
-      List<Double> testDataList = new ArrayList<>();
-      splitByTimestamp(hsdevData, hsdevTestStart, trainTsList, trainDataList, testTsList, testDataList);
-
-      if (trainDataList.isEmpty() || testDataList.isEmpty() || trainDataList.size() < testDataList.size()) {
-        LOG.info("Not enough train/test data to perform Hsdev analysis.");
-        continue;
-      }
-
-      double[] trainTs = toPrimitiveArray(trainTsList);
-      double[] trainData = toPrimitiveArray(trainDataList);
-      double[] testTs = toPrimitiveArray(testTsList);
-      double[] testData = toPrimitiveArray(testDataList);
-
-      LOG.info("Train Size = " + trainTs.length + ", Test Size = " + testTs.length);
-
-      DataSeries hsdevTrainData = new DataSeries("HsdevTrainSeries", trainTs, trainData);
-      DataSeries hsdevTestData = new DataSeries("HsdevTestSeries", testTs, testData);
-
-      MetricAnomaly metricAnomaly = hsdevTechnique.runHsdevTest(key, hsdevTrainData, hsdevTestData);
-      if (metricAnomaly == null) {
-        LOG.info("No anomaly from Hsdev test. Mismatch between KS and HSDev. ");
-        ksTechnique.updateModel(key, false, 10);
-      } else {
-        LOG.info("Found Anomaly in Hsdev Test. This confirms KS anomaly.");
-        hsdevMetricAnomalies.add(getAsTimelineMetric(metricAnomaly,
-          hsdevTestStart, hsdevTestEnd, hsdevTrainStart, hsdevTrainEnd));
-      }
-    }
-    clearTrackedKsRunKeys();
-
-    if (!hsdevMetricAnomalies.isEmpty()) {
-      LOG.info("Publishing Hsdev Anomalies....");
-      TimelineMetrics timelineMetrics = new TimelineMetrics();
-      timelineMetrics.setMetrics(hsdevMetricAnomalies);
-      metricsCollectorInterface.emitMetrics(timelineMetrics);
-    }
-  }
-
-  /** Forgets all KS runs remembered by runKSTest. */
-  private void clearTrackedKsRunKeys() {
-    trackedKsAnomalies.clear();
-  }
-
-  /**
-   * Replaces the content of the field {@code trendMetrics} with the metrics listed in the
-   * given file, one {@code metricName,appId,hostname} triple per line.
-   * A null or empty file name leaves the list empty, lines with fewer than three fields are
-   * skipped with a warning, and read errors are logged.
-   *
-   * @param fileName path of the file to read
-   */
-  private void readInputFile(String fileName) {
-    trendMetrics.clear();
-    if (fileName == null || fileName.isEmpty()) {
-      // No file configured: do not try to open an empty path on every run.
-      LOG.debug("No trend metric input file configured.");
-      return;
-    }
-    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
-      for (String line; (line = br.readLine()) != null; ) {
-        String[] splits = line.split(",");
-        if (splits.length < 3) {
-          // Indexing splits[1] / splits[2] below would fail for such a line.
-          LOG.warn("Skipping malformed trend metric line, expected metricName,appId,hostname : " + line);
-          continue;
-        }
-        LOG.info("Adding a new metric to track in Trend AD system : " + splits[0]);
-        trendMetrics.add(new TrendMetric(splits[0], splits[1], splits[2]));
-      }
-    } catch (IOException e) {
-      LOG.error("Error reading input file : " + e);
-    }
-  }
-
-  /**
-   * Appends every data point of {@code data} to the train lists when its timestamp is less
-   * than or equal to {@code splitTime}, and to the test lists otherwise.
-   */
-  private static void splitByTimestamp(TimelineMetrics data, long splitTime,
-                                       List<Double> trainTsList, List<Double> trainDataList,
-                                       List<Double> testTsList, List<Double> testDataList) {
-    for (TimelineMetric timelineMetric : data.getMetrics()) {
-      for (Map.Entry<Long, Double> point : timelineMetric.getMetricValues().entrySet()) {
-        long timestamp = point.getKey();
-        if (timestamp <= splitTime) {
-          trainDataList.add(point.getValue());
-          trainTsList.add((double) timestamp);
-        } else {
-          testDataList.add(point.getValue());
-          testTsList.add((double) timestamp);
-        }
-      }
-    }
-  }
-
-  /** Copies the list into a primitive array of the same length. */
-  private static double[] toPrimitiveArray(List<Double> values) {
-    double[] array = new double[values.size()];
-    for (int i = 0; i < array.length; i++) {
-      array[i] = values.get(i);
-    }
-    return array;
-  }
-
-  /**
-   * Identifies one KS run that reported an anomaly: the test window and the metric it was
-   * run for. Does not override equals/hashCode.
-   */
-  class KsSingleRunKey implements Serializable{
-
-    long startTime;
-    long endTime;
-    String metricName;
-    String appId;
-    String hostname;
-
-    public KsSingleRunKey(long startTime, long endTime, String metricName, String appId, String hostname) {
-      this.startTime = startTime;
-      this.endTime = endTime;
-      this.metricName = metricName;
-      this.appId = appId;
-      this.hostname = hostname;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java
deleted file mode 100644
index 0640142..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/core/TrendMetric.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype.core;
-
-import java.io.Serializable;
-
-/**
- * Identifies one metric series by metric name, application id and host name.
- */
-public class TrendMetric implements Serializable {
-
-  /** Name of the metric. */
-  String metricName;
-  /** Id of the application the metric belongs to. */
-  String appId;
-  /** Host the metric is reported for. */
-  String hostname;
-
-  /**
-   * @param metricName name of the metric
-   * @param appId      application id
-   * @param hostname   host name
-   */
-  public TrendMetric(final String metricName, final String appId, final String hostname) {
-    this.hostname = hostname;
-    this.appId = appId;
-    this.metricName = metricName;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
deleted file mode 100644
index 0b10b4b..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/AnomalyDetectionTechnique.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype.methods;
-
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import java.sql.Time;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Common base class of the anomaly detection methods that test a single metric.
- */
-public abstract class AnomalyDetectionTechnique {
-
-  // Presumably the identifier of the concrete method; it is never assigned in this class.
-  protected String methodType;
-
-  /**
-   * Tests the given metric.
-   *
-   * @param metric the metric to test
-   * @return a list of {@link MetricAnomaly}; its exact contract is defined by the subclasses
-   */
-  public abstract List<MetricAnomaly> test(TimelineMetric metric);
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
deleted file mode 100644
index da4f030..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/MetricAnomaly.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype.methods;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Mutable description of one detected anomaly: the metric key, the timestamp and value of
- * the data point, the method that flagged it and the score that method assigned.
- */
-public class MetricAnomaly implements Serializable{
-
-  private String metricKey;
-  private long timestamp;
-  private double metricValue;
-  private String methodType;
-  private double anomalyScore;
-
-  /**
-   * @param metricKey    key of the metric the anomaly was found in
-   * @param timestamp    timestamp of the anomalous data point
-   * @param metricValue  value of the anomalous data point
-   * @param methodType   name of the detection method
-   * @param anomalyScore score assigned by the detection method
-   */
-  public MetricAnomaly(String metricKey, long timestamp, double metricValue, String methodType, double anomalyScore) {
-    this.anomalyScore = anomalyScore;
-    this.methodType = methodType;
-    this.metricValue = metricValue;
-    this.timestamp = timestamp;
-    this.metricKey = metricKey;
-  }
-
-  // ---- metric key ----
-
-  public String getMetricKey() {
-    return this.metricKey;
-  }
-
-  public void setMetricKey(String metricKey) {
-    this.metricKey = metricKey;
-  }
-
-  /** Writes the same field as {@link #setMetricKey(String)}. */
-  public void setMetricName(String metricName) {
-    this.metricKey = metricName;
-  }
-
-  // ---- data point ----
-
-  public long getTimestamp() {
-    return this.timestamp;
-  }
-
-  public void setTimestamp(long timestamp) {
-    this.timestamp = timestamp;
-  }
-
-  public double getMetricValue() {
-    return this.metricValue;
-  }
-
-  public void setMetricValue(double metricValue) {
-    this.metricValue = metricValue;
-  }
-
-  // ---- detection result ----
-
-  public String getMethodType() {
-    return this.methodType;
-  }
-
-  public void setMethodType(String methodType) {
-    this.methodType = methodType;
-  }
-
-  public double getAnomalyScore() {
-    return this.anomalyScore;
-  }
-
-  public void setAnomalyScore(double anomalyScore) {
-    this.anomalyScore = anomalyScore;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
deleted file mode 100644
index a31410d..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModel.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.Serializable;
-
-import static 
org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique.suppressAnomaliesTheshold;
-
-@XmlRootElement
-public class EmaModel implements Serializable {
-
-  private String metricName;
-  private String hostname;
-  private String appId;
-  private double ema;
-  private double ems;
-  private double weight;
-  private double timessdev;
-
-  private int ctr = 0;
-
-  private static final Log LOG = LogFactory.getLog(EmaModel.class);
-
-  public EmaModel(String name, String hostname, String appId, double weight, 
double timessdev) {
-    this.metricName = name;
-    this.hostname = hostname;
-    this.appId = appId;
-    this.weight = weight;
-    this.timessdev = timessdev;
-    this.ema = 0.0;
-    this.ems = 0.0;
-  }
-
-  public String getMetricName() {
-    return metricName;
-  }
-
-  public String getHostname() {
-    return hostname;
-  }
-
-  public String getAppId() {
-    return appId;
-  }
-
-  public double testAndUpdate(double metricValue) {
-
-    double anomalyScore = 0.0;
-    LOG.info("Before Update ->" + metricName + ":" + appId + ":" + hostname + 
" - " + "ema = " + ema + ", ems = " + ems + ", timessdev = " + timessdev);
-    update(metricValue);
-    if (ctr > suppressAnomaliesTheshold) {
-      anomalyScore = test(metricValue);
-      if (anomalyScore > 0.0) {
-        LOG.info("Anomaly ->" + metricName + ":" + appId + ":" + hostname + " 
- " + "ema = " + ema + ", ems = " + ems +
-          ", timessdev = " + timessdev + ", metricValue = " + metricValue);
-      } else {
-        LOG.info("Not an Anomaly ->" + metricName + ":" + appId + ":" + 
hostname + " - " + "ema = " + ema + ", ems = " + ems +
-          ", timessdev = " + timessdev + ", metricValue = " + metricValue);
-      }
-    } else {
-      ctr++;
-      if (ctr > suppressAnomaliesTheshold) {
-        LOG.info("Ema Model for " + metricName + ":" + appId + ":" + hostname 
+ " is ready for testing data.");
-      }
-    }
-    return anomalyScore;
-  }
-
-  public void update(double metricValue) {
-    ema = weight * ema + (1 - weight) * metricValue;
-    ems = Math.sqrt(weight * Math.pow(ems, 2.0) + (1 - weight) * 
Math.pow(metricValue - ema, 2.0));
-    LOG.debug("In update : ema = " + ema + ", ems = " + ems);
-  }
-
-  public double test(double metricValue) {
-    LOG.debug("In test : ema = " + ema + ", ems = " + ems);
-    double diff = Math.abs(ema - metricValue) - (timessdev * ems);
-    LOG.debug("diff = " + diff);
-    if (diff > 0) {
-      return Math.abs((metricValue - ema) / ems); //Z score
-    } else {
-      return 0.0;
-    }
-  }
-
-  public void updateModel(boolean increaseSensitivity, double percent) {
-    LOG.info("Updating model for " + metricName + " with increaseSensitivity = 
" + increaseSensitivity + ", percent = " + percent);
-    double delta = percent / 100;
-    if (increaseSensitivity) {
-      delta = delta * -1;
-    }
-    this.timessdev = timessdev + delta * timessdev;
-    //this.weight = Math.min(1.0, weight + delta * weight);
-    LOG.info("New model parameters " + metricName + " : timessdev = " + 
timessdev + ", weight = " + weight);
-  }
-
-  public double getWeight() {
-    return weight;
-  }
-
-  public void setWeight(double weight) {
-    this.weight = weight;
-  }
-
-  public double getTimessdev() {
-    return timessdev;
-  }
-
-  public void setTimessdev(double timessdev) {
-    this.timessdev = timessdev;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
deleted file mode 100644
index 62749c1..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaModelLoader.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
-
-import com.google.gson.Gson;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.spark.SparkContext;
-import org.apache.spark.mllib.util.Loader;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-
-/**
- * {@link Loader} for {@link EmaTechnique} that currently hands out a fresh
- * technique built from fixed defaults.
- */
-public class EmaModelLoader implements Loader<EmaTechnique> {
-    private static final Log LOG = LogFactory.getLog(EmaModelLoader.class);
-
-    /** Starting weight given to the technique returned by load(). */
-    private static final double DEFAULT_STARTING_WEIGHT = 0.5;
-
-    /** Starting deviation multiplier given to that technique. */
-    private static final double DEFAULT_START_TIMES_SDEV = 3.0;
-
-    /**
-     * Returns a new technique with the default parameters.
-     *
-     * @param sc   not consulted
-     * @param path not consulted; reading a persisted model back from this
-     *             location is not implemented
-     * @return a technique with no tracked models
-     */
-    @Override
-    public EmaTechnique load(SparkContext sc, String path) {
-        return new EmaTechnique(DEFAULT_STARTING_WEIGHT,
-            DEFAULT_START_TIMES_SDEV);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
deleted file mode 100644
index 52c6cf3..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/ema/EmaTechnique.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype.methods.ema;
-
-import com.google.gson.Gson;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
-import 
org.apache.ambari.metrics.alertservice.prototype.methods.AnomalyDetectionTechnique;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.spark.SparkContext;
-import org.apache.spark.mllib.util.Saveable;
-
-import javax.xml.bind.annotation.XmlElement;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.BufferedWriter;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
-import java.io.Writer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@XmlRootElement
-public class EmaTechnique extends AnomalyDetectionTechnique implements 
Serializable, Saveable {
-
-  @XmlElement(name = "trackedEmas")
-  private Map<String, EmaModel> trackedEmas;
-  private static final Log LOG = LogFactory.getLog(EmaTechnique.class);
-
-  private double startingWeight = 0.5;
-  private double startTimesSdev = 3.0;
-  private String methodType = "ema";
-  public static int suppressAnomaliesTheshold = 100;
-
-  public EmaTechnique(double startingWeight, double startTimesSdev, int 
suppressAnomaliesTheshold) {
-    trackedEmas = new HashMap<>();
-    this.startingWeight = startingWeight;
-    this.startTimesSdev = startTimesSdev;
-    EmaTechnique.suppressAnomaliesTheshold = suppressAnomaliesTheshold;
-    LOG.info("New EmaTechnique......");
-  }
-
-  public EmaTechnique(double startingWeight, double startTimesSdev) {
-    trackedEmas = new HashMap<>();
-    this.startingWeight = startingWeight;
-    this.startTimesSdev = startTimesSdev;
-    LOG.info("New EmaTechnique......");
-  }
-
-  public List<MetricAnomaly> test(TimelineMetric metric) {
-    String metricName = metric.getMetricName();
-    String appId = metric.getAppId();
-    String hostname = metric.getHostName();
-    String key = metricName + ":" + appId + ":" + hostname;
-
-    EmaModel emaModel = trackedEmas.get(key);
-    if (emaModel == null) {
-      LOG.debug("EmaModel not present for " + key);
-      LOG.debug("Number of tracked Emas : " + trackedEmas.size());
-      emaModel  = new EmaModel(metricName, hostname, appId, startingWeight, 
startTimesSdev);
-      trackedEmas.put(key, emaModel);
-    } else {
-      LOG.debug("EmaModel already present for " + key);
-    }
-
-    List<MetricAnomaly> anomalies = new ArrayList<>();
-
-    for (Long timestamp : metric.getMetricValues().keySet()) {
-      double metricValue = metric.getMetricValues().get(timestamp);
-      double anomalyScore = emaModel.testAndUpdate(metricValue);
-      if (anomalyScore > 0.0) {
-        LOG.info("Found anomaly for : " + key + ", anomalyScore = " + 
anomalyScore);
-        MetricAnomaly metricAnomaly = new MetricAnomaly(key, timestamp, 
metricValue, methodType, anomalyScore);
-        anomalies.add(metricAnomaly);
-      } else {
-        LOG.debug("Discarding non-anomaly for : " + key);
-      }
-    }
-    return anomalies;
-  }
-
-  public boolean updateModel(TimelineMetric timelineMetric, boolean 
increaseSensitivity, double percent) {
-    String metricName = timelineMetric.getMetricName();
-    String appId = timelineMetric.getAppId();
-    String hostname = timelineMetric.getHostName();
-    String key = metricName + "_" + appId + "_" + hostname;
-
-
-    EmaModel emaModel = trackedEmas.get(key);
-
-    if (emaModel == null) {
-      LOG.warn("EMA Model for " + key + " not found");
-      return false;
-    }
-    emaModel.updateModel(increaseSensitivity, percent);
-
-    return true;
-  }
-
-  @Override
-  public void save(SparkContext sc, String path) {
-    Gson gson = new Gson();
-    try {
-      String json = gson.toJson(this);
-      try (Writer writer = new BufferedWriter(new OutputStreamWriter(
-        new FileOutputStream(path), "utf-8"))) {
-        writer.write(json);
-      }
-    } catch (IOException e) {
-      LOG.error(e);
-    }
-  }
-
-  @Override
-  public String formatVersion() {
-    return "1.0";
-  }
-
-  public Map<String, EmaModel> getTrackedEmas() {
-    return trackedEmas;
-  }
-
-  public double getStartingWeight() {
-    return startingWeight;
-  }
-
-  public double getStartTimesSdev() {
-    return startTimesSdev;
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
deleted file mode 100644
index 04f4a73..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/hsdev/HsdevTechnique.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.metrics.alertservice.prototype.methods.hsdev;
-
-import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
-import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import static 
org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.median;
-import static 
org.apache.ambari.metrics.alertservice.prototype.common.StatisticUtils.sdev;
-
-import java.io.Serializable;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Flags a test window whose median is too many historic standard
- * deviations away from the median of the training window.
- */
-public class HsdevTechnique implements Serializable {
-
-  // Per-key multiplier of the historic deviation; 3.0 on first use.
-  private Map<String, Double> hsdevMap;
-  private String methodType = "hsdev";
-  private static final Log LOG = LogFactory.getLog(HsdevTechnique.class);
-
-  public HsdevTechnique() {
-    hsdevMap = new HashMap<>();
-  }
-
-  /**
-   * Compares the median of the test window with the median of the
-   * training window.
-   *
-   * @param key       identifier of the metric series
-   * @param trainData historic data the test window is compared against
-   * @param testData  window under test
-   * @return an anomaly placed at the last test point, carrying the
-   *         distance in historic standard deviations as its score, or
-   *         null when the window is empty, the training data is shorter
-   *         than the test data, the historic deviation is not positive,
-   *         or the distance stays within the threshold
-   */
-  public MetricAnomaly runHsdevTest(String key, DataSeries trainData,
-                                    DataSeries testData) {
-    int testLength = testData.values.length;
-    int trainLength = trainData.values.length;
-
-    if (testLength == 0) {
-      // Without this guard the last-point lookups below would use index -1.
-      LOG.info("No test data.");
-      return null;
-    }
-
-    if (trainLength < testLength) {
-      LOG.info("Not enough train data.");
-      return null;
-    }
-
-    if (!hsdevMap.containsKey(key)) {
-      hsdevMap.put(key, 3.0);
-    }
-
-    double n = hsdevMap.get(key);
-
-    double historicSd = sdev(trainData.values, false);
-    double historicMedian = median(trainData.values);
-    double currentMedian = median(testData.values);
-
-    if (historicSd > 0) {
-      double diff = Math.abs(currentMedian - historicMedian);
-      LOG.info("Current median = " + currentMedian
-        + ", Historic Median = " + historicMedian
-        + ", HistoricSd = " + historicSd);
-
-      if (diff > n * historicSd) {
-        // Announce the anomaly only after the comparison; it used to be
-        // logged for every window with a positive historic deviation.
-        LOG.info("Found anomaly for metric : " + key
-          + " in the period ending "
-          + new Date((long) testData.ts[testLength - 1]));
-        double zScore = diff / historicSd;
-        LOG.info("Z Score of current series : " + zScore);
-        return new MetricAnomaly(key,
-          (long) testData.ts[testLength - 1],
-          testData.values[testLength - 1],
-          methodType,
-          zScore);
-      }
-    }
-
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
deleted file mode 100644
index a9360d3..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/methods/kstest/KSTechnique.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.metrics.alertservice.prototype.methods.kstest;
-
-import org.apache.ambari.metrics.alertservice.prototype.core.RFunctionInvoker;
-import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
-import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
-import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Runs a KS test through {@link RFunctionInvoker} and turns its result
- * into a {@link MetricAnomaly}. A p-value is kept per metric key.
- */
-public class KSTechnique implements Serializable {
-
-  private String methodType = "ks";
-  // Per-key p-value passed to the R function; 0.05 on first use.
-  private Map<String, Double> pValueMap;
-  private static final Log LOG = LogFactory.getLog(KSTechnique.class);
-
-  public KSTechnique() {
-    pValueMap = new HashMap<>();
-  }
-
-  /**
-   * Invokes the KS test on the two series.
-   *
-   * @param key       identifier of the metric series
-   * @param trainData historic data the test window is compared against
-   * @param testData  window under test
-   * @return an anomaly placed at the last test point whose score is the
-   *         first value of result row 2, or null when the window is
-   *         empty, the training data is shorter than the test data, or
-   *         the R call yields no usable result
-   */
-  public MetricAnomaly runKsTest(String key, DataSeries trainData,
-                                 DataSeries testData) {
-
-    int testLength = testData.values.length;
-    int trainLength = trainData.values.length;
-
-    if (testLength == 0) {
-      // Without this guard the last-point lookups below would use index -1.
-      LOG.info("No test data.");
-      return null;
-    }
-
-    if (trainLength < testLength) {
-      LOG.info("Not enough train data.");
-      return null;
-    }
-
-    if (!pValueMap.containsKey(key)) {
-      pValueMap.put(key, 0.05);
-    }
-    double pValue = pValueMap.get(key);
-
-    ResultSet result = RFunctionInvoker.ksTest(trainData, testData,
-      Collections.singletonMap("ks.p_value", String.valueOf(pValue)));
-    if (result == null) {
-      LOG.error("Resultset is null when invoking KS R function...");
-      return null;
-    }
-
-    int rows = result.resultset.size();
-    if (rows == 0) {
-      return null;
-    }
-    if (rows < 4) {
-      // Rows 2 and 3 are read below; a shorter result used to fail with
-      // an index error.
-      LOG.error("Unexpected KS result size : " + rows);
-      return null;
-    }
-
-    LOG.info("Is size 1 ? result size = " + result.resultset.get(0).length);
-    LOG.info("p_value = " + result.resultset.get(3)[0]);
-    double dValue = result.resultset.get(2)[0];
-
-    return new MetricAnomaly(key,
-      (long) testData.ts[testLength - 1],
-      testData.values[testLength - 1],
-      methodType,
-      dValue);
-  }
-
-  /**
-   * Scales the p-value stored for a metric key by the given percentage.
-   * Unknown keys are reported and left alone.
-   *
-   * @param metricKey           key previously seen by runKsTest
-   * @param increaseSensitivity true to raise the p-value, false to lower it
-   * @param percent             size of the change, in percent of the
-   *                            current p-value
-   */
-  public void updateModel(String metricKey, boolean increaseSensitivity,
-                          double percent) {
-
-    LOG.info("Updating KS model for " + metricKey
-      + " with increaseSensitivity = " + increaseSensitivity
-      + ", percent = " + percent);
-
-    if (!pValueMap.containsKey(metricKey)) {
-      LOG.error("Unknown metric key : " + metricKey);
-      LOG.info("pValueMap :" + pValueMap.toString());
-      return;
-    }
-
-    double delta = percent / 100;
-    if (!increaseSensitivity) {
-      delta = delta * -1;
-    }
-
-    double pValue = pValueMap.get(metricKey);
-    // Keep the result within [0, 1]; a decrease of more than 100 percent
-    // used to produce a negative p-value.
-    double newPValue =
-      Math.max(0.0, Math.min(1.0, pValue + delta * pValue));
-    pValueMap.put(metricKey, newPValue);
-    LOG.info("New pValue = " + newPValue);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/4613b471/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java
 
b/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java
deleted file mode 100644
index 268cd15..0000000
--- 
a/ambari-metrics/ambari-metrics-anomaly-detector/src/main/java/org/apache/ambari/metrics/alertservice/prototype/testing/utilities/MetricAnomalyDetectorTestInput.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.metrics.alertservice.prototype.testing.utilities;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import java.util.List;
-import java.util.Map;
-
-@XmlRootElement
-public class MetricAnomalyDetectorTestInput {
-
-  public MetricAnomalyDetectorTestInput() {
-  }
-
-  //Train data
-  private String trainDataName;
-  private String trainDataType;
-  private Map<String, String> trainDataConfigs;
-  private int trainDataSize;
-
-  //Test data
-  private String testDataName;
-  private String testDataType;
-  private Map<String, String> testDataConfigs;
-  private int testDataSize;
-
-  //Algorithm data
-  private List<String> methods;
-  private Map<String, String> methodConfigs;
-
-  public String getTrainDataName() {
-    return trainDataName;
-  }
-
-  public void setTrainDataName(String trainDataName) {
-    this.trainDataName = trainDataName;
-  }
-
-  public String getTrainDataType() {
-    return trainDataType;
-  }
-
-  public void setTrainDataType(String trainDataType) {
-    this.trainDataType = trainDataType;
-  }
-
-  public Map<String, String> getTrainDataConfigs() {
-    return trainDataConfigs;
-  }
-
-  public void setTrainDataConfigs(Map<String, String> trainDataConfigs) {
-    this.trainDataConfigs = trainDataConfigs;
-  }
-
-  public String getTestDataName() {
-    return testDataName;
-  }
-
-  public void setTestDataName(String testDataName) {
-    this.testDataName = testDataName;
-  }
-
-  public String getTestDataType() {
-    return testDataType;
-  }
-
-  public void setTestDataType(String testDataType) {
-    this.testDataType = testDataType;
-  }
-
-  public Map<String, String> getTestDataConfigs() {
-    return testDataConfigs;
-  }
-
-  public void setTestDataConfigs(Map<String, String> testDataConfigs) {
-    this.testDataConfigs = testDataConfigs;
-  }
-
-  public Map<String, String> getMethodConfigs() {
-    return methodConfigs;
-  }
-
-  public void setMethodConfigs(Map<String, String> methodConfigs) {
-    this.methodConfigs = methodConfigs;
-  }
-
-  public int getTrainDataSize() {
-    return trainDataSize;
-  }
-
-  public void setTrainDataSize(int trainDataSize) {
-    this.trainDataSize = trainDataSize;
-  }
-
-  public int getTestDataSize() {
-    return testDataSize;
-  }
-
-  public void setTestDataSize(int testDataSize) {
-    this.testDataSize = testDataSize;
-  }
-
-  public List<String> getMethods() {
-    return methods;
-  }
-
-  public void setMethods(List<String> methods) {
-    this.methods = methods;
-  }
-}

Reply via email to