AMBARI-4881. Clean up JMXPropertyProvider hacks for STORM metrics (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/cc4dfe35 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/cc4dfe35 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/cc4dfe35 Branch: refs/heads/branch-alerts-dev Commit: cc4dfe351f3ece0082cfbbe4a3657202cf93967b Parents: ef1d698 Author: Andrew Onishuk <[email protected]> Authored: Thu Oct 2 15:03:11 2014 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Thu Oct 2 15:03:11 2014 +0300 ---------------------------------------------------------------------- .../internal/AbstractProviderModule.java | 24 +- .../controller/internal/ResourceImpl.java | 19 +- .../internal/StackDefinedPropertyProvider.java | 227 ++-- .../server/controller/jmx/JMXHostProvider.java | 13 - .../controller/jmx/JMXPropertyProvider.java | 274 +---- .../controller/metrics/MetricsHostProvider.java | 38 + .../controller/metrics/MetricsProvider.java | 302 +++++ .../metrics/RestMetricsPropertyProvider.java | 448 ++++++++ .../stacks/HDP/2.1/services/STORM/metrics.json | 55 +- .../stacks/HDP/2.2/services/STORM/metrics.json | 1079 ++++++++++++++++++ .../StackDefinedPropertyProviderTest.java | 357 +++--- .../controller/jmx/JMXPropertyProviderTest.java | 513 --------- .../metrics/JMXPropertyProviderTest.java | 522 +++++++++ .../HDP/2.1.1/services/STORM/metrics.json | 209 ++-- 14 files changed, 2925 insertions(+), 1155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java index fc60210..b5164fe 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AbstractProviderModule.java @@ -33,6 +33,7 @@ import org.apache.ambari.server.controller.ganglia.GangliaReportPropertyProvider import org.apache.ambari.server.controller.ganglia.GangliaHostProvider; import org.apache.ambari.server.controller.jmx.JMXHostProvider; import org.apache.ambari.server.controller.jmx.JMXPropertyProvider; +import org.apache.ambari.server.controller.metrics.MetricsHostProvider; import org.apache.ambari.server.controller.nagios.NagiosPropertyProvider; import org.apache.ambari.server.controller.spi.*; import org.apache.ambari.server.controller.utilities.PredicateBuilder; @@ -57,7 +58,7 @@ import java.util.concurrent.ConcurrentHashMap; /** * An abstract provider module implementation. */ -public abstract class AbstractProviderModule implements ProviderModule, ResourceProviderObserver, JMXHostProvider, GangliaHostProvider { +public abstract class AbstractProviderModule implements ProviderModule, ResourceProviderObserver, JMXHostProvider, GangliaHostProvider, MetricsHostProvider { private static final int PROPERTY_REQUEST_CONNECT_TIMEOUT = 5000; private static final int PROPERTY_REQUEST_READ_TIMEOUT = 10000; @@ -204,7 +205,7 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource } - // ----- JMXHostProvider --------------------------------------------------- + // ----- MetricsHostProvider --------------------------------------------------- @Override public String getHostName(String clusterName, String componentName) throws SystemException { @@ -225,6 +226,8 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource return hosts; } + // ----- JMXHostProvider --------------------------------------------------- + @Override public String getPort(String clusterName, String componentName) throws SystemException { // Parent map need not be synchronized @@ -453,11 +456,11 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource type, streamProvider, this, + this, PropertyHelper.getPropertyId("ServiceComponentInfo", "cluster_name"), null, PropertyHelper.getPropertyId("ServiceComponentInfo", "component_name"), - PropertyHelper.getPropertyId("ServiceComponentInfo", "state"), - Collections.singleton("STARTED")); + PropertyHelper.getPropertyId("ServiceComponentInfo", "state")); PropertyProvider gpp = createGangliaComponentPropertyProvider( type, @@ -471,6 +474,7 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource type, this, this, + this, streamProvider, PropertyHelper.getPropertyId("ServiceComponentInfo", "cluster_name"), null, @@ -486,11 +490,11 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource type, streamProvider, this, + this, PropertyHelper.getPropertyId("HostRoles", "cluster_name"), PropertyHelper.getPropertyId("HostRoles", "host_name"), PropertyHelper.getPropertyId("HostRoles", "component_name"), - PropertyHelper.getPropertyId("HostRoles", "state"), - Collections.singleton("STARTED")); + PropertyHelper.getPropertyId("HostRoles", "state")); PropertyProvider gpp = createGangliaHostComponentPropertyProvider( type, @@ -505,6 +509,7 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource type, this, this, + this, streamProvider, PropertyHelper.getPropertyId("HostRoles", "cluster_name"), PropertyHelper.getPropertyId("HostRoles", "host_name"), @@ -728,14 +733,15 @@ public abstract class AbstractProviderModule implements ProviderModule, Resource */ private PropertyProvider createJMXPropertyProvider(Resource.Type type, StreamProvider streamProvider, JMXHostProvider jmxHostProvider, + MetricsHostProvider metricsHostProvider, String clusterNamePropertyId, String hostNamePropertyId, String componentNamePropertyId, - String statePropertyId, - Set<String> healthyStates) { + String statePropertyId) { return new JMXPropertyProvider(PropertyHelper.getJMXPropertyIds(type), streamProvider, - jmxHostProvider, clusterNamePropertyId, hostNamePropertyId, componentNamePropertyId, statePropertyId, healthyStates); + jmxHostProvider, metricsHostProvider, clusterNamePropertyId, hostNamePropertyId, + componentNamePropertyId, statePropertyId); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ResourceImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ResourceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ResourceImpl.java index 15fb961..6f963c9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ResourceImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ResourceImpl.java @@ -21,10 +21,8 @@ package org.apache.ambari.server.controller.internal; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.utilities.PropertyHelper; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; /** * Simple resource implementation. @@ -39,14 +37,15 @@ public class ResourceImpl implements Resource { /** * The map of property maps keyed by property category. */ - private final Map<String, Map<String, Object>> propertiesMap = new TreeMap<String, Map<String, Object>>(); + private final Map<String, Map<String, Object>> propertiesMap = + Collections.synchronizedMap(new TreeMap<String, Map<String, Object>>()); // ----- Constructors ------------------------------------------------------ /** * Create a resource of the given type. * - * @param type the resource type + * @param type the resource type */ public ResourceImpl(Type type) { this.type = type; @@ -55,7 +54,7 @@ public class ResourceImpl implements Resource { /** * Copy constructor * - * @param resource the resource to copy + * @param resource the resource to copy */ public ResourceImpl(Resource resource) { this(resource, null); @@ -65,8 +64,8 @@ public class ResourceImpl implements Resource { * Construct a resource from the given resource, setting only the properties * that are found in the given set of property and category ids. * - * @param resource the resource to copy - * @param propertyIds the set of requested property and category ids + * @param resource the resource to copy + * @param propertyIds the set of requested property and category ids */ public ResourceImpl(Resource resource, Set<String> propertyIds) { this.type = resource.getType(); @@ -106,7 +105,7 @@ public class ResourceImpl implements Resource { Map<String, Object> properties = propertiesMap.get(categoryKey); if (properties == null) { - properties = new TreeMap<String, Object>(); + properties = Collections.synchronizedMap(new TreeMap<String, Object>()); propertiesMap.put(categoryKey, properties); } properties.put(PropertyHelper.getPropertyName(id), value); http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java index 51c7565..32411e9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProvider.java @@ -35,6 +35,7 @@ import org.apache.ambari.server.controller.ganglia.GangliaHostProvider; import org.apache.ambari.server.controller.ganglia.GangliaPropertyProvider; import org.apache.ambari.server.controller.jmx.JMXHostProvider; import org.apache.ambari.server.controller.jmx.JMXPropertyProvider; +import org.apache.ambari.server.controller.metrics.MetricsHostProvider; import org.apache.ambari.server.controller.spi.Predicate; import org.apache.ambari.server.controller.spi.PropertyProvider; import org.apache.ambari.server.controller.spi.Request; @@ -56,57 +57,71 @@ import com.google.inject.Injector; * This class analyzes a service's metrics to determine if additional * metrics should be fetched. It's okay to maintain state here since these * are done per-request. - * */ public class StackDefinedPropertyProvider implements PropertyProvider { private static final Logger LOG = LoggerFactory.getLogger(StackDefinedPropertyProvider.class); - + @Inject private static Clusters clusters = null; @Inject private static AmbariMetaInfo metaInfo = null; - + @Inject + private static Injector injector = null; + + private Resource.Type type = null; private String clusterNamePropertyId = null; private String hostNamePropertyId = null; private String componentNamePropertyId = null; - private String jmxStatePropertyId = null; + private String resourceStatePropertyId = null; private ComponentSSLConfiguration sslConfig = null; private StreamProvider streamProvider = null; private JMXHostProvider jmxHostProvider; private GangliaHostProvider gangliaHostProvider; private PropertyProvider defaultJmx = null; private PropertyProvider defaultGanglia = null; - + + private final MetricsHostProvider metricsHostProvider; + + /** + * PropertyHelper/AbstractPropertyProvider expect map of maps, + * that's why we wrap metrics into map + */ + public static final String WRAPPED_METRICS_KEY = "WRAPPED_METRICS_KEY"; + @Inject public static void init(Injector injector) { clusters = injector.getInstance(Clusters.class); metaInfo = injector.getInstance(AmbariMetaInfo.class); + StackDefinedPropertyProvider.injector = injector; } - + public StackDefinedPropertyProvider(Resource.Type type, JMXHostProvider jmxHostProvider, GangliaHostProvider gangliaHostProvider, + MetricsHostProvider metricsHostProvider, StreamProvider streamProvider, String clusterPropertyId, String hostPropertyId, String componentPropertyId, - String jmxStatePropertyId, + String resourceStatePropertyId, PropertyProvider defaultJmxPropertyProvider, PropertyProvider defaultGangliaPropertyProvider - ) { - + ) { + + this.metricsHostProvider = metricsHostProvider; + if (null == clusterPropertyId) throw new NullPointerException("Cluster name property id cannot be null"); if (null == componentPropertyId) throw new NullPointerException("Component name property id cannot be null"); - + this.type = type; - + clusterNamePropertyId = clusterPropertyId; hostNamePropertyId = hostPropertyId; componentNamePropertyId = componentPropertyId; - this.jmxStatePropertyId = jmxStatePropertyId; + this.resourceStatePropertyId = resourceStatePropertyId; this.jmxHostProvider = jmxHostProvider; this.gangliaHostProvider = gangliaHostProvider; sslConfig = ComponentSSLConfiguration.instance(); @@ -114,34 +129,34 @@ public class StackDefinedPropertyProvider implements PropertyProvider { defaultJmx = defaultJmxPropertyProvider; defaultGanglia = defaultGangliaPropertyProvider; } - - + + @Override public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate) throws SystemException { // only arrange for one instance of Ganglia and JMX instantiation - Map<String, Map<String, PropertyInfo>> gangliaMap = new HashMap<String, Map<String,PropertyInfo>>(); + Map<String, Map<String, PropertyInfo>> gangliaMap = new HashMap<String, Map<String, PropertyInfo>>(); Map<String, Map<String, PropertyInfo>> jmxMap = new HashMap<String, Map<String, PropertyInfo>>(); List<PropertyProvider> additional = new ArrayList<PropertyProvider>(); - + try { for (Resource r : resources) { String clusterName = r.getPropertyValue(clusterNamePropertyId).toString(); String componentName = r.getPropertyValue(componentNamePropertyId).toString(); - + Cluster cluster = clusters.getCluster(clusterName); StackId stack = cluster.getDesiredStackVersion(); String svc = metaInfo.getComponentToService(stack.getStackName(), stack.getStackVersion(), componentName); - + List<MetricDefinition> defs = metaInfo.getMetrics( stack.getStackName(), stack.getStackVersion(), svc, componentName, type.name()); - + if (null == defs || 0 == defs.size()) continue; - + for (MetricDefinition m : defs) { if (m.getType().equals("ganglia")) { gangliaMap.put(componentName, getPropertyInfo(m)); @@ -149,12 +164,20 @@ public class StackDefinedPropertyProvider implements PropertyProvider { jmxMap.put(componentName, getPropertyInfo(m)); } else { PropertyProvider pp = getDelegate(m); - if (null != pp) + if(pp == null) { + pp = getDelegate(m, + streamProvider, metricsHostProvider, + clusterNamePropertyId, hostNamePropertyId, + componentNamePropertyId, resourceStatePropertyId); + } + if(pp != null) { additional.add(pp); + } + } } } - + if (gangliaMap.size() > 0) { GangliaPropertyProvider gpp = type.equals (Resource.Type.Component) ? new GangliaComponentPropertyProvider(gangliaMap, @@ -163,22 +186,23 @@ public class StackDefinedPropertyProvider implements PropertyProvider { new GangliaHostComponentPropertyProvider(gangliaMap, streamProvider, sslConfig, gangliaHostProvider, clusterNamePropertyId, hostNamePropertyId, componentNamePropertyId); - + gpp.populateResources(resources, request, predicate); } else { defaultGanglia.populateResources(resources, request, predicate); } - + if (jmxMap.size() > 0) { JMXPropertyProvider jpp = new JMXPropertyProvider(jmxMap, streamProvider, - jmxHostProvider, clusterNamePropertyId, hostNamePropertyId, - componentNamePropertyId, jmxStatePropertyId, Collections.singleton("STARTED")); - + jmxHostProvider, metricsHostProvider, + clusterNamePropertyId, hostNamePropertyId, + componentNamePropertyId, resourceStatePropertyId); + jpp.populateResources(resources, request, predicate); } else { defaultJmx.populateResources(resources, request, predicate); } - + for (PropertyProvider pp : additional) { pp.populateResources(resources, request, predicate); } @@ -187,7 +211,7 @@ public class StackDefinedPropertyProvider implements PropertyProvider { e.printStackTrace(); throw new SystemException("Error loading deferred resources", e); } - + return resources; } @@ -195,59 +219,134 @@ public class StackDefinedPropertyProvider implements PropertyProvider { public Set<String> checkPropertyIds(Set<String> propertyIds) { return Collections.emptySet(); } - + /** * @param def the metric definition - * @return the converted Map required for JMX or Ganglia execution + * @return the converted Map required for JMX or Ganglia execution. + * Format: <metric name, property info> */ - private Map<String, PropertyInfo> getPropertyInfo(MetricDefinition def) { + private Map<String, PropertyInfo> getPropertyInfo(MetricDefinition def) { Map<String, PropertyInfo> defs = new HashMap<String, PropertyInfo>(); - - for (Entry<String,Metric> entry : def.getMetrics().entrySet()) { + + for (Entry<String, Metric> entry : def.getMetrics().entrySet()) { Metric metric = entry.getValue(); defs.put(entry.getKey(), new PropertyInfo( metric.getName(), metric.isTemporal(), metric.isPointInTime())); } - + return defs; } - + /** - * @param the metric definition for a component and resource type combination + * @param definition metric definition for a component and resource type combination * @return the custom property provider */ private PropertyProvider getDelegate(MetricDefinition definition) { + try { + Class<?> clz = Class.forName(definition.getType()); + + // singleton/factory try { - Class<?> clz = Class.forName(definition.getType()); - - // singleton/factory - try { - Method m = clz.getMethod("getInstance", Map.class, Map.class); - Object o = m.invoke(null, definition.getProperties(), definition.getMetrics()); - return PropertyProvider.class.cast(o); - } catch (Exception e) { - LOG.info("Could not load singleton or factory method for type '" + - definition.getType()); - } - - // try maps constructor - try { - Constructor<?> ct = clz.getConstructor(Map.class, Map.class); - Object o = ct.newInstance(definition.getProperties(), definition.getMetrics()); - return PropertyProvider.class.cast(o); - } catch (Exception e) { - LOG.info("Could not find contructor for type '" + - definition.getType()); - } - - // just new instance - return PropertyProvider.class.cast(clz.newInstance()); + Method m = clz.getMethod("getInstance", Map.class, Map.class); + Object o = m.invoke(null, definition.getProperties(), definition.getMetrics()); + return PropertyProvider.class.cast(o); + } catch (Exception e) { + LOG.info("Could not load singleton or factory method for type '" + + definition.getType()); + } + + // try maps constructor + try { + Constructor<?> ct = clz.getConstructor(Map.class, Map.class); + Object o = ct.newInstance(definition.getProperties(), definition.getMetrics()); + return PropertyProvider.class.cast(o); + } catch (Exception e) { + LOG.info("Could not find contructor for type '" + + definition.getType()); + } + + // just new instance + return PropertyProvider.class.cast(clz.newInstance()); + + } catch (Exception e) { + LOG.error("Could not load class " + definition.getType()); + return null; + } + } + + /** + * + * @param definition the metric definition for a component + * @param streamProvider the stream provider + * @param metricsHostProvider the metrics host provider + * @param clusterNamePropertyId the cluster name property id + * @param hostNamePropertyId the host name property id + * @param componentNamePropertyId the component name property id + * @param statePropertyId the state property id + * @return the custom property provider + */ + + private PropertyProvider getDelegate(MetricDefinition definition, + StreamProvider streamProvider, + MetricsHostProvider metricsHostProvider, + String clusterNamePropertyId, + String hostNamePropertyId, + String componentNamePropertyId, + String statePropertyId) { + Map<String, PropertyInfo> metrics = getPropertyInfo(definition); + HashMap<String, Map<String, PropertyInfo>> componentMetrics = + new HashMap<String, Map<String, PropertyInfo>>(); + componentMetrics.put(WRAPPED_METRICS_KEY, metrics); + + try { + Class<?> clz = Class.forName(definition.getType()); + // singleton/factory + try { + /* + * Interface for singleton/factory method invocation TBD + * when implementing the first real use + */ + Method m = clz.getMethod("getInstance", Map.class, Map.class); + Object o = m.invoke( + definition.getProperties(), componentMetrics, + streamProvider, clusterNamePropertyId, hostNamePropertyId, + componentNamePropertyId, statePropertyId); + return PropertyProvider.class.cast(o); + } catch (Exception e) { + LOG.info("Could not load singleton or factory method for type '" + + definition.getType()); + } + // try maps constructor + try { + /* + * Warning: this branch is already used, that's why please adjust + * all implementations when modifying constructor interface + */ + Constructor<?> ct = clz.getConstructor(Injector.class, Map.class, + Map.class, StreamProvider.class, MetricsHostProvider.class, + String.class, String.class, String.class, String.class); + Object o = ct.newInstance( + injector, + definition.getProperties(), componentMetrics, + streamProvider, metricsHostProvider, + clusterNamePropertyId, hostNamePropertyId, + componentNamePropertyId, statePropertyId); + return PropertyProvider.class.cast(o); } catch (Exception e) { - LOG.error("Could not load class " + definition.getType()); - return null; + LOG.info("Could not find contructor for type '" + + definition.getType()); } + + // just new instance + return PropertyProvider.class.cast(clz.newInstance()); + + } catch (Exception e) { + LOG.error("Could not load class " + definition.getType()); + return null; + } + + } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java index 12f3725..65f7be7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXHostProvider.java @@ -27,19 +27,6 @@ import java.util.Set; public interface JMXHostProvider { /** - * Get the JMX host name for the given cluster name and component name. - * - * @param clusterName the cluster name - * @param componentName the component name - * - * @return the JMX host name - * - * @throws SystemException if unable to get the JMX host name - */ - public String getHostName(String clusterName, String componentName) - throws SystemException; - - /** * Get the JMX host names for the given cluster name and component name. * * @param clusterName the cluster name http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java index ca016f5..975a479 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/jmx/JMXPropertyProvider.java @@ -27,20 +27,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.ambari.server.controller.internal.AbstractPropertyProvider; import org.apache.ambari.server.controller.internal.PropertyInfo; +import org.apache.ambari.server.controller.metrics.MetricsHostProvider; +import org.apache.ambari.server.controller.metrics.MetricsProvider; import org.apache.ambari.server.controller.spi.Predicate; import org.apache.ambari.server.controller.spi.Request; import org.apache.ambari.server.controller.spi.Resource; @@ -57,39 +49,11 @@ import org.slf4j.LoggerFactory; /** * Property provider implementation for JMX sources. */ -public class JMXPropertyProvider extends AbstractPropertyProvider { +public class JMXPropertyProvider extends MetricsProvider { private static final String NAME_KEY = "name"; private static final String PORT_KEY = "tag.port"; private static final String DOT_REPLACEMENT_CHAR = "#"; - private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 12000L; - - public static final String TIMED_OUT_MSG = "Timed out waiting for JMX metrics."; - public static final String STORM_REST_API = "STORM_REST_API"; - - /** - * Thread pool - */ - private static final ExecutorService EXECUTOR_SERVICE; - private static final int THREAD_POOL_CORE_SIZE = 20; - private static final int THREAD_POOL_MAX_SIZE = 100; - private static final long THREAD_POOL_TIMEOUT_MILLIS = 30000L; - - static { - LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); // unlimited Queue - - ThreadPoolExecutor threadPoolExecutor = - new ThreadPoolExecutor( - THREAD_POOL_CORE_SIZE, - THREAD_POOL_MAX_SIZE, - THREAD_POOL_TIMEOUT_MILLIS, - TimeUnit.MILLISECONDS, - queue); - - threadPoolExecutor.allowCoreThreadTimeOut(true); - - EXECUTOR_SERVICE = threadPoolExecutor; - } private final static ObjectReader jmxObjectReader; private final static ObjectReader stormObjectReader; @@ -138,16 +102,6 @@ public class JMXPropertyProvider extends AbstractPropertyProvider { private final String statePropertyId; - private final Set<String> healthyStates; - - /** - * The amount of time that this provider will wait for JMX metric values to be - * returned from the JMX sources. If no results are returned for this amount of - * time then the request to populate the resources will fail. - */ - protected long populateTimeout = DEFAULT_POPULATE_TIMEOUT_MILLIS; - - // ----- Constructors ------------------------------------------------------ /** @@ -155,23 +109,23 @@ public class JMXPropertyProvider extends AbstractPropertyProvider { * * @param componentMetrics the map of supported metrics * @param streamProvider the stream provider - * @param jmxHostProvider the host mapping + * @param jmxHostProvider the JMX host mapping + * @param metricsHostProvider the host mapping * @param clusterNamePropertyId the cluster name property id * @param hostNamePropertyId the host name property id * @param componentNamePropertyId the component name property id * @param statePropertyId the state property id - * @param healthyStates the set of healthy state values */ public JMXPropertyProvider(Map<String, Map<String, PropertyInfo>> componentMetrics, StreamProvider streamProvider, JMXHostProvider jmxHostProvider, + MetricsHostProvider metricsHostProvider, String clusterNamePropertyId, String hostNamePropertyId, String componentNamePropertyId, - String statePropertyId, - Set<String> healthyStates) { + String statePropertyId) { - super(componentMetrics); + super(componentMetrics, hostNamePropertyId, metricsHostProvider); this.streamProvider = streamProvider; this.jmxHostProvider = jmxHostProvider; @@ -179,116 +133,11 @@ public class JMXPropertyProvider extends AbstractPropertyProvider { this.hostNamePropertyId = hostNamePropertyId; this.componentNamePropertyId = componentNamePropertyId; this.statePropertyId = statePropertyId; - this.healthyStates = healthyStates; } - - // ----- PropertyProvider -------------------------------------------------- - - @Override - public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate) - throws SystemException { - - // Get a valid ticket for the request. - Ticket ticket = new Ticket(); - - CompletionService<Resource> completionService = - new ExecutorCompletionService<Resource>(EXECUTOR_SERVICE); - - // In a large cluster we could have thousands of resources to populate here. - // Distribute the work across multiple threads. - for (Resource resource : resources) { - completionService.submit(getPopulateResourceCallable(resource, request, predicate, ticket)); - } - - Set<Resource> keepers = new HashSet<Resource>(); - try { - for (int i = 0; i < resources.size(); ++ i) { - Future<Resource> resourceFuture = - completionService.poll(populateTimeout, TimeUnit.MILLISECONDS); - - if (resourceFuture == null) { - // its been more than the populateTimeout since the last callable completed ... - // invalidate the ticket to abort the threads and don't wait any longer - ticket.invalidate(); - LOG.error(TIMED_OUT_MSG); - break; - } else { - // future should already be completed... no need to wait on get - Resource resource = resourceFuture.get(); - if (resource != null) { - keepers.add(resource); - } - } - } - } catch (InterruptedException e) { - logException(e); - } catch (ExecutionException e) { - rethrowSystemException(e.getCause()); - } - return keepers; - } - // ----- helper methods ---------------------------------------------------- /** - * Set the populate timeout value for this provider. - * - * @param populateTimeout the populate timeout value - */ - protected void setPopulateTimeout(long populateTimeout) { - this.populateTimeout = populateTimeout; - } - - /** - * Get the spec to locate the JMX stream from the given host and port - * - * @param protocol the protocol, one of http or https - * @param hostName the host name - * @param port the port - * - * @return the spec - */ - protected String getSpec(String protocol, String hostName, - String port, String componentName) { - if (null == componentName || !componentName.equals(STORM_REST_API)) - return protocol + "://" + hostName + ":" + port + "/jmx"; - else - return protocol + "://" + hostName + ":" + port + "/api/cluster/summary"; - } - - /** - * Get the spec to locate the JMX stream from the given host and port - * - * @param hostName the host name - * @param port the port - * - * @return the spec - */ - protected String getSpec(String hostName, String port) { - return getSpec("http", hostName, port, null); - } - - /** - * Get a callable that can be used to populate the given resource. - * - * @param resource the resource to be populated - * @param request the request - * @param predicate the predicate - * @param ticket a valid ticket - * - * @return a callable that can be used to populate the given resource - */ - private Callable<Resource> getPopulateResourceCallable( - final Resource resource, final Request request, final Predicate predicate, final Ticket ticket) { - return new Callable<Resource>() { - public Resource call() throws SystemException { - return populateResource(resource, request, predicate, ticket); - } - }; - } - - /** * Populate a resource by obtaining the requested JMX properties. * * @param resource the resource to be populated @@ -298,7 +147,8 @@ public class JMXPropertyProvider extends AbstractPropertyProvider { * * @return the populated resource; null if the resource should NOT be part of the result set for the given predicate */ - private Resource populateResource(Resource resource, Request request, Predicate predicate, Ticket ticket) + @Override + protected Resource populateResource(Resource resource, Request request, Predicate predicate, MetricsProvider.Ticket ticket) throws SystemException { Set<String> ids = getRequestPropertyIds(request, predicate); @@ -354,16 +204,14 @@ public class JMXPropertyProvider extends AbstractPropertyProvider { try { for (String hostName : hostNames) { try { - in = streamProvider.readFrom(getSpec(protocol, hostName, port, componentName)); + in = streamProvider.readFrom(getSpec(protocol, hostName, port, "/jmx")); // if the ticket becomes invalid (timeout) then bail out if (!ticket.isValid()) { return resource; } - if (null == componentName || !componentName.equals(STORM_REST_API)) { - getHadoopMetricValue(in, ids, resource, request, ticket); - } else { - getStormMetricValue(in, ids, resource, ticket); - } + + getHadoopMetricValue(in, ids, resource, request, ticket); + } catch (IOException e) { logException(e); } @@ -470,31 +318,6 @@ public class JMXPropertyProvider extends AbstractPropertyProvider { } } - /** - * TODO: Refactor - * Storm-specific metrics fetching - */ - private void getStormMetricValue(InputStream in, Set<String> ids, - Resource resource, Ticket ticket) throws IOException { - HashMap<String, Object> metricHolder = stormObjectReader.readValue(in); - for (String category : ids) { - Map<String, PropertyInfo> defProps = getComponentMetrics().get(STORM_REST_API); - for (Map.Entry<String, PropertyInfo> depEntry : defProps.entrySet()) { - if (depEntry.getKey().startsWith(category)) { - PropertyInfo propInfo = depEntry.getValue(); - String propName = propInfo.getPropertyId(); - Object propertyValue = metricHolder.get(propName); - String absId = PropertyHelper.getPropertyId(category, propName); - if (!ticket.isValid()) { - return; - } - // TODO: Maybe cast to int - resource.setProperty(absId, propertyValue); - } - } - } - } - private void setResourceValue(Resource resource, Map<String, Map<String, Object>> categories, String propertyId, String category, String property, List<String> keyList) { Map<String, Object> properties = categories.get(category); @@ -546,73 +369,4 @@ public class JMXPropertyProvider extends AbstractPropertyProvider { } return null; } - - /** - * Determine whether or not the given property id was requested. - */ - private static boolean isRequestedPropertyId(String propertyId, String requestedPropertyId, Request request) { - return request.getPropertyIds().isEmpty() || propertyId.startsWith(requestedPropertyId); - } - - /** - * Log an error for the given exception. - * - * @param throwable the caught exception - * - * @return the error message that was logged - */ - private static String logException(Throwable throwable) { - String msg = "Caught exception getting JMX metrics : " + throwable.getLocalizedMessage(); - - LOG.debug(msg, throwable); - - return msg; - } - - /** - * Rethrow the given exception as a System exception and log the message. - * - * @param throwable the caught exception - * - * @throws org.apache.ambari.server.controller.spi.SystemException always around the given exception - */ - private static void rethrowSystemException(Throwable throwable) throws SystemException { - String msg = logException(throwable); - - if (throwable instanceof SystemException) { - throw (SystemException) throwable; - } - throw new SystemException (msg, throwable); - } - - - // ----- inner class : Ticket ---------------------------------------------- - - /** - * Ticket used to cancel provider threads. The provider threads should - * monitor the validity of the passed in ticket and bail out if it becomes - * invalid (as in a timeout). - */ - private static class Ticket { - /** - * Indicate whether or not the ticket is valid. - */ - private volatile boolean valid = true; - - /** - * Invalidate the ticket. - */ - public void invalidate() { - valid = false; - } - - /** - * Determine whether or not this ticket is valid. - * - * @return true if the ticket is valid - */ - public boolean isValid() { - return valid; - } - } } http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsHostProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsHostProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsHostProvider.java new file mode 100644 index 0000000..1a96829 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsHostProvider.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.controller.metrics; + + +import org.apache.ambari.server.controller.spi.SystemException; + +public interface MetricsHostProvider { + + /** + * Get the host name for the given cluster name and component name. + * + * @param clusterName the cluster name + * @param componentName the component name + * @return the host name + * @throws org.apache.ambari.server.controller.spi.SystemException + * if unable to get the JMX host name + */ + public String getHostName(String clusterName, String componentName) + throws SystemException; + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsProvider.java new file mode 100644 index 0000000..27c55f4 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/MetricsProvider.java @@ -0,0 +1,302 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.server.controller.metrics; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +import org.apache.ambari.server.controller.internal.AbstractPropertyProvider; +import org.apache.ambari.server.controller.internal.PropertyInfo; +import org.apache.ambari.server.controller.spi.Predicate; +import org.apache.ambari.server.controller.spi.Request; +import org.apache.ambari.server.controller.spi.Resource; +import org.apache.ambari.server.controller.spi.SystemException; + +/** + * Unites common functionality for multithreaded metrics providers + * (JMX and REST as of now). Shares the same pool of executor threads. + */ +public abstract class MetricsProvider extends AbstractPropertyProvider { + + /** + * Host states that make available metrics collection + */ + public static final Set<String> healthyStates = Collections.singleton("STARTED"); + protected final String hostNamePropertyId; + private final MetricsHostProvider metricsHostProvider; + + /** + * Executor service is shared between all childs of current class + */ + private static final ExecutorService EXECUTOR_SERVICE = initExecutorService(); + private static final int THREAD_POOL_CORE_SIZE = 20; + private static final int THREAD_POOL_MAX_SIZE = 100; + private static final long THREAD_POOL_TIMEOUT_MILLIS = 30000L; + + private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 10000L; + /** + * The amount of time that this provider will wait for JMX metric values to be + * returned from the JMX sources. If no results are returned for this amount of + * time then the request to populate the resources will fail. + */ + protected long populateTimeout = DEFAULT_POPULATE_TIMEOUT_MILLIS; + public static final String TIMED_OUT_MSG = "Timed out waiting for metrics."; + + // ----- Constructors ------------------------------------------------------ + + /** + * Construct a provider. + * + * @param componentMetrics map of metrics for this provider + */ + public MetricsProvider(Map<String, Map<String, PropertyInfo>> componentMetrics, + String hostNamePropertyId, + MetricsHostProvider metricsHostProvider) { + super(componentMetrics); + this.hostNamePropertyId = hostNamePropertyId; + this.metricsHostProvider = metricsHostProvider; + } + + // ----- Thread pool ------------------------------------------------------- + + /** + * Generates thread pool with default parameters + */ + + + private static ExecutorService initExecutorService() { + LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); // unlimited Queue + + ThreadPoolExecutor threadPoolExecutor = + new ThreadPoolExecutor( + THREAD_POOL_CORE_SIZE, + THREAD_POOL_MAX_SIZE, + THREAD_POOL_TIMEOUT_MILLIS, + TimeUnit.MILLISECONDS, + queue); + + threadPoolExecutor.allowCoreThreadTimeOut(true); + + return threadPoolExecutor; + } + + public static ExecutorService getExecutorService() { + return EXECUTOR_SERVICE; + } + + // ----- Common PropertyProvider implementation details -------------------- + + @Override + public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate) + throws SystemException { + + // Get a valid ticket for the request. + Ticket ticket = new Ticket(); + + CompletionService<Resource> completionService = + new ExecutorCompletionService<Resource>(EXECUTOR_SERVICE); + + // In a large cluster we could have thousands of resources to populate here. + // Distribute the work across multiple threads. + for (Resource resource : resources) { + completionService.submit(getPopulateResourceCallable(resource, request, predicate, ticket)); + } + + Set<Resource> keepers = new HashSet<Resource>(); + try { + for (int i = 0; i < resources.size(); ++ i) { + Future<Resource> resourceFuture = + completionService.poll(populateTimeout, TimeUnit.MILLISECONDS); + + if (resourceFuture == null) { + // its been more than the populateTimeout since the last callable completed ... + // invalidate the ticket to abort the threads and don't wait any longer + ticket.invalidate(); + LOG.error(TIMED_OUT_MSG); + break; + } else { + // future should already be completed... no need to wait on get + Resource resource = resourceFuture.get(); + if (resource != null) { + keepers.add(resource); + } + } + } + } catch (InterruptedException e) { + logException(e); + } catch (ExecutionException e) { + rethrowSystemException(e.getCause()); + } + return keepers; + } + + /** + * Get a callable that can be used to populate the given resource. + * + * @param resource the resource to be populated + * @param request the request + * @param predicate the predicate + * @param ticket a valid ticket + * + * @return a callable that can be used to populate the given resource + */ + private Callable<Resource> getPopulateResourceCallable( + final Resource resource, final Request request, final Predicate predicate, final Ticket ticket) { + return new Callable<Resource>() { + public Resource call() throws SystemException { + return populateResource(resource, request, predicate, ticket); + } + }; + } + + + /** + * Populate a resource by obtaining the requested JMX properties. + * + * @param resource the resource to be populated + * @param request the request + * @param predicate the predicate + * @return the populated resource; null if the resource should NOT be part of the result set for the given predicate + */ + + + protected abstract Resource populateResource(Resource resource, + Request request, Predicate predicate, Ticket ticket) + + throws SystemException; + + /** + * Set the populate timeout value for this provider. + * + * @param populateTimeout the populate timeout value + */ + + + protected void setPopulateTimeout(long populateTimeout) { + this.populateTimeout = populateTimeout; + + } + + + // ----- helper methods ---------------------------------------------------- + + /** + * Determine whether or not the given property id was requested. + */ + protected static boolean isRequestedPropertyId(String propertyId, String requestedPropertyId, Request request) { + return request.getPropertyIds().isEmpty() || propertyId.startsWith(requestedPropertyId); + } + + /** + * Log an error for the given exception. + * + * @param throwable the caught exception + * + * @return the error message that was logged + */ + protected static String logException(Throwable throwable) { + String msg = "Caught exception getting JMX metrics : " + throwable.getLocalizedMessage(); + + LOG.debug(msg, throwable); + + return msg; + } + + /** + * Rethrow the given exception as a System exception and log the message. + * + * @param throwable the caught exception + * + * @throws org.apache.ambari.server.controller.spi.SystemException always around the given exception + */ + protected static void rethrowSystemException(Throwable throwable) throws SystemException { + String msg = logException(throwable); + + if (throwable instanceof SystemException) { + throw (SystemException) throwable; + } + throw new SystemException (msg, throwable); + } + + /** + * Returns a hostname for component + */ + + + public String getHost(Resource resource, String clusterName, String componentName) throws SystemException { + return hostNamePropertyId == null ? + metricsHostProvider.getHostName(clusterName, componentName) : + (String) resource.getPropertyValue(hostNamePropertyId); + + } + + + /** + * Get complete URL from parts + */ + + protected String getSpec(String protocol, String hostName, + String port, String url) { + return protocol + "://" + hostName + ":" + port + url; + + } + + // ----- inner class : Ticket ---------------------------------------------- + + /** + * Ticket used to cancel provider threads. The provider threads should + * monitor the validity of the passed in ticket and bail out if it becomes + * invalid (as in a timeout). + */ + protected static class Ticket { + /** + * Indicate whether or not the ticket is valid. + */ + private volatile boolean valid = true; + + /** + * Invalidate the ticket. + */ + public void invalidate() { + valid = false; + } + + /** + * Determine whether or not this ticket is valid. + * + * @return true if the ticket is valid + */ + public boolean isValid() { + return valid; + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java new file mode 100644 index 0000000..48d06b8 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProvider.java @@ -0,0 +1,448 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.ambari.server.controller.metrics; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.reflect.TypeToken; +import com.google.gson.stream.JsonReader; +import com.google.inject.Inject; +import com.google.inject.Injector; +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.internal.PropertyInfo; +import org.apache.ambari.server.controller.internal.StackDefinedPropertyProvider; +import org.apache.ambari.server.controller.spi.Predicate; +import org.apache.ambari.server.controller.spi.Request; +import org.apache.ambari.server.controller.spi.Resource; +import org.apache.ambari.server.controller.spi.SystemException; +import org.apache.ambari.server.controller.utilities.StreamProvider; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.Clusters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Map; +import java.util.Set; + +/** + * WARNING: Class should be thread-safe! + * <p/> + * Resolves metrics like api/cluster/summary/nimbus.uptime + * For every metric, finds a relevant JSON value and returns is as + * a resource property. + */ +public class RestMetricsPropertyProvider extends MetricsProvider { + + protected final static Logger LOG = + LoggerFactory.getLogger(RestMetricsPropertyProvider.class); + + private static Map<String, RestMetricsPropertyProvider> instances = + new Hashtable<String, RestMetricsPropertyProvider>(); + + @Inject + private AmbariManagementController amc; + + @Inject + private Clusters clusters; + + private final Map<String, String> metricsProperties; + private final StreamProvider streamProvider; + private final String clusterNamePropertyId; + private final String componentNamePropertyId; + private final String statePropertyId; + private MetricsHostProvider metricsHostProvider; + + private static final String DEFAULT_PORT_PROPERTY = "default_port"; + private static final String PORT_CONFIG_TYPE_PROPERTY = "port_config_type"; + private static final String PORT_PROPERTY_NAME_PROPERTY = "port_property_name"; + + /** + * Protocol to use when connecting + */ + private static final String PROTOCOL_OVERRIDE_PROPERTY = "protocol"; + private static final String HTTP_PROTOCOL = "http"; + private static final String HTTPS_PROTOCOL = "https"; + private static final String DEFAULT_PROTOCOL = HTTP_PROTOCOL; + + + /** + * String that separates JSON URL from path inside JSON in metrics path + */ + public static final String URL_PATH_SEPARATOR = "##"; + + /** + * Symbol that separates names of nested JSON sections in metrics path + */ + public static final String DOCUMENT_PATH_SEPARATOR = "#"; + + + /** + * Create a REST property provider. + * + * @param metricsProperties the map of per-component metrics properties + * @param componentMetrics the map of supported metrics for component + * @param streamProvider the stream provider + * @param metricsHostProvider metricsHostProvider instance + * @param clusterNamePropertyId the cluster name property id + * @param hostNamePropertyId the host name property id + * @param componentNamePropertyId the component name property id + * @param statePropertyId the state property id + */ + public RestMetricsPropertyProvider( + Injector injector, + Map<String, String> metricsProperties, + Map<String, Map<String, PropertyInfo>> componentMetrics, + StreamProvider streamProvider, + MetricsHostProvider metricsHostProvider, + String clusterNamePropertyId, + String hostNamePropertyId, + String componentNamePropertyId, + String statePropertyId) { + + super(componentMetrics, hostNamePropertyId, metricsHostProvider); + this.metricsProperties = metricsProperties; + this.streamProvider = streamProvider; + this.clusterNamePropertyId = clusterNamePropertyId; + this.componentNamePropertyId = componentNamePropertyId; + this.statePropertyId = statePropertyId; + this.metricsHostProvider = metricsHostProvider; + injector.injectMembers(this); + } + + // ----- MetricsProvider implementation ------------------------------------ + + + /** + * Populate a resource by obtaining the requested REST properties. + * + * @param resource the resource to be populated + * @param request the request + * @param predicate the predicate + * @return the populated resource; null if the resource should NOT be + * part of the result set for the given predicate + */ + @Override + protected Resource populateResource(Resource resource, + Request request, Predicate predicate, Ticket ticket) + throws SystemException { + + // Remove request properties that request temporal information + Set<String> ids = getRequestPropertyIds(request, predicate); + Set<String> temporalIds = new HashSet<String>(); + for (String id : ids) { + if (request.getTemporalInfo(id) != null) { + temporalIds.add(id); + } + } + ids.removeAll(temporalIds); + + if (ids.isEmpty()) { + // no properties requested + return resource; + } + + // Don't attempt to get REST properties if the resource is in + // an unhealthy state + if (statePropertyId != null) { + String state = (String) resource.getPropertyValue(statePropertyId); + if (state != null && !healthyStates.contains(state)) { + return resource; + } + } + + String componentName = (String) resource.getPropertyValue(componentNamePropertyId); + + Map<String, PropertyInfo> propertyInfos = + getComponentMetrics().get(StackDefinedPropertyProvider.WRAPPED_METRICS_KEY); + if (propertyInfos == null) { + // If there are no metrics defined for the given component then there is nothing to do. + return resource; + } + String protocol = resolveProtocol(); + String port = "-1"; + String hostname = null; + try { + String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId); + Cluster cluster = clusters.getCluster(clusterName); + hostname = getHost(resource, clusterName, componentName); + if (hostname == null) { + String msg = String.format("Unable to get component REST metrics. " + + "No host name for %s.", componentName); + LOG.warn(msg); + return resource; + } + port = resolvePort(cluster, hostname, componentName, metricsProperties); + } catch (Exception e) { + rethrowSystemException(e); + } + + Set<String> resultIds = new HashSet<String>(); + for (String id : ids){ + for (String metricId : propertyInfos.keySet()){ + if (metricId.startsWith(id)){ + resultIds.add(metricId); + } + } + } + + // Extract set of URLs for metrics + HashMap<String, Set<String>> urls = extractPropertyURLs(resultIds, propertyInfos); + + for (String url : urls.keySet()) { + try { + InputStream in = streamProvider.readFrom(getSpec(protocol, hostname, port, url)); + if (!ticket.isValid()) { + return resource; + } + try { + extractValuesFromJSON(in, urls.get(url), resource, propertyInfos); + } finally { + in.close(); + } + } catch (IOException e) { + logException(e); + } + } + return resource; + } + + @Override + public Set<String> checkPropertyIds(Set<String> propertyIds) { + Set<String> unsupported = new HashSet<String>(); + for (String propertyId : propertyIds) { + if (!getComponentMetrics(). + get(StackDefinedPropertyProvider.WRAPPED_METRICS_KEY). + containsKey(propertyId)) { + unsupported.add(propertyId); + } + } + return unsupported; + } + + // ----- helper methods ---------------------------------------------------- + + /** + * Uses port_config_type, port_property_name, default_port parameters from + * metricsProperties to find out right port value for service + * + * @return determines REST port for service + */ + private String resolvePort(Cluster cluster, String hostname, String componentName, + Map<String, String> metricsProperties) + throws AmbariException { + String portConfigType = null; + String portPropertyName = null; + if (metricsProperties.containsKey(PORT_CONFIG_TYPE_PROPERTY) && + metricsProperties.containsKey(PORT_PROPERTY_NAME_PROPERTY)) { + portConfigType = metricsProperties.get(PORT_CONFIG_TYPE_PROPERTY); + portPropertyName = metricsProperties.get(PORT_PROPERTY_NAME_PROPERTY); + } + String portStr = null; + if (portConfigType != null && portPropertyName != null) { + try { + Map<String, Map<String, String>> configTags = + amc.findConfigurationTagsWithOverrides(cluster, hostname); + if (configTags.containsKey(portConfigType)) { + Map<String, String> config = configTags.get(portConfigType); + if (config.containsKey(portPropertyName)) { + portStr = config.get(portPropertyName); + } + } + } catch (AmbariException e) { + String message = String.format("Can not extract config tags for " + + "cluster = %s, hostname = %s", componentName, hostname); + LOG.warn(message); + } + if (portStr == null) { + String message = String.format( + "Can not extract REST port for " + + "component %s from configurations. " + + "Config tag = %s, config key name = %s, " + + "hostname = %s. Probably metrics.json file for " + + "service is misspelled. Trying default port", + componentName, portConfigType, + portPropertyName, hostname); + LOG.warn(message); + } + } + if (portStr == null && metricsProperties.containsKey(DEFAULT_PORT_PROPERTY)) { + if (metricsProperties.containsKey(DEFAULT_PORT_PROPERTY)) { + portStr = metricsProperties.get(DEFAULT_PORT_PROPERTY); + } else { + String message = String.format("Can not determine REST port for " + + "component %s. " + + "Default REST port property %s is not defined at metrics.json " + + "file for service, and there is no any other available ways " + + "to determine port information.", + componentName, DEFAULT_PORT_PROPERTY); + throw new AmbariException(message); + } + } + return portStr; + } + + + /** + * Extracts protocol type from metrics properties. If no protocol is defined, + * uses default protocol. + */ + private String resolveProtocol() { + String protocol = DEFAULT_PROTOCOL; + if (metricsProperties.containsKey(PROTOCOL_OVERRIDE_PROPERTY)) { + protocol = metricsProperties.get(PROTOCOL_OVERRIDE_PROPERTY).toLowerCase(); + if (!protocol.equals(HTTP_PROTOCOL) && !protocol.equals(HTTPS_PROTOCOL)) { + String message = String.format( + "Unsupported protocol type %s, falling back to %s", + protocol, DEFAULT_PROTOCOL); + LOG.warn(message); + protocol = DEFAULT_PROTOCOL; + } + } else { + protocol = DEFAULT_PROTOCOL; + } + return protocol; + } + + + /** + * Extracts JSON URL from metricsPath + */ + private String extractMetricsURL(String metricsPath) + throws IllegalArgumentException { + return validateAndExtractPathParts(metricsPath)[0]; + } + + /** + * Extracts part of metrics path that contains path through nested + * JSON sections + */ + private String extractDocumentPath(String metricsPath) + throws IllegalArgumentException { + return validateAndExtractPathParts(metricsPath)[1]; + } + + /** + * Returns [MetricsURL, DocumentPath] or throws an exception + * if metricsPath is invalid. + */ + private String[] validateAndExtractPathParts(String metricsPath) + throws IllegalArgumentException { + String[] pathParts = metricsPath.split(URL_PATH_SEPARATOR); + if (pathParts.length == 2) { + return pathParts; + } else { + // This warning is expected to occur only on development phase + String message = String.format( + "Metrics path %s does not contain or contains" + + "more than one %s sequence. That probably " + + "means that the mentioned metrics path is misspelled. " + + "Please check the relevant metrics.json file", + metricsPath, URL_PATH_SEPARATOR); + throw new IllegalArgumentException(message); + } + } + + + /** + * Returns a map <document_url, requested_property_ids>. + * requested_property_ids contain a set of property IDs + * that should be fetched for this URL. Doing + * that allows us to extract document only once when getting few properties + * from this document. + * + * @param ids set of property IDs that should be fetched + */ + private HashMap<String, Set<String>> extractPropertyURLs(Set<String> ids, + Map<String, PropertyInfo> propertyInfos) { + HashMap<String, Set<String>> result = new HashMap<String, Set<String>>(); + for (String requestedPropertyId : ids) { + PropertyInfo propertyInfo = propertyInfos.get(requestedPropertyId); + + String metricsPath = propertyInfo.getPropertyId(); + String url = extractMetricsURL(metricsPath); + Set<String> set; + if (!result.containsKey(url)) { + set = new HashSet<String>(); + result.put(url, set); + } else { + set = result.get(url); + } + set.add(requestedPropertyId); + } + return result; + } + + + /** + * Extracts requested properties from a given JSON input stream into + * resource. + * + * @param jsonStream input stream that contains JSON + * @param requestedPropertyIds a set of property IDs + * that should be fetched for this URL + * @param resource all extracted values are placed into resource + */ + private void extractValuesFromJSON(InputStream jsonStream, + Set<String> requestedPropertyIds, + Resource resource, + Map<String, PropertyInfo> propertyInfos) + throws IOException { + Gson gson = new Gson(); + Type type = new TypeToken<Map<Object, Object>>() { + }.getType(); + JsonReader jsonReader = new JsonReader( + new BufferedReader(new InputStreamReader(jsonStream))); + Map<String, String> jsonMap = gson.fromJson(jsonReader, type); + for (String requestedPropertyId : requestedPropertyIds) { + PropertyInfo propertyInfo = propertyInfos.get(requestedPropertyId); + String metricsPath = propertyInfo.getPropertyId(); + String documentPath = extractDocumentPath(metricsPath); + String[] docPath = documentPath.split(DOCUMENT_PATH_SEPARATOR); + Map<String, String> subMap = jsonMap; + for (int i = 0; i < docPath.length; i++) { + String pathElement = docPath[i]; + if (!subMap.containsKey(pathElement)) { + String message = String.format( + "Can not fetch %dth element of document path (%s) " + + "from json. Wrong metrics path: %s", + i, pathElement, metricsPath); + throw new IOException(message); + } + Object jsonSubElement = jsonMap.get(pathElement); + if (i == docPath.length - 1) { // Reached target document section + // Extract property value + resource.setProperty(requestedPropertyId, jsonSubElement); + } else { // Navigate to relevant document section + subMap = gson.fromJson((JsonElement) jsonSubElement, type); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/cc4dfe35/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/metrics.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/metrics.json b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/metrics.json index 83e27d1..c2776ab 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/metrics.json +++ b/ambari-server/src/main/resources/stacks/HDP/2.1/services/STORM/metrics.json @@ -2,45 +2,52 @@ "STORM_REST_API": { "Component": [ { - "type": "jmx", + "type": "org.apache.ambari.server.controller.metrics.RestMetricsPropertyProvider", + "properties" : { + "default_port": "8745", + "port_config_type": "storm-site", + "port_property_name": "storm.port", + "protocol": "http" + }, "metrics": { - "metrics/api/cluster/summary/tasks.total": { - "metric": "tasks.total", + "metrics/api/cluster/summary/tasks.total": + { + "metric": "/api/cluster/summary##tasks.total", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/slots.total": { - "metric": "slots.total", + "metric": "/api/cluster/summary##slots.total", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/slots.free": { - "metric": "slots.free", + "metric": "/api/cluster/summary##slots.free", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/supervisors": { - "metric": "supervisors", + "metric": "/api/cluster/summary##supervisors", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/executors.total": { - "metric": "executors.total", + "metric": "/api/cluster/summary##executors.total", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/slots.used": { - "metric": "slots.used", + "metric": "/api/cluster/summary##slots.used", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/topologies": { - "metric": "topologies", + "metric": "/api/cluster/summary##topologies", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/nimbus.uptime": { - "metric": "nimbus.uptime", + "metric": "/api/cluster/summary##nimbus.uptime", "pointInTime": true, "temporal": false } @@ -49,51 +56,57 @@ ], "HostComponent": [ { - "type": "jmx", + "type": "org.apache.ambari.server.controller.metrics.RestMetricsPropertyProvider", + "properties" : { + "default_port": "8745", + "port_config_type": "storm-site", + "port_property_name": "storm.port", + "protocol": "http" + }, "metrics": { - "metrics/api/cluster/summary/tasks.total": { - "metric": "tasks.total", + "metrics/api/cluster/summary/tasks.total": + { + "metric": "/api/cluster/summary##tasks.total", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/slots.total": { - "metric": "slots.total", + "metric": "/api/cluster/summary##slots.total", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/slots.free": { - "metric": "slots.free", + "metric": "/api/cluster/summary##slots.free", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/supervisors": { - "metric": "supervisors", + "metric": "/api/cluster/summary##supervisors", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/executors.total": { - "metric": "executors.total", + "metric": "/api/cluster/summary##executors.total", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/slots.used": { - "metric": "slots.used", + "metric": "/api/cluster/summary##slots.used", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/topologies": { - "metric": "topologies", + "metric": "/api/cluster/summary##topologies", "pointInTime": true, "temporal": false }, "metrics/api/cluster/summary/nimbus.uptime": { - "metric": "nimbus.uptime", + "metric": "/api/cluster/summary##nimbus.uptime", "pointInTime": true, "temporal": false } } } - ] }, "NIMBUS": {
