Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-ams c32eebf89 -> 27abaf2dd


http://git-wip-us.apache.org/repos/asf/ambari/blob/27abaf2d/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
 
b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
new file mode 100644
index 0000000..5ca7b17
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-spark/src/main/scala/org/apache/ambari/metrics/spark/SparkPhoenixReader.scala
@@ -0,0 +1,67 @@
+package org.apache.ambari.metrics.spark
+
+import org.apache.ambari.metrics.alertservice.common.TimelineMetric
+import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel
+import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.rdd.RDD
+
+object SparkPhoenixReader {
+
+  def main(args: Array[String]) {
+
+    if (args.length < 6) {
+      System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> 
<hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>")
+      System.exit(1)
+    }
+
+    var metricName = args(0)
+    var appId = args(1)
+    var hostname = args(2)
+    var weight = args(3).toDouble
+    var timessdev = args(4).toInt
+    var phoenixConnectionString = args(5) 
//avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure
+    var modelDir = args(6)
+
+    val conf = new SparkConf()
+    conf.set("spark.app.name", "AMSAnomalyModelBuilder")
+    //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077")
+
+    var sc = new SparkContext(conf)
+    val sqlContext = new SQLContext(sc)
+
+    val currentTime = System.currentTimeMillis()
+    val oneDayBack = currentTime - 24*60*60*1000
+
+    val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> 
"METRIC_RECORD", "zkUrl" -> phoenixConnectionString))
+    df.registerTempTable("METRIC_RECORD")
+    val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, 
SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " +
+      "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + 
"' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND 
SERVER_TIME > " + oneDayBack)
+
+    var metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double]
+    result.collect().foreach(
+      t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
+    )
+
+    //val metricName = result.head().getString(0)
+    //val hostname = result.head().getString(1)
+    //val appId = result.head().getString(2)
+
+    val timelineMetric = new TimelineMetric(metricName, appId, hostname, 
metricValues)
+
+    var emaModel = new EmaModel()
+    emaModel.train(timelineMetric, weight, timessdev)
+    emaModel.save(sc, modelDir)
+
+//    var metricData:Seq[Double] = Seq.empty
+//    result.collect().foreach(
+//      t => metricData :+ t.getDouble(4) / t.getInt(5)
+//    )
+//    val data: RDD[Double] = sc.parallelize(metricData)
+//    val myCDF = Map(0.1 -> 0.2, 0.15 -> 0.6, 0.2 -> 0.05, 0.3 -> 0.05, 0.25 
-> 0.1)
+//    val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF)
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/27abaf2d/ambari-metrics/ambari-metrics-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml 
b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index f9d7e19..ebd3ccc 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -663,6 +663,11 @@
       <version>1.0.0.0-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
+      <dependency>
+          <groupId>org.apache.ambari</groupId>
+          <artifactId>ambari-metrics-alertservice</artifactId>
+          <version>2.5.1.0.0</version>
+      </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/ambari/blob/27abaf2d/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index c8eb65f..9ebc64c 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -19,6 +19,7 @@ package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
+import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -63,10 +64,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
@@ -84,6 +82,7 @@ public class HBaseTimelineMetricsService extends 
AbstractService implements Time
   private TimelineMetricMetadataManager metricMetadataManager;
   private Integer defaultTopNHostsLimit;
   private MetricCollectorHAController haController;
+  private AmsKafkaProducer kafkaProducer = new 
AmsKafkaProducer("104.196.85.21:6667");
 
   /**
    * Construct the service.
@@ -367,11 +366,43 @@ public class HBaseTimelineMetricsService extends 
AbstractService implements Time
     // Error indicated by the Sql exception
     TimelinePutResponse response = new TimelinePutResponse();
 
+    try {
+      if (!metrics.getMetrics().isEmpty() && 
metrics.getMetrics().get(0).getAppId().equals("HOST")) {
+        kafkaProducer.sendMetrics(fromTimelineMetrics(metrics));
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error(e);
+    }
     hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, 
metrics, false);
 
     return response;
   }
 
+
+  private org.apache.ambari.metrics.alertservice.common.TimelineMetrics 
fromTimelineMetrics(TimelineMetrics timelineMetrics) {
+    org.apache.ambari.metrics.alertservice.common.TimelineMetrics otherMetrics 
= new org.apache.ambari.metrics.alertservice.common.TimelineMetrics();
+
+    List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> 
timelineMetricList = new ArrayList<>();
+    for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+      timelineMetricList.add(fromTimelineMetric(timelineMetric));
+    }
+    otherMetrics.setMetrics(timelineMetricList);
+    return otherMetrics;
+  }
+
+  private org.apache.ambari.metrics.alertservice.common.TimelineMetric 
fromTimelineMetric(TimelineMetric timelineMetric) {
+
+    org.apache.ambari.metrics.alertservice.common.TimelineMetric otherMetric = 
new org.apache.ambari.metrics.alertservice.common.TimelineMetric();
+    otherMetric.setMetricValues(timelineMetric.getMetricValues());
+    otherMetric.setStartTime(timelineMetric.getStartTime());
+    otherMetric.setHostName(timelineMetric.getHostName());
+    otherMetric.setInstanceId(timelineMetric.getInstanceId());
+    otherMetric.setAppId(timelineMetric.getAppId());
+    otherMetric.setMetricName(timelineMetric.getMetricName());
+
+    return otherMetric;
+  }
+
   @Override
   public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
       throws SQLException, IOException {

http://git-wip-us.apache.org/repos/asf/ambari/blob/27abaf2d/ambari-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml
index 02f9574..573c3b8 100644
--- a/ambari-metrics/pom.xml
+++ b/ambari-metrics/pom.xml
@@ -34,6 +34,8 @@
     <module>ambari-metrics-grafana</module>
     <module>ambari-metrics-assembly</module>
     <module>ambari-metrics-host-aggregator</module>
+    <module>ambari-metrics-alertservice</module>
+    <module>ambari-metrics-spark</module>
   </modules>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

Reply via email to