Repository: ambari Updated Branches: refs/heads/branch-2.5 532673b04 -> b9f25b725
AMBARI-19273 : Refine AmbariServer Metrics service and enable JVM metrics source by default. Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b9f25b72 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b9f25b72 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b9f25b72 Branch: refs/heads/branch-2.5 Commit: b9f25b72588dea49b6bfc5f8d06242c52f5887ea Parents: 532673b Author: Aravindan Vijayan <[email protected]> Authored: Thu Dec 22 12:25:55 2016 -0800 Committer: Aravindan Vijayan <[email protected]> Committed: Thu Dec 22 12:25:55 2016 -0800 ---------------------------------------------------------------------- ambari-server/conf/unix/metrics.properties | 15 +- ambari-server/conf/windows/metrics.properties | 28 +++ .../src/main/assemblies/server-windows.xml | 4 + ambari-server/src/main/assemblies/server.xml | 4 + .../server/configuration/Configuration.java | 8 + .../ambari/server/controller/AmbariServer.java | 27 +-- .../server/metrics/system/AmbariMetricSink.java | 34 --- .../server/metrics/system/MetricsService.java | 17 +- .../server/metrics/system/MetricsSink.java | 43 ++++ .../server/metrics/system/MetricsSource.java | 7 +- .../server/metrics/system/SingleMetric.java | 44 ++++ .../system/impl/AbstractMetricsSource.java | 11 +- .../system/impl/AmbariMetricSinkImpl.java | 225 ++++++++++++++----- .../metrics/system/impl/Configuration.java | 83 ------- .../metrics/system/impl/JvmMetricsSource.java | 67 ++++-- .../system/impl/MetricsConfiguration.java | 89 ++++++++ .../metrics/system/impl/MetricsServiceImpl.java | 170 ++++++-------- .../system/impl/JvmMetricsSourceTest.java | 36 +++ .../metric/system/impl/MetricsServiceTest.java | 40 ++++ .../system/impl/TestAmbariMetricsSinkImpl.java | 79 +++++++ .../metric/system/impl/TestMetricsSource.java | 37 +++ .../src/test/resources/metrics.properties | 29 +++ 22 files changed, 759 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/conf/unix/metrics.properties ---------------------------------------------------------------------- diff --git a/ambari-server/conf/unix/metrics.properties b/ambari-server/conf/unix/metrics.properties index 5f01e39..3ee22d6 100644 --- a/ambari-server/conf/unix/metrics.properties +++ b/ambari-server/conf/unix/metrics.properties @@ -17,15 +17,12 @@ # limitations under the License. -# Metrics sources info -metrics.sources=jvm - +#### Source Configs ##### # Source interval determines how often the metric is sent to sink. Its unit is in seconds -source.jvm.interval=5 -source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource +metric.sources=jvm -#source.database.interval=10 -#source.database.class=org.apache.ambari.server.metrics.system.impl.DbMetricSource +source.jvm.interval=10 +source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource -# Sink frequency determines how often the sink publish the metrics from buffer to AMS. -sink.frequency=10 \ No newline at end of file +#Override Ambari Server hostname for metrics +#ambariserver.hostname.override= \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/conf/windows/metrics.properties ---------------------------------------------------------------------- diff --git a/ambari-server/conf/windows/metrics.properties b/ambari-server/conf/windows/metrics.properties new file mode 100644 index 0000000..3ee22d6 --- /dev/null +++ b/ambari-server/conf/windows/metrics.properties @@ -0,0 +1,28 @@ +# Copyright 2011 The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +#### Source Configs ##### +# Source interval determines how often the metric is sent to sink. Its unit is in seconds +metric.sources=jvm + +source.jvm.interval=10 +source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource + +#Override Ambari Server hostname for metrics +#ambariserver.hostname.override= \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/assemblies/server-windows.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/assemblies/server-windows.xml b/ambari-server/src/main/assemblies/server-windows.xml index 191fcfb..ce1e270 100644 --- a/ambari-server/src/main/assemblies/server-windows.xml +++ b/ambari-server/src/main/assemblies/server-windows.xml @@ -45,6 +45,10 @@ <outputDirectory>/ambari-server-${project.version}/conf</outputDirectory> </file> <file> + <source>${basedir}/conf/windows/metrics.properties</source> + <outputDirectory>/ambari-server-${project.version}/conf</outputDirectory> + </file> + <file> <source>${basedir}/conf/windows/ca.config</source> <outputDirectory>/ambari-server-${project.version}/keystore</outputDirectory> </file> http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/assemblies/server.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/assemblies/server.xml b/ambari-server/src/main/assemblies/server.xml index 9a193a9..7467f3b 100644 --- a/ambari-server/src/main/assemblies/server.xml +++ b/ambari-server/src/main/assemblies/server.xml @@ -219,6 +219,10 @@ <outputDirectory>/etc/ambari-server/conf</outputDirectory> </file> <file> + <source>conf/unix/metrics.properties</source> + <outputDirectory>/etc/ambari-server/conf</outputDirectory> + </file> + <file> <source>conf/unix/krb5JAASLogin.conf</source> <outputDirectory>/etc/ambari-server/conf</outputDirectory> </file> http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index 43fff0b..26c1402 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -2504,6 +2504,14 @@ public class Configuration { public static final ConfigurationProperty<Integer> LOGSEARCH_PORTAL_READ_TIMEOUT = new ConfigurationProperty<>( "logsearch.portal.read.timeout", 5000); + + /** + * Global disable flag for AmbariServer Metrics. + */ + @Markdown(description = "Global disable flag for AmbariServer Metrics.") + public static final ConfigurationProperty<Boolean> AMBARISERVER_METRICS_DISABLE = new ConfigurationProperty<>( + "ambariserver.metrics.disable", false); + private static final Logger LOG = LoggerFactory.getLogger( Configuration.class); http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java index dcc56c9..535940c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java @@ -161,10 +161,6 @@ import com.google.inject.persist.Transactional; import com.sun.jersey.spi.container.servlet.ServletContainer; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - - @Singleton public class AmbariServer { public static final String VIEWS_URL_PATTERN = "/api/v1/views/*"; @@ -536,6 +532,9 @@ public class AmbariServer { ExecutionScheduleManager executionScheduleManager = injector .getInstance(ExecutionScheduleManager.class); + MetricsService metricsService = injector.getInstance( + MetricsService.class); + clusterController = controller; StateRecoveryManager recoveryManager = injector.getInstance( @@ -548,14 +547,6 @@ public class AmbariServer { */ server.start(); - // TODO, start every other tread. - final ExecutorService executor = Executors.newSingleThreadExecutor(); - MetricsService metricsService = injector.getInstance( - MetricsService.class); - metricsService.init(); - executor.submit(metricsService); - LOG.info("********* Started Ambari Metrics **********"); - serverForAgent.start(); LOG.info("********* Started Server **********"); @@ -568,6 +559,12 @@ public class AmbariServer { serviceManager.startAsync(); LOG.info("********* Started Services **********"); + if (!Configuration.AMBARISERVER_METRICS_DISABLE.equals(true)) { + metricsService.start(); + } else { + LOG.info("AmbariServer Metrics disabled."); + } + server.join(); LOG.info("Joined the Server"); } catch (BadPaddingException bpe) { @@ -822,9 +819,9 @@ public class AmbariServer { gzipFilter.setInitParameter("methods", "GET,POST,PUT,DELETE"); gzipFilter.setInitParameter("excludePathPatterns", ".*(\\.woff|\\.ttf|\\.woff2|\\.eot|\\.svg)"); gzipFilter.setInitParameter("mimeTypes", - "text/html,text/plain,text/xml,text/css,application/x-javascript," + - "application/xml,application/x-www-form-urlencoded," + - "application/javascript,application/json"); + "text/html,text/plain,text/xml,text/css,application/x-javascript," + + "application/xml,application/x-www-form-urlencoded," + + "application/javascript,application/json"); gzipFilter.setInitParameter("minGzipSize", configs.getApiGzipMinSize()); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/AmbariMetricSink.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/AmbariMetricSink.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/AmbariMetricSink.java deleted file mode 100644 index 809176be..0000000 --- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/AmbariMetricSink.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -//package org.apache.hadoop.metrics2.sink.ambari; - -package org.apache.ambari.server.metrics.system; - -import java.util.Map; - -public interface AmbariMetricSink { - /** - * initialize Collector URI and sink frequency to publish the metrics to AMS - **/ - void init(String protocol, String collectorUri, int frequency); - - /** - * Publish metrics to Collector - **/ - void publish(Map<String, Number> metricsMap); -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java index 23845c9..4a613f0 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsService.java @@ -15,12 +15,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.ambari.server.metrics.system; -public interface MetricsService extends Runnable { +import java.util.Collection; + +import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource; + + +public interface MetricsService{ /** * Set up configuration **/ - void init(); -} + void start(); + /** + * Get Configured sources + * @return + */ + Collection<AbstractMetricsSource> getSources(); +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSink.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSink.java new file mode 100644 index 0000000..3096a27 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSink.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.metrics.system; + +import java.util.List; + +import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration; + + +public interface MetricsSink { + + /** + * initialize Sink passing in configuration + **/ + void init(MetricsConfiguration configuration); + + /** + * Publish metrics to Collector + **/ + void publish(List<SingleMetric> metrics); + + + /** + * Returns if the sink is initialized. + */ + boolean isInitialized(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java index cf10408..400dcb6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/MetricsSource.java @@ -15,11 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.ambari.server.metrics.system; +import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration; + public interface MetricsSource extends Runnable{ /** * initialize sink **/ - void init(AmbariMetricSink sink); -} + void init(MetricsConfiguration configuration, MetricsSink sink); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/SingleMetric.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/SingleMetric.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/SingleMetric.java new file mode 100644 index 0000000..b67c0df --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/SingleMetric.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.metrics.system; + +public class SingleMetric { + + String metricName; + double value; + long timestamp; + + public SingleMetric (String metricName, double value, long timestamp) { + this.metricName = metricName; + this.value = value; + this.timestamp = timestamp; + } + + public String getMetricName() { + return metricName; + } + + public double getValue() { + return value; + } + + public long getTimestamp() { + return timestamp; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java index 58e2045..6bdd0ba 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AbstractMetricsSource.java @@ -17,19 +17,20 @@ */ package org.apache.ambari.server.metrics.system.impl; -import org.apache.ambari.server.metrics.system.AmbariMetricSink; +import org.apache.ambari.server.metrics.system.MetricsSink; import org.apache.ambari.server.metrics.system.MetricsSource; +import org.apache.ambari.server.metrics.system.SingleMetric; -import java.util.Map; +import java.util.List; public abstract class AbstractMetricsSource implements MetricsSource { - protected AmbariMetricSink sink; + protected MetricsSink sink; /** * Pass metrics sink to metrics source **/ @Override - public void init(AmbariMetricSink sink) { + public void init(MetricsConfiguration configuration, MetricsSink sink) { this.sink = sink; } @@ -37,5 +38,5 @@ public abstract class AbstractMetricsSource implements MetricsSource { * Get metrics at the instance * @return a map for metrics that maps metrics name to metrics value **/ - abstract public Map<String, Number> getMetrics(); + abstract public List<SingleMetric> getMetrics(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java index d42dbdf..be24988 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/AmbariMetricSinkImpl.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,73 +17,177 @@ */ package org.apache.ambari.server.metrics.system.impl; -import java.io.IOException; + import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeMap; -import org.apache.ambari.server.metrics.system.AmbariMetricSink; -import org.apache.commons.lang.ClassUtils; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider; +import org.apache.ambari.server.controller.internal.ServiceConfigVersionResourceProvider; +import org.apache.ambari.server.controller.spi.Predicate; +import org.apache.ambari.server.controller.spi.Request; +import org.apache.ambari.server.controller.spi.Resource; +import org.apache.ambari.server.controller.spi.ResourceProvider; +import org.apache.ambari.server.controller.utilities.PredicateBuilder; +import org.apache.ambari.server.controller.utilities.PropertyHelper; +import org.apache.ambari.server.metrics.system.MetricsSink; +import org.apache.ambari.server.metrics.system.SingleMetric; +import org.apache.ambari.server.security.authorization.internal.InternalAuthenticationToken; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.Service; +import org.apache.ambari.server.state.ServiceComponent; +import org.apache.ambari.server.state.ServiceComponentHost; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import jline.internal.Log; +import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; +import org.springframework.security.core.context.SecurityContextHolder; + +public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements MetricsSink { + private static final String AMBARI_SERVER_APP_ID = "ambari_server"; + private Collection<String> collectorHosts; -public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements AmbariMetricSink { - private static final String APP_ID = "ambari_server"; - private int timeoutSeconds = 10; - private String collectorProtocol; private String collectorUri; + private String port; + private String protocol; private String hostName; - private int counter = 0; - private int frequency; - private List<TimelineMetric> buffer = new ArrayList<>(); + private AmbariManagementController ambariManagementController; + private TimelineMetricsCache timelineMetricsCache; + private boolean isInitialized = false; + + public AmbariMetricSinkImpl(AmbariManagementController amc) { + this.ambariManagementController = amc; + } + @Override - public void init(String protocol, String collectorUri, int frequency) { + public void init(MetricsConfiguration configuration) { + + if (ambariManagementController == null) { + return; + } + + InternalAuthenticationToken authenticationToken = new InternalAuthenticationToken("admin"); + authenticationToken.setAuthenticated(true); + SecurityContextHolder.getContext().setAuthentication(authenticationToken); + Clusters clusters = ambariManagementController.getClusters(); + String ambariMetricsServiceName = "AMBARI_METRICS"; + collectorHosts = new HashSet<>(); + + for (Map.Entry<String, Cluster> kv : clusters.getClusters().entrySet()) { + String clusterName = kv.getKey(); + Cluster c = kv.getValue(); + Resource.Type type = Resource.Type.ServiceConfigVersion; + + Set<String> propertyIds = new HashSet<String>(); + propertyIds.add(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID); + + Predicate predicate = new PredicateBuilder().property( + ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CLUSTER_NAME_PROPERTY_ID).equals(clusterName).and().property( + ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_SERVICE_NAME_PROPERTY_ID).equals(ambariMetricsServiceName).and().property( + ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_IS_CURRENT_PROPERTY_ID).equals("true").toPredicate(); + + Request request = PropertyHelper.getReadRequest(propertyIds); + + ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider( + type, + PropertyHelper.getPropertyIds(type), + PropertyHelper.getKeyPropertyIds(type), + ambariManagementController); + + try { + //get collector host(s) + Service service = c.getService(ambariMetricsServiceName); + if (service != null) { + for (String component : service.getServiceComponents().keySet()) { + ServiceComponent sc = service.getServiceComponents().get(component); + for (ServiceComponentHost serviceComponentHost : sc.getServiceComponentHosts().values()) { + if (serviceComponentHost.getServiceComponentName().equals("METRICS_COLLECTOR")) { + collectorHosts.add(serviceComponentHost.getHostName()); + } + } + } + } + + // get collector port and protocol + Set<Resource> resources = provider.getResources(request, predicate); + + for (Resource resource : resources) { + if (resource != null) { + ArrayList<LinkedHashMap<Object, Object>> configs = (ArrayList<LinkedHashMap<Object, Object>>) + resource.getPropertyValue(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID); + for (LinkedHashMap<Object, Object> config : configs) { + if (config != null && config.get("type").equals("ams-site")) { + TreeMap<Object, Object> properties = (TreeMap<Object, Object>) config.get("properties"); + String timelineWebappAddress = (String) properties.get("timeline.metrics.service.webapp.address"); + if (StringUtils.isNotEmpty(timelineWebappAddress) && timelineWebappAddress.contains(":")) { + port = timelineWebappAddress.split(":")[1]; + } + String httpPolicy = (String) properties.get("timeline.metrics.service.http.policy"); + protocol = httpPolicy.equals("HTTP_ONLY") ? "http" : "https"; + break; + } + } + } + } + } catch (Exception e) { + LOG.info("Exception caught when retrieving Collector URI", e); + } + } + + collectorUri = getCollectorUri(findPreferredCollectHost()); + hostName = configuration.getProperty("ambariserver.hostname.override", getDefaultLocalHostName()); + + int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE, + String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT))); + int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL, + String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS))); - /** - * Protocol is either HTTP or HTTPS, and the collectorURI is the domain name of the collector - * An example of the complete collector URI might be: http://c6403.ambari.org/ws/v1/timeline/metrics - */ - this.frequency = frequency; - this.collectorProtocol = protocol; - this.collectorUri = getCollectorUri(collectorUri); + timelineMetricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval); + if (CollectionUtils.isNotEmpty(collectorHosts)) { + isInitialized = true; + } + } + + private String getDefaultLocalHostName() { try { - hostName = InetAddress.getLocalHost().getHostName(); + return InetAddress.getLocalHost().getCanonicalHostName(); } catch (UnknownHostException e) { - Log.info("Error getting host address"); + LOG.info("Error getting host address"); } + return null; } @Override - public void publish(Map<String, Number> metricsMap) { - List<TimelineMetric> metricsList = createMetricsList(metricsMap); + public void publish(List<SingleMetric> metrics) { - if(counter > frequency) { - TimelineMetrics timelineMetrics = new TimelineMetrics(); - timelineMetrics.setMetrics(buffer); - String connectUrl = collectorUri; - String jsonData = null; - try { - jsonData = mapper.writeValueAsString(timelineMetrics); - } catch (IOException e) { - LOG.error("Unable to parse metrics", e); - } - if (jsonData != null) { - emitMetricsJson(connectUrl, jsonData); + //If Sink not yet initialized, drop the metrics on the floor. + if (isInitialized) { + List<TimelineMetric> metricList = getFilteredMetricList(metrics); + if (!metricList.isEmpty()) { + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.setMetrics(metricList); + emitMetrics(timelineMetrics); } - counter = 0; - } else { - buffer.addAll(metricsList); - counter++; } + } + @Override + public boolean isInitialized() { + return isInitialized; } @@ -94,22 +198,22 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements */ @Override protected String getCollectorUri(String host) { - return getCollectorProtocol() + "://" + host + WS_V1_TIMELINE_METRICS; + return constructContainerMetricUri(protocol, host, port); } @Override protected String getCollectorProtocol() { - return collectorProtocol; + return protocol; } @Override protected String getCollectorPort() { - return null; + return port; } @Override protected int getTimeoutSeconds() { - return timeoutSeconds; + return 10; } /** @@ -119,6 +223,7 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements */ @Override protected String getZookeeperQuorum() { + //Ignoring Zk Fallback. return null; } @@ -129,7 +234,7 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements */ @Override protected Collection<String> getConfiguredCollectorHosts() { - return null; + return collectorHosts; } /** @@ -142,16 +247,22 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements return hostName; } - private List<TimelineMetric> createMetricsList(Map<String, Number> metricsMap) { - final List<TimelineMetric> metricsList = new ArrayList<>(); - for (Map.Entry<String, Number> entry : metricsMap.entrySet()) { - final long currentTimeMillis = System.currentTimeMillis(); - String metricsName = entry.getKey(); - Number value = entry.getValue(); - TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, metricsName, value); - metricsList.add(metric); + private List<TimelineMetric> getFilteredMetricList(List<SingleMetric> metrics) { + final List<TimelineMetric> metricList = new ArrayList<>(); + for (SingleMetric metric : metrics) { + + String metricName = metric.getMetricName(); + Double value = metric.getValue(); + + TimelineMetric timelineMetric = createTimelineMetric(metric.getTimestamp(), AMBARI_SERVER_APP_ID, metricName, value); + timelineMetricsCache.putTimelineMetric(timelineMetric, false); + TimelineMetric cachedMetric = timelineMetricsCache.getTimelineMetric(metricName); + + if (cachedMetric != null) { + metricList.add(cachedMetric); + } } - return metricsList; + return metricList; } private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, @@ -161,7 +272,7 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements timelineMetric.setHostName(hostName); timelineMetric.setAppId(component); timelineMetric.setStartTime(currentTimeMillis); - timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number")); + timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue.doubleValue()); return timelineMetric; } http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/Configuration.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/Configuration.java deleted file mode 100644 index 705971f..0000000 --- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/Configuration.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.server.metrics.system.impl; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; - -public class Configuration { - public static final String CONFIG_FILE = "metrics.properties"; - - private static Logger LOG = LoggerFactory.getLogger(Configuration.class); - private Properties properties; - - public Configuration() { - this(readConfigFile()); - } - - public Configuration(Properties properties) { - this.properties = properties; - } - - private static Properties readConfigFile() { - Properties properties = new Properties(); - - //Get property file stream from classpath - InputStream inputStream = Configuration.class.getClassLoader().getResourceAsStream(CONFIG_FILE); - - if (inputStream == null) { - throw new RuntimeException(CONFIG_FILE + " not found in classpath"); - } - - // load the properties - try { - properties.load(inputStream); - inputStream.close(); - } catch (FileNotFoundException fnf) { - LOG.info("No configuration file " + CONFIG_FILE + " found in classpath.", fnf); - } catch (IOException ie) { - throw new IllegalArgumentException("Can't read configuration file " + - CONFIG_FILE, ie); - } - - return properties; - } - - /** - * Get the property value for the given key. - * - * @return the property value - */ - public String getProperty(String key) { - return properties.getProperty(key); - } - - /** - * Get the property value for the given key. - * - * @return the property value - */ - public String getProperty(String key, String defaultValue) { - return properties.getProperty(key, defaultValue); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java index a04ca43..cb9f275 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/JvmMetricsSource.java @@ -17,41 +17,47 @@ */ package org.apache.ambari.server.metrics.system.impl; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.ambari.server.metrics.system.MetricsSink; +import org.apache.ambari.server.metrics.system.SingleMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.codahale.metrics.Gauge; import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricFilter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricSet; import com.codahale.metrics.jvm.BufferPoolMetricSet; +import com.codahale.metrics.jvm.FileDescriptorRatioGauge; import com.codahale.metrics.jvm.GarbageCollectorMetricSet; import com.codahale.metrics.jvm.MemoryUsageGaugeSet; import com.codahale.metrics.jvm.ThreadStatesGaugeSet; -import org.apache.ambari.server.metrics.system.AmbariMetricSink; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.management.ManagementFactory; -import java.util.HashMap; -import java.util.Map; public class JvmMetricsSource extends AbstractMetricsSource { static final MetricRegistry registry = new MetricRegistry(); private static Logger LOG = LoggerFactory.getLogger(JvmMetricsSource.class); @Override - public void init(AmbariMetricSink sink) { - super.init(sink); - registerAll("gc", new GarbageCollectorMetricSet(), registry); - registerAll("buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()), registry); - registerAll("memory", new MemoryUsageGaugeSet(), registry); - registerAll("threads", new ThreadStatesGaugeSet(), registry); + public void init(MetricsConfiguration configuration, MetricsSink sink) { + super.init(configuration, sink); + registerAll("jvm.gc", new GarbageCollectorMetricSet(), registry); + registerAll("jvm.buffers", new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer()), registry); + registerAll("jvm.memory", new MemoryUsageGaugeSet(), registry); + registerAll("jvm.threads", new ThreadStatesGaugeSet(), registry); + registry.register("jvm.file.open.descriptor.ratio", new FileDescriptorRatioGauge()); } @Override public void run() { - this.sink.publish(getMetrics()); - LOG.info("********* Published system metrics to sink **********"); + sink.publish(getMetrics()); + LOG.debug("********* Published system metrics to sink **********"); } - private void registerAll(String prefix, MetricSet metricSet, MetricRegistry registry) { for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) { if (entry.getValue() instanceof MetricSet) { @@ -63,13 +69,26 @@ public class JvmMetricsSource extends AbstractMetricsSource { } @Override - public Map<String, Number> getMetrics() { - Map<String, Number> map = new HashMap<>(); - for (String metricName : registry.getGauges().keySet()) { - if (metricName.equals("threads.deadlocks") ) continue; - Number value = (Number)registry.getGauges().get(metricName).getValue(); - map.put(metricName, value); + public List<SingleMetric> getMetrics() { + + List<SingleMetric> metrics = new ArrayList<>(); + Map<String, Gauge> gaugeSet = registry.getGauges(new NonNumericMetricFilter()); + for (String metricName : gaugeSet.keySet()) { + Number value = (Number) gaugeSet.get(metricName).getValue(); + metrics.add(new SingleMetric(metricName, value.doubleValue(), System.currentTimeMillis())); + } + + return metrics; + } + + public class NonNumericMetricFilter implements MetricFilter { + + @Override + public boolean matches(String name, Metric metric) { + if (name.equalsIgnoreCase("jvm.threads.deadlocks")) { + return false; + } + return true; } - return map; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java new file mode 100644 index 0000000..ca83a53 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsConfiguration.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.metrics.system.impl; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsConfiguration { + public static final String CONFIG_FILE = "metrics.properties"; + + private static Logger LOG = LoggerFactory.getLogger(MetricsConfiguration.class); + private Properties properties; + + public static MetricsConfiguration getMetricsConfiguration() { + Properties properties = readConfigFile(); + if (properties == null || properties.isEmpty()) { + return null; + } + return new MetricsConfiguration(properties); + } + + public MetricsConfiguration(Properties properties) { + this.properties = properties; + } + + private static Properties readConfigFile() { + Properties properties = new Properties(); + + //Get property file stream from classpath + InputStream inputStream = MetricsConfiguration.class.getClassLoader().getResourceAsStream(CONFIG_FILE); + + if (inputStream == null) { + LOG.info(CONFIG_FILE + " not found in classpath"); + return null; + } + + // load the properties + try { + properties.load(inputStream); + inputStream.close(); + } catch (FileNotFoundException fnf) { + LOG.info("No configuration file " + CONFIG_FILE + " found in classpath."); + return null; + } catch (IOException ie) { + LOG.error("Can't read configuration file " + CONFIG_FILE, ie); + return null; + } + + return properties; + } + + /** + * Get the property value for the given key. + * + * @return the property value + */ + public String getProperty(String key) { + return properties.getProperty(key); + } + + /** + * Get the property value for the given key. + * + * @return the property value + */ + public String getProperty(String key, String defaultValue) { + return properties.getProperty(key, defaultValue); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java index 1645ebf..d0d2e69 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java @@ -17,152 +17,110 @@ */ package org.apache.ambari.server.metrics.system.impl; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; -import com.google.inject.Inject; -import com.google.inject.Singleton; import org.apache.ambari.server.controller.AmbariManagementController; -import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider; -import org.apache.ambari.server.controller.internal.ServiceConfigVersionResourceProvider; -import org.apache.ambari.server.controller.spi.Predicate; -import org.apache.ambari.server.controller.spi.Request; -import org.apache.ambari.server.controller.spi.Resource; -import org.apache.ambari.server.controller.spi.ResourceProvider; -import org.apache.ambari.server.controller.utilities.PredicateBuilder; -import org.apache.ambari.server.controller.utilities.PropertyHelper; -import org.apache.ambari.server.metrics.system.AmbariMetricSink; import org.apache.ambari.server.metrics.system.MetricsService; -import org.apache.ambari.server.security.authorization.internal.InternalAuthenticationToken; -import org.apache.ambari.server.state.Cluster; -import org.apache.ambari.server.state.Clusters; - +import org.apache.ambari.server.metrics.system.MetricsSink; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.security.core.context.SecurityContextHolder; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import com.google.inject.Inject; +import com.google.inject.Singleton; @Singleton public class MetricsServiceImpl implements MetricsService { private static Logger LOG = LoggerFactory.getLogger(MetricsServiceImpl.class); private Map<String, AbstractMetricsSource> sources = new HashMap<>(); - private AmbariMetricSink sink = new AmbariMetricSinkImpl(); - private String collectorUri = ""; - private String collectorProtocol = ""; - private Configuration configuration; + private MetricsSink sink = null; + private MetricsConfiguration configuration = null; @Inject AmbariManagementController amc; @Override - public void init() { + public void start() { + LOG.info("********* Initializing AmbariServer Metrics Service **********"); try { - configuration = new Configuration(); - if (collectorUri.isEmpty() || collectorProtocol.isEmpty()) { - setCollectorUri(); + configuration = MetricsConfiguration.getMetricsConfiguration(); + if (configuration == null) { + return; + } + sink = new AmbariMetricSinkImpl(amc); + initializeMetricsSink(); + initializeMetricSources(); + + if (!sink.isInitialized()) { + //If Sink is not initialized, Service will check for every 5 mins. + Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + LOG.info("Checking for metrics sink initialization"); + if (!sink.isInitialized()) { + initializeMetricsSink(); + } + } + }, 5, 5, TimeUnit.MINUTES); } - configureSourceAndSink(); } catch (Exception e) { - LOG.info("Error initializing MetricsService", e); - } - - } - @Override - public void run() { - final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); - for (Map.Entry<String, AbstractMetricsSource> entry : sources.entrySet()) { - publishMetrics(executor, entry); + LOG.info("Unable to initialize MetricsService : ", e.getMessage()); } } + private void initializeMetricsSink() { - private void setCollectorUri() { - InternalAuthenticationToken authenticationToken = new InternalAuthenticationToken("admin"); - authenticationToken.setAuthenticated(true); - SecurityContextHolder.getContext().setAuthentication(authenticationToken); - Clusters clusters = amc.getClusters(); - for (Map.Entry<String, Cluster> kv : clusters.getClusters().entrySet()) { - String clusterName = kv.getKey(); - Resource.Type type = Resource.Type.ServiceConfigVersion; - - Set<String> propertyIds = new HashSet<String>(); - propertyIds.add(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID); - - Predicate predicate = new PredicateBuilder().property( - ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CLUSTER_NAME_PROPERTY_ID).equals(clusterName).and().property( - ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_SERVICE_NAME_PROPERTY_ID).equals("AMBARI_METRICS").and().property( - ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_IS_CURRENT_PROPERTY_ID).equals("true").toPredicate(); - - Request request = PropertyHelper.getReadRequest(propertyIds); - - ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider( - type, - PropertyHelper.getPropertyIds(type), - PropertyHelper.getKeyPropertyIds(type), - amc); - - try { - Set<Resource> resources = provider.getResources(request, predicate); - - // get collector uri - for (Resource resource : resources) { - if (resource != null) { - ArrayList<LinkedHashMap<Object, Object>> configs = (ArrayList<LinkedHashMap<Object, Object>>) - resource.getPropertyValue(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID); - for (LinkedHashMap<Object, Object> config : configs) { - if (config != null && config.get("type").equals("ams-site")) { - TreeMap<Object, Object> properties = (TreeMap<Object, Object>) config.get("properties"); - collectorUri = (String) properties.get("timeline.metrics.service.webapp.address"); - String which_protocol = (String) properties.get("timeline.metrics.service.http.policy"); - collectorProtocol = which_protocol.equals("HTTP_ONLY") ? "http" : "https"; - break; - } - } - } - } - } catch (Exception e) { - LOG.info("Throwing exception when retrieving Collector URI", e); - } - } + LOG.info("********* Configuring Metric Sink **********"); + sink.init(configuration); } - private void configureSourceAndSink() { + private void initializeMetricSources() { try { - LOG.info("********* Configuring Ambari Metrics Sink and Source**********"); - int frequency = Integer.parseInt(configuration.getProperty("sink.frequency", "10")); // default value 10 - sink.init(collectorProtocol, collectorUri, frequency); - String[] sourceNames = configuration.getProperty("metrics.sources").split(","); + + LOG.info("********* Configuring Metric Sources **********"); + String commaSeparatedSources = configuration.getProperty("metric.sources"); + + if (StringUtils.isEmpty(commaSeparatedSources)) { + LOG.info("No sources configured."); + return; + } + + String[] sourceNames = commaSeparatedSources.split(","); for (String sourceName: sourceNames) { String className = configuration.getProperty("source." + sourceName + ".class"); Class t = Class.forName(className); AbstractMetricsSource src = (AbstractMetricsSource)t.newInstance(); - src.init(sink); + src.init(configuration, sink); sources.put(sourceName, src); } - } - catch (Exception e) { - LOG.info("Throwing exception when registering metric sink and source", e); + + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + for (Map.Entry<String, AbstractMetricsSource> entry : sources.entrySet()) { + startSource(executor, entry); + } + } catch (Exception e) { + LOG.error("Error when configuring metric sink and source", e); } } - private void publishMetrics(ScheduledExecutorService executor, Map.Entry<String, AbstractMetricsSource> entry) { + private void startSource(ScheduledExecutorService executor, Map.Entry<String, AbstractMetricsSource> entry) { String className = entry.getKey(); AbstractMetricsSource source = entry.getValue(); String interval = "source." + className + ".interval"; - int duration = Integer.parseInt(configuration.getProperty(interval, "5")); // default value 5 + int duration = Integer.parseInt(configuration.getProperty(interval, "10")); // default value 10 seconds try { executor.scheduleWithFixedDelay(source, 0, duration, TimeUnit.SECONDS); - } catch (Exception e) { - LOG.info("Throwing exception when failing scheduling source", e); + LOG.info("Throwing exception when starting metric source", e); } } -} + + public Collection<AbstractMetricsSource> getSources() { + return sources.values(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java new file mode 100644 index 0000000..9f649b4 --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/JvmMetricsSourceTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.metric.system.impl; + +import org.apache.ambari.server.metrics.system.MetricsSink; +import org.apache.ambari.server.metrics.system.impl.JvmMetricsSource; +import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration; +import org.junit.Test; + +public class JvmMetricsSourceTest { + + @Test + public void testJvmSourceInit() { + JvmMetricsSource jvmMetricsSource = new JvmMetricsSource(); + MetricsConfiguration configuration = MetricsConfiguration.getMetricsConfiguration(); + MetricsSink sink = new TestAmbariMetricsSinkImpl(); + jvmMetricsSource.init(configuration, sink); + org.junit.Assert.assertEquals(jvmMetricsSource.getMetrics().size(), 39); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java new file mode 100644 index 0000000..4029f25 --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/MetricsServiceTest.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.metric.system.impl; + +import org.apache.ambari.server.metrics.system.MetricsService; +import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource; +import org.apache.ambari.server.metrics.system.impl.JvmMetricsSource; +import org.apache.ambari.server.metrics.system.impl.MetricsServiceImpl; +import org.junit.Test; + +import junit.framework.Assert; + +public class MetricsServiceTest { + + @Test + public void testMetricsServiceStart() { + MetricsService metricsService = new MetricsServiceImpl(); + metricsService.start(); + Assert.assertTrue(metricsService.getSources().size() == 2); + for (AbstractMetricsSource source : metricsService.getSources()) { + Assert.assertTrue ( source instanceof JvmMetricsSource || source instanceof TestMetricsSource); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java new file mode 100644 index 0000000..3565504 --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestAmbariMetricsSinkImpl.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.metric.system.impl; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.ambari.server.metrics.system.MetricsSink; +import org.apache.ambari.server.metrics.system.SingleMetric; +import org.apache.ambari.server.metrics.system.impl.MetricsConfiguration; +import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; + +public class TestAmbariMetricsSinkImpl extends AbstractTimelineMetricsSink implements MetricsSink { + + @Override + public void publish(List<SingleMetric> metrics) { + } + + @Override + public boolean isInitialized() { + return true; + } + + @Override + protected String getCollectorUri(String host) { + return constructContainerMetricUri(getCollectorProtocol(), host, getCollectorPort()); + } + + @Override + protected String getCollectorProtocol() { + return "http"; + } + + @Override + protected String getCollectorPort() { + return "6188"; + } + + @Override + protected int getTimeoutSeconds() { + return 1000; + } + + @Override + protected String getZookeeperQuorum() { + return null; + } + + @Override + protected Collection<String> getConfiguredCollectorHosts() { + return Collections.singletonList("localhost"); + } + + @Override + protected String getHostname() { + return "localhost"; + } + + @Override + public void init(MetricsConfiguration configuration) { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java new file mode 100644 index 0000000..acf1586 --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/metric/system/impl/TestMetricsSource.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.metric.system.impl; + +import java.util.List; + +import org.apache.ambari.server.metrics.system.SingleMetric; +import org.apache.ambari.server.metrics.system.impl.AbstractMetricsSource; + +public class TestMetricsSource extends AbstractMetricsSource { + + @Override + public List<SingleMetric> getMetrics() { + return null; + } + + @Override + public void run() { + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/b9f25b72/ambari-server/src/test/resources/metrics.properties ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/resources/metrics.properties b/ambari-server/src/test/resources/metrics.properties new file mode 100644 index 0000000..5eee064 --- /dev/null +++ b/ambari-server/src/test/resources/metrics.properties @@ -0,0 +1,29 @@ +# Copyright 2011 The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +#### Source Configs ##### +# Source interval determines how often the metric is sent to sink. Its unit is in seconds +metric.sources=jvm,testsource + +source.jvm.interval=10 +source.jvm.class=org.apache.ambari.server.metrics.system.impl.JvmMetricsSource +source.testsource.class=org.apache.ambari.server.metric.system.impl.TestMetricsSource + +#### Sink Configs ##### +# Sink frequency determines how often the sink publish the metrics from buffer to AMS. \ No newline at end of file
