Refactoring the cluster monitor to include a proper data structure for the cluster instances
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/4f2c1b70 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/4f2c1b70 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/4f2c1b70 Branch: refs/heads/master Commit: 4f2c1b707ebf2c8d8af8325d9bf174302af365b0 Parents: a4e473a Author: reka <[email protected]> Authored: Sun Nov 30 15:03:36 2014 +0530 Committer: reka <[email protected]> Committed: Sun Nov 30 15:48:46 2014 +0530 ---------------------------------------------------------------------- .../algorithm/AutoscaleAlgorithm.java | 9 +- .../autoscaler/algorithm/OneAfterAnother.java | 15 +- .../autoscaler/algorithm/RoundRobin.java | 24 +- .../context/cluster/ClusterContextFactory.java | 438 ++++++++----------- .../context/cluster/ClusterInstanceContext.java | 353 ++++++++++++++- .../cluster/KubernetesClusterContext.java | 10 + .../context/cluster/VMClusterContext.java | 230 +++++++++- .../cluster/VMServiceClusterContext.java | 6 +- .../ClusterLevelNetworkPartitionContext.java | 371 +--------------- .../AutoscalerTopologyEventReceiver.java | 92 ++-- .../autoscaler/monitor/MonitorFactory.java | 10 +- .../monitor/cluster/AbstractClusterMonitor.java | 17 +- .../KubernetesServiceClusterMonitor.java | 100 ++--- .../monitor/cluster/VMClusterMonitor.java | 182 ++++---- .../cluster/VMServiceClusterMonitor.java | 198 +++++---- .../monitor/component/ApplicationMonitor.java | 80 ++-- .../ApplicationLevelNetworkPartition.java | 9 + .../network/ChildLevelNetworkPartition.java | 1 + .../autoscaler/rule/RuleTasksDelegator.java | 23 +- .../status/processor/StatusChecker.java | 12 +- .../cluster/ClusterStatusActiveProcessor.java | 7 +- .../cluster/ClusterStatusInActiveProcessor.java | 8 +- .../ClusterStatusTerminatedProcessor.java | 16 +- .../domain/applications/ClusterDataHolder.java | 19 + .../messaging/domain/topology/Cluster.java | 43 +- .../AverageRequestsServingCapabilityEvent.java | 7 +- 
.../event/health/stat/MemberFaultEvent.java | 8 +- 27 files changed, 1282 insertions(+), 1006 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java index bcc1bce..aa08581 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java @@ -19,6 +19,7 @@ package org.apache.stratos.autoscaler.algorithm; +import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext; import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext; import org.apache.stratos.cloud.controller.stub.domain.Partition; @@ -45,17 +46,17 @@ public interface AutoscaleAlgorithm { /** * Returns a {@link Partition} to scale up from the given {@link org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.NetworkPartition} according to algorithm - * @param clusterLevelNetworkPartitionContext {@link org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext} which need the {@link Partition} + * @param clusterInstanceContext {@link org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext} which need the {@link Partition} * @param clusterId Id of the cluster which need the {@link Partition} * @return {@link Partition} to scale up */ - public Partition 
getNextScaleUpPartition(ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext, String clusterId); + public Partition getNextScaleUpPartition(ClusterInstanceContext clusterInstanceContext, String clusterId); /** * Returns a {@link Partition} to scale down from the given {@link org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.NetworkPartition} according to algorithm - * @param clusterLevelNetworkPartitionContext {@link org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext} which need the {@link Partition} + * @param clusterInstanceContext {@link org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext} which need the {@link Partition} * @param clusterId Id of the cluster which need the {@link Partition} * @return {@link Partition} to scale down */ - public Partition getNextScaleDownPartition(ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext, String clusterId); + public Partition getNextScaleDownPartition(ClusterInstanceContext clusterInstanceContext, String clusterId); } http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java index beabe3e..f900bb5 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java @@ -21,6 +21,7 @@ package org.apache.stratos.autoscaler.algorithm; import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext; import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext; import org.apache.stratos.cloud.controller.stub.domain.Partition; @@ -44,7 +45,7 @@ public class OneAfterAnother implements AutoscaleAlgorithm { private static final Log log = LogFactory.getLog(OneAfterAnother.class); - public Partition getNextScaleUpPartition(ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext, String clusterId) { + /*public Partition getNextScaleUpPartition(ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext, String clusterId) { try { if (log.isDebugEnabled()) @@ -134,7 +135,7 @@ public class OneAfterAnother implements AutoscaleAlgorithm { } return null; } - +*/ @Override public boolean scaleUpPartitionAvailable(String clusterId) { @@ -146,4 +147,14 @@ public class OneAfterAnother implements AutoscaleAlgorithm { return false; //To change body of implemented methods use File | Settings | File Templates. 
} + @Override + public Partition getNextScaleUpPartition(ClusterInstanceContext clusterInstanceContext, String clusterId) { + return null; + } + + @Override + public Partition getNextScaleDownPartition(ClusterInstanceContext clusterInstanceContext, String clusterId) { + return null; + } + } http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java index 5c6dc54..202d0ea 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java @@ -21,6 +21,7 @@ package org.apache.stratos.autoscaler.algorithm; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext; import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext; import org.apache.stratos.cloud.controller.stub.domain.Partition; @@ -36,17 +37,16 @@ public class RoundRobin implements AutoscaleAlgorithm{ private static final Log log = LogFactory.getLog(RoundRobin.class); - public Partition getNextScaleUpPartition(ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext, String clusterId){ - try{ + public Partition getNextScaleUpPartition(ClusterInstanceContext instanceContext, String clusterId){ + /*try{ if (log.isDebugEnabled()) - log.debug(String.format("Searching for a partition to scale up [network partition] %s", - clusterLevelNetworkPartitionContext.getId())) ; - 
List<?> partitions = Arrays.asList(clusterLevelNetworkPartitionContext.getPartitions()); + log.debug(String.format("Searching for a partition to scale up [ClsuterInstance] %s", + instanceContext.getId())) ; + List<?> partitions = Arrays.asList(instanceContext.getPartitionCtxts()); int noOfPartitions = partitions.size(); - for(int i=0; i < noOfPartitions; i++) - { + for(int i=0; i < noOfPartitions; i++) { int currentPartitionIndex = clusterLevelNetworkPartitionContext.getCurrentPartitionIndex(); if (partitions.get(currentPartitionIndex) instanceof Partition) { Partition currentPartition = (Partition) partitions.get(currentPartitionIndex); @@ -55,7 +55,7 @@ public class RoundRobin implements AutoscaleAlgorithm{ // point to next partition int nextPartitionIndex = currentPartitionIndex == noOfPartitions - 1 ? 0 : currentPartitionIndex+1; clusterLevelNetworkPartitionContext.setCurrentPartitionIndex(nextPartitionIndex); - int nonTerminatedMemberCountOfPartition = clusterLevelNetworkPartitionContext.getNonTerminatedMemberCountOfPartition(currentPartitionId); + int nonTerminatedMemberCountOfPartition = clusterLevelNetworkPartitionContext.(currentPartitionId); if(nonTerminatedMemberCountOfPartition < currentPartition.getPartitionMax()){ // current partition is free if (log.isDebugEnabled()) @@ -77,14 +77,14 @@ public class RoundRobin implements AutoscaleAlgorithm{ } } catch (Exception e) { log.error("Error occurred while searching for next scale up partition", e); - } + }*/ return null; } @Override - public Partition getNextScaleDownPartition(ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext, String clusterId) { - try{ + public Partition getNextScaleDownPartition(ClusterInstanceContext instanceContext, String clusterId) { + /*try{ if (log.isDebugEnabled()) log.debug(String.format("Searching for a partition to scale up [network partition] %s", clusterLevelNetworkPartitionContext.getId())) ; @@ -138,7 +138,7 @@ public class RoundRobin implements 
AutoscaleAlgorithm{ } catch (Exception e) { log.error("Error occurred while searching for next scale down partition", e); - } + }*/ return null; } http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContextFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContextFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContextFactory.java index edc0b28..c98501d 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContextFactory.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterContextFactory.java @@ -21,10 +21,8 @@ package org.apache.stratos.autoscaler.context.cluster; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.client.CloudControllerClient; +import org.apache.stratos.autoscaler.applications.ApplicationHolder; import org.apache.stratos.autoscaler.context.member.MemberStatsContext; -import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext; -import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext; import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException; import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException; import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.ChildLevelPartition; @@ -38,288 +36,205 @@ import org.apache.stratos.cloud.controller.stub.domain.MemberContext; import org.apache.stratos.common.Properties; import org.apache.stratos.common.Property; import 
org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.messaging.domain.applications.Application; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.MemberStatus; - -import java.util.HashMap; import java.util.Map; -import java.util.Random; + +//import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.PartitionManager; public class ClusterContextFactory { private static final Log log = LogFactory.getLog(ClusterContextFactory.class); - public static VMServiceClusterContext getVMServiceClusterContext (Cluster cluster) throws PolicyValidationException, PartitionValidationException { + public static VMServiceClusterContext getVMServiceClusterContext(String instanceId, Cluster cluster) + throws PolicyValidationException, PartitionValidationException { if (null == cluster) { return null; } - String autoscalePolicyName = cluster.getAutoscalePolicyName(); - String deploymentPolicyName = cluster.getDeploymentPolicyName(); - - if (log.isDebugEnabled()) { - log.debug("Deployment policy name: " + deploymentPolicyName); - log.debug("Autoscaler policy name: " + autoscalePolicyName); - } - AutoscalePolicy autoscalePolicy = PolicyManager.getInstance() .getAutoscalePolicy(autoscalePolicyName); - DeploymentPolicy deploymentPolicy = - PolicyManager.getInstance() - .getDeploymentPolicy(deploymentPolicyName); - - if (deploymentPolicy == null) { - String msg = "Deployment policy is null: [policy-name] " + deploymentPolicyName; - log.error(msg); - throw new PolicyValidationException(msg); - } - - Partition[] allPartitions = deploymentPolicy.getAllPartitions(); - if (allPartitions == null) { - String msg = - "Partitions are null in deployment policy: [policy-name]: " + - deploymentPolicyName; - log.error(msg); - throw new PolicyValidationException(msg); - } - - 
CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy); - - Map<String, ClusterLevelNetworkPartitionContext> networkPartitionContextMap = new HashMap<String, ClusterLevelNetworkPartitionContext>(); - - for (ChildLevelNetworkPartition networkPartition : deploymentPolicy.getChildLevelNetworkPartitions()) { - - String networkPartitionId = networkPartition.getId(); - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext - = new ClusterLevelNetworkPartitionContext(networkPartitionId, - networkPartition.getPartitionAlgo(), - networkPartition.getChildLevelPartitions()); - - for (ChildLevelPartition childLevelPartition : networkPartition.getChildLevelPartitions()) { - Partition applicationLevelPartition = deploymentPolicy.getApplicationLevelNetworkPartition(networkPartitionId) - .getPartition(childLevelPartition.getPartitionId()); - ClusterLevelPartitionContext clusterLevelPartitionContext - = new ClusterLevelPartitionContext(childLevelPartition, applicationLevelPartition); - clusterLevelPartitionContext.setServiceName(cluster.getServiceName()); - clusterLevelPartitionContext.setProperties(cluster.getProperties()); - clusterLevelPartitionContext.setNetworkPartitionId(networkPartition.getId()); - - for (Member member : cluster.getMembers()) { - String memberId = member.getMemberId(); - if (member.getPartitionId().equalsIgnoreCase(childLevelPartition.getPartitionId())) { - MemberContext memberContext = new MemberContext(); - memberContext.setClusterId(member.getClusterId()); - memberContext.setMemberId(memberId); - memberContext.setInitTime(member.getInitTime()); - memberContext.setPartition(applicationLevelPartition); - memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties())); - - if (MemberStatus.Activated.equals(member.getStatus())) { - if (log.isDebugEnabled()) { - String msg = String.format("Active member loaded from topology and added to active member list, %s", 
member.toString()); - log.debug(msg); - } - clusterLevelPartitionContext.addActiveMember(memberContext); -// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); -// partitionContext.incrementCurrentActiveMemberCount(1); - - } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) { - if (log.isDebugEnabled()) { - String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString()); - log.debug(msg); - } - clusterLevelPartitionContext.addPendingMember(memberContext); - -// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); - } else if (MemberStatus.Suspended.equals(member.getStatus())) { -// partitionContext.addFaultyMember(memberId); - } - clusterLevelPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); - if (log.isInfoEnabled()) { - log.info(String.format("Member stat context has been added: [member] %s", memberId)); - } - } - - } - clusterLevelNetworkPartitionContext.addPartitionContext(clusterLevelPartitionContext); - if (log.isInfoEnabled()) { - log.info(String.format("Partition context has been added: [partition] %s", - clusterLevelPartitionContext.getPartitionId())); - } - } - - networkPartitionContextMap.put(networkPartitionId, clusterLevelNetworkPartitionContext); - if (log.isInfoEnabled()) { - log.info(String.format("Network partition context has been added: [network partition] %s", - clusterLevelNetworkPartitionContext.getId())); - } - } - - return new VMServiceClusterContext(cluster.getClusterId(), cluster.getServiceName(), autoscalePolicy, - deploymentPolicy, networkPartitionContextMap); - } - - public static VMClusterContext getVMLBClusterContext (Cluster cluster) throws PolicyValidationException { - - // FIXME fix the following code to correctly update - // AutoscalerContext context = AutoscalerContext.getInstance(); - if (null == cluster) { 
- return null; - } - - String autoscalePolicyName = cluster.getAutoscalePolicyName(); - String deploymentPolicyName = cluster.getDeploymentPolicyName(); if (log.isDebugEnabled()) { - log.debug("Deployment policy name: " + deploymentPolicyName); log.debug("Autoscaler policy name: " + autoscalePolicyName); } - - AutoscalePolicy autoscalePolicy = - PolicyManager.getInstance() - .getAutoscalePolicy(autoscalePolicyName); - DeploymentPolicy deploymentPolicy = - PolicyManager.getInstance() - .getDeploymentPolicy(deploymentPolicyName); - - if (deploymentPolicy == null) { - String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName; - log.error(msg); - throw new PolicyValidationException(msg); - } - - String clusterId = cluster.getClusterId(); - - Map<String, ClusterLevelNetworkPartitionContext> networkPartitionContextMap = new HashMap<String, ClusterLevelNetworkPartitionContext>(); - - // partition group = network partition context - for (ChildLevelNetworkPartition networkPartition : deploymentPolicy.getChildLevelNetworkPartitions()) { - - String networkPartitionId = networkPartition.getId(); -// NetworkPartitionLbHolder networkPartitionLbHolder = -// PartitionManager.getInstance() -// .getNetworkPartitionLbHolder(networkPartitionId); -// PartitionManager.getInstance() -// .getNetworkPartitionLbHolder(partitionGroup.getPartitionId()); - // FIXME pick a random partition - ChildLevelPartition partition = - networkPartition.getChildLevelPartitions()[new Random().nextInt(networkPartition.getChildLevelPartitions().length)]; - Partition applicationLevelPartition = deploymentPolicy.getApplicationLevelNetworkPartition(networkPartitionId) - .getPartition(partition.getPartitionId()); - ClusterLevelPartitionContext clusterMonitorPartitionContext - = new ClusterLevelPartitionContext(partition, applicationLevelPartition); - clusterMonitorPartitionContext.setServiceName(cluster.getServiceName()); - 
clusterMonitorPartitionContext.setProperties(cluster.getProperties()); - clusterMonitorPartitionContext.setNetworkPartitionId(networkPartitionId); - clusterMonitorPartitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions - - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = new ClusterLevelNetworkPartitionContext(networkPartitionId, - networkPartition.getPartitionAlgo(), - networkPartition.getChildLevelPartitions()); - for (Member member : cluster.getMembers()) { - String memberId = member.getMemberId(); - if (member.getNetworkPartitionId().equalsIgnoreCase(clusterLevelNetworkPartitionContext.getId())) { - MemberContext memberContext = new MemberContext(); - memberContext.setClusterId(member.getClusterId()); - memberContext.setMemberId(memberId); - memberContext.setPartition(applicationLevelPartition); - memberContext.setInitTime(member.getInitTime()); - - if (MemberStatus.Activated.equals(member.getStatus())) { - if (log.isDebugEnabled()) { - String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString()); - log.debug(msg); - } - clusterMonitorPartitionContext.addActiveMember(memberContext); -// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); -// partitionContext.incrementCurrentActiveMemberCount(1); - } else if (MemberStatus.Created.equals(member.getStatus()) || - MemberStatus.Starting.equals(member.getStatus())) { - if (log.isDebugEnabled()) { - String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString()); - log.debug(msg); - } - clusterMonitorPartitionContext.addPendingMember(memberContext); -// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); - } else if (MemberStatus.Suspended.equals(member.getStatus())) { -// partitionContext.addFaultyMember(memberId); - } - - 
clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); - if (log.isInfoEnabled()) { - log.info(String.format("Member stat context has been added: [member] %s", memberId)); - } - } - - } - clusterLevelNetworkPartitionContext.addPartitionContext(clusterMonitorPartitionContext); - - -// // populate lb cluster id in network partition context. -// java.util.Properties props = cluster.getProperties(); -// -// // get service type of load balanced cluster -// String loadBalancedServiceType = props.getProperty(StratosConstants.LOAD_BALANCED_SERVICE_TYPE); -// -// if (props.containsKey(StratosConstants.LOAD_BALANCER_REF)) { -// String value = props.getProperty(StratosConstants.LOAD_BALANCER_REF); -// -// if (value.equals(StratosConstants.DEFAULT_LOAD_BALANCER)) { -// networkPartitionLbHolder.setDefaultLbClusterId(clusterId); -// -// } else if (value.equals(StratosConstants.SERVICE_AWARE_LOAD_BALANCER)) { -// String serviceName = cluster.getServiceName(); -// // TODO: check if this is correct -// networkPartitionLbHolder.addServiceLB(serviceName, clusterId); -// -// if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) { -// networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId); -// if (log.isDebugEnabled()) { -// log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType); -// } -// } -// } -// } - -// // populate lb cluster id in network partition context. 
-// java.util.Properties props = cluster.getProperties(); -// -// // get service type of load balanced cluster -// String loadBalancedServiceType = props.getProperty(org.apache.stratos.messaging.util.Constants.LOAD_BALANCED_SERVICE_TYPE); -// -// if (props.containsKey(org.apache.stratos.messaging.util.Constants.LOAD_BALANCER_REF)) { -// String value = props.getProperty(org.apache.stratos.messaging.util.Constants.LOAD_BALANCER_REF); -// -// if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) { -// networkPartitionLbHolder.setDefaultLbClusterId(clusterId); -// -// } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) { -// String serviceName = cluster.getServiceName(); -// // TODO: check if this is correct -// networkPartitionLbHolder.addServiceLB(serviceName, clusterId); -// -// if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) { -// networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId); -// if (log.isDebugEnabled()) { -// log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType); -// } -// } -// } -// } - - - networkPartitionContextMap.put(networkPartitionId, clusterLevelNetworkPartitionContext); + String deploymentPolicyName; + DeploymentPolicy deploymentPolicy; + ApplicationHolder.acquireReadLock(); + try { + Application application = ApplicationHolder.getApplications(). 
+ getApplication(cluster.getAppId()); + deploymentPolicyName = application.getDeploymentPolicy(); + deploymentPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicyName); + } finally { + ApplicationHolder.releaseReadLock(); } - return new VMClusterContext(clusterId, cluster.getServiceName(), autoscalePolicy, - deploymentPolicy, networkPartitionContextMap); + return new VMServiceClusterContext(cluster.getClusterId(), cluster.getServiceName(), autoscalePolicy, + deploymentPolicy); } - public static KubernetesClusterContext getKubernetesClusterContext (Cluster cluster) throws PolicyValidationException { + /* public static VMClusterContext getVMLBClusterContext(Cluster cluster) throws PolicyValidationException { + + // FIXME fix the following code to correctly update + // AutoscalerContext context = AutoscalerContext.getInstance(); + if (null == cluster) { + return null; + } + + String autoscalePolicyName = cluster.getAutoscalePolicyName(); + String deploymentPolicyName = cluster.getDeploymentPolicyName(); + + if (log.isDebugEnabled()) { + log.debug("Deployment policy name: " + deploymentPolicyName); + log.debug("Autoscaler policy name: " + autoscalePolicyName); + } + + AutoscalePolicy autoscalePolicy = + PolicyManager.getInstance() + .getAutoscalePolicy(autoscalePolicyName); + DeploymentPolicy deploymentPolicy = + PolicyManager.getInstance() + .getDeploymentPolicy(deploymentPolicyName); + + if (deploymentPolicy == null) { + String msg = "Deployment Policy is null. 
Policy name: " + deploymentPolicyName; + log.error(msg); + throw new PolicyValidationException(msg); + } + + String clusterId = cluster.getClusterId(); + + Map<String, ClusterLevelNetworkPartitionContext> networkPartitionContextMap = new HashMap<String, ClusterLevelNetworkPartitionContext>(); + + // partition group = network partition context + for (ChildLevelNetworkPartition networkPartition : deploymentPolicy.getChildLevelNetworkPartitions()) { + + String networkPartitionId = networkPartition.getId(); + // NetworkPartitionLbHolder networkPartitionLbHolder = + // PartitionManager.getInstance() + // .getNetworkPartitionLbHolder(networkPartitionId); + // PartitionManager.getInstance() + // .getNetworkPartitionLbHolder(partitionGroup.getPartitionId()); + // FIXME pick a random partition + Partition partition = + networkPartition.getPartitions()[new Random().nextInt(networkPartition.getPartitions().length)]; + ClusterLevelPartitionContext clusterMonitorPartitionContext = new ClusterLevelPartitionContext(partition); + clusterMonitorPartitionContext.setServiceName(cluster.getServiceName()); + clusterMonitorPartitionContext.setProperties(cluster.getProperties()); + clusterMonitorPartitionContext.setNetworkPartitionId(networkPartitionId); + clusterMonitorPartitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions + + ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = new ClusterLevelNetworkPartitionContext(networkPartitionId, + networkPartition.getPartitionAlgo(), + networkPartition.getPartitions()); + for (Member member : cluster.getMembers()) { + String memberId = member.getMemberId(); + if (member.getNetworkPartitionId().equalsIgnoreCase(clusterLevelNetworkPartitionContext.getId())) { + MemberContext memberContext = new MemberContext(); + memberContext.setClusterId(member.getClusterId()); + memberContext.setMemberId(memberId); + memberContext.setPartition(partition); + 
memberContext.setInitTime(member.getInitTime()); + + if (MemberStatus.Activated.equals(member.getStatus())) { + if (log.isDebugEnabled()) { + String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString()); + log.debug(msg); + } + clusterMonitorPartitionContext.addActiveMember(memberContext); + // networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); + // partitionContext.incrementCurrentActiveMemberCount(1); + } else if (MemberStatus.Created.equals(member.getStatus()) || + MemberStatus.Starting.equals(member.getStatus())) { + if (log.isDebugEnabled()) { + String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString()); + log.debug(msg); + } + clusterMonitorPartitionContext.addPendingMember(memberContext); + // networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); + } else if (MemberStatus.Suspended.equals(member.getStatus())) { + // partitionContext.addFaultyMember(memberId); + } + + clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if (log.isInfoEnabled()) { + log.info(String.format("Member stat context has been added: [member] %s", memberId)); + } + } + + } + clusterLevelNetworkPartitionContext.addPartitionContext(clusterMonitorPartitionContext); + + + // // populate lb cluster id in network partition context. 
+ // java.util.Properties props = cluster.getProperties(); + // + // // get service type of load balanced cluster + // String loadBalancedServiceType = props.getProperty(StratosConstants.LOAD_BALANCED_SERVICE_TYPE); + // + // if (props.containsKey(StratosConstants.LOAD_BALANCER_REF)) { + // String value = props.getProperty(StratosConstants.LOAD_BALANCER_REF); + // + // if (value.equals(StratosConstants.DEFAULT_LOAD_BALANCER)) { + // networkPartitionLbHolder.setDefaultLbClusterId(clusterId); + // + // } else if (value.equals(StratosConstants.SERVICE_AWARE_LOAD_BALANCER)) { + // String serviceName = cluster.getServiceName(); + // // TODO: check if this is correct + // networkPartitionLbHolder.addServiceLB(serviceName, clusterId); + // + // if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) { + // networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId); + // if (log.isDebugEnabled()) { + // log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType); + // } + // } + // } + // } + + // // populate lb cluster id in network partition context. 
+ // java.util.Properties props = cluster.getProperties(); + // + // // get service type of load balanced cluster + // String loadBalancedServiceType = props.getProperty(org.apache.stratos.messaging.util.Constants.LOAD_BALANCED_SERVICE_TYPE); + // + // if (props.containsKey(org.apache.stratos.messaging.util.Constants.LOAD_BALANCER_REF)) { + // String value = props.getProperty(org.apache.stratos.messaging.util.Constants.LOAD_BALANCER_REF); + // + // if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) { + // networkPartitionLbHolder.setDefaultLbClusterId(clusterId); + // + // } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) { + // String serviceName = cluster.getServiceName(); + // // TODO: check if this is correct + // networkPartitionLbHolder.addServiceLB(serviceName, clusterId); + // + // if (loadBalancedServiceType != null && !loadBalancedServiceType.isEmpty()) { + // networkPartitionLbHolder.addServiceLB(loadBalancedServiceType, clusterId); + // if (log.isDebugEnabled()) { + // log.debug("Added cluster id " + clusterId + " as the LB cluster id for service type " + loadBalancedServiceType); + // } + // } + // } + // } + + + networkPartitionContextMap.put(networkPartitionId, clusterLevelNetworkPartitionContext); + } + + return new VMClusterContext(clusterId, cluster.getServiceName(), autoscalePolicy, + deploymentPolicy, networkPartitionContextMap); + } + */ + public static KubernetesClusterContext getKubernetesClusterContext(String instanceId, + Cluster cluster) + throws PolicyValidationException { if (null == cluster) { return null; @@ -343,7 +258,7 @@ public class ClusterContextFactory { } java.util.Properties properties = cluster.getProperties(); - if(properties == null) { + if (properties == null) { String message = String.format("Properties not found in kubernetes cluster: [cluster-id] %s", cluster.getClusterId()); log.error(message); @@ -363,7 +278,7 @@ public class 
ClusterContextFactory { String kubernetesHostClusterID = properties.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID); KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID, - cluster.getClusterId(), cluster.getServiceName(), autoscalePolicy, minReplicas, maxReplicas); + cluster.getClusterId(), cluster.getServiceName(), autoscalePolicy, minReplicas, maxReplicas); //populate the members after restarting for (Member member : cluster.getMembers()) { @@ -394,6 +309,7 @@ public class ClusterContextFactory { } kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId)); + kubernetesClusterCtxt.setInstanceId(instanceId); if (log.isInfoEnabled()) { log.info(String.format("Member stat context has been added: [member] %s", memberId)); } http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java index d2be1c8..3e93e69 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java @@ -21,8 +21,13 @@ package org.apache.stratos.autoscaler.context.cluster; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext; +import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadAverage; +import 
org.apache.stratos.autoscaler.pojo.policy.autoscale.MemoryConsumption; +import org.apache.stratos.autoscaler.pojo.policy.autoscale.RequestsInFlight; +import org.apache.stratos.cloud.controller.domain.xsd.Partition; import org.apache.stratos.messaging.domain.topology.Member; +import java.util.HashMap; import java.util.Map; /* @@ -31,14 +36,60 @@ import java.util.Map; public class ClusterInstanceContext { private static final Log log = LogFactory.getLog(ClusterInstanceContext.class); - private final String clusterInstanceId; + private final String id; + + + + private final String partitionAlgorithm; + + //boolean values to keep whether the requests in flight parameters are reset or not + private boolean rifReset = false, averageRifReset = false, gradientRifReset = false, secondDerivativeRifRest = false; + //boolean values to keep whether the memory consumption parameters are reset or not + private boolean memoryConsumptionReset = false, averageMemoryConsumptionReset = false, + gradientMemoryConsumptionReset = false, secondDerivativeMemoryConsumptionRest = false; + //boolean values to keep whether the load average parameters are reset or not + private boolean loadAverageReset = false, averageLoadAverageReset = false, gradientLoadAverageReset = false, + secondDerivativeLoadAverageRest = false; + //boolean values to keep whether average requests served per instance parameters are reset or not + private boolean averageRequestServedPerInstanceReset = false; + + + + //Following information will keep events details + private RequestsInFlight requestsInFlight; + private MemoryConsumption memoryConsumption; + private LoadAverage loadAverage; + + private int scaleDownRequestsCount = 0; + private float averageRequestsServedPerInstance; + private float requestsServedPerInstance; + + private int minInstanceCount = 0, maxInstanceCount = 0; + private int requiredInstanceCountBasedOnStats; + private int requiredInstanceCountBasedOnDependencies; + + + //details required for partition 
selection algorithms + private int currentPartitionIndex; // Map<PartitionId, Partition Context> protected Map<String, ClusterLevelPartitionContext> partitionCtxts; - public ClusterInstanceContext(String clusterInstanceId, String serviceId, - Map<String, ClusterLevelPartitionContext> partitionCtxts) { + public ClusterInstanceContext(String clusterInstanceId, String partitionAlgo, Partition[] partitions) { + + this.id = clusterInstanceId; + partitionCtxts = new HashMap<String, ClusterLevelPartitionContext>(); + this.partitionAlgorithm = partitionAlgo; + //partitionCtxts = new HashMap<String, ClusterLevelPartitionContext>(); + requestsInFlight = new RequestsInFlight(); + loadAverage = new LoadAverage(); + memoryConsumption = new MemoryConsumption(); + for (Partition partition : partitions) { + minInstanceCount += partition.getPartitionMin(); + maxInstanceCount += partition.getPartitionMax(); + } + requiredInstanceCountBasedOnStats = minInstanceCount; + requiredInstanceCountBasedOnDependencies = minInstanceCount; - this.clusterInstanceId = clusterInstanceId; } @@ -77,7 +128,297 @@ public class ClusterInstanceContext { return null; } - public String getClusterInstanceId() { - return clusterInstanceId; + + public int getMinInstanceCount() { + return minInstanceCount; + } + + public void setMinInstanceCount(int minInstanceCount) { + this.minInstanceCount = minInstanceCount; + } + + public int getMaxInstanceCount() { + return maxInstanceCount; + } + + public void setMaxInstanceCount(int maxInstanceCount) { + this.maxInstanceCount = maxInstanceCount; + } + + @Override + public String toString() { + return "NetworkPartitionContext [id=" + id + "partitionAlgorithm=" + partitionAlgorithm + ", minInstanceCount=" + + minInstanceCount + ", maxInstanceCount=" + maxInstanceCount + "]"; + } + + public int getCurrentPartitionIndex() { + return currentPartitionIndex; + } + + public void setCurrentPartitionIndex(int currentPartitionIndex) { + this.currentPartitionIndex = 
currentPartitionIndex; + } + + public float getAverageRequestsServedPerInstance() { + return averageRequestsServedPerInstance; + } + + public void setAverageRequestsServedPerInstance(float averageRequestServedPerInstance) { + this.averageRequestsServedPerInstance = averageRequestServedPerInstance; + averageRequestServedPerInstanceReset = true; + + if (log.isDebugEnabled()) { + log.debug(String.format("Average Requesets Served Per Instance stats are reset, ready to do scale check [network partition] %s" + , this.id)); + + } + } + + public float getRequestsServedPerInstance() { + return requestsServedPerInstance; + } + + public float getAverageRequestsInFlight() { + return requestsInFlight.getAverage(); + } + + public void setAverageRequestsInFlight(float averageRequestsInFlight) { + requestsInFlight.setAverage(averageRequestsInFlight); + averageRifReset = true; + if (secondDerivativeRifRest && gradientRifReset) { + rifReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getRequestsInFlightSecondDerivative() { + return requestsInFlight.getSecondDerivative(); + } + + public void setRequestsInFlightSecondDerivative(float requestsInFlightSecondDerivative) { + requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative); + secondDerivativeRifRest = true; + if (averageRifReset && gradientRifReset) { + rifReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getRequestsInFlightGradient() { + return requestsInFlight.getGradient(); + } + + public void setRequestsInFlightGradient(float requestsInFlightGradient) { + requestsInFlight.setGradient(requestsInFlightGradient); + gradientRifReset = true; + if (secondDerivativeRifRest && averageRifReset) { + rifReset = true; + if 
(log.isDebugEnabled()) { + log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public boolean isRifReset() { + return rifReset; + } + + public void setRifReset(boolean rifReset) { + this.rifReset = rifReset; + this.averageRifReset = rifReset; + this.gradientRifReset = rifReset; + this.secondDerivativeRifRest = rifReset; + } + + + public float getAverageMemoryConsumption() { + return memoryConsumption.getAverage(); + } + + public void setAverageMemoryConsumption(float averageMemoryConsumption) { + memoryConsumption.setAverage(averageMemoryConsumption); + averageMemoryConsumptionReset = true; + if (secondDerivativeMemoryConsumptionRest && gradientMemoryConsumptionReset) { + memoryConsumptionReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Memory consumption stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } } + + public float getMemoryConsumptionSecondDerivative() { + return memoryConsumption.getSecondDerivative(); + } + + public void setMemoryConsumptionSecondDerivative(float memoryConsumptionSecondDerivative) { + memoryConsumption.setSecondDerivative(memoryConsumptionSecondDerivative); + secondDerivativeMemoryConsumptionRest = true; + if (averageMemoryConsumptionReset && gradientMemoryConsumptionReset) { + memoryConsumptionReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Memory consumption stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getMemoryConsumptionGradient() { + return memoryConsumption.getGradient(); + } + + public void setMemoryConsumptionGradient(float memoryConsumptionGradient) { + memoryConsumption.setGradient(memoryConsumptionGradient); + gradientMemoryConsumptionReset = true; + if (secondDerivativeMemoryConsumptionRest && averageMemoryConsumptionReset) { + memoryConsumptionReset = true; + if (log.isDebugEnabled()) { + 
log.debug(String.format("Memory consumption stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public boolean isMemoryConsumptionReset() { + return memoryConsumptionReset; + } + + public void setMemoryConsumptionReset(boolean memoryConsumptionReset) { + this.memoryConsumptionReset = memoryConsumptionReset; + this.averageMemoryConsumptionReset = memoryConsumptionReset; + this.gradientMemoryConsumptionReset = memoryConsumptionReset; + this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset; + } + + + public float getAverageLoadAverage() { + return loadAverage.getAverage(); + } + + public void setAverageLoadAverage(float averageLoadAverage) { + loadAverage.setAverage(averageLoadAverage); + averageLoadAverageReset = true; + if (secondDerivativeLoadAverageRest && gradientLoadAverageReset) { + loadAverageReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Load average stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getLoadAverageSecondDerivative() { + return loadAverage.getSecondDerivative(); + } + + public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) { + loadAverage.setSecondDerivative(loadAverageSecondDerivative); + secondDerivativeLoadAverageRest = true; + if (averageLoadAverageReset && gradientLoadAverageReset) { + loadAverageReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Load average stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getLoadAverageGradient() { + return loadAverage.getGradient(); + } + + public void setLoadAverageGradient(float loadAverageGradient) { + loadAverage.setGradient(loadAverageGradient); + gradientLoadAverageReset = true; + if (secondDerivativeLoadAverageRest && averageLoadAverageReset) { + loadAverageReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Load average stats are reset, 
ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public boolean isLoadAverageReset() { + return loadAverageReset; + } + + public void setLoadAverageReset(boolean loadAverageReset) { + this.loadAverageReset = loadAverageReset; + this.averageLoadAverageReset = loadAverageReset; + this.gradientLoadAverageReset = loadAverageReset; + this.secondDerivativeLoadAverageRest = loadAverageReset; + } + + + public String getId() { + return id; + } + + /* public Map<String, ClusterLevelPartitionContext> getPartitionCtxts() { + return partitionCtxts; + } + + public ClusterLevelPartitionContext getPartitionCtxt(String partitionId) { + return partitionCtxts.get(partitionId); + } + + public void addPartitionContext(ClusterLevelPartitionContext partitionContext) { + partitionCtxts.put(partitionContext.getPartitionId(), partitionContext); + }*/ + + public String getPartitionAlgorithm() { + return partitionAlgorithm; + } + + /*public int getNonTerminatedMemberCountOfPartition(String partitionId) { + if (partitionCtxts.containsKey(partitionId)) { + return getPartitionCtxt(partitionId).getNonTerminatedMemberCount(); + } + return 0; + } + + public int getActiveMemberCount(String currentPartitionId) { + if (partitionCtxts.containsKey(currentPartitionId)) { + return getPartitionCtxt(currentPartitionId).getActiveMemberCount(); + } + return 0; + } +*/ + public int getScaleDownRequestsCount() { + return scaleDownRequestsCount; + } + + public void resetScaleDownRequestsCount() { + this.scaleDownRequestsCount = 0; + } + + public void increaseScaleDownRequestsCount() { + this.scaleDownRequestsCount += 1; + } + + public float getRequiredInstanceCountBasedOnStats() { + return requiredInstanceCountBasedOnStats; + } + + public void setRequiredInstanceCountBasedOnStats(int requiredInstanceCountBasedOnStats) { + this.requiredInstanceCountBasedOnStats = requiredInstanceCountBasedOnStats; + } + + public int getRequiredInstanceCountBasedOnDependencies() { + return 
requiredInstanceCountBasedOnDependencies; + } + + public void setRequiredInstanceCountBasedOnDependencies(int requiredInstanceCountBasedOnDependencies) { + this.requiredInstanceCountBasedOnDependencies = requiredInstanceCountBasedOnDependencies; + } + + } http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/KubernetesClusterContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/KubernetesClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/KubernetesClusterContext.java index 040c4ea..44517dd 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/KubernetesClusterContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/KubernetesClusterContext.java @@ -46,6 +46,8 @@ public class KubernetesClusterContext extends AbstractClusterContext { private static final long serialVersionUID = 808741789615481596L; private static final Log log = LogFactory.getLog(KubernetesClusterContext.class); + private String instanceId; + private String kubernetesClusterId; private String serviceName; @@ -395,6 +397,14 @@ public class KubernetesClusterContext extends AbstractClusterContext { return null; } + public String getInstanceId() { + return instanceId; + } + + public void setInstanceId(String instanceId) { + this.instanceId = instanceId; + } + private class PendingMemberWatcher implements Runnable { private KubernetesClusterContext ctxt; http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java 
---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java index fb2de33..2d7b1e1 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMClusterContext.java @@ -20,12 +20,28 @@ package org.apache.stratos.autoscaler.context.cluster; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.applications.ApplicationHolder; +import org.apache.stratos.autoscaler.client.CloudControllerClient; +import org.apache.stratos.autoscaler.context.member.MemberStatsContext; +import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext; import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext; +import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException; +import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException; +import org.apache.stratos.autoscaler.pojo.policy.PolicyManager; import org.apache.stratos.autoscaler.pojo.policy.autoscale.AutoscalePolicy; import org.apache.stratos.autoscaler.pojo.policy.deployment.DeploymentPolicy; +import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.ChildLevelNetworkPartition; +import org.apache.stratos.cloud.controller.domain.xsd.MemberContext; +import org.apache.stratos.cloud.controller.domain.xsd.Partition; +import org.apache.stratos.cloud.controller.stub.pojo.*; +import org.apache.stratos.messaging.domain.applications.Application; +import 
org.apache.stratos.messaging.domain.instance.ClusterInstance; +import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; +import org.apache.stratos.messaging.domain.topology.MemberStatus; import java.util.*; +import java.util.Properties; /* * It holds the runtime data of a VM cluster @@ -39,12 +55,12 @@ public class VMClusterContext extends AbstractClusterContext { protected DeploymentPolicy deploymentPolicy; protected AutoscalePolicy autoscalePolicy; - public VMClusterContext(String clusterId, String serviceId, AutoscalePolicy autoscalePolicy, DeploymentPolicy deploymentPolicy, - Map<String, ClusterLevelNetworkPartitionContext> networkPartitionCtxts) { + public VMClusterContext(String clusterId, String serviceId, AutoscalePolicy autoscalePolicy, + DeploymentPolicy deploymentPolicy) { super(clusterId, serviceId); this.deploymentPolicy = deploymentPolicy; - this.networkPartitionCtxts = networkPartitionCtxts; + this.networkPartitionCtxts = new HashMap<String, ClusterLevelNetworkPartitionContext>(); this.autoscalePolicy = autoscalePolicy; } @@ -101,4 +117,212 @@ public class VMClusterContext extends AbstractClusterContext { return null; } + public void addInstanceContext(String instanceId, Cluster cluster) + throws PolicyValidationException, PartitionValidationException { + ClusterLevelNetworkPartitionContext networkPartitionContext = null; + ClusterInstance clusterInstance = cluster.getInstanceContexts(instanceId); + if(networkPartitionCtxts.containsKey(clusterInstance.getNetworkPartitionId())) { + networkPartitionContext = this.networkPartitionCtxts.get( + clusterInstance.getNetworkPartitionId()); + } + if (clusterInstance.getPartitionId() != null) { + //Need to add partition Context based on the given one from the parent + networkPartitionContext = addPartition(clusterInstance, cluster, + this.deploymentPolicy, networkPartitionContext ); + + } else { + networkPartitionContext = 
parseDeploymentPolicy(clusterInstance, cluster, + this.deploymentPolicy, networkPartitionContext); + } + if(!networkPartitionCtxts.containsKey(clusterInstance.getNetworkPartitionId())) { + this.networkPartitionCtxts.put(clusterInstance.getNetworkPartitionId(), + networkPartitionContext); + if (log.isInfoEnabled()) { + log.info(String.format("Network partition context has been added: " + + "[network partition] %s", clusterInstance.getNetworkPartitionId())); + } + } + + } + + private ClusterLevelNetworkPartitionContext parseDeploymentPolicy( + ClusterInstance instance, + Cluster cluster, + DeploymentPolicy deploymentPolicy, + ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext) + throws PolicyValidationException, PartitionValidationException { + if (log.isDebugEnabled()) { + log.debug("Deployment policy name: " + deploymentPolicy.getId()); + } + + if (deploymentPolicy == null) { + String msg = "Deployment policy is null: [policy-name] " + deploymentPolicy.getId(); + log.error(msg); + throw new PolicyValidationException(msg); + } + + Partition[] allPartitions = deploymentPolicy.getAllPartitions(); + if (allPartitions == null) { + String msg = + "Partitions are null in deployment policy: [policy-name]: " + + deploymentPolicy.getId(); + log.error(msg); + throw new PolicyValidationException(msg); + } + + CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), + deploymentPolicy); + + ChildLevelNetworkPartition networkPartition; + networkPartition = deploymentPolicy.getChildLevelNetworkPartition(instance.getNetworkPartitionId()); + String networkPartitionId = networkPartition.getId(); + + if(clusterLevelNetworkPartitionContext == null) { + clusterLevelNetworkPartitionContext = new ClusterLevelNetworkPartitionContext( + networkPartitionId); + } + ClusterInstanceContext clusterInstanceContext = clusterLevelNetworkPartitionContext. 
+ getClusterInstanceContext(instance.getInstanceId()); + if (clusterInstanceContext == null) { + clusterInstanceContext = new ClusterInstanceContext(instance.getInstanceId(), + networkPartition.getPartitionAlgo(), + networkPartition.getPartitions()); + } + + + for (Partition partition : networkPartition.getPartitions()) { + ClusterLevelPartitionContext clusterLevelPartitionContext = + new ClusterLevelPartitionContext(partition); + clusterLevelPartitionContext.setServiceName(cluster.getServiceName()); + clusterLevelPartitionContext.setProperties(cluster.getProperties()); + clusterLevelPartitionContext.setNetworkPartitionId(networkPartition.getId()); + //add members to partition Context + addMembersFromTopology(cluster, partition, clusterLevelPartitionContext); + + clusterInstanceContext.addPartitionCtxt(clusterLevelPartitionContext); + if (log.isInfoEnabled()) { + log.info(String.format("Partition context has been added: [partition] %s", + clusterLevelPartitionContext.getPartitionId())); + } + } + + if (log.isInfoEnabled()) { + log.info(String.format("Network partition context has been added: " + + "[network partition] %s", clusterLevelNetworkPartitionContext.getId())); + } + return clusterLevelNetworkPartitionContext; + } + + private ClusterLevelNetworkPartitionContext addPartition( + ClusterInstance clusterInstance, + Cluster cluster, + DeploymentPolicy deploymentPolicy, + ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext) + throws PolicyValidationException, PartitionValidationException { + + ChildLevelNetworkPartition networkPartition = deploymentPolicy. 
+ getChildLevelNetworkPartition(clusterInstance.getNetworkPartitionId()); + if (networkPartition == null) { + String msg = + "Network Partition is null in deployment policy: [policy-name]: " + + deploymentPolicy.getId(); + log.error(msg); + throw new PolicyValidationException(msg); + } + Partition partition = networkPartition.getPartition(clusterInstance.getPartitionId()); + if (partition == null) { + String msg = + "[Partition] " + clusterInstance.getPartitionId() + " for [networkPartition] " + + clusterInstance.getNetworkPartitionId() + "is null " + + "in deployment policy: [policy-name]: " + deploymentPolicy.getId(); + log.error(msg); + throw new PolicyValidationException(msg); + } + CloudControllerClient.getInstance().validatePartition(partition); + if(clusterLevelNetworkPartitionContext == null) { + clusterLevelNetworkPartitionContext = + new ClusterLevelNetworkPartitionContext(clusterInstance.getNetworkPartitionId()); + } + + ClusterLevelPartitionContext clusterLevelPartitionContext = + new ClusterLevelPartitionContext(partition); + clusterLevelPartitionContext.setServiceName(cluster.getServiceName()); + clusterLevelPartitionContext.setProperties(cluster.getProperties()); + clusterLevelPartitionContext.setNetworkPartitionId(networkPartition.getId()); + //add members to partition Context + addMembersFromTopology(cluster, partition, clusterLevelPartitionContext); + + ClusterInstanceContext clusterInstanceContext = clusterLevelNetworkPartitionContext. 
+ getClusterInstanceContext(clusterInstance.getInstanceId()); + if (clusterInstanceContext == null) { + clusterInstanceContext = new ClusterInstanceContext(clusterInstance.getInstanceId(), + networkPartition.getPartitionAlgo(), + networkPartition.getPartitions()); + } + clusterInstanceContext.addPartitionCtxt(clusterLevelPartitionContext); + clusterLevelNetworkPartitionContext.addClusterInstanceContext(clusterInstanceContext); + + if (log.isInfoEnabled()) { + log.info(String.format("Partition context has been added: [partition] %s", + clusterLevelPartitionContext.getPartitionId())); + } + + + return clusterLevelNetworkPartitionContext; + } + + private void addMembersFromTopology(Cluster cluster, Partition partition, + ClusterLevelPartitionContext clusterLevelPartitionContext) { + for (Member member : cluster.getMembers()) { + String memberId = member.getMemberId(); + if (member.getPartitionId().equalsIgnoreCase(partition.getId())) { + MemberContext memberContext = new MemberContext(); + memberContext.setClusterId(member.getClusterId()); + memberContext.setMemberId(memberId); + memberContext.setInitTime(member.getInitTime()); + memberContext.setPartition(partition); + memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties())); + + if (MemberStatus.Activated.equals(member.getStatus())) { + if (log.isDebugEnabled()) { + String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString()); + log.debug(msg); + } + clusterLevelPartitionContext.addActiveMember(memberContext); +// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); +// partitionContext.incrementCurrentActiveMemberCount(1); + + } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) { + if (log.isDebugEnabled()) { + String msg = String.format("Pending member loaded from topology and added to pending member list, %s", 
member.toString()); + log.debug(msg); + } + clusterLevelPartitionContext.addPendingMember(memberContext); + +// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); + } else if (MemberStatus.Suspended.equals(member.getStatus())) { +// partitionContext.addFaultyMember(memberId); + } + clusterLevelPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if (log.isInfoEnabled()) { + log.info(String.format("Member stat context has been added: [member] %s", memberId)); + } + } + + } + } + + private org.apache.stratos.cloud.controller.stub.pojo.Properties convertMemberPropsToMemberContextProps( + java.util.Properties properties) { + org.apache.stratos.cloud.controller.stub.pojo.Properties props = new org.apache.stratos.cloud.controller.stub.pojo.Properties(); + for (Map.Entry<Object, Object> e : properties.entrySet()) { + Property property = new Property(); + property.setName((String) e.getKey()); + property.setValue((String) e.getValue()); + props.addProperties(property); + } + return props; + } + + } http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMServiceClusterContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMServiceClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMServiceClusterContext.java index 35c704d..9e982cf 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMServiceClusterContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/VMServiceClusterContext.java @@ -35,10 +35,10 @@ public class VMServiceClusterContext extends VMClusterContext { 
protected AutoscalePolicy autoscalePolicy; - public VMServiceClusterContext(String clusterId, String serviceId, AutoscalePolicy autoscalePolicy, DeploymentPolicy deploymentPolicy, - Map<String, ClusterLevelNetworkPartitionContext> networkPartitionCtxts) { + public VMServiceClusterContext(String clusterId, String serviceId, AutoscalePolicy autoscalePolicy, + DeploymentPolicy deploymentPolicy) { - super(clusterId, serviceId, autoscalePolicy, deploymentPolicy, networkPartitionCtxts); + super(clusterId, serviceId, autoscalePolicy, deploymentPolicy); this.autoscalePolicy = autoscalePolicy; }
