Repository: ambari Updated Branches: refs/heads/trunk 2fcc94753 -> eec799d16
http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java b/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java new file mode 100644 index 0000000..97816e8 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/services/MetricsRetrievalService.java @@ -0,0 +1,539 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.state.services; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.Thread.UncaughtExceptionHandler; +import java.lang.reflect.Type; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.ambari.server.AmbariService; +import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.controller.jmx.JMXMetricHolder; +import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor; +import org.apache.ambari.server.controller.utilities.StreamProvider; +import org.apache.commons.io.IOUtils; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import com.google.gson.stream.JsonReader; +import com.google.inject.Inject; + +/** + * The {@link MetricsRetrievalService} is used as a headless, autonomous service + * which encapsulates: + * <ul> + * <li>An {@link ExecutorService} for fullfilling remote metric URL requests + * <li>A cache for JMX metrics + * <li>A cache for REST metrics + * </ul> + * + * Classes can inject an instance of this service in order to gain access to its + * caches and request mechanism. + * <p/> + * Callers must submit a request to the service in order to reach out and pull + * in remote metric data. Otherwise, the cache will never be populated. On the + * first usage of this service, the cache will always be empty. On every + * subsequent request, the data from the prior invocation of + * {@link #submit(JMXRunnable)} will be available. + * <p/> + * Metric data is cached temporarily and is controlled by + * {@link Configuration#getMetricsServiceCacheTimeout()}. + */ +@AmbariService +public class MetricsRetrievalService extends AbstractService { + + /** + * Logger. + */ + protected final static Logger LOG = LoggerFactory.getLogger(MetricsRetrievalService.class); + + /** + * The timeout for exceptions which are caught and then cached to prevent log + * spamming. + * + * @see #s_exceptionCache + */ + private static final int EXCEPTION_CACHE_TIMEOUT_MINUTES = 5; + + /** + * Exceptions from this service should not SPAM the logs; so cache exceptions + * and log once every {@vale #EXCEPTION_CACHE_TIMEOUT_MINUTES} minutes. + */ + private static final Cache<String, Throwable> s_exceptionCache = CacheBuilder.newBuilder().expireAfterWrite( + EXCEPTION_CACHE_TIMEOUT_MINUTES, TimeUnit.MINUTES).build(); + + /** + * Configuration. + */ + @Inject + private Configuration m_configuration; + + /** + * Used for reading REST JSON responses. + */ + @Inject + private Gson m_gson; + + /** + * A cache of URL to parsed JMX beans + */ + private Cache<String, JMXMetricHolder> m_jmxCache; + + /** + * A cache of URL to parsed REST data. + */ + private Cache<String, Map<String, String>> m_restCache; + + /** + * The {@link Executor} which will handle all of the requests to load remote + * metrics from URLs. + */ + private ThreadPoolExecutor m_threadPoolExecutor; + + /** + * Used to parse remote JMX JSON into a {@link Map}. + */ + private final ObjectReader m_jmxObjectReader; + + /** + * A thread-safe collection of all of the URL endpoints queued for processing. + * This helps prevent the same endpoint from being queued multiple times. + */ + private final Set<String> m_queuedUrls = Sets.newConcurrentHashSet(); + + + /** + * The size of the worker queue (used for logged warnings about size). + */ + private int m_queueMaximumSize; + + /** + * Constructor. + * + */ + public MetricsRetrievalService() { + ObjectMapper jmxObjectMapper = new ObjectMapper(); + jmxObjectMapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false); + m_jmxObjectReader = jmxObjectMapper.reader(JMXMetricHolder.class); + } + + /** + * {@inheritDoc} + */ + @Override + protected void doStart() { + // initialize the caches + int jmxCacheExpirationMinutes = m_configuration.getMetricsServiceCacheTimeout(); + m_jmxCache = CacheBuilder.newBuilder().expireAfterWrite(jmxCacheExpirationMinutes, + TimeUnit.MINUTES).build(); + + m_restCache = CacheBuilder.newBuilder().expireAfterWrite(jmxCacheExpirationMinutes, + TimeUnit.MINUTES).build(); + + // iniitalize the executor service + int corePoolSize = m_configuration.getMetricsServiceThreadPoolCoreSize(); + int maxPoolSize = m_configuration.getMetricsServiceThreadPoolMaxSize(); + m_queueMaximumSize = m_configuration.getMetricsServiceWorkerQueueSize(); + int threadPriority = m_configuration.getMetricsServiceThreadPriority(); + m_threadPoolExecutor = new ScalingThreadPoolExecutor(corePoolSize, maxPoolSize, 30, + TimeUnit.SECONDS, m_queueMaximumSize); + + m_threadPoolExecutor.allowCoreThreadTimeOut(true); + m_threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); + + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat( + "ambari-metrics-retrieval-service-thread-%d").setPriority( + threadPriority).setUncaughtExceptionHandler( + new MetricRunnableExceptionHandler()).build(); + + m_threadPoolExecutor.setThreadFactory(threadFactory); + + LOG.info( + "Initializing the Metrics Retrieval Service with core={}, max={}, workerQueue={}, threadPriority={}", + corePoolSize, maxPoolSize, m_queueMaximumSize, threadPriority); + } + + /** + * Testing method for setting a synchronous {@link ThreadPoolExecutor}. + * + * @param threadPoolExecutor + */ + public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) { + m_threadPoolExecutor = threadPoolExecutor; + } + + /** + * {@inheritDoc} + */ + @Override + protected void doStop() { + m_jmxCache.invalidateAll(); + m_restCache.invalidateAll(); + m_queuedUrls.clear(); + m_threadPoolExecutor.shutdownNow(); + } + + /** + * Submit a {@link JMXRunnable} for execution. This will run inside of an + * {@link ExecutorService} to retrieve JMX data from a URL endpoint and parse + * the result into a {@link JMXMetricHolder}. + * <p/> + * Once JMX data is retrieved it is cached. Data in the cache can be retrieved + * via {@link #getCachedJMXMetric(String)}. + * <p/> + * Callers need not worry about invoking this mulitple times for the same URL + * endpoint. A single endpoint will only be enqueued once regardless of how + * many times this method is called until it has been fully retrieved and + * parsed. + * + * @param streamProvider + * the {@link StreamProvider} to use to read from the remote + * endpoint. + * @param jmxUrl + * the URL to read from + * + * @see #getCachedJMXMetric(String) + */ + public void submitJMXRequest(StreamProvider streamProvider, String jmxUrl) { + // log warnings if the queue size seems to be rather large + BlockingQueue<Runnable> queue = m_threadPoolExecutor.getQueue(); + int queueSize = queue.size(); + if (queueSize > Math.floor(0.9f * m_queueMaximumSize)) { + LOG.warn("The worker queue contains {} work items and is at {}% of capacity", queueSize, + ((float) queueSize / m_queueMaximumSize) * 100); + } + + // don't enqueue another request for the same URL + if (m_queuedUrls.contains(jmxUrl)) { + return; + } + + // enqueue this URL + m_queuedUrls.add(jmxUrl); + + JMXRunnable jmxRunnable = new JMXRunnable(m_jmxCache, m_queuedUrls, m_jmxObjectReader, + streamProvider, jmxUrl); + + m_threadPoolExecutor.execute(jmxRunnable); + } + + /** + * Submit a {@link RESTRunnable} for execution. This will run inside of an + * {@link ExecutorService} to retrieve JMX data from a URL endpoint and parse + * the result into a {@link Map} of {@link String}. + * <p/> + * Once REST data is retrieved it is cached. Data in the cache can be + * retrieved via {@link #getCachedRESTMetric(String)}. + * <p/> + * Callers need not worry about invoking this mulitple times for the same URL + * endpoint. A single endpoint will only be enqueued once regardless of how + * many times this method is called until it has been fully retrieved and + * parsed. + * + * @param streamProvider + * the {@link StreamProvider} to use to read from the remote + * endpoint. + * @param restUrl + * the URL to read from + * + * @see #getCachedRESTMetric(String) + */ + public void submitRESTRequest(StreamProvider streamProvider, String restUrl) { + if (m_queuedUrls.contains(restUrl)) { + return; + } + + // enqueue this URL + m_queuedUrls.add(restUrl); + + RESTRunnable restRunnable = new RESTRunnable(m_restCache, m_queuedUrls, m_gson, + streamProvider, restUrl); + + m_threadPoolExecutor.execute(restRunnable); + } + + /** + * Gets a cached JMX metric in the form of a {@link JMXMetricHolder}. If there + * is no metric data cached for the given URL, then {@code null} is returned. + * <p/> + * The onky way this cache is populated is by requesting the data to be loaded + * asynchronously via {@link #submit(JMXRunnable)}. + * + * @param jmxUrl + * the URL to retrieve cached data for (not {@code null}). + * @return + */ + public JMXMetricHolder getCachedJMXMetric(String jmxUrl) { + return m_jmxCache.getIfPresent(jmxUrl); + } + + /** + * Gets a cached REST metric in the form of a {@link Map}. If there is no + * metric data cached for the given URL, then {@code null} is returned. + * <p/> + * The onky way this cache is populated is by requesting the data to be loaded + * asynchronously via {@link #submit(JMXRunnable)}. + * + * @param restUrl + * the URL to retrieve cached data for (not {@code null}). + * @return + */ + public Map<String, String> getCachedRESTMetric(String restUrl) { + return m_restCache.getIfPresent(restUrl); + } + + /** + * Encapsulates the common logic for all metric {@link Runnable} instnaces. + */ + private static abstract class MetricRunnable implements Runnable { + + /** + * An initialized stream provider to read the remote endpoint. + */ + protected final StreamProvider m_streamProvider; + + /** + * A fully-qualified URL to read from. + */ + protected final String m_url; + + /** + * The URLs which have been requested but not yet read. + */ + private final Set<String> m_queuedUrls; + + /** + * Constructor. + * + * @param streamProvider + * the stream provider to read the URL with + * @param url + * the URL endpoint to read data from (JMX or REST) + * @param queuedUrls + * the URLs which are currently waiting to be processed. This + * method will remove the specified URL from this {@link Set} when + * it completes (successful or not). + */ + private MetricRunnable(StreamProvider streamProvider, String url, Set<String> queuedUrls) { + m_streamProvider = streamProvider; + m_url = url; + m_queuedUrls = queuedUrls; + } + + /** + * {@inheritDoc} + */ + @Override + public final void run() { + + // provide some profiling + long startTime = 0; + long endTime = 0; + boolean isDebugEnabled = LOG.isDebugEnabled(); + if (isDebugEnabled) { + startTime = System.currentTimeMillis(); + } + + InputStream inputStream = null; + + try { + if (isDebugEnabled) { + endTime = System.currentTimeMillis(); + LOG.debug("Loading metric JSON from {} took {}ms", m_url, (endTime - startTime)); + } + + // read the stream and process it + inputStream = m_streamProvider.readFrom(m_url); + processInputStreamAndCacheResult(inputStream); + + } catch (Exception exception) { + logException(exception, m_url); + } finally { + // remove this URL from the list of queued URLs to ensure it will be + // requested again + m_queuedUrls.remove(m_url); + + IOUtils.closeQuietly(inputStream); + } + } + + /** + * Reads data from the specified {@link InputStream} and processes that into + * a cachable value. The value will then be cached by this method. + * + * @param inputStream + * @throws Exception + */ + protected abstract void processInputStreamAndCacheResult(InputStream inputStream) + throws Exception; + + /** + * Logs the exception for the URL exactly once and caches the fact that the + * exception was logged. This is to prevent an endpoint being down from + * spamming the logs. + * + * @param throwable + * the exception to log (not {@code null}). + * @param url + * the URL associated with the exception (not {@code null}). + */ + final void logException(Throwable throwable, String url) { + String cacheKey = buildCacheKey(throwable, url); + if (null == s_exceptionCache.getIfPresent(cacheKey)) { + // cache it and log it + s_exceptionCache.put(cacheKey, throwable); + LOG.error( + "Unable to retrieve metrics from {}. Subsequent failures will be suppressed from the log for {} minutes.", + url, EXCEPTION_CACHE_TIMEOUT_MINUTES, throwable); + } + } + + /** + * Builds a unique cache key for the combination of {@link Throwable} and + * {@link String} URL. + * + * @param throwable + * @param url + * @return the key, such as {@value IOException-http://www.server.com/jmx}. + */ + private String buildCacheKey(Throwable throwable, String url) { + if (null == throwable || null == url) { + return ""; + } + + String throwableName = throwable.getClass().getSimpleName(); + return throwableName + "-" + url; + } + + } + + /** + * A {@link Runnable} used to retrieve JMX data from a remote URL endpoint. + * There is no need for a {@link Callable} here since the + * {@link MetricsRetrievalService} doesn't care about when the value returns or + * whether an exception is thrown. + */ + private static final class JMXRunnable extends MetricRunnable { + + private final ObjectReader m_jmxObjectReader; + private final Cache<String, JMXMetricHolder> m_cache; + + + /** + * Constructor. + * + * @param cache + * @param queuedUrls + * @param jmxObjectReader + * @param streamProvider + * @param jmxUrl + */ + private JMXRunnable(Cache<String, JMXMetricHolder> cache, Set<String> queuedUrls, + ObjectReader jmxObjectReader, StreamProvider streamProvider, String jmxUrl) { + super(streamProvider, jmxUrl, queuedUrls); + m_cache = cache; + m_jmxObjectReader = jmxObjectReader; + } + + /** + * {@inheritDoc} + */ + @Override + protected void processInputStreamAndCacheResult(InputStream inputStream) throws Exception { + JMXMetricHolder jmxMetricHolder = m_jmxObjectReader.readValue(inputStream); + m_cache.put(m_url, jmxMetricHolder); + } + } + + /** + * A {@link Runnable} used to retrieve REST data from a remote URL endpoint. + * There is no need for a {@link Callable} here since the + * {@link MetricsRetrievalService} doesn't care about when the value returns + * or whether an exception is thrown. + */ + private static final class RESTRunnable extends MetricRunnable { + + private final Gson m_gson; + private final Cache<String, Map<String, String>> m_cache; + + /** + * Constructor. + * + * @param cache + * @param queuedUrls + * @param gson + * @param streamProvider + * @param restUrl + */ + private RESTRunnable(Cache<String, Map<String, String>> cache, Set<String> queuedUrls, + Gson gson, StreamProvider streamProvider, String restUrl) { + super(streamProvider, restUrl, queuedUrls); + m_cache = cache; + m_gson = gson; + } + + /** + * {@inheritDoc} + */ + @Override + protected void processInputStreamAndCacheResult(InputStream inputStream) throws Exception { + Type type = new TypeToken<Map<Object, Object>>() {}.getType(); + + JsonReader jsonReader = new JsonReader( + new BufferedReader(new InputStreamReader(inputStream))); + + Map<String, String> jsonMap = m_gson.fromJson(jsonReader, type); + m_cache.put(m_url, jsonMap); + } + } + + /** + * A default exception handler. + */ + private static final class MetricRunnableExceptionHandler implements UncaughtExceptionHandler { + + /** + * {@inheritDoc} + */ + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.error("Asynchronous metric retrieval encountered an exception with thread {}", t, e); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java index 5d65ea7..d1895bd 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java @@ -35,7 +35,9 @@ import java.util.Properties; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.configuration.Configuration.ConnectionPoolType; import org.apache.ambari.server.configuration.Configuration.DatabaseType; +import org.apache.ambari.server.controller.metrics.ThreadPoolEnabledPropertyProvider; import org.apache.ambari.server.security.authorization.LdapServerProperties; +import org.apache.ambari.server.state.services.MetricsRetrievalService; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.RandomStringUtils; import org.junit.After; @@ -780,4 +782,74 @@ public class ConfigurationTest { Assert.assertEquals("FooChannel", properties.getProperty("eclipselink.cache.coordination.channel")); Assert.assertEquals("commit", properties.getProperty("eclipselink.persistence-context.flush-mode")); } + + /** + * Tests the default values for the {@link ThreadPoolEnabledPropertyProvider}. + * + * @throws Exception + */ + @Test + public void testThreadPoolEnabledPropertyProviderDefaults() throws Exception { + final int SMALLEST_COMPLETION_SERIVCE_TIMEOUT_MS = 1000; + final int LARGEST_COMPLETION_SERIVCE_TIMEOUT_MS = 5000; + + int processorCount = Runtime.getRuntime().availableProcessors(); + final Properties ambariProperties = new Properties(); + final Configuration configuration = new Configuration(ambariProperties); + + long completionServiceTimeout = configuration.getPropertyProvidersCompletionServiceTimeout(); + int corePoolSize = configuration.getPropertyProvidersThreadPoolCoreSize(); + int maxPoolSize = configuration.getPropertyProvidersThreadPoolMaxSize(); + int workerQueueSize = configuration.getPropertyProvidersWorkerQueueSize(); + + // test defaults + Assert.assertEquals(5000, completionServiceTimeout); + Assert.assertEquals(Configuration.PROCESSOR_BASED_THREADPOOL_CORE_SIZE_DEFAULT, corePoolSize); + Assert.assertEquals(Configuration.PROCESSOR_BASED_THREADPOOL_MAX_SIZE_DEFAULT, maxPoolSize); + Assert.assertEquals(Integer.MAX_VALUE, workerQueueSize); + + // now let's test to make sure these all make sense + Assert.assertTrue(completionServiceTimeout >= SMALLEST_COMPLETION_SERIVCE_TIMEOUT_MS); + Assert.assertTrue(completionServiceTimeout <= LARGEST_COMPLETION_SERIVCE_TIMEOUT_MS); + Assert.assertTrue(corePoolSize <= maxPoolSize); + Assert.assertTrue(corePoolSize > 2 && corePoolSize <= 32); + Assert.assertTrue(maxPoolSize > 2 && maxPoolSize <= 32); + Assert.assertTrue(workerQueueSize > processorCount * 10); + } + + /** + * Tests the default values for the {@link MetricsRetrievalService}. + * + * @throws Exception + */ + @Test + public void testMetricsRetrieveServiceDefaults() throws Exception { + final int LOWEST_CACHE_TIMEOUT_MINUTES = 30; + + int processorCount = Runtime.getRuntime().availableProcessors(); + final Properties ambariProperties = new Properties(); + final Configuration configuration = new Configuration(ambariProperties); + + int priority = configuration.getMetricsServiceThreadPriority(); + int cacheTimeout = configuration.getMetricsServiceCacheTimeout(); + int corePoolSize = configuration.getMetricsServiceThreadPoolCoreSize(); + int maxPoolSize = configuration.getMetricsServiceThreadPoolMaxSize(); + int workerQueueSize = configuration.getMetricsServiceWorkerQueueSize(); + + // test defaults + Assert.assertEquals(Thread.NORM_PRIORITY, priority); + Assert.assertEquals(LOWEST_CACHE_TIMEOUT_MINUTES, cacheTimeout); + Assert.assertEquals(Configuration.PROCESSOR_BASED_THREADPOOL_CORE_SIZE_DEFAULT, corePoolSize); + Assert.assertEquals(Configuration.PROCESSOR_BASED_THREADPOOL_MAX_SIZE_DEFAULT, maxPoolSize); + Assert.assertEquals(maxPoolSize * 10, workerQueueSize); + + // now let's test to make sure these all make sense + Assert.assertTrue(priority <= Thread.NORM_PRIORITY); + Assert.assertTrue(priority > Thread.MIN_PRIORITY); + + Assert.assertTrue(cacheTimeout >= LOWEST_CACHE_TIMEOUT_MINUTES); + Assert.assertTrue(corePoolSize > 2 && corePoolSize <= 32); + Assert.assertTrue(maxPoolSize > 2 && maxPoolSize <= 32); + Assert.assertTrue(workerQueueSize >= processorCount * 10); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java index 32e84cb..2721731 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java @@ -17,12 +17,19 @@ */ package org.apache.ambari.server.controller.internal; -import com.google.inject.Binder; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Module; -import com.google.inject.persist.PersistService; -import com.google.inject.util.Modules; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.configuration.ComponentSSLConfiguration; @@ -55,7 +62,9 @@ import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.Host; import org.apache.ambari.server.state.RepositoryVersionState; import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.services.MetricsRetrievalService; import org.apache.ambari.server.state.stack.Metric; +import org.apache.ambari.server.utils.SynchronousThreadPoolExecutor; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -63,21 +72,12 @@ import org.junit.BeforeClass; import org.junit.Test; import org.springframework.security.core.context.SecurityContextHolder; -import java.io.IOException; -import java.io.InputStream; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.persist.PersistService; +import com.google.inject.util.Modules; /** * Tests the stack defined property provider. @@ -120,6 +120,12 @@ public class StackDefinedPropertyProviderTest { injector.getInstance(GuiceJpaInitializer.class); StackDefinedPropertyProvider.init(injector); + MetricsRetrievalService metricsRetrievalService = injector.getInstance( + MetricsRetrievalService.class); + + metricsRetrievalService.start(); + metricsRetrievalService.setThreadPoolExecutor(new SynchronousThreadPoolExecutor()); + helper = injector.getInstance(OrmTestHelper.class); clusters = injector.getInstance(Clusters.class); http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java index 1628f1f..fa8d0eb 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/JMXPropertyProviderTest.java @@ -18,6 +18,18 @@ package org.apache.ambari.server.controller.metrics; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariManagementController; @@ -26,6 +38,7 @@ import org.apache.ambari.server.controller.internal.ResourceImpl; import org.apache.ambari.server.controller.jmx.JMXHostProvider; import org.apache.ambari.server.controller.jmx.JMXPropertyProvider; import org.apache.ambari.server.controller.jmx.TestStreamProvider; +import org.apache.ambari.server.controller.metrics.MetricsServiceProvider.MetricsService; import org.apache.ambari.server.controller.spi.Request; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.spi.SystemException; @@ -37,6 +50,8 @@ import org.apache.ambari.server.security.authorization.AuthorizationException; import org.apache.ambari.server.security.authorization.AuthorizationHelperInitializer; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; +import org.apache.ambari.server.state.services.MetricsRetrievalService; +import org.apache.ambari.server.utils.SynchronousThreadPoolExecutor; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -47,19 +62,6 @@ import org.springframework.security.core.context.SecurityContextHolder; import com.google.inject.Guice; import com.google.inject.Injector; -import java.lang.reflect.Field; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.apache.ambari.server.controller.metrics.MetricsServiceProvider.MetricsService; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; - /** * JMX property provider tests. */ @@ -70,11 +72,20 @@ public class JMXPropertyProviderTest { protected static final String HOST_COMPONENT_STATE_PROPERTY_ID = PropertyHelper.getPropertyId("HostRoles", "state"); public static final int NUMBER_OF_RESOURCES = 400; + private static MetricPropertyProviderFactory metricPropertyProviderFactory; @BeforeClass public static void setupClass() { Injector injector = Guice.createInjector(new InMemoryDefaultTestModule()); JMXPropertyProvider.init(injector.getInstance(Configuration.class)); + + metricPropertyProviderFactory = injector.getInstance(MetricPropertyProviderFactory.class); + + MetricsRetrievalService metricsRetrievalService = injector.getInstance( + MetricsRetrievalService.class); + + metricsRetrievalService.start(); + metricsRetrievalService.setThreadPoolExecutor(new SynchronousThreadPoolExecutor()); } @Before @@ -162,7 +173,7 @@ public class JMXPropertyProviderTest { TestStreamProvider streamProvider = new TestStreamProvider(); TestJMXHostProvider hostProvider = new TestJMXHostProvider(false); TestMetricHostProvider metricsHostProvider = new TestMetricHostProvider(); - JMXPropertyProvider propertyProvider = new JMXPropertyProvider( + JMXPropertyProvider propertyProvider = metricPropertyProviderFactory.createJMXPropertyProvider( PropertyHelper.getJMXPropertyIds(Resource.Type.HostComponent), streamProvider, hostProvider, @@ -256,7 +267,7 @@ public class JMXPropertyProviderTest { TestJMXHostProvider hostProvider = new TestJMXHostProvider(false); TestMetricHostProvider metricsHostProvider = new TestMetricHostProvider(); - JMXPropertyProvider propertyProvider = new JMXPropertyProvider( + JMXPropertyProvider propertyProvider = metricPropertyProviderFactory.createJMXPropertyProvider( PropertyHelper.getJMXPropertyIds(Resource.Type.HostComponent), streamProvider, hostProvider, @@ -291,7 +302,7 @@ public class JMXPropertyProviderTest { TestJMXHostProvider hostProvider = new TestJMXHostProvider(false); TestMetricHostProvider metricsHostProvider = new TestMetricHostProvider(); - JMXPropertyProvider propertyProvider = new JMXPropertyProvider( + JMXPropertyProvider propertyProvider = metricPropertyProviderFactory.createJMXPropertyProvider( PropertyHelper.getJMXPropertyIds(Resource.Type.HostComponent), streamProvider, hostProvider, @@ -328,7 +339,7 @@ public class JMXPropertyProviderTest { TestJMXHostProvider hostProvider = new TestJMXHostProvider(true); TestMetricHostProvider metricsHostProvider = new TestMetricHostProvider(); - JMXPropertyProvider propertyProvider = new JMXPropertyProvider( + JMXPropertyProvider propertyProvider = metricPropertyProviderFactory.createJMXPropertyProvider( PropertyHelper.getJMXPropertyIds(Resource.Type.HostComponent), streamProvider, hostProvider, @@ -365,7 +376,7 @@ public class JMXPropertyProviderTest { TestJMXHostProvider hostProvider = new TestJMXHostProvider(true); TestMetricHostProvider metricsHostProvider = new TestMetricHostProvider(); - JMXPropertyProvider propertyProvider = new JMXPropertyProvider( + JMXPropertyProvider propertyProvider = metricPropertyProviderFactory.createJMXPropertyProvider( PropertyHelper.getJMXPropertyIds(Resource.Type.HostComponent), streamProvider, hostProvider, @@ -398,7 +409,7 @@ public class JMXPropertyProviderTest { TestMetricHostProvider metricsHostProvider = new TestMetricHostProvider(); Set<Resource> resources = new HashSet<Resource>(); - JMXPropertyProvider propertyProvider = new JMXPropertyProvider( + JMXPropertyProvider propertyProvider = metricPropertyProviderFactory.createJMXPropertyProvider( PropertyHelper.getJMXPropertyIds(Resource.Type.HostComponent), streamProvider, hostProvider, @@ -441,7 +452,7 @@ public class JMXPropertyProviderTest { TestMetricHostProvider metricsHostProvider = new TestMetricHostProvider(); Set<Resource> resources = new HashSet<Resource>(); - JMXPropertyProvider propertyProvider = new JMXPropertyProvider( + JMXPropertyProvider propertyProvider = metricPropertyProviderFactory.createJMXPropertyProvider( PropertyHelper.getJMXPropertyIds(Resource.Type.HostComponent), streamProvider, hostProvider, @@ -505,11 +516,11 @@ public class JMXPropertyProviderTest { return null; } - if (componentName.equals("NAMENODE")) + if (componentName.equals("NAMENODE")) { return "50070"; - else if (componentName.equals("DATANODE")) + } else if (componentName.equals("DATANODE")) { return "50075"; - else if (componentName.equals("HBASE_MASTER")) + } else if (componentName.equals("HBASE_MASTER")) { if(clusterName == "c2") { return "60011"; } else { @@ -517,12 +528,13 @@ public class JMXPropertyProviderTest { // any other name (includes hardcoded name "c1"). return "60010"; } - else if (componentName.equals("JOURNALNODE")) + } else if (componentName.equals("JOURNALNODE")) { return "8480"; - else if (componentName.equals("STORM_REST_API")) + } else if (componentName.equals("STORM_REST_API")) { return "8745"; - else + } else { return null; + } } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java index f78024f..304b42f 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/RestMetricsPropertyProviderTest.java @@ -18,8 +18,18 @@ package org.apache.ambari.server.controller.metrics; -import com.google.inject.Guice; -import com.google.inject.Injector; +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariManagementController; @@ -44,8 +54,10 @@ import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.state.services.MetricsRetrievalService; import org.apache.ambari.server.state.stack.Metric; import org.apache.ambari.server.state.stack.MetricDefinition; +import org.apache.ambari.server.utils.SynchronousThreadPoolExecutor; import org.easymock.EasyMock; import org.junit.After; import org.junit.Assert; @@ -53,18 +65,8 @@ import org.junit.BeforeClass; import org.junit.Test; import org.springframework.security.core.context.SecurityContextHolder; -import java.lang.reflect.Field; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.easymock.EasyMock.anyString; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; +import com.google.inject.Guice; +import com.google.inject.Injector; /** @@ -112,6 +114,12 @@ public class RestMetricsPropertyProviderTest { c1 = clusters.getCluster("c1"); JMXPropertyProvider.init(injector.getInstance(Configuration.class)); + MetricsRetrievalService metricsRetrievalService = injector.getInstance( + MetricsRetrievalService.class); + + metricsRetrievalService.start(); + metricsRetrievalService.setThreadPoolExecutor(new SynchronousThreadPoolExecutor()); + // Setting up Mocks for Controller, Clusters etc, queried as part of user's Role context // while fetching Metrics. amc = createNiceMock(AmbariManagementController.class); @@ -134,8 +142,8 @@ public class RestMetricsPropertyProviderTest { HashMap<String, Map<String, PropertyInfo>> componentMetrics, StreamProvider streamProvider, TestMetricsHostProvider metricsHostProvider) throws Exception { - RestMetricsPropertyProvider restMetricsPropertyProvider = new RestMetricsPropertyProvider( - injector, + MetricPropertyProviderFactory factory = injector.getInstance(MetricPropertyProviderFactory.class); + RestMetricsPropertyProvider restMetricsPropertyProvider = factory.createRESTMetricsPropertyProvider( metricDefinition.getProperties(), componentMetrics, streamProvider, http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/test/java/org/apache/ambari/server/controller/test/BufferedThreadPoolExecutorCompletionServiceTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/test/BufferedThreadPoolExecutorCompletionServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/test/BufferedThreadPoolExecutorCompletionServiceTest.java index f47068c..ede1f1f 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/test/BufferedThreadPoolExecutorCompletionServiceTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/test/BufferedThreadPoolExecutorCompletionServiceTest.java @@ -22,12 +22,12 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import junit.framework.Assert; - import org.apache.ambari.server.controller.utilities.BufferedThreadPoolExecutorCompletionService; import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor; import org.junit.Test; +import junit.framework.Assert; + public class BufferedThreadPoolExecutorCompletionServiceTest { private void longOp() throws InterruptedException { @@ -38,7 +38,7 @@ public class BufferedThreadPoolExecutorCompletionServiceTest { /** * Tests that when unbounded queue provided to executor, only * {@link ThreadPoolExecutor#getCorePoolSize()} threads are launched - * + * * @throws InterruptedException */ @Test @@ -76,7 +76,7 @@ public class BufferedThreadPoolExecutorCompletionServiceTest { /** * Tests that when load is more than core-pool-size and less than * max-pool-size, the number of threads scales up. - * + * * @throws InterruptedException */ @Test @@ -114,7 +114,7 @@ public class BufferedThreadPoolExecutorCompletionServiceTest { /** * Tests that when load is more than max-pool-size, the number of threads * scales up. - * + * * @throws InterruptedException */ @Test @@ -152,7 +152,7 @@ public class BufferedThreadPoolExecutorCompletionServiceTest { /** * Tests that when load is more than max-pool-size, the number of threads * scales up. - * + * * @throws InterruptedException */ @Test @@ -160,7 +160,8 @@ public class BufferedThreadPoolExecutorCompletionServiceTest { int CORE_POOL_SIZE = 2; int MAX_POOL_SIZE = 10; int TASKS_COUNT = 24; - ThreadPoolExecutor threadPoolExecutor = new ScalingThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 30000, TimeUnit.MILLISECONDS); + ThreadPoolExecutor threadPoolExecutor = new ScalingThreadPoolExecutor(CORE_POOL_SIZE, + MAX_POOL_SIZE, 30000, TimeUnit.MILLISECONDS, CORE_POOL_SIZE); BufferedThreadPoolExecutorCompletionService<Runnable> service = new BufferedThreadPoolExecutorCompletionService<>(threadPoolExecutor); for (int tc = 0; tc < TASKS_COUNT; tc++) { service.submit(new Runnable() { http://git-wip-us.apache.org/repos/asf/ambari/blob/eec799d1/ambari-server/src/test/java/org/apache/ambari/server/utils/SynchronousThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/SynchronousThreadPoolExecutor.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/SynchronousThreadPoolExecutor.java new file mode 100644 index 0000000..9222a6f --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/SynchronousThreadPoolExecutor.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.utils; + +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * An {@link Executor} which will run every command on the current thread. + */ +public class SynchronousThreadPoolExecutor extends ThreadPoolExecutor { + + /** + * Constructor. + * + */ + public SynchronousThreadPoolExecutor() { + super(1, 1, 0L, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() { + } + + /** + * {@inheritDoc} + */ + @Override + public List<Runnable> shutdownNow() { + return null; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isShutdown() { + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isTerminated() { + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public void execute(Runnable command) { + command.run(); + } +}
