http://git-wip-us.apache.org/repos/asf/stratos/blob/89fb37af/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java index 8ec9f8e,b13465f..6f90516 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java @@@ -27,29 -25,20 +27,33 @@@ import org.apache.commons.logging.Log import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.Constants; import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; +import org.apache.stratos.autoscaler.exception.NonExistingKubernetesGroupException; + import org.apache.stratos.autoscaler.exception.CartridgeInformationException; import org.apache.stratos.autoscaler.exception.PartitionValidationException; import org.apache.stratos.autoscaler.exception.SpawningException; import org.apache.stratos.autoscaler.exception.TerminationException; +import org.apache.stratos.autoscaler.kubernetes.KubernetesManager; import org.apache.stratos.autoscaler.util.ConfUtil; -import org.apache.stratos.cloud.controller.stub.*; +import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidCartridgeTypeExceptionException; +import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidClusterExceptionException; +import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidIaasProviderExceptionException; +import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidMemberExceptionException; +import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidPartitionExceptionException; +import org.apache.stratos.cloud.controller.stub.CloudControllerServiceMemberTerminationFailedExceptionException; +import org.apache.stratos.cloud.controller.stub.CloudControllerServiceStub; +import org.apache.stratos.cloud.controller.stub.CloudControllerServiceUnregisteredCartridgeExceptionException; import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; +import org.apache.stratos.cloud.controller.stub.pojo.ContainerClusterContext; + import org.apache.stratos.cloud.controller.stub.pojo.CartridgeInfo; import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; import org.apache.stratos.cloud.controller.stub.pojo.Properties; import org.apache.stratos.cloud.controller.stub.pojo.Property; +import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.common.kubernetes.KubernetesGroup; +import org.apache.stratos.common.kubernetes.KubernetesMaster; + import java.rmi.RemoteException; + /** * This class will call cloud controller web service to take the action decided by Autoscaler @@@ -239,109 -228,20 +243,125 @@@ public class CloudControllerClient } } + public CartridgeInfo getCartrdgeInformation (String cartridgeType) throws CartridgeInformationException { + + try { + return stub.getCartridgeInfo(cartridgeType); + + } catch (RemoteException e) { + String msg = e.getMessage(); + log.error(msg, e); + throw new CartridgeInformationException(msg, e); + } catch (CloudControllerServiceUnregisteredCartridgeExceptionException e) { + String msg = e.getMessage(); + log.error(msg, e); + throw new CartridgeInformationException(msg, e); + } + } + + /** + * @param kubernetesClusterId + * @param clusterId + * @return + * @throws SpawningException + */ + public synchronized MemberContext[] startContainers(String kubernetesClusterId, String clusterId) throws SpawningException { + try { - - KubernetesManager kubernetesManager = KubernetesManager.getInstance(); - KubernetesMaster kubernetesMaster = kubernetesManager.getKubernetesMasterInGroup(kubernetesClusterId); - String kubernetesMasterIP = kubernetesMaster.getHostIpAddress(); - KubernetesGroup kubernetesGroup = kubernetesManager.getKubernetesGroup(kubernetesClusterId); - int lower = kubernetesGroup.getPortRange().getLower(); - int upper = kubernetesGroup.getPortRange().getUpper(); - String portRange = Integer.toString(lower) + "-" + Integer.toString(upper); - ++ ++ KubernetesManager kubernetesManager = KubernetesManager.getInstance(); ++ KubernetesMaster kubernetesMaster = kubernetesManager.getKubernetesMasterInGroup(kubernetesClusterId); ++ String kubernetesMasterIP = kubernetesMaster.getHostIpAddress(); ++ KubernetesGroup kubernetesGroup = kubernetesManager.getKubernetesGroup(kubernetesClusterId); ++ int lower = kubernetesGroup.getPortRange().getLower(); ++ int upper = kubernetesGroup.getPortRange().getUpper(); ++ String portRange = Integer.toString(lower) + "-" + Integer.toString(upper); ++ + ContainerClusterContext context = new ContainerClusterContext(); + context.setClusterId(clusterId); + Properties memberContextProps = new Properties(); + Property kubernetesClusterMasterIPProps = new Property(); + kubernetesClusterMasterIPProps.setName(StratosConstants.KUBERNETES_MASTER_IP); + kubernetesClusterMasterIPProps.setValue(kubernetesMasterIP); + memberContextProps.addProperties(kubernetesClusterMasterIPProps); + Property kubernetesClusterPortRangeProps = new Property(); + kubernetesClusterPortRangeProps.setName(StratosConstants.KUBERNETES_PORT_RANGE); + kubernetesClusterPortRangeProps.setValue(portRange); + memberContextProps.addProperties(kubernetesClusterPortRangeProps); + context.setProperties(memberContextProps); + long startTime = System.currentTimeMillis(); + MemberContext[] memberContexts = stub.startContainers(context); - ++ + if(log.isDebugEnabled()) { + long endTime = System.currentTimeMillis(); + log.debug(String.format("Service call startContainer() returned in %dms", (endTime - startTime))); + } + return memberContexts; + } catch (CloudControllerServiceUnregisteredCartridgeExceptionException e) { - String message = e.getFaultMessage().getUnregisteredCartridgeException().getMessage(); - log.error(message, e); - throw new SpawningException(message, e); ++ String message = e.getFaultMessage().getUnregisteredCartridgeException().getMessage(); ++ log.error(message, e); ++ throw new SpawningException(message, e); + } catch (RemoteException e) { - log.error(e.getMessage(), e); ++ log.error(e.getMessage(), e); + throw new SpawningException(e.getMessage(), e); - } catch (NonExistingKubernetesGroupException e){ ++ } catch (NonExistingKubernetesGroupException e){ + log.error(e.getMessage(), e); + throw new SpawningException(e.getMessage(), e); + } + } - ++ + public synchronized void terminateAllContainers(String clusterId) throws TerminationException { + try { + if(log.isInfoEnabled()) { + log.info(String.format("Terminating containers via cloud controller: [cluster] %s", clusterId)); + } + long startTime = System.currentTimeMillis(); + stub.terminateAllContainers(clusterId); + if(log.isDebugEnabled()) { + long endTime = System.currentTimeMillis(); + log.debug(String.format("Service call terminateContainer() returned in %dms", (endTime - startTime))); + } + } catch (RemoteException e) { - String msg = e.getMessage(); ++ String msg = e.getMessage(); + log.error(msg, e); + throw new TerminationException(msg, e); + } catch (CloudControllerServiceInvalidClusterExceptionException e) { - String msg = e.getFaultMessage().getInvalidClusterException().getMessage(); ++ String msg = e.getFaultMessage().getInvalidClusterException().getMessage(); + log.error(msg, e); + throw new TerminationException(msg, e); - } ++ } + } + + public synchronized MemberContext[] updateContainers(String clusterId, int replicas) - throws SpawningException { ++ throws SpawningException { + try { + log.info(String.format("Updating kubernetes replication controller via cloud controller: " + - "[cluster] %s [replicas] %s", clusterId, replicas)); ++ "[cluster] %s [replicas] %s", clusterId, replicas)); + MemberContext[] memberContexts = stub.updateContainers(clusterId, replicas); + return memberContexts; + } catch (CloudControllerServiceUnregisteredCartridgeExceptionException e) { + String msg = "Error while updating kubernetes controller, cartridge not found for [cluster] " + clusterId; + log.error(msg, e); + throw new SpawningException(msg, e); + } catch (RemoteException e) { + String msg = "Error while updating kubernetes controller, cannot communicate with " + - "cloud controller service"; ++ "cloud controller service"; + log.error(msg, e); + throw new SpawningException(msg, e); - } ++ } + } - ++ + public synchronized void terminateContainer(String memberId) throws TerminationException{ - try { - stub.terminateContainer(memberId); - } catch (RemoteException e) { ++ try { ++ stub.terminateContainer(memberId); ++ } catch (RemoteException e) { + String msg = "Error while updating kubernetes controller, cannot communicate with " + + "cloud controller service"; + log.error(msg, e); + throw new TerminationException(msg, e); - } catch (CloudControllerServiceMemberTerminationFailedExceptionException e) { ++ } catch (CloudControllerServiceMemberTerminationFailedExceptionException e) { + String msg = "Error while terminating container, member not valid for member id : " + memberId; + log.error(msg, e); + throw new TerminationException(msg, e); - } ++ } + } }
http://git-wip-us.apache.org/repos/asf/stratos/blob/89fb37af/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/interfaces/AutoScalerServiceInterface.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/interfaces/AutoScalerServiceInterface.java index 2cc5fb7,74cbb56..16e809d --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/interfaces/AutoScalerServiceInterface.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/interfaces/AutoScalerServiceInterface.java @@@ -19,10 -20,13 +20,15 @@@ package org.apache.stratos.autoscaler.i * */ +package org.apache.stratos.autoscaler.interfaces; + import org.apache.stratos.autoscaler.applications.pojo.ApplicationContext; import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; +import org.apache.stratos.autoscaler.exception.*; + import org.apache.stratos.autoscaler.exception.ApplicationDefinitionException; + import org.apache.stratos.autoscaler.exception.InvalidPartitionException; + import org.apache.stratos.autoscaler.exception.InvalidPolicyException; + import org.apache.stratos.autoscaler.exception.NonExistingLBException; import org.apache.stratos.autoscaler.partition.PartitionGroup; import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; @@@ -177,16 -83,25 +183,32 @@@ public interface AutoScalerServiceInter * @return true if the service based LB exists in all the network partitions of this policy, * false if a LB couldn't find even in one network partition. */ - public boolean checkServiceLBExistenceAgainstPolicy(String serviceName, String deploymentPolicyId); + public boolean checkServiceLBExistenceAgainstPolicy(String serviceName, String deploymentPolicyId); - public String getDefaultLBClusterId (String deploymentPolicyName); + public String getDefaultLBClusterId(String deploymentPolicyName); - public String getServiceLBClusterId(String serviceType, String deploymentPolicyName); - + public String getServiceLBClusterId (String serviceType, String deploymentPolicyName); + /** + * Dynamically update the properties of an Autoscaling Cluster Monitor + * @param clusterId id of the cluster. + * @param properties updated properties. + */ + void updateClusterMonitor(String clusterId, Properties properties) throws InvalidArgumentException; ++ ++ /** + * deploys an Application Definition + * + * @param applicationContext {@link org.apache.stratos.autoscaler.applications.pojo.ApplicationContext} object + * @throws ApplicationDefinitionException if an error is encountered + */ + public void deployApplicationDefinition (ApplicationContext applicationContext) throws ApplicationDefinitionException; + + /** + * undeploys an Application Definition + * + * @param applicationId Id of the Application to be undeployed + * @throws ApplicationDefinitionException if an error is encountered + */ + public void unDeployApplicationDefinition (String applicationId, int tenantId, String tenantDomain) throws ApplicationDefinitionException; } http://git-wip-us.apache.org/repos/asf/stratos/blob/89fb37af/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java index 4a8b269,a946977..203d6e0 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java @@@ -21,9 -21,9 +21,10 @@@ package org.apache.stratos.autoscaler.i import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.NetworkPartitionLbHolder; + import org.apache.stratos.autoscaler.applications.ApplicationSynchronizerTaskScheduler; import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; import org.apache.stratos.autoscaler.exception.AutoScalerException; +import org.apache.stratos.autoscaler.kubernetes.KubernetesManager; import org.apache.stratos.autoscaler.message.receiver.health.AutoscalerHealthStatEventReceiver; import org.apache.stratos.autoscaler.message.receiver.topology.AutoscalerTopologyEventReceiver; import org.apache.stratos.autoscaler.partition.PartitionManager; @@@ -32,8 -32,8 +33,9 @@@ import org.apache.stratos.autoscaler.po import org.apache.stratos.autoscaler.registry.RegistryManager; import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; +import org.apache.stratos.common.kubernetes.KubernetesGroup; import org.osgi.service.component.ComponentContext; + import org.wso2.carbon.ntask.core.service.TaskService; import org.wso2.carbon.registry.api.RegistryException; import org.wso2.carbon.registry.core.service.RegistryService; @@@ -105,19 -116,19 +110,27 @@@ public class AutoscalerServerComponent PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy); } + // Adding KubernetesGroups stored in registry to the information model + List<KubernetesGroup> kubernetesGroupList = RegistryManager.getInstance().retrieveKubernetesGroups(); + Iterator<KubernetesGroup> kubernetesGroupIterator = kubernetesGroupList.iterator(); + while (kubernetesGroupIterator.hasNext()) { + KubernetesGroup kubernetesGroup = kubernetesGroupIterator.next(); + KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup); + } + if (log.isInfoEnabled()) { + log.info("Scheduling tasks to publish applications"); + } + + ApplicationSynchronizerTaskScheduler + .schedule(ServiceReferenceHolder.getInstance() + .getTaskService()); + + if (log.isInfoEnabled()) { - log.info("Autoscaler Server Component activated"); + log.info("Autoscaler server Component activated"); } } catch (Throwable e) { - log.error("Error in activating autoscaler component", e); + log.error("Error in activating the autoscaler component ", e); } } @@@ -145,4 -156,18 +158,18 @@@ } ServiceReferenceHolder.getInstance().setRegistry(null); } + + protected void setTaskService(TaskService taskService) { + if (log.isDebugEnabled()) { + log.debug("Setting the Task Service"); + } + ServiceReferenceHolder.getInstance().setTaskService(taskService); + } + + protected void unsetTaskService(TaskService taskService) { + if (log.isDebugEnabled()) { - log.debug("Unsetting the Task Service"); ++ log.debug("Un-setting the Task Service"); + } + ServiceReferenceHolder.getInstance().setTaskService(null); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/89fb37af/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index fedeab9,72dc6e5..f449560 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@@ -19,75 -19,40 +19,96 @@@ package org.apache.stratos.autoscaler.message.receiver.topology; +import java.util.Set; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.AutoscalerContext; +import org.apache.stratos.autoscaler.NetworkPartitionContext; + import org.apache.stratos.autoscaler.*; + import org.apache.stratos.autoscaler.applications.ApplicationHolder; + import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; + import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; import org.apache.stratos.autoscaler.exception.DependencyBuilderException; -import org.apache.stratos.autoscaler.exception.TerminationException; +import org.apache.stratos.autoscaler.exception.PartitionValidationException; +import org.apache.stratos.autoscaler.exception.PolicyValidationException; import org.apache.stratos.autoscaler.exception.TopologyInConsistentException; + import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher; + import org.apache.stratos.autoscaler.grouping.topic.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.grouping.topic.InstanceNotificationPublisher; +import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher; + import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor; + import org.apache.stratos.autoscaler.monitor.ApplicationMonitorFactory; import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor; +import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitorFactory; +import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; +import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitorFactory; +import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; +import org.apache.stratos.autoscaler.monitor.group.GroupMonitor; +import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; +import org.apache.stratos.messaging.domain.topology.Application; +import org.apache.stratos.messaging.domain.topology.ApplicationStatus; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.ClusterDataHolder; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.domain.topology.GroupStatus; + import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; + import org.apache.stratos.autoscaler.partition.PartitionManager; + import org.apache.stratos.autoscaler.policy.PolicyManager; + import org.apache.stratos.autoscaler.status.checker.StatusChecker; + import org.apache.stratos.messaging.domain.applications.Application; + import org.apache.stratos.messaging.domain.applications.ApplicationStatus; + import org.apache.stratos.messaging.domain.applications.Applications; + import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; + import org.apache.stratos.messaging.domain.topology.*; import org.apache.stratos.messaging.event.Event; + import org.apache.stratos.messaging.event.topology.*; + import org.apache.stratos.messaging.listener.applications.ApplicationUndeployedEventListener; + import org.apache.stratos.messaging.listener.topology.*; + import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager; +import org.apache.stratos.messaging.event.topology.ApplicationActivatedEvent; +import org.apache.stratos.messaging.event.topology.ApplicationCreatedEvent; +import org.apache.stratos.messaging.event.topology.ApplicationTerminatedEvent; +import org.apache.stratos.messaging.event.topology.ApplicationTerminatingEvent; +import org.apache.stratos.messaging.event.topology.ApplicationUndeployedEvent; +import org.apache.stratos.messaging.event.topology.ClusterActivatedEvent; +import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent; +import org.apache.stratos.messaging.event.topology.ClusterInactivateEvent; +import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; +import org.apache.stratos.messaging.event.topology.ClusterTerminatedEvent; +import org.apache.stratos.messaging.event.topology.ClusterTerminatingEvent; +import org.apache.stratos.messaging.event.topology.GroupActivatedEvent; +import org.apache.stratos.messaging.event.topology.GroupInactivateEvent; +import org.apache.stratos.messaging.event.topology.GroupTerminatedEvent; +import org.apache.stratos.messaging.event.topology.GroupTerminatingEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent; +import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.listener.topology.ApplicationActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.ApplicationCreatedEventListener; +import org.apache.stratos.messaging.listener.topology.ApplicationTerminatedEventListener; +import org.apache.stratos.messaging.listener.topology.ApplicationTerminatingEventListener; +import org.apache.stratos.messaging.listener.topology.ApplicationUndeployedEventListener; +import org.apache.stratos.messaging.listener.topology.ClusterActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListener; +import org.apache.stratos.messaging.listener.topology.ClusterInActivateEventListener; +import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener; +import org.apache.stratos.messaging.listener.topology.ClusterTerminatedEventListener; +import org.apache.stratos.messaging.listener.topology.ClusterTerminatingEventListener; +import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; +import org.apache.stratos.messaging.listener.topology.GroupActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.GroupInActivateEventListener; +import org.apache.stratos.messaging.listener.topology.GroupTerminatedEventListener; +import org.apache.stratos.messaging.listener.topology.GroupTerminatingEventListener; +import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberMaintenanceListener; +import org.apache.stratos.messaging.listener.topology.MemberReadyToShutdownEventListener; +import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; - -import java.util.List; -import java.util.Set; +import org.drools.runtime.StatefulKnowledgeSession; +import org.drools.runtime.rule.FactHandle; /** * Autoscaler topology receiver. @@@ -135,16 -140,23 +196,22 @@@ public class AutoscalerTopologyEventRec topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { @Override protected void onEvent(Event event) { - if (!topologyInitialized) { log.info("[CompleteTopologyEvent] Received: " + event.getClass()); - - TopologyManager.acquireReadLock(); + ApplicationHolder.acquireReadLock(); try { - for (Application application : TopologyManager.getTopology().getApplications()) { - startApplicationMonitor(application.getUniqueIdentifier()); + Applications applications = ApplicationHolder.getApplications(); + if(applications != null) { + for (Application application : applications.getApplications().values()) { + if(allClustersInitialized(application)) { + startApplicationMonitor(application.getUniqueIdentifier()); + } else { + log.error("Complete Topology is not consistent with the applications " + + "which got persisted"); + } + } + topologyInitialized = true; } - - topologyInitialized = true; } catch (Exception e) { log.error("Error processing event", e); } finally { @@@ -209,11 -222,12 +277,12 @@@ log.info("[ClusterCreatedEvent] Received: " + event.getClass()); ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event; - String clusterId = clusterCreatedEvent.getClusterId(); + String clusterId = clusterCreatedEvent.getCluster().getClusterId(); AbstractClusterMonitor clusterMonitor = - (AbstractClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId); + AutoscalerContext.getInstance().getClusterMonitor(clusterId); //changing the status in the monitor, will notify its parent monitor + clusterMonitor.setStop(true); clusterMonitor.setStatus(ClusterStatus.Created); } @@@ -617,106 -935,56 +989,69 @@@ TopologyManager.releaseReadLockForCluster(memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId()); } - } + } }); - + topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { - @Override - protected void onEvent(Event event) { - MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event; - //TopologyManager.acquireReadLock(); - TopologyManager.acquireReadLockForCluster(memberMaintenanceModeEvent.getServiceName(), - memberMaintenanceModeEvent.getClusterId()); - try { - String clusterId = memberMaintenanceModeEvent.getClusterId(); - AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - monitor = asCtx.getClusterMonitor(clusterId); - if (null == monitor) { - if (log.isDebugEnabled()) { - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - monitor.handleMemberMaintenanceModeEvent(memberMaintenanceModeEvent); - } catch (Exception e) { - String msg = "Error processing event " + e.getLocalizedMessage(); - log.error(msg, e); - } finally { - TopologyManager.releaseReadLockForCluster(memberMaintenanceModeEvent.getServiceName(), - memberMaintenanceModeEvent.getClusterId()); - } - } - }); - } + @Override + protected void onEvent(Event event) { - private class ClusterMonitorAdder implements Runnable { - private Cluster cluster; + MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event; - public ClusterMonitorAdder(Cluster cluster) { - this.cluster = cluster; - } + //TopologyManager.acquireReadLock(); + TopologyManager.acquireReadLockForCluster(memberMaintenanceModeEvent.getServiceName(), + memberMaintenanceModeEvent.getClusterId()); - public void run() { - AbstractClusterMonitor monitor = null; - int retries = 5; - boolean success = false; - do { try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - } - try { - monitor = ClusterMonitorFactory.getMonitor(cluster); - success = true; - } catch (PolicyValidationException e) { - if (log.isDebugEnabled()) { - String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId(); - log.debug(msg, e); + String memberId = memberMaintenanceModeEvent.getMemberId(); + String partitionId = memberMaintenanceModeEvent.getPartitionId(); + String networkPartitionId = memberMaintenanceModeEvent.getNetworkPartitionId(); + + PartitionContext partitionContext; + String clusterId = memberMaintenanceModeEvent.getClusterId(); + AbstractClusterMonitor monitor; + + if (AutoscalerContext.getInstance().monitorExist(clusterId)) { + monitor = (AbstractClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId); + partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId). + getPartitionCtxt(partitionId); + } else { + monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId); + partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId). + getPartitionCtxt(partitionId); } - retries--; - } catch (PartitionValidationException e) { + partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); if (log.isDebugEnabled()) { - String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId(); - log.debug(msg, e); + log.debug(String.format("Member has been moved as pending termination: " + + "[member] %s", memberId)); } - retries--; - } - } while (!success && retries != 0); + partitionContext.moveActiveMemberToTerminationPendingMembers(memberId); - if (monitor == null) { - String msg = "Cluster monitor creation failed, even after retrying for 5 times, " - + "for cluster: " + cluster.getClusterId(); - log.error(msg); - throw new RuntimeException(msg); - } - - // Thread th = new Thread(monitor); - // th.start(); - monitor.startScheduler(); - AutoscalerContext.getInstance().addClusterMonitor(monitor); - if (log.isInfoEnabled()) { - log.info(String.format("Cluster monitor has been added successfully: [cluster] %s", - cluster.getClusterId())); + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + //TopologyManager.releaseReadLock(); + TopologyManager.releaseReadLockForCluster(memberMaintenanceModeEvent.getServiceName(), + memberMaintenanceModeEvent.getClusterId()); + } } - } + }); } + @SuppressWarnings("unused") + private void runTerminateAllRule(VMClusterMonitor monitor) { + + FactHandle terminateAllFactHandle = null; + + StatefulKnowledgeSession terminateAllKnowledgeSession = null; + + for (NetworkPartitionContext networkPartitionContext : monitor.getNetworkPartitionCtxts().values()) { + terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll(terminateAllKnowledgeSession + , terminateAllFactHandle, networkPartitionContext); + } + + } /** * Terminate load balancer topology receiver thread. http://git-wip-us.apache.org/repos/asf/stratos/blob/89fb37af/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java index 0000000,ac0871d..fb23985 mode 000000,100644..100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java @@@ -1,0 -1,287 +1,277 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.stratos.autoscaler.monitor; + + import org.apache.commons.configuration.XMLConfiguration; + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; + import org.apache.stratos.autoscaler.NetworkPartitionContext; + import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; + import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent; + import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent; + import org.apache.stratos.autoscaler.monitor.events.MonitorTerminateAllEvent; + import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; + import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; + import org.apache.stratos.autoscaler.util.AutoScalerConstants; + import org.apache.stratos.autoscaler.util.ConfUtil; + import org.apache.stratos.messaging.domain.topology.*; + import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + import org.drools.runtime.StatefulKnowledgeSession; + import org.drools.runtime.rule.FactHandle; + + import java.util.Map; + + /** + * Is responsible for monitoring a service cluster. This runs periodically + * and perform minimum instance check and scaling check using the underlying + * rules engine. + */ + abstract public class AbstractClusterMonitor extends Monitor implements Runnable { + + private static final Log log = LogFactory.getLog(AbstractClusterMonitor.class); + // Map<NetworkpartitionId, Network Partition Context> + protected Map<String, NetworkPartitionContext> networkPartitionCtxts; + protected DeploymentPolicy deploymentPolicy; + protected AutoscalePolicy autoscalePolicy; + + + protected FactHandle minCheckFactHandle; + protected FactHandle scaleCheckFactHandle; + protected FactHandle terminateDependencyFactHandle; + protected FactHandle terminateAllFactHandle; + + protected StatefulKnowledgeSession minCheckKnowledgeSession; + protected StatefulKnowledgeSession scaleCheckKnowledgeSession; + protected StatefulKnowledgeSession terminateAllKnowledgeSession; + protected boolean isDestroyed; + + protected String clusterId; + protected String serviceId; ++ protected String appId; + + protected boolean hasFaultyMember = false; + - protected boolean stop = false; - + protected ClusterStatus status; + + //protected ParentComponentMonitor parent; + + protected AutoscalerRuleEvaluator autoscalerRuleEvaluator; + + // time intereval between two runs of the Monitor. Default is 90000ms. + protected int monitorInterval; + + public AbstractClusterMonitor() { + readConfigurations(); + status = ClusterStatus.Created; + } + + private void readConfigurations() { + + XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); + monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000); + if (log.isDebugEnabled()) { + log.debug("Cluster Monitor task interval: " + getMonitorInterval()); + } + } + + @Override + public void run() { + // TODO Auto-generated method stub + } + + + public NetworkPartitionContext getNetworkPartitionCtxt(Member member) { + log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId()); + String networkPartitionId = member.getNetworkPartitionId(); + if (networkPartitionCtxts.containsKey(networkPartitionId)) { + log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId)); + return networkPartitionCtxts.get(networkPartitionId); + } + log.info("returning null getNetworkPartitionCtxt"); + return null; + } + + public String getPartitionOfMember(String memberId) { + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + if (cluster.memberExists(memberId)) { + return cluster.getMember(memberId).getPartitionId(); + } + } + } + return null; + } + + public void destroy() { + minCheckKnowledgeSession.dispose(); + scaleCheckKnowledgeSession.dispose(); + terminateAllKnowledgeSession.dispose(); + setDestroyed(true); + if (log.isDebugEnabled()) { + log.debug("Cluster Monitor Drools session has been disposed. " + this.toString()); + } + } + + public abstract void terminateAllMembers (); + + public boolean isDestroyed() { + return isDestroyed; + } + + public void setDestroyed(boolean isDestroyed) { + this.isDestroyed = isDestroyed; + } + + public String getServiceId() { + return serviceId; + } + + public void setServiceId(String serviceId) { + this.serviceId = serviceId; + } + + public DeploymentPolicy getDeploymentPolicy() { + return deploymentPolicy; + } + + public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) { + this.deploymentPolicy = deploymentPolicy; + } + + public AutoscalePolicy getAutoscalePolicy() { + return autoscalePolicy; + } + + public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) { + this.autoscalePolicy = autoscalePolicy; + } + + public String getClusterId() { + return clusterId; + } + + public void setClusterId(String clusterId) { + this.clusterId = clusterId; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public Map<String, NetworkPartitionContext> getNetworkPartitionCtxts() { + return networkPartitionCtxts; + } + + public NetworkPartitionContext getNetworkPartitionCtxt(String networkPartitionId) { + return networkPartitionCtxts.get(networkPartitionId); + } + + public void setPartitionCtxt(Map<String, NetworkPartitionContext> partitionCtxt) { + this.networkPartitionCtxts = partitionCtxt; + } + + public boolean partitionCtxtAvailable(String partitionId) { + return networkPartitionCtxts.containsKey(partitionId); + } + + public void addNetworkPartitionCtxt(NetworkPartitionContext ctxt) { + this.networkPartitionCtxts.put(ctxt.getId(), ctxt); + } + + public NetworkPartitionContext getPartitionCtxt(String id) { + return this.networkPartitionCtxts.get(id); + } + + public StatefulKnowledgeSession getMinCheckKnowledgeSession() { + return minCheckKnowledgeSession; + } + + public void setMinCheckKnowledgeSession(StatefulKnowledgeSession minCheckKnowledgeSession) { + this.minCheckKnowledgeSession = minCheckKnowledgeSession; + } + + public FactHandle getMinCheckFactHandle() { + return minCheckFactHandle; + } + + public void setMinCheckFactHandle(FactHandle minCheckFactHandle) { + this.minCheckFactHandle = minCheckFactHandle; + } + + + public int getMonitorInterval() { + return monitorInterval; + } + + public ClusterStatus getStatus() { + return status; + } + + public void setStatus(ClusterStatus status) { + + //if(this.status != status) { + this.status = status; + /** + * notifying the parent monitor about the state change + * If the cluster in_active and if it is a in_dependent cluster, + * then won't send the notification to parent. + */ + if (status == ClusterStatus.Inactive && !this.hasDependent) { + log.info("[Cluster] " + clusterId + "is not notifying the parent, " + + "since it is identified as the independent unit"); + - /*} else if (status == ClusterStatus.Terminating) { ++ } else if (status == ClusterStatus.Terminating) { + // notify parent + log.info("[Cluster] " + clusterId + " is not notifying the parent, " + + "since it is in Terminating State"); -*/ ++ + } else { - log.info("[Group] " + this.id + "is notifying the [parent] " + this.parent.getId()); + MonitorStatusEventBuilder.handleClusterStatusEvent(this.parent, this.status, this.clusterId); + } + //} + + } + + @Override + public void onChildEvent(MonitorStatusEvent statusEvent) { + + } + + @Override + public void onEvent(MonitorTerminateAllEvent terminateAllEvent) { + + } + + @Override + public void onEvent(MonitorScalingEvent scalingEvent) { + + } + + public boolean isHasFaultyMember() { + return hasFaultyMember; + } + + public void setHasFaultyMember(boolean hasFaultyMember) { + this.hasFaultyMember = hasFaultyMember; + } - - public boolean isStop() { - return stop; - } - - public void setStop(boolean stop) { - this.stop = stop; - } + } http://git-wip-us.apache.org/repos/asf/stratos/blob/89fb37af/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/89fb37af/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java index bc23dd4,0000000..7f3de58 mode 100644,000000..100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/application/ApplicationMonitorFactory.java @@@ -1,225 -1,0 +1,246 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler.monitor.application; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; ++import org.apache.stratos.autoscaler.AutoscalerContext; ++import org.apache.stratos.autoscaler.MemberStatsContext; ++import org.apache.stratos.autoscaler.NetworkPartitionContext; ++import org.apache.stratos.autoscaler.PartitionContext; ++import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; ++import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; +import org.apache.stratos.autoscaler.exception.DependencyBuilderException; +import org.apache.stratos.autoscaler.exception.PartitionValidationException; +import org.apache.stratos.autoscaler.exception.PolicyValidationException; +import org.apache.stratos.autoscaler.exception.TopologyInConsistentException; ++import org.apache.stratos.autoscaler.applications.dependency.context.ApplicationContext; ++import org.apache.stratos.autoscaler.applications.dependency.context.ClusterContext; ++import org.apache.stratos.autoscaler.applications.dependency.context.GroupContext; ++import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor; ++import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; +import org.apache.stratos.autoscaler.grouping.dependency.context.ApplicationContext; +import org.apache.stratos.autoscaler.grouping.dependency.context.ClusterContext; +import org.apache.stratos.autoscaler.grouping.dependency.context.GroupContext; +import org.apache.stratos.autoscaler.monitor.Monitor; +import org.apache.stratos.autoscaler.monitor.ParentComponentMonitor; +import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; +import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitorFactory; +import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; +import org.apache.stratos.autoscaler.monitor.group.GroupMonitor; ++import org.apache.stratos.autoscaler.partition.PartitionGroup; ++import org.apache.stratos.autoscaler.policy.PolicyManager; ++import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; ++import org.apache.stratos.autoscaler.status.checker.StatusChecker; ++import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; ++import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; +import org.apache.stratos.cloud.controller.stub.pojo.Properties; +import org.apache.stratos.cloud.controller.stub.pojo.Property; ++import org.apache.stratos.messaging.domain.applications.Application; ++import org.apache.stratos.messaging.domain.applications.Group; +import org.apache.stratos.messaging.domain.topology.*; ++import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; ++import org.apache.stratos.messaging.util.Constants; + +import java.util.Map; + +/** + * Factory class to get the Monitors. + */ +public class ApplicationMonitorFactory { + private static final Log log = LogFactory.getLog(ApplicationMonitorFactory.class); + + /** + * Factor method used to create relevant monitors based on the given context + * + * @param context Application/Group/Cluster context + * @param appId appId of the application which requires to create app monitor + * @param parentMonitor parent of the monitor + * @return Monitor which can be ApplicationMonitor/GroupMonitor/ClusterMonitor + * @throws TopologyInConsistentException throws while traversing thr topology + * @throws DependencyBuilderException throws while building dependency for app monitor + * @throws PolicyValidationException throws while validating the policy associated with cluster + * @throws PartitionValidationException throws while validating the partition used in a cluster + */ + public static Monitor getMonitor(ParentComponentMonitor parentMonitor, ApplicationContext context, String appId) + throws TopologyInConsistentException, + DependencyBuilderException, PolicyValidationException, PartitionValidationException { + Monitor monitor; + + if (context instanceof GroupContext) { + monitor = getGroupMonitor(parentMonitor, context, appId); + } else if (context instanceof ClusterContext) { + monitor = getClusterMonitor(parentMonitor, (ClusterContext) context, appId); + //Start the thread + Thread th = new Thread((AbstractClusterMonitor) monitor); + th.start(); + } else { + monitor = getApplicationMonitor(appId); + } + return monitor; + } + + /** + * This will create the GroupMonitor based on given groupId by going thr Topology + * + * @param parentMonitor parent of the monitor + * @param context groupId of the group + * @param appId appId of the relevant application + * @return Group monitor + * @throws DependencyBuilderException throws while building dependency for app monitor + * @throws TopologyInConsistentException throws while traversing thr topology + */ + public static Monitor getGroupMonitor(ParentComponentMonitor parentMonitor, ApplicationContext context, String appId) + throws DependencyBuilderException, + TopologyInConsistentException { + GroupMonitor groupMonitor; - TopologyManager.acquireReadLockForApplication(appId); ++ ApplicationManager.acquireReadLockForApplication(appId); + + try { - Group group = TopologyManager.getTopology().getApplication(appId).getGroupRecursively(context.getId()); ++ Group group = ApplicationManager.getApplications().getApplication(appId).getGroupRecursively(context.getId()); + groupMonitor = new GroupMonitor(group, appId); + groupMonitor.setAppId(appId); + if(parentMonitor != null) { + groupMonitor.setParent(parentMonitor); + //Setting the dependent behaviour of the monitor + if(parentMonitor.isDependent() || (context.isDependent() && context.hasChild())) { + groupMonitor.setHasDependent(true); + } else { + groupMonitor.setHasDependent(false); + } + //TODO make sure when it is async + + if (group.getStatus() != groupMonitor.getStatus()) { + //updating the status, if the group is not in created state when creating group Monitor + //so that groupMonitor will notify the parent (useful when restarting stratos) + groupMonitor.setStatus(group.getStatus()); + } + } + + } finally { - TopologyManager.releaseReadLockForApplication(appId); ++ ApplicationManager.releaseReadLockForApplication(appId); + + } + return groupMonitor; + + } + + /** + * This will create a new app monitor based on the give appId by getting the + * application from Topology + * + * @param appId appId of the application which requires to create app monitor + * @return ApplicationMonitor + * @throws DependencyBuilderException throws while building dependency for app monitor + * @throws TopologyInConsistentException throws while traversing thr topology + */ + public static ApplicationMonitor getApplicationMonitor(String appId) + throws DependencyBuilderException, + TopologyInConsistentException { + ApplicationMonitor applicationMonitor; - TopologyManager.acquireReadLockForApplication(appId); ++ ApplicationManager.acquireReadLockForApplication(appId); + try { - Application application = TopologyManager.getTopology().getApplication(appId); ++ Application application = ApplicationManager.getApplications().getApplication(appId); + if (application != null) { + applicationMonitor = new ApplicationMonitor(application); + applicationMonitor.setHasDependent(false); + + } else { + String msg = "[Application] " + appId + " cannot be found in the Topology"; + throw new TopologyInConsistentException(msg); + } + } finally { - TopologyManager.releaseReadLockForApplication(appId); ++ ApplicationManager.releaseReadLockForApplication(appId); + } + + return applicationMonitor; + + } + + /** + * Updates ClusterContext for given cluster + * + * @param parentMonitor parent of the monitor + * @param context + * @return ClusterMonitor - Updated ClusterContext + * @throws org.apache.stratos.autoscaler.exception.PolicyValidationException + * @throws org.apache.stratos.autoscaler.exception.PartitionValidationException + */ + public static VMClusterMonitor getClusterMonitor(ParentComponentMonitor parentMonitor, + ClusterContext context, String appId) + throws PolicyValidationException, + PartitionValidationException, + TopologyInConsistentException { + //Retrieving the Cluster from Topology + String clusterId = context.getId(); + String serviceName = context.getServiceName(); + + Cluster cluster; + AbstractClusterMonitor clusterMonitor; + //acquire read lock for the service and cluster + TopologyManager.acquireReadLockForCluster(serviceName, clusterId); + try { + Topology topology = TopologyManager.getTopology(); + if (topology.serviceExists(serviceName)) { + Service service = topology.getService(serviceName); + if (service.clusterExists(clusterId)) { + cluster = service.getCluster(clusterId); + if (log.isDebugEnabled()) { + log.debug("Dependency check starting the [cluster]" + clusterId); + } + // startClusterMonitor(this, cluster); + //context.setCurrentStatus(Status.Created); + } else { + String msg = "[Cluster] " + clusterId + " cannot be found in the " + + "Topology for [service] " + serviceName; + throw new TopologyInConsistentException(msg); + } + } else { + String msg = "[Service] " + serviceName + " cannot be found in the Topology"; + throw new TopologyInConsistentException(msg); + + } + + + clusterMonitor = ClusterMonitorFactory.getMonitor(cluster); + if (clusterMonitor instanceof VMClusterMonitor) { + return (VMClusterMonitor) clusterMonitor; + } else if (clusterMonitor != null) { + log.warn("Unknown cluster monitor found: " + clusterMonitor.getClass().toString()); + } + return null; + } finally { + TopologyManager.releaseReadLockForCluster(serviceName, clusterId); + } + } + + + private static Properties convertMemberPropsToMemberContextProps( + java.util.Properties properties) { + Properties props = new Properties(); + for (Map.Entry<Object, Object> e : properties.entrySet()) { + Property prop = new Property(); + prop.setName((String) e.getKey()); + prop.setValue((String) e.getValue()); + props.addProperties(prop); + } + return props; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/89fb37af/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java ---------------------------------------------------------------------- diff --cc components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java index 9d0f134,0000000..dc97dc7 mode 100644,000000..100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java @@@ -1,235 -1,0 +1,326 @@@ +/* - * Licensed to the Apache Software Foundation (ASF) under one ++ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at - * ++ * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler.monitor.cluster; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.NetworkPartitionContext; +import org.apache.stratos.autoscaler.PartitionContext; +import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; +import org.apache.stratos.autoscaler.exception.InvalidArgumentException; ++import org.apache.stratos.autoscaler.exception.TerminationException; ++import org.apache.stratos.autoscaler.grouping.topic.ClusterStatusEventPublisher; ++import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor; ++import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent; +import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; +import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; +import org.apache.stratos.autoscaler.util.AutoScalerConstants; +import org.apache.stratos.autoscaler.util.ConfUtil; +import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; +import org.apache.stratos.cloud.controller.stub.pojo.Properties; +import org.apache.stratos.cloud.controller.stub.pojo.Property; ++import org.apache.stratos.messaging.domain.applications.ApplicationStatus; +import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; ++import org.apache.stratos.messaging.domain.applications.GroupStatus; + +/** + * Is responsible for monitoring a service cluster. This runs periodically + * and perform minimum instance check and scaling check using the underlying + * rules engine. + */ +public class VMServiceClusterMonitor extends VMClusterMonitor { + + private static final Log log = LogFactory.getLog(VMServiceClusterMonitor.class); + private String lbReferenceType; + private boolean hasPrimary; + + public VMServiceClusterMonitor(String clusterId, String serviceId, + DeploymentPolicy deploymentPolicy, + AutoscalePolicy autoscalePolicy) { + super(clusterId, serviceId, + new AutoscalerRuleEvaluator(StratosConstants.VM_MIN_CHECK_DROOL_FILE, + StratosConstants.VM_SCALE_CHECK_DROOL_FILE), + deploymentPolicy, autoscalePolicy, + new ConcurrentHashMap<String, NetworkPartitionContext>()); + readConfigurations(); + } + ++ private static void terminateMember(String memberId) { ++ try { ++ CloudControllerClient.getInstance().terminate(memberId); ++ ++ } catch (TerminationException e) { ++ log.error("Unable to terminate member [member id ] " + memberId, e); ++ } ++ } ++ + @Override + public void run() { + while (!isDestroyed()) { + try { + if ((this.status.getCode() <= ClusterStatus.Active.getCode()) || + (this.status == ClusterStatus.Inactive && !hasDependent) || + !this.hasFaultyMember) { + if (log.isDebugEnabled()) { + log.debug("Cluster monitor is running.. " + this.toString()); + } + monitor(); + } else { + if (log.isDebugEnabled()) { + log.debug("Cluster monitor is suspended as the cluster is in " + + ClusterStatus.Inactive + " mode......"); + } + } + } catch (Exception e) { + log.error("Cluster monitor: Monitor failed." + this.toString(), e); + } + try { - Thread.sleep(getMonitorIntervalMilliseconds()); ++ Thread.sleep(monitorInterval); + } catch (InterruptedException ignore) { + } + } ++ ++ + } + + @Override - protected void monitor() { ++ public void terminateAllMembers() { ++ ++ Thread memberTerminator = new Thread(new Runnable() { ++ public void run() { ++ ++ for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) { ++ for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) { ++ //if (log.isDebugEnabled()) { ++ log.info("Starting to terminate all members in Network Partition [ " + ++ networkPartitionContext.getId() + " ], Partition [ " + ++ partitionContext.getPartitionId() + " ]"); ++ // } ++ // need to terminate active, pending and obsolete members ++ ++ // active members ++ for (MemberContext activeMemberCtxt : partitionContext.getActiveMembers()) { ++ log.info("Terminating active member [member id] " + activeMemberCtxt.getMemberId()); ++ terminateMember(activeMemberCtxt.getMemberId()); ++ } ++ ++ // pending members ++ for (MemberContext pendingMemberCtxt : partitionContext.getPendingMembers()) { ++ log.info("Terminating pending member [member id] " + pendingMemberCtxt.getMemberId()); ++ terminateMember(pendingMemberCtxt.getMemberId()); ++ } ++ ++ // obsolete members ++ for (String obsoleteMemberId : partitionContext.getObsoletedMembers()) { ++ log.info("Terminating obsolete member [member id] " + obsoleteMemberId); ++ terminateMember(obsoleteMemberId); ++ } ++ ++// terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll ++// (terminateAllKnowledgeSession, terminateAllFactHandle, partitionContext); ++ } ++ } ++ } ++ }, "Member Terminator - [cluster id] " + this.clusterId); ++ ++ memberTerminator.start(); ++ } ++ ++ private boolean isPrimaryMember(MemberContext memberContext) { ++ Properties props = memberContext.getProperties(); ++ if (log.isDebugEnabled()) { ++ log.debug(" Properties [" + props + "] "); ++ } ++ if (props != null && props.getProperties() != null) { ++ for (Property prop : props.getProperties()) { ++ if (prop.getName().equals("PRIMARY")) { ++ if (Boolean.parseBoolean(prop.getValue())) { ++ log.debug("Adding member id [" + memberContext.getMemberId() + "] " + ++ "member instance id [" + memberContext.getInstanceId() + "] as a primary member"); ++ return true; ++ } ++ } ++ } ++ } ++ return false; ++ } + ++ public void monitor() { + //TODO make this concurrent ++ + for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) { + // store primary members in the network partition context + List<String> primaryMemberListInNetworkPartition = new ArrayList<String>(); - + //minimum check per partition + for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) { + // store primary members in the partition context + List<String> primaryMemberListInPartition = new ArrayList<String>(); + // get active primary members in this partition context + for (MemberContext memberContext : partitionContext.getActiveMembers()) { + if (isPrimaryMember(memberContext)) { + primaryMemberListInPartition.add(memberContext.getMemberId()); + } + } ++ + // get pending primary members in this partition context + for (MemberContext memberContext : partitionContext.getPendingMembers()) { + if (isPrimaryMember(memberContext)) { + primaryMemberListInPartition.add(memberContext.getMemberId()); + } + } + primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition); - getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); - getMinCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType); - getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary); - getMinCheckKnowledgeSession().setGlobal("primaryMemberCount", primaryMemberListInPartition.size()); ++ minCheckKnowledgeSession.setGlobal("clusterId", clusterId); ++ minCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType); ++ minCheckKnowledgeSession.setGlobal("isPrimary", hasPrimary); ++ + + if (log.isDebugEnabled()) { + log.debug(String.format("Running minimum check for partition %s ", partitionContext.getPartitionId())); + } + - minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession() ++ minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(minCheckKnowledgeSession + , minCheckFactHandle, partitionContext); + ++ //checking the status of the cluster ++ ++ + } + - boolean rifReset = networkPartitionContext.isRifReset(); ++ /*boolean rifReset = networkPartitionContext.isRifReset(); + boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset(); + boolean loadAverageReset = networkPartitionContext.isLoadAverageReset(); ++ + if (log.isDebugEnabled()) { + log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset - + " flag of loadAverageReset" + loadAverageReset); ++ + " flag of loadAverageReset" + loadAverageReset); + } + if (rifReset || memoryConsumptionReset || loadAverageReset) { - getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); ++ ++ scaleCheckKnowledgeSession.setGlobal("clusterId", clusterId); + //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy); - getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", autoscalePolicy); - getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset); - getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset); - getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset); - getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType); - getScaleCheckKnowledgeSession().setGlobal("isPrimary", false); - getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition); ++ scaleCheckKnowledgeSession.setGlobal("autoscalePolicy", autoscalePolicy); ++ scaleCheckKnowledgeSession.setGlobal("rifReset", rifReset); ++ scaleCheckKnowledgeSession.setGlobal("mcReset", memoryConsumptionReset); ++ scaleCheckKnowledgeSession.setGlobal("laReset", loadAverageReset); ++ scaleCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType); ++ scaleCheckKnowledgeSession.setGlobal("isPrimary", false); ++ scaleCheckKnowledgeSession.setGlobal("primaryMembers", primaryMemberListInNetworkPartition); + + if (log.isDebugEnabled()) { + log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId())); + log.debug(" Primary members : " + primaryMemberListInNetworkPartition); + } + - scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(getScaleCheckKnowledgeSession() ++ scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(scaleCheckKnowledgeSession + , scaleCheckFactHandle, networkPartitionContext); + + networkPartitionContext.setRifReset(false); + networkPartitionContext.setMemoryConsumptionReset(false); + networkPartitionContext.setLoadAverageReset(false); + } else if (log.isDebugEnabled()) { + log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " + - "cycle for network partition %s", networkPartitionContext.getId())); - } ++ "cycle for network partition %s", networkPartitionContext.getId())); ++ }*/ + } + } + + private boolean isPrimaryMember(MemberContext memberContext) { + Properties props = memberContext.getProperties(); + if (log.isDebugEnabled()) { + log.debug(" Properties [" + props + "] "); + } + if (props != null && props.getProperties() != null) { + for (Property prop : props.getProperties()) { + if (prop.getName().equals("PRIMARY")) { + if (Boolean.parseBoolean(prop.getValue())) { + log.debug("Adding member id [" + memberContext.getMemberId() + "] " + + "member instance id [" + memberContext.getInstanceId() + "] as a primary member"); + return true; + } + } + } + } + return false; + } + + @Override + protected void readConfigurations() { + XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); + int monitorInterval = conf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000); + setMonitorIntervalMilliseconds(monitorInterval); + if (log.isDebugEnabled()) { + log.debug("VMServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds()); + } + } + + @Override + public void destroy() { + getMinCheckKnowledgeSession().dispose(); + getScaleCheckKnowledgeSession().dispose(); + setDestroyed(true); + stopScheduler(); + if (log.isDebugEnabled()) { + log.debug("VMServiceClusterMonitor Drools session has been disposed. " + this.toString()); + } + } + + @Override + public String toString() { + return "VMServiceClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() + + ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy + + ", lbReferenceType=" + lbReferenceType + + ", hasPrimary=" + hasPrimary + " ]"; + } + + public String getLbReferenceType() { + return lbReferenceType; + } + + public void setLbReferenceType(String lbReferenceType) { + this.lbReferenceType = lbReferenceType; + } + + public boolean isHasPrimary() { + return hasPrimary; + } + + public void setHasPrimary(boolean hasPrimary) { + this.hasPrimary = hasPrimary; + } + + @Override - public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException { - // TODO - ++ public void onChildEvent(MonitorStatusEvent statusEvent) { ++ ++ } ++ ++ @Override ++ public void onParentEvent(MonitorStatusEvent statusEvent) { ++ // send the ClusterTerminating event ++ if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() == ++ ApplicationStatus.Terminating) { ++ ClusterStatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId); ++ } + } +}
