http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-common/pom.xml ---------------------------------------------------------------------- diff --cc ambari-metrics/ambari-metrics-common/pom.xml index ebd0fc9,cae9734..1a7fef3 --- a/ambari-metrics/ambari-metrics-common/pom.xml +++ b/ambari-metrics/ambari-metrics-common/pom.xml @@@ -106,12 -106,12 +106,16 @@@ </relocation> <relocation> <pattern>org.jboss</pattern> - <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jboss</shadedPattern> + <shadedPattern>org.apache.ambari.metrics.sink.relocated.jboss</shadedPattern> + </relocation> + <relocation> + <pattern>net.sf.ehcache</pattern> + <shadedPattern>org.apache.ambari.metrics.sink.relocated.ehcache</shadedPattern> </relocation> + <relocation> + <pattern>org.apache.http</pattern> + <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.apache.http</shadedPattern> + </relocation> </relocations> </configuration> </execution> http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --cc ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index 73ed3c4,337f640..3c06032 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@@ -245,14 -290,15 +291,19 @@@ public abstract class AbstractTimelineM protected boolean emitMetrics(TimelineMetrics metrics) { String connectUrl; + boolean validCollectorHost = true; + if (isHostInMemoryAggregationEnabled()) { - connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort())); + String hostname = "localhost"; + if (getHostInMemoryAggregationProtocol().equalsIgnoreCase("https")) { + hostname = getHostname(); + } + connectUrl = constructTimelineMetricUri(getHostInMemoryAggregationProtocol(), hostname, String.valueOf(getHostInMemoryAggregationPort())); } else { String collectorHost = getCurrentCollectorHost(); + if (collectorHost == null) { + validCollectorHost = false; + } connectUrl = getCollectorUri(collectorHost); } http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java ---------------------------------------------------------------------- diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java index f8ed95f,1e5cc82..c73cbce --- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java @@@ -6,9 -6,9 +6,9 @@@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * ++ * <p> ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@@ -17,227 -17,190 +17,227 @@@ */ package org.apache.hadoop.metrics2.host.aggregator; --import com.sun.jersey.api.container.httpserver.HttpServerFactory; --import com.sun.jersey.api.core.PackagesResourceConfig; --import com.sun.jersey.api.core.ResourceConfig; --import com.sun.net.httpserver.HttpServer; -- - import javax.net.ssl.SSLContext; --import javax.ws.rs.core.UriBuilder; -import java.io.IOException; import java.net.InetAddress; import java.net.URI; import java.net.URL; import java.net.UnknownHostException; import java.util.HashMap; - import com.sun.net.httpserver.HttpsConfigurator; - import com.sun.net.httpserver.HttpsServer; ++import javax.net.ssl.SSLContext; ++import javax.ws.rs.core.UriBuilder; ++ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher; import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher; import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher; +import org.eclipse.jetty.util.ssl.SslContextFactory; + ++import com.sun.jersey.api.container.httpserver.HttpServerFactory; ++import com.sun.jersey.api.core.PackagesResourceConfig; ++import com.sun.jersey.api.core.ResourceConfig; ++import com.sun.net.httpserver.HttpServer; ++import com.sun.net.httpserver.HttpsConfigurator; ++import com.sun.net.httpserver.HttpsServer; + /** * WEB application with 2 publisher threads that processes received metrics and submits results to the collector */ --public class AggregatorApplication --{ -- private static final int STOP_SECONDS_DELAY = 0; -- private static final int JOIN_SECONDS_TIMEOUT = 5; -- private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml"; - private static final String METRICS_SSL_SERVER_CONFIGURATION_FILE = "ssl-server.xml"; -- private Log LOG; -- private final int webApplicationPort; -- private final int rawPublishingInterval; -- private final int aggregationInterval; - private final String webServerProtocol; -- private Configuration configuration; -- private Thread aggregatePublisherThread; -- private Thread rawPublisherThread; -- private TimelineMetricsHolder timelineMetricsHolder; -- private HttpServer httpServer; -- -- public AggregatorApplication(String hostname, String collectorHosts) { -- LOG = LogFactory.getLog(this.getClass()); -- configuration = new Configuration(true); -- initConfiguration(); -- configuration.set("timeline.metrics.collector.hosts", collectorHosts); -- configuration.set("timeline.metrics.hostname", hostname); -- configuration.set("timeline.metrics.zk.quorum", getZkQuorumFromConfiguration()); -- this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300); -- this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60); -- this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888); - this.webServerProtocol = configuration.get("timeline.metrics.host.inmemory.aggregation.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https"; -- this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval); -- try { -- this.httpServer = createHttpServer(); - } catch (Exception e) { - } catch (IOException e) { -- LOG.error("Exception while starting HTTP server. Exiting", e); -- System.exit(1); -- } ++public class AggregatorApplication { ++ private static final int STOP_SECONDS_DELAY = 0; ++ private static final int JOIN_SECONDS_TIMEOUT = 5; ++ private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml"; ++ private static final String METRICS_SSL_SERVER_CONFIGURATION_FILE = "ssl-server.xml"; ++ private Log LOG; ++ private final int webApplicationPort; ++ private final int rawPublishingInterval; ++ private final int aggregationInterval; ++ private final String webServerProtocol; ++ private Configuration configuration; ++ private Thread aggregatePublisherThread; ++ private Thread rawPublisherThread; ++ private TimelineMetricsHolder timelineMetricsHolder; ++ private HttpServer httpServer; ++ ++ public AggregatorApplication(String hostname, String collectorHosts) { ++ LOG = LogFactory.getLog(this.getClass()); ++ configuration = new Configuration(true); ++ initConfiguration(); ++ configuration.set("timeline.metrics.collector.hosts", collectorHosts); ++ configuration.set("timeline.metrics.hostname", hostname); ++ configuration.set("timeline.metrics.zk.quorum", getZkQuorumFromConfiguration()); ++ this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300); ++ this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60); ++ this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888); ++ this.webServerProtocol = configuration.get("timeline.metrics.host.inmemory.aggregation.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https"; ++ this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval); ++ try { ++ this.httpServer = createHttpServer(); ++ } catch (Exception e) { ++ LOG.error("Exception while starting HTTP server. Exiting", e); ++ System.exit(1); } -- -- private String getZkQuorumFromConfiguration() { -- String zkClientPort = configuration.getTrimmed("cluster.zookeeper.property.clientPort", "2181"); -- String zkServerHosts = configuration.getTrimmed("cluster.zookeeper.quorum", ""); -- return getZkConnectionUrl(zkClientPort, zkServerHosts); ++ } ++ ++ private String getZkQuorumFromConfiguration() { ++ String zkClientPort = configuration.getTrimmed("cluster.zookeeper.property.clientPort", "2181"); ++ String zkServerHosts = configuration.getTrimmed("cluster.zookeeper.quorum", ""); ++ return getZkConnectionUrl(zkClientPort, zkServerHosts); ++ } ++ ++ protected void initConfiguration() { ++ ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); ++ if (classLoader == null) { ++ classLoader = getClass().getClassLoader(); } -- protected void initConfiguration() { -- ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); -- if (classLoader == null) { -- classLoader = getClass().getClassLoader(); -- } -- -- URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE); -- LOG.info("Found metric service configuration: " + amsResUrl); - URL sslConfUrl = classLoader.getResource(METRICS_SSL_SERVER_CONFIGURATION_FILE); - LOG.info("Found metric service configuration: " + sslConfUrl); -- if (amsResUrl == null) { - throw new IllegalStateException(String.format("Unable to initialize the metrics " + - "subsystem. No %s present in the classpath.", METRICS_SITE_CONFIGURATION_FILE)); - } - if (sslConfUrl == null) { - throw new IllegalStateException(String.format("Unable to initialize the metrics " + - "subsystem. No %s present in the classpath.", METRICS_SSL_SERVER_CONFIGURATION_FILE)); - throw new IllegalStateException("Unable to initialize the metrics " + - "subsystem. No ams-site present in the classpath."); -- } -- -- try { -- configuration.addResource(amsResUrl.toURI().toURL()); - configuration.addResource(sslConfUrl.toURI().toURL()); -- } catch (Exception e) { -- LOG.error("Couldn't init configuration. ", e); -- System.exit(1); -- } ++ URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE); ++ LOG.info("Found metric service configuration: " + amsResUrl); ++ URL sslConfUrl = classLoader.getResource(METRICS_SSL_SERVER_CONFIGURATION_FILE); ++ LOG.info("Found metric service configuration: " + sslConfUrl); ++ if (amsResUrl == null) { ++ throw new IllegalStateException(String.format("Unable to initialize the metrics " + ++ "subsystem. No %s present in the classpath.", METRICS_SITE_CONFIGURATION_FILE)); } -- protected String getHostName() { -- String hostName = "localhost"; -- try { -- hostName = InetAddress.getLocalHost().getCanonicalHostName(); -- } catch (UnknownHostException e) { -- LOG.error(e); -- } -- return hostName; ++ if (sslConfUrl == null) { ++ throw new IllegalStateException(String.format("Unable to initialize the metrics " + ++ "subsystem. No %s present in the classpath.", METRICS_SSL_SERVER_CONFIGURATION_FILE)); } -- protected URI getURI() { - URI uri = UriBuilder.fromUri("/").scheme(this.webServerProtocol).host(getHostName()).port(this.webApplicationPort).build(); - URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build(); -- LOG.info(String.format("Web server at %s", uri)); -- return uri; ++ try { ++ configuration.addResource(amsResUrl.toURI().toURL()); ++ configuration.addResource(sslConfUrl.toURI().toURL()); ++ } catch (Exception e) { ++ LOG.error("Couldn't init configuration. ", e); ++ System.exit(1); } -- - protected HttpServer createHttpServer() throws Exception { - protected HttpServer createHttpServer() throws IOException { -- ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator"); -- HashMap<String, Object> params = new HashMap(); -- params.put("com.sun.jersey.api.json.POJOMappingFeature", "true"); -- resourceConfig.setPropertiesAndFeatures(params); - HttpServer server = HttpServerFactory.create(getURI(), resourceConfig); - - if (webServerProtocol.equalsIgnoreCase("https")) { - HttpsServer httpsServer = (HttpsServer) server; - SslContextFactory sslContextFactory = new SslContextFactory(); - String keyStorePath = configuration.get("ssl.server.keystore.location"); - String keyStorePassword = configuration.get("ssl.server.keystore.password"); - String keyManagerPassword = configuration.get("ssl.server.keystore.keypassword"); - String trustStorePath = configuration.get("ssl.server.truststore.location"); - String trustStorePassword = configuration.get("ssl.server.truststore.password"); - - sslContextFactory.setKeyStorePath(keyStorePath); - sslContextFactory.setKeyStorePassword(keyStorePassword); - sslContextFactory.setKeyManagerPassword(keyManagerPassword); - sslContextFactory.setTrustStorePath(trustStorePath); - sslContextFactory.setTrustStorePassword(trustStorePassword); - - sslContextFactory.start(); - SSLContext sslContext = sslContextFactory.getSslContext(); - sslContextFactory.stop(); - HttpsConfigurator httpsConfigurator = new HttpsConfigurator(sslContext); - httpsServer.setHttpsConfigurator(httpsConfigurator); - server = httpsServer; - } - return server; - return HttpServerFactory.create(getURI(), resourceConfig); ++ } ++ ++ protected String getHostName() { ++ String hostName = "localhost"; ++ try { ++ hostName = InetAddress.getLocalHost().getCanonicalHostName(); ++ } catch (UnknownHostException e) { ++ LOG.error(e); } -- -- private void startWebServer() { -- LOG.info("Starting web server."); -- this.httpServer.start(); ++ return hostName; ++ } ++ ++ protected URI getURI() { ++ URI uri = UriBuilder.fromUri("/").scheme(this.webServerProtocol).host(getHostName()).port(this.webApplicationPort).build(); ++ LOG.info(String.format("Web server at %s", uri)); ++ return uri; ++ } ++ ++ protected HttpServer createHttpServer() throws Exception { ++ ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator"); ++ HashMap<String, Object> params = new HashMap<>(); ++ params.put("com.sun.jersey.api.json.POJOMappingFeature", "true"); ++ resourceConfig.setPropertiesAndFeatures(params); ++ HttpServer server = HttpServerFactory.create(getURI(), resourceConfig); ++ ++ if (webServerProtocol.equalsIgnoreCase("https")) { ++ HttpsServer httpsServer = (HttpsServer) server; ++ SslContextFactory sslContextFactory = new SslContextFactory(); ++ String keyStorePath = configuration.get("ssl.server.keystore.location"); ++ String keyStorePassword = configuration.get("ssl.server.keystore.password"); ++ String keyManagerPassword = configuration.get("ssl.server.keystore.keypassword"); ++ String trustStorePath = configuration.get("ssl.server.truststore.location"); ++ String trustStorePassword = configuration.get("ssl.server.truststore.password"); ++ ++ sslContextFactory.setKeyStorePath(keyStorePath); ++ sslContextFactory.setKeyStorePassword(keyStorePassword); ++ sslContextFactory.setKeyManagerPassword(keyManagerPassword); ++ sslContextFactory.setTrustStorePath(trustStorePath); ++ sslContextFactory.setTrustStorePassword(trustStorePassword); ++ ++ sslContextFactory.start(); ++ SSLContext sslContext = sslContextFactory.getSslContext(); ++ sslContextFactory.stop(); ++ HttpsConfigurator httpsConfigurator = new HttpsConfigurator(sslContext); ++ httpsServer.setHttpsConfigurator(httpsConfigurator); ++ server = httpsServer; ++ } ++ return server; } -- private void startAggregatePublisherThread() { -- LOG.info("Starting aggregated metrics publisher."); -- AbstractMetricPublisher metricPublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, configuration, aggregationInterval); -- aggregatePublisherThread = new Thread(metricPublisher); -- aggregatePublisherThread.start(); -- } ++ private void startWebServer() { ++ LOG.info("Starting web server."); ++ this.httpServer.start(); ++ } ++ ++ private void startAggregatePublisherThread() { ++ LOG.info("Starting aggregated metrics publisher."); ++ AbstractMetricPublisher metricPublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, configuration, aggregationInterval); ++ aggregatePublisherThread = new Thread(metricPublisher); ++ aggregatePublisherThread.start(); ++ } ++ ++ private void startRawPublisherThread() { ++ LOG.info("Starting raw metrics publisher."); ++ AbstractMetricPublisher metricPublisher = new RawMetricsPublisher(timelineMetricsHolder, configuration, rawPublishingInterval); ++ rawPublisherThread = aggregatePublisherThread = new Thread(metricPublisher); ++ aggregatePublisherThread.start(); ++ } ++ ++ ++ private void stop() { ++ LOG.info("Stopping aggregator application"); ++ aggregatePublisherThread.interrupt(); ++ rawPublisherThread.interrupt(); ++ httpServer.stop(STOP_SECONDS_DELAY); ++ LOG.info("Stopped web server."); ++ try { ++ LOG.info("Waiting for threads to join."); ++ aggregatePublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000); ++ rawPublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000); ++ LOG.info("Gracefully stopped Aggregator Application."); ++ } catch (InterruptedException e) { ++ LOG.error("Received exception during stop : ", e); -- private void startRawPublisherThread() { -- LOG.info("Starting raw metrics publisher."); -- AbstractMetricPublisher metricPublisher = new RawMetricsPublisher(timelineMetricsHolder, configuration, rawPublishingInterval); -- rawPublisherThread = aggregatePublisherThread = new Thread(metricPublisher); -- aggregatePublisherThread.start(); } -- -- -- private void stop() { -- LOG.info("Stopping aggregator application"); -- aggregatePublisherThread.interrupt(); -- rawPublisherThread.interrupt(); -- httpServer.stop(STOP_SECONDS_DELAY); -- LOG.info("Stopped web server."); -- try { -- LOG.info("Waiting for threads to join."); -- aggregatePublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000); -- rawPublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000); -- LOG.info("Gracefully stopped Aggregator Application."); -- } catch (InterruptedException e) { -- LOG.error("Received exception during stop : ", e); -- -- } -- ++ } ++ ++ private String getZkConnectionUrl(String zkClientPort, String zkQuorum) { ++ StringBuilder sb = new StringBuilder(); ++ String[] quorumParts = zkQuorum.split(","); ++ String prefix = ""; ++ for (String part : quorumParts) { ++ sb.append(prefix); ++ sb.append(part.trim()); ++ if (!part.contains(":")) { ++ sb.append(":"); ++ sb.append(zkClientPort); ++ } ++ prefix = ","; } ++ return sb.toString(); ++ } -- private String getZkConnectionUrl(String zkClientPort, String zkQuorum) { -- StringBuilder sb = new StringBuilder(); -- String[] quorumParts = zkQuorum.split(","); -- String prefix = ""; -- for (String part : quorumParts) { -- sb.append(prefix); -- sb.append(part.trim()); -- if (!part.contains(":")) { -- sb.append(":"); -- sb.append(zkClientPort); -- } -- prefix = ","; -- } -- return sb.toString(); ++ public static void main(String[] args) throws Exception { ++ if (args.length != 2) { ++ throw new Exception("This jar should be executed with 2 arguments : 1st - current host name, " + ++ "2nd - collector hosts separated with coma"); } -- public static void main( String[] args ) throws Exception { -- if (args.length != 2) { -- throw new Exception("This jar should be executed with 2 arguments : 1st - current host name, " + -- "2nd - collector hosts separated with coma"); -- } ++ final AggregatorApplication app = new AggregatorApplication(args[0], args[1]); -- final AggregatorApplication app = new AggregatorApplication(args[0], args[1]); ++ app.startWebServerAndPublishersThreads(); -- app.startWebServerAndPublishersThreads(); ++ Runtime.getRuntime().addShutdownHook(new Thread() { ++ public void run() { ++ app.stop(); ++ } ++ }); ++ } -- Runtime.getRuntime().addShutdownHook(new Thread() { -- public void run() { -- app.stop(); -- } -- }); -- } -- -- private void startWebServerAndPublishersThreads() { -- LOG.info("Starting aggregator application"); -- startAggregatePublisherThread(); -- startRawPublisherThread(); -- startWebServer(); -- } ++ private void startWebServerAndPublishersThreads() { ++ LOG.info("Starting aggregator application"); ++ startAggregatePublisherThread(); ++ startRawPublisherThread(); ++ startWebServer(); ++ } } http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java ---------------------------------------------------------------------- diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java index b151209,b151209..a6cbc2d --- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java @@@ -6,9 -6,9 +6,9 @@@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * ++ * <p> ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@@ -18,8 -18,8 +18,8 @@@ package org.apache.hadoop.metrics2.host.aggregator; -- import com.sun.jersey.spi.resource.Singleton; ++ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import javax.ws.rs.Consumes; @@@ -29,28 -29,28 +29,29 @@@ import javax.ws.rs.Path import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; ++ import java.io.IOException; @Singleton @Path("/ws/v1/timeline") public class AggregatorWebService { -- TimelineMetricsHolder metricsHolder = TimelineMetricsHolder.getInstance(); -- -- @GET -- @Produces("text/json") -- @Path("/metrics") -- public Response getOkResponse() throws IOException { -- return Response.ok().build(); -- } -- -- @POST -- @Produces(MediaType.TEXT_PLAIN) -- @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) -- @Path("/metrics") -- public Response postMetrics( -- TimelineMetrics metrics) { -- metricsHolder.putMetricsForAggregationPublishing(metrics); -- metricsHolder.putMetricsForRawPublishing(metrics); -- return Response.ok().build(); -- } ++ TimelineMetricsHolder metricsHolder = TimelineMetricsHolder.getInstance(); ++ ++ @GET ++ @Produces("text/json") ++ @Path("/metrics") ++ public Response getOkResponse() throws IOException { ++ return Response.ok().build(); ++ } ++ ++ @POST ++ @Produces(MediaType.TEXT_PLAIN) ++ @Consumes({MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) ++ @Path("/metrics") ++ public Response postMetrics( ++ TimelineMetrics metrics) { ++ metricsHolder.putMetricsForAggregationPublishing(metrics); ++ metricsHolder.putMetricsForRawPublishing(metrics); ++ return Response.ok().build(); ++ } } http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java ---------------------------------------------------------------------- diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java index 03b6542,03b6542..3a8ae41 --- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java @@@ -6,9 -6,9 +6,9 @@@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * ++ * <p> ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@@ -17,11 -17,11 +17,6 @@@ */ package org.apache.hadoop.metrics2.host.aggregator; --import com.google.common.cache.Cache; --import com.google.common.cache.CacheBuilder; --import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; --import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -- import java.util.List; import java.util.Map; import java.util.TreeMap; @@@ -29,80 -29,80 +24,86 @@@ import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; ++import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; ++import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; ++ ++import com.google.common.cache.Cache; ++import com.google.common.cache.CacheBuilder; ++ /** * Singleton class with 2 guava caches for raw and aggregated metrics storing */ public class TimelineMetricsHolder { -- private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60; -- private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300; -- private Cache<String, TimelineMetrics> aggregationMetricsCache; -- private Cache<String, TimelineMetrics> rawMetricsCache; -- private static TimelineMetricsHolder instance = null; -- //to ensure no metric values are expired -- private static int EXPIRE_DELAY = 30; -- ReadWriteLock aggregationCacheLock = new ReentrantReadWriteLock(); -- ReadWriteLock rawCacheLock = new ReentrantReadWriteLock(); ++ private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60; ++ private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300; ++ private Cache<String, TimelineMetrics> aggregationMetricsCache; ++ private Cache<String, TimelineMetrics> rawMetricsCache; ++ private static TimelineMetricsHolder instance = null; ++ //to ensure no metric values are expired ++ private static int EXPIRE_DELAY = 30; ++ ReadWriteLock aggregationCacheLock = new ReentrantReadWriteLock(); ++ ReadWriteLock rawCacheLock = new ReentrantReadWriteLock(); -- private TimelineMetricsHolder(int rawCacheExpireTime, int aggregationCacheExpireTime) { -- this.rawMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(rawCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build(); -- this.aggregationMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(aggregationCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build(); -- } ++ private TimelineMetricsHolder(int rawCacheExpireTime, int aggregationCacheExpireTime) { ++ this.rawMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(rawCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build(); ++ this.aggregationMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(aggregationCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build(); ++ } -- public static TimelineMetricsHolder getInstance(int rawCacheExpireTime, int aggregationCacheExpireTime) { -- if (instance == null) { -- instance = new TimelineMetricsHolder(rawCacheExpireTime, aggregationCacheExpireTime); -- } -- return instance; ++ public static TimelineMetricsHolder getInstance(int rawCacheExpireTime, int aggregationCacheExpireTime) { ++ if (instance == null) { ++ instance = new TimelineMetricsHolder(rawCacheExpireTime, aggregationCacheExpireTime); } ++ return instance; ++ } -- /** -- * Uses default expiration time for caches initialization if they are not initialized yet. -- * @return -- */ -- public static TimelineMetricsHolder getInstance() { -- return getInstance(DEFAULT_RAW_CACHE_EXPIRE_TIME, DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME); -- } ++ /** ++ * Uses default expiration time for caches initialization if they are not initialized yet. ++ * @return ++ */ ++ public static TimelineMetricsHolder getInstance() { ++ return getInstance(DEFAULT_RAW_CACHE_EXPIRE_TIME, DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME); ++ } -- public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) { -- aggregationCacheLock.writeLock().lock(); -- aggregationMetricsCache.put(calculateCacheKey(timelineMetrics), timelineMetrics); -- aggregationCacheLock.writeLock().unlock(); -- } ++ public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) { ++ aggregationCacheLock.writeLock().lock(); ++ aggregationMetricsCache.put(calculateCacheKey(timelineMetrics), timelineMetrics); ++ aggregationCacheLock.writeLock().unlock(); ++ } -- private String calculateCacheKey(TimelineMetrics timelineMetrics) { -- List<TimelineMetric> metrics = timelineMetrics.getMetrics(); -- if (metrics.size() > 0) { -- return metrics.get(0).getAppId() + System.currentTimeMillis(); -- } -- return String.valueOf(System.currentTimeMillis()); ++ private String calculateCacheKey(TimelineMetrics timelineMetrics) { ++ List<TimelineMetric> metrics = timelineMetrics.getMetrics(); ++ if (metrics.size() > 0) { ++ return metrics.get(0).getAppId() + System.currentTimeMillis(); } ++ return String.valueOf(System.currentTimeMillis()); ++ } -- public Map<String, TimelineMetrics> extractMetricsForAggregationPublishing() { -- return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock); -- } ++ public Map<String, TimelineMetrics> extractMetricsForAggregationPublishing() { ++ return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock); ++ } -- public void putMetricsForRawPublishing(TimelineMetrics metrics) { -- rawCacheLock.writeLock().lock(); -- rawMetricsCache.put(calculateCacheKey(metrics), metrics); -- rawCacheLock.writeLock().unlock(); -- } ++ public void putMetricsForRawPublishing(TimelineMetrics metrics) { ++ rawCacheLock.writeLock().lock(); ++ rawMetricsCache.put(calculateCacheKey(metrics), metrics); ++ rawCacheLock.writeLock().unlock(); ++ } -- public Map<String, TimelineMetrics> extractMetricsForRawPublishing() { -- return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock); -- } ++ public Map<String, TimelineMetrics> extractMetricsForRawPublishing() { ++ return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock); ++ } -- /** -- * Returns values from cache and clears the cache -- * @param cache -- * @param lock -- * @return -- */ -- private Map<String, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<String, TimelineMetrics> cache, ReadWriteLock lock) { -- lock.writeLock().lock(); -- Map<String, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap()); -- cache.invalidateAll(); -- lock.writeLock().unlock(); -- return metricsMap; -- } ++ /** ++ * Returns values from cache and clears the cache ++ * @param cache ++ * @param lock ++ * @return ++ */ ++ private Map<String, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<String, TimelineMetrics> cache, ReadWriteLock lock) { ++ lock.writeLock().lock(); ++ Map<String, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap()); ++ cache.invalidateAll(); ++ lock.writeLock().unlock(); ++ return metricsMap; ++ } } http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java ---------------------------------------------------------------------- diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java index 7ce0815,5af115f..8211476 --- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java @@@ -6,9 -6,9 +6,9 @@@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * ++ * <p> ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@@ -17,153 -17,153 +17,152 @@@ */ package org.apache.hadoop.metrics2.sink.timeline; ++import java.util.Collection; ++import java.util.Map; ++ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder; --import java.util.Collection; --import java.util.Map; -- /** * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals. */ public abstract class AbstractMetricPublisher extends AbstractTimelineMetricsSink implements Runnable { -- - private static final String AMS_SITE_SSL_TRUSTSTORE_PATH_PROPERTY = "ssl.server.truststore.location"; - private static final String AMS_SITE_SSL_TRUSTSTORE_TYPE_PROPERTY = "ssl.server.truststore.type"; - private static final String AMS_SITE_SSL_TRUSTSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.password"; - private static final String AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY = "ssl.server.truststore.location"; - private static final String AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY = "ssl.server.truststore.password"; - private static final String AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.type"; -- private static final String AMS_SITE_HTTP_POLICY_PROPERTY = "timeline.metrics.service.http.policy"; -- private static final String AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY = "timeline.metrics.service.webapp.address"; -- private static final String PUBLISHER_COLLECTOR_HOSTS_PROPERTY = "timeline.metrics.collector.hosts"; -- private static final String PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY = "timeline.metrics.zk.quorum"; -- private static final String PUBLISHER_HOSTNAME_PROPERTY = "timeline.metrics.hostname"; -- protected static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics"; -- protected int publishIntervalInSeconds; -- private Log LOG; -- protected TimelineMetricsHolder timelineMetricsHolder; -- protected Configuration configuration; -- private String collectorProtocol; -- private String collectorPort; -- private Collection<String> collectorHosts; -- private String hostname; -- private String zkQuorum; -- -- public AbstractMetricPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int publishIntervalInSeconds) { -- LOG = LogFactory.getLog(this.getClass()); -- this.configuration = configuration; -- this.publishIntervalInSeconds = publishIntervalInSeconds; -- this.timelineMetricsHolder = timelineMetricsHolder; -- configure(); -- } -- -- protected void configure() { -- collectorProtocol = configuration.get(AMS_SITE_HTTP_POLICY_PROPERTY, "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https"; -- collectorPort = configuration.getTrimmed(AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY, "0.0.0.0:6188").split(":")[1]; -- collectorHosts = parseHostsStringIntoCollection(configuration.getTrimmed(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, "")); -- zkQuorum = configuration.get(PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY, ""); -- hostname = configuration.get(PUBLISHER_HOSTNAME_PROPERTY, "localhost"); -- collectorHosts = parseHostsStringIntoCollection(configuration.get(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, "")); -- if (collectorHosts.isEmpty()) { -- LOG.error("No Metric collector configured."); -- } else { -- if (collectorProtocol.contains("https")) { - String trustStorePath = configuration.get(AMS_SITE_SSL_TRUSTSTORE_PATH_PROPERTY).trim(); - String trustStoreType = configuration.get(AMS_SITE_SSL_TRUSTSTORE_TYPE_PROPERTY).trim(); - String trustStorePwd = configuration.get(AMS_SITE_SSL_TRUSTSTORE_PASSWORD_PROPERTY).trim(); - String trustStorePath = configuration.get(AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY).trim(); - String trustStoreType = configuration.get(AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY).trim(); - String trustStorePwd = configuration.get(AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY).trim(); -- loadTruststore(trustStorePath, trustStoreType, trustStorePwd); -- } -- } -- } -- -- /** -- * Publishes metrics to collector in specified intervals while not interrupted. -- */ -- @Override -- public void run() { -- while (!Thread.currentThread().isInterrupted()) { -- try { -- Thread.sleep(this.publishIntervalInSeconds * 1000); -- } catch (InterruptedException e) { -- //Ignore -- } -- try { -- processAndPublishMetrics(getMetricsFromCache()); -- } catch (Exception e) { -- //ignore -- } -- } -- } -- -- /** -- * Processes and sends metrics to collector. -- * @param metricsFromCache -- * @throws Exception -- */ -- protected void processAndPublishMetrics(Map<String, TimelineMetrics> metricsFromCache) throws Exception { -- if (metricsFromCache.size()==0) return; -- -- LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size())); -- emitMetricsJson(getCollectorUri(getCurrentCollectorHost()), processMetrics(metricsFromCache)); -- } -- -- /** -- * Returns metrics map. Source is based on implementation. -- * @return -- */ -- protected abstract Map<String,TimelineMetrics> getMetricsFromCache(); -- -- /** -- * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector -- * @param metricValues -- * @return -- */ -- protected abstract String processMetrics(Map<String, TimelineMetrics> metricValues); -- -- protected abstract String getPostUrl(); -- -- @Override -- protected String getCollectorUri(String host) { -- return String.format(getPostUrl(), getCollectorProtocol(), host, getCollectorPort()); ++ private static final String AMS_SITE_SSL_TRUSTSTORE_PATH_PROPERTY = "ssl.server.truststore.location"; ++ private static final String AMS_SITE_SSL_TRUSTSTORE_TYPE_PROPERTY = "ssl.server.truststore.type"; ++ private static final String AMS_SITE_SSL_TRUSTSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.password"; ++ private static final String AMS_SITE_HTTP_POLICY_PROPERTY = "timeline.metrics.service.http.policy"; ++ private static final String AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY = "timeline.metrics.service.webapp.address"; ++ private static final String PUBLISHER_COLLECTOR_HOSTS_PROPERTY = "timeline.metrics.collector.hosts"; ++ private static final String PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY = "timeline.metrics.zk.quorum"; ++ private static final String PUBLISHER_HOSTNAME_PROPERTY = "timeline.metrics.hostname"; ++ protected static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics"; ++ protected int publishIntervalInSeconds; ++ private Log LOG; ++ protected TimelineMetricsHolder timelineMetricsHolder; ++ protected Configuration configuration; ++ private String collectorProtocol; ++ private String collectorPort; ++ private Collection<String> collectorHosts; ++ private String hostname; ++ private String zkQuorum; ++ ++ public AbstractMetricPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int publishIntervalInSeconds) { ++ LOG = LogFactory.getLog(this.getClass()); ++ this.configuration = configuration; ++ this.publishIntervalInSeconds = publishIntervalInSeconds; ++ this.timelineMetricsHolder = timelineMetricsHolder; ++ configure(); ++ } ++ ++ protected void configure() { ++ collectorProtocol = configuration.get(AMS_SITE_HTTP_POLICY_PROPERTY, "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https"; ++ collectorPort = configuration.getTrimmed(AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY, "0.0.0.0:6188").split(":")[1]; ++ collectorHosts = parseHostsStringIntoCollection(configuration.getTrimmed(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, "")); ++ zkQuorum = configuration.get(PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY, ""); ++ hostname = configuration.get(PUBLISHER_HOSTNAME_PROPERTY, "localhost"); ++ collectorHosts = parseHostsStringIntoCollection(configuration.get(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, "")); ++ if (collectorHosts.isEmpty()) { ++ LOG.error("No Metric collector configured."); ++ } else { ++ if (collectorProtocol.contains("https")) { ++ String trustStorePath = configuration.get(AMS_SITE_SSL_TRUSTSTORE_PATH_PROPERTY).trim(); ++ String trustStoreType = configuration.get(AMS_SITE_SSL_TRUSTSTORE_TYPE_PROPERTY).trim(); ++ String trustStorePwd = configuration.get(AMS_SITE_SSL_TRUSTSTORE_PASSWORD_PROPERTY).trim(); ++ loadTruststore(trustStorePath, trustStoreType, trustStorePwd); ++ } } -- -- @Override -- protected String getCollectorProtocol() { -- return collectorProtocol; -- } -- -- @Override -- protected String getCollectorPort() { -- return collectorPort; -- } -- -- @Override -- protected int getTimeoutSeconds() { -- return DEFAULT_POST_TIMEOUT_SECONDS; -- } -- -- @Override -- protected String getZookeeperQuorum() { -- return zkQuorum; -- } -- -- @Override -- protected Collection<String> getConfiguredCollectorHosts() { -- return collectorHosts; -- } -- -- @Override -- protected String getHostname() { -- return hostname; -- } -- -- @Override -- protected boolean isHostInMemoryAggregationEnabled() { -- return false; -- } -- -- @Override -- protected int getHostInMemoryAggregationPort() { -- return 0; ++ } ++ ++ /** ++ * Publishes metrics to collector in specified intervals while not interrupted. ++ */ ++ @Override ++ public void run() { ++ while (!Thread.currentThread().isInterrupted()) { ++ try { ++ Thread.sleep(this.publishIntervalInSeconds * 1000); ++ } catch (InterruptedException e) { ++ //Ignore ++ } ++ try { ++ processAndPublishMetrics(getMetricsFromCache()); ++ } catch (Exception e) { ++ //ignore ++ } } ++ } ++ ++ /** ++ * Processes and sends metrics to collector. ++ * @param metricsFromCache ++ * @throws Exception ++ */ ++ protected void processAndPublishMetrics(Map<String, TimelineMetrics> metricsFromCache) throws Exception { ++ if (metricsFromCache.size() == 0) return; ++ ++ LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size())); ++ emitMetricsJson(getCollectorUri(getCurrentCollectorHost()), processMetrics(metricsFromCache)); ++ } ++ ++ /** ++ * Returns metrics map. Source is based on implementation. ++ * @return ++ */ ++ protected abstract Map<String, TimelineMetrics> getMetricsFromCache(); ++ ++ /** ++ * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector ++ * @param metricValues ++ * @return ++ */ ++ protected abstract String processMetrics(Map<String, TimelineMetrics> metricValues); ++ ++ protected abstract String getPostUrl(); ++ ++ @Override ++ protected String getCollectorUri(String host) { ++ return String.format(getPostUrl(), getCollectorProtocol(), host, getCollectorPort()); ++ } ++ ++ @Override ++ protected String getCollectorProtocol() { ++ return collectorProtocol; ++ } ++ ++ @Override ++ protected String getCollectorPort() { ++ return collectorPort; ++ } ++ ++ @Override ++ protected int getTimeoutSeconds() { ++ return DEFAULT_POST_TIMEOUT_SECONDS; ++ } ++ ++ @Override ++ protected String getZookeeperQuorum() { ++ return zkQuorum; ++ } ++ ++ @Override ++ protected Collection<String> getConfiguredCollectorHosts() { ++ return collectorHosts; ++ } ++ ++ @Override ++ protected String getHostname() { ++ return hostname; ++ } ++ ++ @Override ++ protected boolean isHostInMemoryAggregationEnabled() { ++ return false; ++ } ++ ++ @Override ++ protected int getHostInMemoryAggregationPort() { ++ return 0; ++ } } http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java ---------------------------------------------------------------------- diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java index fa0c8fb,c8dffab..f1ed90b --- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java @@@ -6,9 -6,9 +6,9 @@@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * ++ * <p> ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@@ -18,91 -18,86 +18,90 @@@ package org.apache.hadoop.metrics2.sink.timeline; -- --import org.apache.commons.logging.Log; --import org.apache.commons.logging.LogFactory; --import org.apache.hadoop.conf.Configuration; --import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder; -- import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.TreeMap; ++import org.apache.commons.logging.Log; ++import org.apache.commons.logging.LogFactory; ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder; ++ /** * Thread that aggregates and publishes metrics to collector on specified interval. */ public class AggregatedMetricsPublisher extends AbstractMetricPublisher { -- private static String AGGREGATED_POST_PREFIX = "/aggregated"; -- private Log LOG; ++ private static String AGGREGATED_POST_PREFIX = "/aggregated"; ++ private Log LOG; -- public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) { -- super(timelineMetricsHolder, configuration, interval); -- LOG = LogFactory.getLog(this.getClass()); -- } ++ public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) { ++ super(timelineMetricsHolder, configuration, interval); ++ LOG = LogFactory.getLog(this.getClass()); ++ } -- /** -- * get metrics map form @TimelineMetricsHolder -- * @return -- */ -- @Override -- protected Map<String, TimelineMetrics> getMetricsFromCache() { -- return timelineMetricsHolder.extractMetricsForAggregationPublishing(); -- } ++ /** ++ * get metrics map form @TimelineMetricsHolder ++ * @return ++ */ ++ @Override ++ protected Map<String, TimelineMetrics> getMetricsFromCache() { ++ return timelineMetricsHolder.extractMetricsForAggregationPublishing(); ++ } -- /** -- * Aggregates given metrics and converts them into json string that will be send to collector -- * @param metricForAggregationValues -- * @return -- */ -- @Override -- protected String processMetrics(Map<String, TimelineMetrics> metricForAggregationValues) { -- HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>(); -- for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) { -- for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) { -- if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) { -- nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics()); -- } -- nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric); -- } ++ /** ++ * Aggregates given metrics and converts them into json string that will be send to collector ++ * @param metricForAggregationValues ++ * @return ++ */ ++ @Override ++ protected String processMetrics(Map<String, TimelineMetrics> metricForAggregationValues) { ++ HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>(); ++ for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) { ++ for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) { ++ if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) { ++ nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics()); } -- Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>(); -- for (TimelineMetrics metrics : nameToMetricMap.values()) { -- double sum = 0; -- double max = Integer.MIN_VALUE; -- double min = Integer.MAX_VALUE; -- int count = 0; -- for (TimelineMetric metric : metrics.getMetrics()) { -- for (Double value : metric.getMetricValues().values()) { -- sum+=value; -- max = Math.max(max, value); -- min = Math.min(min, value); -- count++; -- } -- } -- TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0)); -- tmpMetric.setMetricValues(new TreeMap<Long, Double>()); -- metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min))); -- } -- String json = null; -- try { -- json = mapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis())); -- LOG.debug(json); -- } catch (Exception e) { -- LOG.error("Failed to convert result into json", e); ++ nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric); ++ } ++ } ++ Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>(); ++ for (TimelineMetrics metrics : nameToMetricMap.values()) { ++ double sum = 0; ++ double max = Integer.MIN_VALUE; ++ double min = Integer.MAX_VALUE; ++ int count = 0; ++ for (TimelineMetric metric : metrics.getMetrics()) { ++ for (Double value : metric.getMetricValues().values()) { ++ sum += value; ++ max = Math.max(max, value); ++ min = Math.min(min, value); ++ count++; } -- -- return json; ++ } ++ TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0)); ++ tmpMetric.setMetricValues(new TreeMap<Long, Double>()); ++ metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min))); } -- -- @Override -- protected String getPostUrl() { -- return BASE_POST_URL + AGGREGATED_POST_PREFIX; ++ String json = null; ++ try { ++ json = mapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis())); ++ LOG.debug(json); ++ } catch (Exception e) { ++ LOG.error("Failed to convert result into json", e); } + - @Override - protected String getHostInMemoryAggregationProtocol() { - return "http"; - } ++ return json; ++ } ++ ++ @Override ++ protected String getPostUrl() { ++ return BASE_POST_URL + AGGREGATED_POST_PREFIX; ++ } ++ ++ @Override ++ protected String getHostInMemoryAggregationProtocol() { ++ return "http"; ++ } } http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java ---------------------------------------------------------------------- diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java index 2469449,89addb7..74b841b --- a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java @@@ -6,9 -6,9 +6,9 @@@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * ++ * <p> ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@@ -18,53 -18,48 +18,53 @@@ package org.apache.hadoop.metrics2.sink.timeline; ++import java.util.Map; ++ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder; --import java.util.Map; -- public class RawMetricsPublisher extends AbstractMetricPublisher { -- private final Log LOG; ++ private final Log LOG; -- public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) { -- super(timelineMetricsHolder, configuration, interval); -- LOG = LogFactory.getLog(this.getClass()); -- } ++ public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) { ++ super(timelineMetricsHolder, configuration, interval); ++ LOG = LogFactory.getLog(this.getClass()); ++ } -- @Override -- protected Map<String, TimelineMetrics> getMetricsFromCache() { -- return timelineMetricsHolder.extractMetricsForRawPublishing(); -- } ++ @Override ++ protected Map<String, TimelineMetrics> getMetricsFromCache() { ++ return timelineMetricsHolder.extractMetricsForRawPublishing(); ++ } -- @Override -- protected String processMetrics(Map<String, TimelineMetrics> metricValues) { -- //merge everything in one TimelineMetrics object -- TimelineMetrics timelineMetrics = new TimelineMetrics(); -- for (TimelineMetrics metrics : metricValues.values()) { -- for (TimelineMetric timelineMetric : metrics.getMetrics()) -- timelineMetrics.addOrMergeTimelineMetric(timelineMetric); -- } -- //map TimelineMetrics to json string -- String json = null; -- try { -- json = mapper.writeValueAsString(timelineMetrics); -- LOG.debug(json); -- } catch (Exception e) { -- LOG.error("Failed to convert result into json", e); -- } -- return json; ++ @Override ++ protected String processMetrics(Map<String, TimelineMetrics> metricValues) { ++ //merge everything in one TimelineMetrics object ++ TimelineMetrics timelineMetrics = new TimelineMetrics(); ++ for (TimelineMetrics metrics : metricValues.values()) { ++ for (TimelineMetric timelineMetric : metrics.getMetrics()) ++ timelineMetrics.addOrMergeTimelineMetric(timelineMetric); } -- -- @Override -- protected String getPostUrl() { -- return BASE_POST_URL; ++ //map TimelineMetrics to json string ++ String json = null; ++ try { ++ json = mapper.writeValueAsString(timelineMetrics); ++ LOG.debug(json); ++ } catch (Exception e) { ++ LOG.error("Failed to convert result into json", e); } ++ return json; ++ } + - @Override - protected String getHostInMemoryAggregationProtocol() { - return "http"; - } ++ @Override ++ protected String getPostUrl() { ++ return BASE_POST_URL; ++ } ++ ++ @Override ++ protected String getHostInMemoryAggregationProtocol() { ++ return "http"; ++ } } http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java ---------------------------------------------------------------------- diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java index 8c17ba1,3413052..cacc98b --- a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java @@@ -6,9 -6,9 +6,9 @@@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at -- * -- * http://www.apache.org/licenses/LICENSE-2.0 -- * ++ * <p> ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@@ -17,138 -17,138 +17,138 @@@ */ package org.apache.hadoop.metrics2.sink.timeline; --import junit.framework.Assert; --import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder; --import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest; --import org.junit.Test; -- --import org.apache.hadoop.conf.Configuration; -- import java.util.Collection; import java.util.List; import java.util.Map; import java.util.TreeMap; --public class AggregatedMetricsPublisherTest { -- -- @Test -- public void testProcessMetrics() throws Exception { -- Configuration configuration = new Configuration(); -- TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(); -- timelineMetricsHolder.extractMetricsForAggregationPublishing(); -- timelineMetricsHolder.extractMetricsForRawPublishing(); -- -- TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>(); -- metric1App1Metrics.put(1L, 1d); -- metric1App1Metrics.put(2L, 2d); -- metric1App1Metrics.put(3L, 3d); -- timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics)); -- -- TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>(); -- metric2App2Metrics.put(1L, 4d); -- metric2App2Metrics.put(2L, 5d); -- metric2App2Metrics.put(3L, 6d); -- timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics)); -- -- TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>(); -- metric3App3Metrics.put(1L, 7d); -- metric3App3Metrics.put(2L, 8d); -- metric3App3Metrics.put(3L, 9d); -- -- timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics)); -- -- -- AggregatedMetricsPublisher aggregatedMetricsPublisher = -- new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60); -- -- String aggregatedJson = aggregatedMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForAggregationPublishing()); - String expectedMetric1App1 = "{\"timelineMetric\":{\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":6.0,\"deviation\":0.0,\"max\":3.0,\"min\":1.0,\"numberOfSamples\":3}}"; - String expectedMetric2App2 = "{\"timelineMetric\":{\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":15.0,\"deviation\":0.0,\"max\":6.0,\"min\":4.0,\"numberOfSamples\":3}}"; - String expectedMetric3App3 = "{\"timelineMetric\":{\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":24.0,\"deviation\":0.0,\"max\":9.0,\"min\":7.0,\"numberOfSamples\":3}}"; - String expectedMetric1App1 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":6.0,\"deviation\":0.0,\"max\":3.0,\"min\":1.0,\"numberOfSamples\":3}}"; - String expectedMetric2App2 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":15.0,\"deviation\":0.0,\"max\":6.0,\"min\":4.0,\"numberOfSamples\":3}}"; - String expectedMetric3App3 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":24.0,\"deviation\":0.0,\"max\":9.0,\"min\":7.0,\"numberOfSamples\":3}}"; -- Assert.assertNotNull(aggregatedJson); -- Assert.assertTrue(aggregatedJson.contains(expectedMetric1App1)); -- Assert.assertTrue(aggregatedJson.contains(expectedMetric3App3)); -- Assert.assertTrue(aggregatedJson.contains(expectedMetric2App2)); -- } -- -- @Test -- public void testGetPostUrl() { -- Configuration configuration = new Configuration(); -- AggregatedMetricsPublisher aggregatedMetricsPublisher = -- new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); -- String actualURL = aggregatedMetricsPublisher.getPostUrl(); -- String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics/aggregated"; -- Assert.assertNotNull(actualURL); -- Assert.assertEquals(expectedURL, actualURL); -- } -- -- @Test -- public void testGetCollectorUri() { -- //default configuration -- Configuration configuration = new Configuration(); -- AbstractMetricPublisher aggregatedMetricsPublisher = -- new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); -- String actualURL = aggregatedMetricsPublisher.getCollectorUri("c6401.ambari.apache.org"); -- String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated"; -- Assert.assertNotNull(actualURL); -- Assert.assertEquals(expectedURL, actualURL); -- -- //https configuration -- configuration = new Configuration(); -- configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY"); -- aggregatedMetricsPublisher = -- new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); -- actualURL = aggregatedMetricsPublisher.getCollectorUri("c6402.ambari.apache.org"); -- expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated"; -- Assert.assertNotNull(actualURL); -- Assert.assertEquals(expectedURL, actualURL); -- -- //custom port configuration -- configuration = new Configuration(); -- configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888"); -- aggregatedMetricsPublisher = -- new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); -- actualURL = aggregatedMetricsPublisher.getCollectorUri("c6403.ambari.apache.org"); -- expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics/aggregated"; -- Assert.assertNotNull(actualURL); -- Assert.assertEquals(expectedURL, actualURL); -- } -- -- @Test -- public void testGetMetricsFromCache() throws InterruptedException { -- TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4,4); -- timelineMetricsHolder.extractMetricsForAggregationPublishing(); -- timelineMetricsHolder.extractMetricsForRawPublishing(); -- -- timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr1")); -- timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw")); -- timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr2")); -- -- Configuration configuration = new Configuration(); -- AggregatedMetricsPublisher aggregatedMetricsPublisher = -- new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder; ++import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest; ++import org.junit.Test; -- Map<String, TimelineMetrics> metricsFromCache = aggregatedMetricsPublisher.getMetricsFromCache(); -- Assert.assertNotNull(metricsFromCache); -- Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values(); -- Assert.assertNotNull(actualTimelineMetrics); -- Assert.assertEquals(2, actualTimelineMetrics.size()); ++import junit.framework.Assert; -- for (TimelineMetrics timelineMetrics : actualTimelineMetrics) { -- List<TimelineMetric> metrics = timelineMetrics.getMetrics(); -- Assert.assertEquals(1, metrics.size()); -- Assert.assertTrue(metrics.get(0).getAppId().contains("aggr")); -- } ++public class AggregatedMetricsPublisherTest { ++ @Test ++ public void testProcessMetrics() throws Exception { ++ Configuration configuration = new Configuration(); ++ TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(); ++ timelineMetricsHolder.extractMetricsForAggregationPublishing(); ++ timelineMetricsHolder.extractMetricsForRawPublishing(); ++ ++ TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>(); ++ metric1App1Metrics.put(1L, 1d); ++ metric1App1Metrics.put(2L, 2d); ++ metric1App1Metrics.put(3L, 3d); ++ timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics)); ++ ++ TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>(); ++ metric2App2Metrics.put(1L, 4d); ++ metric2App2Metrics.put(2L, 5d); ++ metric2App2Metrics.put(3L, 6d); ++ timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics)); ++ ++ TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>(); ++ metric3App3Metrics.put(1L, 7d); ++ metric3App3Metrics.put(2L, 8d); ++ metric3App3Metrics.put(3L, 9d); ++ ++ timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics)); ++ ++ ++ AggregatedMetricsPublisher aggregatedMetricsPublisher = ++ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60); ++ ++ String aggregatedJson = aggregatedMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForAggregationPublishing()); ++ String expectedMetric1App1 = "{\"timelineMetric\":{\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":6.0,\"deviation\":0.0,\"max\":3.0,\"min\":1.0,\"numberOfSamples\":3}}"; ++ String expectedMetric2App2 = "{\"timelineMetric\":{\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":15.0,\"deviation\":0.0,\"max\":6.0,\"min\":4.0,\"numberOfSamples\":3}}"; ++ String expectedMetric3App3 = "{\"timelineMetric\":{\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":24.0,\"deviation\":0.0,\"max\":9.0,\"min\":7.0,\"numberOfSamples\":3}}"; ++ Assert.assertNotNull(aggregatedJson); ++ Assert.assertTrue(aggregatedJson.contains(expectedMetric1App1)); ++ Assert.assertTrue(aggregatedJson.contains(expectedMetric3App3)); ++ Assert.assertTrue(aggregatedJson.contains(expectedMetric2App2)); ++ } ++ ++ @Test ++ public void testGetPostUrl() { ++ Configuration configuration = new Configuration(); ++ AggregatedMetricsPublisher aggregatedMetricsPublisher = ++ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); ++ String actualURL = aggregatedMetricsPublisher.getPostUrl(); ++ String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics/aggregated"; ++ Assert.assertNotNull(actualURL); ++ Assert.assertEquals(expectedURL, actualURL); ++ } ++ ++ @Test ++ public void testGetCollectorUri() { ++ //default configuration ++ Configuration configuration = new Configuration(); ++ AbstractMetricPublisher aggregatedMetricsPublisher = ++ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); ++ String actualURL = aggregatedMetricsPublisher.getCollectorUri("c6401.ambari.apache.org"); ++ String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated"; ++ Assert.assertNotNull(actualURL); ++ Assert.assertEquals(expectedURL, actualURL); ++ ++ //https configuration ++ configuration = new Configuration(); ++ configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY"); ++ aggregatedMetricsPublisher = ++ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); ++ actualURL = aggregatedMetricsPublisher.getCollectorUri("c6402.ambari.apache.org"); ++ expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated"; ++ Assert.assertNotNull(actualURL); ++ Assert.assertEquals(expectedURL, actualURL); ++ ++ //custom port configuration ++ configuration = new Configuration(); ++ configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888"); ++ aggregatedMetricsPublisher = ++ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); ++ actualURL = aggregatedMetricsPublisher.getCollectorUri("c6403.ambari.apache.org"); ++ expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics/aggregated"; ++ Assert.assertNotNull(actualURL); ++ Assert.assertEquals(expectedURL, actualURL); ++ } ++ ++ @Test ++ public void testGetMetricsFromCache() throws InterruptedException { ++ TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4, 4); ++ timelineMetricsHolder.extractMetricsForAggregationPublishing(); ++ timelineMetricsHolder.extractMetricsForRawPublishing(); ++ ++ timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr1")); ++ timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw")); ++ timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr2")); ++ ++ Configuration configuration = new Configuration(); ++ AggregatedMetricsPublisher aggregatedMetricsPublisher = ++ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1); ++ ++ Map<String, TimelineMetrics> metricsFromCache = aggregatedMetricsPublisher.getMetricsFromCache(); ++ Assert.assertNotNull(metricsFromCache); ++ Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values(); ++ Assert.assertNotNull(actualTimelineMetrics); ++ Assert.assertEquals(2, actualTimelineMetrics.size()); ++ ++ for (TimelineMetrics timelineMetrics : actualTimelineMetrics) { ++ List<TimelineMetric> metrics = timelineMetrics.getMetrics(); ++ Assert.assertEquals(1, metrics.size()); ++ Assert.assertTrue(metrics.get(0).getAppId().contains("aggr")); } -- TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) { -- TimelineMetric timelineMetric = new TimelineMetric(); -- timelineMetric.setMetricName(metricName); -- timelineMetric.setAppId(appId); -- timelineMetric.setMetricValues(metricValues); -- TimelineMetrics timelineMetrics = new TimelineMetrics(); -- timelineMetrics.addOrMergeTimelineMetric(timelineMetric); -- return timelineMetrics; -- } ++ } ++ ++ TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) { ++ TimelineMetric timelineMetric = new TimelineMetric(); ++ timelineMetric.setMetricName(metricName); ++ timelineMetric.setAppId(appId); ++ timelineMetric.setMetricValues(metricValues); ++ TimelineMetrics timelineMetrics = new TimelineMetrics(); ++ timelineMetrics.addOrMergeTimelineMetric(timelineMetric); ++ return timelineMetrics; ++ } } http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java ---------------------------------------------------------------------- diff --cc ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java index b43a87c,60510d2..252f7d4 --- a/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java @@@ -62,7 -62,7 +62,11 @@@ public class RawMetricsPublisherTest new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60); String rawJson = rawMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForRawPublishing()); ++<<<<<<< HEAD + String expectedResult = "{\"metrics\":[{\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{\"1\":1.0,\"2\":2.0,\"3\":3.0}},{\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{\"1\":4.0,\"2\":5.0,\"3\":6.0}},{\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{\"1\":7.0,\"2\":8.0,\"3\":9.0}}]}"; ++======= + String expectedResult = "{\"metrics\":[{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{\"1\":1.0,\"2\":2.0,\"3\":3.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{\"1\":4.0,\"2\":5.0,\"3\":6.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{\"1\":7.0,\"2\":8.0,\"3\":9.0}}]}"; ++>>>>>>> trunk Assert.assertNotNull(rawJson); Assert.assertEquals(expectedResult, rawJson); } http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py ---------------------------------------------------------------------- diff --cc ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py index 371907d,f19434d..df79d69 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py @@@ -105,15 -111,22 +115,24 @@@ class Emitter(threading.Thread) if not success: logger.info('No valid collectors found...') for collector_host in self.active_collector_hosts: - success = self.try_with_collector_host(collector_host, data) + success = self.try_with_collector(self.collector_protocol, collector_host, self.ollector_port, data) + if success: + break pass - def try_with_collector_host(self, collector_host, data): + def try_with_collector(self, collector_protocol, collector_host, collector_port, data): headers = {"Content-Type" : "application/json", "Accept" : "*/*"} - connection = self.get_connection(collector_host) + connection = self.get_connection(collector_protocol, collector_host, collector_port) logger.debug("message to send: %s" % data) + + try: + if self.cookie_cached[connection.host]: + headers["Cookie"] = self.cookie_cached[connection.host] + logger.debug("Cookie: %s" % self.cookie_cached[connection.host]) + except Exception, e: + self.cookie_cached = {} + pass + retry_count = 0 while retry_count < self.MAX_RETRY_COUNT: response = self.get_response_from_submission(connection, data, headers) http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/522f438c/ambari-metrics/ambari-metrics-timelineservice/pom.xml ----------------------------------------------------------------------
