Merging VMServiceClusterMonitor to VMClusterMonitor as abtraction is not required anymore
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7797f1e5 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7797f1e5 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7797f1e5 Branch: refs/heads/master Commit: 7797f1e5b6b4f3a772c9edf6fb17aa05acf9e586 Parents: fdb84e7 Author: Lahiru Sandaruwan <[email protected]> Authored: Tue Dec 2 07:33:47 2014 +0530 Committer: Lahiru Sandaruwan <[email protected]> Committed: Tue Dec 2 07:36:07 2014 +0530 ---------------------------------------------------------------------- .../monitor/cluster/ClusterMonitorFactory.java | 7 +- .../monitor/cluster/VMClusterMonitor.java | 287 ++++++++++++++++++- .../autoscaler/rule/RuleTasksDelegator.java | 7 +- 3 files changed, 286 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/7797f1e5/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java index 1159fc5..65ab2a2 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java @@ -26,7 +26,6 @@ import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException; import org.apache.stratos.cloud.controller.stub.domain.MemberContext; import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.MemberStatus; @@ -52,20 +51,20 @@ public class ClusterMonitorFactory { // } else if (cluster.isLbCluster()) { // clusterMonitor = getVMLbClusterMonitor(cluster); } else { - clusterMonitor = getVMServiceClusterMonitor(cluster); + clusterMonitor = getVMClusterMonitor(cluster); } return clusterMonitor; } - private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster) + private static VMClusterMonitor getVMClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException { if (null == cluster) { return null; } - VMServiceClusterMonitor clusterMonitor = new VMServiceClusterMonitor(cluster.getServiceName(), cluster.getClusterId()); + VMClusterMonitor clusterMonitor = new VMClusterMonitor(cluster.getServiceName(), cluster.getClusterId()); // find lb reference type java.util.Properties props = cluster.getProperties(); http://git-wip-us.apache.org/repos/asf/stratos/blob/7797f1e5/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java index 69e7ca0..d902b7e 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java @@ -18,10 +18,9 @@ */ package org.apache.stratos.autoscaler.monitor.cluster; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.util.*; +import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.client.CloudControllerClient; @@ -30,12 +29,23 @@ import org.apache.stratos.autoscaler.context.cluster.VMClusterContext; import org.apache.stratos.autoscaler.context.member.MemberStatsContext; import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext; import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext; +import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.exception.InvalidArgumentException; import org.apache.stratos.autoscaler.exception.cartridge.TerminationException; +import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent; +import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent; +import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; import org.apache.stratos.autoscaler.status.processor.StatusChecker; +import org.apache.stratos.autoscaler.util.AutoScalerConstants; +import org.apache.stratos.autoscaler.util.AutoscalerUtil; +import org.apache.stratos.autoscaler.util.ConfUtil; import org.apache.stratos.cloud.controller.stub.domain.MemberContext; import org.apache.stratos.common.Properties; +import org.apache.stratos.common.Property; +import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.messaging.domain.applications.ApplicationStatus; +import org.apache.stratos.messaging.domain.applications.GroupStatus; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; @@ -53,14 +63,21 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; * and perform minimum instance check and scaling check using the underlying * rules engine. */ -abstract public class VMClusterMonitor extends AbstractClusterMonitor { +public class VMClusterMonitor extends AbstractClusterMonitor { private static final Log log = LogFactory.getLog(VMClusterMonitor.class); private Map<String, ClusterLevelNetworkPartitionContext> networkPartitionIdToClusterLevelNetworkPartitionCtxts; - - protected VMClusterMonitor(String serviceType, String clusterId, AutoscalerRuleEvaluator autoscalerRuleEvaluator) { - super(serviceType, clusterId, autoscalerRuleEvaluator); + private boolean hasPrimary; + private float scalingFactorBasedOnDependencies = 1.0f; + + protected VMClusterMonitor(String serviceType, String clusterId) { + super(serviceType, clusterId, new AutoscalerRuleEvaluator( + StratosConstants.VM_MIN_CHECK_DROOL_FILE, + StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE, + StratosConstants.VM_SCALE_CHECK_DROOL_FILE)); this.networkPartitionIdToClusterLevelNetworkPartitionCtxts = new HashMap<String, ClusterLevelNetworkPartitionContext>(); + + readConfigurations(); } public void addClusterLevelNWPartitionContext (ClusterLevelNetworkPartitionContext clusterLevelNWPartitionCtxt) { @@ -95,10 +112,266 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { " [network partition] %s", networkPartitionId)); } } + } + + @Override + public void run() { + while (!isDestroyed()) { + try { + /* TODO ***********if (((getStatus().getCode() <= ClusterStatus.Active.getCode()) || + (getStatus() == ClusterStatus.Inactive && !hasStartupDependents)) && !this.hasFaultyMember + && !stop) {*/ + if (log.isDebugEnabled()) { + log.debug("Cluster monitor is running.. " + this.toString()); + } + monitor(); + /*} else { + if (log.isDebugEnabled()) { + log.debug("Cluster monitor is suspended as the cluster is in " + + ClusterStatus.Inactive + " mode......"); + } + }*/ + } catch (Exception e) { + log.error("Cluster monitor: Monitor failed." + this.toString(), e); + } + try { + Thread.sleep(getMonitorIntervalMilliseconds()); + } catch (InterruptedException ignore) { + } + } + + + } + + private boolean isPrimaryMember(MemberContext memberContext) { + Properties props = AutoscalerUtil.toCommonProperties(memberContext.getProperties()); + if (log.isDebugEnabled()) { + log.debug(" Properties [" + props + "] "); + } + if (props != null && props.getProperties() != null) { + for (Property prop : props.getProperties()) { + if (prop.getName().equals("PRIMARY")) { + if (Boolean.parseBoolean(prop.getValue())) { + log.debug("Adding member id [" + memberContext.getMemberId() + "] " + + "member instance id [" + memberContext.getInstanceId() + "] as a primary member"); + return true; + } + } + } + } + return false; + } + + public void monitor() { + final Collection<ClusterLevelNetworkPartitionContext> clusterLevelNetworkPartitionContexts = + ((VMClusterContext) this.clusterContext).getNetworkPartitionCtxts().values(); + Runnable monitoringRunnable = new Runnable() { + @Override + public void run() { + for (ClusterLevelNetworkPartitionContext networkPartitionContext : + clusterLevelNetworkPartitionContexts) { + // store primary members in the network partition context + List<String> primaryMemberListInNetworkPartition = new ArrayList<String>(); + //minimum check per partition + for (ClusterInstanceContext instanceContext : networkPartitionContext. + getClusterInstanceContextMap().values()) { + //FIXME to check the status of the instance + if (true) { + for (ClusterLevelPartitionContext partitionContext : + instanceContext.getPartitionCtxts()) { + // store primary members in the partition context + List<String> primaryMemberListInPartition = new ArrayList<String>(); + // get active primary members in this partition context + for (MemberContext memberContext : partitionContext.getActiveMembers()) { + if (isPrimaryMember(memberContext)) { + primaryMemberListInPartition.add(memberContext.getMemberId()); + } + } + + // get pending primary members in this partition context + for (MemberContext memberContext : partitionContext.getPendingMembers()) { + if (isPrimaryMember(memberContext)) { + primaryMemberListInPartition.add(memberContext.getMemberId()); + } + } + primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition); + getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); +// getMinCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType); + getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary); + getMinCheckKnowledgeSession().setGlobal("instanceId", + instanceContext.getId()); + + if (log.isDebugEnabled()) { + log.debug(String.format("Running minimum check for partition %s ", + partitionContext.getPartitionId())); + } + + minCheckFactHandle = AutoscalerRuleEvaluator. + evaluateMinCheck(getMinCheckKnowledgeSession() + , minCheckFactHandle, partitionContext); + + obsoleteCheckFactHandle = AutoscalerRuleEvaluator. + evaluateObsoleteCheck(getObsoleteCheckKnowledgeSession(), + obsoleteCheckFactHandle, partitionContext); + + //checking the status of the cluster + + boolean rifReset = instanceContext.isRifReset(); + boolean memoryConsumptionReset = instanceContext.isMemoryConsumptionReset(); + boolean loadAverageReset = instanceContext.isLoadAverageReset(); + + if (log.isDebugEnabled()) { + log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset + + " flag of loadAverageReset" + loadAverageReset); + } + if (rifReset || memoryConsumptionReset || loadAverageReset) { + + + VMClusterContext vmClusterContext = (VMClusterContext) clusterContext; + + getScaleCheckKnowledgeSession().setGlobal("instance", instanceContext); + getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); + getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", vmClusterContext.getAutoscalePolicy()); + getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset); + getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset); + getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset); +// getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType); + getScaleCheckKnowledgeSession().setGlobal("isPrimary", false); + getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition); + + if (log.isDebugEnabled()) { + log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId())); + log.debug(" Primary members : " + primaryMemberListInNetworkPartition); + } + + scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(getScaleCheckKnowledgeSession() + , scaleCheckFactHandle, networkPartitionContext); + + instanceContext.setRifReset(false); + instanceContext.setMemoryConsumptionReset(false); + instanceContext.setLoadAverageReset(false); + } else if (log.isDebugEnabled()) { + log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " + + "cycle for network partition %s", networkPartitionContext.getId())); + } + } + + } + } + } + + } + }; + monitoringRunnable.run(); + } + + @Override + protected void readConfigurations() { + XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); + int monitorInterval = conf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000); + setMonitorIntervalMilliseconds(monitorInterval); + if (log.isDebugEnabled()) { + log.debug("VMServiceClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds()); + } + } + + @Override + public void destroy() { + getMinCheckKnowledgeSession().dispose(); + getObsoleteCheckKnowledgeSession().dispose(); + getScaleCheckKnowledgeSession().dispose(); + setDestroyed(true); + stopScheduler(); + if (log.isDebugEnabled()) { + log.debug("VMServiceClusterMonitor Drools session has been disposed. " + this.toString()); + } + } + + @Override + public String toString() { + return "VMServiceClusterMonitor [clusterId=" + getClusterId() + +// ", lbReferenceType=" + lbReferenceType + + ", hasPrimary=" + hasPrimary + " ]"; + } + +// public String getLbReferenceType() { +// return lbReferenceType; +// } +// +// public void setLbReferenceType(String lbReferenceType) { +// this.lbReferenceType = lbReferenceType; +// } + + public boolean isHasPrimary() { + return hasPrimary; + } + + public void setHasPrimary(boolean hasPrimary) { + this.hasPrimary = hasPrimary; + } + + @Override + public void onChildStatusEvent(MonitorStatusEvent statusEvent) { + + } + + @Override + public void onParentStatusEvent(MonitorStatusEvent statusEvent) { + String instanceId = statusEvent.getInstanceId(); + // send the ClusterTerminating event + if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() == + ApplicationStatus.Terminating) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster terminating event for [application]: " + appId + + " [cluster]: " + this.getClusterId()); + } + ClusterStatusEventPublisher.sendClusterTerminatingEvent(getAppId(), getServiceId(), getClusterId(), instanceId); + } + } + + @Override + public void onChildScalingEvent(MonitorScalingEvent scalingEvent) { } @Override + public void onParentScalingEvent(MonitorScalingEvent scalingEvent) { + + if (log.isDebugEnabled()) { + log.debug("Parent scaling event received to [cluster]: " + this.getClusterId() + + ", [network partition]: " + scalingEvent.getNetworkPartitionId() + + ", [event] " + scalingEvent.getId() + ", [group instance] " + scalingEvent.getInstanceId()); + } + + this.scalingFactorBasedOnDependencies = scalingEvent.getFactor(); + VMClusterContext vmClusterContext = (VMClusterContext) clusterContext; + String instanceId = scalingEvent.getInstanceId(); + + ClusterInstanceContext clusterLevelNetworkPartitionContext = + getClusterInstanceContext(scalingEvent.getNetworkPartitionId(), instanceId); + + + //TODO get min instance count from instance context + float requiredInstanceCount = 0 ;/* = clusterLevelNetworkPartitionContext.getMinInstanceCount() * scalingFactorBasedOnDependencies;*/ + int roundedRequiredInstanceCount = getRoundedInstanceCount(requiredInstanceCount, + vmClusterContext.getAutoscalePolicy().getInstanceRoundingFactor()); + clusterLevelNetworkPartitionContext.setRequiredInstanceCountBasedOnDependencies(roundedRequiredInstanceCount); + + getDependentScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); + getDependentScaleCheckKnowledgeSession().setGlobal("scalingFactor", scalingFactorBasedOnDependencies); + getDependentScaleCheckKnowledgeSession().setGlobal("instanceRoundingFactor", + vmClusterContext.getAutoscalePolicy().getInstanceRoundingFactor()); + + dependentScaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(getScaleCheckKnowledgeSession() + , scaleCheckFactHandle, clusterLevelNetworkPartitionContext); + + } + + public void sendClusterScalingEvent(String networkPartitionId, float factor) { + + MonitorStatusEventBuilder.handleClusterScalingEvent(this.parent, networkPartitionId, factor, this.id); + } + @Override public void handleGradientOfLoadAverageEvent( GradientOfLoadAverageEvent gradientOfLoadAverageEvent) { http://git-wip-us.apache.org/repos/asf/stratos/blob/7797f1e5/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java index bb4d87f..e4c82ba 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java @@ -37,7 +37,6 @@ import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionCont import org.apache.stratos.autoscaler.exception.cartridge.TerminationException; import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; -import org.apache.stratos.autoscaler.monitor.cluster.VMServiceClusterMonitor; //import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.PartitionManager; import org.apache.stratos.cloud.controller.stub.domain.MemberContext; import org.apache.stratos.common.constants.StratosConstants; @@ -222,10 +221,10 @@ public class RuleTasksDelegator { //Notify parent for checking scaling dependencies AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId); - if (clusterMonitor instanceof VMServiceClusterMonitor) { + if (clusterMonitor instanceof VMClusterMonitor) { - VMServiceClusterMonitor vmServiceClusterMonitor = (VMServiceClusterMonitor) clusterMonitor; - vmServiceClusterMonitor.sendClusterScalingEvent(networkPartitionId, factor); + VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) clusterMonitor; + vmClusterMonitor.sendClusterScalingEvent(networkPartitionId, factor); } }
