merge with new changes
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/9e6e91d6 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/9e6e91d6 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/9e6e91d6 Branch: refs/heads/master Commit: 9e6e91d6280ca0e9d38f96dfeeaa9eb9d2f561cb Parents: bfc263a Author: gayan <[email protected]> Authored: Tue Dec 2 16:33:45 2014 +0530 Committer: gayan <[email protected]> Committed: Tue Dec 2 16:33:45 2014 +0530 ---------------------------------------------------------------------- .../AutoscalerTopologyEventReceiver.java | 260 ++++++++----------- 1 file changed, 105 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/9e6e91d6/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java index d59fa7d..bfdf30b 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -21,19 +21,20 @@ package org.apache.stratos.autoscaler.event.receiver.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.applications.ApplicationHolder; import org.apache.stratos.autoscaler.context.AutoscalerContext; import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory; import org.apache.stratos.autoscaler.context.cluster.VMClusterContext; import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher; -import org.apache.stratos.autoscaler.exception.application.DependencyBuilderException; import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException; import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException; -import org.apache.stratos.autoscaler.exception.application.TopologyInConsistentException; -import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor; -import org.apache.stratos.autoscaler.monitor.MonitorFactory; import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; +import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor; import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent; +import org.apache.stratos.autoscaler.pojo.policy.PolicyManager; +import org.apache.stratos.autoscaler.pojo.policy.deployment.DeploymentPolicy; +import org.apache.stratos.autoscaler.util.AutoscalerUtil; import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; import org.apache.stratos.messaging.domain.applications.Application; import org.apache.stratos.messaging.domain.applications.Applications; @@ -71,7 +72,7 @@ public class AutoscalerTopologyEventReceiver{ public void execute() { //FIXME this activated before autoscaler deployer activated. - topologyEventReceiver.setExecutorService(executorService); + topologyEventReceiver.setExecutorService(getExecutorService()); topologyEventReceiver.execute(); if (log.isInfoEnabled()) { @@ -143,7 +144,17 @@ public class AutoscalerTopologyEventReceiver{ if (applications != null) { for (Application application : applications.getApplications().values()) { if (allClustersInitialized(application)) { - startApplicationMonitor(application.getUniqueIdentifier()); + DeploymentPolicy policy = PolicyManager.getInstance(). + getDeploymentPolicyByApplication( + application.getUniqueIdentifier()); + if (policy != null) { + AutoscalerUtil.getInstance(). + startApplicationMonitor(application.getUniqueIdentifier()); + } else { + log.info("The relevant application policy is not yet " + + "deployed for this [application] " + + application.getUniqueIdentifier()); + } } else { log.error("Complete Topology is not consistent with the applications " + "which got persisted"); @@ -175,7 +186,7 @@ public class AutoscalerTopologyEventReceiver{ //acquire read lock ApplicationHolder.acquireReadLock(); //start the application monitor - startApplicationMonitor(appId); + //startApplicationMonitor(appId); } catch (Exception e) { String msg = "Error processing event " + e.getLocalizedMessage(); log.error(msg, e); @@ -204,7 +215,7 @@ public class AutoscalerTopologyEventReceiver{ if (null == monitor) { if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } return; } @@ -225,7 +236,7 @@ public class AutoscalerTopologyEventReceiver{ if (null == monitor) { if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } return; } @@ -255,7 +266,7 @@ public class AutoscalerTopologyEventReceiver{ if (null == monitor) { if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } return; } @@ -277,7 +288,7 @@ public class AutoscalerTopologyEventReceiver{ if (null == monitor) { if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } // if monitor does not exist, send cluster terminated event ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(), @@ -286,12 +297,12 @@ public class AutoscalerTopologyEventReceiver{ } //changing the status in the monitor, will notify its parent monitor if (monitor.getStatus() == ClusterStatus.Active) { - // terminated gracefully - monitor.setStatus(ClusterStatus.Terminating); - InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId); + // terminated gracefully + monitor.setStatus(ClusterStatus.Terminating); + InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId); } else { - monitor.setStatus(ClusterStatus.Terminating); - monitor.terminateAllMembers(); + monitor.setStatus(ClusterStatus.Terminating); + monitor.terminateAllMembers(); } ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain(). process("", clusterId, instanceId); @@ -310,11 +321,11 @@ public class AutoscalerTopologyEventReceiver{ if (null == monitor) { if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } // if the cluster monitor is null, assume that its termianted ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId()); - if (appMonitor != null) { + if (appMonitor != null) { appMonitor.onChildStatusEvent(new ClusterStatusEvent(ClusterStatus.Terminated, clusterId, null)); } return; @@ -355,7 +366,7 @@ public class AutoscalerTopologyEventReceiver{ topologyEventReceiver.addEventListener(new MemberStartedEventListener() { @Override protected void onEvent(Event event) { - + } }); @@ -432,153 +443,92 @@ public class AutoscalerTopologyEventReceiver{ }); topologyEventReceiver.addEventListener(new ClusterInstanceCreatedEventListener() { - @Override - protected void onEvent(Event event) { - - ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = - (ClusterInstanceCreatedEvent) event; - AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance(). - getClusterMonitor(clusterInstanceCreatedEvent.getClusterId()); - - if (clusterMonitor != null) { - TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), - clusterInstanceCreatedEvent.getClusterId()); - - try { - Service service = TopologyManager.getTopology(). - getService(clusterInstanceCreatedEvent.getServiceName()); - - if (service != null) { - Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId()); - if (cluster != null) { - // create and add Cluster Context - try { - if (cluster.isKubernetesCluster()) { - clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(), - ClusterContextFactory.getKubernetesClusterContext(cluster)); - } else if (cluster.isLbCluster()) { - clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(), - ClusterContextFactory.getVMLBClusterContext(cluster)); - } else { - clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(), - ClusterContextFactory.getVMServiceClusterContext(cluster)); - } + @Override + protected void onEvent(Event event) { + + ClusterInstanceCreatedEvent clusterInstanceCreatedEvent = + (ClusterInstanceCreatedEvent) event; + AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance(). + getClusterMonitor(clusterInstanceCreatedEvent.getClusterId()); + String instanceId = ((ClusterInstanceCreatedEvent) event).getInstanceId(); + //FIXME to take lock when clusterMonitor is running + if (clusterMonitor != null) { + TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), + clusterInstanceCreatedEvent.getClusterId()); + + try { + Service service = TopologyManager.getTopology(). + getService(clusterInstanceCreatedEvent.getServiceName()); + + if (service != null) { + Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId()); + if (cluster != null) { + try { + if (cluster.isKubernetesCluster()) { + clusterMonitor.setClusterContext( + ClusterContextFactory.getKubernetesClusterContext( + instanceId, + cluster)); + } else { + VMClusterContext clusterContext = + (VMClusterContext) clusterMonitor.getClusterContext(); + if (clusterContext == null) { + clusterMonitor.setClusterContext( + ClusterContextFactory. + getVMClusterContext(instanceId, + cluster)); + } + clusterContext.addInstanceContext(instanceId, cluster); + + + } + if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) { + clusterMonitor.startScheduler(); + log.info("Monitoring task for Cluster Monitor with cluster id " + + clusterInstanceCreatedEvent.getClusterId() + " started successfully"); + } + } catch (PolicyValidationException e) { + log.error(e.getMessage(), e); + } catch (PartitionValidationException e) { + log.error(e.getMessage(), e); + } + } - if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) { - clusterMonitor.startScheduler(); - log.info("Monitoring task for Cluster Monitor with cluster id " + - clusterInstanceCreatedEvent.getClusterId() + " started successfully"); - } - - } catch (PolicyValidationException e) { - log.error(e.getMessage(), e); - } catch (PartitionValidationException e) { - log.error(e.getMessage(), e); - } - - } else { - log.error("Cluster not found for " + clusterInstanceCreatedEvent.getClusterId() + - ", no cluster instance added to ClusterMonitor " + - clusterInstanceCreatedEvent.getClusterId()); - } } else { - log.error("Service " + clusterInstanceCreatedEvent.getServiceName() + - " not found, no cluster instance added to ClusterMonitor " + - clusterInstanceCreatedEvent.getClusterId()); - } - - } finally { - TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), - clusterInstanceCreatedEvent.getClusterId()); - } - - } else { - log.error("No Cluster Monitor found for cluster id " + - clusterInstanceCreatedEvent.getClusterId()); - } - } - }); + log.error("Service " + clusterInstanceCreatedEvent.getServiceName() + + " not found, no cluster instance added to ClusterMonitor " + + clusterInstanceCreatedEvent.getClusterId()); + } + + } finally { + TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), + clusterInstanceCreatedEvent.getClusterId()); + } + + } else { + log.error("No Cluster Monitor found for cluster id " + + clusterInstanceCreatedEvent.getClusterId()); + } + } + } + + ); } /** * Terminate load balancer topology receiver thread. */ + public void terminate() { topologyEventReceiver.terminate(); terminated = true; } - protected synchronized void startApplicationMonitor(String applicationId) { - Thread th = null; - if (AutoscalerContext.getInstance().getAppMonitor(applicationId) == null) { - th = new Thread(new ApplicationMonitorAdder(applicationId)); - } - if (th != null) { - th.start(); - } else { - if (log.isDebugEnabled()) { - log.debug(String - .format("Application monitor thread already exists: " + - "[application] %s ", applicationId)); - } - } - } - - private class ApplicationMonitorAdder implements Runnable { - private String appId; - - public ApplicationMonitorAdder(String appId) { - this.appId = appId; - } - - public void run() { - ApplicationMonitor applicationMonitor = null; - int retries = 5; - boolean success = false; - do { - try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - } - try { - long start = System.currentTimeMillis(); - if (log.isDebugEnabled()) { - log.debug("application monitor is going to be started for [application] " + - appId); - } - try { - applicationMonitor = MonitorFactory.getApplicationMonitor(appId); - } catch (PolicyValidationException e) { - String msg = "Application monitor creation failed for Application: "; - log.warn(msg, e); - retries--; - } - long end = System.currentTimeMillis(); - log.info("Time taken to start app monitor: " + (end - start) / 1000); - success = true; - } catch (DependencyBuilderException e) { - String msg = "Application monitor creation failed for Application: "; - log.warn(msg, e); - retries--; - } catch (TopologyInConsistentException e) { - String msg = "Application monitor creation failed for Application: "; - log.warn(msg, e); - retries--; - } - } while (!success && retries != 0); - - if (applicationMonitor == null) { - String msg = "Application monitor creation failed, even after retrying for 5 times, " - + "for Application: " + appId; - log.error(msg); - throw new RuntimeException(msg); - } + public ExecutorService getExecutorService() { + return executorService; + } - AutoscalerContext.getInstance().addAppMonitor(applicationMonitor); - if (log.isInfoEnabled()) { - log.info(String.format("Application monitor has been added successfully: " + - "[application] %s", applicationMonitor.getId())); - } - } - } + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } }
