Repository: stratos Updated Branches: refs/heads/master 7df77c8de -> 602a66228
Removing direct invocation of Runnable.run() method in autoscaler monitor classes and introducing executor services Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/602a6622 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/602a6622 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/602a6622 Branch: refs/heads/master Commit: 602a6622855c2b108daf89c973af58ad1adc5fcb Parents: 7df77c8 Author: Imesh Gunaratne <[email protected]> Authored: Sun Jan 25 01:23:40 2015 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Sun Jan 25 01:24:13 2015 +0530 ---------------------------------------------------------------------- .../internal/AutoscalerServiceComponent.java | 40 +++++++++++++++++--- .../monitor/cluster/ClusterMonitor.java | 26 ++++++------- .../monitor/component/ApplicationMonitor.java | 19 ++++++++-- .../monitor/component/GroupMonitor.java | 17 +++++++-- .../autoscaler/util/AutoscalerConstants.java | 8 ++++ 5 files changed, 85 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/602a6622/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java index 1c5e980..e3b1faa 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServiceComponent.java @@ -201,11 +201,41 @@ public class AutoscalerServiceComponent { // Shutdown executor service if(executorService != null) { - try { - executorService.shutdownNow(); - } catch (Exception e) { - log.warn("An error occurred while shutting down autoscaler executor service", e); - } + shutdownExecutorService(executorService); + } + + // Shutdown application monitor executor service + shutdownExecutorService(AutoscalerConstants.APPLICATION_MONITOR_THREAD_POOL_ID); + + // Shutdown group monitor executor service + shutdownExecutorService(AutoscalerConstants.GROUP_MONITOR_THREAD_POOL_ID); + + // Shutdown cluster monitor scheduler executor service + shutdownScheduledExecutorService(AutoscalerConstants.CLUSTER_MONITOR_SCHEDULER_ID); + + // Shutdown cluster monitor executor service + shutdownExecutorService(AutoscalerConstants.CLUSTER_MONITOR_THREAD_POOL_ID); + } + + private void shutdownExecutorService(String executorServiceId) { + ExecutorService executorService = StratosThreadPool.getExecutorService(executorServiceId, 1); + if(executorService != null) { + shutdownExecutorService(executorService); + } + } + + private void shutdownScheduledExecutorService(String executorServiceId) { + ExecutorService executorService = StratosThreadPool.getScheduledExecutorService(executorServiceId, 1); + if(executorService != null) { + shutdownExecutorService(executorService); + } + } + + private void shutdownExecutorService(ExecutorService executorService) { + try { + executorService.shutdownNow(); + } catch (Exception e) { + log.warn("An error occurred while shutting down executor service", e); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/602a6622/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java index c9da0f8..e212eda 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java @@ -54,6 +54,7 @@ import org.apache.stratos.cloud.controller.stub.domain.MemberContext; import org.apache.stratos.common.Properties; import org.apache.stratos.common.Property; import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.domain.application.ApplicationStatus; import org.apache.stratos.messaging.domain.application.GroupStatus; import org.apache.stratos.messaging.domain.instance.ClusterInstance; @@ -70,7 +71,7 @@ import org.drools.runtime.StatefulKnowledgeSession; import org.drools.runtime.rule.FactHandle; import java.util.*; -import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -82,7 +83,9 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class ClusterMonitor extends Monitor implements Runnable { - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private final ScheduledExecutorService scheduler; + private final ExecutorService executorService; + protected FactHandle minCheckFactHandle; protected FactHandle obsoleteCheckFactHandle; protected FactHandle scaleCheckFactHandle; @@ -113,7 +116,12 @@ public class ClusterMonitor extends Monitor implements Runnable { public ClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree) { - this.networkPartitionIdToClusterLevelNetworkPartitionCtxts = new HashMap<String, ClusterLevelNetworkPartitionContext>(); + scheduler = StratosThreadPool.getScheduledExecutorService(AutoscalerConstants.CLUSTER_MONITOR_SCHEDULER_ID, 1); + int threadPoolSize = Integer.getInteger(AutoscalerConstants.CLUSTER_MONITOR_THREAD_POOL_SIZE, 10); + executorService = StratosThreadPool.getExecutorService( + AutoscalerConstants.CLUSTER_MONITOR_THREAD_POOL_ID, threadPoolSize); + + networkPartitionIdToClusterLevelNetworkPartitionCtxts = new HashMap<String, ClusterLevelNetworkPartitionContext>(); readConfigurations(); autoscalerRuleEvaluator = new AutoscalerRuleEvaluator(); autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.OBSOLETE_CHECK_DROOL_FILE); @@ -142,10 +150,6 @@ public class ClusterMonitor extends Monitor implements Runnable { scheduler.scheduleAtFixedRate(this, 0, getMonitorIntervalMilliseconds(), TimeUnit.MILLISECONDS); } - protected void stopScheduler() { - scheduler.shutdownNow(); - } - @Override public int hashCode() { final int prime = 31; @@ -578,7 +582,7 @@ public class ClusterMonitor extends Monitor implements Runnable { } }; - monitoringRunnable.run(); + executorService.execute(monitoringRunnable); } for (final ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) { @@ -589,11 +593,8 @@ public class ClusterMonitor extends Monitor implements Runnable { getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, partitionContext); } }; - - monitoringRunnable.run(); - + executorService.execute(monitoringRunnable); } - } } } @@ -613,7 +614,6 @@ public class ClusterMonitor extends Monitor implements Runnable { getObsoleteCheckKnowledgeSession().dispose(); getScaleCheckKnowledgeSession().dispose(); setDestroyed(true); - stopScheduler(); if (log.isDebugEnabled()) { log.debug("ClusterMonitor Drools session has been disposed. " + this.toString()); } http://git-wip-us.apache.org/repos/asf/stratos/blob/602a6622/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java index 9ff3075..c300263 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ApplicationMonitor.java @@ -36,7 +36,9 @@ import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBu import org.apache.stratos.autoscaler.pojo.policy.PolicyManager; import org.apache.stratos.autoscaler.pojo.policy.deployment.DeploymentPolicy; import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.NetworkPartition; +import org.apache.stratos.autoscaler.util.AutoscalerConstants; import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; +import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.domain.application.Application; import org.apache.stratos.messaging.domain.application.ApplicationStatus; import org.apache.stratos.messaging.domain.application.GroupStatus; @@ -44,22 +46,33 @@ import org.apache.stratos.messaging.domain.instance.ApplicationInstance; import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.domain.topology.lifecycle.LifeCycleState; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; /** * ApplicationMonitor is to control the child monitors */ public class ApplicationMonitor extends ParentComponentMonitor { + private static final Log log = LogFactory.getLog(ApplicationMonitor.class); + private final ExecutorService executorService; + //Flag to set whether application is terminating private boolean isTerminating; - public ApplicationMonitor(Application application) throws DependencyBuilderException, TopologyInConsistentException { super(application); + + int threadPoolSize = Integer.getInteger(AutoscalerConstants.APPLICATION_MONITOR_THREAD_POOL_SIZE, 10); + this.executorService = StratosThreadPool.getExecutorService( + AutoscalerConstants.APPLICATION_MONITOR_THREAD_POOL_ID, threadPoolSize); + //setting the appId for the application this.appId = application.getUniqueIdentifier(); } @@ -109,7 +122,7 @@ public class ApplicationMonitor extends ParentComponentMonitor { } } }; - monitoringRunnable.run(); + executorService.execute(monitoringRunnable); } private void handleScalingMaxOut(InstanceContext instanceContext, http://git-wip-us.apache.org/repos/asf/stratos/blob/602a6622/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java index c45b21e..1cf4546 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/GroupMonitor.java @@ -39,7 +39,9 @@ import org.apache.stratos.autoscaler.pojo.policy.PolicyManager; import org.apache.stratos.autoscaler.pojo.policy.deployment.ChildPolicy; import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.ChildLevelNetworkPartition; import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.ChildLevelPartition; +import org.apache.stratos.autoscaler.util.AutoscalerConstants; import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; +import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.domain.application.Application; import org.apache.stratos.messaging.domain.application.ApplicationStatus; import org.apache.stratos.messaging.domain.application.Group; @@ -53,6 +55,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; /** * This is GroupMonitor to monitor the group which consists of @@ -61,6 +64,8 @@ import java.util.concurrent.ConcurrentHashMap; public class GroupMonitor extends ParentComponentMonitor { private static final Log log = LogFactory.getLog(GroupMonitor.class); + + private final ExecutorService executorService; //has scaling dependents protected boolean hasScalingDependents; //Indicates whether groupScaling enabled or not @@ -77,6 +82,11 @@ public class GroupMonitor extends ParentComponentMonitor { boolean hasScalingDependents) throws DependencyBuilderException, TopologyInConsistentException { super(group); + + int threadPoolSize = Integer.getInteger(AutoscalerConstants.GROUP_MONITOR_THREAD_POOL_SIZE, 10); + this.executorService = StratosThreadPool.getExecutorService( + AutoscalerConstants.GROUP_MONITOR_THREAD_POOL_ID, threadPoolSize); + this.groupScalingEnabled = group.isGroupScalingEnabled(); this.appId = appId; this.hasScalingDependents = hasScalingDependents; @@ -127,7 +137,7 @@ public class GroupMonitor extends ParentComponentMonitor { } } }; - monitoringRunnable.run(); + executorService.execute(monitoringRunnable); } private void handleScalingMaxOut(InstanceContext instanceContext, @@ -223,7 +233,7 @@ public class GroupMonitor extends ParentComponentMonitor { appId); } }; - sendScaleMaxOut.run(); + executorService.execute(sendScaleMaxOut); } } else { if (log.isDebugEnabled()) { @@ -243,8 +253,7 @@ public class GroupMonitor extends ParentComponentMonitor { appId); } }; - sendScaleMaxOut.run(); - + executorService.execute(sendScaleMaxOut); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/602a6622/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java index 9472b9f..27f5f4b 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerConstants.java @@ -70,4 +70,12 @@ public final class AutoscalerConstants { * Payload values */ public static final String PAYLOAD_DEPLOYMENT = "default"; + + public static final String APPLICATION_MONITOR_THREAD_POOL_ID = "application.monitor.thread.pool"; + public static final String APPLICATION_MONITOR_THREAD_POOL_SIZE = "application.monitor.thread.pool.size"; + public static final String GROUP_MONITOR_THREAD_POOL_ID = "group.monitor.thread.pool"; + public static final String GROUP_MONITOR_THREAD_POOL_SIZE = "group.monitor.thread.pool.size"; + public static final String CLUSTER_MONITOR_SCHEDULER_ID = "cluster.monitor.scheduler"; + public static final String CLUSTER_MONITOR_THREAD_POOL_ID = "cluster.monitor.thread.pool"; + public static final String CLUSTER_MONITOR_THREAD_POOL_SIZE = "cluster.monitor.thread.pool.size"; }
