AMBARI-21128 Add AMS HA support to local metrics aggregator application (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d496b4ea Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d496b4ea Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d496b4ea Branch: refs/heads/branch-3.0-ams Commit: d496b4ea5fed99d3d6c8963e1f726d9cfecea69a Parents: 041e4e9 Author: Dmytro Sen <[email protected]> Authored: Fri Jun 9 14:36:11 2017 +0300 Committer: Dmytro Sen <[email protected]> Committed: Wed Jun 21 14:53:00 2017 +0300 ---------------------------------------------------------------------- .../timeline/AbstractTimelineMetricsSink.java | 4 +- .../ambari-metrics-host-aggregator/pom.xml | 30 +++- .../AbstractMetricPublisherThread.java | 134 --------------- .../aggregator/AggregatedMetricsPublisher.java | 101 ----------- .../host/aggregator/AggregatorApplication.java | 98 +++++++---- .../host/aggregator/AggregatorWebService.java | 2 +- .../host/aggregator/RawMetricsPublisher.java | 60 ------- .../host/aggregator/TimelineMetricsHolder.java | 26 ++- .../sink/timeline/AbstractMetricPublisher.java | 169 +++++++++++++++++++ .../timeline/AggregatedMetricsPublisher.java | 103 +++++++++++ .../sink/timeline/RawMetricsPublisher.java | 65 +++++++ .../aggregator/AggregatorApplicationTest.java | 55 ++++++ .../aggregator/AggregatorWebServiceTest.java | 135 +++++++++++++++ .../aggregator/TimelineMetricsHolderTest.java | 107 ++++++++++++ .../timeline/AbstractMetricPublisherTest.java | 82 +++++++++ .../AggregatedMetricsPublisherTest.java | 154 +++++++++++++++++ .../sink/timeline/RawMetricsPublisherTest.java | 151 +++++++++++++++++ .../src/main/python/core/aggregator.py | 6 +- .../src/main/python/core/controller.py | 2 +- 19 files changed, 1133 insertions(+), 351 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index a8dc571..a0e59d6 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -84,7 +84,7 @@ public abstract class AbstractTimelineMetricsSink { public static final String INSTANCE_ID_PROPERTY = "instanceId"; public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId"; - protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0); + protected final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0); public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100; protected static final AtomicInteger nullCollectorCounter = new AtomicInteger(0); public static int NUMBER_OF_NULL_COLLECTOR_EXCEPTIONS = 20; @@ -115,7 +115,7 @@ public abstract class AbstractTimelineMetricsSink { private volatile boolean isInitializedForHA = false; @SuppressWarnings("all") - private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 5; + private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 3; private final Gson gson = new Gson(); http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/pom.xml 
b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml index 0598bef..24432dd 100644 --- a/ambari-metrics/ambari-metrics-host-aggregator/pom.xml +++ b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml @@ -38,12 +38,6 @@ <dependencies> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>3.8.1</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>14.0.1</version> @@ -83,6 +77,30 @@ <artifactId>hadoop-common</artifactId> <version>2.7.1.2.3.4.0-3347</version> </dependency> + <dependency> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-core</artifactId> + <version>1.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + <version>1.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <version>3.4</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.2</version> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java deleted file mode 100644 index b1f60fa..0000000 --- 
a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.metrics2.host.aggregator; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.codehaus.jackson.map.AnnotationIntrospector; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.annotate.JsonSerialize; -import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; - -import java.io.IOException; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.Map; - -/** - * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals. 
- */ -public abstract class AbstractMetricPublisherThread extends Thread { - protected int publishIntervalInSeconds; - protected String publishURL; - protected ObjectMapper objectMapper; - private Log LOG; - protected TimelineMetricsHolder timelineMetricsHolder; - - public AbstractMetricPublisherThread(TimelineMetricsHolder timelineMetricsHolder, String publishURL, int publishIntervalInSeconds) { - LOG = LogFactory.getLog(this.getClass()); - this.publishURL = publishURL; - this.publishIntervalInSeconds = publishIntervalInSeconds; - this.timelineMetricsHolder = timelineMetricsHolder; - objectMapper = new ObjectMapper(); - AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); - objectMapper.setAnnotationIntrospector(introspector); - objectMapper.getSerializationConfig() - .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); - } - - /** - * Publishes metrics to collector in specified intervals while not interrupted. - */ - @Override - public void run() { - while (!isInterrupted()) { - try { - sleep(this.publishIntervalInSeconds * 1000); - } catch (InterruptedException e) { - //Ignore - } - try { - processAndPublishMetrics(getMetricsFromCache()); - } catch (Exception e) { - LOG.error("Couldn't process and send metrics : ",e); - } - } - } - - /** - * Processes and sends metrics to collector. - * @param metricsFromCache - * @throws Exception - */ - protected void processAndPublishMetrics(Map<Long, TimelineMetrics> metricsFromCache) throws Exception { - if (metricsFromCache.size()==0) return; - - LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size())); - publishMetricsJson(processMetrics(metricsFromCache)); - } - - /** - * Returns metrics map. Source is based on implementation. 
- * @return - */ - protected abstract Map<Long,TimelineMetrics> getMetricsFromCache(); - - /** - * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector - * @param metricValues - * @return - */ - protected abstract String processMetrics(Map<Long, TimelineMetrics> metricValues); - - protected void publishMetricsJson(String jsonData) throws Exception { - int timeout = 5 * 1000; - HttpURLConnection connection = null; - if (this.publishURL == null) { - throw new IOException("Unknown URL. Unable to connect to metrics collector."); - } - LOG.info("Collector URL : " + publishURL); - connection = (HttpURLConnection) new URL(this.publishURL).openConnection(); - - connection.setRequestMethod("POST"); - connection.setRequestProperty("Content-Type", "application/json"); - connection.setRequestProperty("Connection", "Keep-Alive"); - connection.setConnectTimeout(timeout); - connection.setReadTimeout(timeout); - connection.setDoOutput(true); - - if (jsonData != null) { - try (OutputStream os = connection.getOutputStream()) { - os.write(jsonData.getBytes("UTF-8")); - } - } - int responseCode = connection.getResponseCode(); - if (responseCode != 200) { - throw new Exception("responseCode is " + responseCode); - } - LOG.info("Successfully sent metrics."); - } - - /** - * Interrupts the thread. 
- */ - protected void stopPublisher() { - this.interrupt(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java deleted file mode 100644 index 0540ec9..0000000 --- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.metrics2.host.aggregator; - - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; -import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -/** - * Thread that aggregates and publishes metrics to collector on specified interval. - */ -public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread { - - private Log LOG; - - public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) { - super(timelineMetricsHolder, collectorURL, interval); - LOG = LogFactory.getLog(this.getClass()); - } - - /** - * get metrics map form @TimelineMetricsHolder - * @return - */ - @Override - protected Map<Long, TimelineMetrics> getMetricsFromCache() { - return timelineMetricsHolder.extractMetricsForAggregationPublishing(); - } - - /** - * Aggregates given metrics and converts them into json string that will be send to collector - * @param metricForAggregationValues - * @return - */ - @Override - protected String processMetrics(Map<Long, TimelineMetrics> metricForAggregationValues) { - HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>(); - for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) { - for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) { - if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) { - nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics()); - } - 
nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric); - } - } - Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>(); - for (TimelineMetrics metrics : nameToMetricMap.values()) { - double sum = 0; - double max = Integer.MIN_VALUE; - double min = Integer.MAX_VALUE; - int count = 0; - for (TimelineMetric metric : metrics.getMetrics()) { - for (Double value : metric.getMetricValues().values()) { - sum+=value; - max = Math.max(max, value); - min = Math.min(min, value); - count++; - } - } - TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0)); - tmpMetric.setMetricValues(new TreeMap<Long, Double>()); - metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min))); - } - String json = null; - try { - json = objectMapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis())); - LOG.debug(json); - } catch (Exception e) { - LOG.error("Failed to convert result into json", e); - } - - return json; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java index c6b703b..1e5cc82 100644 --- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java @@ -33,6 +33,9 @@ import java.util.HashMap; import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher; +import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher; +import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher; /** * WEB application with 2 publisher threads that processes received metrics and submits results to the collector @@ -40,24 +43,25 @@ import org.apache.hadoop.conf.Configuration; public class AggregatorApplication { private static final int STOP_SECONDS_DELAY = 0; - private static final int JOIN_SECONDS_TIMEOUT = 2; - private static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics"; - private static String AGGREGATED_POST_PREFIX = "/aggregated"; + private static final int JOIN_SECONDS_TIMEOUT = 5; private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml"; - private static Log LOG = LogFactory.getLog("AggregatorApplication.class"); + private Log LOG; private final int webApplicationPort; private final int rawPublishingInterval; private final int aggregationInterval; private Configuration configuration; - private String [] collectorHosts; - private AggregatedMetricsPublisher aggregatePublisher; - private RawMetricsPublisher rawPublisher; + private Thread aggregatePublisherThread; + private Thread rawPublisherThread; private TimelineMetricsHolder timelineMetricsHolder; private HttpServer httpServer; - public AggregatorApplication(String collectorHosts) { + public AggregatorApplication(String hostname, String collectorHosts) { + LOG = LogFactory.getLog(this.getClass()); + configuration = new Configuration(true); initConfiguration(); - this.collectorHosts = collectorHosts.split(","); + configuration.set("timeline.metrics.collector.hosts", collectorHosts); + configuration.set("timeline.metrics.hostname", hostname); + configuration.set("timeline.metrics.zk.quorum", getZkQuorumFromConfiguration()); 
this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300); this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60); this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888); @@ -70,7 +74,13 @@ public class AggregatorApplication } } - private void initConfiguration() { + private String getZkQuorumFromConfiguration() { + String zkClientPort = configuration.getTrimmed("cluster.zookeeper.property.clientPort", "2181"); + String zkServerHosts = configuration.getTrimmed("cluster.zookeeper.quorum", ""); + return getZkConnectionUrl(zkClientPort, zkServerHosts); + } + + protected void initConfiguration() { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); if (classLoader == null) { classLoader = getClass().getClassLoader(); @@ -82,7 +92,7 @@ public class AggregatorApplication throw new IllegalStateException("Unable to initialize the metrics " + "subsystem. 
No ams-site present in the classpath."); } - configuration = new Configuration(true); + try { configuration.addResource(amsResUrl.toURI().toURL()); } catch (Exception e) { @@ -91,7 +101,7 @@ public class AggregatorApplication } } - private String getHostName() { + protected String getHostName() { String hostName = "localhost"; try { hostName = InetAddress.getLocalHost().getCanonicalHostName(); @@ -101,13 +111,13 @@ public class AggregatorApplication return hostName; } - private URI getURI() { + protected URI getURI() { URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build(); LOG.info(String.format("Web server at %s", uri)); return uri; } - private HttpServer createHttpServer() throws IOException { + protected HttpServer createHttpServer() throws IOException { ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator"); HashMap<String, Object> params = new HashMap(); params.put("com.sun.jersey.api.json.POJOMappingFeature", "true"); @@ -122,29 +132,30 @@ public class AggregatorApplication private void startAggregatePublisherThread() { LOG.info("Starting aggregated metrics publisher."); - String collectorURL = buildBasicCollectorURL(collectorHosts[0]) + AGGREGATED_POST_PREFIX; - aggregatePublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, collectorURL, aggregationInterval); - aggregatePublisher.start(); + AbstractMetricPublisher metricPublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, configuration, aggregationInterval); + aggregatePublisherThread = new Thread(metricPublisher); + aggregatePublisherThread.start(); } private void startRawPublisherThread() { LOG.info("Starting raw metrics publisher."); - String collectorURL = buildBasicCollectorURL(collectorHosts[0]); - rawPublisher = new RawMetricsPublisher(timelineMetricsHolder, collectorURL, rawPublishingInterval); - rawPublisher.start(); + AbstractMetricPublisher metricPublisher = new 
RawMetricsPublisher(timelineMetricsHolder, configuration, rawPublishingInterval); + rawPublisherThread = aggregatePublisherThread = new Thread(metricPublisher); + aggregatePublisherThread.start(); } private void stop() { - aggregatePublisher.stopPublisher(); - rawPublisher.stopPublisher(); + LOG.info("Stopping aggregator application"); + aggregatePublisherThread.interrupt(); + rawPublisherThread.interrupt(); httpServer.stop(STOP_SECONDS_DELAY); LOG.info("Stopped web server."); try { LOG.info("Waiting for threads to join."); - aggregatePublisher.join(JOIN_SECONDS_TIMEOUT * 1000); - rawPublisher.join(JOIN_SECONDS_TIMEOUT * 1000); + aggregatePublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000); + rawPublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000); LOG.info("Gracefully stopped Aggregator Application."); } catch (InterruptedException e) { LOG.error("Received exception during stop : ", e); @@ -153,28 +164,43 @@ public class AggregatorApplication } - private String buildBasicCollectorURL(String host) { - String port = configuration.get("timeline.metrics.service.webapp.address", "0.0.0.0:6188").split(":")[1]; - String protocol = configuration.get("timeline.metrics.service.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? 
"http" : "https"; - return String.format(BASE_POST_URL, protocol, host, port); + private String getZkConnectionUrl(String zkClientPort, String zkQuorum) { + StringBuilder sb = new StringBuilder(); + String[] quorumParts = zkQuorum.split(","); + String prefix = ""; + for (String part : quorumParts) { + sb.append(prefix); + sb.append(part.trim()); + if (!part.contains(":")) { + sb.append(":"); + sb.append(zkClientPort); + } + prefix = ","; + } + return sb.toString(); } public static void main( String[] args ) throws Exception { - LOG.info("Starting aggregator application"); - if (args.length != 1) { - throw new Exception("This jar should be run with 1 argument - collector hosts separated with coma"); + if (args.length != 2) { + throw new Exception("This jar should be executed with 2 arguments : 1st - current host name, " + + "2nd - collector hosts separated with coma"); } - final AggregatorApplication app = new AggregatorApplication(args[0]); - app.startAggregatePublisherThread(); - app.startRawPublisherThread(); - app.startWebServer(); + final AggregatorApplication app = new AggregatorApplication(args[0], args[1]); + + app.startWebServerAndPublishersThreads(); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { - LOG.info("Stopping aggregator application"); app.stop(); } }); } + + private void startWebServerAndPublishersThreads() { + LOG.info("Starting aggregator application"); + startAggregatePublisherThread(); + startRawPublisherThread(); + startWebServer(); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java 
b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java index f96d0ed..b151209 100644 --- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java @@ -39,7 +39,7 @@ public class AggregatorWebService { @GET @Produces("text/json") @Path("/metrics") - public Response helloWorld() throws IOException { + public Response getOkResponse() throws IOException { return Response.ok().build(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java deleted file mode 100644 index f317ed9..0000000 --- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.metrics2.host.aggregator; - - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; - -import java.util.Map; - -public class RawMetricsPublisher extends AbstractMetricPublisherThread { - private final Log LOG; - - public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) { - super(timelineMetricsHolder, collectorURL, interval); - LOG = LogFactory.getLog(this.getClass()); - } - - - @Override - protected Map<Long, TimelineMetrics> getMetricsFromCache() { - return timelineMetricsHolder.extractMetricsForRawPublishing(); - } - - @Override - protected String processMetrics(Map<Long, TimelineMetrics> metricValues) { - //merge everything in one TimelineMetrics object - TimelineMetrics timelineMetrics = new TimelineMetrics(); - for (TimelineMetrics metrics : metricValues.values()) { - for (TimelineMetric timelineMetric : metrics.getMetrics()) - timelineMetrics.addOrMergeTimelineMetric(timelineMetric); - } - //map TimelineMetrics to json string - String json = null; - try { - json = objectMapper.writeValueAsString(timelineMetrics); - LOG.debug(json); - } catch (Exception e) { - LOG.error("Failed to convert result into json", e); - } - return json; - } -} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java index b355c97..03b6542 100644 --- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java @@ -19,8 +19,10 @@ package org.apache.hadoop.metrics2.host.aggregator; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeUnit; @@ -33,8 +35,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public class TimelineMetricsHolder { private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60; private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300; - private Cache<Long, TimelineMetrics> aggregationMetricsCache; - private Cache<Long, TimelineMetrics> rawMetricsCache; + private Cache<String, TimelineMetrics> aggregationMetricsCache; + private Cache<String, TimelineMetrics> rawMetricsCache; private static TimelineMetricsHolder instance = null; //to ensure no metric values are expired private static int EXPIRE_DELAY = 30; @@ -63,21 +65,29 @@ public class TimelineMetricsHolder { public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) { 
aggregationCacheLock.writeLock().lock(); - aggregationMetricsCache.put(System.currentTimeMillis(), timelineMetrics); + aggregationMetricsCache.put(calculateCacheKey(timelineMetrics), timelineMetrics); aggregationCacheLock.writeLock().unlock(); } - public Map<Long, TimelineMetrics> extractMetricsForAggregationPublishing() { + private String calculateCacheKey(TimelineMetrics timelineMetrics) { + List<TimelineMetric> metrics = timelineMetrics.getMetrics(); + if (metrics.size() > 0) { + return metrics.get(0).getAppId() + System.currentTimeMillis(); + } + return String.valueOf(System.currentTimeMillis()); + } + + public Map<String, TimelineMetrics> extractMetricsForAggregationPublishing() { return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock); } public void putMetricsForRawPublishing(TimelineMetrics metrics) { rawCacheLock.writeLock().lock(); - rawMetricsCache.put(System.currentTimeMillis(), metrics); + rawMetricsCache.put(calculateCacheKey(metrics), metrics); rawCacheLock.writeLock().unlock(); } - public Map<Long, TimelineMetrics> extractMetricsForRawPublishing() { + public Map<String, TimelineMetrics> extractMetricsForRawPublishing() { return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock); } @@ -87,9 +97,9 @@ public class TimelineMetricsHolder { * @param lock * @return */ - private Map<Long, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<Long, TimelineMetrics> cache, ReadWriteLock lock) { + private Map<String, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<String, TimelineMetrics> cache, ReadWriteLock lock) { lock.writeLock().lock(); - Map<Long, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap()); + Map<String, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap()); cache.invalidateAll(); lock.writeLock().unlock(); return metricsMap; 
http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
new file mode 100644
index 0000000..5af115f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals.
+ * <p>
+ * Subclasses supply the metrics to send ({@link #getMetricsFromCache()}), the JSON payload built from
+ * them ({@link #processMetrics(Map)}) and the URL template to post to ({@link #getPostUrl()}).
+ */
+public abstract class AbstractMetricPublisher extends AbstractTimelineMetricsSink implements Runnable {
+
+  // The constant names say KEYSTORE, but the keys they hold are the ssl.server.truststore.* properties.
+  private static final String AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
+  private static final String AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY = "ssl.server.truststore.type";
+  private static final String AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.password";
+  private static final String AMS_SITE_HTTP_POLICY_PROPERTY = "timeline.metrics.service.http.policy";
+  private static final String AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY = "timeline.metrics.service.webapp.address";
+  private static final String PUBLISHER_COLLECTOR_HOSTS_PROPERTY = "timeline.metrics.collector.hosts";
+  private static final String PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY = "timeline.metrics.zk.quorum";
+  private static final String PUBLISHER_HOSTNAME_PROPERTY = "timeline.metrics.hostname";
+  private static final String DEFAULT_COLLECTOR_WEBAPP_ADDRESS = "0.0.0.0:6188";
+  private static final String DEFAULT_COLLECTOR_PORT = "6188";
+  protected static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
+  protected int publishIntervalInSeconds;
+  private Log LOG;
+  protected TimelineMetricsHolder timelineMetricsHolder;
+  protected Configuration configuration;
+  private String collectorProtocol;
+  private String collectorPort;
+  private Collection<String> collectorHosts;
+  private String hostname;
+  private String zkQuorum;
+
+  /**
+   * Stores the collaborators and immediately reads the publisher settings via {@link #configure()}.
+   *
+   * @param timelineMetricsHolder cache the subclasses extract metrics from
+   * @param configuration source of the collector and SSL settings
+   * @param publishIntervalInSeconds pause between two publishing attempts, in seconds
+   */
+  public AbstractMetricPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int publishIntervalInSeconds) {
+    LOG = LogFactory.getLog(this.getClass());
+    this.configuration = configuration;
+    this.publishIntervalInSeconds = publishIntervalInSeconds;
+    this.timelineMetricsHolder = timelineMetricsHolder;
+    configure();
+  }
+
+  /**
+   * Reads protocol, port, collector hosts, ZooKeeper quorum and host name from the configuration and
+   * loads the truststore when the collector is reached over https.
+   * <p>
+   * Invoked from the constructor, so an override runs before the subclass constructor body.
+   */
+  protected void configure() {
+    String httpPolicy = configuration.get(AMS_SITE_HTTP_POLICY_PROPERTY, "HTTP_ONLY");
+    collectorProtocol = httpPolicy.equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
+
+    // An address without a ":port" part falls back to the default port instead of failing on index 1.
+    String[] addressParts = configuration
+      .getTrimmed(AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY, DEFAULT_COLLECTOR_WEBAPP_ADDRESS).split(":");
+    collectorPort = addressParts.length > 1 ? addressParts[1] : DEFAULT_COLLECTOR_PORT;
+
+    zkQuorum = configuration.get(PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY, "");
+    hostname = configuration.get(PUBLISHER_HOSTNAME_PROPERTY, "localhost");
+    collectorHosts = parseHostsStringIntoCollection(configuration.getTrimmed(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, ""));
+    if (collectorHosts.isEmpty()) {
+      LOG.error("No Metric collector configured.");
+    } else if (collectorProtocol.contains("https")) {
+      String trustStorePath = configuration.get(AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY).trim();
+      String trustStoreType = configuration.get(AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY).trim();
+      String trustStorePwd = configuration.get(AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
+      loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
+    }
+  }
+
+  /**
+   * Publishes metrics to collector in specified intervals while not interrupted.
+   */
+  @Override
+  public void run() {
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        // long arithmetic: a large interval must not overflow int milliseconds
+        Thread.sleep(publishIntervalInSeconds * 1000L);
+      } catch (InterruptedException e) {
+        // sleep() cleared the interrupt flag; restore it and stop, otherwise the loop could never end
+        Thread.currentThread().interrupt();
+        break;
+      }
+      try {
+        processAndPublishMetrics(getMetricsFromCache());
+      } catch (Exception e) {
+        // keep the publisher alive for the next interval, but leave a trace of the failure
+        LOG.warn("Failed to publish metrics to collector", e);
+      }
+    }
+  }
+
+  /**
+   * Processes and sends metrics to collector. Does nothing when the given map is empty.
+   *
+   * @param metricsFromCache metrics to publish, keyed by cache key
+   * @throws Exception if processing or sending fails
+   */
+  protected void processAndPublishMetrics(Map<String, TimelineMetrics> metricsFromCache) throws Exception {
+    if (metricsFromCache.isEmpty()) {
+      return;
+    }
+
+    LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
+    // Call order (host, uri, payload, emit) is the evaluation order of the original nested expression.
+    String collectorHost = getCurrentCollectorHost();
+    if (collectorHost == null) {
+      LOG.warn("No live collector host available, skipping publishing");
+      return;
+    }
+    String collectorUri = getCollectorUri(collectorHost);
+    String json = processMetrics(metricsFromCache);
+    if (json == null) {
+      // implementations in this module return null when serialization failed
+      LOG.warn("Metrics could not be converted to json, skipping publishing");
+      return;
+    }
+    emitMetricsJson(collectorUri, json);
+  }
+
+  /**
+   * Returns metrics map. Source is based on implementation.
+   *
+   * @return metrics to publish, keyed by cache key
+   */
+  protected abstract Map<String,TimelineMetrics> getMetricsFromCache();
+
+  /**
+   * Processes given metrics (aggregates or merges them) and converts them into json string that will be sent to collector
+   *
+   * @param metricValues metrics to process, keyed by cache key
+   * @return json payload to post
+   */
+  protected abstract String processMetrics(Map<String, TimelineMetrics> metricValues);
+
+  /**
+   * @return format string with three %s placeholders: protocol, host and port, in that order
+   */
+  protected abstract String getPostUrl();
+
+  @Override
+  protected String getCollectorUri(String host) {
+    return String.format(getPostUrl(), getCollectorProtocol(), host, getCollectorPort());
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return collectorProtocol;
+  }
+
+  @Override
+  protected String getCollectorPort() {
+    return collectorPort;
+  }
+
+  @Override
+  protected int getTimeoutSeconds() {
+    return DEFAULT_POST_TIMEOUT_SECONDS;
+  }
+
+  @Override
+  protected String getZookeeperQuorum() {
+    return zkQuorum;
+  }
+
+  @Override
+  protected Collection<String> getConfiguredCollectorHosts() {
+    return collectorHosts;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return false;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
new file mode 100644
index 0000000..c8dffab
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
@@ -0,0
+1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Thread that aggregates and publishes metrics to collector on specified interval.
+ */
+public class AggregatedMetricsPublisher extends AbstractMetricPublisher {
+  private static final String AGGREGATED_POST_PREFIX = "/aggregated";
+  private final Log LOG;
+
+  public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
+    super(timelineMetricsHolder, configuration, interval);
+    LOG = LogFactory.getLog(this.getClass());
+  }
+
+  /**
+   * Gets the metrics map from {@link TimelineMetricsHolder}.
+   *
+   * @return metrics waiting for aggregation, keyed by cache key
+   */
+  @Override
+  protected Map<String, TimelineMetrics> getMetricsFromCache() {
+    return timelineMetricsHolder.extractMetricsForAggregationPublishing();
+  }
+
+  /**
+   * Aggregates given metrics and converts them into json string that will be sent to collector.
+   * Metrics are grouped by metric name; sum, max, min and number of samples are computed per group.
+   *
+   * @param metricForAggregationValues metrics to aggregate, keyed by cache key
+   * @return json payload, or null when serialization failed
+   */
+  @Override
+  protected String processMetrics(Map<String, TimelineMetrics> metricForAggregationValues) {
+    HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
+    for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
+      for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+        String metricName = timelineMetric.getMetricName();
+        TimelineMetrics sameNameMetrics = nameToMetricMap.get(metricName);
+        if (sameNameMetrics == null) {
+          sameNameMetrics = new TimelineMetrics();
+          nameToMetricMap.put(metricName, sameNameMetrics);
+        }
+        sameNameMetrics.addOrMergeTimelineMetric(timelineMetric);
+      }
+    }
+    Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
+    for (TimelineMetrics metrics : nameToMetricMap.values()) {
+      double sum = 0;
+      // Seed with the extreme finite doubles, not the int limits: values are doubles and may lie
+      // outside the int range (e.g. byte counters), which made the int seeds win the comparison.
+      double max = -Double.MAX_VALUE;
+      double min = Double.MAX_VALUE;
+      int count = 0;
+      for (TimelineMetric metric : metrics.getMetrics()) {
+        for (Double value : metric.getMetricValues().values()) {
+          sum += value;
+          max = Math.max(max, value);
+          min = Math.min(min, value);
+          count++;
+        }
+      }
+      // The first metric of the group supplies the identifying fields; its values are replaced by an
+      // empty map because only the aggregate is sent.
+      TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
+      tmpMetric.setMetricValues(new TreeMap<Long, Double>());
+      metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
+    }
+    String json = null;
+    try {
+      json = mapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
+      LOG.debug(json);
+    } catch (Exception e) {
+      LOG.error("Failed to convert result into json", e);
+    }
+
+    return json;
+  }
+
+  @Override
+  protected String getPostUrl() {
+    return BASE_POST_URL + AGGREGATED_POST_PREFIX;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
new file mode 100644
index 0000000..89addb7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.Map;
+
+/**
+ * Publisher that merges the cached raw metrics into a single {@link TimelineMetrics} object and
+ * serializes it to json without aggregating the values.
+ */
+public class RawMetricsPublisher extends AbstractMetricPublisher {
+  private final Log LOG;
+
+  public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
+    super(timelineMetricsHolder, configuration, interval);
+    LOG = LogFactory.getLog(this.getClass());
+  }
+
+
+  /**
+   * @return raw metrics extracted from the holder, keyed by cache key
+   */
+  @Override
+  protected Map<String, TimelineMetrics> getMetricsFromCache() {
+    return timelineMetricsHolder.extractMetricsForRawPublishing();
+  }
+
+  /**
+   * Merges every metric of every cached entry into one object and converts it to json.
+   *
+   * @param metricValues raw metrics, keyed by cache key
+   * @return json payload, or null when serialization failed
+   */
+  @Override
+  protected String processMetrics(Map<String, TimelineMetrics> metricValues) {
+    // collect all cached entries into one TimelineMetrics object
+    TimelineMetrics merged = new TimelineMetrics();
+    for (TimelineMetrics cachedEntry : metricValues.values()) {
+      for (TimelineMetric rawMetric : cachedEntry.getMetrics()) {
+        merged.addOrMergeTimelineMetric(rawMetric);
+      }
+    }
+
+    // serialize the merged object; a failure is logged and reported as null
+    String payload = null;
+    try {
+      payload = mapper.writeValueAsString(merged);
+      LOG.debug(payload);
+    } catch (Exception e) {
+      LOG.error("Failed to convert result into json", e);
+    }
+    return payload;
+  }
+
+  @Override
+  protected String getPostUrl() {
+    return BASE_POST_URL;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
new file mode 100644
index 0000000..ea72d17
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.easymock.EasyMock.createMockBuilder;
+
+
+public class AggregatorApplicationTest {
+  /**
+   * main() is expected to throw when it gets an empty argument array or a single argument.
+   */
+  @Test
+  public void testMainNotEnoughArguments() {
+    assertMainRejects(new String[0]);
+    assertMainRejects(new String[1]);
+  }
+
+  /**
+   * Fails the test unless {@code AggregatorApplication.main(args)} throws an exception.
+   * Assert.fail is called outside the try block and raises an Error, so the catch clause cannot
+   * swallow it the way it swallowed the previous "Should not be thrown" exception.
+   */
+  private static void assertMainRejects(String[] args) {
+    try {
+      AggregatorApplication.main(args);
+    } catch (Exception expected) {
+      return;
+    }
+    Assert.fail("main() accepted " + args.length + " argument(s) but should have thrown");
+  }
+
+  /**
+   * Builds a partial mock with createHttpServer and initConfiguration mocked out and checks the URI
+   * it reports.
+   */
+  @Test
+  public void testGetURI() {
+    AggregatorApplication aggregatorApplicationMock = createMockBuilder(AggregatorApplication.class)
+      .withConstructor("", "")
+      .addMockedMethod("createHttpServer")
+      .addMockedMethod("initConfiguration").createMock();
+
+    URI uri = aggregatorApplicationMock.getURI();
+    Assert.assertEquals("http://" + aggregatorApplicationMock.getHostName() + ":61888/", uri.toString());
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
new file mode 100644
index 0000000..736fd06
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.test.framework.JerseyTest;
+import com.sun.jersey.test.framework.WebAppDescriptor;
+import com.sun.jersey.test.framework.spi.container.TestContainerFactory;
+import com.sun.jersey.test.framework.spi.container.grizzly2.GrizzlyTestContainerFactory;
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider;
+import org.junit.Test;
+
+
+import javax.ws.rs.core.MediaType;
+
+import java.util.Collection;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Jersey tests for the web service in the org.apache.hadoop.metrics2.host.aggregator package,
+ * run in a Grizzly test container.
+ */
+public class AggregatorWebServiceTest extends JerseyTest {
+  public AggregatorWebServiceTest() {
+    super(new WebAppDescriptor.Builder(
+      "org.apache.hadoop.metrics2.host.aggregator")
+      .contextPath("jersey-guice-filter")
+      .servletPath("/")
+      .clientConfig(new DefaultClientConfig(JacksonJaxbJsonProvider.class))
+      .build());
+  }
+
+  @Override
+  public TestContainerFactory getTestContainerFactory() {
+    return new GrizzlyTestContainerFactory();
+  }
+
+  /** GET on ws/v1/timeline/metrics answers 200 with type text/json. */
+  @Test
+  public void testOkResponse() {
+    ClientResponse response = metricsResource()
+      .accept("text/json")
+      .get(ClientResponse.class);
+    assertEquals(200, response.getStatus());
+    assertEquals("text/json", response.getType().toString());
+  }
+
+  /** GET on ws/v1/timeline/metrics/aggregated answers 404. */
+  @Test
+  public void testWrongPath() {
+    ClientResponse response = metricsResource().path("aggregated")
+      .accept("text/json")
+      .get(ClientResponse.class);
+    assertEquals(404, response.getStatus());
+  }
+
+
+  /** A posted metric ends up once in the aggregation cache and once in the raw cache. */
+  @Test
+  public void testMetricsPost() {
+    TimelineMetricsHolder holder = TimelineMetricsHolder.getInstance();
+
+    // drain whatever earlier tests left in the shared holder
+    holder.extractMetricsForAggregationPublishing();
+    holder.extractMetricsForRawPublishing();
+
+    TimelineMetrics posted = TimelineMetricsHolderTest.getTimelineMetricsWithAppID("appid");
+    ClientResponse response = metricsResource()
+      .accept(MediaType.TEXT_PLAIN)
+      .post(ClientResponse.class, posted);
+    assertEquals(200, response.getStatus());
+    assertEquals(MediaType.TEXT_PLAIN, response.getType().toString());
+
+    Map<String, TimelineMetrics> aggregationMap = holder.extractMetricsForAggregationPublishing();
+    Map<String, TimelineMetrics> rawMap = holder.extractMetricsForRawPublishing();
+
+    assertHoldsSingleMetric(aggregationMap, "appid");
+    assertHoldsSingleMetric(rawMap, "appid");
+
+    aggregationMap = holder.extractMetricsForAggregationPublishing();
+    rawMap = holder.extractMetricsForRawPublishing();
+
+    //Cache should be empty after extraction
+    Assert.assertEquals(0, aggregationMap.size());
+    Assert.assertEquals(0, rawMap.size());
+  }
+
+  /** Resource for the ws/v1/timeline/metrics path of the test application. */
+  private WebResource metricsResource() {
+    return resource().path("ws").path("v1").path("timeline").path("metrics");
+  }
+
+  /** Checks that the map has exactly one entry whose key contains appId and whose only metric has that appId. */
+  private static void assertHoldsSingleMetric(Map<String, TimelineMetrics> extracted, String appId) {
+    Assert.assertEquals(1, extracted.size());
+    for (String cacheKey : extracted.keySet()) {
+      Assert.assertTrue(cacheKey.contains(appId));
+    }
+
+    Collection<TimelineMetrics> cachedValues = extracted.values();
+    Assert.assertEquals(1, cachedValues.size());
+
+    TimelineMetrics onlyEntry = cachedValues.iterator().next();
+    Assert.assertEquals(1, onlyEntry.getMetrics().size());
+    Assert.assertEquals(appId, onlyEntry.getMetrics().get(0).getAppId());
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
new file mode 100644
index 0000000..7d8ebf4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Map;
+
+
+/**
+ * Tests for the {@link TimelineMetricsHolder} singleton and its two caches.
+ */
+public class TimelineMetricsHolderTest {
+  private TimelineMetricsHolder timelineMetricsHolderInstance;
+
+  /**
+   * Resets the holder's "instance" field to null through reflection so the next getInstance call
+   * builds a fresh holder.
+   */
+  public void clearHolderSingleton() throws NoSuchFieldException, IllegalAccessException {
+    Field instanceField = TimelineMetricsHolder.class.getDeclaredField("instance");
+    instanceField.setAccessible(true);
+    // the target object is ignored for a static field, so null is passed for it
+    instanceField.set(null, null);
+  }
+
+  @Test
+  public void testGetInstanceDefaultValues() throws Exception {
+    clearHolderSingleton();
+    Assert.assertNotNull(TimelineMetricsHolder.getInstance());
+  }
+
+  @Test
+  public void testGetInstanceWithParameters() throws Exception {
+    clearHolderSingleton();
+    Assert.assertNotNull(TimelineMetricsHolder.getInstance(1,2));
+  }
+
+  /** Each cache returns what was put into it exactly once and is empty afterwards. */
+  @Test
+  public void testCache() throws Exception {
+    clearHolderSingleton();
+    timelineMetricsHolderInstance = TimelineMetricsHolder.getInstance(4,4);
+    timelineMetricsHolderInstance.putMetricsForAggregationPublishing(getTimelineMetricsWithAppID("aggr"));
+    timelineMetricsHolderInstance.putMetricsForRawPublishing(getTimelineMetricsWithAppID("raw"));
+
+    Map<String, TimelineMetrics> aggregationMap = timelineMetricsHolderInstance.extractMetricsForAggregationPublishing();
+    Map<String, TimelineMetrics> rawMap = timelineMetricsHolderInstance.extractMetricsForRawPublishing();
+
+    assertHoldsSingleMetric(aggregationMap, "aggr");
+    assertHoldsSingleMetric(rawMap, "raw");
+
+    aggregationMap = timelineMetricsHolderInstance.extractMetricsForAggregationPublishing();
+    rawMap = timelineMetricsHolderInstance.extractMetricsForRawPublishing();
+
+    //Cache should be empty after extraction
+    Assert.assertEquals(0, aggregationMap.size());
+    Assert.assertEquals(0, rawMap.size());
+  }
+
+  /** Checks that the map has exactly one entry whose key contains appId and whose only metric has that appId. */
+  private static void assertHoldsSingleMetric(Map<String, TimelineMetrics> extracted, String appId) {
+    Assert.assertEquals(1, extracted.size());
+    for (String cacheKey : extracted.keySet()) {
+      Assert.assertTrue(cacheKey.contains(appId));
+    }
+
+    Collection<TimelineMetrics> cachedValues = extracted.values();
+    Assert.assertEquals(1, cachedValues.size());
+
+    TimelineMetrics onlyEntry = cachedValues.iterator().next();
+    Assert.assertEquals(1, onlyEntry.getMetrics().size());
+    Assert.assertEquals(appId, onlyEntry.getMetrics().get(0).getAppId());
+  }
+
+  /**
+   * Builds a {@link TimelineMetrics} object holding one metric that has only its app id set.
+   *
+   * @param appId app id of the single metric
+   * @return container with that one metric
+   */
+  public static TimelineMetrics getTimelineMetricsWithAppID(String appId) {
+    TimelineMetric singleMetric = new TimelineMetric();
+    singleMetric.setAppId(appId);
+    TimelineMetrics container = new TimelineMetrics();
+    container.addOrMergeTimelineMetric(singleMetric);
+    return container;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
new file mode 100644
index 0000000..a8ddbee
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+/**
+ * Tests the publishing flow of {@link AbstractMetricPublisher} through partial mocks of
+ * {@link RawMetricsPublisher}.
+ */
+public class AbstractMetricPublisherTest {
+  /**
+   * With a strict mock the four collaborating calls must happen in the recorded order:
+   * collector host, collector uri, metric processing, emit.
+   */
+  @Test
+  public void testProcessAndPublishMetrics() throws Exception {
+    TimelineMetricsHolder holder = TimelineMetricsHolder.getInstance();
+    AbstractMetricPublisher publisher = createMockBuilder(RawMetricsPublisher.class)
+      .withConstructor(holder, new Configuration(), 60)
+      .addMockedMethod("processMetrics")
+      .addMockedMethod("getCollectorUri")
+      .addMockedMethod("emitMetricsJson")
+      .addMockedMethod("getCurrentCollectorHost")
+      .createStrictMock();
+
+    holder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
+
+    String uri = "https://collectorhost:11/metrics";
+    String payload = "{metrics}";
+    expect(publisher.getCurrentCollectorHost()).andReturn("collectorhost").once();
+    expect(publisher.getCollectorUri(anyString())).andReturn(uri).once();
+    expect(publisher.processMetrics(anyObject(Map.class))).andReturn(payload).once();
+    expect(publisher.emitMetricsJson(uri, payload)).andReturn(true).once();
+
+    replay(publisher);
+
+    publisher.processAndPublishMetrics(holder.extractMetricsForRawPublishing());
+
+    verify(publisher);
+  }
+
+  /**
+   * Runs the publisher on a thread whose isInterrupted is mocked to answer false once and then true,
+   * and expects exactly one processAndPublishMetrics call.
+   */
+  @Test
+  public void testRunAndStop() throws Exception {
+    AbstractMetricPublisher publisher = createMockBuilder(RawMetricsPublisher.class)
+      .withConstructor(TimelineMetricsHolder.getInstance(), new Configuration(), 1)
+      .addMockedMethod("processAndPublishMetrics")
+      .createStrictMock();
+    publisher.processAndPublishMetrics(anyObject(Map.class));
+    expectLastCall().times(1);
+
+
+    Thread worker = createMockBuilder(Thread.class)
+      .withConstructor(publisher)
+      .addMockedMethod("isInterrupted")
+      .createStrictMock();
+    expect(worker.isInterrupted()).andReturn(false).once();
+    expect(worker.isInterrupted()).andReturn(true).once();
+
+    replay(publisher, worker);
+
+    worker.start();
+
+    // the publish interval is 1 second; wait long enough for one publishing round
+    Thread.sleep(2222);
+
+    verify(publisher, worker);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
new file mode 100644
index 0000000..3413052
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + +import junit.framework.Assert; +import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder; +import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +public class AggregatedMetricsPublisherTest { + + @Test + public void testProcessMetrics() throws Exception { + Configuration configuration = new Configuration(); + TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(); + timelineMetricsHolder.extractMetricsForAggregationPublishing(); + timelineMetricsHolder.extractMetricsForRawPublishing(); + + TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>(); + metric1App1Metrics.put(1L, 1d); + metric1App1Metrics.put(2L, 2d); + metric1App1Metrics.put(3L, 3d); + timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics)); + + TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>(); + metric2App2Metrics.put(1L, 4d); + metric2App2Metrics.put(2L, 5d); + metric2App2Metrics.put(3L, 6d); + timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics)); + + TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>(); + metric3App3Metrics.put(1L, 7d); + metric3App3Metrics.put(2L, 8d); + metric3App3Metrics.put(3L, 9d); + + 
timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics)); + + + AggregatedMetricsPublisher aggregatedMetricsPublisher = + new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60); + + String aggregatedJson = aggregatedMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForAggregationPublishing()); + String expectedMetric1App1 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":6.0,\"deviation\":0.0,\"max\":3.0,\"min\":1.0,\"numberOfSamples\":3}}"; + String expectedMetric2App2 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":15.0,\"deviation\":0.0,\"max\":6.0,\"min\":4.0,\"numberOfSamples\":3}}"; + String expectedMetric3App3 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":24.0,\"deviation\":0.0,\"max\":9.0,\"min\":7.0,\"numberOfSamples\":3}}"; + Assert.assertNotNull(aggregatedJson); + Assert.assertTrue(aggregatedJson.contains(expectedMetric1App1)); + Assert.assertTrue(aggregatedJson.contains(expectedMetric3App3)); + Assert.assertTrue(aggregatedJson.contains(expectedMetric2App2)); + } + + @Test + public void testGetPostUrl() { + Configuration configuration = new Configuration(); + AggregatedMetricsPublisher aggregatedMetricsPublisher = + new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); + String actualURL = aggregatedMetricsPublisher.getPostUrl(); + String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics/aggregated"; + Assert.assertNotNull(actualURL); + Assert.assertEquals(expectedURL, actualURL); + } + + @Test + public void testGetCollectorUri() { + //default 
configuration + Configuration configuration = new Configuration(); + AbstractMetricPublisher aggregatedMetricsPublisher = + new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); + String actualURL = aggregatedMetricsPublisher.getCollectorUri("c6401.ambari.apache.org"); + String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated"; + Assert.assertNotNull(actualURL); + Assert.assertEquals(expectedURL, actualURL); + + //https configuration + configuration = new Configuration(); + configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY"); + aggregatedMetricsPublisher = + new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); + actualURL = aggregatedMetricsPublisher.getCollectorUri("c6402.ambari.apache.org"); + expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated"; + Assert.assertNotNull(actualURL); + Assert.assertEquals(expectedURL, actualURL); + + //custom port configuration + configuration = new Configuration(); + configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888"); + aggregatedMetricsPublisher = + new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); + actualURL = aggregatedMetricsPublisher.getCollectorUri("c6403.ambari.apache.org"); + expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics/aggregated"; + Assert.assertNotNull(actualURL); + Assert.assertEquals(expectedURL, actualURL); + } + + @Test + public void testGetMetricsFromCache() throws InterruptedException { + TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4,4); + timelineMetricsHolder.extractMetricsForAggregationPublishing(); + timelineMetricsHolder.extractMetricsForRawPublishing(); + + timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr1")); + 
timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw")); + timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr2")); + + Configuration configuration = new Configuration(); + AggregatedMetricsPublisher aggregatedMetricsPublisher = + new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); + + Map<String, TimelineMetrics> metricsFromCache = aggregatedMetricsPublisher.getMetricsFromCache(); + Assert.assertNotNull(metricsFromCache); + Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values(); + Assert.assertNotNull(actualTimelineMetrics); + Assert.assertEquals(2, actualTimelineMetrics.size()); + + for (TimelineMetrics timelineMetrics : actualTimelineMetrics) { + List<TimelineMetric> metrics = timelineMetrics.getMetrics(); + Assert.assertEquals(1, metrics.size()); + Assert.assertTrue(metrics.get(0).getAppId().contains("aggr")); + } + + } + + TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) { + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(metricName); + timelineMetric.setAppId(appId); + timelineMetric.setMetricValues(metricValues); + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.addOrMergeTimelineMetric(timelineMetric); + return timelineMetrics; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java 
b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java new file mode 100644 index 0000000..60510d2 --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.metrics2.sink.timeline; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder; +import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest; +import org.junit.Test; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + + +public class RawMetricsPublisherTest { + @Test + public void testProcessMetrics() throws Exception { + Configuration configuration = new Configuration(); + TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(); + + timelineMetricsHolder.extractMetricsForAggregationPublishing(); + timelineMetricsHolder.extractMetricsForRawPublishing(); + + TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>(); + metric1App1Metrics.put(1L, 1d); + metric1App1Metrics.put(2L, 2d); + metric1App1Metrics.put(3L, 3d); + timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics)); + + TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>(); + metric2App2Metrics.put(1L, 4d); + metric2App2Metrics.put(2L, 5d); + metric2App2Metrics.put(3L, 6d); + timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics)); + + TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>(); + metric3App3Metrics.put(1L, 7d); + metric3App3Metrics.put(2L, 8d); + metric3App3Metrics.put(3L, 9d); + + timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics)); + + + RawMetricsPublisher rawMetricsPublisher = + new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60); + + String rawJson = rawMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForRawPublishing()); + String expectedResult = 
"{\"metrics\":[{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{\"1\":1.0,\"2\":2.0,\"3\":3.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{\"1\":4.0,\"2\":5.0,\"3\":6.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{\"1\":7.0,\"2\":8.0,\"3\":9.0}}]}"; + Assert.assertNotNull(rawJson); + Assert.assertEquals(expectedResult, rawJson); + } + + @Test + public void testGetPostUrl() { + Configuration configuration = new Configuration(); + RawMetricsPublisher rawMetricsPublisher = + new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); + String actualURL = rawMetricsPublisher.getPostUrl(); + String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics"; + Assert.assertNotNull(actualURL); + Assert.assertEquals(expectedURL, actualURL); + } + + @Test + public void testGetCollectorUri() { + //default configuration + Configuration configuration = new Configuration(); + AbstractMetricPublisher rawMetricsPublisher = + new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); + String actualURL = rawMetricsPublisher.getCollectorUri("c6401.ambari.apache.org"); + String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics"; + Assert.assertNotNull(actualURL); + Assert.assertEquals(expectedURL, actualURL); + + //https configuration + configuration = new Configuration(); + configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY"); + rawMetricsPublisher = + new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); + actualURL = rawMetricsPublisher.getCollectorUri("c6402.ambari.apache.org"); + expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics"; + Assert.assertNotNull(actualURL); + Assert.assertEquals(expectedURL, actualURL); + + //custom port configuration + 
configuration = new Configuration(); + configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888"); + rawMetricsPublisher = + new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); + actualURL = rawMetricsPublisher.getCollectorUri("c6403.ambari.apache.org"); + expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics"; + Assert.assertNotNull(actualURL); + Assert.assertEquals(expectedURL, actualURL); + } + + @Test + public void testGetMetricsFromCache() throws InterruptedException { + + TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4,4); + timelineMetricsHolder.extractMetricsForAggregationPublishing(); + timelineMetricsHolder.extractMetricsForRawPublishing(); + + timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw1")); + timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr")); + timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw2")); + + Configuration configuration = new Configuration(); + RawMetricsPublisher rawMetricsPublisher = + new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); + + Map<String, TimelineMetrics> metricsFromCache = rawMetricsPublisher.getMetricsFromCache(); + Assert.assertNotNull(metricsFromCache); + Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values(); + Assert.assertNotNull(actualTimelineMetrics); + Assert.assertEquals(2, actualTimelineMetrics.size()); + + for (TimelineMetrics timelineMetrics : actualTimelineMetrics) { + List<TimelineMetric> metrics = timelineMetrics.getMetrics(); + Assert.assertEquals(1, metrics.size()); + Assert.assertTrue(metrics.get(0).getAppId().contains("raw")); + } + + } + + TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) { + TimelineMetric 
timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName(metricName); + timelineMetric.setAppId(appId); + timelineMetric.setMetricValues(metricValues); + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.addOrMergeTimelineMetric(timelineMetric); + return timelineMetrics; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/d496b4ea/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py index 2249e53..ba05e9b 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py @@ -42,9 +42,10 @@ class Aggregator(threading.Thread): ams_log_file = "ambari-metrics-aggregator.log" additional_classpath = ':{0}'.format(config_dir) ams_log_dir = self._config.ams_monitor_log_dir() + hostname = self._config.get_hostname_config() logger.info('Starting Aggregator thread.') - cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6}"\ - .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, collector_hosts) + cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6} {7}"\ + .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, hostname, collector_hosts) logger.info("Executing : {0}".format(cmd)) @@ -60,6 +61,7 @@ class Aggregator(threading.Thread): if self._aggregator_process : logger.info('Stopping Aggregator thread.') self._aggregator_process.terminate() + self._aggregator_process = None class AggregatorWatchdog(threading.Thread): SLEEP_TIME = 30
