Repository: ambari Updated Branches: refs/heads/branch-3.0-ams c32eebf89 -> 27abaf2dd
package org.apache.ambari.metrics.spark

import org.apache.ambari.metrics.alertservice.common.TimelineMetric
import org.apache.ambari.metrics.alertservice.methods.ema.EmaModel
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Reads the last 24 hours of a single (metric, host, app) series from the
 * Phoenix METRIC_RECORD table via the phoenix-spark connector, then trains
 * and saves an EMA anomaly-detection model for that series.
 *
 * Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight>
 *        <timessdev> <phoenixConnectionString> <model_dir>
 */
object SparkPhoenixReader {

  /** Renders a value as a SQL string literal, doubling embedded single
    * quotes so values containing quotes cannot break (or inject into)
    * the query text. */
  private def sqlLiteral(value: String): String =
    "'" + value.replace("'", "''") + "'"

  def main(args: Array[String]): Unit = {

    // args(0)..args(6) are consumed below, so SEVEN arguments are required.
    // (The original "< 6" guard let a 6-argument invocation reach args(6)
    // and die with ArrayIndexOutOfBoundsException.)
    if (args.length < 7) {
      System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>")
      System.exit(1)
    }

    val metricName = args(0)
    val appId = args(1)
    val hostname = args(2)
    val weight = args(3).toDouble
    val timessdev = args(4).toInt
    // e.g. avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure
    val phoenixConnectionString = args(5)
    val modelDir = args(6)

    val conf = new SparkConf()
    conf.set("spark.app.name", "AMSAnomalyModelBuilder")
    //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val currentTime = System.currentTimeMillis()
    val oneDayBack = currentTime - 24 * 60 * 60 * 1000L

    val df = sqlContext.load("org.apache.phoenix.spark",
      Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString))
    df.registerTempTable("METRIC_RECORD")

    // String values are escaped with sqlLiteral; the time bounds are longs
    // generated locally, so they are safe to interpolate directly.
    val result = sqlContext.sql(
      "SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD" +
        " WHERE METRIC_NAME = " + sqlLiteral(metricName) +
        " AND HOSTNAME = " + sqlLiteral(hostname) +
        " AND APP_ID = " + sqlLiteral(appId) +
        " AND SERVER_TIME < " + currentTime +
        " AND SERVER_TIME > " + oneDayBack)

    // Average value per timestamp = METRIC_SUM / METRIC_COUNT, keyed (and
    // therefore sorted) by SERVER_TIME; TimelineMetric expects a TreeMap.
    val metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double]
    result.collect().foreach(
      t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
    )

    val timelineMetric = new TimelineMetric(metricName, appId, hostname, metricValues)

    val emaModel = new EmaModel()
    emaModel.train(timelineMetric, weight, timessdev)
    emaModel.save(sc, modelDir)
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/27abaf2d/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index c8eb65f..9ebc64c 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; +import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -63,10 +64,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION; import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; @@ -84,6 +82,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time private TimelineMetricMetadataManager metricMetadataManager; private Integer defaultTopNHostsLimit; private MetricCollectorHAController haController; + private AmsKafkaProducer kafkaProducer = new AmsKafkaProducer("104.196.85.21:6667"); /** * Construct the service. @@ -367,11 +366,43 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time // Error indicated by the Sql exception TimelinePutResponse response = new TimelinePutResponse(); + try { + if (!metrics.getMetrics().isEmpty() && metrics.getMetrics().get(0).getAppId().equals("HOST")) { + kafkaProducer.sendMetrics(fromTimelineMetrics(metrics)); + } + } catch (InterruptedException | ExecutionException e) { + LOG.error(e); + } hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false); return response; } + + private org.apache.ambari.metrics.alertservice.common.TimelineMetrics fromTimelineMetrics(TimelineMetrics timelineMetrics) { + org.apache.ambari.metrics.alertservice.common.TimelineMetrics otherMetrics = new org.apache.ambari.metrics.alertservice.common.TimelineMetrics(); + + List<org.apache.ambari.metrics.alertservice.common.TimelineMetric> timelineMetricList = new ArrayList<>(); + for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) { + timelineMetricList.add(fromTimelineMetric(timelineMetric)); + } + otherMetrics.setMetrics(timelineMetricList); + return otherMetrics; + } + + private org.apache.ambari.metrics.alertservice.common.TimelineMetric fromTimelineMetric(TimelineMetric timelineMetric) { + + org.apache.ambari.metrics.alertservice.common.TimelineMetric otherMetric = new org.apache.ambari.metrics.alertservice.common.TimelineMetric(); + otherMetric.setMetricValues(timelineMetric.getMetricValues()); + 
otherMetric.setStartTime(timelineMetric.getStartTime()); + otherMetric.setHostName(timelineMetric.getHostName()); + otherMetric.setInstanceId(timelineMetric.getInstanceId()); + otherMetric.setAppId(timelineMetric.getAppId()); + otherMetric.setMetricName(timelineMetric.getMetricName()); + + return otherMetric; + } + @Override public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics) throws SQLException, IOException { http://git-wip-us.apache.org/repos/asf/ambari/blob/27abaf2d/ambari-metrics/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/pom.xml b/ambari-metrics/pom.xml index 02f9574..573c3b8 100644 --- a/ambari-metrics/pom.xml +++ b/ambari-metrics/pom.xml @@ -34,6 +34,8 @@ <module>ambari-metrics-grafana</module> <module>ambari-metrics-assembly</module> <module>ambari-metrics-host-aggregator</module> + <module>ambari-metrics-alertservice</module> + <module>ambari-metrics-spark</module> </modules> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
