http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ClusterLevelNetworkPartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ClusterLevelNetworkPartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ClusterLevelNetworkPartitionContext.java index bf4c29d..53d8edb 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ClusterLevelNetworkPartitionContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ClusterLevelNetworkPartitionContext.java @@ -22,15 +22,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext; import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext; -import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadAverage; -import org.apache.stratos.autoscaler.pojo.policy.autoscale.MemoryConsumption; -import org.apache.stratos.autoscaler.pojo.policy.autoscale.RequestsInFlight; -import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.ChildLevelPartition; -import org.apache.stratos.autoscaler.pojo.policy.deployment.partition.network.ChildLevelNetworkPartition; -import org.apache.stratos.cloud.controller.stub.domain.Partition; import java.io.Serializable; -import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -43,88 +36,25 @@ public class ClusterLevelNetworkPartitionContext extends NetworkPartitionContext private static final Log log = LogFactory.getLog(ClusterLevelNetworkPartitionContext.class); private static final long serialVersionUID = 572769304374110159L; private final String id; - private int scaleDownRequestsCount = 0; - private float averageRequestsServedPerInstance; - private float requestsServedPerInstance; - - private int minInstanceCount = 0, maxInstanceCount = 0; - private int requiredInstanceCountBasedOnStats; - private int requiredInstanceCountBasedOnDependencies; private Map<String, ClusterInstanceContext> instanceIdToClusterInstanceContextMap; - private final String partitionAlgorithm; - - //boolean values to keep whether the requests in flight parameters are reset or not - private boolean rifReset = false, averageRifReset = false, gradientRifReset = false, secondDerivativeRifRest = false; - //boolean values to keep whether the memory consumption parameters are reset or not - private boolean memoryConsumptionReset = false, averageMemoryConsumptionReset = false, - gradientMemoryConsumptionReset = false, secondDerivativeMemoryConsumptionRest = false; - //boolean values to keep whether the load average parameters are reset or not - private boolean loadAverageReset = false, averageLoadAverageReset = false, gradientLoadAverageReset = false, - secondDerivativeLoadAverageRest = false; - //boolean values to keep whether average requests served per instance parameters are reset or not - private boolean averageRequestServedPerInstanceReset = false; - - private final ChildLevelPartition[] partitions; - - //Following information will keep events details - private RequestsInFlight requestsInFlight; - private MemoryConsumption memoryConsumption; - private LoadAverage loadAverage; - - //details required for partition selection algorithms - private int currentPartitionIndex; - - //partitions of this network partition - private final Map<String, ClusterLevelPartitionContext> partitionCtxts; - - public ClusterLevelNetworkPartitionContext(String id, String partitionAlgo, ChildLevelPartition[] partitions) { + public ClusterLevelNetworkPartitionContext(String id) { //super(id, partitionAlgo, partitions); this.id = id; - this.partitionAlgorithm = partitionAlgo; - if (partitions == null) { - this.partitions = new ChildLevelPartition[0]; - } else { - this.partitions = Arrays.copyOf(partitions, partitions.length); - } - partitionCtxts = new HashMap<String, ClusterLevelPartitionContext>(); - requestsInFlight = new RequestsInFlight(); - loadAverage = new LoadAverage(); - memoryConsumption = new MemoryConsumption(); -// for (ChildLevelPartition partition : partitions) { -// minInstanceCount += partition.get(); -// maxInstanceCount += partition.getPartitionMax(); -// } -// requiredInstanceCountBasedOnStats = minInstanceCount; -// requiredInstanceCountBasedOnDependencies = minInstanceCount; - instanceIdToClusterInstanceContextMap = new HashMap<String, ClusterInstanceContext>(); - } + instanceIdToClusterInstanceContextMap = new HashMap<String, ClusterInstanceContext>(); - public int getMinInstanceCount() { - return minInstanceCount; } - public void setMinInstanceCount(int minInstanceCount) { - this.minInstanceCount = minInstanceCount; - } -// -// public int getMaxInstanceCount() { -// return maxInstanceCount; -// } -// -// public void setMaxInstanceCount(int maxInstanceCount) { -// this.maxInstanceCount = maxInstanceCount; -// } public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((this.id == null) ? 0 : this.id.hashCode()); + result = prime * result + ((this.getId() == null) ? 0 : this.getId().hashCode()); return result; } @@ -141,305 +71,18 @@ public class ClusterLevelNetworkPartitionContext extends NetworkPartitionContext return false; } final ClusterLevelNetworkPartitionContext other = (ClusterLevelNetworkPartitionContext) obj; - if (this.id == null) { - if (other.id != null) { + if (this.getId() == null) { + if (other.getId() != null) { return false; } - } else if (!this.id.equals(other.id)) { + } else if (!this.getId().equals(other.getId())) { return false; } return true; } - @Override - public String toString() { - return "NetworkPartitionContext [id=" + id + "partitionAlgorithm=" + partitionAlgorithm + "]"; - } - - public int getCurrentPartitionIndex() { - return currentPartitionIndex; - } - - public void setCurrentPartitionIndex(int currentPartitionIndex) { - this.currentPartitionIndex = currentPartitionIndex; - } - - public float getAverageRequestsServedPerInstance() { - return averageRequestsServedPerInstance; - } - - public void setAverageRequestsServedPerInstance(float averageRequestServedPerInstance) { - this.averageRequestsServedPerInstance = averageRequestServedPerInstance; - averageRequestServedPerInstanceReset = true; - - if (log.isDebugEnabled()) { - log.debug(String.format("Average Requesets Served Per Instance stats are reset, ready to do scale check [network partition] %s" - , this.id)); - - } - } - - public float getRequestsServedPerInstance() { - return requestsServedPerInstance; - } - - public float getAverageRequestsInFlight() { - return requestsInFlight.getAverage(); - } - - public void setAverageRequestsInFlight(float averageRequestsInFlight) { - requestsInFlight.setAverage(averageRequestsInFlight); - averageRifReset = true; - if (secondDerivativeRifRest && gradientRifReset) { - rifReset = true; - if (log.isDebugEnabled()) { - log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" - , this.id)); - } - } - } - - public float getRequestsInFlightSecondDerivative() { - return requestsInFlight.getSecondDerivative(); - } - - public void setRequestsInFlightSecondDerivative(float requestsInFlightSecondDerivative) { - requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative); - secondDerivativeRifRest = true; - if (averageRifReset && gradientRifReset) { - rifReset = true; - if (log.isDebugEnabled()) { - log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" - , this.id)); - } - } - } - - public float getRequestsInFlightGradient() { - return requestsInFlight.getGradient(); - } - - public void setRequestsInFlightGradient(float requestsInFlightGradient) { - requestsInFlight.setGradient(requestsInFlightGradient); - gradientRifReset = true; - if (secondDerivativeRifRest && averageRifReset) { - rifReset = true; - if (log.isDebugEnabled()) { - log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" - , this.id)); - } - } - } - - public boolean isRifReset() { - return rifReset; - } - - public void setRifReset(boolean rifReset) { - this.rifReset = rifReset; - this.averageRifReset = rifReset; - this.gradientRifReset = rifReset; - this.secondDerivativeRifRest = rifReset; - } - - - public float getAverageMemoryConsumption() { - return memoryConsumption.getAverage(); - } - - public void setAverageMemoryConsumption(float averageMemoryConsumption) { - memoryConsumption.setAverage(averageMemoryConsumption); - averageMemoryConsumptionReset = true; - if (secondDerivativeMemoryConsumptionRest && gradientMemoryConsumptionReset) { - memoryConsumptionReset = true; - if (log.isDebugEnabled()) { - log.debug(String.format("Memory consumption stats are reset, ready to do scale check [network partition] %s" - , this.id)); - } - } - } - - public float getMemoryConsumptionSecondDerivative() { - return memoryConsumption.getSecondDerivative(); - } - - public void setMemoryConsumptionSecondDerivative(float memoryConsumptionSecondDerivative) { - memoryConsumption.setSecondDerivative(memoryConsumptionSecondDerivative); - secondDerivativeMemoryConsumptionRest = true; - if (averageMemoryConsumptionReset && gradientMemoryConsumptionReset) { - memoryConsumptionReset = true; - if (log.isDebugEnabled()) { - log.debug(String.format("Memory consumption stats are reset, ready to do scale check [network partition] %s" - , this.id)); - } - } - } - - public float getMemoryConsumptionGradient() { - return memoryConsumption.getGradient(); - } - - public void setMemoryConsumptionGradient(float memoryConsumptionGradient) { - memoryConsumption.setGradient(memoryConsumptionGradient); - gradientMemoryConsumptionReset = true; - if (secondDerivativeMemoryConsumptionRest && averageMemoryConsumptionReset) { - memoryConsumptionReset = true; - if (log.isDebugEnabled()) { - log.debug(String.format("Memory consumption stats are reset, ready to do scale check [network partition] %s" - , this.id)); - } - } - } - - public boolean isMemoryConsumptionReset() { - return memoryConsumptionReset; - } - - public void setMemoryConsumptionReset(boolean memoryConsumptionReset) { - this.memoryConsumptionReset = memoryConsumptionReset; - this.averageMemoryConsumptionReset = memoryConsumptionReset; - this.gradientMemoryConsumptionReset = memoryConsumptionReset; - this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset; - } - - - public float getAverageLoadAverage() { - return loadAverage.getAverage(); - } - - public void setAverageLoadAverage(float averageLoadAverage) { - loadAverage.setAverage(averageLoadAverage); - averageLoadAverageReset = true; - if (secondDerivativeLoadAverageRest && gradientLoadAverageReset) { - loadAverageReset = true; - if (log.isDebugEnabled()) { - log.debug(String.format("Load average stats are reset, ready to do scale check [network partition] %s" - , this.id)); - } - } - } - - public float getLoadAverageSecondDerivative() { - return loadAverage.getSecondDerivative(); - } - - public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) { - loadAverage.setSecondDerivative(loadAverageSecondDerivative); - secondDerivativeLoadAverageRest = true; - if (averageLoadAverageReset && gradientLoadAverageReset) { - loadAverageReset = true; - if (log.isDebugEnabled()) { - log.debug(String.format("Load average stats are reset, ready to do scale check [network partition] %s" - , this.id)); - } - } - } - - public float getLoadAverageGradient() { - return loadAverage.getGradient(); - } - - public void setLoadAverageGradient(float loadAverageGradient) { - loadAverage.setGradient(loadAverageGradient); - gradientLoadAverageReset = true; - if (secondDerivativeLoadAverageRest && averageLoadAverageReset) { - loadAverageReset = true; - if (log.isDebugEnabled()) { - log.debug(String.format("Load average stats are reset, ready to do scale check [network partition] %s" - , this.id)); - } - } - } - - public boolean isLoadAverageReset() { - return loadAverageReset; - } - - public void setLoadAverageReset(boolean loadAverageReset) { - this.loadAverageReset = loadAverageReset; - this.averageLoadAverageReset = loadAverageReset; - this.gradientLoadAverageReset = loadAverageReset; - this.secondDerivativeLoadAverageRest = loadAverageReset; - } - public String getId() { return id; } - - public Map<String, ClusterLevelPartitionContext> getPartitionCtxts() { - return partitionCtxts; - } - - public ClusterLevelPartitionContext getPartitionCtxt(String partitionId) { - return partitionCtxts.get(partitionId); - } - - public void addPartitionContext(ClusterLevelPartitionContext partitionContext) { - partitionCtxts.put(partitionContext.getPartitionId(), partitionContext); - } - - public String getPartitionAlgorithm() { - return partitionAlgorithm; - } - - public ChildLevelPartition[] getPartitions() { - return partitions; - } - - public int getNonTerminatedMemberCountOfPartition(String partitionId) { - if (partitionCtxts.containsKey(partitionId)) { - return getPartitionCtxt(partitionId).getNonTerminatedMemberCount(); - } - return 0; - } - - public int getActiveMemberCount(String currentPartitionId) { - if (partitionCtxts.containsKey(currentPartitionId)) { - return getPartitionCtxt(currentPartitionId).getActiveMemberCount(); - } - return 0; - } - - public int getScaleDownRequestsCount() { - return scaleDownRequestsCount; - } - - public void resetScaleDownRequestsCount() { - this.scaleDownRequestsCount = 0; - } - - public void increaseScaleDownRequestsCount() { - this.scaleDownRequestsCount += 1; - } - - public float getRequiredInstanceCountBasedOnStats() { - return requiredInstanceCountBasedOnStats; - } - - public void setRequiredInstanceCountBasedOnStats(int requiredInstanceCountBasedOnStats) { - this.requiredInstanceCountBasedOnStats = requiredInstanceCountBasedOnStats; - } - - public int getRequiredInstanceCountBasedOnDependencies() { - return requiredInstanceCountBasedOnDependencies; - } - - public void setRequiredInstanceCountBasedOnDependencies(int requiredInstanceCountBasedOnDependencies) { - this.requiredInstanceCountBasedOnDependencies = requiredInstanceCountBasedOnDependencies; - } - - public Map<String, ClusterInstanceContext> getClusterInstanceContextMap() { - return instanceIdToClusterInstanceContextMap; - } - - public void addClusterInstanceContext (ClusterInstanceContext clusterInstanceContext) { - instanceIdToClusterInstanceContextMap.put(clusterInstanceContext.getClusterInstanceId(), - clusterInstanceContext); - } - - public ClusterInstanceContext getClusterInstanceContext (String clusterInstanceId) { - return instanceIdToClusterInstanceContextMap.get(clusterInstanceId); - } - -} \ No newline at end of file +}
http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java index f6bed25..f41466b 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -21,18 +21,19 @@ package org.apache.stratos.autoscaler.event.receiver.topology; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.applications.ApplicationHolder; import org.apache.stratos.autoscaler.context.AutoscalerContext; import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory; -import org.apache.stratos.autoscaler.applications.ApplicationHolder; +import org.apache.stratos.autoscaler.context.cluster.VMClusterContext; import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher; import org.apache.stratos.autoscaler.exception.application.DependencyBuilderException; +import org.apache.stratos.autoscaler.exception.application.TopologyInConsistentException; import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException; import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException; -import org.apache.stratos.autoscaler.exception.application.TopologyInConsistentException; -import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor; import org.apache.stratos.autoscaler.monitor.MonitorFactory; import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; +import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor; import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent; import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; import org.apache.stratos.messaging.domain.applications.Application; @@ -203,7 +204,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable { if (null == monitor) { if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } return; } @@ -224,7 +225,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable { if (null == monitor) { if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } return; } @@ -254,7 +255,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable { if (null == monitor) { if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } return; } @@ -276,7 +277,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable { if (null == monitor) { if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } // if monitor does not exist, send cluster terminated event ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(), @@ -285,12 +286,12 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } //changing the status in the monitor, will notify its parent monitor if (monitor.getStatus() == ClusterStatus.Active) { - // terminated gracefully - monitor.setStatus(ClusterStatus.Terminating); - InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId); + // terminated gracefully + monitor.setStatus(ClusterStatus.Terminating); + InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId); } else { - monitor.setStatus(ClusterStatus.Terminating); - monitor.terminateAllMembers(); + monitor.setStatus(ClusterStatus.Terminating); + monitor.terminateAllMembers(); } ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain(). process("", clusterId, instanceId); @@ -309,11 +310,11 @@ public class AutoscalerTopologyEventReceiver implements Runnable { if (null == monitor) { if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } // if the cluster monitor is null, assume that its termianted ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId()); - if (appMonitor != null) { + if (appMonitor != null) { appMonitor.onChildStatusEvent(new ClusterStatusEvent(ClusterStatus.Terminated, clusterId, null)); } return; @@ -354,7 +355,7 @@ public class AutoscalerTopologyEventReceiver implements Runnable { topologyEventReceiver.addEventListener(new MemberStartedEventListener() { @Override protected void onEvent(Event event) { - + } }); @@ -438,7 +439,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable { (ClusterInstanceCreatedEvent) event; AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance(). getClusterMonitor(clusterInstanceCreatedEvent.getClusterId()); - + String instanceId = ((ClusterInstanceCreatedEvent) event).getInstanceId(); + //FIXME to take lock when clusterMonitor is running if (clusterMonitor != null) { TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), clusterInstanceCreatedEvent.getClusterId()); @@ -450,58 +452,62 @@ public class AutoscalerTopologyEventReceiver implements Runnable { if (service != null) { Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId()); if (cluster != null) { - // create and add Cluster Context try { if (cluster.isKubernetesCluster()) { - clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(), - ClusterContextFactory.getKubernetesClusterContext(cluster)); - } else if (cluster.isLbCluster()) { - clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(), - ClusterContextFactory.getVMLBClusterContext(cluster)); + clusterMonitor.setClusterContext( + ClusterContextFactory.getKubernetesClusterContext( + instanceId, + cluster)); } else { - clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(), - ClusterContextFactory.getVMServiceClusterContext(cluster)); - } + VMClusterContext clusterContext = + (VMClusterContext) clusterMonitor.getClusterContext(); + if (clusterContext == null) { + clusterMonitor.setClusterContext( + ClusterContextFactory. + getVMServiceClusterContext(instanceId, + cluster)); + } else { + clusterContext.addInstanceContext(instanceId, cluster); + } + } if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) { clusterMonitor.startScheduler(); log.info("Monitoring task for Cluster Monitor with cluster id " + clusterInstanceCreatedEvent.getClusterId() + " started successfully"); } - } catch (PolicyValidationException e) { log.error(e.getMessage(), e); } catch (PartitionValidationException e) { log.error(e.getMessage(), e); } + } } else { - log.error("Cluster not found for " + clusterInstanceCreatedEvent.getClusterId() + - ", no cluster instance added to ClusterMonitor " + + log.error("Service " + clusterInstanceCreatedEvent.getServiceName() + + " not found, no cluster instance added to ClusterMonitor " + clusterInstanceCreatedEvent.getClusterId()); } - } else { - log.error("Service " + clusterInstanceCreatedEvent.getServiceName() + - " not found, no cluster instance added to ClusterMonitor " + - clusterInstanceCreatedEvent.getClusterId()); + + }finally{ + TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), + clusterInstanceCreatedEvent.getClusterId()); } - } finally { - TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(), + }else{ + log.error("No Cluster Monitor found for cluster id " + clusterInstanceCreatedEvent.getClusterId()); } - - } else { - log.error("No Cluster Monitor found for cluster id " + - clusterInstanceCreatedEvent.getClusterId()); } } - }); - } - /** - * Terminate load balancer topology receiver thread. - */ + ); + } + + /** + * Terminate load balancer topology receiver thread. + */ + public void terminate() { topologyEventReceiver.terminate(); terminated = true; http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java index d779a5c..f356910 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/MonitorFactory.java @@ -78,8 +78,14 @@ public class MonitorFactory { ClusterChildContext clusterChildCtxt = (ClusterChildContext) context; AbstractClusterMonitor clusterMonitor = (AbstractClusterMonitor) monitor; // FIXME: passing null as alias for cluster instance temporarily. should be removed. - createClusterInstance(clusterChildCtxt.getServiceName(), clusterMonitor.getClusterId(), null, parentInstanceIds.get(0)); - AutoscalerContext.getInstance().addClusterMonitor((AbstractClusterMonitor) monitor); + for(String parentInstanceId : parentInstanceIds) { + createClusterInstance(clusterChildCtxt.getServiceName(), + clusterMonitor.getClusterId(), null, + parentInstanceIds.get(0)); + AutoscalerContext.getInstance(). + addClusterMonitor((AbstractClusterMonitor) monitor); + } + } } else { monitor = getApplicationMonitor(appId); http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java index 646e327..756079e 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java @@ -61,8 +61,7 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable protected boolean hasFaultyMember = false; protected boolean stop = false; private AtomicBoolean monitoringStarted; - //protected AbstractClusterContext clusterContext; - protected final Map<String, AbstractClusterContext> instanceIdToClusterContextMap; + protected AbstractClusterContext clusterContext; private String clusterId; private ClusterStatus status; private int monitoringIntervalMilliseconds; @@ -82,7 +81,7 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable this.autoscalerRuleEvaluator = autoscalerRuleEvaluator; this.monitoringStarted = new AtomicBoolean(false); //this.clusterContext = abstractClusterContext; - this.instanceIdToClusterContextMap = new HashMap<String, AbstractClusterContext>(); + //this.instanceIdToClusterContextMap = new HashMap<String, AbstractClusterContext>(); this.obsoleteCheckKnowledgeSession = autoscalerRuleEvaluator.getObsoleteCheckStatefulSession(); this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession(); this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession(); @@ -349,7 +348,7 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable this.hasFaultyMember = hasFaultyMember; } - public void addClusterContextForInstance (String instanceId, AbstractClusterContext clusterContext) { + /*public void addClusterContextForInstance (String instanceId, AbstractClusterContext clusterContext) { if (instanceIdToClusterContextMap.get(instanceId) == null) { synchronized (instanceIdToClusterContextMap) { @@ -371,7 +370,7 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable // if instanceId is null, assume that map contains only one element and return that return instanceIdToClusterContextMap.get(instanceId); - } + }*/ public abstract void terminateAllMembers(); @@ -408,4 +407,12 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable public void setDependentScaleCheckKnowledgeSession(StatefulKnowledgeSession dependentScaleCheckKnowledgeSession) { this.dependentScaleCheckKnowledgeSession = dependentScaleCheckKnowledgeSession; } + + public AbstractClusterContext getClusterContext() { + return clusterContext; + } + + public void setClusterContext(AbstractClusterContext clusterContext) { + this.clusterContext = clusterContext; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java index 1564deb..7fabbec 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java @@ -18,15 +18,9 @@ */ package org.apache.stratos.autoscaler.monitor.cluster; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.context.cluster.AbstractClusterContext; import org.apache.stratos.autoscaler.context.cluster.KubernetesClusterContext; import org.apache.stratos.autoscaler.exception.InvalidArgumentException; import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent; @@ -39,6 +33,9 @@ import org.apache.stratos.common.Property; import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import java.util.Arrays; +import java.util.List; + /* * It is monitoring a kubernetes service cluster periodically. */ @@ -50,10 +47,10 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni public KubernetesServiceClusterMonitor(String serviceType, String clusterId) { super(serviceType, clusterId, - new AutoscalerRuleEvaluator( - StratosConstants.CONTAINER_MIN_CHECK_DROOL_FILE, - StratosConstants.CONTAINER_OBSOLETE_CHECK_DROOL_FILE, - StratosConstants.CONTAINER_SCALE_CHECK_DROOL_FILE)); + new AutoscalerRuleEvaluator( + StratosConstants.CONTAINER_MIN_CHECK_DROOL_FILE, + StratosConstants.CONTAINER_OBSOLETE_CHECK_DROOL_FILE, + StratosConstants.CONTAINER_SCALE_CHECK_DROOL_FILE)); readConfigurations(); } @@ -64,6 +61,7 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni log.debug("KubernetesServiceClusterMonitor is running..." + this.toString()); } try { + if (!ClusterStatus.Active.getNextStates().contains(getStatus())) { monitor(); } else { @@ -74,37 +72,35 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni } } catch (Exception e) { log.error("KubernetesServiceClusterMonitor: Monitor failed." + this.toString(), - e); + e); } } @Override protected void monitor() { - - Set<Map.Entry<String, AbstractClusterContext>> instanceIdToClusterCtxtEntries = instanceIdToClusterContextMap.entrySet(); - for (final Map.Entry<String, AbstractClusterContext> instanceIdToClusterCtxtEntry : instanceIdToClusterCtxtEntries) { - Runnable monitoringRunnable = new Runnable() { - - @Override - public void run() { - obsoleteCheck(); - minCheck(); - scaleCheck(instanceIdToClusterCtxtEntry.getKey()); - } - }; - monitoringRunnable.run(); - } + final String instanceId = this.getKubernetesClusterCtxt().getInstanceId(); + Runnable monitoringRunnable = new Runnable() { + + @Override + public void run() { + obsoleteCheck(); + minCheck(); + scaleCheck(instanceId); + } + }; + monitoringRunnable.run(); } + private void scaleCheck(String instanceId) { boolean rifReset = getKubernetesClusterCtxt().isRifReset(); boolean memoryConsumptionReset = getKubernetesClusterCtxt().isMemoryConsumptionReset(); boolean loadAverageReset = getKubernetesClusterCtxt().isLoadAverageReset(); if (log.isDebugEnabled()) { log.debug("flag of rifReset : " + rifReset - + " flag of memoryConsumptionReset : " - + memoryConsumptionReset + " flag of loadAverageReset : " - + loadAverageReset); + + " flag of memoryConsumptionReset : " + + memoryConsumptionReset + " flag of loadAverageReset : " + + loadAverageReset); } String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); String clusterId = getClusterId(); @@ -125,40 +121,40 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni getKubernetesClusterCtxt().setLoadAverageReset(false); } else if (log.isDebugEnabled()) { log.debug(String.format("Scale check will not run since none of the statistics have not received yet for " - + "[kub-cluster] : %s [cluster] : %s", kubernetesClusterID, clusterId)); + + "[kub-cluster] : %s [cluster] : %s", kubernetesClusterID, clusterId)); } } private AutoscalePolicy getAutoscalePolicy(String instanceId) { - KubernetesClusterContext kubernetesClusterContext = (KubernetesClusterContext) instanceIdToClusterContextMap.get(instanceId); + KubernetesClusterContext kubernetesClusterContext = (KubernetesClusterContext) this.clusterContext; return kubernetesClusterContext.getAutoscalePolicy(); } private void minCheck() { - getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); - String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); + getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); + String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); if (log.isDebugEnabled()) { log.debug(String.format( "Running min check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId())); } - minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck( - getMinCheckKnowledgeSession(), minCheckFactHandle, - getKubernetesClusterCtxt()); - } - - private void obsoleteCheck() { - getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); - String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); + minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck( + getMinCheckKnowledgeSession(), minCheckFactHandle, + getKubernetesClusterCtxt()); + } + + private void obsoleteCheck() { + getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); + String kubernetesClusterID = getKubernetesClusterCtxt().getKubernetesClusterID(); if (log.isDebugEnabled()) { log.debug(String.format( "Running obsolete check for [kub-cluster] : %s [cluster] : %s ", kubernetesClusterID, getClusterId())); } - obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck( - getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, - getKubernetesClusterCtxt()); - } + obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck( + getObsoleteCheckKnowledgeSession(), obsoleteCheckFactHandle, + getKubernetesClusterCtxt()); + } - @Override + @Override public void destroy() { getMinCheckKnowledgeSession().dispose(); getObsoleteCheckKnowledgeSession().dispose(); @@ -183,8 +179,8 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni @Override public String toString() { return "KubernetesServiceClusterMonitor " - + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID() - + ", clusterId=" + getClusterId() + "]"; + + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID() + + ", clusterId=" + getClusterId() + "]"; } public String getLbReferenceType() { @@ -197,23 +193,23 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni @Override public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException { - + if (properties != null) { Property[] propertyArray = properties.getProperties(); if (propertyArray == null) { return; } List<Property> propertyList = Arrays.asList(propertyArray); - + for (Property property : propertyList) { String key = property.getName(); String value = property.getValue(); - + if (StratosConstants.KUBERNETES_MIN_REPLICAS.equals(key)) { int min = Integer.parseInt(value); int max = getKubernetesClusterCtxt().getMaxReplicas(); if (min > max) { - String msg = String.format("%s should be less than %s . But %s is not less than %s.", + String msg = String.format("%s should be less than %s . But %s is not less than %s.", StratosConstants.KUBERNETES_MIN_REPLICAS, StratosConstants.KUBERNETES_MAX_REPLICAS, min, max); log.error(msg); throw new InvalidArgumentException(msg); @@ -222,7 +218,7 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni break; } } - + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/4f2c1b70/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java index 00e1879..1db7a31 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java @@ -18,13 +18,14 @@ */ package org.apache.stratos.autoscaler.monitor.cluster; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.client.CloudControllerClient; -import org.apache.stratos.autoscaler.context.cluster.AbstractClusterContext; +import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext; import org.apache.stratos.autoscaler.context.cluster.VMClusterContext; import org.apache.stratos.autoscaler.context.member.MemberStatsContext; import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext; @@ -84,9 +85,10 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { clusterId, networkPartitionId, value)); } - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = getNetworkPartitionCtxt(instanceId ,networkPartitionId); - if (null != clusterLevelNetworkPartitionContext) { - clusterLevelNetworkPartitionContext.setAverageLoadAverage(value); + ClusterInstanceContext clusterInstanceContext = getClusterInstanceContext(instanceId, + networkPartitionId); + if (null != clusterInstanceContext) { + clusterInstanceContext.setAverageLoadAverage(value); } else { if (log.isDebugEnabled()) { log.debug(String.format("Network partition context is not available for :" + @@ -108,7 +110,8 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s", clusterId, networkPartitionId, value)); } - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = getNetworkPartitionCtxt(instanceId, networkPartitionId); + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(instanceId, + networkPartitionId); if (null != clusterLevelNetworkPartitionContext) { clusterLevelNetworkPartitionContext.setLoadAverageGradient(value); } else { @@ -131,7 +134,8 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { log.debug(String.format("Second Derivation of load avg event: [cluster] %s " + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); } - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = getNetworkPartitionCtxt(instanceId, networkPartitionId); + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(instanceId, + networkPartitionId); if (null != clusterLevelNetworkPartitionContext) { clusterLevelNetworkPartitionContext.setLoadAverageSecondDerivative(value); } else { @@ -154,7 +158,8 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s " + "[value] %s", clusterId, networkPartitionId, value)); } - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = getNetworkPartitionCtxt(instanceId, networkPartitionId); + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(instanceId, + networkPartitionId); if (null != clusterLevelNetworkPartitionContext) { clusterLevelNetworkPartitionContext.setAverageMemoryConsumption(value); } else { @@ -178,7 +183,8 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { log.debug(String.format("Grad of Memory Consumption event: [cluster] %s " + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); } - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = getNetworkPartitionCtxt(instanceId, networkPartitionId); + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(instanceId, + networkPartitionId); if (null != clusterLevelNetworkPartitionContext) { clusterLevelNetworkPartitionContext.setMemoryConsumptionGradient(value); } else { @@ -201,7 +207,8 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s " + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); } - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = getNetworkPartitionCtxt(instanceId, networkPartitionId); + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(instanceId, + networkPartitionId); if (null != clusterLevelNetworkPartitionContext) { clusterLevelNetworkPartitionContext.setMemoryConsumptionSecondDerivative(value); } else { @@ -215,6 +222,7 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { public void handleAverageRequestsServingCapabilityEvent(AverageRequestsServingCapabilityEvent averageRequestsServingCapabilityEvent) { String clusterId = averageRequestsServingCapabilityEvent.getClusterId(); + String instanceId = averageRequestsServingCapabilityEvent.getInstanceId(); String networkPartitionId = averageRequestsServingCapabilityEvent.getNetworkPartitionId(); Float floatValue = averageRequestsServingCapabilityEvent.getValue(); @@ -223,7 +231,8 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { clusterId, networkPartitionId, floatValue)); } - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = getNetworkPartitionCtxt(null, networkPartitionId); + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(instanceId, + networkPartitionId); if(null != clusterLevelNetworkPartitionContext){ clusterLevelNetworkPartitionContext.setAverageRequestsServedPerInstance(floatValue); @@ -254,7 +263,8 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s", clusterId, networkPartitionId, value)); } - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = getNetworkPartitionCtxt(instanceId, networkPartitionId); + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(instanceId, + networkPartitionId); if (null != clusterLevelNetworkPartitionContext) { clusterLevelNetworkPartitionContext.setAverageRequestsInFlight(value); } else { @@ -277,7 +287,8 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s", clusterId, networkPartitionId, value)); } - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = getNetworkPartitionCtxt(instanceId, networkPartitionId); + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(instanceId, + networkPartitionId); if (null != clusterLevelNetworkPartitionContext) { clusterLevelNetworkPartitionContext.setRequestsInFlightGradient(value); } else { @@ -300,7 +311,8 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { log.debug(String.format("Second derivative of Rif event: [cluster] %s " + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); } - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = getNetworkPartitionCtxt(instanceId, networkPartitionId); + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(instanceId, + networkPartitionId); if (null != clusterLevelNetworkPartitionContext) { clusterLevelNetworkPartitionContext.setRequestsInFlightSecondDerivative(value); } else { @@ -319,8 +331,10 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { String memberId = memberAverageMemoryConsumptionEvent.getMemberId(); Member member = getMemberByMemberId(memberId); String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); - ClusterLevelNetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, + networkPartitionId); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( + member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -340,8 +354,10 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId(); Member member = getMemberByMemberId(memberId); String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); - ClusterLevelNetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, + networkPartitionId); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( + member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -367,8 +383,10 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { String memberId = memberAverageLoadAverageEvent.getMemberId(); Member member = getMemberByMemberId(memberId); String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); - ClusterLevelNetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, + networkPartitionId); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( + member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -388,8 +406,10 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { String memberId = memberGradientOfLoadAverageEvent.getMemberId(); Member member = getMemberByMemberId(memberId); String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); - ClusterLevelNetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, + networkPartitionId); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( + member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -410,8 +430,10 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { Member member = getMemberByMemberId(memberId); String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); - ClusterLevelNetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, + networkPartitionId); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( + member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -429,6 +451,7 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { String memberId = memberFaultEvent.getMemberId(); Member member = getMemberByMemberId(memberId); String instanceId = memberFaultEvent.getInstanceId(); + String networkPartitionId = memberFaultEvent.getNetworkPartitionId(); if (null == member) { if (log.isDebugEnabled()) { log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); @@ -443,8 +466,9 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { return; } - ClusterLevelNetworkPartitionContext nwPartitionCtxt; - nwPartitionCtxt = getNetworkPartitionCtxt(instanceId, member); + ClusterInstanceContext nwPartitionCtxt; + nwPartitionCtxt = getClusterInstanceContext(instanceId, + networkPartitionId); String partitionId = getPartitionOfMember(memberId); ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); if (!partitionCtxt.activeMemberExist(memberId)) { @@ -483,7 +507,8 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { String networkPartitionId = memberActivatedEvent.getNetworkPartitionId(); String partitionId = memberActivatedEvent.getPartitionId(); String memberId = memberActivatedEvent.getMemberId(); - ClusterLevelNetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, + networkPartitionId); ClusterLevelPartitionContext clusterLevelPartitionContext; clusterLevelPartitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId); clusterLevelPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); @@ -503,8 +528,10 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { String partitionId = maintenanceModeEvent.getPartitionId(); String memberId = maintenanceModeEvent.getMemberId(); String instanceId = maintenanceModeEvent.getInstanceId(); - ClusterLevelNetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - ClusterLevelPartitionContext clusterMonitorPartitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId); + ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, + networkPartitionId); + ClusterLevelPartitionContext clusterMonitorPartitionContext = networkPartitionCtxt. + getPartitionCtxt(partitionId); clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); if (log.isDebugEnabled()) { log.debug(String.format("Member has been moved as pending termination: " @@ -516,10 +543,11 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { @Override public void handleMemberReadyToShutdownEvent(MemberReadyToShutdownEvent memberReadyToShutdownEvent) { - ClusterLevelNetworkPartitionContext nwPartitionCtxt; + ClusterInstanceContext nwPartitionCtxt; String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId(); String instanceId = memberReadyToShutdownEvent.getInstanceId(); - nwPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); + nwPartitionCtxt = getClusterInstanceContext(instanceId, + networkPartitionId); // start a new member in the same Partition String memberId = memberReadyToShutdownEvent.getMemberId(); @@ -567,8 +595,10 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { String clusterId = memberTerminatedEvent.getClusterId(); String instanceId = memberTerminatedEvent.getInstanceId(); String partitionId = memberTerminatedEvent.getPartitionId(); - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = getNetworkPartitionCtxt(instanceId, networkPartitionId); - ClusterLevelPartitionContext clusterMonitorPartitionContext = clusterLevelNetworkPartitionContext.getPartitionCtxt(partitionId); + ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(instanceId, + networkPartitionId); + ClusterLevelPartitionContext clusterMonitorPartitionContext = + clusterLevelNetworkPartitionContext.getPartitionCtxt(partitionId); clusterMonitorPartitionContext.removeMemberStatsContext(memberId); if (clusterMonitorPartitionContext.removeTerminationPendingMember(memberId)) { @@ -656,35 +686,38 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { public void run() { for (ClusterLevelNetworkPartitionContext networkPartitionContext : getAllNetworkPartitionCtxts().values()) { - for (ClusterLevelPartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) { - //if (log.isDebugEnabled()) { - log.info("Starting to terminate all members in cluster [" + getClusterId() + "] Network Partition [ " + - networkPartitionContext.getId() + " ], Partition [ " + - partitionContext.getPartitionId() + " ]"); - // } - // need to terminate active, pending and obsolete members - - // active members - for (MemberContext activeMemberCtxt : partitionContext.getActiveMembers()) { - log.info("Terminating active member [member id] " + activeMemberCtxt.getMemberId()); - terminateMember(activeMemberCtxt.getMemberId()); - } - - // pending members - for (MemberContext pendingMemberCtxt : partitionContext.getPendingMembers()) { - log.info("Terminating pending member [member id] " + pendingMemberCtxt.getMemberId()); - terminateMember(pendingMemberCtxt.getMemberId()); - } - - // obsolete members - for (String obsoleteMemberId : partitionContext.getObsoletedMembers().keySet()) { - log.info("Terminating obsolete member [member id] " + obsoleteMemberId); - terminateMember(obsoleteMemberId); - } + for(ClusterInstanceContext instanceContext : networkPartitionContext.getClusterInstanceContextMap().values()) { + for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts().values()) { + //if (log.isDebugEnabled()) { + log.info("Starting to terminate all members in cluster [" + getClusterId() + "] Network Partition [ " + + networkPartitionContext.getId() + " ], Partition [ " + + partitionContext.getPartitionId() + " ]"); + // } + // need to terminate active, pending and obsolete members + + // active members + for (MemberContext activeMemberCtxt : partitionContext.getActiveMembers()) { + log.info("Terminating active member [member id] " + activeMemberCtxt.getMemberId()); + terminateMember(activeMemberCtxt.getMemberId()); + } + + // pending members + for (MemberContext pendingMemberCtxt : partitionContext.getPendingMembers()) { + log.info("Terminating pending member [member id] " + pendingMemberCtxt.getMemberId()); + terminateMember(pendingMemberCtxt.getMemberId()); + } + + // obsolete members + for (String obsoleteMemberId : partitionContext.getObsoletedMembers().keySet()) { + log.info("Terminating obsolete member [member id] " + obsoleteMemberId); + terminateMember(obsoleteMemberId); + } // terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll // (terminateAllKnowledgeSession, terminateAllFactHandle, partitionContext); + } } + } } }, "Member Terminator - [cluster id] " + getClusterId()); @@ -692,33 +725,16 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { memberTerminator.start(); } - public Map<String, ClusterLevelNetworkPartitionContext> getNetworkPartitionCtxts(String instanceId) { - - VMClusterContext vmClusterContext = (VMClusterContext) instanceIdToClusterContextMap.get(instanceId); - return vmClusterContext.getNetworkPartitionCtxts(); - } - public Map<String, ClusterLevelNetworkPartitionContext> getAllNetworkPartitionCtxts() { - - Map<String, ClusterLevelNetworkPartitionContext> nwPartitionIdToNwPartitionMap = new HashMap<String, ClusterLevelNetworkPartitionContext>(); - - for (AbstractClusterContext clusterContext : instanceIdToClusterContextMap.values()) { - VMClusterContext vmClusterContext = (VMClusterContext) clusterContext; - nwPartitionIdToNwPartitionMap.putAll(vmClusterContext.getNetworkPartitionCtxts()); - } - return nwPartitionIdToNwPartitionMap; - } - - public ClusterLevelNetworkPartitionContext getNetworkPartitionCtxt(String instanceId, String id) { - - VMClusterContext vmClusterContext = (VMClusterContext) instanceIdToClusterContextMap.get(instanceId); - return vmClusterContext.getNetworkPartitionCtxt(id); + return ((VMClusterContext)this.clusterContext).getNetworkPartitionCtxts(); } - protected ClusterLevelNetworkPartitionContext getNetworkPartitionCtxt(String instanceId, Member member) { - - VMClusterContext vmClusterContext = (VMClusterContext) instanceIdToClusterContextMap.get(instanceId); - return vmClusterContext.getNetworkPartitionCtxt(member); + public ClusterInstanceContext getClusterInstanceContext(String networkPartitionId, String instanceId) { + Map<String, ClusterLevelNetworkPartitionContext> clusterLevelNetworkPartitionContextMap = + ((VMClusterContext)this.clusterContext).getNetworkPartitionCtxts(); + ClusterLevelNetworkPartitionContext networkPartitionContext = + clusterLevelNetworkPartitionContextMap.get(networkPartitionId); + return networkPartitionContext.getClusterInstanceContextMap().get(instanceId); } private static void terminateMember(String memberId) { @@ -730,5 +746,9 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { } } + public Collection<ClusterLevelNetworkPartitionContext> getNetworkPartitionCtxts() { + return ((VMClusterContext)this.clusterContext).getNetworkPartitionCtxts().values(); + } + }
