http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java new file mode 100644 index 0000000..d90e0b6 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesClusterMonitor.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler.monitor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.KubernetesClusterContext; +import org.apache.stratos.autoscaler.MemberStatsContext; +import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; +import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; +import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent; +import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent; +import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; +import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent; +import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent; +import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent; +import org.apache.stratos.messaging.event.topology.MemberStartedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; + +/* + * Every kubernetes cluster monitor should extend this class + */ +public abstract class KubernetesClusterMonitor extends AbstractClusterMonitor { + + private static final Log log = LogFactory.getLog(KubernetesClusterMonitor.class); + + private KubernetesClusterContext kubernetesClusterCtxt; + protected AutoscalePolicy autoscalePolicy; + + protected KubernetesClusterMonitor(String clusterId, String serviceId, + KubernetesClusterContext kubernetesClusterContext, + AutoscalerRuleEvaluator autoscalerRuleEvaluator, + AutoscalePolicy autoscalePolicy) { + + super(clusterId, serviceId, autoscalerRuleEvaluator); + this.kubernetesClusterCtxt = kubernetesClusterContext; + this.autoscalePolicy = autoscalePolicy; + } + + @Override + public void handleAverageLoadAverageEvent( + AverageLoadAverageEvent averageLoadAverageEvent) { + + String clusterId = averageLoadAverageEvent.getClusterId(); + float value = averageLoadAverageEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Avg load avg event: [cluster] %s [value] %s", + clusterId, value)); + } + KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setAverageLoadAverage(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } + + } + + @Override + public void handleGradientOfLoadAverageEvent( + GradientOfLoadAverageEvent gradientOfLoadAverageEvent) { + + String clusterId = gradientOfLoadAverageEvent.getClusterId(); + float value = gradientOfLoadAverageEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Grad of load avg event: [cluster] %s [value] %s", + clusterId, value)); + } + KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setLoadAverageGradient(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } + } + + @Override + public void handleSecondDerivativeOfLoadAverageEvent( + SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) { + + String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId(); + float value = secondDerivativeOfLoadAverageEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Second Derivation of load avg event: [cluster] %s " + + "[value] %s", clusterId, value)); + } + KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setLoadAverageSecondDerivative(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } + } + + @Override + public void handleAverageMemoryConsumptionEvent( + AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) { + + String clusterId = averageMemoryConsumptionEvent.getClusterId(); + float value = averageMemoryConsumptionEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Avg Memory Consumption event: [cluster] %s " + + "[value] %s", averageMemoryConsumptionEvent.getClusterId(), value)); + } + KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setAverageMemoryConsumption(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } + } + + @Override + public void handleGradientOfMemoryConsumptionEvent( + GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) { + + String clusterId = gradientOfMemoryConsumptionEvent.getClusterId(); + float value = gradientOfMemoryConsumptionEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Grad of Memory Consumption event: [cluster] %s " + + "[value] %s", clusterId, value)); + } + KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setMemoryConsumptionGradient(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } + } + + @Override + public void handleSecondDerivativeOfMemoryConsumptionEvent( + SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) { + + String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId(); + float value = secondDerivativeOfMemoryConsumptionEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s " + + "[value] %s", clusterId, value)); + } + KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setMemoryConsumptionSecondDerivative(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } + } + + @Override + public void handleAverageRequestsInFlightEvent( + AverageRequestsInFlightEvent averageRequestsInFlightEvent) { + + float value = averageRequestsInFlightEvent.getValue(); + String clusterId = averageRequestsInFlightEvent.getClusterId(); + if (log.isDebugEnabled()) { + log.debug(String.format("Average Rif event: [cluster] %s [value] %s", + clusterId, value)); + } + KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setAverageRequestsInFlight(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } + } + + @Override + public void handleGradientOfRequestsInFlightEvent( + GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) { + + String clusterId = gradientOfRequestsInFlightEvent.getClusterId(); + float value = gradientOfRequestsInFlightEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Gradient of Rif event: [cluster] %s [value] %s", + clusterId, value)); + } + KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setRequestsInFlightGradient(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } + } + + @Override + public void handleSecondDerivativeOfRequestsInFlightEvent( + SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) { + + String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId(); + float value = secondDerivativeOfRequestsInFlightEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Second derivative of Rif event: [cluster] %s " + + "[value] %s", clusterId, value)); + } + KubernetesClusterContext kubernetesClusterContext = getKubernetesClusterCtxt(); + if (null != kubernetesClusterContext) { + kubernetesClusterContext.setRequestsInFlightSecondDerivative(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Kubernetes cluster context is not available for :" + + " [cluster] %s", clusterId)); + } + } + } + + @Override + public void handleMemberAverageMemoryConsumptionEvent( + MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) { + + String memberId = memberAverageMemoryConsumptionEvent.getMemberId(); + KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); + MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberAverageMemoryConsumptionEvent.getValue(); + memberStatsContext.setAverageMemoryConsumption(value); + } + + @Override + public void handleMemberGradientOfMemoryConsumptionEvent( + MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) { + + String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId(); + KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); + MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberGradientOfMemoryConsumptionEvent.getValue(); + memberStatsContext.setGradientOfMemoryConsumption(value); + } + + @Override + public void handleMemberSecondDerivativeOfMemoryConsumptionEvent( + MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) { + + } + + @Override + public void handleMemberAverageLoadAverageEvent( + MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) { + + KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); + String memberId = memberAverageLoadAverageEvent.getMemberId(); + float value = memberAverageLoadAverageEvent.getValue(); + MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + memberStatsContext.setAverageLoadAverage(value); + } + + @Override + public void handleMemberGradientOfLoadAverageEvent( + MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) { + + String memberId = memberGradientOfLoadAverageEvent.getMemberId(); + KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); + MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberGradientOfLoadAverageEvent.getValue(); + memberStatsContext.setGradientOfLoadAverage(value); + } + + @Override + public void handleMemberSecondDerivativeOfLoadAverageEvent( + MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) { + + String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId(); + KubernetesClusterContext kubernetesClusterCtxt = getKubernetesClusterCtxt(); + MemberStatsContext memberStatsContext = kubernetesClusterCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberSecondDerivativeOfLoadAverageEvent.getValue(); + memberStatsContext.setSecondDerivativeOfLoadAverage(value); + } + + @Override + public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) { + + // kill the container + } + + @Override + public void handleMemberStartedEvent( + MemberStartedEvent memberStartedEvent) { + + } + + @Override + public void handleMemberActivatedEvent( + MemberActivatedEvent memberActivatedEvent) { + + KubernetesClusterContext kubernetesClusterContext; + kubernetesClusterContext = getKubernetesClusterCtxt(); + String memberId = memberActivatedEvent.getMemberId(); + kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if (log.isInfoEnabled()) { + log.info(String.format("Member stat context has been added successfully: " + + "[member] %s", memberId)); + } + kubernetesClusterContext.movePendingMemberToActiveMembers(memberId); + } + + @Override + public void handleMemberMaintenanceModeEvent( + MemberMaintenanceModeEvent maintenanceModeEvent) { + + // no need to do anything here + // we will not be receiving this event for containers + // because we just kill the containers + } + + @Override + public void handleMemberReadyToShutdownEvent( + MemberReadyToShutdownEvent memberReadyToShutdownEvent) { + + // no need to do anything here + // we will not be receiving this event for containers + // because we just kill the containers + } + + @Override + public void handleMemberTerminatedEvent( + MemberTerminatedEvent memberTerminatedEvent) { + + // no need to do anything here + // we will not be receiving this event for containers + // because we just kill the containers + } + + @Override + public void handleClusterRemovedEvent( + ClusterRemovedEvent clusterRemovedEvent) { + + } + + public KubernetesClusterContext getKubernetesClusterCtxt() { + return kubernetesClusterCtxt; + } + + public void setKubernetesClusterCtxt( + KubernetesClusterContext kubernetesClusterCtxt) { + this.kubernetesClusterCtxt = kubernetesClusterCtxt; + } + + public AutoscalePolicy getAutoscalePolicy() { + return autoscalePolicy; + } + + public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) { + this.autoscalePolicy = autoscalePolicy; + } +}
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java new file mode 100644 index 0000000..3c81ba3 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler.monitor; + +import java.util.Properties; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.KubernetesClusterContext; +import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; +import org.apache.stratos.autoscaler.exception.SpawningException; +import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; +import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; +import org.apache.stratos.autoscaler.util.AutoScalerConstants; +import org.apache.stratos.autoscaler.util.ConfUtil; +import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; +import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +/* + * It is monitoring a kubernetes service cluster periodically. + */ +public final class KubernetesServiceClusterMonitor extends KubernetesClusterMonitor { + + private static final Log log = LogFactory.getLog(KubernetesServiceClusterMonitor.class); + + private String lbReferenceType; + private int numberOfReplicasInServiceCluster = 0; + int retryInterval = 60000; + + public KubernetesServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt, + String serviceClusterID, String serviceId, + AutoscalePolicy autoscalePolicy) { + super(serviceClusterID, serviceId, kubernetesClusterCtxt, + new AutoscalerRuleEvaluator(), autoscalePolicy); + readConfigurations(); + } + + @Override + public void run() { + + while (!isDestroyed()) { + if (log.isDebugEnabled()) { + log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString()); + } + try { + if (!ClusterStatus.In_Maintenance.equals(getStatus())) { + monitor(); + } else { + if (log.isDebugEnabled()) { + log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in " + + ClusterStatus.In_Maintenance + " mode......"); + } + } + } catch (Exception e) { + log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(), + e); + } + try { + Thread.sleep(getMonitorIntervalMilliseconds()); + } catch (InterruptedException ignore) { + } + } + } + + @Override + protected void monitor() { + + int minReplicas; + try { + TopologyManager.acquireReadLock(); + Service service = TopologyManager.getTopology().getService(getServiceId()); + Cluster cluster = service.getCluster(getClusterId()); + Properties props = cluster.getProperties(); + minReplicas = Integer.parseInt(props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS)); + } finally { + TopologyManager.releaseReadLock(); + } + + // is container created successfully? + boolean success = false; + String kubernetesClusterId = getKubernetesClusterCtxt().getKubernetesClusterID(); + int activeMembers = getKubernetesClusterCtxt().getActiveMembers().size(); + int pendingMembers = getKubernetesClusterCtxt().getPendingMembers().size(); + int nonTerminatedMembers = activeMembers + pendingMembers; + + if (nonTerminatedMembers == 0) { + while (!success) { + try { + CloudControllerClient ccClient = CloudControllerClient.getInstance(); + MemberContext memberContext = ccClient.createContainer(kubernetesClusterId, getClusterId()); + if (null != memberContext) { + getKubernetesClusterCtxt().addPendingMember(memberContext); + success = true; + numberOfReplicasInServiceCluster = minReplicas; + if (log.isDebugEnabled()) { + log.debug(String.format("Pending member added, [member] %s [kub cluster] %s", + memberContext.getMemberId(), kubernetesClusterId)); + } + } else { + if (log.isDebugEnabled()) { + log.debug("Returned member context is null, did not add to pending members"); + } + } + } catch (SpawningException spawningException) { + if (log.isDebugEnabled()) { + String message = "Cannot create containers, will retry in " + + (retryInterval / 1000) + "s"; + log.debug(message, spawningException); + } + } catch (Exception exception) { + if (log.isDebugEnabled()) { + String message = "Error while creating containers, will retry in " + + (retryInterval / 1000) + "s"; + log.debug(message, exception); + } + } + try { + Thread.sleep(retryInterval); + } catch (InterruptedException ignored) { + } + } + } + } + + @Override + public void destroy() { + getMinCheckKnowledgeSession().dispose(); + getScaleCheckKnowledgeSession().dispose(); + setDestroyed(true); + if (log.isDebugEnabled()) { + log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. " + this.toString()); + } + } + + @Override + protected void readConfigurations() { + XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); + int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000); + setMonitorIntervalMilliseconds(monitorInterval); + if (log.isDebugEnabled()) { + log.debug("KubernetesServiceClusterMonitor task interval: " + getMonitorIntervalMilliseconds()); + } + } + + @Override + public String toString() { + return "KubernetesServiceClusterMonitor " + + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID() + + ", clusterId=" + getClusterId() + + ", serviceId=" + getServiceId() + "]"; + } + + public String getLbReferenceType() { + return lbReferenceType; + } + + public void setLbReferenceType(String lbReferenceType) { + this.lbReferenceType = lbReferenceType; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java index ffd6713..38ed1a6 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMClusterMonitor.java @@ -22,62 +22,581 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.MemberStatsContext; import org.apache.stratos.autoscaler.NetworkPartitionContext; +import org.apache.stratos.autoscaler.PartitionContext; +import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; +import org.apache.stratos.autoscaler.exception.TerminationException; import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; -import org.apache.stratos.common.enums.ClusterType; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent; +import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent; +import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; +import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent; +import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent; +import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent; +import org.apache.stratos.messaging.event.topology.MemberStartedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; /** * Is responsible for monitoring a service cluster. This runs periodically * and perform minimum instance check and scaling check using the underlying * rules engine. - * */ - abstract public class VMClusterMonitor extends AbstractClusterMonitor{ - - private static final Log log = LogFactory.getLog(VMClusterMonitor.class); - // Map<NetworkpartitionId, Network Partition Context> - protected Map<String, NetworkPartitionContext> networkPartitionCtxts; - protected DeploymentPolicy deploymentPolicy; - protected AutoscalePolicy autoscalePolicy; - - protected VMClusterMonitor(String clusterId, String serviceId, ClusterType clusterType, - AutoscalerRuleEvaluator autoscalerRuleEvaluator, - DeploymentPolicy deploymentPolicy, AutoscalePolicy autoscalePolicy, - Map<String, NetworkPartitionContext> networkPartitionCtxts) { - super(clusterId, serviceId, clusterType, autoscalerRuleEvaluator); - this.deploymentPolicy = deploymentPolicy; - this.autoscalePolicy = autoscalePolicy; - this.networkPartitionCtxts = networkPartitionCtxts; - } - - public NetworkPartitionContext getNetworkPartitionCtxt(Member member) { - log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId()); - String networkPartitionId = member.getNetworkPartitionId(); - if(networkPartitionCtxts.containsKey(networkPartitionId)) { - log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId)); - return networkPartitionCtxts.get(networkPartitionId); - } - log.info("returning null getNetworkPartitionCtxt"); - return null; - } - - public String getPartitionOfMember(String memberId){ - for(Service service: TopologyManager.getTopology().getServices()){ - for(Cluster cluster: service.getClusters()){ - if(cluster.memberExists(memberId)){ +abstract public class VMClusterMonitor extends AbstractClusterMonitor { + + private static final Log log = LogFactory.getLog(VMClusterMonitor.class); + // Map<NetworkpartitionId, Network Partition Context> + protected Map<String, NetworkPartitionContext> networkPartitionCtxts; + protected DeploymentPolicy deploymentPolicy; + protected AutoscalePolicy autoscalePolicy; + + protected VMClusterMonitor(String clusterId, String serviceId, + AutoscalerRuleEvaluator autoscalerRuleEvaluator, + DeploymentPolicy deploymentPolicy, AutoscalePolicy autoscalePolicy, + Map<String, NetworkPartitionContext> networkPartitionCtxts) { + super(clusterId, serviceId, autoscalerRuleEvaluator); + this.deploymentPolicy = deploymentPolicy; + this.autoscalePolicy = autoscalePolicy; + this.networkPartitionCtxts = networkPartitionCtxts; + } + + @Override + public void handleAverageLoadAverageEvent( + AverageLoadAverageEvent averageLoadAverageEvent) { + + String networkPartitionId = averageLoadAverageEvent.getNetworkPartitionId(); + String clusterId = averageLoadAverageEvent.getClusterId(); + float value = averageLoadAverageEvent.getValue(); + + if (log.isDebugEnabled()) { + log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s", + clusterId, networkPartitionId, value)); + } + + NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId); + if (null != networkPartitionContext) { + networkPartitionContext.setAverageLoadAverage(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + + } + + @Override + public void handleGradientOfLoadAverageEvent( + GradientOfLoadAverageEvent gradientOfLoadAverageEvent) { + + String networkPartitionId = gradientOfLoadAverageEvent.getNetworkPartitionId(); + String clusterId = gradientOfLoadAverageEvent.getClusterId(); + float value = gradientOfLoadAverageEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s", + clusterId, networkPartitionId, value)); + } + NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId); + if (null != networkPartitionContext) { + networkPartitionContext.setLoadAverageGradient(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleSecondDerivativeOfLoadAverageEvent( + SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent) { + + String networkPartitionId = secondDerivativeOfLoadAverageEvent.getNetworkPartitionId(); + String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId(); + float value = secondDerivativeOfLoadAverageEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Second Derivation of load avg event: [cluster] %s " + + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); + } + NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId); + if (null != networkPartitionContext) { + networkPartitionContext.setLoadAverageSecondDerivative(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleAverageMemoryConsumptionEvent( + AverageMemoryConsumptionEvent averageMemoryConsumptionEvent) { + + String networkPartitionId = averageMemoryConsumptionEvent.getNetworkPartitionId(); + String clusterId = averageMemoryConsumptionEvent.getClusterId(); + float value = averageMemoryConsumptionEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Avg Memory Consumption event: [cluster] %s [network-partition] %s " + + "[value] %s", clusterId, networkPartitionId, value)); + } + NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId); + if (null != networkPartitionContext) { + networkPartitionContext.setAverageMemoryConsumption(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String + .format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleGradientOfMemoryConsumptionEvent( + GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent) { + + String networkPartitionId = gradientOfMemoryConsumptionEvent.getNetworkPartitionId(); + String clusterId = gradientOfMemoryConsumptionEvent.getClusterId(); + float value = gradientOfMemoryConsumptionEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Grad of Memory Consumption event: [cluster] %s " + + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); + } + NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId); + if (null != networkPartitionContext) { + networkPartitionContext.setMemoryConsumptionGradient(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleSecondDerivativeOfMemoryConsumptionEvent( + SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent) { + + String networkPartitionId = secondDerivativeOfMemoryConsumptionEvent.getNetworkPartitionId(); + String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId(); + float value = secondDerivativeOfMemoryConsumptionEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Second Derivation of Memory Consumption event: [cluster] %s " + + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); + } + NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId); + if (null != networkPartitionContext) { + networkPartitionContext.setMemoryConsumptionSecondDerivative(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleAverageRequestsInFlightEvent( + AverageRequestsInFlightEvent averageRequestsInFlightEvent) { + + String networkPartitionId = averageRequestsInFlightEvent.getNetworkPartitionId(); + String clusterId = averageRequestsInFlightEvent.getClusterId(); + float value = averageRequestsInFlightEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s", + clusterId, networkPartitionId, value)); + } + NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId); + if (null != networkPartitionContext) { + networkPartitionContext.setAverageRequestsInFlight(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleGradientOfRequestsInFlightEvent( + GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent) { + + String networkPartitionId = gradientOfRequestsInFlightEvent.getNetworkPartitionId(); + String clusterId = gradientOfRequestsInFlightEvent.getClusterId(); + float value = gradientOfRequestsInFlightEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s", + clusterId, networkPartitionId, value)); + } + NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId); + if (null != networkPartitionContext) { + networkPartitionContext.setRequestsInFlightGradient(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleSecondDerivativeOfRequestsInFlightEvent( + SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent) { + + String networkPartitionId = secondDerivativeOfRequestsInFlightEvent.getNetworkPartitionId(); + String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId(); + float value = secondDerivativeOfRequestsInFlightEvent.getValue(); + if (log.isDebugEnabled()) { + log.debug(String.format("Second derivative of Rif event: [cluster] %s " + + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); + } + NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId); + if (null != networkPartitionContext) { + networkPartitionContext.setRequestsInFlightSecondDerivative(value); + } else { + if (log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + + @Override + public void handleMemberAverageMemoryConsumptionEvent( + MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent) { + + String memberId = memberAverageMemoryConsumptionEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); + NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId); + PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberAverageMemoryConsumptionEvent.getValue(); + memberStatsContext.setAverageMemoryConsumption(value); + } + + @Override + public void handleMemberGradientOfMemoryConsumptionEvent( + MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent) { + + String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); + NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId); + PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberGradientOfMemoryConsumptionEvent.getValue(); + memberStatsContext.setGradientOfMemoryConsumption(value); + } + + @Override + public void handleMemberSecondDerivativeOfMemoryConsumptionEvent( + MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent) { + + } + + @Override + public void handleMemberAverageLoadAverageEvent( + MemberAverageLoadAverageEvent memberAverageLoadAverageEvent) { + + String memberId = memberAverageLoadAverageEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); + NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId); + PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberAverageLoadAverageEvent.getValue(); + memberStatsContext.setAverageLoadAverage(value); + } + + @Override + public void handleMemberGradientOfLoadAverageEvent( + MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent) { + + String memberId = memberGradientOfLoadAverageEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); + NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId); + PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberGradientOfLoadAverageEvent.getValue(); + memberStatsContext.setGradientOfLoadAverage(value); + } + + @Override + public void handleMemberSecondDerivativeOfLoadAverageEvent( + MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent) { + + String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); + NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId); + PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); + if (null == memberStatsContext) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member context is not available for : [member] %s", memberId)); + } + return; + } + float value = memberSecondDerivativeOfLoadAverageEvent.getValue(); + memberStatsContext.setSecondDerivativeOfLoadAverage(value); + } + + @Override + public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) { + + String memberId = memberFaultEvent.getMemberId(); + Member member = getMemberByMemberId(memberId); + if (null == member) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member not found in the Topology: [member] %s", memberId)); + } + return; + } + if (!member.isActive()) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member activated event has not received for the member %s. " + + "Therefore ignoring" + " the member fault health stat", memberId)); + } + return; + } + + NetworkPartitionContext nwPartitionCtxt; + nwPartitionCtxt = getNetworkPartitionCtxt(member); + String partitionId = getPartitionOfMember(memberId); + PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); + if (!partitionCtxt.activeMemberExist(memberId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Could not find the active member in partition context, " + + "[member] %s ", memberId)); + } + return; + } + // terminate the faulty member + CloudControllerClient ccClient = CloudControllerClient.getInstance(); + try { + ccClient.terminate(memberId); + } catch (TerminationException e) { + String msg = "TerminationException " + e.getLocalizedMessage(); + log.error(msg, e); + } + // remove from active member list + partitionCtxt.removeActiveMemberById(memberId); + if (log.isInfoEnabled()) { + String clusterId = memberFaultEvent.getClusterId(); + log.info(String.format("Faulty member is terminated and removed from the active members list: " + + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId)); + } + } + + @Override + public void handleMemberStartedEvent( + MemberStartedEvent memberStartedEvent) { + + } + + @Override + public void handleMemberActivatedEvent( + MemberActivatedEvent memberActivatedEvent) { + + String networkPartitionId = memberActivatedEvent.getNetworkPartitionId(); + String partitionId = memberActivatedEvent.getPartitionId(); + String memberId = memberActivatedEvent.getMemberId(); + NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId); + PartitionContext partitionContext; + partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId); + partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if (log.isInfoEnabled()) { + log.info(String.format("Member stat context has been added successfully: " + + "[member] %s", memberId)); + } + partitionContext.movePendingMemberToActiveMembers(memberId); + } + + @Override + public void handleMemberMaintenanceModeEvent( + MemberMaintenanceModeEvent maintenanceModeEvent) { + + String networkPartitionId = maintenanceModeEvent.getNetworkPartitionId(); + String partitionId = maintenanceModeEvent.getPartitionId(); + String memberId = maintenanceModeEvent.getMemberId(); + NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId); + PartitionContext partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId); + partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + if (log.isDebugEnabled()) { + log.debug(String.format("Member has been moved as pending termination: " + + "[member] %s", memberId)); + } + partitionContext.moveActiveMemberToTerminationPendingMembers(memberId); + } + + @Override + public void handleMemberReadyToShutdownEvent( + MemberReadyToShutdownEvent memberReadyToShutdownEvent) { + + NetworkPartitionContext nwPartitionCtxt; + String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId(); + nwPartitionCtxt = getNetworkPartitionCtxt(networkPartitionId); + + // start a new member in the same Partition + String memberId = memberReadyToShutdownEvent.getMemberId(); + String partitionId = getPartitionOfMember(memberId); + PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); + // terminate the shutdown ready member + CloudControllerClient ccClient = CloudControllerClient.getInstance(); + try { + ccClient.terminate(memberId); + // remove from active member list + partitionCtxt.removeActiveMemberById(memberId); + + String clusterId = memberReadyToShutdownEvent.getClusterId(); + log.info(String.format("Member is terminated and removed from the active members list: " + + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId)); + } catch (TerminationException e) { + String msg = "TerminationException" + e.getLocalizedMessage(); + log.error(msg, e); + } + } + + @Override + public void handleMemberTerminatedEvent( + MemberTerminatedEvent memberTerminatedEvent) { + + String networkPartitionId = memberTerminatedEvent.getNetworkPartitionId(); + String memberId = memberTerminatedEvent.getMemberId(); + String partitionId = memberTerminatedEvent.getPartitionId(); + NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(networkPartitionId); + PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId); + partitionContext.removeMemberStatsContext(memberId); + + if (partitionContext.removeTerminationPendingMember(memberId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is removed from termination pending members list: " + + "[member] %s", memberId)); + } + } else if (partitionContext.removePendingMember(memberId)) { + if (log.isDebugEnabled()) { + log.debug(String.format("Member is removed from pending members list: " + + "[member] %s", memberId)); + } + } else if (partitionContext.removeActiveMemberById(memberId)) { + log.warn(String.format("Member is in the wrong list and it is removed from " + + "active members list", memberId)); + } else if (partitionContext.removeObsoleteMember(memberId)) { + log.warn(String.format("Member's obsolated timeout has been expired and " + + "it is removed from obsolated members list", memberId)); + } else { + log.warn(String.format("Member is not available in any of the list active, " + + "pending and termination pending", memberId)); + } + + if (log.isInfoEnabled()) { + log.info(String.format("Member stat context has been removed successfully: " + + "[member] %s", memberId)); + } + } + + @Override + public void handleClusterRemovedEvent( + ClusterRemovedEvent clusterRemovedEvent) { + + } + + private String getNetworkPartitionIdByMemberId(String memberId) { + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + if (cluster.memberExists(memberId)) { + return cluster.getMember(memberId).getNetworkPartitionId(); + } + } + } + return null; + } + + private Member getMemberByMemberId(String memberId) { + try { + TopologyManager.acquireReadLock(); + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + if (cluster.memberExists(memberId)) { + return cluster.getMember(memberId); + } + } + } + return null; + } finally { + TopologyManager.releaseReadLock(); + } + } + + public NetworkPartitionContext getNetworkPartitionCtxt(Member member) { + log.info("***** getNetworkPartitionCtxt " + member.getNetworkPartitionId()); + String networkPartitionId = member.getNetworkPartitionId(); + if (networkPartitionCtxts.containsKey(networkPartitionId)) { + log.info("returnnig network partition context " + networkPartitionCtxts.get(networkPartitionId)); + return networkPartitionCtxts.get(networkPartitionId); + } + log.info("returning null getNetworkPartitionCtxt"); + return null; + } + + public String getPartitionOfMember(String memberId) { + for (Service service : TopologyManager.getTopology().getServices()) { + for (Cluster cluster : service.getClusters()) { + if (cluster.memberExists(memberId)) { return cluster.getMember(memberId).getPartitionId(); } } } return null; - } - + } + public DeploymentPolicy getDeploymentPolicy() { return deploymentPolicy; } @@ -92,7 +611,7 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) { this.autoscalePolicy = autoscalePolicy; - } + } public Map<String, NetworkPartitionContext> getNetworkPartitionCtxts() { return networkPartitionCtxts; @@ -113,7 +632,7 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; public void addNetworkPartitionCtxt(NetworkPartitionContext ctxt) { this.networkPartitionCtxts.put(ctxt.getId(), ctxt); } - + public NetworkPartitionContext getPartitionCtxt(String id) { return this.networkPartitionCtxts.get(id); } http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java index f547cb1..a0c66f0 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java @@ -18,36 +18,39 @@ */ package org.apache.stratos.autoscaler.monitor; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.NetworkPartitionContext; +import org.apache.stratos.autoscaler.NetworkPartitionLbHolder; import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; +import org.apache.stratos.autoscaler.partition.PartitionManager; +import org.apache.stratos.autoscaler.policy.PolicyManager; import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; import org.apache.stratos.autoscaler.util.AutoScalerConstants; import org.apache.stratos.autoscaler.util.ConfUtil; -import org.apache.stratos.common.enums.ClusterType; import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; /** * Is responsible for monitoring a service cluster. This runs periodically * and perform minimum instance check and scaling check using the underlying * rules engine. - * */ -public class VMLbClusterMonitor extends VMClusterMonitor{ +public class VMLbClusterMonitor extends VMClusterMonitor { private static final Log log = LogFactory.getLog(VMLbClusterMonitor.class); public VMLbClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy, - AutoscalePolicy autoscalePolicy) { - super(clusterId, serviceId, ClusterType.VMLbCluster, new AutoscalerRuleEvaluator(), - deploymentPolicy, autoscalePolicy, - new ConcurrentHashMap<String, NetworkPartitionContext>()); + AutoscalePolicy autoscalePolicy) { + super(clusterId, serviceId, new AutoscalerRuleEvaluator(), + deploymentPolicy, autoscalePolicy, + new ConcurrentHashMap<String, NetworkPartitionContext>()); readConfigurations(); } @@ -56,27 +59,27 @@ public class VMLbClusterMonitor extends VMClusterMonitor{ while (!isDestroyed()) { if (log.isDebugEnabled()) { - log.debug("VMLbClusterMonitor is running.. "+this.toString()); + log.debug("VMLbClusterMonitor is running.. " + this.toString()); } try { - if( !ClusterStatus.In_Maintenance.equals(getStatus())) { + if (!ClusterStatus.In_Maintenance.equals(getStatus())) { monitor(); } else { if (log.isDebugEnabled()) { log.debug("VMLbClusterMonitor is suspended as the cluster is in " + - ClusterStatus.In_Maintenance + " mode......"); + ClusterStatus.In_Maintenance + " mode......"); } } } catch (Exception e) { - log.error("VMLbClusterMonitor : Monitor failed. "+this.toString(), e); + log.error("VMLbClusterMonitor : Monitor failed. " + this.toString(), e); } try { - Thread.sleep(getMonitorInterval()); + Thread.sleep(getMonitorIntervalMilliseconds()); } catch (InterruptedException ignore) { } } } - + @Override protected void monitor() { // TODO make this concurrent @@ -84,21 +87,21 @@ public class VMLbClusterMonitor extends VMClusterMonitor{ // minimum check per partition for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts() - .values()) { + .values()) { if (partitionContext != null) { getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); getMinCheckKnowledgeSession().setGlobal("isPrimary", false); - + if (log.isDebugEnabled()) { log.debug(String.format("Running minimum check for partition %s ", partitionContext.getPartitionId())); } minCheckFactHandle = - AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession(), - minCheckFactHandle, - partitionContext); + AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession(), + minCheckFactHandle, + partitionContext); // start only in the first partition context break; } @@ -106,25 +109,55 @@ public class VMLbClusterMonitor extends VMClusterMonitor{ } } - } - - @Override + } + + @Override public void destroy() { getMinCheckKnowledgeSession().dispose(); getMinCheckKnowledgeSession().dispose(); setDestroyed(true); - if(log.isDebugEnabled()) { - log.debug("VMLbClusterMonitor Drools session has been disposed. "+this.toString()); + if (log.isDebugEnabled()) { + log.debug("VMLbClusterMonitor Drools session has been disposed. " + this.toString()); } } - + @Override - protected void readConfigurations () { + protected void readConfigurations() { XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000); - setMonitorInterval(monitorInterval); + setMonitorIntervalMilliseconds(monitorInterval); if (log.isDebugEnabled()) { - log.debug("VMLbClusterMonitor task interval: " + getMonitorInterval()); + log.debug("VMLbClusterMonitor task interval: " + getMonitorIntervalMilliseconds()); + } + } + + @Override + public void handleClusterRemovedEvent( + ClusterRemovedEvent clusterRemovedEvent) { + + String deploymentPolicy = clusterRemovedEvent.getDeploymentPolicy(); + String clusterId = clusterRemovedEvent.getClusterId(); + DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy); + if (depPolicy != null) { + List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance() + .getNetworkPartitionLbHolders(depPolicy); + + for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) { + // removes lb cluster ids + boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId); + if (isRemoved) { + log.info("Removed the lb cluster [id]:" + + clusterId + + " reference from Network Partition [id]: " + + networkPartitionLbHolder + .getNetworkPartitionId()); + + } + if (log.isDebugEnabled()) { + log.debug(networkPartitionLbHolder); + } + + } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java index 9e97e19..0452e32 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java @@ -35,14 +35,12 @@ import org.apache.stratos.autoscaler.util.ConfUtil; import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; import org.apache.stratos.cloud.controller.stub.pojo.Properties; import org.apache.stratos.cloud.controller.stub.pojo.Property; -import org.apache.stratos.common.enums.ClusterType; import org.apache.stratos.messaging.domain.topology.ClusterStatus; /** * Is responsible for monitoring a service cluster. This runs periodically * and perform minimum instance check and scaling check using the underlying * rules engine. - * */ public class VMServiceClusterMonitor extends VMClusterMonitor { @@ -50,11 +48,12 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { private String lbReferenceType; private boolean hasPrimary; - public VMServiceClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy, - AutoscalePolicy autoscalePolicy) { - super(clusterId, serviceId, ClusterType.VMServiceCluster, new AutoscalerRuleEvaluator(), - deploymentPolicy, autoscalePolicy, - new ConcurrentHashMap<String, NetworkPartitionContext>()); + public VMServiceClusterMonitor(String clusterId, String serviceId, + DeploymentPolicy deploymentPolicy, + AutoscalePolicy autoscalePolicy) { + super(clusterId, serviceId, new AutoscalerRuleEvaluator(), + deploymentPolicy, autoscalePolicy, + new ConcurrentHashMap<String, NetworkPartitionContext>()); readConfigurations(); } @@ -73,19 +72,19 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { log.debug("VMServiceClusterMonitor is running.. " + this.toString()); } try { - if(!ClusterStatus.In_Maintenance.equals(getStatus())) { + if (!ClusterStatus.In_Maintenance.equals(getStatus())) { monitor(); } else { if (log.isDebugEnabled()) { log.debug("VMServiceClusterMonitor is suspended as the cluster is in " + - ClusterStatus.In_Maintenance + " mode......"); + ClusterStatus.In_Maintenance + " mode......"); } } } catch (Exception e) { log.error("VMServiceClusterMonitor : Monitor failed." + this.toString(), e); } try { - Thread.sleep(getMonitorInterval()); + Thread.sleep(getMonitorIntervalMilliseconds()); } catch (InterruptedException ignore) { } } @@ -105,13 +104,13 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { List<String> primaryMemberListInPartition = new ArrayList<String>(); // get active primary members in this partition context for (MemberContext memberContext : partitionContext.getActiveMembers()) { - if (isPrimaryMember(memberContext)){ + if (isPrimaryMember(memberContext)) { primaryMemberListInPartition.add(memberContext.getMemberId()); } } // get pending primary members in this partition context for (MemberContext memberContext : partitionContext.getPendingMembers()) { - if (isPrimaryMember(memberContext)){ + if (isPrimaryMember(memberContext)) { primaryMemberListInPartition.add(memberContext.getMemberId()); } } @@ -134,19 +133,19 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset(); boolean loadAverageReset = networkPartitionContext.isLoadAverageReset(); if (log.isDebugEnabled()) { - log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset - + " flag of loadAverageReset" + loadAverageReset); + log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset + + " flag of loadAverageReset" + loadAverageReset); } if (rifReset || memoryConsumptionReset || loadAverageReset) { - getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); + getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy); - getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", autoscalePolicy); - getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset); - getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset); - getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset); - getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType); - getScaleCheckKnowledgeSession().setGlobal("isPrimary", false); - getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition); + getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", autoscalePolicy); + getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset); + getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset); + getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset); + getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType); + getScaleCheckKnowledgeSession().setGlobal("isPrimary", false); + getScaleCheckKnowledgeSession().setGlobal("primaryMembers", primaryMemberListInNetworkPartition); if (log.isDebugEnabled()) { log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId())); @@ -161,12 +160,12 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { networkPartitionContext.setLoadAverageReset(false); } else if (log.isDebugEnabled()) { log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " + - "cycle for network partition %s", networkPartitionContext.getId())); + "cycle for network partition %s", networkPartitionContext.getId())); } } } - - private boolean isPrimaryMember(MemberContext memberContext){ + + private boolean isPrimaryMember(MemberContext memberContext) { Properties props = memberContext.getProperties(); if (log.isDebugEnabled()) { log.debug(" Properties [" + props + "] "); @@ -176,7 +175,7 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { if (prop.getName().equals("PRIMARY")) { if (Boolean.parseBoolean(prop.getValue())) { log.debug("Adding member id [" + memberContext.getMemberId() + "] " + - "member instance id [" + memberContext.getInstanceId() + "] as a primary member"); + "member instance id [" + memberContext.getInstanceId() + "] as a primary member"); return true; } } @@ -184,33 +183,33 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { } return false; } - + @Override - protected void readConfigurations () { + protected void readConfigurations() { XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000); - setMonitorInterval(monitorInterval); + setMonitorIntervalMilliseconds(monitorInterval); if (log.isDebugEnabled()) { - log.debug("VMServiceClusterMonitor task interval: " + getMonitorInterval()); + log.debug("VMServiceClusterMonitor task interval: " + getMonitorIntervalMilliseconds()); } } - - @Override + + @Override public void destroy() { getMinCheckKnowledgeSession().dispose(); getScaleCheckKnowledgeSession().dispose(); setDestroyed(true); - if(log.isDebugEnabled()) { - log.debug("VMServiceClusterMonitor Drools session has been disposed. "+this.toString()); + if (log.isDebugEnabled()) { + log.debug("VMServiceClusterMonitor Drools session has been disposed. " + this.toString()); } } @Override public String toString() { return "VMServiceClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() + - ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy + - ", lbReferenceType=" + lbReferenceType + - ", hasPrimary=" + hasPrimary + " ]"; + ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy + + ", lbReferenceType=" + lbReferenceType + + ", hasPrimary=" + hasPrimary + " ]"; } public String getLbReferenceType() {
