AMBARI-20345 - Alert Event Publisher Executor Doesn't Scale Threads (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/07a30a12 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/07a30a12 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/07a30a12 Branch: refs/heads/branch-dev-logsearch Commit: 07a30a12b6b7b091e735f9688ebc6934d4b9cf23 Parents: dff7754 Author: Jonathan Hurley <[email protected]> Authored: Tue Mar 7 11:59:13 2017 -0500 Committer: Jonathan Hurley <[email protected]> Committed: Tue Mar 7 14:43:11 2017 -0500 ---------------------------------------------------------------------- ambari-server/docs/configuration/index.md | 12 +- .../server/configuration/Configuration.java | 134 +++++++++++++------ .../events/publishers/AlertEventPublisher.java | 21 +-- .../internal/AlertResourceProviderTest.java | 4 +- .../server/orm/dao/AlertsDAOCachedTest.java | 6 +- .../services/CachedAlertFlushServiceTest.java | 7 +- 6 files changed, 126 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/07a30a12/ambari-server/docs/configuration/index.md ---------------------------------------------------------------------- diff --git a/ambari-server/docs/configuration/index.md b/ambari-server/docs/configuration/index.md index 58eb7c7..f836fc9 100644 --- a/ambari-server/docs/configuration/index.md +++ b/ambari-server/docs/configuration/index.md @@ -54,7 +54,9 @@ The following are the properties which can be used to configure Ambari. | alerts.cache.enabled | Determines whether current alerts should be cached. Enabling this can increase performance on large cluster, but can also result in lost alert data if the cache is not flushed frequently. |`false` | | alerts.cache.flush.interval | The time, in minutes, after which cached alert information is flushed to the database<br/><br/> This property is related to `alerts.cache.enabled`. |`10` | | alerts.cache.size | The size of the alert cache.<br/><br/> This property is related to `alerts.cache.enabled`. |`50000` | -| alerts.execution.scheduler.maxThreads | The number of threads used to handle alerts received from the Ambari Agents. The value should be increased as the size of the cluster increases. |`2` | +| alerts.execution.scheduler.threadpool.size.core | The core number of threads used to process incoming alert events. The value should be increased as the size of the cluster increases. |`2` | +| alerts.execution.scheduler.threadpool.size.max | The number of threads used to handle alerts received from the Ambari Agents. The value should be increased as the size of the cluster increases. |`2` | +| alerts.execution.scheduler.threadpool.worker.size | The number of queued alerts allowed before discarding old alerts which have not been handled. The value should be increased as the size of the cluster increases. |`2000` | | alerts.snmp.dispatcher.udp.port | The UDP port to use when binding the SNMP dispatcher on Ambari Server startup. If no port is specified, then a random port will be used. | | | alerts.template.file | The full path to the XML file that describes the different alert templates. | | | ambari.display.url | The URL to use when creating messages which should include the Ambari Server URL.<br/><br/>The following are examples of valid values:<ul><li>`http://ambari.apache.org:8080`</ul> | | @@ -244,8 +246,8 @@ The following are the properties which can be used to configure Ambari. | server.property-provider.threadpool.size.core | The core number of threads that will be used to retrieve data from federated datasources, such as remote JMX endpoints. |`16` | | server.property-provider.threadpool.size.max | The maximum number of threads that will be used to retrieve data from federated datasources, such as remote JMX endpoints. |`32` | | server.property-provider.threadpool.worker.size | The maximum size of pending federated datasource requests, such as those to JMX endpoints, which can be queued before rejecting new requests. |`2147483647` | -| server.script.threads | The number of threads that should be allocated to run external script. |`4` | -| server.script.timeout | The time, in milliseconds, until an external script is killed. |`5000` | +| server.script.threads | The number of threads that should be allocated to run external script. |`20` | +| server.script.timeout | The time, in milliseconds, until an external script is killed. |`10000` | | server.stage.command.execution_type | How to execute commands in one stage |`STAGE` | | server.stages.parallel | Determines whether operations in different execution requests can be run concurrently. |`true` | | server.startup.web.timeout | The time, in seconds, that the ambari-server Python script will wait for Jetty to startup before returning an error code. |`50` | @@ -303,7 +305,9 @@ As the size of a cluster grows, some of the default property values may no longe ####Alerts & Notifications | Property Name | 10 Hosts | ~50 Hosts | ~100 Hosts | 500+ Hosts | | --- | --- | --- | --- | --- | -| alerts.execution.scheduler.maxThreads | 2 | 2 | 4 | 4 | +| alerts.execution.scheduler.threadpool.size.core | 2 | 2 | 4 | 4 | +| alerts.execution.scheduler.threadpool.size.max | 2 | 2 | 8 | 8 | +| alerts.execution.scheduler.threadpool.worker.size | 400 | 2000 | 4000 | 20000 | | alerts.cache.enabled | false | false | false | true | | alerts.cache.flush.interval | 10 | 10 | 10 | 10 | | alerts.cache.size | 50000 | 50000 | 100000 | 100000 | http://git-wip-us.apache.org/repos/asf/ambari/blob/07a30a12/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java index cfb4d3d..405251e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java @@ -17,16 +17,36 @@ */ package org.apache.ambari.server.configuration; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonPrimitive; -import com.google.inject.Inject; -import com.google.inject.Singleton; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.Writer; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.lang.reflect.Field; +import java.security.cert.CertificateException; +import java.security.interfaces.RSAPublicKey; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.ambari.annotations.Experimental; import org.apache.ambari.annotations.ExperimentalFeature; import org.apache.ambari.annotations.Markdown; @@ -35,6 +55,7 @@ import org.apache.ambari.server.actionmanager.CommandExecutionType; import org.apache.ambari.server.actionmanager.HostRoleCommand; import org.apache.ambari.server.actionmanager.Stage; import org.apache.ambari.server.controller.spi.PropertyProvider; +import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor; import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener; import org.apache.ambari.server.orm.JPATableGenerationStrategy; import org.apache.ambari.server.orm.PersistenceType; @@ -70,34 +91,16 @@ import org.apache.commons.lang.math.NumberUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.InputStream; -import java.io.Writer; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; -import java.lang.reflect.Field; -import java.security.cert.CertificateException; -import java.security.interfaces.RSAPublicKey; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; +import com.google.inject.Inject; +import com.google.inject.Singleton; /** * The {@link Configuration} class is used to read from the @@ -2206,7 +2209,7 @@ public class Configuration { "alerts.template.file", null); /** - * The maximum number of threads which will handle published alert events. + * The core number of threads which will handle published alert events. */ @ConfigurationMarkdown( group = ConfigurationGrouping.ALERTS, @@ -2216,9 +2219,40 @@ public class Configuration { @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "4"), @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "4") }, markdown = @Markdown( + description = "The core number of threads used to process incoming alert events. The value should be increased as the size of the cluster increases.")) + public static final ConfigurationProperty<Integer> ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE = new ConfigurationProperty<>( + "alerts.execution.scheduler.threadpool.size.core", 2); + + /** + * The maximum number of threads which will handle published alert events. + */ + @ConfigurationMarkdown( + group = ConfigurationGrouping.ALERTS, + scaleValues = { + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_10, value = "2"), + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_50, value = "2"), + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "8"), + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "8") }, + markdown = @Markdown( description = "The number of threads used to handle alerts received from the Ambari Agents. The value should be increased as the size of the cluster increases.")) - public static final ConfigurationProperty<Integer> ALERTS_EXECUTION_SCHEDULER_THREADS = new ConfigurationProperty<>( - "alerts.execution.scheduler.maxThreads", 2); + public static final ConfigurationProperty<Integer> ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE = new ConfigurationProperty<>( + "alerts.execution.scheduler.threadpool.size.max", 2); + + /** + * The size of the {@link BlockingQueue} used to control the + * {@link ScalingThreadPoolExecutor} when handling incoming alert events. + */ + @ConfigurationMarkdown( + group = ConfigurationGrouping.ALERTS, + scaleValues = { + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_10, value = "400"), + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_50, value = "2000"), + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "4000"), + @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "20000") }, + markdown = @Markdown( + description = "The number of queued alerts allowed before discarding old alerts which have not been handled. The value should be increased as the size of the cluster increases.")) + public static final ConfigurationProperty<Integer> ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE = new ConfigurationProperty<>( + "alerts.execution.scheduler.threadpool.worker.size", 2000); /** * If {@code true} then alert information is cached and not immediately @@ -4511,10 +4545,24 @@ public class Configuration { } /** + * @return core thread pool size for AlertEventPublisher, default 2 + */ + public int getAlertEventPublisherCorePoolSize() { + return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE)); + } + + /** * @return max thread pool size for AlertEventPublisher, default 2 */ - public int getAlertEventPublisherPoolSize() { - return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS)); + public int getAlertEventPublisherMaxPoolSize() { + return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE)); + } + + /** + * @return the size of the queue for unhandled alert events + */ + public int getAlertEventPublisherWorkerQueueSize() { + return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE)); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/07a30a12/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java index 5427a7b..78d5df6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AlertEventPublisher.java @@ -17,7 +17,6 @@ */ package org.apache.ambari.server.events.publishers; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -25,6 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; import com.google.inject.Inject; import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor; import org.apache.ambari.server.events.AlertEvent; import com.google.common.eventbus.AsyncEventBus; @@ -53,13 +53,18 @@ public final class AlertEventPublisher { */ @Inject public AlertEventPublisher(Configuration config) { - // create a fixed executor that is unbounded for now and will run rejected - // requests in the calling thread to prevent loss of alert handling - int poolsize = config.getAlertEventPublisherPoolSize(); - ThreadPoolExecutor executor = new ThreadPoolExecutor(2, poolsize, 0L, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), - new AlertEventBusThreadFactory(), - new ThreadPoolExecutor.CallerRunsPolicy()); + // create an executor which will scale with the number of queued work items + // when handling incoming alerts + int corePoolSize = config.getAlertEventPublisherCorePoolSize(); + int maxPoolSize = config.getAlertEventPublisherMaxPoolSize(); + int workerQueueSize = config.getAlertEventPublisherWorkerQueueSize(); + + ThreadPoolExecutor executor = new ScalingThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, + TimeUnit.SECONDS, workerQueueSize); + + executor.allowCoreThreadTimeOut(false); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); + executor.setThreadFactory(new AlertEventBusThreadFactory()); m_eventBus = new AsyncEventBus(executor); } http://git-wip-us.apache.org/repos/asf/ambari/blob/07a30a12/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java index 25b9821..f1c5c35 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertResourceProviderTest.java @@ -737,7 +737,9 @@ public class AlertResourceProviderTest { expect(configuration.getDatabaseDriver()).andReturn(JDBC_IN_MEMORY_DRIVER).anyTimes(); expect(configuration.getDatabaseUser()).andReturn("sa").anyTimes(); expect(configuration.getDatabasePassword()).andReturn("").anyTimes(); - expect(configuration.getAlertEventPublisherPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS.getDefaultValue())).anyTimes(); + expect(configuration.getAlertEventPublisherCorePoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE.getDefaultValue())).anyTimes(); + expect(configuration.getAlertEventPublisherMaxPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE.getDefaultValue())).anyTimes(); + expect(configuration.getAlertEventPublisherWorkerQueueSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE.getDefaultValue())).anyTimes(); expect(configuration.getMasterKeyLocation()).andReturn(new File("/test")).anyTimes(); expect(configuration.getTemporaryKeyStoreRetentionMinutes()).andReturn(2l).anyTimes(); expect(configuration.isActivelyPurgeTemporaryKeyStore()).andReturn(true).anyTimes(); http://git-wip-us.apache.org/repos/asf/ambari/blob/07a30a12/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java index 02d942a..f47a997 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOCachedTest.java @@ -17,6 +17,8 @@ */ package org.apache.ambari.server.orm.dao; +import static org.easymock.EasyMock.expect; + import java.util.List; import javax.persistence.EntityManager; @@ -274,7 +276,9 @@ public class AlertsDAOCachedTest { // required for since the configuration is being mocked Configuration configuration = EasyMock.createNiceMock(Configuration.class); - EasyMock.expect(configuration.getAlertEventPublisherPoolSize()).andReturn(2).anyTimes(); + EasyMock.expect(configuration.getAlertEventPublisherCorePoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE.getDefaultValue())).anyTimes(); + EasyMock.expect(configuration.getAlertEventPublisherMaxPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE.getDefaultValue())).anyTimes(); + EasyMock.expect(configuration.getAlertEventPublisherWorkerQueueSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE.getDefaultValue())).anyTimes(); EasyMock.expect(configuration.isAlertCacheEnabled()).andReturn(Boolean.TRUE).anyTimes(); EasyMock.expect(configuration.getAlertCacheSize()).andReturn(100).anyTimes(); EasyMock.replay(configuration); http://git-wip-us.apache.org/repos/asf/ambari/blob/07a30a12/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java index 0ad67d0..992c7c4 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/services/CachedAlertFlushServiceTest.java @@ -17,6 +17,8 @@ */ package org.apache.ambari.server.state.services; +import static org.easymock.EasyMock.expect; + import javax.persistence.EntityManager; import org.apache.ambari.server.configuration.Configuration; @@ -123,7 +125,10 @@ public class CachedAlertFlushServiceTest extends EasyMockSupport { // required for since the configuration is being mocked Configuration configuration = createNiceMock(Configuration.class); - EasyMock.expect(configuration.getAlertEventPublisherPoolSize()).andReturn(2).anyTimes(); + expect(configuration.getAlertEventPublisherCorePoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE.getDefaultValue())).anyTimes(); + expect(configuration.getAlertEventPublisherMaxPoolSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE.getDefaultValue())).anyTimes(); + expect(configuration.getAlertEventPublisherWorkerQueueSize()).andReturn(Integer.valueOf(Configuration.ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE.getDefaultValue())).anyTimes(); + EasyMock.replay(configuration);
