http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java index 75f7a2d..a8c90b3 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/health/AutoscalerHealthStatEventReceiver.java @@ -23,7 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.context.AutoscalerContext; import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; -import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; +import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; @@ -172,8 +172,8 @@ public class AutoscalerHealthStatEventReceiver { } return; } - if(monitor instanceof VMClusterMonitor) { - VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) monitor; + if(monitor instanceof ClusterMonitor) { + ClusterMonitor vmClusterMonitor = (ClusterMonitor) monitor; vmClusterMonitor.handleAverageRequestsServingCapabilityEvent(averageRequestsServingCapabilityEvent); } }
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java index 2a6f945..f5875ce 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -24,7 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.applications.ApplicationHolder; import org.apache.stratos.autoscaler.context.AutoscalerContext; import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory; -import org.apache.stratos.autoscaler.context.cluster.VMClusterContext; +import org.apache.stratos.autoscaler.context.cluster.ClusterContext; import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher; import org.apache.stratos.autoscaler.exception.application.DependencyBuilderException; @@ -33,7 +33,7 @@ import org.apache.stratos.autoscaler.exception.partition.PartitionValidationExce import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException; import org.apache.stratos.autoscaler.monitor.MonitorFactory; import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; -import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; +import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor; import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent; import org.apache.stratos.autoscaler.pojo.policy.PolicyManager; @@ -304,7 +304,7 @@ public class AutoscalerTopologyEventReceiver { monitor.notifyParentMonitor(ClusterStatus.Terminated, instanceId); //Removing the instance and instanceContext ClusterInstance instance = (ClusterInstance) monitor.getInstance(instanceId); - ((VMClusterContext)monitor.getClusterContext()). + ((ClusterContext)monitor.getClusterContext()). getNetworkPartitionCtxt(instance.getNetworkPartitionId()). removeInstanceContext(instanceId); monitor.removeInstance(instanceId); @@ -446,8 +446,8 @@ public class AutoscalerTopologyEventReceiver { Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId()); if (cluster != null) { try { - VMClusterContext clusterContext = - (VMClusterContext) clusterMonitor.getClusterContext(); + ClusterContext clusterContext = + (ClusterContext) clusterMonitor.getClusterContext(); if (clusterContext == null) { clusterContext = ClusterContextFactory.getVMClusterContext(instanceId, cluster, clusterMonitor.hasScalingDependents()); @@ -470,7 +470,7 @@ public class AutoscalerTopologyEventReceiver { + clusterInstanceCreatedEvent.getClusterId() + " started successfully"); } else { //monitor already started. Invoking it directly to speed up the process - ((VMClusterMonitor)clusterMonitor).monitor(); + ((ClusterMonitor)clusterMonitor).monitor(); } } catch (PolicyValidationException e) { log.error(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java index a8721a6..b5b1614 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java @@ -31,7 +31,7 @@ import org.apache.stratos.autoscaler.exception.partition.PartitionValidationExce import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException; import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitorFactory; -import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; +import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor; import org.apache.stratos.autoscaler.monitor.component.GroupMonitor; import org.apache.stratos.autoscaler.monitor.component.ParentComponentMonitor; @@ -270,7 +270,7 @@ public class MonitorFactory { } //Creating the instance of the cluster - ((VMClusterMonitor) clusterMonitor).createClusterInstance(parentInstanceIds, cluster); + ((ClusterMonitor) clusterMonitor).createClusterInstance(parentInstanceIds, cluster); //add it to autoscaler context AutoscalerContext.getInstance().addClusterMonitor(clusterMonitor); http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java new file mode 100644 index 0000000..24b8f3a --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java @@ -0,0 +1,1244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler.monitor.cluster; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.client.CloudControllerClient; +import org.apache.stratos.autoscaler.context.InstanceContext; +import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory; +import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext; +import org.apache.stratos.autoscaler.context.cluster.ClusterContext; +import org.apache.stratos.autoscaler.context.member.MemberStatsContext; +import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext; +import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext; +import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; +import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher; +import org.apache.stratos.autoscaler.exception.InvalidArgumentException; +import org.apache.stratos.autoscaler.exception.cartridge.TerminationException; +import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException; +import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException; +import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent; +import org.apache.stratos.autoscaler.monitor.events.ScalingEvent; +import org.apache.stratos.autoscaler.monitor.events.ScalingOverMaxEvent; +import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder; +import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; +import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiveProcessor; +import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInactiveProcessor; +import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor; +import org.apache.stratos.autoscaler.util.AutoScalerConstants; +import org.apache.stratos.autoscaler.util.AutoscalerUtil; +import org.apache.stratos.autoscaler.util.ConfUtil; +import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; +import org.apache.stratos.cloud.controller.stub.domain.MemberContext; +import org.apache.stratos.common.Properties; +import org.apache.stratos.common.Property; +import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.messaging.domain.applications.ApplicationStatus; +import org.apache.stratos.messaging.domain.applications.GroupStatus; +import org.apache.stratos.messaging.domain.instance.ClusterInstance; +import org.apache.stratos.messaging.domain.instance.GroupInstance; +import org.apache.stratos.messaging.domain.instance.Instance; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.domain.topology.Member; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.event.health.stat.*; +import org.apache.stratos.messaging.event.topology.*; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +import java.util.*; + +/** + * Is responsible for monitoring a service cluster. This runs periodically + * and perform minimum instance check and scaling check using the underlying + * rules engine. + */ +public class ClusterMonitor extends AbstractClusterMonitor { + + private static final Log log = LogFactory.getLog(ClusterMonitor.class); + private Map<String, ClusterLevelNetworkPartitionContext> networkPartitionIdToClusterLevelNetworkPartitionCtxts; + private boolean hasPrimary; + private float scalingFactorBasedOnDependencies = 1.0f; + + + protected ClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree) { + super(cluster, hasScalingDependents, groupScalingEnabledSubtree); + this.networkPartitionIdToClusterLevelNetworkPartitionCtxts = new HashMap<String, ClusterLevelNetworkPartitionContext>(); + readConfigurations(); + autoscalerRuleEvaluator = new AutoscalerRuleEvaluator(); + autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE); + autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.VM_SCALE_CHECK_DROOL_FILE); + autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.VM_MIN_CHECK_DROOL_FILE); + autoscalerRuleEvaluator.parseAndBuildKnowledgeBaseForDroolsFile(StratosConstants.DEPENDENT_SCALE_CHECK_DROOL_FILE); + + this.obsoleteCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession( + StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE); + this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession( + StratosConstants.VM_SCALE_CHECK_DROOL_FILE); + this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession( + StratosConstants.VM_MIN_CHECK_DROOL_FILE); + this.dependentScaleCheckKnowledgeSession = autoscalerRuleEvaluator.getStatefulSession( + StratosConstants.DEPENDENT_SCALE_CHECK_DROOL_FILE); + } + + private static void terminateMember(String memberId) { + try { + CloudControllerClient.getInstance().terminate(memberId); + + } catch (TerminationException e) { + log.error("Unable to terminate member [member id ] " + memberId, e); + } + } + + private static void createClusterInstance(String serviceType, String clusterId, String alias, + String instanceId, String partitionId, String networkPartitionId) { + CloudControllerClient.getInstance().createClusterInstance(serviceType, clusterId, alias, + instanceId, partitionId, networkPartitionId); + } + + public void addClusterLevelNWPartitionContext(ClusterLevelNetworkPartitionContext clusterLevelNWPartitionCtxt) { + networkPartitionIdToClusterLevelNetworkPartitionCtxts.put(clusterLevelNWPartitionCtxt.getId(), clusterLevelNWPartitionCtxt); + } + + public ClusterLevelNetworkPartitionContext getClusterLevelNWPartitionContext(String nwPartitionId) { + return networkPartitionIdToClusterLevelNetworkPartitionCtxts.get(nwPartitionId); + } + + @Override + public void handleAverageLoadAverageEvent( + AverageLoadAverageEvent averageLoadAverageEvent) { + + String networkPartitionId = averageLoadAverageEvent.getNetworkPartitionId(); + String clusterId = averageLoadAverageEvent.getClusterId(); + String instanceId = averageLoadAverageEvent.getInstanceId(); + float value = averageLoadAverageEvent.getValue(); + + if (log.isDebugEnabled()) { + log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s", + clusterId, networkPartitionId, value)); + } + + ClusterInstanceContext clusterInstanceContext = getClusterInstanceContext(networkPartitionId, + instanceId); + if (null != clusterInstanceContext) { + clusterInstanceContext.setAverageLoadAverage(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void run() { + while (!isDestroyed()) { + try { + if (log.isDebugEnabled()) { + log.debug("Cluster monitor is running.. " + this.toString()); + } + monitor(); + } catch (Exception e) { + log.error("Cluster monitor: Monitor failed." + this.toString(), e); + } + try { + Thread.sleep(getMonitorIntervalMilliseconds()); + } catch (InterruptedException ignore) { + } + } + + + } + + private boolean isPrimaryMember(MemberContext memberContext) { + Properties props = AutoscalerUtil.toCommonProperties(memberContext.getProperties()); + if (log.isDebugEnabled()) { + log.debug(" Properties [" + props + "] "); + } + if (props != null && props.getProperties() != null) { + for (Property prop : props.getProperties()) { + if (prop.getName().equals("PRIMARY")) { + if (Boolean.parseBoolean(prop.getValue())) { + log.debug("Adding member id [" + memberContext.getMemberId() + "] " + + "member instance id [" + memberContext.getInstanceId() + "] as a primary member"); + return true; + } + } + } + } + return false; + } + + public synchronized void monitor() { + + for (ClusterLevelNetworkPartitionContext networkPartitionContext : getNetworkPartitionCtxts()) { + + final Collection<InstanceContext> clusterInstanceContexts = networkPartitionContext. + getInstanceIdToInstanceContextMap().values(); + + for (final InstanceContext pInstanceContext : clusterInstanceContexts) { + final ClusterInstanceContext instanceContext = (ClusterInstanceContext) pInstanceContext; + ClusterInstance instance = (ClusterInstance) this.instanceIdToInstanceMap. + get(instanceContext.getId()); + + if ((instance.getStatus().getCode() <= ClusterStatus.Active.getCode()) || + (instance.getStatus() == ClusterStatus.Inactive && !hasStartupDependents) + && !this.hasFaultyMember) { + + Runnable monitoringRunnable = new Runnable() { + @Override + public void run() { + + if (log.isDebugEnabled()) { + log.debug("Monitor is running for [cluster] : " + getClusterId()); + } + // store primary members in the cluster instance context + List<String> primaryMemberListInClusterInstance = new ArrayList<String>(); + + for (ClusterLevelPartitionContext partitionContext : + instanceContext.getPartitionCtxts()) { + + // get active primary members in this cluster instance context + for (MemberContext memberContext : partitionContext.getActiveMembers()) { + if (isPrimaryMember(memberContext)) { + primaryMemberListInClusterInstance.add(memberContext.getMemberId()); + } + } + + // get pending primary members in this cluster instance context + for (MemberContext memberContext : partitionContext.getPendingMembers()) { + if (isPrimaryMember(memberContext)) { + primaryMemberListInClusterInstance.add(memberContext.getMemberId()); + } + } + + obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate( + getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, partitionContext); + + } + + getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInClusterInstance); + getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); + getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary); + //FIXME when parent chosen the partition + String paritionAlgo = instanceContext.getPartitionAlgorithm(); + + getMinCheckKnowledgeSession().setGlobal("algorithmName", + paritionAlgo); + + if (log.isDebugEnabled()) { + log.debug(String.format("Running minimum check for cluster instance %s ", + instanceContext.getId() + " for the cluster: " + clusterId)); + } + + minCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getMinCheckKnowledgeSession(), + minCheckFactHandle, instanceContext); + + + //checking the status of the cluster + boolean rifReset = instanceContext.isRifReset(); + boolean memoryConsumptionReset = instanceContext.isMemoryConsumptionReset(); + boolean loadAverageReset = instanceContext.isLoadAverageReset(); + boolean averageRequestServedPerInstanceReset + = instanceContext.isAverageRequestServedPerInstanceReset(); + + if (log.isDebugEnabled()) { + log.debug("Execution point of scaling Rule, [Is rif Reset] : " + rifReset + + " [Is memoryConsumption Reset] : " + memoryConsumptionReset + + " [Is loadAverage Reset] : " + loadAverageReset); + } + + if (rifReset || memoryConsumptionReset || loadAverageReset) { + + log.info("Executing scaling rule as statistics have been reset"); + ClusterContext vmClusterContext = (ClusterContext) clusterContext; + + getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); + getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset); + getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset); + getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset); + getScaleCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary); + getScaleCheckKnowledgeSession().setGlobal("algorithmName", paritionAlgo); + getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", + vmClusterContext.getAutoscalePolicy()); + getScaleCheckKnowledgeSession().setGlobal("arspiReset", + averageRequestServedPerInstanceReset); + getScaleCheckKnowledgeSession().setGlobal("primaryMembers", + primaryMemberListInClusterInstance); + + if (log.isDebugEnabled()) { + log.debug(String.format("Running scale check for [cluster instance context] %s ", + instanceContext.getId())); + log.debug(" Primary members : " + primaryMemberListInClusterInstance); + } + + scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getScaleCheckKnowledgeSession() + , scaleCheckFactHandle, instanceContext); + + instanceContext.setRifReset(false); + instanceContext.setMemoryConsumptionReset(false); + instanceContext.setLoadAverageReset(false); + } else if (log.isDebugEnabled()) { + log.debug(String.format("Scale rule will not run since the LB statistics have not " + + "received before this cycle for [cluster instance context] %s [cluster] %s", + instanceContext.getId(), clusterId)); + } + + } + }; + monitoringRunnable.run(); + } + + for (final ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) { + Runnable monitoringRunnable = new Runnable() { + @Override + public void run() { + obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluate( + getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, partitionContext); + } + }; + + monitoringRunnable.run(); + + } + + } + } + } + + @Override + protected void readConfigurations() { + XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); + int monitorInterval = conf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000); + setMonitorIntervalMilliseconds(monitorInterval); + if (log.isDebugEnabled()) { + log.debug("ClusterMonitor task interval set to : " + getMonitorIntervalMilliseconds()); + } + } + + @Override + public void destroy() { + getMinCheckKnowledgeSession().dispose(); + getObsoleteCheckKnowledgeSession().dispose(); + getScaleCheckKnowledgeSession().dispose(); + setDestroyed(true); + stopScheduler(); + if (log.isDebugEnabled()) { + log.debug("ClusterMonitor Drools session has been disposed. " + this.toString()); + } + } + + @Override + public String toString() { + return "ClusterMonitor [clusterId=" + getClusterId() + + ", hasPrimary=" + hasPrimary + " ]"; + } + + public boolean isHasPrimary() { + return hasPrimary; + } + + public void setHasPrimary(boolean hasPrimary) { + this.hasPrimary = hasPrimary; + } + + @Override + public void onChildStatusEvent(MonitorStatusEvent statusEvent) { + + } + + @Override + public void onParentStatusEvent(MonitorStatusEvent statusEvent) { + String instanceId = statusEvent.getInstanceId(); + // send the ClusterTerminating event + if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() == + ApplicationStatus.Terminating) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster terminating event for [application] " + appId + + " [cluster] " + this.getClusterId() + " [instance] " + instanceId); + } + ClusterStatusEventPublisher.sendClusterTerminatingEvent(getAppId(), getServiceId(), getClusterId(), instanceId); + } + } + + @Override + public void onChildScalingEvent(ScalingEvent scalingEvent) { + + } + + @Override + public void onChildScalingOverMaxEvent(ScalingOverMaxEvent scalingOverMaxEvent) { + + } + + @Override + public void onParentScalingEvent(ScalingEvent scalingEvent) { + + if (log.isDebugEnabled()) { + log.debug("Parent scaling event received to [cluster]: " + this.getClusterId() + + ", [network partition]: " + scalingEvent.getNetworkPartitionId() + + ", [event] " + scalingEvent.getId() + ", [group instance] " + scalingEvent.getInstanceId()); + } + + this.scalingFactorBasedOnDependencies = scalingEvent.getFactor(); + ClusterContext vmClusterContext = (ClusterContext) clusterContext; + String instanceId = scalingEvent.getInstanceId(); + + ClusterInstanceContext clusterInstanceContext = + getClusterInstanceContext(scalingEvent.getNetworkPartitionId(), instanceId); + + + // store primary members in the cluster instance context + List<String> primaryMemberListInClusterInstance = new ArrayList<String>(); + + for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) { + + // get active primary members in this cluster instance context + for (MemberContext memberContext : partitionContext.getActiveMembers()) { + if (isPrimaryMember(memberContext)) { + primaryMemberListInClusterInstance.add(memberContext.getMemberId()); + } + } + + // get pending primary members in this cluster instance context + for (MemberContext memberContext : partitionContext.getPendingMembers()) { + if (isPrimaryMember(memberContext)) { + primaryMemberListInClusterInstance.add(memberContext.getMemberId()); + } + } + } + + + //TODO get min instance count from instance context + float requiredInstanceCount = clusterInstanceContext.getMinInstanceCount() * scalingFactorBasedOnDependencies; + int roundedRequiredInstanceCount = getRoundedInstanceCount(requiredInstanceCount, + vmClusterContext.getAutoscalePolicy().getInstanceRoundingFactor()); + clusterInstanceContext.setRequiredInstanceCountBasedOnDependencies(roundedRequiredInstanceCount); + + getDependentScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); + getDependentScaleCheckKnowledgeSession().setGlobal("roundedRequiredInstanceCount", roundedRequiredInstanceCount); + getDependentScaleCheckKnowledgeSession().setGlobal("algorithmName", clusterInstanceContext.getPartitionAlgorithm()); + getDependentScaleCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary); + + dependentScaleCheckFactHandle = AutoscalerRuleEvaluator.evaluate(getDependentScaleCheckKnowledgeSession() + , dependentScaleCheckFactHandle, clusterInstanceContext); + + } + + public void sendClusterScalingEvent(String networkPartitionId, String instanceId, float factor) { + + MonitorStatusEventBuilder.handleClusterScalingEvent(this.parent, networkPartitionId, instanceId, factor, this.id); + } + + public void sendScalingOverMaxEvent(String networkPartitionId, String instanceId) { + + MonitorStatusEventBuilder.handleScalingOverMaxEvent(this.parent, networkPartitionId, instanceId, + this.id); + } + + @Override + public void handleGradientOfLoadAverageEvent( + GradientOfLoadAverageEvent gradientOfLoadAverageEvent) { + + String networkPartitionId = gradientOfLoadAverageEvent.getNetworkPartitionId(); + String clusterId = gradientOfLoadAverageEvent.getClusterId(); + String instanceId = gradientOfLoadAverageEvent.getInstanceId(); + float value = gradientOfLoadAverageEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s", + clusterId, networkPartitionId, value)); + } + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext( + networkPartitionId, instanceId); + if (null != clusterLevelNetworkPartitionContext) { + clusterLevelNetworkPartitionContext.setLoadAverageGradient(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleSecondDerivativeOfLoadAverageEvent( + SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) { + + String networkPartitionId = secondDerivativeOfLoadAverageEvent.getNetworkPartitionId(); + String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId(); + String instanceId = secondDerivativeOfLoadAverageEvent.getInstanceId(); + float value = secondDerivativeOfLoadAverageEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Second Derivation of load avg event: [cluster] %s " + + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); + } + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext( + networkPartitionId, instanceId); + if (null != clusterLevelNetworkPartitionContext) { + clusterLevelNetworkPartitionContext.setLoadAverageSecondDerivative(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleAverageMemoryConsumptionEvent( + AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) { + + String networkPartitionId = averageMemoryConsumptionEvent.getNetworkPartitionId(); + String clusterId = averageMemoryConsumptionEvent.getClusterId(); + String instanceId = averageMemoryConsumptionEvent.getInstanceId(); + float value = averageMemoryConsumptionEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s " + + "[value] %s", clusterId, networkPartitionId, value)); + } + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext( + networkPartitionId, instanceId); + if (null != clusterLevelNetworkPartitionContext) { + clusterLevelNetworkPartitionContext.setAverageMemoryConsumption(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String + .format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleGradientOfMemoryConsumptionEvent( + GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) { + + String networkPartitionId = gradientOfMemoryConsumptionEvent.getNetworkPartitionId(); + String clusterId = gradientOfMemoryConsumptionEvent.getClusterId(); + String instanceId = gradientOfMemoryConsumptionEvent.getInstanceId(); + float value = gradientOfMemoryConsumptionEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Grad of Memory Consumption event: [cluster] %s " + + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); + } + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext( + networkPartitionId, instanceId); + if (null != clusterLevelNetworkPartitionContext) { + clusterLevelNetworkPartitionContext.setMemoryConsumptionGradient(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleSecondDerivativeOfMemoryConsumptionEvent( + SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) { + + String networkPartitionId = secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId(); + String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId(); + String instanceId = secondDerivativeOfMemoryConsumptionEvent.getInstanceId(); + float value = secondDerivativeOfMemoryConsumptionEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s " + + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); + } + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext( + networkPartitionId, instanceId); + if (null != clusterLevelNetworkPartitionContext) { + clusterLevelNetworkPartitionContext.setMemoryConsumptionSecondDerivative(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + public void handleAverageRequestsServingCapabilityEvent( + AverageRequestsServingCapabilityEvent averageRequestsServingCapabilityEvent) { + + String clusterId = averageRequestsServingCapabilityEvent.getClusterId(); + String instanceId = averageRequestsServingCapabilityEvent.getInstanceId(); + String networkPartitionId = averageRequestsServingCapabilityEvent.getNetworkPartitionId(); + Float floatValue = averageRequestsServingCapabilityEvent.getValue(); + + if (log.isDebugEnabled()) { + log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s [value] %s", + clusterId, networkPartitionId, floatValue)); + } + + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext( + networkPartitionId, instanceId); + if (null != clusterLevelNetworkPartitionContext) { + clusterLevelNetworkPartitionContext.setAverageRequestsServedPerInstance(floatValue); + + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + + } + + @Override + public void handleAverageRequestsInFlightEvent( + AverageRequestsInFlightEvent averageRequestsInFlightEvent) { + + String networkPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId(); + String clusterId = averageRequestsInFlightEvent.getClusterId(); + String instanceId = averageRequestsInFlightEvent.getInstanceId(); + Float servedCount = averageRequestsInFlightEvent.getServedCount(); + Float activeInstances = averageRequestsInFlightEvent.getActiveInstances(); + Float requestsServedPerInstance = servedCount / activeInstances; + if (requestsServedPerInstance.isInfinite()) { + requestsServedPerInstance = 0f; + } + float value = averageRequestsInFlightEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s", + clusterId, networkPartitionId, value)); + } + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext( + networkPartitionId, instanceId); + if (null != clusterLevelNetworkPartitionContext) { + clusterLevelNetworkPartitionContext.setAverageRequestsInFlight(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleGradientOfRequestsInFlightEvent( + GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) { + + String networkPartitionId = gradientOfRequestsInFlightEvent.getNetworkPartitionId(); + String clusterId = gradientOfRequestsInFlightEvent.getClusterId(); + String instanceId = gradientOfRequestsInFlightEvent.getInstanceId(); + float value = gradientOfRequestsInFlightEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s", + clusterId, networkPartitionId, value)); + } + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext( + networkPartitionId, instanceId); + if (null != clusterLevelNetworkPartitionContext) { + clusterLevelNetworkPartitionContext.setRequestsInFlightGradient(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleSecondDerivativeOfRequestsInFlightEvent( + SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) { + + String networkPartitionId = secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId(); + String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId(); + String instanceId = secondDerivativeOfRequestsInFlightEvent.getInstanceId(); + float value = secondDerivativeOfRequestsInFlightEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Second derivative of Rif event: [cluster] %s " + + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); + } + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext( + networkPartitionId, instanceId); + if (null != clusterLevelNetworkPartitionContext) { + clusterLevelNetworkPartitionContext.setRequestsInFlightSecondDerivative(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleMemberAverageMemoryConsumptionEvent( + MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) { + + String instanceId = memberAverageMemoryConsumptionEvent.getInstanceId(); + String memberId = memberAverageMemoryConsumptionEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId, + instanceId); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( + member.getPartitionId()); + MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberAverageMemoryConsumptionEvent.getValue(); + memberStatsContext.setAverageMemoryConsumption(value); + } + + @Override + public void handleMemberGradientOfMemoryConsumptionEvent( + MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) { + + String instanceId = memberGradientOfMemoryConsumptionEvent.getInstanceId(); + String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId, + instanceId); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( + member.getPartitionId()); + MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberGradientOfMemoryConsumptionEvent.getValue(); + memberStatsContext.setGradientOfMemoryConsumption(value); + } + + @Override + public void handleMemberSecondDerivativeOfMemoryConsumptionEvent( + MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) { + + } + + @Override + public void handleMemberAverageLoadAverageEvent( + MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) { + + String instanceId = memberAverageLoadAverageEvent.getInstanceId(); + String memberId = memberAverageLoadAverageEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId, + instanceId); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( + member.getPartitionId()); + MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberAverageLoadAverageEvent.getValue(); + memberStatsContext.setAverageLoadAverage(value); + } + + @Override + public void handleMemberGradientOfLoadAverageEvent( + MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) { + + String instanceId = memberGradientOfLoadAverageEvent.getInstanceId(); + String memberId = memberGradientOfLoadAverageEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId, + instanceId); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( + member.getPartitionId()); + MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberGradientOfLoadAverageEvent.getValue(); + memberStatsContext.setGradientOfLoadAverage(value); + } + + @Override + public void handleMemberSecondDerivativeOfLoadAverageEvent( + MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) { + + String instanceId = memberSecondDerivativeOfLoadAverageEvent.getInstanceId(); + String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); + + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId, + instanceId); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( + member.getPartitionId()); + MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberSecondDerivativeOfLoadAverageEvent.getValue(); + memberStatsContext.setSecondDerivativeOfLoadAverage(value); + } + + @Override + public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) { + + String memberId = memberFaultEvent.getMemberId(); + String clusterId = memberFaultEvent.getClusterId(); + Member member = getMemberByMemberId(memberId); + String instanceId = memberFaultEvent.getInstanceId(); + String networkPartitionId = memberFaultEvent.getNetworkPartitionId(); + if (null == member) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); + } + return; + } + if (!member.isActive()) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member activated event has not received for the member %s. " + + "Therefore ignoring" + " the member fault health stat", memberId)); + } + return; + } + + ClusterInstanceContext nwPartitionCtxt; + nwPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId); + String partitionId = getPartitionOfMember(memberId); + ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); + if (!partitionCtxt.activeMemberExist(memberId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Could not find the active member in partition context, " + + "[member] %s ", memberId)); + } + return; + } + + // move member to obsolete list + synchronized (this) { + partitionCtxt.moveMemberToObsoleteList(memberId); + } + if (log.isDebugEnabled()) { + log.debug(String.format("Faulty member is added to obsolete list and removed from the active members list: " + + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId)); + } + + ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process( + ClusterStatusInactiveProcessor.class.getName(), clusterId, instanceId); + } + + @Override + public void handleMemberStartedEvent( + MemberStartedEvent memberStartedEvent) { + + } + + @Override + public void handleMemberActivatedEvent( + MemberActivatedEvent memberActivatedEvent) { + + String instanceId = memberActivatedEvent.getInstanceId(); + String clusterId = memberActivatedEvent.getClusterId(); + String networkPartitionId = memberActivatedEvent.getNetworkPartitionId(); + String partitionId = memberActivatedEvent.getPartitionId(); + String memberId = memberActivatedEvent.getMemberId(); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId); + ClusterLevelPartitionContext clusterLevelPartitionContext; + clusterLevelPartitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId); + clusterLevelPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if (log.isDebugEnabled()) { + log.debug(String.format("Member stat context has been added successfully: " + + "[member] %s", memberId)); + } + clusterLevelPartitionContext.movePendingMemberToActiveMembers(memberId); + ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process( + ClusterStatusActiveProcessor.class.getName(), clusterId, instanceId); + } + + @Override + public void handleMemberMaintenanceModeEvent( + MemberMaintenanceModeEvent maintenanceModeEvent) { + + String networkPartitionId = maintenanceModeEvent.getNetworkPartitionId(); + String partitionId = maintenanceModeEvent.getPartitionId(); + String memberId = maintenanceModeEvent.getMemberId(); + String instanceId = maintenanceModeEvent.getInstanceId(); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(networkPartitionId, + instanceId); + ClusterLevelPartitionContext clusterMonitorPartitionContext = networkPartitionCtxt. + getPartitionCtxt(partitionId); + clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if (log.isDebugEnabled()) { + log.debug(String.format("Member has been moved as pending termination: " + + "[member] %s", memberId)); + } + clusterMonitorPartitionContext.moveActiveMemberToTerminationPendingMembers(memberId); + } + + @Override + public void handleMemberReadyToShutdownEvent(MemberReadyToShutdownEvent memberReadyToShutdownEvent) { + + ClusterInstanceContext nwPartitionCtxt; + String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId(); + String instanceId = memberReadyToShutdownEvent.getInstanceId(); + nwPartitionCtxt = getClusterInstanceContext(networkPartitionId, instanceId); + + // start a new member in the same Partition + String memberId = memberReadyToShutdownEvent.getMemberId(); + String partitionId = getPartitionOfMember(memberId); + ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); + + try { + String clusterId = memberReadyToShutdownEvent.getClusterId(); + //move member to pending termination list + if (partitionCtxt.getPendingTerminationMember(memberId) != null) { + partitionCtxt.movePendingTerminationMemberToObsoleteMembers(memberId); + if (log.isDebugEnabled()) { + log.debug(String.format("Member is removed from the pending termination members " + + "and moved to obsolete list: [member] %s " + + "[partition] %s [cluster] %s ", memberId, partitionId, clusterId)); + } + } else if (partitionCtxt.getObsoleteMember(memberId) != null) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is in obsolete list: [member] %s " + + "[partition] %s [cluster] %s ", memberId, partitionId, clusterId)); + } + } //TODO else part + + //when no more members are there to terminate Invoking it monitor directly + // to speed up the termination process + if (partitionCtxt.getTotalMemberCount() == 0) { + this.monitor(); + } + + + } catch (Exception e) { + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); + } + } + + @Override + public void handleMemberTerminatedEvent( + MemberTerminatedEvent memberTerminatedEvent) { + + String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId(); + String memberId = memberTerminatedEvent.getMemberId(); + String clusterId = memberTerminatedEvent.getClusterId(); + String instanceId = memberTerminatedEvent.getInstanceId(); + String partitionId = memberTerminatedEvent.getPartitionId(); + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext( + networkPartitionId, instanceId); + ClusterLevelPartitionContext clusterMonitorPartitionContext = + clusterLevelNetworkPartitionContext.getPartitionCtxt(partitionId); + clusterMonitorPartitionContext.removeMemberStatsContext(memberId); + + if (clusterMonitorPartitionContext.removeTerminationPendingMember(memberId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is removed from termination pending members list: " + + "[member] %s", memberId)); + } + } else if (clusterMonitorPartitionContext.removePendingMember(memberId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is removed from pending members list: " + + "[member] %s", memberId)); + } + } else if (clusterMonitorPartitionContext.removeActiveMemberById(memberId)) { + log.warn(String.format("Member is in the wrong list and it is removed from " + + "active members list: %s", memberId)); + } else if (clusterMonitorPartitionContext.removeObsoleteMember(memberId)) { + log.warn(String.format("Obsolete member has either been terminated or its obsolete time out has expired and" + + " it is removed from obsolete members list: %s", memberId)); + } else { + log.warn(String.format("Member is not available in any of the list active, " + + "pending and termination pending: %s", memberId)); + } + + if (log.isDebugEnabled()) { + log.debug(String.format("Member stat context has been removed successfully: " + + "[member] %s", memberId)); + } + //Checking whether the cluster state can be changed either from in_active to created/terminating to terminated + ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process( + ClusterStatusTerminatedProcessor.class.getName(), clusterId, instanceId); + } + + @Override + public void handleClusterRemovedEvent( + ClusterRemovedEvent clusterRemovedEvent) { + + } + + @Override + public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException { + + } + + private String getNetworkPartitionIdByMemberId(String memberId) { + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + if (cluster.memberExists(memberId)) { + return cluster.getMember(memberId).getNetworkPartitionId(); + } + } + } + return null; + } + + private Member getMemberByMemberId(String memberId) { + try { + TopologyManager.acquireReadLock(); + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + if (cluster.memberExists(memberId)) { + return cluster.getMember(memberId); + } + } + } + return null; + } finally { + TopologyManager.releaseReadLock(); + } + } + + public String getPartitionOfMember(String memberId) { + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + if (cluster.memberExists(memberId)) { + return cluster.getMember(memberId).getPartitionId(); + } + } + } + return null; + } + + @Override + public void terminateAllMembers(final String instanceId, final String networkPartitionId) { + final ClusterMonitor monitor = this; + Thread memberTerminator = new Thread(new Runnable() { + public void run() { + + ClusterInstanceContext instanceContext = + (ClusterInstanceContext) getAllNetworkPartitionCtxts().get(networkPartitionId) + .getInstanceContext(instanceId); + boolean allMovedToObsolete = true; + for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) { + if (log.isInfoEnabled()) { + log.info("Starting to terminate all members in cluster [" + getClusterId() + "] " + + "Network Partition [" + instanceContext.getNetworkPartitionId() + "], Partition [" + + partitionContext.getPartitionId() + "]"); + } + // need to terminate active, pending and obsolete members + //FIXME to traverse concurrent + // active members + List<String> activeMembers = new ArrayList<String>(); + Iterator<MemberContext> iterator = partitionContext.getActiveMembers().listIterator(); + while (iterator.hasNext()) { + MemberContext activeMemberCtxt = iterator.next(); + activeMembers.add(activeMemberCtxt.getMemberId()); + + } + for (String memberId : activeMembers) { + log.info("Sending instance cleanup event for the active member: [member-id] " + memberId); + partitionContext.moveActiveMemberToTerminationPendingMembers(memberId); + InstanceNotificationPublisher.getInstance(). + sendInstanceCleanupEventForMember(memberId); + } + Iterator<MemberContext> pendingIterator = partitionContext.getPendingMembers().listIterator(); + + List<String> pendingMembers = new ArrayList<String>(); + while (pendingIterator.hasNext()) { + MemberContext activeMemberCtxt = pendingIterator.next(); + pendingMembers.add(activeMemberCtxt.getMemberId()); + + } + for (String memberId : pendingMembers) { + MemberContext pendingMemberCtxt = pendingIterator.next(); + // pending members + String memeberId = pendingMemberCtxt.getMemberId(); + if (log.isDebugEnabled()) { + log.debug("Moving pending member [member id] " + memeberId + " to obsolete list"); + } + partitionContext.movePendingMemberToObsoleteMembers(memeberId); + } + if(partitionContext.getTotalMemberCount() == 0) { + allMovedToObsolete = allMovedToObsolete && true; + } else { + allMovedToObsolete = false; + } + } + + if(allMovedToObsolete) { + monitor.monitor(); + } + } + }, "Member Terminator - [cluster id] " + getClusterId()); + + memberTerminator.start(); + } + + public Map<String, ClusterLevelNetworkPartitionContext> getAllNetworkPartitionCtxts() { + return ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts(); + } + + public ClusterInstanceContext getClusterInstanceContext(String networkPartitionId, String instanceId) { + Map<String, ClusterLevelNetworkPartitionContext> clusterLevelNetworkPartitionContextMap = + ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts(); + ClusterLevelNetworkPartitionContext networkPartitionContext = + clusterLevelNetworkPartitionContextMap.get(networkPartitionId); + ClusterInstanceContext instanceContext = (ClusterInstanceContext) networkPartitionContext. + getInstanceContext(instanceId); + return instanceContext; + } + + public Collection<ClusterLevelNetworkPartitionContext> getNetworkPartitionCtxts() { + return ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts().values(); + } + + public void createClusterInstance(List<String> parentInstanceIds, Cluster cluster) + throws PolicyValidationException, PartitionValidationException { + for (String parentInstanceId : parentInstanceIds) { + createInstance(parentInstanceId, cluster); + } + + } + + public boolean createInstanceOnDemand(String instanceId) { + Cluster cluster = TopologyManager.getTopology().getService(this.serviceType). + getCluster(this.clusterId); + try { + return createInstance(instanceId, cluster); + //TODO exception + } catch (PolicyValidationException e) { + log.error("Error while creating the cluster instance", e); + } catch (PartitionValidationException e) { + log.error("Error while creating the cluster instance", e); + + } + return false; + + } + + private boolean createInstance(String parentInstanceId, Cluster cluster) + throws PolicyValidationException, PartitionValidationException { + Instance parentMonitorInstance = this.parent.getInstance(parentInstanceId); + String partitionId = null; + if (parentMonitorInstance instanceof GroupInstance) { + partitionId = parentMonitorInstance.getPartitionId(); + } + if (parentMonitorInstance != null) { + + ClusterInstance clusterInstance = cluster.getInstanceContexts(parentInstanceId); + if (clusterInstance != null) { + + // Cluster instance is already there. No need to create one. + ClusterContext clusterContext = (ClusterContext) this.getClusterContext(); + if (clusterContext == null) { + + clusterContext = ClusterContextFactory.getVMClusterContext(clusterInstance.getInstanceId(), cluster, + hasScalingDependents()); + this.setClusterContext(clusterContext); + } + + // create VMClusterContext and then add all the instanceContexts + clusterContext.addInstanceContext(parentInstanceId, cluster, hasScalingDependents(), + groupScalingEnabledSubtree()); + if (this.getInstance(clusterInstance.getInstanceId()) == null) { + this.addInstance(clusterInstance); + } + // Checking the current status of the cluster instance + boolean stateChanged = + ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain() + .process("", cluster.getClusterId(), clusterInstance.getInstanceId()); + if (!stateChanged && clusterInstance.getStatus() != ClusterStatus.Created) { + this.notifyParentMonitor(clusterInstance.getStatus(), + clusterInstance.getInstanceId()); + + if (this.hasMonitoringStarted().compareAndSet(false, true)) { + this.startScheduler(); + log.info("Monitoring task for Cluster Monitor with cluster id " + + cluster.getClusterId() + " started successfully"); + } + } + } else { + createClusterInstance(cluster.getServiceName(), cluster.getClusterId(), null, parentInstanceId, partitionId, + parentMonitorInstance.getNetworkPartitionId()); + + } + return true; + + } else { + return false; + + } + + } + + /** + * Move all the members of the cluster instance to termiantion pending + * + * @param instanceId + */ + public void moveMembersFromActiveToPendingTermination(String instanceId) { + + //TODO take read lock for network partition context + //FIXME to iterate properly + for (ClusterLevelNetworkPartitionContext networkPartitionContext : + ((ClusterContext) this.clusterContext).getNetworkPartitionCtxts().values()) { + ClusterInstanceContext clusterInstanceContext = + (ClusterInstanceContext) networkPartitionContext.getInstanceContext(instanceId); + if (clusterInstanceContext != null) { + for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) { + List<String> members = new ArrayList<String>(); + + Iterator<MemberContext> iterator = partitionContext.getActiveMembers().listIterator(); + while (iterator.hasNext()) { + MemberContext activeMember = iterator.next(); + members.add(activeMember.getMemberId()); + } + + for (String memberId : members) { + partitionContext.moveActiveMemberToTerminationPendingMembers( + memberId); + } + List<String> pendingMembers = new ArrayList<String>(); + + Iterator<MemberContext> pendingIterator = partitionContext.getPendingMembers().listIterator(); + while (pendingIterator.hasNext()) { + MemberContext activeMember = pendingIterator.next(); + pendingMembers.add(activeMember.getMemberId()); + } + for (String memberId : members) { + // pending members + if (log.isDebugEnabled()) { + log.debug("Moving pending member [member id] " + memberId + " the obsolete list"); + } + partitionContext.movePendingMemberToObsoleteMembers(memberId); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/d9c323a2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java index e5fa765..f870e11 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java @@ -21,14 +21,9 @@ package org.apache.stratos.autoscaler.monitor.cluster; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.context.cluster.VMClusterContext; import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException; import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException; -import org.apache.stratos.cloud.controller.stub.domain.MemberContext; -import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.Member; -import org.apache.stratos.messaging.domain.topology.MemberStatus; /* * Factory class for creating cluster monitors. @@ -47,19 +42,12 @@ public class ClusterMonitorFactory { boolean groupScalingEnabledSubtree) throws PolicyValidationException, PartitionValidationException { - AbstractClusterMonitor clusterMonitor; -// if (cluster.isKubernetesCluster()) { -// clusterMonitor = getDockerServiceClusterMonitor(cluster); -////// } else if (cluster.isLbCluster()) { -////// clusterMonitor = getVMLbClusterMonitor(cluster); -// } else { - clusterMonitor = getVMClusterMonitor(cluster, hasScalingDependents, groupScalingEnabledSubtree); -// } - + AbstractClusterMonitor clusterMonitor = + getVMClusterMonitor(cluster, hasScalingDependents, groupScalingEnabledSubtree); return clusterMonitor; } - private static VMClusterMonitor getVMClusterMonitor(Cluster cluster, boolean hasScalingDependents, + private static ClusterMonitor getVMClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree) throws PolicyValidationException, PartitionValidationException { @@ -67,7 +55,7 @@ public class ClusterMonitorFactory { return null; } - VMClusterMonitor clusterMonitor = new VMClusterMonitor(cluster, hasScalingDependents, groupScalingEnabledSubtree); + ClusterMonitor clusterMonitor = new ClusterMonitor(cluster, hasScalingDependents, groupScalingEnabledSubtree); // find lb reference type java.util.Properties props = cluster.getProperties(); @@ -90,121 +78,4 @@ public class ClusterMonitorFactory { log.info("VMClusterMonitor created: " + clusterMonitor.toString()); return clusterMonitor; } -// -// private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster) -// throws PolicyValidationException, PartitionValidationException { -// -// if (null == cluster) { -// return null; -// } -// -// VMLbClusterMonitor clusterMonitor = -// new VMLbClusterMonitor(cluster.getServiceName(), cluster.getClusterId()); -// clusterMonitor.notifyParentMonitor(ClusterStatus.Created); -// -// log.info("VMLbClusterMonitor created: " + clusterMonitor.toString()); -// return clusterMonitor; -// } - - /** - * @param cluster - the cluster which needs to be monitored - * @return - the cluster monitor - */ - private static KubernetesClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) - throws PolicyValidationException { - - if (null == cluster) { - return null; - } - -// String autoscalePolicyName = cluster.getAutoscalePolicyName(); -// -// AutoscalePolicy autoscalePolicy = -// PolicyManager.getInstance() -// .getAutoscalePolicy(autoscalePolicyName); -// if (log.isDebugEnabled()) { -// log.debug("Autoscaling policy name: " + autoscalePolicyName); -// } -// -// AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName); -// -// if (policy == null) { -// String msg = String.format("Autoscaling policy is null: [policy-name] %s", autoscalePolicyName); -// log.error(msg); -// throw new PolicyValidationException(msg); -// } -// - java.util.Properties properties = cluster.getProperties(); - if (properties == null) { - String message = String.format("Properties not found in kubernetes cluster: [cluster-id] %s", - cluster.getClusterId()); - log.error(message); - throw new RuntimeException(message); - } -// String minReplicasProperty = properties.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS); -// int minReplicas = 0; -// if (minReplicasProperty != null && !minReplicasProperty.isEmpty()) { -// minReplicas = Integer.parseInt(minReplicasProperty); -// } -// -// int maxReplicas = 0; -// String maxReplicasProperty = properties.getProperty(StratosConstants.KUBERNETES_MAX_REPLICAS); -// if (maxReplicasProperty != null && !maxReplicasProperty.isEmpty()) { -// maxReplicas = Integer.parseInt(maxReplicasProperty); -// } -// -// String kubernetesHostClusterID = properties.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID); -// KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID, -// cluster.getClusterId(), cluster.getServiceName(), autoscalePolicy, minReplicas, maxReplicas); - - -// KubernetesClusterMonitor dockerClusterMonitor = new KubernetesClusterMonitor(cluster); - - //populate the members after restarting -// for (Member member : cluster.getMembers()) { -// String memberId = member.getMemberId(); -// String clusterId = member.getClusterId(); -// MemberContext memberContext = new MemberContext(); -// memberContext.setMemberId(memberId); -// memberContext.setClusterId(clusterId); -// memberContext.setInitTime(member.getInitTime()); -// -// // if there is at least one member in the topology, that means service has been created already -// // this is to avoid calling startContainer() method again -// //kubernetesClusterCtxt.setServiceClusterCreated(true); -// -// if (MemberStatus.Activated.equals(member.getStatus())) { -// if (log.isDebugEnabled()) { -// String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString()); -// log.debug(msg); -// } -// ((VMClusterContext) dockerClusterMonitor.getClusterContext()).addActiveMember(memberContext); -// } else if (MemberStatus.Created.equals(member.getStatus()) -// || MemberStatus.Starting.equals(member.getStatus())) { -// if (log.isDebugEnabled()) { -// String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString()); -// log.debug(msg); -// } -// dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext); -// } -// -// //kubernetesClusterCtxt.addMemberStatsContext(new MemberStatsContext(memberId)); -// if (log.isInfoEnabled()) { -// log.info(String.format("Member stat context has been added: [member] %s", memberId)); -// } -// } -// -// // find lb reference type -// if (properties.containsKey(StratosConstants.LOAD_BALANCER_REF)) { -// String value = properties.getProperty(StratosConstants.LOAD_BALANCER_REF); -// dockerClusterMonitor.setLbReferenceType(value); -// if (log.isDebugEnabled()) { -// log.debug("Set the lb reference type: " + value); -// } -// } - -// log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString()); -// return dockerClusterMonitor; - return null; - } }
