Repository: stratos Updated Branches: refs/heads/master 283d21d95 -> b098d8a32
Make cluster monitors to use Cluster as their constructor argument. Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c1ad7a52 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c1ad7a52 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c1ad7a52 Branch: refs/heads/master Commit: c1ad7a52b0fc54f640d7db476ac7ddaaf15bbacc Parents: 93e7500 Author: Nirmal Fernando <[email protected]> Authored: Fri Dec 5 23:04:00 2014 +0530 Committer: Nirmal Fernando <[email protected]> Committed: Sun Dec 7 17:59:31 2014 +0530 ---------------------------------------------------------------------- .../monitor/cluster/AbstractClusterMonitor.java | 18 +- .../monitor/cluster/ClusterMonitorFactory.java | 93 +- .../cluster/KubernetesClusterMonitor.java | 847 +++++++++---------- .../KubernetesServiceClusterMonitor.java | 322 +++---- .../monitor/cluster/VMClusterMonitor.java | 4 +- .../autoscaler/rule/RuleTasksDelegator.java | 66 +- .../messaging/domain/topology/Cluster.java | 25 + .../modules/distribution/container-mincheck.drl | 54 ++ 8 files changed, 776 insertions(+), 653 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/c1ad7a52/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java index 19905b1..edfe063 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java +++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java @@ -35,6 +35,7 @@ import org.apache.stratos.messaging.domain.applications.ApplicationStatus; import org.apache.stratos.messaging.domain.applications.Group; import org.apache.stratos.messaging.domain.applications.GroupStatus; import org.apache.stratos.messaging.domain.instance.ClusterInstance; +import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.event.health.stat.*; import org.apache.stratos.messaging.event.topology.*; @@ -68,16 +69,17 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable protected String serviceType; private AtomicBoolean monitoringStarted; private String clusterId; + private Cluster cluster; private ClusterStatus status; private int monitoringIntervalMilliseconds; private boolean isDestroyed; - protected AbstractClusterMonitor(String serviceType, String clusterId) { + protected AbstractClusterMonitor(Cluster cluster) { super(); - this.serviceType = serviceType; - this.clusterId = clusterId; - this.autoscalerRuleEvaluator = autoscalerRuleEvaluator; + this.setCluster(new Cluster(cluster)); + this.serviceType = cluster.getServiceName(); + this.clusterId = cluster.getClusterId(); this.monitoringStarted = new AtomicBoolean(false); //this.clusterContext = abstractClusterContext; //this.instanceIdToClusterContextMap = new HashMap<String, AbstractClusterContext>(); @@ -407,6 +409,14 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable public void setClusterContext(AbstractClusterContext clusterContext) { this.clusterContext = clusterContext; } + + public Cluster getCluster() { + return cluster; + } + + public void setCluster(Cluster cluster) { + this.cluster = cluster; + } } 
http://git-wip-us.apache.org/repos/asf/stratos/blob/c1ad7a52/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java index 618c15a..cba5357 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java @@ -21,6 +21,7 @@ package org.apache.stratos.autoscaler.monitor.cluster; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.context.cluster.VMClusterContext; import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException; import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException; import org.apache.stratos.cloud.controller.stub.domain.MemberContext; @@ -48,8 +49,8 @@ public class ClusterMonitorFactory { AbstractClusterMonitor clusterMonitor; // if (cluster.isKubernetesCluster()) { // clusterMonitor = getDockerServiceClusterMonitor(cluster); -//// } else if (cluster.isLbCluster()) { -//// clusterMonitor = getVMLbClusterMonitor(cluster); +////// } else if (cluster.isLbCluster()) { +////// clusterMonitor = getVMLbClusterMonitor(cluster); // } else { clusterMonitor = getVMClusterMonitor(cluster); // } @@ -64,7 +65,7 @@ public class ClusterMonitorFactory { return null; } - VMClusterMonitor clusterMonitor = new VMClusterMonitor(cluster.getServiceName(), cluster.getClusterId()); + VMClusterMonitor clusterMonitor = new VMClusterMonitor(cluster); 
// find lb reference type java.util.Properties props = cluster.getProperties(); @@ -107,7 +108,7 @@ public class ClusterMonitorFactory { * @param cluster - the cluster which needs to be monitored * @return - the cluster monitor */ - private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) + private static KubernetesClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) throws PolicyValidationException { if (null == cluster) { @@ -155,50 +156,50 @@ public class ClusterMonitorFactory { // cluster.getClusterId(), cluster.getServiceName(), autoscalePolicy, minReplicas, maxReplicas); - KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor(cluster.getServiceName(), cluster.getClusterId()); + KubernetesClusterMonitor dockerClusterMonitor = new KubernetesClusterMonitor(cluster); //populate the members after restarting - for (Member member : cluster.getMembers()) { - String memberId = member.getMemberId(); - String clusterId = member.getClusterId(); - MemberContext memberContext = new MemberContext(); - memberContext.setMemberId(memberId); - memberContext.setClusterId(clusterId); - memberContext.setInitTime(member.getInitTime()); - - // if there is at least one member in the topology, that means service has been created already - // this is to avoid calling startContainer() method again - //kubernetesClusterCtxt.setServiceClusterCreated(true); - - if (MemberStatus.Activated.equals(member.getStatus())) { - if (log.isDebugEnabled()) { - String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString()); - log.debug(msg); - } - dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext); - } else if (MemberStatus.Created.equals(member.getStatus()) - || MemberStatus.Starting.equals(member.getStatus())) { - if (log.isDebugEnabled()) { - String msg = String.format("Pending member loaded from topology and added to pending member 
list, %s", member.toString()); - log.debug(msg); - } - dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext); - } - - //kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId)); - if (log.isInfoEnabled()) { - log.info(String.format("Member stat context has been added: [member] %s", memberId)); - } - } - - // find lb reference type - if (properties.containsKey(StratosConstants.LOAD_BALANCER_REF)) { - String value = properties.getProperty(StratosConstants.LOAD_BALANCER_REF); - dockerClusterMonitor.setLbReferenceType(value); - if (log.isDebugEnabled()) { - log.debug("Set the lb reference type: " + value); - } - } +// for (Member member : cluster.getMembers()) { +// String memberId = member.getMemberId(); +// String clusterId = member.getClusterId(); +// MemberContext memberContext = new MemberContext(); +// memberContext.setMemberId(memberId); +// memberContext.setClusterId(clusterId); +// memberContext.setInitTime(member.getInitTime()); +// +// // if there is at least one member in the topology, that means service has been created already +// // this is to avoid calling startContainer() method again +// //kubernetesClusterCtxt.setServiceClusterCreated(true); +// +// if (MemberStatus.Activated.equals(member.getStatus())) { +// if (log.isDebugEnabled()) { +// String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString()); +// log.debug(msg); +// } +// ((VMClusterContext) dockerClusterMonitor.getClusterContext()).addActiveMember(memberContext); +// } else if (MemberStatus.Created.equals(member.getStatus()) +// || MemberStatus.Starting.equals(member.getStatus())) { +// if (log.isDebugEnabled()) { +// String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString()); +// log.debug(msg); +// } +// dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext); +// } +// +// 
//kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId)); +// if (log.isInfoEnabled()) { +// log.info(String.format("Member stat context has been added: [member] %s", memberId)); +// } +// } +// +// // find lb reference type +// if (properties.containsKey(StratosConstants.LOAD_BALANCER_REF)) { +// String value = properties.getProperty(StratosConstants.LOAD_BALANCER_REF); +// dockerClusterMonitor.setLbReferenceType(value); +// if (log.isDebugEnabled()) { +// log.debug("Set the lb reference type: " + value); +// } +// } log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString()); return dockerClusterMonitor; http://git-wip-us.apache.org/repos/asf/stratos/blob/c1ad7a52/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java index b5d6da3..c2b0f31 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesClusterMonitor.java @@ -58,16 +58,15 @@ import org.drools.runtime.StatefulKnowledgeSession; /* * Every kubernetes cluster monitor should extend this class */ -public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor { +public class KubernetesClusterMonitor extends VMClusterMonitor { private static final Log log = LogFactory.getLog(KubernetesClusterMonitor.class); - private StatefulKnowledgeSession dependentScaleCheckKnowledgeSession; +// private StatefulKnowledgeSession 
dependentScaleCheckKnowledgeSession; - protected KubernetesClusterMonitor(String serviceType, String clusterId, - AutoscalerRuleEvaluator autoscalerRuleEvaluator) { + protected KubernetesClusterMonitor(Cluster cluster) { - super(serviceType, clusterId); + super(cluster); autoscalerRuleEvaluator = new AutoscalerRuleEvaluator(); autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.CONTAINER_OBSOLETE_CHECK_DROOL_FILE); @@ -87,431 +86,431 @@ public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor { //this.kubernetesClusterCtxt = kubernetesClusterContext; } - @Override - public void handleAverageLoadAverageEvent( - AverageLoadAverageEvent averageLoadAverageEvent) { - - String clusterId = averageLoadAverageEvent.getClusterId(); - float value = averageLoadAverageEvent.getValue(); - if (log.isDebugEnabled()) { - log.debug(String.format("Avg load avg event: [cluster] %s [value] %s", - clusterId, value)); - } - KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); - if (null != kubernetesClusterContext) { - kubernetesClusterContext.setAverageLoadAverage(value); - } else { - if (log.isDebugEnabled()) { - log.debug(String.format("Kubernetes cluster context is not available for :" + - " [cluster] %s", clusterId)); - } - } - - } - - @Override - public void handleGradientOfLoadAverageEvent( - GradientOfLoadAverageEvent gradientOfLoadAverageEvent) { - - String clusterId = gradientOfLoadAverageEvent.getClusterId(); - float value = gradientOfLoadAverageEvent.getValue(); - if (log.isDebugEnabled()) { - log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s", - clusterId, value)); - } - KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); - if (null != kubernetesClusterContext) { - kubernetesClusterContext.setLoadAverageGradient(value); - } else { - if (log.isDebugEnabled()) { - log.debug(String.format("Kubernetes cluster context is not available for :" + - " 
[cluster] %s", clusterId)); - } - } - } - - @Override - public void handleSecondDerivativeOfLoadAverageEvent( - SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) { - - String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId(); - float value = secondDerivativeOfLoadAverageEvent.getValue(); - if (log.isDebugEnabled()) { - log.debug(String.format("Second Derivation of load avg event: [cluster] %s " - + "[value] %s", clusterId, value)); - } - KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); - if (null != kubernetesClusterContext) { - kubernetesClusterContext.setLoadAverageSecondDerivative(value); - } else { - if (log.isDebugEnabled()) { - log.debug(String.format("Kubernetes cluster context is not available for :" + - " [cluster] %s", clusterId)); - } - } - } - - @Override - public void handleAverageMemoryConsumptionEvent( - AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) { - - String clusterId = averageMemoryConsumptionEvent.getClusterId(); - float value = averageMemoryConsumptionEvent.getValue(); - if (log.isDebugEnabled()) { - log.debug(String.format("Avg Memory Consumption event: [cluster] %s " - + "[value] %s", averageMemoryConsumptionEvent.getClusterId(), value)); - } - KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); - if (null != kubernetesClusterContext) { - kubernetesClusterContext.setAverageMemoryConsumption(value); - } else { - if (log.isDebugEnabled()) { - log.debug(String.format("Kubernetes cluster context is not available for :" + - " [cluster] %s", clusterId)); - } - } - } - - @Override - public void handleGradientOfMemoryConsumptionEvent( - GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) { - - String clusterId = gradientOfMemoryConsumptionEvent.getClusterId(); - float value = gradientOfMemoryConsumptionEvent.getValue(); - if (log.isDebugEnabled()) { - log.debug(String.format("Grad of Memory Consumption event: [cluster] %s " - + 
"[value] %s", clusterId, value)); - } - KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); - if (null != kubernetesClusterContext) { - kubernetesClusterContext.setMemoryConsumptionGradient(value); - } else { - if (log.isDebugEnabled()) { - log.debug(String.format("Kubernetes cluster context is not available for :" + - " [cluster] %s", clusterId)); - } - } - } - - @Override - public void handleSecondDerivativeOfMemoryConsumptionEvent( - SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) { - - String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId(); - float value = secondDerivativeOfMemoryConsumptionEvent.getValue(); - if (log.isDebugEnabled()) { - log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s " - + "[value] %s", clusterId, value)); - } - KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); - if (null != kubernetesClusterContext) { - kubernetesClusterContext.setMemoryConsumptionSecondDerivative(value); - } else { - if (log.isDebugEnabled()) { - log.debug(String.format("Kubernetes cluster context is not available for :" + - " [cluster] %s", clusterId)); - } - } - } - - @Override - public void handleAverageRequestsInFlightEvent( - AverageRequestsInFlightEvent averageRequestsInFlightEvent) { - - float value = averageRequestsInFlightEvent.getValue(); - String clusterId = averageRequestsInFlightEvent.getClusterId(); - if (log.isDebugEnabled()) { - log.debug(String.format("Average Rif event: [cluster] %s [value] %s", - clusterId, value)); - } - KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); - if (null != kubernetesClusterContext) { - kubernetesClusterContext.setAverageRequestsInFlight(value); - } else { - if (log.isDebugEnabled()) { - log.debug(String.format("Kubernetes cluster context is not available for :" + - " [cluster] %s", clusterId)); - } - } - } - - @Override - public void 
handleGradientOfRequestsInFlightEvent( - GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) { - - String clusterId = gradientOfRequestsInFlightEvent.getClusterId(); - float value = gradientOfRequestsInFlightEvent.getValue(); - if (log.isDebugEnabled()) { - log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s", - clusterId, value)); - } - KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); - if (null != kubernetesClusterContext) { - kubernetesClusterContext.setRequestsInFlightGradient(value); - } else { - if (log.isDebugEnabled()) { - log.debug(String.format("Kubernetes cluster context is not available for :" + - " [cluster] %s", clusterId)); - } - } - } - - @Override - public void handleSecondDerivativeOfRequestsInFlightEvent( - SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) { - - String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId(); - float value = secondDerivativeOfRequestsInFlightEvent.getValue(); - if (log.isDebugEnabled()) { - log.debug(String.format("Second derivative of Rif event: [cluster] %s " - + "[value] %s", clusterId, value)); - } - KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); - if (null != kubernetesClusterContext) { - kubernetesClusterContext.setRequestsInFlightSecondDerivative(value); - } else { - if (log.isDebugEnabled()) { - log.debug(String.format("Kubernetes cluster context is not available for :" + - " [cluster] %s", clusterId)); - } - } - } - - @Override - public void handleMemberAverageMemoryConsumptionEvent( - MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) { - - String memberId = memberAverageMemoryConsumptionEvent.getMemberId(); - KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); - MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); - if (null == memberStatsContext) { - if (log.isDebugEnabled()) { 
- log.debug(String.format("Member context is not available for : [member] %s", memberId)); - } - return; - } - float value = memberAverageMemoryConsumptionEvent.getValue(); - memberStatsContext.setAverageMemoryConsumption(value); - } - - @Override - public void handleMemberGradientOfMemoryConsumptionEvent( - MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) { - - String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId(); - KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); - MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); - if (null == memberStatsContext) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member context is not available for : [member] %s", memberId)); - } - return; - } - float value = memberGradientOfMemoryConsumptionEvent.getValue(); - memberStatsContext.setGradientOfMemoryConsumption(value); - } - - @Override - public void handleMemberSecondDerivativeOfMemoryConsumptionEvent( - MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) { - - } - - @Override - public void handleMemberAverageLoadAverageEvent( - MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) { - - KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); - String memberId = memberAverageLoadAverageEvent.getMemberId(); - float value = memberAverageLoadAverageEvent.getValue(); - MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); - if (null == memberStatsContext) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member context is not available for : [member] %s", memberId)); - } - return; - } - memberStatsContext.setAverageLoadAverage(value); - } - - @Override - public void handleMemberGradientOfLoadAverageEvent( - MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) { - - String memberId = 
memberGradientOfLoadAverageEvent.getMemberId(); - KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); - MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); - if (null == memberStatsContext) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member context is not available for : [member] %s", memberId)); - } - return; - } - float value = memberGradientOfLoadAverageEvent.getValue(); - memberStatsContext.setGradientOfLoadAverage(value); - } - - @Override - public void handleMemberSecondDerivativeOfLoadAverageEvent( - MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) { - - String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId(); - KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); - MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); - if (null == memberStatsContext) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member context is not available for : [member] %s", memberId)); - } - return; - } - float value = memberSecondDerivativeOfLoadAverageEvent.getValue(); - memberStatsContext.setSecondDerivativeOfLoadAverage(value); - } - - @Override - public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) { - // kill the container - String memberId = memberFaultEvent.getMemberId(); - Member member = getMemberByMemberId(memberId); - if (null == member) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); - } - return; - } - if (!member.isActive()) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member activated event has not received for the member %s. 
" - + "Therefore ignoring" + " the member fault health stat", memberId)); - } - return; - } - - if (!getKubernetesClusterCtxt().activeMemberExist(memberId)) { - if (log.isDebugEnabled()) { - log.debug(String.format("Could not find the active member in kubernetes cluster context, " - + "[member] %s ", memberId)); - } - return; - } - - // move member to obsolete list - getKubernetesClusterCtxt().moveMemberToObsoleteList(memberId); - if (log.isInfoEnabled()) { - String clusterId = memberFaultEvent.getClusterId(); - String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); - log.info(String.format("Faulty member is moved to obsolete list and removed from the active members list: " - + "[member] %s [kub-cluster] %s [cluster] %s ", memberId, kubernetesClusterID, clusterId)); - } - } - - @Override - public void handleMemberStartedEvent( - MemberStartedEvent memberStartedEvent) { - - } - - @Override - public void handleMemberActivatedEvent( - MemberActivatedEvent memberActivatedEvent) { - - KubernetesClusterContext kubernetesClusterContext; - kubernetesClusterContext = getKubernetesClusterCtxt(); - String memberId = memberActivatedEvent.getMemberId(); - kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId)); - if (log.isInfoEnabled()) { - log.info(String.format("Member stat context has been added successfully: " - + "[member] %s", memberId)); - } - kubernetesClusterContext.movePendingMemberToActiveMembers(memberId); - } - - @Override - public void handleMemberMaintenanceModeEvent( - MemberMaintenanceModeEvent maintenanceModeEvent) { - - // no need to do anything here - // we will not be receiving this event for containers - // we will only receive member terminated event - } - - @Override - public void handleMemberReadyToShutdownEvent( - MemberReadyToShutdownEvent memberReadyToShutdownEvent) { - - // no need to do anything here - // we will not be receiving this event for containers - // we will only receive member 
terminated event - } - - @Override - public void handleMemberTerminatedEvent( - MemberTerminatedEvent memberTerminatedEvent) { - - String memberId = memberTerminatedEvent.getMemberId(); - if (getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is removed from termination pending members list: " - + "[member] %s", memberId)); - } - } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is removed from pending members list: " - + "[member] %s", memberId)); - } - } else if (getKubernetesClusterCtxt().removeActiveMemberById(memberId)) { - log.warn(String.format("Member is in the wrong list and it is removed from " - + "active members list: %s", memberId)); - } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) { - log.warn(String.format("Obsolete member has either been terminated or its obsolete time out has expired and" - + " it is removed from obsolete members list: %s", memberId)); - } else { - log.warn(String.format("Member is not available in any of the list active, " - + "pending and termination pending: %s", memberId)); - } - - if (log.isInfoEnabled()) { - log.info(String.format("Member stat context has been removed successfully: " - + "[member] %s", memberId)); - } - } - - @Override - public void handleClusterRemovedEvent( - ClusterRemovedEvent clusterRemovedEvent) { - getKubernetesClusterCtxt().getPendingMembers().clear(); - getKubernetesClusterCtxt().getActiveMembers().clear(); - getKubernetesClusterCtxt().getTerminationPendingMembers().clear(); - getKubernetesClusterCtxt().getObsoletedMembers().clear(); - } - - public KubernetesClusterContext getKubernetesClusterCtxt() { - return (KubernetesClusterContext) getClusterContext(); - } - - private Member getMemberByMemberId(String memberId) { - try { - TopologyManager.acquireReadLock(); - for (Service service : 
TopologyManager.getTopology().getServices()) { - for (Cluster cluster : service.getClusters()) { - if (cluster.memberExists(memberId)) { - return cluster.getMember(memberId); - } - } - } - return null; - } finally { - TopologyManager.releaseReadLock(); - } - } +// @Override +// public void handleAverageLoadAverageEvent( +// AverageLoadAverageEvent averageLoadAverageEvent) { +// +// String clusterId = averageLoadAverageEvent.getClusterId(); +// float value = averageLoadAverageEvent.getValue(); +// if (log.isDebugEnabled()) { +// log.debug(String.format("Avg load avg event: [cluster] %s [value] %s", +// clusterId, value)); +// } +// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); +// if (null != kubernetesClusterContext) { +// kubernetesClusterContext.setAverageLoadAverage(value); +// } else { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Kubernetes cluster context is not available for :" + +// " [cluster] %s", clusterId)); +// } +// } +// +// } +// +// @Override +// public void handleGradientOfLoadAverageEvent( +// GradientOfLoadAverageEvent gradientOfLoadAverageEvent) { +// +// String clusterId = gradientOfLoadAverageEvent.getClusterId(); +// float value = gradientOfLoadAverageEvent.getValue(); +// if (log.isDebugEnabled()) { +// log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s", +// clusterId, value)); +// } +// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); +// if (null != kubernetesClusterContext) { +// kubernetesClusterContext.setLoadAverageGradient(value); +// } else { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Kubernetes cluster context is not available for :" + +// " [cluster] %s", clusterId)); +// } +// } +// } +// +// @Override +// public void handleSecondDerivativeOfLoadAverageEvent( +// SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) { +// +// String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId(); 
+// float value = secondDerivativeOfLoadAverageEvent.getValue(); +// if (log.isDebugEnabled()) { +// log.debug(String.format("Second Derivation of load avg event: [cluster] %s " +// + "[value] %s", clusterId, value)); +// } +// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); +// if (null != kubernetesClusterContext) { +// kubernetesClusterContext.setLoadAverageSecondDerivative(value); +// } else { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Kubernetes cluster context is not available for :" + +// " [cluster] %s", clusterId)); +// } +// } +// } +// +// @Override +// public void handleAverageMemoryConsumptionEvent( +// AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) { +// +// String clusterId = averageMemoryConsumptionEvent.getClusterId(); +// float value = averageMemoryConsumptionEvent.getValue(); +// if (log.isDebugEnabled()) { +// log.debug(String.format("Avg Memory Consumption event: [cluster] %s " +// + "[value] %s", averageMemoryConsumptionEvent.getClusterId(), value)); +// } +// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); +// if (null != kubernetesClusterContext) { +// kubernetesClusterContext.setAverageMemoryConsumption(value); +// } else { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Kubernetes cluster context is not available for :" + +// " [cluster] %s", clusterId)); +// } +// } +// } +// +// @Override +// public void handleGradientOfMemoryConsumptionEvent( +// GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) { +// +// String clusterId = gradientOfMemoryConsumptionEvent.getClusterId(); +// float value = gradientOfMemoryConsumptionEvent.getValue(); +// if (log.isDebugEnabled()) { +// log.debug(String.format("Grad of Memory Consumption event: [cluster] %s " +// + "[value] %s", clusterId, value)); +// } +// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); +// if (null != kubernetesClusterContext) { 
+// kubernetesClusterContext.setMemoryConsumptionGradient(value); +// } else { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Kubernetes cluster context is not available for :" + +// " [cluster] %s", clusterId)); +// } +// } +// } +// +// @Override +// public void handleSecondDerivativeOfMemoryConsumptionEvent( +// SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) { +// +// String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId(); +// float value = secondDerivativeOfMemoryConsumptionEvent.getValue(); +// if (log.isDebugEnabled()) { +// log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s " +// + "[value] %s", clusterId, value)); +// } +// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); +// if (null != kubernetesClusterContext) { +// kubernetesClusterContext.setMemoryConsumptionSecondDerivative(value); +// } else { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Kubernetes cluster context is not available for :" + +// " [cluster] %s", clusterId)); +// } +// } +// } +// +// @Override +// public void handleAverageRequestsInFlightEvent( +// AverageRequestsInFlightEvent averageRequestsInFlightEvent) { +// +// float value = averageRequestsInFlightEvent.getValue(); +// String clusterId = averageRequestsInFlightEvent.getClusterId(); +// if (log.isDebugEnabled()) { +// log.debug(String.format("Average Rif event: [cluster] %s [value] %s", +// clusterId, value)); +// } +// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); +// if (null != kubernetesClusterContext) { +// kubernetesClusterContext.setAverageRequestsInFlight(value); +// } else { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Kubernetes cluster context is not available for :" + +// " [cluster] %s", clusterId)); +// } +// } +// } +// +// @Override +// public void handleGradientOfRequestsInFlightEvent( +// 
GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) { +// +// String clusterId = gradientOfRequestsInFlightEvent.getClusterId(); +// float value = gradientOfRequestsInFlightEvent.getValue(); +// if (log.isDebugEnabled()) { +// log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s", +// clusterId, value)); +// } +// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); +// if (null != kubernetesClusterContext) { +// kubernetesClusterContext.setRequestsInFlightGradient(value); +// } else { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Kubernetes cluster context is not available for :" + +// " [cluster] %s", clusterId)); +// } +// } +// } +// +// @Override +// public void handleSecondDerivativeOfRequestsInFlightEvent( +// SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) { +// +// String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId(); +// float value = secondDerivativeOfRequestsInFlightEvent.getValue(); +// if (log.isDebugEnabled()) { +// log.debug(String.format("Second derivative of Rif event: [cluster] %s " +// + "[value] %s", clusterId, value)); +// } +// KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); +// if (null != kubernetesClusterContext) { +// kubernetesClusterContext.setRequestsInFlightSecondDerivative(value); +// } else { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Kubernetes cluster context is not available for :" + +// " [cluster] %s", clusterId)); +// } +// } +// } +// +// @Override +// public void handleMemberAverageMemoryConsumptionEvent( +// MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) { +// +// String memberId = memberAverageMemoryConsumptionEvent.getMemberId(); +// KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); +// MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); +// if (null == 
memberStatsContext) { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Member context is not available for : [member] %s", memberId)); +// } +// return; +// } +// float value = memberAverageMemoryConsumptionEvent.getValue(); +// memberStatsContext.setAverageMemoryConsumption(value); +// } +// +// @Override +// public void handleMemberGradientOfMemoryConsumptionEvent( +// MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) { +// +// String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId(); +// KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); +// MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); +// if (null == memberStatsContext) { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Member context is not available for : [member] %s", memberId)); +// } +// return; +// } +// float value = memberGradientOfMemoryConsumptionEvent.getValue(); +// memberStatsContext.setGradientOfMemoryConsumption(value); +// } +// +// @Override +// public void handleMemberSecondDerivativeOfMemoryConsumptionEvent( +// MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) { +// +// } +// +// @Override +// public void handleMemberAverageLoadAverageEvent( +// MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) { +// +// KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); +// String memberId = memberAverageLoadAverageEvent.getMemberId(); +// float value = memberAverageLoadAverageEvent.getValue(); +// MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); +// if (null == memberStatsContext) { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Member context is not available for : [member] %s", memberId)); +// } +// return; +// } +// memberStatsContext.setAverageLoadAverage(value); +// } +// +// @Override +// public void 
handleMemberGradientOfLoadAverageEvent( +// MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) { +// +// String memberId = memberGradientOfLoadAverageEvent.getMemberId(); +// KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); +// MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); +// if (null == memberStatsContext) { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Member context is not available for : [member] %s", memberId)); +// } +// return; +// } +// float value = memberGradientOfLoadAverageEvent.getValue(); +// memberStatsContext.setGradientOfLoadAverage(value); +// } +// +// @Override +// public void handleMemberSecondDerivativeOfLoadAverageEvent( +// MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) { +// +// String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId(); +// KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); +// MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); +// if (null == memberStatsContext) { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Member context is not available for : [member] %s", memberId)); +// } +// return; +// } +// float value = memberSecondDerivativeOfLoadAverageEvent.getValue(); +// memberStatsContext.setSecondDerivativeOfLoadAverage(value); +// } +// +// @Override +// public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) { +// // kill the container +// String memberId = memberFaultEvent.getMemberId(); +// Member member = getMemberByMemberId(memberId); +// if (null == member) { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); +// } +// return; +// } +// if (!member.isActive()) { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Member activated event has not received for the member %s. 
" +// + "Therefore ignoring" + " the member fault health stat", memberId)); +// } +// return; +// } +// +// if (!getKubernetesClusterCtxt().activeMemberExist(memberId)) { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Could not find the active member in kubernetes cluster context, " +// + "[member] %s ", memberId)); +// } +// return; +// } +// +// // move member to obsolete list +// getKubernetesClusterCtxt().moveMemberToObsoleteList(memberId); +// if (log.isInfoEnabled()) { +// String clusterId = memberFaultEvent.getClusterId(); +// String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); +// log.info(String.format("Faulty member is moved to obsolete list and removed from the active members list: " +// + "[member] %s [kub-cluster] %s [cluster] %s ", memberId, kubernetesClusterID, clusterId)); +// } +// } +// +// @Override +// public void handleMemberStartedEvent( +// MemberStartedEvent memberStartedEvent) { +// +// } +// +// @Override +// public void handleMemberActivatedEvent( +// MemberActivatedEvent memberActivatedEvent) { +// +// KubernetesClusterContext kubernetesClusterContext; +// kubernetesClusterContext = getKubernetesClusterCtxt(); +// String memberId = memberActivatedEvent.getMemberId(); +// kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId)); +// if (log.isInfoEnabled()) { +// log.info(String.format("Member stat context has been added successfully: " +// + "[member] %s", memberId)); +// } +// kubernetesClusterContext.movePendingMemberToActiveMembers(memberId); +// } +// +// @Override +// public void handleMemberMaintenanceModeEvent( +// MemberMaintenanceModeEvent maintenanceModeEvent) { +// +// // no need to do anything here +// // we will not be receiving this event for containers +// // we will only receive member terminated event +// } +// +// @Override +// public void handleMemberReadyToShutdownEvent( +// MemberReadyToShutdownEvent memberReadyToShutdownEvent) { +// +// // no need 
to do anything here +// // we will not be receiving this event for containers +// // we will only receive member terminated event +// } +// +// @Override +// public void handleMemberTerminatedEvent( +// MemberTerminatedEvent memberTerminatedEvent) { +// +// String memberId = memberTerminatedEvent.getMemberId(); +// if (getKubernetesClusterCtxt().removeTerminationPendingMember(memberId)) { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Member is removed from termination pending members list: " +// + "[member] %s", memberId)); +// } +// } else if (getKubernetesClusterCtxt().removePendingMember(memberId)) { +// if (log.isDebugEnabled()) { +// log.debug(String.format("Member is removed from pending members list: " +// + "[member] %s", memberId)); +// } +// } else if (getKubernetesClusterCtxt().removeActiveMemberById(memberId)) { +// log.warn(String.format("Member is in the wrong list and it is removed from " +// + "active members list: %s", memberId)); +// } else if (getKubernetesClusterCtxt().removeObsoleteMember(memberId)) { +// log.warn(String.format("Obsolete member has either been terminated or its obsolete time out has expired and" +// + " it is removed from obsolete members list: %s", memberId)); +// } else { +// log.warn(String.format("Member is not available in any of the list active, " +// + "pending and termination pending: %s", memberId)); +// } +// +// if (log.isInfoEnabled()) { +// log.info(String.format("Member stat context has been removed successfully: " +// + "[member] %s", memberId)); +// } +// } +// +// @Override +// public void handleClusterRemovedEvent( +// ClusterRemovedEvent clusterRemovedEvent) { +// getKubernetesClusterCtxt().getPendingMembers().clear(); +// getKubernetesClusterCtxt().getActiveMembers().clear(); +// getKubernetesClusterCtxt().getTerminationPendingMembers().clear(); +// getKubernetesClusterCtxt().getObsoletedMembers().clear(); +// } +// +// public KubernetesClusterContext getKubernetesClusterCtxt() { +// return 
(KubernetesClusterContext) getClusterContext(); +// } +// +// private Member getMemberByMemberId(String memberId) { +// try { +// TopologyManager.acquireReadLock(); +// for (Service service : TopologyManager.getTopology().getServices()) { +// for (Cluster cluster : service.getClusters()) { +// if (cluster.memberExists(memberId)) { +// return cluster.getMember(memberId); +// } +// } +// } +// return null; +// } finally { +// TopologyManager.releaseReadLock(); +// } +// } @Override public void terminateAllMembers(String instanceId, String networkPartitionId) { try { - CloudControllerClient.getInstance().terminateAllContainers(getKubernetesClusterCtxt().getClusterId()); + CloudControllerClient.getInstance().terminateAllContainers(getClusterId()); } catch (TerminationException e) { log.error(String.format("Could not terminate containers: [cluster-id] %s", - getKubernetesClusterCtxt().getClusterId()), e); + getClusterId()), e); } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c1ad7a52/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java index edcaa65..0221a05 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java @@ -31,6 +31,7 @@ import org.apache.stratos.autoscaler.util.ConfUtil; import org.apache.stratos.common.Properties; import org.apache.stratos.common.Property; import 
org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.ClusterStatus; import java.util.Arrays; @@ -45,9 +46,8 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni private String lbReferenceType; - public KubernetesServiceClusterMonitor(String serviceType, String clusterId) { - super(serviceType, clusterId, - new AutoscalerRuleEvaluator()); + public KubernetesServiceClusterMonitor(Cluster cluster) { + super(cluster); readConfigurations(); } @@ -73,162 +73,162 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni } } - @Override - protected void monitor() { - final String instanceId = this.getKubernetesClusterCtxt().getInstanceId(); - Runnable monitoringRunnable = new Runnable() { - - @Override - public void run() { - obsoleteCheck(); - minCheck(); - scaleCheck(instanceId); - } - }; - monitoringRunnable.run(); - } - - - private void scaleCheck(String instanceId) { - boolean rifReset = getKubernetesClusterCtxt().isRifReset(); - boolean memoryConsumptionReset = getKubernetesClusterCtxt().isMemoryConsumptionReset(); - boolean loadAverageReset = getKubernetesClusterCtxt().isLoadAverageReset(); - if (log.isDebugEnabled()) { - log.debug("flag of rifReset : " + rifReset - + " flag of memoryConsumptionReset : " - + memoryConsumptionReset + " flag of loadAverageReset : " - + loadAverageReset); - } - String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); - String clusterId = getClusterId(); - if (rifReset || memoryConsumptionReset || loadAverageReset) { - getScaleCheckKnowledgeSession().setGlobal("clusterId", clusterId); - getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", getAutoscalePolicy(instanceId)); - getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset); - getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset); - 
getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset); - if (log.isDebugEnabled()) { - log.debug(String.format( - "Running scale check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId())); - } - scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate( - getScaleCheckKnowledgeSession(), scaleCheckFactHandle, getKubernetesClusterCtxt()); - getKubernetesClusterCtxt().setRifReset(false); - getKubernetesClusterCtxt().setMemoryConsumptionReset(false); - getKubernetesClusterCtxt().setLoadAverageReset(false); - } else if (log.isDebugEnabled()) { - log.debug(String.format("Scale check will not run since none of the statistics have not received yet for " - + "[kub-cluster] : %s [cluster] : %s", kubernetesClusterID, clusterId)); - } - } - - private AutoscalePolicy getAutoscalePolicy(String instanceId) { - KubernetesClusterContext kubernetesClusterContext = (KubernetesClusterContext) this.clusterContext; - return kubernetesClusterContext.getAutoscalePolicy(); - } - - private void minCheck() { - getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); - String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); - if (log.isDebugEnabled()) { - log.debug(String.format( - "Running min check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId())); - } - minCheckFactHandle = AutoscalerRuleEvaluator.evaluate( - getMinCheckKnowledgeSession(), minCheckFactHandle, - getKubernetesClusterCtxt()); - } - - private void obsoleteCheck() { - getObsoleteCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); - String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); - if (log.isDebugEnabled()) { - log.debug(String.format( - "Running obsolete check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId())); - } - obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate( - getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, - 
getKubernetesClusterCtxt()); - } - - @Override - public void destroy() { - getMinCheckKnowledgeSession().dispose(); - getObsoleteCheckKnowledgeSession().dispose(); - getScaleCheckKnowledgeSession().dispose(); - setDestroyed(true); - stopScheduler(); - if (log.isDebugEnabled()) { - log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString()); - } - } - - @Override - protected void readConfigurations() { - XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); - int monitorInterval = conf.getInt(AutoScalerConstants.KubernetesService_Cluster_MONITOR_INTERVAL, 60000); - setMonitorIntervalMilliseconds(monitorInterval); - if (log.isDebugEnabled()) { - log.debug("KubernetesServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds()); - } - } - - @Override - public String toString() { - return "KubernetesServiceClusterMonitor for " + "[ clusterId=" + getClusterId() + "]"; - } - - public String getLbReferenceType() { - return lbReferenceType; - } - - public void setLbReferenceType(String lbReferenceType) { - this.lbReferenceType = lbReferenceType; - } - - @Override - public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException { - - if (properties != null) { - Property[] propertyArray = properties.getProperties(); - if (propertyArray == null) { - return; - } - List<Property> propertyList = Arrays.asList(propertyArray); - - for (Property property : propertyList) { - String key = property.getName(); - String value = property.getValue(); - - if (StratosConstants.KUBERNETES_MIN_REPLICAS.equals(key)) { - int min = Integer.parseInt(value); - int max = getKubernetesClusterCtxt().getMaxReplicas(); - if (min > max) { - String msg = String.format("%s should be less than %s . 
But %s is not less than %s.", - StratosConstants.KUBERNETES_MIN_REPLICAS, StratosConstants.KUBERNETES_MAX_REPLICAS, min, max); - log.error(msg); - throw new InvalidArgumentException(msg); - } - getKubernetesClusterCtxt().setMinReplicas(min); - break; - } - } - - } - } - - @Override - public void terminateAllMembers(String instanceId, String networkPartitionId) { - - } - - @Override - public void onChildScalingEvent(MonitorScalingEvent scalingEvent) { - - } - - @Override - public void onParentScalingEvent(MonitorScalingEvent scalingEvent) { - - } +// @Override +// public void monitor() { +// final String instanceId = this.getKubernetesClusterCtxt().getInstanceId(); +// Runnable monitoringRunnable = new Runnable() { +// +// @Override +// public void run() { +// obsoleteCheck(); +// minCheck(); +// scaleCheck(instanceId); +// } +// }; +// monitoringRunnable.run(); +// } +// +// +// private void scaleCheck(String instanceId) { +// boolean rifReset = getKubernetesClusterCtxt().isRifReset(); +// boolean memoryConsumptionReset = getKubernetesClusterCtxt().isMemoryConsumptionReset(); +// boolean loadAverageReset = getKubernetesClusterCtxt().isLoadAverageReset(); +// if (log.isDebugEnabled()) { +// log.debug("flag of rifReset : " + rifReset +// + " flag of memoryConsumptionReset : " +// + memoryConsumptionReset + " flag of loadAverageReset : " +// + loadAverageReset); +// } +// String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); +// String clusterId = getClusterId(); +// if (rifReset || memoryConsumptionReset || loadAverageReset) { +// getScaleCheckKnowledgeSession().setGlobal("clusterId", clusterId); +// getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", getAutoscalePolicy(instanceId)); +// getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset); +// getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset); +// getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset); +// if 
(log.isDebugEnabled()) { +// log.debug(String.format( +// "Running scale check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId())); +// } +// scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate( +// getScaleCheckKnowledgeSession(), scaleCheckFactHandle, getKubernetesClusterCtxt()); +// getKubernetesClusterCtxt().setRifReset(false); +// getKubernetesClusterCtxt().setMemoryConsumptionReset(false); +// getKubernetesClusterCtxt().setLoadAverageReset(false); +// } else if (log.isDebugEnabled()) { +// log.debug(String.format("Scale check will not run since none of the statistics have not received yet for " +// + "[kub-cluster] : %s [cluster] : %s", kubernetesClusterID, clusterId)); +// } +// } +// +// private AutoscalePolicy getAutoscalePolicy(String instanceId) { +// KubernetesClusterContext kubernetesClusterContext = (KubernetesClusterContext) this.clusterContext; +// return kubernetesClusterContext.getAutoscalePolicy(); +// } +// +// private void minCheck() { +// getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); +// String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); +// if (log.isDebugEnabled()) { +// log.debug(String.format( +// "Running min check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId())); +// } +// minCheckFactHandle = AutoscalerRuleEvaluator.evaluate( +// getMinCheckKnowledgeSession(), minCheckFactHandle, +// getKubernetesClusterCtxt()); +// } +// +// private void obsoleteCheck() { +// getObsoleteCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); +// String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); +// if (log.isDebugEnabled()) { +// log.debug(String.format( +// "Running obsolete check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId())); +// } +// obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate( +// getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, +// 
getKubernetesClusterCtxt()); +// } +// +// @Override +// public void destroy() { +// getMinCheckKnowledgeSession().dispose(); +// getObsoleteCheckKnowledgeSession().dispose(); +// getScaleCheckKnowledgeSession().dispose(); +// setDestroyed(true); +// stopScheduler(); +// if (log.isDebugEnabled()) { +// log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString()); +// } +// } +// +// @Override +// protected void readConfigurations() { +// XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); +// int monitorInterval = conf.getInt(AutoScalerConstants.KubernetesService_Cluster_MONITOR_INTERVAL, 60000); +// setMonitorIntervalMilliseconds(monitorInterval); +// if (log.isDebugEnabled()) { +// log.debug("KubernetesServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds()); +// } +// } +// +// @Override +// public String toString() { +// return "KubernetesServiceClusterMonitor for " + "[ clusterId=" + getClusterId() + "]"; +// } +// +// public String getLbReferenceType() { +// return lbReferenceType; +// } +// +// public void setLbReferenceType(String lbReferenceType) { +// this.lbReferenceType = lbReferenceType; +// } +// +// @Override +// public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException { +// +// if (properties != null) { +// Property[] propertyArray = properties.getProperties(); +// if (propertyArray == null) { +// return; +// } +// List<Property> propertyList = Arrays.asList(propertyArray); +// +// for (Property property : propertyList) { +// String key = property.getName(); +// String value = property.getValue(); +// +// if (StratosConstants.KUBERNETES_MIN_REPLICAS.equals(key)) { +// int min = Integer.parseInt(value); +// int max = getKubernetesClusterCtxt().getMaxReplicas(); +// if (min > max) { +// String msg = String.format("%s should be less than %s . 
But %s is not less than %s.", +// StratosConstants.KUBERNETES_MIN_REPLICAS, StratosConstants.KUBERNETES_MAX_REPLICAS, min, max); +// log.error(msg); +// throw new InvalidArgumentException(msg); +// } +// getKubernetesClusterCtxt().setMinReplicas(min); +// break; +// } +// } +// +// } +// } +// +// @Override +// public void terminateAllMembers(String instanceId, String networkPartitionId) { +// +// } +// +// @Override +// public void onChildScalingEvent(MonitorScalingEvent scalingEvent) { +// +// } +// +// @Override +// public void onParentScalingEvent(MonitorScalingEvent scalingEvent) { +// +// } } http://git-wip-us.apache.org/repos/asf/stratos/blob/c1ad7a52/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java index d000b88..718ede9 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java @@ -70,8 +70,8 @@ public class VMClusterMonitor extends AbstractClusterMonitor { private boolean hasPrimary; private float scalingFactorBasedOnDependencies = 1.0f; - protected VMClusterMonitor(String serviceType, String clusterId) { - super(serviceType, clusterId); + protected VMClusterMonitor(Cluster cluster) { + super(cluster); this.networkPartitionIdToClusterLevelNetworkPartitionCtxts = new HashMap<String, ClusterLevelNetworkPartitionContext>(); readConfigurations(); 
http://git-wip-us.apache.org/repos/asf/stratos/blob/c1ad7a52/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java index 2bfd091..bd0ea1c 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java @@ -190,6 +190,8 @@ public class RuleTasksDelegator { ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = clusterContext.getNetworkPartitionCtxt(nwPartitionId); ClusterInstanceContext clusterInstanceContext = clusterLevelNetworkPartitionContext.getClusterInstanceContext(instanceId); minimumCountOfNetworkPartition = clusterInstanceContext.getMinInstanceCount(); + + MemberContext[] memberContexts = CloudControllerClient.getInstance() .startContainers(clusterMonitorPartitionContext.getPartition(), @@ -241,25 +243,57 @@ public class RuleTasksDelegator { ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = clusterContext.getNetworkPartitionCtxt(nwPartitionId); ClusterInstanceContext clusterInstanceContext = clusterLevelNetworkPartitionContext.getClusterInstanceContext(instanceId); minimumCountOfNetworkPartition = clusterInstanceContext.getMinInstanceCount(); - MemberContext memberContext = - CloudControllerClient.getInstance() - .spawnAnInstance(clusterMonitorPartitionContext.getPartition(), - clusterId, - clusterMonitorPartitionContext.getNetworkPartitionId(), - instanceId, - isPrimary, - minimumCountOfNetworkPartition); - if (memberContext != null) { - 
clusterMonitorPartitionContext.addPendingMember(memberContext); - if (log.isDebugEnabled()) { - log.debug(String.format("Pending member added, [member] %s [partition] %s", memberContext.getMemberId(), - memberContext.getPartition().getId())); + + if (vmClusterMonitor.getCluster().isKubernetesCluster()) { + MemberContext[] memberContexts = + CloudControllerClient.getInstance() + .startContainers(clusterMonitorPartitionContext.getPartition(), + clusterId, + instanceId, + clusterMonitorPartitionContext.getNetworkPartitionId(), + isPrimary, + minimumCountOfNetworkPartition); + if (null != memberContexts) { + for (MemberContext memberContext : memberContexts) { + if (null != memberContext) { + clusterMonitorPartitionContext.addPendingMember(memberContext); + if (log.isDebugEnabled()) { + log.debug(String.format("Pending member added, [member] %s [partition] %s", memberContext.getMemberId(), + memberContext.getPartition().getId())); + } + } else { + if (log.isDebugEnabled()) { + log.debug("Returned member context is null, did not add any pending members"); + } + } + } + } else { + if (log.isDebugEnabled()) { + log.debug("Returned member context is null, did not add to pending members"); + } } + } else { + + MemberContext memberContext = + CloudControllerClient.getInstance() + .spawnAnInstance(clusterMonitorPartitionContext.getPartition(), + clusterId, + clusterMonitorPartitionContext.getNetworkPartitionId(), + instanceId, + isPrimary, + minimumCountOfNetworkPartition); + if (memberContext != null) { + clusterMonitorPartitionContext.addPendingMember(memberContext); + if (log.isDebugEnabled()) { + log.debug(String.format("Pending member added, [member] %s [partition] %s", memberContext.getMemberId(), + memberContext.getPartition().getId())); + } - } else if (log.isDebugEnabled()) { - log.debug("Returned member context is null, did not add to pending members"); + } else if (log.isDebugEnabled()) { + log.debug("Returned member context is null, did not add to pending 
members"); + } } - + } catch (Throwable e) { String message = "Cannot spawn an instance"; log.error(message, e); http://git-wip-us.apache.org/repos/asf/stratos/blob/c1ad7a52/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Cluster.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Cluster.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Cluster.java index 2e345d4..3d26183 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Cluster.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Cluster.java @@ -63,6 +63,23 @@ public class Cluster implements Serializable { private Map<String, ClusterInstance> instanceIdToInstanceContextMap; //private LifeCycleStateManager<ClusterStatus> clusterStateManager; + public Cluster(Cluster cluster) { + this.serviceName = cluster.getServiceName(); + this.clusterId = cluster.getClusterId(); + this.deploymentPolicyName = cluster.getDeploymentPolicyName(); + this.autoscalePolicyName = cluster.getAutoscalePolicyName(); + this.appId = cluster.getAppId(); + this.setKubernetesCluster(cluster.isKubernetesCluster()); + this.setHostNames(cluster.getHostNames()); + this.memberMap = cluster.getMemberMap(); + this.setInstanceIdToInstanceContextMap(cluster.getInstanceIdToInstanceContextMap()); + this.properties = cluster.getProperties(); + this.loadBalanceAlgorithmName = cluster.getLoadBalanceAlgorithmName(); + this.parentId = cluster.getParentId(); + this.tenantRange = cluster.getTenantRange(); + this.setLbCluster(cluster.isLbCluster()); + } + public Cluster(String serviceName, String clusterId, String deploymentPolicyName, String autoscalePolicyName, String appId) { this.serviceName = 
serviceName; @@ -258,6 +275,14 @@ public class Cluster implements Serializable { return getInstanceIdToInstanceContextMap().keySet().size(); } + public Map<String, Member> getMemberMap() { + return memberMap; + } + + public void setMemberMap(Map<String, Member> memberMap) { + this.memberMap = memberMap; + } + public boolean equals(Object other) { if (other == null || !(other instanceof Cluster)) { return false; http://git-wip-us.apache.org/repos/asf/stratos/blob/c1ad7a52/products/stratos/modules/distribution/container-mincheck.drl ---------------------------------------------------------------------- diff --git a/products/stratos/modules/distribution/container-mincheck.drl b/products/stratos/modules/distribution/container-mincheck.drl new file mode 100755 index 0000000..2679d25 --- /dev/null +++ b/products/stratos/modules/distribution/container-mincheck.drl @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.stratos.autoscaler.rule; + +import org.apache.commons.logging.Log; +import org.apache.stratos.autoscaler.context.cluster.KubernetesClusterContext; + +global org.apache.stratos.autoscaler.rule.RuleLog log; +global org.apache.stratos.autoscaler.rule.RuleTasksDelegator delegator; +global java.lang.String clusterId; + +rule "Container Minimum Rule" +dialect "mvel" + when + $kubernetesClusterContext : KubernetesClusterContext () + kubernetesClusterId : String() from $kubernetesClusterContext.getKubernetesClusterID() + minReplicas : Integer() from $kubernetesClusterContext.getMinReplicas() + nonTerminatedReplicas : Integer() from $kubernetesClusterContext.getNonTerminatedMemberCount() + isServiceClusterCreated : Boolean() from $kubernetesClusterContext.isServiceClusterCreated() + + eval(log.info("Running minimum rule: [kub-cluster] " + kubernetesClusterId + " [cluster] " + clusterId)) + eval(log.info("[min-check] " + " [cluster] : " + clusterId + " [Replicas] nonTerminated : " + nonTerminatedReplicas)) + eval(log.info("[min-check] " + " [cluster] : " + clusterId + " [Replicas] minReplicas : " + minReplicas)) + eval(nonTerminatedReplicas < minReplicas) + then + if (isServiceClusterCreated) { + // we succeeded in calling startContainer() once, so we can't call it again + log.info("[min-check] Decided to scale-up : [cluster] : " + clusterId); + log.info("[min-check] " + " [cluster] : " + clusterId + " ; min-rule not satisfied, scaling up to minReplicas : " + minReplicas); + delegator.delegateScaleUpContainers($kubernetesClusterContext, minReplicas); + } else { + // we should call startContainer + log.info("[min-check] Decided to create the cluster : [cluster] : " + clusterId); + log.info("[min-check] " + " [cluster] : " + clusterId + " ; min-rule not satisfied, no containers created yet, creating minReplicas : " + minReplicas); + delegator.delegateStartContainers($kubernetesClusterContext); + } +end
