http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index b8dcd73..6525eba 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -19,34 +19,54 @@ package org.apache.stratos.autoscaler.message.receiver.topology; +import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.*; +import org.apache.stratos.autoscaler.AutoscalerContext; +import org.apache.stratos.autoscaler.KubernetesClusterContext; +import org.apache.stratos.autoscaler.MemberStatsContext; +import org.apache.stratos.autoscaler.NetworkPartitionContext; +import org.apache.stratos.autoscaler.NetworkPartitionLbHolder; +import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; import org.apache.stratos.autoscaler.exception.PartitionValidationException; import org.apache.stratos.autoscaler.exception.PolicyValidationException; import org.apache.stratos.autoscaler.exception.TerminationException; -import org.apache.stratos.autoscaler.monitor.AbstractMonitor; -import 
org.apache.stratos.autoscaler.monitor.ClusterMonitor; -import org.apache.stratos.autoscaler.monitor.KubernetesClusterMonitor; -import org.apache.stratos.autoscaler.monitor.LbClusterMonitor; +import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor; +import org.apache.stratos.autoscaler.monitor.ClusterMonitorFactory; +import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor; +import org.apache.stratos.autoscaler.monitor.VMClusterMonitor; import org.apache.stratos.autoscaler.partition.PartitionManager; import org.apache.stratos.autoscaler.policy.PolicyManager; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; -import org.apache.stratos.autoscaler.util.AutoscalerUtil; +import org.apache.stratos.common.enums.ClusterType; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.topology.*; -import org.apache.stratos.messaging.listener.topology.*; +import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent; +import org.apache.stratos.messaging.event.topology.ClusterMaintenanceModeEvent; +import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent; +import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListener; +import org.apache.stratos.messaging.listener.topology.ClusterMaintenanceModeEventListener; +import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener; +import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; +import 
org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberMaintenanceListener; +import org.apache.stratos.messaging.listener.topology.MemberReadyToShutdownEventListener; +import org.apache.stratos.messaging.listener.topology.MemberStartedEventListener; +import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener; +import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.drools.runtime.StatefulKnowledgeSession; import org.drools.runtime.rule.FactHandle; -import java.util.List; - /** * Autoscaler topology receiver. */ @@ -116,42 +136,60 @@ public class AutoscalerTopologyEventReceiver implements Runnable { try { MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent)event; AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; String clusterId = memberReadyToShutdownEvent.getClusterId(); String memberId = memberReadyToShutdownEvent.getMemberId(); - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)) { + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return; } - - NetworkPartitionContext nwPartitionCtxt; - nwPartitionCtxt = monitor.getNetworkPartitionCtxt(memberReadyToShutdownEvent.getNetworkPartitionId()); - - // start a new member in the same Partition - String partitionId = monitor.getPartitionOfMember(memberId); - PartitionContext partitionCtxt = 
nwPartitionCtxt.getPartitionCtxt(partitionId); - - - // terminate the shutdown ready member - CloudControllerClient ccClient = CloudControllerClient.getInstance(); - ccClient.terminate(memberId); - - // remove from active member list - partitionCtxt.removeActiveMemberById(memberId); - - if (log.isInfoEnabled()) { - log.info(String.format("Member is terminated and removed from the active members list: [member] %s [partition] %s [cluster] %s ", - memberId, partitionId, clusterId)); + + TopologyManager.acquireReadLock(); + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster) { + + NetworkPartitionContext nwPartitionCtxt; + nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(memberReadyToShutdownEvent.getNetworkPartitionId()); + + // start a new member in the same Partition + String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId); + PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); + + + // terminate the shutdown ready member + CloudControllerClient ccClient = CloudControllerClient.getInstance(); + ccClient.terminate(memberId); + + // remove from active member list + partitionCtxt.removeActiveMemberById(memberId); + + if (log.isInfoEnabled()) { + log.info(String.format("Member is terminated and removed from the active members list: [member] %s [partition] %s [cluster] %s ", + memberId, partitionId, clusterId)); + } + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt(); + //terminate the shutdown ready container + CloudControllerClient.getInstance().terminateContainer(memberId); + //remove from active member list + kubernetesClusterContext.removeActiveMemberById(memberId); + + if (log.isInfoEnabled()) { + log.info(String.format("Member is terminated and removed from the active members list: 
[member] %s [kub cluster] %s [cluster] %s ", + memberId, kubernetesClusterContext.getKubernetesClusterID(), clusterId)); + } } + } catch (TerminationException e) { log.error(e); + } finally { + TopologyManager.releaseReadLock(); } } @@ -185,12 +223,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable { TopologyManager.acquireReadLock(); Service service = TopologyManager.getTopology().getService(e.getServiceName()); Cluster cluster = service.getCluster(e.getClusterId()); - if(AutoscalerContext.getInstance().kubernetesClusterMonitorExist(cluster.getClusterId())) { - AutoscalerContext.getInstance().getKubernetesClusterMonitor(e.getClusterId()).setStatus(e.getStatus()); - } else if(AutoscalerContext.getInstance().monitorExist((cluster.getClusterId()))) { - AutoscalerContext.getInstance().getMonitor(e.getClusterId()).setStatus(e.getStatus()); - } else if (AutoscalerContext.getInstance().lbMonitorExist((cluster.getClusterId()))) { - AutoscalerContext.getInstance().getLBMonitor(e.getClusterId()).setStatus(e.getStatus()); + if(AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) { + AutoscalerContext.getInstance().getClusterMonitor(e.getClusterId()).setStatus(e.getStatus()); } else { log.error("cluster monitor not exists for the cluster: " + cluster.toString()); } @@ -213,8 +247,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable { String clusterId = e.getClusterId(); String deploymentPolicy = e.getDeploymentPolicy(); - AbstractMonitor monitor = null; - KubernetesClusterMonitor kubernetesClusterMonitor = null; + AbstractClusterMonitor monitor = null; if (e.isLbCluster()) { DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy); @@ -239,13 +272,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } } - monitor = AutoscalerContext.getInstance() - .removeLbMonitor(clusterId); - - } else { - monitor = AutoscalerContext.getInstance() - .removeMonitor(clusterId); 
} + + monitor = AutoscalerContext.getInstance().removeClusterMonitor(clusterId); // runTerminateAllRule(monitor); if (monitor != null) { @@ -280,43 +309,73 @@ public class AutoscalerTopologyEventReceiver implements Runnable { String networkPartitionId = e.getNetworkPartitionId(); String clusterId = e.getClusterId(); String partitionId = e.getPartitionId(); - AbstractMonitor monitor; + String memberId = e.getMemberId(); + AbstractClusterMonitor monitor; + + AutoscalerContext asCtx = AutoscalerContext.getInstance(); - if (AutoscalerContext.getInstance().monitorExist(clusterId)) { - monitor = AutoscalerContext.getInstance().getMonitor(clusterId); + if(asCtx.clusterMonitorExist(clusterId)) { + monitor = asCtx.getClusterMonitor(clusterId); } else { - //This is LB member - monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId); + if(log.isDebugEnabled()){ + log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); + } + return; } + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster) { + + NetworkPartitionContext networkPartitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); + + PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId); + partitionContext.removeMemberStatsContext(memberId); + + if (partitionContext.removeTerminationPendingMember(memberId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is removed from termination pending members list: [member] %s", memberId)); + } + } else if (partitionContext.removePendingMember(memberId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is removed from pending members list: [member] %s", memberId)); + } + } else if (partitionContext.removeActiveMemberById(memberId)) { + log.warn(String.format("Member is in the wrong list and it is removed from active members list", memberId)); + } else if 
(partitionContext.removeObsoleteMember(memberId)){ + log.warn(String.format("Member's obsolated timeout has been expired and it is removed from obsolated members list", memberId)); + } else { + log.warn(String.format("Member is not available in any of the list active, pending and termination pending", memberId)); + } - NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); - - PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId); - String memberId = e.getMemberId(); - partitionContext.removeMemberStatsContext(memberId); - - if (partitionContext.removeTerminationPendingMember(memberId)) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is removed from termination pending members list: [member] %s", memberId)); + if (log.isInfoEnabled()) { + log.info(String.format("Member stat context has been removed successfully: [member] %s", memberId)); } - } else if (partitionContext.removePendingMember(memberId)) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is removed from pending members list: [member] %s", memberId)); + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + + KubernetesClusterContext kubernetesClusterContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt(); + kubernetesClusterContext.removeMemberStatsContext(memberId); + + if (kubernetesClusterContext.removeTerminationPendingMember(memberId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is removed from termination pending members list: [member] %s", memberId)); + } + } else if (kubernetesClusterContext.removePendingMember(memberId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is removed from pending members list: [member] %s", memberId)); + } + } else if (kubernetesClusterContext.removeActiveMemberById(memberId)) { + log.warn(String.format("Member is in the wrong list and it is removed from active members 
list", memberId)); + } else if (kubernetesClusterContext.removeObsoleteMember(memberId)){ + log.warn(String.format("Member's obsolated timeout has been expired and it is removed from obsolated members list", memberId)); + } else { + log.warn(String.format("Member is not available in any of the list active, pending and termination pending", memberId)); } - } else if (partitionContext.removeActiveMemberById(memberId)) { - log.warn(String.format("Member is in the wrong list and it is removed from active members list", memberId)); - } else if (partitionContext.removeObsoleteMember(memberId)){ - log.warn(String.format("Member's obsolated timeout has been expired and it is removed from obsolated members list", memberId)); - } else { - log.warn(String.format("Member is not available in any of the list active, pending and termination pending", memberId)); - } - if (log.isInfoEnabled()) { - log.info(String.format("Member stat context has been removed successfully: [member] %s", memberId)); + if (log.isInfoEnabled()) { + log.info(String.format("Member stat context has been removed successfully: [member] %s", memberId)); + } } -// partitionContext.decrementCurrentActiveMemberCount(1); - - + } catch (Exception e) { log.error("Error processing event", e); } finally { @@ -338,24 +397,37 @@ public class AutoscalerTopologyEventReceiver implements Runnable { String partitionId = e.getPartitionId(); String networkPartitionId = e.getNetworkPartitionId(); - PartitionContext partitionContext; String clusterId = e.getClusterId(); - AbstractMonitor monitor; - - if (AutoscalerContext.getInstance().monitorExist(clusterId)) { - monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId); + AbstractClusterMonitor monitor; + + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + if(asCtx.clusterMonitorExist(clusterId)) { + monitor = asCtx.getClusterMonitor(clusterId); } else { - 
monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId); - partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId); - } - partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); - if (log.isInfoEnabled()) { - log.info(String.format("Member stat context has been added successfully: [member] %s", memberId)); + if(log.isDebugEnabled()){ + log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); + } + return; } -// partitionContext.incrementCurrentActiveMemberCount(1); - partitionContext.movePendingMemberToActiveMembers(memberId); - + + if (monitor.getClusterType() == ClusterType.VMServiceCluster) { + PartitionContext partitionContext; + partitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId); + partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if (log.isInfoEnabled()) { + log.info(String.format("Member stat context has been added successfully: [member] %s", memberId)); + } + partitionContext.movePendingMemberToActiveMembers(memberId); + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext; + kubernetesClusterContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt(); + kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if (log.isInfoEnabled()) { + log.info(String.format("Member stat context has been added successfully: [member] %s", memberId)); + } + kubernetesClusterContext.movePendingMemberToActiveMembers(memberId); + } + } catch (Exception e) { log.error("Error processing event", e); } finally { @@ -368,42 +440,59 @@ public class AutoscalerTopologyEventReceiver implements Runnable { @Override protected void onEvent(Event event) { try { + TopologyManager.acquireReadLock(); + MemberReadyToShutdownEvent memberReadyToShutdownEvent = 
(MemberReadyToShutdownEvent)event; AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractMonitor monitor; + AbstractClusterMonitor monitor; String clusterId = memberReadyToShutdownEvent.getClusterId(); String memberId = memberReadyToShutdownEvent.getMemberId(); - if(asCtx.monitorExist(clusterId)){ - monitor = asCtx.getMonitor(clusterId); - }else if(asCtx.lbMonitorExist(clusterId)){ - monitor = asCtx.getLBMonitor(clusterId); - }else{ + if(asCtx.clusterMonitorExist(clusterId)) { + monitor = asCtx.getClusterMonitor(clusterId); + } else { if(log.isDebugEnabled()){ log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); } return; } - NetworkPartitionContext nwPartitionCtxt; - nwPartitionCtxt = monitor.getNetworkPartitionCtxt(memberReadyToShutdownEvent.getNetworkPartitionId()); + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster) { + + NetworkPartitionContext nwPartitionCtxt; + nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(memberReadyToShutdownEvent.getNetworkPartitionId()); - // start a new member in the same Partition - String partitionId = monitor.getPartitionOfMember(memberId); - PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); + // start a new member in the same Partition + String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId); + PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); - // terminate the shutdown ready member - CloudControllerClient ccClient = CloudControllerClient.getInstance(); - ccClient.terminate(memberId); + // terminate the shutdown ready member + CloudControllerClient ccClient = CloudControllerClient.getInstance(); + ccClient.terminate(memberId); - // remove from active member list - partitionCtxt.removeActiveMemberById(memberId); + // remove from active member list + 
partitionCtxt.removeActiveMemberById(memberId); - if (log.isInfoEnabled()) { - log.info(String.format("Member is terminated and removed from the active members list: [member] %s [partition] %s [cluster] %s ", - memberId, partitionId, clusterId)); + if (log.isInfoEnabled()) { + log.info(String.format("Member is terminated and removed from the active members list: [member] %s [partition] %s [cluster] %s ", + memberId, partitionId, clusterId)); + } + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext; + kubernetesClusterContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt(); + // terminate the shutdown ready member + CloudControllerClient.getInstance().terminateContainer(memberId); + // remove from active member list + kubernetesClusterContext.removeActiveMemberById(memberId); + + if (log.isInfoEnabled()) { + log.info(String.format("Member is terminated and removed from the active members list: [member] %s [kub cluster] %s [cluster] %s ", + memberId, kubernetesClusterContext.getKubernetesClusterID(), clusterId)); + } } + } catch (TerminationException e) { log.error(e); } @@ -424,22 +513,38 @@ public class AutoscalerTopologyEventReceiver implements Runnable { String partitionId = e.getPartitionId(); String networkPartitionId = e.getNetworkPartitionId(); - PartitionContext partitionContext; String clusterId = e.getClusterId(); - AbstractMonitor monitor; - - if (AutoscalerContext.getInstance().monitorExist(clusterId)) { - monitor = AutoscalerContext.getInstance().getMonitor(clusterId); - partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId); + AbstractClusterMonitor monitor; + + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + if (asCtx.clusterMonitorExist(clusterId)) { + monitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId); } else { - monitor = AutoscalerContext.getInstance().getLBMonitor(clusterId); - 
partitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId); + if(log.isDebugEnabled()){ + log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); + } + return; } - partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); - if (log.isDebugEnabled()) { - log.debug(String.format("Member has been moved as pending termination: [member] %s", memberId)); + + if(monitor.getClusterType() == ClusterType.VMServiceCluster + || monitor.getClusterType() == ClusterType.VMLbCluster) { + + PartitionContext partitionContext; + partitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId); + partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if (log.isDebugEnabled()) { + log.debug(String.format("Member has been moved as pending termination: [member] %s", memberId)); + } + partitionContext.moveActiveMemberToTerminationPendingMembers(memberId); + } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { + KubernetesClusterContext kubernetesClusterContext; + kubernetesClusterContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt(); + kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if (log.isDebugEnabled()) { + log.debug(String.format("Member has been moved as pending termination: [member] %s", memberId)); + } + kubernetesClusterContext.moveActiveMemberToTerminationPendingMembers(memberId); } - partitionContext.moveActiveMemberToTerminationPendingMembers(memberId); } catch (Exception e) { log.error("Error processing event", e); @@ -471,64 +576,15 @@ public class AutoscalerTopologyEventReceiver implements Runnable { }); } - private class LBClusterMonitorAdder implements Runnable { - private Cluster cluster; - - public LBClusterMonitorAdder(Cluster cluster) { - this.cluster = cluster; - } - - public void run() { - LbClusterMonitor 
monitor = null; - int retries = 5; - boolean success = false; - do { - try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - } - try { - monitor = AutoscalerUtil.getLBClusterMonitor(cluster); - success = true; - - } catch (PolicyValidationException e) { - String msg = "LB Cluster monitor creation failed for cluster: " + cluster.getClusterId(); - log.debug(msg, e); - retries--; - - } catch (PartitionValidationException e) { - String msg = "LB Cluster monitor creation failed for cluster: " + cluster.getClusterId(); - log.debug(msg, e); - retries--; - } - } while (!success && retries <= 0); - - if (monitor == null) { - String msg = "LB Cluster monitor creation failed, even after retrying for 5 times, " - + "for cluster: " + cluster.getClusterId(); - log.error(msg); - throw new RuntimeException(msg); - } - - Thread th = new Thread(monitor); - th.start(); - AutoscalerContext.getInstance().addLbMonitor(monitor); - if (log.isInfoEnabled()) { - log.info(String.format("LB Cluster monitor has been added successfully: [cluster] %s", - cluster.getClusterId())); - } - } - } - private class ClusterMonitorAdder implements Runnable { private Cluster cluster; - + private String clusterMonitorType; public ClusterMonitorAdder(Cluster cluster) { this.cluster = cluster; } public void run() { - ClusterMonitor monitor = null; + AbstractClusterMonitor monitor = null; int retries = 5; boolean success = false; do { @@ -538,68 +594,23 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } try { - monitor = AutoscalerUtil.getClusterMonitor(cluster); + monitor = ClusterMonitorFactory.getMonitor(cluster); success = true; - + clusterMonitorType = monitor.getClusterType().name(); } catch (PolicyValidationException e) { - String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId(); + String msg = clusterMonitorType +" monitor creation failed for cluster: " + cluster.getClusterId(); log.debug(msg, e); retries--; } catch 
(PartitionValidationException e) { - String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId(); - log.debug(msg, e); - retries--; - } - } while (!success && retries != 0); - - if (monitor == null) { - String msg = "Cluster monitor creation failed, even after retrying for 5 times, " - + "for cluster: " + cluster.getClusterId(); - log.error(msg); - throw new RuntimeException(msg); - } - - Thread th = new Thread(monitor); - th.start(); - AutoscalerContext.getInstance().addMonitor(monitor); - if (log.isInfoEnabled()) { - log.info(String.format("Cluster monitor has been added successfully: [cluster] %s", - cluster.getClusterId())); - } - } - } - - private class KubernetesClusterMonitorAdder implements Runnable { - private Cluster cluster; - - public KubernetesClusterMonitorAdder(Cluster cluster) { - this.cluster = cluster; - } - - public void run() { - KubernetesClusterMonitor monitor = null; - int retries = 5; - boolean success = false; - do { - try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - } - - try { - monitor = AutoscalerUtil.getKubernetesClusterMonitor(cluster); - success = true; - - } catch (Exception e) { - String msg = "Kubernetes cluster monitor creation failed for cluster: " + cluster.getClusterId(); + String msg = clusterMonitorType +" monitor creation failed for cluster: " + cluster.getClusterId(); log.debug(msg, e); retries--; } } while (!success && retries != 0); if (monitor == null) { - String msg = "Kubernetes cluster monitor creation failed, even after retrying for 5 times, " + String msg = clusterMonitorType +" monitor creation failed, even after retrying for 5 times, " + "for cluster: " + cluster.getClusterId(); log.error(msg); throw new RuntimeException(msg); @@ -607,16 +618,16 @@ public class AutoscalerTopologyEventReceiver implements Runnable { Thread th = new Thread(monitor); th.start(); - AutoscalerContext.getInstance().addKubernetesClusterMonitor(monitor); + 
AutoscalerContext.getInstance().addClusterMonitor(monitor); if (log.isInfoEnabled()) { - log.info(String.format("Kubernetes cluster monitor has been added successfully: [cluster] %s", - cluster.getClusterId())); + log.info(String.format("%s monitor has been added successfully: [cluster] %s", + clusterMonitorType, cluster.getClusterId())); } } } - + @SuppressWarnings("unused") - private void runTerminateAllRule(AbstractMonitor monitor) { + private void runTerminateAllRule(VMClusterMonitor monitor) { FactHandle terminateAllFactHandle = null; @@ -639,16 +650,9 @@ public class AutoscalerTopologyEventReceiver implements Runnable { protected synchronized void startClusterMonitor(Cluster cluster) { Thread th = null; - if (cluster.isKubernetesCluster() - && !AutoscalerContext.getInstance().kubernetesClusterMonitorExist(cluster.getClusterId())) { - th = new Thread(new KubernetesClusterMonitorAdder(cluster)); - } else if (cluster.isLbCluster() - && !AutoscalerContext.getInstance().lbMonitorExist(cluster.getClusterId())) { - th = new Thread(new LBClusterMonitorAdder(cluster)); - } else if (!cluster.isLbCluster() && !cluster.isKubernetesCluster() - && !AutoscalerContext.getInstance().monitorExist(cluster.getClusterId())) { - th = new Thread(new ClusterMonitorAdder(cluster)); - } + if (!AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) { + th = new Thread(new ClusterMonitorAdder(cluster)); + } if (th != null) { th.start(); try {
http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java new file mode 100644 index 0000000..00796f1 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java @@ -0,0 +1,127 @@ +package org.apache.stratos.autoscaler.monitor; + +import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; +import org.apache.stratos.common.enums.ClusterType; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.drools.runtime.StatefulKnowledgeSession; +import org.drools.runtime.rule.FactHandle; + +public abstract class AbstractClusterMonitor implements Runnable{ + + private String clusterId; + private String serviceId; + private ClusterType clusterType; + private ClusterStatus status; + private int monitorInterval; + + protected FactHandle minCheckFactHandle; + protected FactHandle scaleCheckFactHandle; + private StatefulKnowledgeSession minCheckKnowledgeSession; + private StatefulKnowledgeSession scaleCheckKnowledgeSession; + private boolean isDestroyed; + + private AutoscalerRuleEvaluator autoscalerRuleEvaluator; + + protected AbstractClusterMonitor(String clusterId, String serviceId, ClusterType clusterType, + AutoscalerRuleEvaluator autoscalerRuleEvaluator) { + + super(); + this.clusterId = clusterId; + this.serviceId = serviceId; + this.clusterType = clusterType; + this.autoscalerRuleEvaluator = autoscalerRuleEvaluator; + this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession(); + 
this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession(); + } + + protected abstract void readConfigurations(); + protected abstract void monitor(); + public abstract void destroy(); + + public String getClusterId() { + return clusterId; + } + + public void setClusterId(String clusterId) { + this.clusterId = clusterId; + } + + public void setStatus(ClusterStatus status) { + this.status = status; + } + + public ClusterType getClusterType() { + return clusterType; + } + + public ClusterStatus getStatus() { + return status; + } + + public String getServiceId() { + return serviceId; + } + + public void setServiceId(String serviceId) { + this.serviceId = serviceId; + } + + public int getMonitorInterval() { + return monitorInterval; + } + + public void setMonitorInterval(int monitorInterval) { + this.monitorInterval = monitorInterval; + } + + public FactHandle getMinCheckFactHandle() { + return minCheckFactHandle; + } + + public void setMinCheckFactHandle(FactHandle minCheckFactHandle) { + this.minCheckFactHandle = minCheckFactHandle; + } + + public FactHandle getScaleCheckFactHandle() { + return scaleCheckFactHandle; + } + + public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) { + this.scaleCheckFactHandle = scaleCheckFactHandle; + } + + public StatefulKnowledgeSession getMinCheckKnowledgeSession() { + return minCheckKnowledgeSession; + } + + public void setMinCheckKnowledgeSession( + StatefulKnowledgeSession minCheckKnowledgeSession) { + this.minCheckKnowledgeSession = minCheckKnowledgeSession; + } + + public StatefulKnowledgeSession getScaleCheckKnowledgeSession() { + return scaleCheckKnowledgeSession; + } + + public void setScaleCheckKnowledgeSession( + StatefulKnowledgeSession scaleCheckKnowledgeSession) { + this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession; + } + + public boolean isDestroyed() { + return isDestroyed; + } + + public void setDestroyed(boolean isDestroyed) { + this.isDestroyed = isDestroyed; + } + 
+ public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() { + return autoscalerRuleEvaluator; + } + + public void setAutoscalerRuleEvaluator( + AutoscalerRuleEvaluator autoscalerRuleEvaluator) { + this.autoscalerRuleEvaluator = autoscalerRuleEvaluator; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractMonitor.java deleted file mode 100644 index c1441bb..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractMonitor.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.stratos.autoscaler.monitor; - -import java.util.Map; - -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.NetworkPartitionContext; -import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; -import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; -import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; -import org.apache.stratos.autoscaler.util.AutoScalerConstants; -import org.apache.stratos.autoscaler.util.ConfUtil; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Member; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; -import org.drools.runtime.StatefulKnowledgeSession; -import org.drools.runtime.rule.FactHandle; - -/** - * Is responsible for monitoring a service cluster. This runs periodically - * and perform minimum instance check and scaling check using the underlying - * rules engine. - * - */ - abstract public class AbstractMonitor implements Runnable{ - - private static final Log log = LogFactory.getLog(AbstractMonitor.class); - // Map<NetworkpartitionId, Network Partition Context> - protected Map<String, NetworkPartitionContext> networkPartitionCtxts; - protected DeploymentPolicy deploymentPolicy; - protected AutoscalePolicy autoscalePolicy; - - - protected FactHandle minCheckFactHandle; - protected FactHandle scaleCheckFactHandle; - - protected StatefulKnowledgeSession minCheckKnowledgeSession; - protected StatefulKnowledgeSession scaleCheckKnowledgeSession; - protected boolean isDestroyed; - - protected String clusterId; - protected String serviceId; - - protected AutoscalerRuleEvaluator autoscalerRuleEvaluator; - - // time intereval between two runs of the Monitor. Default is 90000ms. 
- protected int monitorInterval; - - public AbstractMonitor() { - readConfigurations(); - } - - private void readConfigurations () { - - XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); - monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000); - if (log.isDebugEnabled()) { - log.debug("Cluster Monitor task interval: " + getMonitorInterval()); - } - } - - @Override - public void run() { - // TODO Auto-generated method stub - - } - - - public NetworkPartitionContext getNetworkPartitionCtxt(Member member) { - log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId()); - String networkPartitionId = member.getNetworkPartitionId(); - if(networkPartitionCtxts.containsKey(networkPartitionId)) { - log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId)); - return networkPartitionCtxts.get(networkPartitionId); - } - log.info("returning null getNetworkPartitionCtxt"); - return null; - } - - public String getPartitionOfMember(String memberId){ - for(Service service: TopologyManager.getTopology().getServices()){ - for(Cluster cluster: service.getClusters()){ - if(cluster.memberExists(memberId)){ - return cluster.getMember(memberId).getPartitionId(); - } - } - } - return null; - } - - public void destroy() { - minCheckKnowledgeSession.dispose(); - scaleCheckKnowledgeSession.dispose(); - setDestroyed(true); - if(log.isDebugEnabled()) { - log.debug("Cluster Monitor Drools session has been disposed. 
"+this.toString()); - } - } - - public boolean isDestroyed() { - return isDestroyed; - } - - public void setDestroyed(boolean isDestroyed) { - this.isDestroyed = isDestroyed; - } - - public String getServiceId() { - return serviceId; - } - - public void setServiceId(String serviceId) { - this.serviceId = serviceId; - } - - public DeploymentPolicy getDeploymentPolicy() { - return deploymentPolicy; - } - - public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) { - this.deploymentPolicy = deploymentPolicy; - } - - public AutoscalePolicy getAutoscalePolicy() { - return autoscalePolicy; - } - - public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) { - this.autoscalePolicy = autoscalePolicy; - } - - public String getClusterId() { - return clusterId; - } - - public void setClusterId(String clusterId) { - this.clusterId = clusterId; - } - - public Map<String, NetworkPartitionContext> getNetworkPartitionCtxts() { - return networkPartitionCtxts; - } - - public NetworkPartitionContext getNetworkPartitionCtxt(String networkPartitionId) { - return networkPartitionCtxts.get(networkPartitionId); - } - - public void setPartitionCtxt(Map<String, NetworkPartitionContext> partitionCtxt) { - this.networkPartitionCtxts = partitionCtxt; - } - - public boolean partitionCtxtAvailable(String partitionId) { - return networkPartitionCtxts.containsKey(partitionId); - } - - public void addNetworkPartitionCtxt(NetworkPartitionContext ctxt) { - this.networkPartitionCtxts.put(ctxt.getId(), ctxt); - } - - public NetworkPartitionContext getPartitionCtxt(String id) { - return this.networkPartitionCtxts.get(id); - } - - public StatefulKnowledgeSession getMinCheckKnowledgeSession() { - return minCheckKnowledgeSession; - } - - public void setMinCheckKnowledgeSession(StatefulKnowledgeSession minCheckKnowledgeSession) { - this.minCheckKnowledgeSession = minCheckKnowledgeSession; - } - - public FactHandle getMinCheckFactHandle() { - return minCheckFactHandle; - } - - public void 
setMinCheckFactHandle(FactHandle minCheckFactHandle) { - this.minCheckFactHandle = minCheckFactHandle; - } - - public int getMonitorInterval() { - return monitorInterval; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java deleted file mode 100644 index 5bb478e..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.stratos.autoscaler.monitor; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.NetworkPartitionContext; -import org.apache.stratos.autoscaler.PartitionContext; -import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; -import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; -import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; -import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; -import org.apache.stratos.cloud.controller.stub.pojo.Properties; -import org.apache.stratos.cloud.controller.stub.pojo.Property; -import org.apache.stratos.messaging.domain.topology.ClusterStatus; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Is responsible for monitoring a service cluster. This runs periodically - * and perform minimum instance check and scaling check using the underlying - * rules engine. 
- * - */ -public class ClusterMonitor extends AbstractMonitor { - - private static final Log log = LogFactory.getLog(ClusterMonitor.class); - private String lbReferenceType; - private boolean hasPrimary; - private ClusterStatus status; - - public ClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy, - AutoscalePolicy autoscalePolicy) { - this.clusterId = clusterId; - this.serviceId = serviceId; - - this.autoscalerRuleEvaluator = new AutoscalerRuleEvaluator(); - this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession(); - this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession(); - - this.deploymentPolicy = deploymentPolicy; - this.autoscalePolicy = autoscalePolicy; - networkPartitionCtxts = new ConcurrentHashMap<String, NetworkPartitionContext>(); - } - - - - @Override - public void run() { - - try { - // TODO make this configurable, - // this is the delay the min check of normal cluster monitor to wait until LB monitor is added - Thread.sleep(60000); - } catch (InterruptedException ignore) { - } - - while (!isDestroyed()) { - if (log.isDebugEnabled()) { - log.debug("Cluster monitor is running.. " + this.toString()); - } - try { - if(!ClusterStatus.In_Maintenance.equals(status)) { - monitor(); - } else { - if (log.isDebugEnabled()) { - log.debug("Cluster monitor is suspended as the cluster is in " + - ClusterStatus.In_Maintenance + " mode......"); - } - } - } catch (Exception e) { - log.error("Cluster monitor: Monitor failed." 
+ this.toString(), e); - } - try { - Thread.sleep(monitorInterval); - } catch (InterruptedException ignore) { - } - } - } - - private boolean isPrimaryMember(MemberContext memberContext){ - Properties props = memberContext.getProperties(); - if (log.isDebugEnabled()) { - log.debug(" Properties [" + props + "] "); - } - if (props != null && props.getProperties() != null) { - for (Property prop : props.getProperties()) { - if (prop.getName().equals("PRIMARY")) { - if (Boolean.parseBoolean(prop.getValue())) { - log.debug("Adding member id [" + memberContext.getMemberId() + "] " + - "member instance id [" + memberContext.getInstanceId() + "] as a primary member"); - return true; - } - } - } - } - return false; - } - - private void monitor() { - - //TODO make this concurrent - for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) { - // store primary members in the network partition context - List<String> primaryMemberListInNetworkPartition = new ArrayList<String>(); - - //minimum check per partition - for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) { - // store primary members in the partition context - List<String> primaryMemberListInPartition = new ArrayList<String>(); - // get active primary members in this partition context - for (MemberContext memberContext : partitionContext.getActiveMembers()) { - if (isPrimaryMember(memberContext)){ - primaryMemberListInPartition.add(memberContext.getMemberId()); - } - } - // get pending primary members in this partition context - for (MemberContext memberContext : partitionContext.getPendingMembers()) { - if (isPrimaryMember(memberContext)){ - primaryMemberListInPartition.add(memberContext.getMemberId()); - } - } - primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition); - minCheckKnowledgeSession.setGlobal("clusterId", clusterId); - minCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType); - 
minCheckKnowledgeSession.setGlobal("isPrimary", hasPrimary); - minCheckKnowledgeSession.setGlobal("primaryMemberCount", primaryMemberListInPartition.size()); - - if (log.isDebugEnabled()) { - log.debug(String.format("Running minimum check for partition %s ", partitionContext.getPartitionId())); - } - - minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(minCheckKnowledgeSession - , minCheckFactHandle, partitionContext); - - } - - boolean rifReset = networkPartitionContext.isRifReset(); - boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset(); - boolean loadAverageReset = networkPartitionContext.isLoadAverageReset(); - if (log.isDebugEnabled()) { - log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset - + " flag of loadAverageReset" + loadAverageReset); - } - if (rifReset || memoryConsumptionReset || loadAverageReset) { - scaleCheckKnowledgeSession.setGlobal("clusterId", clusterId); - //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy); - scaleCheckKnowledgeSession.setGlobal("autoscalePolicy", autoscalePolicy); - scaleCheckKnowledgeSession.setGlobal("rifReset", rifReset); - scaleCheckKnowledgeSession.setGlobal("mcReset", memoryConsumptionReset); - scaleCheckKnowledgeSession.setGlobal("laReset", loadAverageReset); - scaleCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType); - scaleCheckKnowledgeSession.setGlobal("isPrimary", false); - scaleCheckKnowledgeSession.setGlobal("primaryMembers", primaryMemberListInNetworkPartition); - - if (log.isDebugEnabled()) { - log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId())); - log.debug(" Primary members : " + primaryMemberListInNetworkPartition); - } - - scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(scaleCheckKnowledgeSession - , scaleCheckFactHandle, networkPartitionContext); - - networkPartitionContext.setRifReset(false); - 
networkPartitionContext.setMemoryConsumptionReset(false); - networkPartitionContext.setLoadAverageReset(false); - } else if (log.isDebugEnabled()) { - log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " + - "cycle for network partition %s", networkPartitionContext.getId())); - } - } - } - - @Override - public String toString() { - return "ClusterMonitor [clusterId=" + clusterId + ", serviceId=" + serviceId + - ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy + - ", lbReferenceType=" + lbReferenceType + - ", hasPrimary=" + hasPrimary + " ]"; - } - - public String getLbReferenceType() { - return lbReferenceType; - } - - public void setLbReferenceType(String lbReferenceType) { - this.lbReferenceType = lbReferenceType; - } - - public boolean isHasPrimary() { - return hasPrimary; - } - - public void setHasPrimary(boolean hasPrimary) { - this.hasPrimary = hasPrimary; - } - - public ClusterStatus getStatus() { - return status; - } - - public void setStatus(ClusterStatus status) { - this.status = status; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java new file mode 100644 index 0000000..489078e --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java @@ -0,0 +1,336 @@ +package org.apache.stratos.autoscaler.monitor; + +import java.util.Map; +import java.util.Random; + +import org.apache.commons.logging.Log; +import 
org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.KubernetesClusterContext; +import org.apache.stratos.autoscaler.MemberStatsContext; +import org.apache.stratos.autoscaler.NetworkPartitionContext; +import org.apache.stratos.autoscaler.NetworkPartitionLbHolder; +import org.apache.stratos.autoscaler.PartitionContext; +import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; +import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; +import org.apache.stratos.autoscaler.exception.PartitionValidationException; +import org.apache.stratos.autoscaler.exception.PolicyValidationException; +import org.apache.stratos.autoscaler.partition.PartitionGroup; +import org.apache.stratos.autoscaler.partition.PartitionManager; +import org.apache.stratos.autoscaler.policy.PolicyManager; +import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; +import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; +import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; +import org.apache.stratos.cloud.controller.stub.pojo.Properties; +import org.apache.stratos.cloud.controller.stub.pojo.Property; +import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.domain.topology.Member; +import org.apache.stratos.messaging.domain.topology.MemberStatus; +import org.apache.stratos.messaging.util.Constants; + +public class ClusterMonitorFactory { + + private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class); + + public static AbstractClusterMonitor getMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException { + + AbstractClusterMonitor clusterMonitor; + if(cluster.isKubernetesCluster()){ + clusterMonitor = getDockerServiceClusterMonitor(cluster); + } else if 
(cluster.isLbCluster()){ + clusterMonitor = getVMLbClusterMonitor(cluster); + } else { + clusterMonitor = getVMServiceClusterMonitor(cluster); + } + + return clusterMonitor; + } + + private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException { + // FIXME fix the following code to correctly update + // AutoscalerContext context = AutoscalerContext.getInstance(); + if (null == cluster) { + return null; + } + + String autoscalePolicyName = cluster.getAutoscalePolicyName(); + String deploymentPolicyName = cluster.getDeploymentPolicyName(); + + if (log.isDebugEnabled()) { + log.debug("Deployment policy name: " + deploymentPolicyName); + log.debug("Autoscaler policy name: " + autoscalePolicyName); + } + + AutoscalePolicy policy = + PolicyManager.getInstance() + .getAutoscalePolicy(autoscalePolicyName); + DeploymentPolicy deploymentPolicy = + PolicyManager.getInstance() + .getDeploymentPolicy(deploymentPolicyName); + + if (deploymentPolicy == null) { + String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName; + log.error(msg); + throw new PolicyValidationException(msg); + } + + Partition[] allPartitions = deploymentPolicy.getAllPartitions(); + if (allPartitions == null) { + String msg = + "Deployment Policy's Partitions are null. 
Policy name: " + + deploymentPolicyName; + log.error(msg); + throw new PolicyValidationException(msg); + } + + CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy); + + VMServiceClusterMonitor clusterMonitor = + new VMServiceClusterMonitor(cluster.getClusterId(), + cluster.getServiceName(), + deploymentPolicy, policy); + clusterMonitor.setStatus(ClusterStatus.Created); + + for (PartitionGroup partitionGroup: deploymentPolicy.getPartitionGroups()){ + + NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(), + partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions()); + + for(Partition partition: partitionGroup.getPartitions()){ + PartitionContext partitionContext = new PartitionContext(partition); + partitionContext.setServiceName(cluster.getServiceName()); + partitionContext.setProperties(cluster.getProperties()); + partitionContext.setNetworkPartitionId(partitionGroup.getId()); + + for (Member member: cluster.getMembers()){ + String memberId = member.getMemberId(); + if(member.getPartitionId().equalsIgnoreCase(partition.getId())){ + MemberContext memberContext = new MemberContext(); + memberContext.setClusterId(member.getClusterId()); + memberContext.setMemberId(memberId); + memberContext.setPartition(partition); + memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties())); + + if(MemberStatus.Activated.equals(member.getStatus())){ + partitionContext.addActiveMember(memberContext); +// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); +// partitionContext.incrementCurrentActiveMemberCount(1); + + } else if(MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())){ + partitionContext.addPendingMember(memberContext); + +// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); + } else 
if(MemberStatus.Suspended.equals(member.getStatus())){ +// partitionContext.addFaultyMember(memberId); + } + partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if(log.isInfoEnabled()){ + log.info(String.format("Member stat context has been added: [member] %s", memberId)); + } + } + + } + networkPartitionContext.addPartitionContext(partitionContext); + if(log.isInfoEnabled()){ + log.info(String.format("Partition context has been added: [partition] %s", + partitionContext.getPartitionId())); + } + } + + clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext); + if(log.isInfoEnabled()){ + log.info(String.format("Network partition context has been added: [network partition] %s", + networkPartitionContext.getId())); + } + } + + + // find lb reference type + java.util.Properties props = cluster.getProperties(); + + if(props.containsKey(Constants.LOAD_BALANCER_REF)) { + String value = props.getProperty(Constants.LOAD_BALANCER_REF); + clusterMonitor.setLbReferenceType(value); + if(log.isDebugEnabled()) { + log.debug("Set the lb reference type: "+value); + } + } + + // set hasPrimary property + // hasPrimary is true if there are primary members available in that cluster + clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY))); + + log.info("Cluster monitor created: "+clusterMonitor.toString()); + return clusterMonitor; + } + + private static Properties convertMemberPropsToMemberContextProps( + java.util.Properties properties) { + Properties props = new Properties(); + for (Map.Entry<Object, Object> e : properties.entrySet() ) { + Property prop = new Property(); + prop.setName((String)e.getKey()); + prop.setValue((String)e.getValue()); + props.addProperties(prop); + } + return props; + } + + + private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException { + // FIXME fix the following code to correctly update + // 
AutoscalerContext context = AutoscalerContext.getInstance(); + if (null == cluster) { + return null; + } + + String autoscalePolicyName = cluster.getAutoscalePolicyName(); + String deploymentPolicyName = cluster.getDeploymentPolicyName(); + + if (log.isDebugEnabled()) { + log.debug("Deployment policy name: " + deploymentPolicyName); + log.debug("Autoscaler policy name: " + autoscalePolicyName); + } + + AutoscalePolicy policy = + PolicyManager.getInstance() + .getAutoscalePolicy(autoscalePolicyName); + DeploymentPolicy deploymentPolicy = + PolicyManager.getInstance() + .getDeploymentPolicy(deploymentPolicyName); + + if (deploymentPolicy == null) { + String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName; + log.error(msg); + throw new PolicyValidationException(msg); + } + + String clusterId = cluster.getClusterId(); + VMLbClusterMonitor clusterMonitor = + new VMLbClusterMonitor(clusterId, + cluster.getServiceName(), + deploymentPolicy, policy); + clusterMonitor.setStatus(ClusterStatus.Created); + // partition group = network partition context + for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) { + + NetworkPartitionLbHolder networkPartitionLbHolder = + PartitionManager.getInstance() + .getNetworkPartitionLbHolder(partitionGroup.getId()); +// PartitionManager.getInstance() +// .getNetworkPartitionLbHolder(partitionGroup.getId()); + // FIXME pick a random partition + Partition partition = + partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)]; + PartitionContext partitionContext = new PartitionContext(partition); + partitionContext.setServiceName(cluster.getServiceName()); + partitionContext.setProperties(cluster.getProperties()); + partitionContext.setNetworkPartitionId(partitionGroup.getId()); + partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions + + NetworkPartitionContext networkPartitionContext = new 
NetworkPartitionContext(partitionGroup.getId(), + partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions()) ; + for (Member member : cluster.getMembers()) { + String memberId = member.getMemberId(); + if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) { + MemberContext memberContext = new MemberContext(); + memberContext.setClusterId(member.getClusterId()); + memberContext.setMemberId(memberId); + memberContext.setPartition(partition); + + if (MemberStatus.Activated.equals(member.getStatus())) { + partitionContext.addActiveMember(memberContext); +// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); +// partitionContext.incrementCurrentActiveMemberCount(1); + } else if (MemberStatus.Created.equals(member.getStatus()) || + MemberStatus.Starting.equals(member.getStatus())) { + partitionContext.addPendingMember(memberContext); +// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); + } else if (MemberStatus.Suspended.equals(member.getStatus())) { +// partitionContext.addFaultyMember(memberId); + } + + partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if(log.isInfoEnabled()){ + log.info(String.format("Member stat context has been added: [member] %s", memberId)); + } + } + + } + networkPartitionContext.addPartitionContext(partitionContext); + + // populate lb cluster id in network partition context. 
+            java.util.Properties props = cluster.getProperties();
+
+            // get service type of load balanced cluster
+            String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE);
+
+            if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+                String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+
+                if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) {
+                    networkPartitionLbHolder.setDefaultLbClusterId(clusterId);
+
+                } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) {
+                    String serviceName = cluster.getServiceName();
+                    // TODO: check if this is correct
+                    networkPartitionLbHolder.addServiceLB(serviceName, clusterId);
+
+                    if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) {
+                        networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType);
+                        }
+                    }
+                }
+            }
+
+            clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
+        }
+
+        log.info("LB Cluster monitor created: "+clusterMonitor.toString());
+        return clusterMonitor;
+    }
+    /**
+     * Builds a {@link DockerServiceClusterMonitor} for the given topology cluster.
+     *
+     * The monitor is constructed from: a new {@link KubernetesClusterContext} wrapping the
+     * cluster's {@code StratosConstants.KUBERNETES_CLUSTER_ID} property, the cluster id, the
+     * service name, and the autoscale policy that {@link PolicyManager} returns for the
+     * cluster's autoscale policy name. The monitor's status is set to
+     * {@code ClusterStatus.Created}, and the {@code Constants.LOAD_BALANCER_REF} property,
+     * when present, is copied into the monitor as its LB reference type.
+     *
+     * NOTE(review): neither the looked-up policy nor {@code cluster.getProperties()} is
+     * null-checked here - confirm that callers guarantee both are available.
+     *
+     * @param cluster the topology cluster to monitor; may be {@code null}
+     * @return the new monitor, or {@code null} when {@code cluster} is {@code null}
+     */
+    private static DockerServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) {
+
+        if (null == cluster) {
+            return null;
+        }
+
+        String autoscalePolicyName = cluster.getAutoscalePolicyName();
+        if (log.isDebugEnabled()) {
+            log.debug("Autoscaler policy name: " + autoscalePolicyName);
+        }
+
+        AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName);
+        java.util.Properties props = cluster.getProperties();
+        String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID);
+        KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID);
+
+        DockerServiceClusterMonitor dockerClusterMonitor = new DockerServiceClusterMonitor(
+                kubernetesClusterCtxt,
+                cluster.getClusterId(),
+                cluster.getServiceName(),
+                policy);
+
+        dockerClusterMonitor.setStatus(ClusterStatus.Created);
+
+        // find lb reference type
+        if(props.containsKey(Constants.LOAD_BALANCER_REF)) {
+            String value = props.getProperty(Constants.LOAD_BALANCER_REF);
+            dockerClusterMonitor.setLbReferenceType(value);
+            if(log.isDebugEnabled()) {
+                log.debug("Set the lb reference type: "+value);
+            }
+        }
+
+//        // set hasPrimary property
+//        // hasPrimary is true if there are primary members available in that cluster
+//        dockerClusterMonitor.setHasPrimary(Boolean.parseBoolean(props.getProperty(Constants.IS_PRIMARY)));
+
+        log.info("Docker cluster monitor created: "+ dockerClusterMonitor.toString());
+        return dockerClusterMonitor;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
new file mode 100644
index 0000000..f9b9047
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java
@@ -0,0 +1,38 @@
+package org.apache.stratos.autoscaler.monitor;
+
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.common.enums.ClusterType;
+/**
+ * Abstract cluster monitor that, in addition to what {@link AbstractClusterMonitor}
+ * receives, holds a {@link KubernetesClusterContext} and an {@link AutoscalePolicy}.
+ */
+public abstract class ContainerClusterMonitor extends AbstractClusterMonitor {
+
+    private KubernetesClusterContext kubernetesClusterCtxt;
+    protected AutoscalePolicy autoscalePolicy;
+    /**
+     * Passes the cluster id, service id, cluster type and rule evaluator to the superclass
+     * constructor and stores the Kubernetes cluster context and autoscale policy.
+     *
+     * @param clusterId                id of the cluster, forwarded to the superclass
+     * @param serviceId                id of the service, forwarded to the superclass
+     * @param clusterType              type of the cluster, forwarded to the superclass
+     * @param kubernetesClusterContext context stored in this monitor
+     * @param autoscalerRuleEvaluator  rule evaluator, forwarded to the superclass
+     * @param autoscalePolicy          policy stored in this monitor
+     */
+    protected ContainerClusterMonitor(String clusterId, String serviceId, ClusterType clusterType,
+            KubernetesClusterContext kubernetesClusterContext,
+            AutoscalerRuleEvaluator autoscalerRuleEvaluator, AutoscalePolicy autoscalePolicy){
+
+        super(clusterId, serviceId, clusterType, autoscalerRuleEvaluator);
+        this.kubernetesClusterCtxt = kubernetesClusterContext;
+        this.autoscalePolicy = autoscalePolicy;
+    }
+    /** Returns the Kubernetes cluster context held by this monitor. */
+    public KubernetesClusterContext getKubernetesClusterCtxt() {
+        return kubernetesClusterCtxt;
+    }
+    /** Replaces the Kubernetes cluster context held by this monitor. */
+    public void setKubernetesClusterCtxt(
+            KubernetesClusterContext kubernetesClusterCtxt) {
+        this.kubernetesClusterCtxt = kubernetesClusterCtxt;
+    }
+    /** Returns the autoscale policy held by this monitor. */
+    public AutoscalePolicy getAutoscalePolicy() {
+        return autoscalePolicy;
+    }
+    /** Replaces the autoscale policy held by this monitor. */
+    public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) {
+        this.autoscalePolicy = autoscalePolicy;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/d6f49d37/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
new file mode 100644
index 0000000..ca39b6a
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java
@@ -0,0 +1,156 @@
+package org.apache.stratos.autoscaler.monitor;
+
+import java.util.Properties;
+
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.KubernetesClusterContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; +import org.apache.stratos.autoscaler.util.AutoScalerConstants; +import org.apache.stratos.autoscaler.util.ConfUtil; +import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; +import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.common.enums.ClusterType; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +public final class DockerServiceClusterMonitor extends ContainerClusterMonitor{ + + private static final Log log = LogFactory.getLog(DockerServiceClusterMonitor.class); + + private String lbReferenceType; + private int numberOfReplicasInServiceCluster = 0; + int retryInterval = 60000; + + public DockerServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt, + String serviceClusterID, String serviceId, AutoscalePolicy autoscalePolicy) { + super(serviceClusterID, serviceId, ClusterType.DockerServiceCluster, kubernetesClusterCtxt, + new AutoscalerRuleEvaluator(), autoscalePolicy); + readConfigurations(); + } + + @Override + public void run() { + try { + // TODO make this configurable, + // this is the delay the min check of normal cluster monitor to wait + // until LB monitor is added + Thread.sleep(60000); + } catch (InterruptedException ignore) { + } + + while (!isDestroyed()) { + if (log.isDebugEnabled()) { + log.debug("Kubernetes cluster monitor is running.. " + this.toString()); + } + try { + if (!ClusterStatus.In_Maintenance.equals(getStatus())) { + monitor(); + } else { + if (log.isDebugEnabled()) { + log.debug("Kubernetes cluster monitor is suspended as the cluster is in " + + ClusterStatus.In_Maintenance + " mode......"); + } + } + } catch (Exception e) { + log.error("Kubernetes cluster monitor: Monitor failed." 
+ this.toString(), + e); + } + try { + Thread.sleep(getMonitorInterval()); + } catch (InterruptedException ignore) { + } + } + } + + @Override + protected void monitor() { + + // is container created successfully? + boolean success = false; + String kubernetesClusterId = getKubernetesClusterCtxt().getKubernetesClusterID(); + + try { + TopologyManager.acquireReadLock(); + Properties props = TopologyManager.getTopology().getService(getServiceId()).getCluster(getClusterId()).getProperties(); + int minReplicas = Integer.parseInt(props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS)); + + int nonTerminatedMembers = getKubernetesClusterCtxt().getActiveMembers().size() + getKubernetesClusterCtxt().getPendingMembers().size(); + + if (nonTerminatedMembers == 0) { + + while (success) { + try { + + MemberContext memberContext = CloudControllerClient.getInstance().createContainer(kubernetesClusterId, getClusterId()); + if(null != memberContext) { + getKubernetesClusterCtxt().addPendingMember(memberContext); + success = true; + numberOfReplicasInServiceCluster = minReplicas; + if(log.isDebugEnabled()){ + log.debug(String.format("Pending member added, [member] %s [kub cluster] %s", + memberContext.getMemberId(), getKubernetesClusterCtxt().getKubernetesClusterID())); + } + } else { + if (log.isDebugEnabled()) { + log.debug("Returned member context is null, did not add to pending members"); + } + } + } catch (Throwable e) { + if (log.isDebugEnabled()) { + String message = "Cannot create a container, will retry in "+(retryInterval/1000)+"s"; + log.debug(message, e); + } + } + + try { + Thread.sleep(retryInterval); + } catch (InterruptedException e1) { + } + } + } + } finally { + TopologyManager.releaseReadLock(); + } + } + + @Override + public void destroy() { + getMinCheckKnowledgeSession().dispose(); + getScaleCheckKnowledgeSession().dispose(); + setDestroyed(true); + if(log.isDebugEnabled()) { + log.debug("DockerClusterMonitor Drools session has been disposed. 
"+this.toString()); + } + } + + @Override + protected void readConfigurations () { + // same as VM cluster monitor interval + XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); + int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000); + setMonitorInterval(monitorInterval); + if (log.isDebugEnabled()) { + log.debug("Kubernetes Cluster Monitor task interval: " + getMonitorInterval()); + } + } + + @Override + public String toString() { + return "DockerClusterMonitor " + + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID() + + ", clusterId=" + getClusterId() + + ", serviceId=" + getServiceId() + "]"; + } + + public String getLbReferenceType() { + return lbReferenceType; + } + + public void setLbReferenceType(String lbReferenceType) { + this.lbReferenceType = lbReferenceType; + } +} \ No newline at end of file
