Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-ams 8cad9eb1a -> 63e743557


http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java
new file mode 100644
index 0000000..9a102a0
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestRFunctionInvoker.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.common.ResultSet;
+import org.apache.ambari.metrics.alertservice.prototype.common.DataSeries;
+import 
org.apache.ambari.metrics.alertservice.seriesgenerator.UniformMetricSeries;
+import org.apache.commons.lang.ArrayUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class TestRFunctionInvoker {
+
+  private static String metricName = "TestMetric";
+  private static double[] ts;
+  private static String fullFilePath;
+
+  @BeforeClass
+  public static void init() throws URISyntaxException {
+
+    Assume.assumeTrue(System.getenv("R_HOME") != null);
+    ts = getTS(1000);
+    URL url = ClassLoader.getSystemResource("R-scripts");
+    fullFilePath = new File(url.toURI()).getAbsolutePath();
+    RFunctionInvoker.setScriptsDir(fullFilePath);
+  }
+
+  @Test
+  public void testTukeys() throws URISyntaxException {
+
+    double[] train_ts = ArrayUtils.subarray(ts, 0, 750);
+    double[] train_x = getRandomData(750);
+    DataSeries trainData = new DataSeries(metricName, train_ts, train_x);
+
+    double[] test_ts = ArrayUtils.subarray(ts, 750, 1000);
+    double[] test_x = getRandomData(250);
+    test_x[50] = 5.5; //Anomaly
+    DataSeries testData = new DataSeries(metricName, test_ts, test_x);
+    Map<String, String> configs = new HashMap();
+    configs.put("tukeys.n", "3");
+
+    ResultSet rs = RFunctionInvoker.tukeys(trainData, testData, configs);
+    Assert.assertEquals(rs.resultset.size(), 2);
+    Assert.assertEquals(rs.resultset.get(1)[0], 5.5, 0.1);
+
+  }
+
+  public static void main(String[] args) throws URISyntaxException {
+
+    String metricName = "TestMetric";
+    double[] ts = getTS(1000);
+    URL url = ClassLoader.getSystemResource("R-scripts");
+    String fullFilePath = new File(url.toURI()).getAbsolutePath();
+    RFunctionInvoker.setScriptsDir(fullFilePath);
+
+    double[] train_ts = ArrayUtils.subarray(ts, 0, 750);
+    double[] train_x = getRandomData(750);
+    DataSeries trainData = new DataSeries(metricName, train_ts, train_x);
+
+    double[] test_ts = ArrayUtils.subarray(ts, 750, 1000);
+    double[] test_x = getRandomData(250);
+    test_x[50] = 5.5; //Anomaly
+    DataSeries testData = new DataSeries(metricName, test_ts, test_x);
+    ResultSet rs;
+
+    Map<String, String> configs = new HashMap();
+
+    System.out.println("TUKEYS");
+    configs.put("tukeys.n", "3");
+    rs = RFunctionInvoker.tukeys(trainData, testData, configs);
+    rs.print();
+    System.out.println("--------------");
+
+//    System.out.println("EMA Global");
+//    configs.put("ema.n", "3");
+//    configs.put("ema.w", "0.8");
+//    rs = RFunctionInvoker.ema_global(trainData, testData, configs);
+//    rs.print();
+//    System.out.println("--------------");
+//
+//    System.out.println("EMA Daily");
+//    rs = RFunctionInvoker.ema_daily(trainData, testData, configs);
+//    rs.print();
+//    System.out.println("--------------");
+//
+//    configs.put("ks.p_value", "0.00005");
+//    System.out.println("KS Test");
+//    rs = RFunctionInvoker.ksTest(trainData, testData, configs);
+//    rs.print();
+//    System.out.println("--------------");
+//
+    ts = getTS(5000);
+    train_ts = ArrayUtils.subarray(ts, 0, 4800);
+    train_x = getRandomData(4800);
+    trainData = new DataSeries(metricName, train_ts, train_x);
+    test_ts = ArrayUtils.subarray(ts, 4800, 5000);
+    test_x = getRandomData(200);
+    for (int i = 0; i < 200; i++) {
+      test_x[i] = test_x[i] * 5;
+    }
+    testData = new DataSeries(metricName, test_ts, test_x);
+    configs.put("hsdev.n", "3");
+    configs.put("hsdev.nhp", "3");
+    configs.put("hsdev.interval", "86400000");
+    configs.put("hsdev.period", "604800000");
+    System.out.println("HSdev");
+    rs = RFunctionInvoker.hsdev(trainData, testData, configs);
+    rs.print();
+    System.out.println("--------------");
+
+  }
+
+  static double[] getTS(int n) {
+    long currentTime = System.currentTimeMillis();
+    double[] ts = new double[n];
+    currentTime = currentTime - (currentTime % (5 * 60 * 1000));
+
+    for (int i = 0, j = n - 1; i < n; i++, j--) {
+      ts[j] = currentTime;
+      currentTime = currentTime - (5 * 60 * 1000);
+    }
+    return ts;
+  }
+
+  static double[] getRandomData(int n) {
+
+    UniformMetricSeries metricSeries =  new UniformMetricSeries(10, 0.1,0.05, 
0.6, 0.8, true);
+    return metricSeries.getSeries(n);
+
+//    double[] metrics = new double[n];
+//    Random random = new Random();
+//    for (int i = 0; i < n; i++) {
+//      metrics[i] = random.nextDouble();
+//    }
+//    return metrics;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
new file mode 100644
index 0000000..bb409cf
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/prototype/TestTukeys.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.prototype;
+
+import org.apache.ambari.metrics.alertservice.prototype.methods.MetricAnomaly;
+import 
org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.TreeMap;
+
+public class TestTukeys {
+
+  @BeforeClass
+  public static void init() throws URISyntaxException {
+    Assume.assumeTrue(System.getenv("R_HOME") != null);
+  }
+
+  @Test
+  public void testPointInTimeDetectionSystem() throws UnknownHostException, 
URISyntaxException {
+
+    URL url = ClassLoader.getSystemResource("R-scripts");
+    String fullFilePath = new File(url.toURI()).getAbsolutePath();
+    RFunctionInvoker.setScriptsDir(fullFilePath);
+
+    MetricsCollectorInterface metricsCollectorInterface = new 
MetricsCollectorInterface("avijayan-ams-1.openstacklocal","http", "6188");
+
+    EmaTechnique ema = new EmaTechnique(0.5, 3);
+    long now = System.currentTimeMillis();
+
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("mm9");
+    metric1.setHostName(MetricsCollectorInterface.getDefaultLocalHostName());
+    metric1.setStartTime(now);
+    metric1.setAppId("aa9");
+    metric1.setInstanceId(null);
+    metric1.setType("Integer");
+
+    //Train
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+
+    //2hr data.
+    for (int i = 0; i < 120; i++) {
+      double metric = 20000 + Math.random();
+      metricValues.put(now - i * 60 * 1000, metric);
+    }
+    metric1.setMetricValues(metricValues);
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    timelineMetrics.addOrMergeTimelineMetric(metric1);
+
+    metricsCollectorInterface.emitMetrics(timelineMetrics);
+
+    List<MetricAnomaly> anomalyList = ema.test(metric1);
+    metricsCollectorInterface.publish(anomalyList);
+//
+//    PointInTimeADSystem pointInTimeADSystem = new PointInTimeADSystem(ema, 
metricsCollectorInterface, 3, 5*60*1000, 15*60*1000);
+//    pointInTimeADSystem.runOnce();
+//
+//    List<MetricAnomaly> anomalyList2 = ema.test(metric1);
+//
+//    pointInTimeADSystem.runOnce();
+//    List<MetricAnomaly> anomalyList3 = ema.test(metric1);
+//
+//    pointInTimeADSystem.runOnce();
+//    List<MetricAnomaly> anomalyList4 = ema.test(metric1);
+//
+//    pointInTimeADSystem.runOnce();
+//    List<MetricAnomaly> anomalyList5 = ema.test(metric1);
+//
+//    pointInTimeADSystem.runOnce();
+//    List<MetricAnomaly> anomalyList6 = ema.test(metric1);
+//
+//    Assert.assertTrue(anomalyList6.size() < anomalyList.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java
 
b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java
new file mode 100644
index 0000000..575ea8b
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-alertservice/src/test/java/org/apache/ambari/metrics/alertservice/seriesgenerator/MetricSeriesGeneratorTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.alertservice.seriesgenerator;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.ambari.metrics.alertservice.prototype.MetricAnomalyDetectorTestInput;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetricSeriesGeneratorTest {
+
+  @Test
+  public void testUniformSeries() {
+
+    UniformMetricSeries metricSeries = new UniformMetricSeries(5, 0.2, 0, 0, 
0, true);
+    Assert.assertTrue(metricSeries.nextValue() <= 6 && 
metricSeries.nextValue() >= 4);
+
+    double[] uniformSeries = 
MetricSeriesGeneratorFactory.createUniformSeries(50, 10, 0.2, 0.1, 0.4, 0.5, 
true);
+    Assert.assertTrue(uniformSeries.length == 50);
+
+    for (int i = 0; i < uniformSeries.length; i++) {
+      double value = uniformSeries[i];
+
+      if (value > 10 * 1.2) {
+        Assert.assertTrue(value >= 10 * 1.4 && value <= 10 * 1.6);
+      } else {
+        Assert.assertTrue(value >= 10 * 0.8 && value <= 10 * 1.2);
+      }
+    }
+  }
+
+  @Test
+  public void testNormalSeries() {
+    NormalMetricSeries metricSeries = new NormalMetricSeries(0, 1, 0, 0, 0, 
true);
+    Assert.assertTrue(metricSeries.nextValue() <= 3 && 
metricSeries.nextValue() >= -3);
+  }
+
+  @Test
+  public void testMonotonicSeries() {
+
+    MonotonicMetricSeries metricSeries = new MonotonicMetricSeries(0, 0.5, 0, 
0, 0, 0, true);
+    Assert.assertTrue(metricSeries.nextValue() == 0);
+    Assert.assertTrue(metricSeries.nextValue() == 0.5);
+
+    double[] incSeries = 
MetricSeriesGeneratorFactory.createMonotonicSeries(20, 0, 0.5, 0, 0, 0, 0, 
true);
+    Assert.assertTrue(incSeries.length == 20);
+    for (int i = 0; i < incSeries.length; i++) {
+      Assert.assertTrue(incSeries[i] == i * 0.5);
+    }
+  }
+
+  @Test
+  public void testDualBandSeries() {
+    double[] dualBandSeries = 
MetricSeriesGeneratorFactory.getDualBandSeries(30, 5, 0.2, 5, 15, 0.3, 4);
+    Assert.assertTrue(dualBandSeries[0] >= 4 && dualBandSeries[0] <= 6);
+    Assert.assertTrue(dualBandSeries[4] >= 4 && dualBandSeries[4] <= 6);
+    Assert.assertTrue(dualBandSeries[5] >= 10.5 && dualBandSeries[5] <= 19.5);
+    Assert.assertTrue(dualBandSeries[8] >= 10.5 && dualBandSeries[8] <= 19.5);
+    Assert.assertTrue(dualBandSeries[9] >= 4 && dualBandSeries[9] <= 6);
+  }
+
+  @Test
+  public void testStepSeries() {
+    double[] stepSeries = 
MetricSeriesGeneratorFactory.getStepFunctionSeries(30, 10, 0, 0, 5, 5, 0.5, 
true);
+
+    Assert.assertTrue(stepSeries[0] == 10);
+    Assert.assertTrue(stepSeries[4] == 10);
+
+    Assert.assertTrue(stepSeries[5] == 10*1.5);
+    Assert.assertTrue(stepSeries[9] == 10*1.5);
+
+    Assert.assertTrue(stepSeries[10] == 10*1.5*1.5);
+    Assert.assertTrue(stepSeries[14] == 10*1.5*1.5);
+  }
+
+  @Test
+  public void testSteadySeriesWithTurbulence() {
+    double[] steadySeriesWithTurbulence = 
MetricSeriesGeneratorFactory.getSteadySeriesWithTurbulentPeriod(30, 5, 0, 1, 1, 
5, 1);
+
+    int count = 0;
+    for (int i = 0; i < steadySeriesWithTurbulence.length; i++) {
+      if (steadySeriesWithTurbulence[i] == 10) {
+        count++;
+      }
+    }
+    Assert.assertTrue(count == 5);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index 1f03fe9..3dfcf4e 100644
--- 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
@@ -34,11 +35,11 @@ import org.codehaus.jackson.map.annotate.JsonDeserialize;
 @XmlAccessorType(XmlAccessType.NONE)
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-public class TimelineMetric implements Comparable<TimelineMetric> {
+public class TimelineMetric implements Comparable<TimelineMetric>, 
Serializable {
 
   private String metricName;
   private String appId;
-  private String instanceId;
+  private String instanceId = null;
   private String hostName;
   private long startTime;
   private String type;

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
index 0c5965c..a8d3da8 100644
--- 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetrics.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 @XmlAccessorType(XmlAccessType.NONE)
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-public class TimelineMetrics {
+public class TimelineMetrics implements Serializable{
 
   private List<TimelineMetric> allMetrics = new ArrayList<TimelineMetric>();
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
 
b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
index bff094b..e51a47f 100644
--- 
a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
+++ 
b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/MetricAnomalyDetector.scala
@@ -21,13 +21,13 @@ import java.util
 import java.util.logging.LogManager
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import 
org.apache.ambari.metrics.alertservice.prototype.MetricsCollectorInterface
 import org.apache.spark.SparkConf
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.kafka._
-import org.apache.ambari.metrics.alertservice.common.{MetricAnomaly, 
TimelineMetrics}
-import org.apache.ambari.metrics.alertservice.methods.MetricAnomalyModel
-import org.apache.ambari.metrics.alertservice.methods.ema.{EmaModel, 
EmaModelLoader}
-import org.apache.ambari.metrics.alertservice.spark.AnomalyMetricPublisher
+import 
org.apache.ambari.metrics.alertservice.prototype.methods.{AnomalyDetectionTechnique,
 MetricAnomaly}
+import 
org.apache.ambari.metrics.alertservice.prototype.methods.ema.{EmaModelLoader, 
EmaTechnique}
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics
 import org.apache.log4j.Logger
 import org.apache.spark.storage.StorageLevel
 
@@ -41,7 +41,7 @@ object MetricAnomalyDetector extends Logging {
   var groupId = "ambari-metrics-group"
   var topicName = "ambari-metrics-topic"
   var numThreads = 1
-  val anomalyDetectionModels: Array[MetricAnomalyModel] = 
Array[MetricAnomalyModel]()
+  val anomalyDetectionModels: Array[AnomalyDetectionTechnique] = 
Array[AnomalyDetectionTechnique]()
 
   def main(args: Array[String]): Unit = {
 
@@ -54,7 +54,7 @@ object MetricAnomalyDetector extends Logging {
     }
 
     for (method <- args(0).split(",")) {
-      if (method == "ema") anomalyDetectionModels :+ new EmaModel()
+      if (method == "ema") anomalyDetectionModels :+ new EmaTechnique(0.5, 3)
     }
 
     val appIds = util.Arrays.asList(args(1).split(","))
@@ -63,7 +63,7 @@ object MetricAnomalyDetector extends Logging {
     val collectorPort = args(3)
     val collectorProtocol = args(4)
 
-    val anomalyMetricPublisher: AnomalyMetricPublisher = new 
AnomalyMetricPublisher(collectorHost, collectorProtocol, collectorPort)
+    val anomalyMetricPublisher: MetricsCollectorInterface = new 
MetricsCollectorInterface(collectorHost, collectorProtocol, collectorPort)
 
     val sparkConf = new SparkConf().setAppName("AmbariMetricsAnomalyDetector")
 
@@ -99,10 +99,6 @@ object MetricAnomalyDetector extends Logging {
         for (timelineMetric <- timelineMetrics.getMetrics) {
           var anomalies = emaModel.test(timelineMetric)
           anomalyMetricPublisher.publish(anomalies)
-          for (anomaly <- anomalies) {
-            var an = anomaly : MetricAnomaly
-            logger.info(an.getAnomalyAsString)
-          }
         }
       })
     })

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
 
b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
index 3c8e1ed..edd6366 100644
--- 
a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
+++ 
b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
@@ -17,8 +17,8 @@
 
 package org.apache.ambari.metrics.spark
 
-import org.apache.ambari.metrics.alertservice.common.TimelineMetric
-import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel
+import 
org.apache.ambari.metrics.alertservice.prototype.methods.ema.EmaTechnique
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric
 import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{SparkConf, SparkContext}
@@ -61,15 +61,19 @@ object SparkPhoenixReader {
       t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
     )
 
-    //val metricName = result.head().getString(0)
+    //val seriesName = result.head().getString(0)
     //val hostname = result.head().getString(1)
     //val appId = result.head().getString(2)
 
-    val timelineMetric = new TimelineMetric(metricName, appId, hostname, 
metricValues)
+    val timelineMetric = new TimelineMetric()
+    timelineMetric.setMetricName(metricName)
+    timelineMetric.setAppId(appId)
+    timelineMetric.setHostName(hostname)
+    timelineMetric.setMetricValues(metricValues)
 
-    var emaModel = new EmaModel()
-    emaModel.train(timelineMetric, weight, timessdev)
-    emaModel.save(sc, modelDir)
+//    var emaModel = new EmaTechnique()
+//    emaModel.train(timelineMetric, weight, timessdev)
+//    emaModel.save(sc, modelDir)
 
 //    var metricData:Seq[Double] = Seq.empty
 //    result.collect().foreach(

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml 
b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index 161b35b..ad849bc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -344,7 +344,7 @@
     <dependency>
       <groupId>org.apache.ambari</groupId>
       <artifactId>ambari-metrics-alertservice</artifactId>
-      <version>2.0.0.0-SNAPSHOT</version>
+      <version>${project.version}</version>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index 7138a96..d83902f 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -401,30 +401,6 @@ public class HBaseTimelineMetricsService extends 
AbstractService implements Time
   }
 
 
-  private org.apache.ambari.metrics.alertservice.common.TimelineMetrics 
fromTimelineMetrics(TimelineMetrics timelineMetrics) {
-    org.apache.ambari.metrics.alertservice.common.TimelineMetrics otherMetrics 
= new org.apache.ambari.metrics.alertservice.common.TimelineMetrics();
-
-    List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> 
timelineMetricList = new ArrayList<>();
-    for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
-      timelineMetricList.add(fromTimelineMetric(timelineMetric));
-    }
-    otherMetrics.setMetrics(timelineMetricList);
-    return otherMetrics;
-  }
-
-  private org.apache.ambari.metrics.alertservice.common.TimelineMetric 
fromTimelineMetric(TimelineMetric timelineMetric) {
-
-    org.apache.ambari.metrics.alertservice.common.TimelineMetric otherMetric = 
new org.apache.ambari.metrics.alertservice.common.TimelineMetric();
-    otherMetric.setMetricValues(timelineMetric.getMetricValues());
-    otherMetric.setStartTime(timelineMetric.getStartTime());
-    otherMetric.setHostName(timelineMetric.getHostName());
-    otherMetric.setInstanceId(timelineMetric.getInstanceId());
-    otherMetric.setAppId(timelineMetric.getAppId());
-    otherMetric.setMetricName(timelineMetric.getMetricName());
-
-    return otherMetric;
-  }
-
   @Override
   public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
       throws SQLException, IOException {

http://git-wip-us.apache.org/repos/asf/ambari/blob/63e74355/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
index 0ba6fb5..785b36b 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/MetricsPaddingMethodTest.java
@@ -39,7 +39,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
     inputValues.put(now - 1000, 1.0d);
     inputValues.put(now - 2000, 2.0d);
@@ -67,7 +66,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
     inputValues.put(now - 1000, 1.0d);
     inputValues.put(now - 2000, 2.0d);
@@ -95,7 +93,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
     inputValues.put(now, 0.0d);
     inputValues.put(now - 1000, 1.0d);
@@ -123,7 +120,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
     inputValues.put(now - 1000, 1.0d);
     timelineMetric.setMetricValues(inputValues);
@@ -149,7 +145,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
     inputValues.put(now - 1000, 1.0d);
     timelineMetric.setMetricValues(inputValues);
@@ -173,7 +168,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
 
     long seconds = 1000;
@@ -234,7 +228,6 @@ public class MetricsPaddingMethodTest {
     timelineMetric.setMetricName("m1");
     timelineMetric.setHostName("h1");
     timelineMetric.setAppId("a1");
-    timelineMetric.setTimestamp(now);
     TreeMap<Long, Double> inputValues = new TreeMap<>();
     inputValues.put(now - 100, 1.0d);
     inputValues.put(now - 200, 2.0d);

Reply via email to