Improve application monitor creation, which is triggered by cluster events and runs asynchronously
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/563fd434 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/563fd434 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/563fd434 Branch: refs/heads/master Commit: 563fd4347f06282b7f1b27a54d05bb83024d3103 Parents: bf2c672 Author: reka <[email protected]> Authored: Wed Dec 3 15:17:25 2014 +0530 Committer: reka <[email protected]> Committed: Wed Dec 3 17:53:35 2014 +0530 ---------------------------------------------------------------------- .../autoscaler/api/AutoScalerServiceImpl.java | 10 ++-- .../autoscaler/context/AutoscalerContext.java | 23 +++++++++ .../AutoscalerTopologyEventReceiver.java | 51 +++----------------- .../stratos/autoscaler/util/AutoscalerUtil.java | 12 +++-- 4 files changed, 42 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/563fd434/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java index 5593e1f..8270ab0 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/api/AutoScalerServiceImpl.java @@ -114,31 +114,29 @@ public class AutoScalerServiceImpl implements AutoScalerServiceInterface { public String addDeploymentPolicy(DeploymentPolicy deploymentPolicy) throws InvalidPolicyException { String policyId = PolicyManager.getInstance().deployDeploymentPolicy(deploymentPolicy); 
//Need to start the application Monitor after validation of the deployment policies. - + //FIXME add validation //Check whether all the clusters are there ApplicationHolder.acquireReadLock(); boolean allClusterInitialized = false; - + String appId = deploymentPolicy.getApplicationId(); try { Application application = ApplicationHolder.getApplications(). getApplication(deploymentPolicy.getApplicationId()); if(application != null) { allClusterInitialized = AutoscalerUtil.allClustersInitialized(application); } - } finally { ApplicationHolder.releaseReadLock(); } - if(allClusterInitialized) { + if(allClusterInitialized && !AutoscalerContext.getInstance().containsPendingMonitor(appId)) { AutoscalerUtil.getInstance(). - startApplicationMonitor(deploymentPolicy.getApplicationId()); + startApplicationMonitor(appId); } else { log.info("The application clusters are not yet created. " + "Waiting for them to be created"); } - return policyId; } http://git-wip-us.apache.org/repos/asf/stratos/blob/563fd434/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/AutoscalerContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/AutoscalerContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/AutoscalerContext.java index fe067f7..ca6efb4 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/AutoscalerContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/AutoscalerContext.java @@ -21,6 +21,7 @@ package org.apache.stratos.autoscaler.context; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor; @@ -37,6 +38,8 @@ public class AutoscalerContext { private Map<String, 
AbstractClusterMonitor> clusterMonitors; // Map<ApplicationId, ApplicationMonitor> private Map<String, ApplicationMonitor> applicationMonitors; + //pending application monitors + private List<String> pendingApplicationMonitors; private AutoscalerContext() { setClusterMonitors(new HashMap<String, AbstractClusterMonitor>()); @@ -86,4 +89,24 @@ public class AutoscalerContext { public void setApplicationMonitors(Map<String, ApplicationMonitor> applicationMonitors) { this.applicationMonitors = applicationMonitors; } + + public List<String> getPendingApplicationMonitors() { + return pendingApplicationMonitors; + } + + public void setPendingApplicationMonitors(List<String> pendingApplicationMonitors) { + this.pendingApplicationMonitors = pendingApplicationMonitors; + } + + public void addPendingMonitor(String appId) { + this.pendingApplicationMonitors.add(appId); + } + + public void removeFromPendingMonitors(String appId) { + this.pendingApplicationMonitors.remove(appId); + } + + public boolean containsPendingMonitor(String appId) { + return this.pendingApplicationMonitors.contains(appId); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/563fd434/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java index 10eee4f..89e711f 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java +++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -84,46 +84,6 @@ public class AutoscalerTopologyEventReceiver { } - private boolean allClustersInitialized(Application application) { - boolean allClustersInitialized = false; - for (ClusterDataHolder holder : application.getClusterDataRecursively()) { - TopologyManager.acquireReadLockForCluster(holder.getServiceType(), - holder.getClusterId()); - - try { - Topology topology = TopologyManager.getTopology(); - if (topology != null) { - Service service = topology.getService(holder.getServiceType()); - if (service != null) { - if (service.clusterExists(holder.getClusterId())) { - allClustersInitialized = true; - return allClustersInitialized; - } else { - if (log.isDebugEnabled()) { - log.debug("[Cluster] " + holder.getClusterId() + " is not found in " + - "the Topology"); - } - allClustersInitialized = false; - } - } else { - if (log.isDebugEnabled()) { - log.debug("Service is null in the CompleteTopologyEvent"); - } - } - } else { - if (log.isDebugEnabled()) { - log.debug("Topology is null in the CompleteTopologyEvent"); - } - } - } finally { - TopologyManager.releaseReadLockForCluster(holder.getServiceType(), - holder.getClusterId()); - } - } - return allClustersInitialized; - } - - private void addEventListeners() { // Listen to topology events that affect clusters topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { @@ -136,7 +96,7 @@ public class AutoscalerTopologyEventReceiver { Applications applications = ApplicationHolder.getApplications(); if (applications != null) { for (Application application : applications.getApplications().values()) { - if (allClustersInitialized(application)) { + if (AutoscalerUtil.allClustersInitialized(application)) { DeploymentPolicy policy = PolicyManager.getInstance(). 
getDeploymentPolicyByApplication( application.getUniqueIdentifier()); @@ -178,8 +138,13 @@ public class AutoscalerTopologyEventReceiver { try { //acquire read lock ApplicationHolder.acquireReadLock(); - //start the application monitor - //startApplicationMonitor(appId); + //start the application monitor if the policy exists + DeploymentPolicy policy = PolicyManager.getInstance(). + getDeploymentPolicyByApplication(appId); + if(policy != null && !AutoscalerContext.getInstance(). + containsPendingMonitor(appId)) { + AutoscalerUtil.getInstance().startApplicationMonitor(appId); + } } catch (Exception e) { String msg = "Error processing event " + e.getLocalizedMessage(); log.error(msg, e); http://git-wip-us.apache.org/repos/asf/stratos/blob/563fd434/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java index 476e8bb..2ed1074 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java @@ -351,7 +351,9 @@ public class AutoscalerUtil { public synchronized void startApplicationMonitor(String applicationId) { Thread th = null; - if (AutoscalerContext.getInstance().getAppMonitor(applicationId) == null) { + AutoscalerContext autoscalerContext = AutoscalerContext.getInstance(); + if (autoscalerContext.getAppMonitor(applicationId) == null) { + autoscalerContext.addPendingMonitor(applicationId); th = new Thread(new ApplicationMonitorAdder(applicationId)); } if (th != null) { @@ -383,10 +385,8 @@ public class AutoscalerUtil { } try { long start = 
System.currentTimeMillis(); - if (log.isDebugEnabled()) { - log.debug("application monitor is going to be started for [application] " + + log.info("application monitor is going to be started for [application] " + appId); - } try { applicationMonitor = MonitorFactory.getApplicationMonitor(appId); } catch (PolicyValidationException e) { @@ -414,8 +414,10 @@ public class AutoscalerUtil { log.error(msg); throw new RuntimeException(msg); } + AutoscalerContext autoscalerContext = AutoscalerContext.getInstance(); - AutoscalerContext.getInstance().addAppMonitor(applicationMonitor); + autoscalerContext.removeAppMonitor(appId); + autoscalerContext.addAppMonitor(applicationMonitor); if (log.isInfoEnabled()) { log.info(String.format("Application monitor has been added successfully: " + "[application] %s", applicationMonitor.getId()));
