http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java index 1603aef..e857eaf 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyEventReceiver.java @@ -19,29 +19,16 @@ package org.apache.stratos.autoscaler.message.receiver.topology; -import java.util.List; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.AutoscalerContext; -import org.apache.stratos.autoscaler.KubernetesClusterContext; -import org.apache.stratos.autoscaler.MemberStatsContext; import org.apache.stratos.autoscaler.NetworkPartitionContext; -import org.apache.stratos.autoscaler.NetworkPartitionLbHolder; -import org.apache.stratos.autoscaler.PartitionContext; -import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; -import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; import org.apache.stratos.autoscaler.exception.PartitionValidationException; import org.apache.stratos.autoscaler.exception.PolicyValidationException; -import org.apache.stratos.autoscaler.exception.TerminationException; import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor; import org.apache.stratos.autoscaler.monitor.ClusterMonitorFactory; -import org.apache.stratos.autoscaler.monitor.ContainerClusterMonitor; import org.apache.stratos.autoscaler.monitor.VMClusterMonitor; -import org.apache.stratos.autoscaler.partition.PartitionManager; -import org.apache.stratos.autoscaler.policy.PolicyManager; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; -import org.apache.stratos.common.enums.ClusterType; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.event.Event; @@ -112,7 +99,6 @@ public class AutoscalerTopologyEventReceiver implements Runnable { topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() { @Override protected void onEvent(Event event) { - try { TopologyManager.acquireReadLock(); for (Service service : TopologyManager.getTopology().getServices()) { @@ -121,167 +107,108 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } } } catch (Exception e) { - log.error("Error processing event", e); + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); } finally { TopologyManager.releaseReadLock(); } } - - }); topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() { @Override protected void onEvent(Event event) { try { - MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent)event; + MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event; + String clusterId = memberReadyToShutdownEvent.getClusterId(); AutoscalerContext asCtx = AutoscalerContext.getInstance(); AbstractClusterMonitor monitor; - String clusterId = memberReadyToShutdownEvent.getClusterId(); - String memberId = memberReadyToShutdownEvent.getMemberId(); - - if(asCtx.clusterMonitorExist(clusterId)) { - monitor = asCtx.getClusterMonitor(clusterId); - } else { - if(log.isDebugEnabled()){ + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } return; } - - TopologyManager.acquireReadLock(); - - if(monitor.getClusterType() == ClusterType.VMServiceCluster - || monitor.getClusterType() == ClusterType.VMLbCluster) { - - NetworkPartitionContext nwPartitionCtxt; - String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId(); - nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); - - // start a new member in the same Partition - String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId); - PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); - - - // terminate the shutdown ready member - CloudControllerClient ccClient = CloudControllerClient.getInstance(); - ccClient.terminate(memberId); - - // remove from active member list - partitionCtxt.removeActiveMemberById(memberId); - - if (log.isInfoEnabled()) { - log.info(String.format("Member is terminated and removed from the active members list: " - + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId)); - } - } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { - // no need to do anything - } + monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent); + } catch (Exception e) { + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); + } + } + }); - } catch (TerminationException e) { - log.error(e); + topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() { + @Override + protected void onEvent(Event event) { + try { + log.info("Event received: " + event); + ClusterCreatedEvent clusterCreatedEvent = (ClusterCreatedEvent) event; + TopologyManager.acquireReadLock(); + Service service = TopologyManager.getTopology().getService(clusterCreatedEvent.getServiceName()); + Cluster cluster = service.getCluster(clusterCreatedEvent.getClusterId()); + startClusterMonitor(cluster); + } catch (Exception e) { + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); } finally { TopologyManager.releaseReadLock(); } } - }); - topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() { - @Override - protected void onEvent(Event event) { - try { - log.info("Event received: " + event); - ClusterCreatedEvent e = (ClusterCreatedEvent) event; - TopologyManager.acquireReadLock(); - Service service = TopologyManager.getTopology().getService(e.getServiceName()); - Cluster cluster = service.getCluster(e.getClusterId()); - startClusterMonitor(cluster); - } catch (Exception e) { - log.error("Error processing event", e); - } finally { - TopologyManager.releaseReadLock(); - } - } - - }); - topologyEventReceiver.addEventListener(new ClusterMaintenanceModeEventListener() { @Override protected void onEvent(Event event) { try { log.info("Event received: " + event); - ClusterMaintenanceModeEvent e = (ClusterMaintenanceModeEvent) event; + ClusterMaintenanceModeEvent clusterMaintenanceModeEvent = (ClusterMaintenanceModeEvent) event; TopologyManager.acquireReadLock(); - Service service = TopologyManager.getTopology().getService(e.getServiceName()); - Cluster cluster = service.getCluster(e.getClusterId()); - if(AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) { - AutoscalerContext.getInstance().getClusterMonitor(e.getClusterId()).setStatus(e.getStatus()); - } else { + Service service = TopologyManager.getTopology().getService(clusterMaintenanceModeEvent.getServiceName()); + Cluster cluster = service.getCluster(clusterMaintenanceModeEvent.getClusterId()); + AbstractClusterMonitor monitor; + monitor = AutoscalerContext.getInstance().getClusterMonitor(cluster.getClusterId()); + if (null == monitor) { log.error("cluster monitor not exists for the cluster: " + cluster.toString()); + return; } + monitor.setStatus(clusterMaintenanceModeEvent.getStatus()); } catch (Exception e) { - log.error("Error processing event", e); + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); } finally { TopologyManager.releaseReadLock(); } } - - }); + }); topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() { @Override protected void onEvent(Event event) { try { - ClusterRemovedEvent e = (ClusterRemovedEvent) event; - TopologyManager.acquireReadLock(); - - String clusterId = e.getClusterId(); - String deploymentPolicy = e.getDeploymentPolicy(); - - AbstractClusterMonitor monitor = null; - - if (e.isLbCluster()) { - DeploymentPolicy depPolicy = PolicyManager.getInstance().getDeploymentPolicy(deploymentPolicy); - if (depPolicy != null) { - List<NetworkPartitionLbHolder> lbHolders = PartitionManager.getInstance() - .getNetworkPartitionLbHolders(depPolicy); - - for (NetworkPartitionLbHolder networkPartitionLbHolder : lbHolders) { - // removes lb cluster ids - boolean isRemoved = networkPartitionLbHolder.removeLbClusterId(clusterId); - if (isRemoved) { - log.info("Removed the lb cluster [id]:" - + clusterId - + " reference from Network Partition [id]: " - + networkPartitionLbHolder - .getNetworkPartitionId()); - - } - if (log.isDebugEnabled()) { - log.debug(networkPartitionLbHolder); - } - - } + ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event; + String clusterId = clusterRemovedEvent.getClusterId(); + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractClusterMonitor monitor; + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { + log.debug(String.format("A cluster monitor is not found in autoscaler context " + + "[cluster] %s", clusterId)); } + return; } - - monitor = AutoscalerContext.getInstance().removeClusterMonitor(clusterId); - - // runTerminateAllRule(monitor); - if (monitor != null) { - monitor.destroy(); - log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ", - clusterId)); - } + monitor.handleClusterRemovedEvent(clusterRemovedEvent); + asCtx.removeClusterMonitor(clusterId); + monitor.destroy(); + log.info(String.format("Cluster monitor has been removed successfully: [cluster] %s ", + clusterId)); } catch (Exception e) { - log.error("Error processing event", e); - } finally { - TopologyManager.releaseReadLock(); + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); } } - }); topologyEventReceiver.addEventListener(new MemberStartedEventListener() { @@ -295,70 +222,23 @@ public class AutoscalerTopologyEventReceiver implements Runnable { topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { @Override protected void onEvent(Event event) { - try { - TopologyManager.acquireReadLock(); - MemberTerminatedEvent e = (MemberTerminatedEvent) event; - String networkPartitionId = e.getNetworkPartitionId(); - String clusterId = e.getClusterId(); - String partitionId = e.getPartitionId(); - String memberId = e.getMemberId(); + MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; + String clusterId = memberTerminatedEvent.getClusterId(); AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - - if(asCtx.clusterMonitorExist(clusterId)) { - monitor = asCtx.getClusterMonitor(clusterId); - } else { - if(log.isDebugEnabled()){ + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } return; } - - if(monitor.getClusterType() == ClusterType.VMServiceCluster - || monitor.getClusterType() == ClusterType.VMLbCluster) { - - NetworkPartitionContext networkPartitionContext = - ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); - - PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId); - partitionContext.removeMemberStatsContext(memberId); - - if (partitionContext.removeTerminationPendingMember(memberId)) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is removed from termination pending members list: " - + "[member] %s", memberId)); - } - } else if (partitionContext.removePendingMember(memberId)) { - if (log.isDebugEnabled()) { - log.debug(String.format("Member is removed from pending members list: " - + "[member] %s", memberId)); - } - } else if (partitionContext.removeActiveMemberById(memberId)) { - log.warn(String.format("Member is in the wrong list and it is removed from " - + "active members list", memberId)); - } else if (partitionContext.removeObsoleteMember(memberId)){ - log.warn(String.format("Member's obsolated timeout has been expired and " - + "it is removed from obsolated members list", memberId)); - } else { - log.warn(String.format("Member is not available in any of the list active, " - + "pending and termination pending", memberId)); - } - - if (log.isInfoEnabled()) { - log.info(String.format("Member stat context has been removed successfully: " - + "[member] %s", memberId)); - } - } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { - // no need to do anything - } - + monitor.handleMemberTerminatedEvent(memberTerminatedEvent); } catch (Exception e) { - log.error("Error processing event", e); - } finally { - TopologyManager.releaseReadLock(); + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); } } @@ -367,160 +247,47 @@ public class AutoscalerTopologyEventReceiver implements Runnable { topologyEventReceiver.addEventListener(new MemberActivatedEventListener() { @Override protected void onEvent(Event event) { - try { - TopologyManager.acquireReadLock(); - - MemberActivatedEvent e = (MemberActivatedEvent) event; - String memberId = e.getMemberId(); - String partitionId = e.getPartitionId(); - String networkPartitionId = e.getNetworkPartitionId(); - - String clusterId = e.getClusterId(); + MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; + String clusterId = memberActivatedEvent.getClusterId(); AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - if(asCtx.clusterMonitorExist(clusterId)) { - monitor = asCtx.getClusterMonitor(clusterId); - } else { - if(log.isDebugEnabled()){ + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } return; } - - if (monitor.getClusterType() == ClusterType.VMServiceCluster - || monitor.getClusterType() == ClusterType.VMLbCluster) { - PartitionContext partitionContext; - partitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId); - partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); - if (log.isInfoEnabled()) { - log.info(String.format("Member stat context has been added successfully: " - + "[member] %s", memberId)); - } - partitionContext.movePendingMemberToActiveMembers(memberId); - } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { - KubernetesClusterContext kubernetesClusterContext; - kubernetesClusterContext = ((ContainerClusterMonitor) monitor).getKubernetesClusterCtxt(); - kubernetesClusterContext.addMemberStatsContext(new MemberStatsContext(memberId)); - if (log.isInfoEnabled()) { - log.info(String.format("Member stat context has been added successfully: " - + "[member] %s", memberId)); - } - kubernetesClusterContext.movePendingMemberToActiveMembers(memberId); - } - + monitor.handleMemberActivatedEvent(memberActivatedEvent); } catch (Exception e) { - log.error("Error processing event", e); - } finally { - TopologyManager.releaseReadLock(); + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); } } }); - topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() { - @Override - protected void onEvent(Event event) { - try { - TopologyManager.acquireReadLock(); - - MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent)event; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - AbstractClusterMonitor monitor; - String clusterId = memberReadyToShutdownEvent.getClusterId(); - String memberId = memberReadyToShutdownEvent.getMemberId(); - - if(asCtx.clusterMonitorExist(clusterId)) { - monitor = asCtx.getClusterMonitor(clusterId); - } else { - if(log.isDebugEnabled()){ - log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); - } - return; - } - - if(monitor.getClusterType() == ClusterType.VMServiceCluster - || monitor.getClusterType() == ClusterType.VMLbCluster) { - - NetworkPartitionContext nwPartitionCtxt; - String networkPartitionId = memberReadyToShutdownEvent.getNetworkPartitionId(); - nwPartitionCtxt = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId); - - // start a new member in the same Partition - String partitionId = ((VMClusterMonitor) monitor).getPartitionOfMember(memberId); - PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); - - - // terminate the shutdown ready member - CloudControllerClient ccClient = CloudControllerClient.getInstance(); - ccClient.terminate(memberId); - - // remove from active member list - partitionCtxt.removeActiveMemberById(memberId); - - if (log.isInfoEnabled()) { - log.info(String.format("Member is terminated and removed from the active members list: " - + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId)); - } - } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { - // no need to do anything - } - - } catch (TerminationException e) { - log.error(e); - } - } - - }); - - topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { @Override protected void onEvent(Event event) { - try { - TopologyManager.acquireReadLock(); - - MemberMaintenanceModeEvent e = (MemberMaintenanceModeEvent) event; - String memberId = e.getMemberId(); - String partitionId = e.getPartitionId(); - String networkPartitionId = e.getNetworkPartitionId(); - - String clusterId = e.getClusterId(); + MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event; + String clusterId = maintenanceModeEvent.getClusterId(); AbstractClusterMonitor monitor; - AutoscalerContext asCtx = AutoscalerContext.getInstance(); - if (asCtx.clusterMonitorExist(clusterId)) { - monitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId); - } else { - if(log.isDebugEnabled()){ + monitor = asCtx.getClusterMonitor(clusterId); + if (null == monitor) { + if (log.isDebugEnabled()) { log.debug(String.format("A cluster monitor is not found in autoscaler context " - + "[cluster] %s", clusterId)); + + "[cluster] %s", clusterId)); } return; } - - if(monitor.getClusterType() == ClusterType.VMServiceCluster - || monitor.getClusterType() == ClusterType.VMLbCluster) { - - PartitionContext partitionContext; - partitionContext = ((VMClusterMonitor) monitor).getNetworkPartitionCtxt(networkPartitionId).getPartitionCtxt(partitionId); - partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); - if (log.isDebugEnabled()) { - log.debug(String.format("Member has been moved as pending termination: " - + "[member] %s", memberId)); - } - partitionContext.moveActiveMemberToTerminationPendingMembers(memberId); - } else if(monitor.getClusterType() == ClusterType.DockerServiceCluster) { - // no need to do anything - } - + monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent); } catch (Exception e) { - log.error("Error processing event", e); - } finally { - TopologyManager.releaseReadLock(); + String msg = "Error processing event " + e.getLocalizedMessage(); + log.error(msg, e); } } }); @@ -529,27 +296,14 @@ public class AutoscalerTopologyEventReceiver implements Runnable { topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() { @Override protected void onEvent(Event event) { -// try { -// TopologyManager.acquireReadLock(); -// -// // Remove all clusters of given service from context -// ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent)event; -// for(Service service : TopologyManager.getTopology().getServices()) { -// for(Cluster cluster : service.getClusters()) { -// removeMonitor(cluster.getHostName()); -// } -// } -// } -// finally { -// TopologyManager.releaseReadLock(); -// } + } }); } private class ClusterMonitorAdder implements Runnable { private Cluster cluster; - private String clusterMonitorType; + public ClusterMonitorAdder(Cluster cluster) { this.cluster = cluster; } @@ -567,38 +321,41 @@ public class AutoscalerTopologyEventReceiver implements Runnable { try { monitor = ClusterMonitorFactory.getMonitor(cluster); success = true; - clusterMonitorType = monitor.getClusterType().name(); } catch (PolicyValidationException e) { - String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId(); - log.debug(msg, e); + if (log.isDebugEnabled()) { + String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId(); + log.debug(msg, e); + } retries--; - } catch (PartitionValidationException e) { - String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId(); - log.debug(msg, e); + if (log.isDebugEnabled()) { + String msg = "Cluster monitor creation failed for cluster: " + cluster.getClusterId(); + log.debug(msg, e); + } retries--; } } while (!success && retries != 0); if (monitor == null) { String msg = "Cluster monitor creation failed, even after retrying for 5 times, " - + "for cluster: " + cluster.getClusterId(); + + "for cluster: " + cluster.getClusterId(); log.error(msg); throw new RuntimeException(msg); } - + //TODO private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + // scheduler.scheduleAtFixedRate(monitor, 0, getMonitorInterval(), TimeUnit.MILLISECONDS); Thread th = new Thread(monitor); th.start(); AutoscalerContext.getInstance().addClusterMonitor(monitor); if (log.isInfoEnabled()) { - log.info(String.format("%s monitor has been added successfully: [cluster] %s", - clusterMonitorType, cluster.getClusterId())); + log.info(String.format("Cluster monitor has been added successfully: [cluster] %s", + cluster.getClusterId())); } } } - + @SuppressWarnings("unused") - private void runTerminateAllRule(VMClusterMonitor monitor) { + private void runTerminateAllRule(VMClusterMonitor monitor) { FactHandle terminateAllFactHandle = null; @@ -621,9 +378,13 @@ public class AutoscalerTopologyEventReceiver implements Runnable { protected synchronized void startClusterMonitor(Cluster cluster) { Thread th = null; - if (!AutoscalerContext.getInstance().clusterMonitorExist(cluster.getClusterId())) { - th = new Thread(new ClusterMonitorAdder(cluster)); - } + + AbstractClusterMonitor monitor; + monitor = AutoscalerContext.getInstance().getClusterMonitor(cluster.getClusterId()); + + if (null == monitor) { + th = new Thread(new ClusterMonitorAdder(cluster)); + } if (th != null) { th.start(); try { @@ -632,9 +393,8 @@ public class AutoscalerTopologyEventReceiver implements Runnable { } if (log.isDebugEnabled()) { - log.debug(String - .format("Cluster monitor thread has been started successfully: [cluster] %s ", - cluster.getClusterId())); + log.debug(String.format("Cluster monitor thread has been started successfully: " + + "[cluster] %s ", cluster.getClusterId())); } } }
http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java index cb60027..6061c3b 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java @@ -19,130 +19,211 @@ package org.apache.stratos.autoscaler.monitor; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; -import org.apache.stratos.common.enums.ClusterType; import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent; +import org.apache.stratos.messaging.event.health.stat.GradientOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.GradientOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.GradientOfRequestsInFlightEvent; +import org.apache.stratos.messaging.event.health.stat.MemberAverageLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.MemberAverageMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent; +import org.apache.stratos.messaging.event.health.stat.MemberGradientOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.MemberGradientOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.MemberSecondDerivativeOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfLoadAverageEvent; +import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfMemoryConsumptionEvent; +import org.apache.stratos.messaging.event.health.stat.SecondDerivativeOfRequestsInFlightEvent; +import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; +import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; +import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent; +import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent; +import org.apache.stratos.messaging.event.topology.MemberStartedEvent; +import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; import org.drools.runtime.StatefulKnowledgeSession; import org.drools.runtime.rule.FactHandle; /* * Every cluster monitor, which are monitoring a cluster, should extend this class. */ -public abstract class AbstractClusterMonitor implements Runnable{ - +public abstract class AbstractClusterMonitor implements Runnable { + private String clusterId; private String serviceId; - private ClusterType clusterType; - private ClusterStatus status; - private int monitorInterval; - - protected FactHandle minCheckFactHandle; - protected FactHandle scaleCheckFactHandle; - private StatefulKnowledgeSession minCheckKnowledgeSession; - private StatefulKnowledgeSession scaleCheckKnowledgeSession; - private boolean isDestroyed; - - private AutoscalerRuleEvaluator autoscalerRuleEvaluator; - - protected AbstractClusterMonitor(String clusterId, String serviceId, ClusterType clusterType, - AutoscalerRuleEvaluator autoscalerRuleEvaluator) { - - super(); - this.clusterId = clusterId; - this.serviceId = serviceId; - this.clusterType = clusterType; - this.autoscalerRuleEvaluator = autoscalerRuleEvaluator; + private ClusterStatus status; + private int monitoringIntervalMilliseconds; + + protected FactHandle minCheckFactHandle; + protected FactHandle scaleCheckFactHandle; + private StatefulKnowledgeSession minCheckKnowledgeSession; + private StatefulKnowledgeSession scaleCheckKnowledgeSession; + private boolean isDestroyed; + + private AutoscalerRuleEvaluator autoscalerRuleEvaluator; + + protected AbstractClusterMonitor(String clusterId, String serviceId, + AutoscalerRuleEvaluator autoscalerRuleEvaluator) { + + super(); + this.clusterId = clusterId; + this.serviceId = serviceId; + this.autoscalerRuleEvaluator = autoscalerRuleEvaluator; this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession(); this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession(); - } + } + + protected abstract void readConfigurations(); + + protected abstract void monitor(); - protected abstract void readConfigurations(); - protected abstract void monitor(); public abstract void destroy(); - - public String getClusterId() { - return clusterId; - } - - public void setClusterId(String clusterId) { - this.clusterId = clusterId; - } - - public void setStatus(ClusterStatus status) { - this.status = status; - } - - public ClusterType getClusterType() { - return clusterType; - } - - public ClusterStatus getStatus() { - return status; - } - - public String getServiceId() { - return serviceId; - } - - public void setServiceId(String serviceId) { - this.serviceId = serviceId; - } - - public int getMonitorInterval() { - return monitorInterval; - } - - public void setMonitorInterval(int monitorInterval) { - this.monitorInterval = monitorInterval; - } - - public FactHandle getMinCheckFactHandle() { - return minCheckFactHandle; - } - - public void setMinCheckFactHandle(FactHandle minCheckFactHandle) { - this.minCheckFactHandle = minCheckFactHandle; - } - - public FactHandle getScaleCheckFactHandle() { - return scaleCheckFactHandle; - } - - public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) { - this.scaleCheckFactHandle = scaleCheckFactHandle; - } - - public StatefulKnowledgeSession getMinCheckKnowledgeSession() { - return minCheckKnowledgeSession; - } - - public void setMinCheckKnowledgeSession( - StatefulKnowledgeSession minCheckKnowledgeSession) { - this.minCheckKnowledgeSession = minCheckKnowledgeSession; - } - - public StatefulKnowledgeSession getScaleCheckKnowledgeSession() { - return scaleCheckKnowledgeSession; - } - - public void setScaleCheckKnowledgeSession( - StatefulKnowledgeSession scaleCheckKnowledgeSession) { - this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession; - } - - public boolean isDestroyed() { - return isDestroyed; - } - - public void setDestroyed(boolean isDestroyed) { - this.isDestroyed = isDestroyed; - } - - public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() { - return autoscalerRuleEvaluator; - } - - public void setAutoscalerRuleEvaluator( - AutoscalerRuleEvaluator autoscalerRuleEvaluator) { - this.autoscalerRuleEvaluator = autoscalerRuleEvaluator; - } + + //handle health events + public abstract void handleAverageLoadAverageEvent( + AverageLoadAverageEvent averageLoadAverageEvent); + + public abstract void handleGradientOfLoadAverageEvent( + GradientOfLoadAverageEvent gradientOfLoadAverageEvent); + + public abstract void handleSecondDerivativeOfLoadAverageEvent( + SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent); + + public abstract void handleAverageMemoryConsumptionEvent( + AverageMemoryConsumptionEvent averageMemoryConsumptionEvent); + + public abstract void handleGradientOfMemoryConsumptionEvent( + GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent); + + public abstract void handleSecondDerivativeOfMemoryConsumptionEvent( + SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent); + + public abstract void handleAverageRequestsInFlightEvent( + AverageRequestsInFlightEvent averageRequestsInFlightEvent); + + public abstract void handleGradientOfRequestsInFlightEvent( + GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent); + + public abstract void handleSecondDerivativeOfRequestsInFlightEvent( + SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent); + + public abstract void handleMemberAverageMemoryConsumptionEvent( + MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent); + + public abstract void handleMemberGradientOfMemoryConsumptionEvent( + MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent); + + public abstract void handleMemberSecondDerivativeOfMemoryConsumptionEvent( + MemberSecondDerivativeOfMemoryConsumptionEvent memberSecondDerivativeOfMemoryConsumptionEvent); + + + public abstract void handleMemberAverageLoadAverageEvent( + MemberAverageLoadAverageEvent memberAverageLoadAverageEvent); + + public abstract void handleMemberGradientOfLoadAverageEvent( + MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent); + + public abstract void handleMemberSecondDerivativeOfLoadAverageEvent( + MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent); + + public abstract void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent); + + //handle topology events + public abstract void handleMemberStartedEvent(MemberStartedEvent memberStartedEvent); + + public abstract void handleMemberActivatedEvent(MemberActivatedEvent memberActivatedEvent); + + public abstract void handleMemberMaintenanceModeEvent( + MemberMaintenanceModeEvent maintenanceModeEvent); + + public abstract void handleMemberReadyToShutdownEvent( + MemberReadyToShutdownEvent memberReadyToShutdownEvent); + + public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent memberTerminatedEvent); + + public abstract void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent); + + public String getClusterId() { + return clusterId; + } + + public void setClusterId(String clusterId) { + this.clusterId = clusterId; + } + + public void setStatus(ClusterStatus status) { + this.status = status; + } + + public ClusterStatus getStatus() { + return status; + } + + public String getServiceId() { + return serviceId; + } + + public void setServiceId(String serviceId) { + this.serviceId = serviceId; + } + + public int getMonitorIntervalMilliseconds() { + return monitoringIntervalMilliseconds; + } + + public void setMonitorIntervalMilliseconds(int monitorIntervalMilliseconds) { + this.monitoringIntervalMilliseconds = monitorIntervalMilliseconds; + } + + public FactHandle getMinCheckFactHandle() { + return minCheckFactHandle; + } + + public void setMinCheckFactHandle(FactHandle minCheckFactHandle) { + this.minCheckFactHandle = minCheckFactHandle; + } + + public FactHandle getScaleCheckFactHandle() { + return scaleCheckFactHandle; + } + + public void setScaleCheckFactHandle(FactHandle scaleCheckFactHandle) { + this.scaleCheckFactHandle = scaleCheckFactHandle; + } + + public StatefulKnowledgeSession getMinCheckKnowledgeSession() { + return minCheckKnowledgeSession; + } + + public void setMinCheckKnowledgeSession( + StatefulKnowledgeSession minCheckKnowledgeSession) { + this.minCheckKnowledgeSession = minCheckKnowledgeSession; + } + + public StatefulKnowledgeSession getScaleCheckKnowledgeSession() { + return scaleCheckKnowledgeSession; + } + + public void setScaleCheckKnowledgeSession( + StatefulKnowledgeSession scaleCheckKnowledgeSession) { + this.scaleCheckKnowledgeSession = scaleCheckKnowledgeSession; + } + + public boolean isDestroyed() { + return isDestroyed; + } + + public void setDestroyed(boolean isDestroyed) { + this.isDestroyed = isDestroyed; + } + + public AutoscalerRuleEvaluator getAutoscalerRuleEvaluator() { + return autoscalerRuleEvaluator; + } + + public void setAutoscalerRuleEvaluator( + AutoscalerRuleEvaluator autoscalerRuleEvaluator) { + this.autoscalerRuleEvaluator = autoscalerRuleEvaluator; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java index bd01dc6..208e4ce 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitorFactory.java @@ -52,30 +52,32 @@ import org.apache.stratos.messaging.util.Constants; * Factory class for creating cluster monitors. */ public class ClusterMonitorFactory { - - private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class); - - /** - * @param cluster the cluster to be monitored - * @return the created cluster monitor - * @throws PolicyValidationException when deployment policy is not valid - * @throws PartitionValidationException when partition is not valid - */ - public static AbstractClusterMonitor getMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException { - - AbstractClusterMonitor clusterMonitor; - if(cluster.isKubernetesCluster()){ - clusterMonitor = getDockerServiceClusterMonitor(cluster); - } else if (cluster.isLbCluster()){ - clusterMonitor = getVMLbClusterMonitor(cluster); - } else { - clusterMonitor = getVMServiceClusterMonitor(cluster); - } - - return clusterMonitor; - } - - private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException { + + private static final Log log = LogFactory.getLog(ClusterMonitorFactory.class); + + /** + * @param cluster the cluster to be monitored + * @return the created cluster monitor + * @throws PolicyValidationException when deployment policy is not valid + * @throws PartitionValidationException when partition is not valid + */ + public static AbstractClusterMonitor getMonitor(Cluster cluster) + throws PolicyValidationException, PartitionValidationException { + + AbstractClusterMonitor clusterMonitor; + if (cluster.isKubernetesCluster()) { + clusterMonitor = getDockerServiceClusterMonitor(cluster); + } else if (cluster.isLbCluster()) { + clusterMonitor = getVMLbClusterMonitor(cluster); + } else { + clusterMonitor = getVMServiceClusterMonitor(cluster); + } + + return clusterMonitor; + } + + private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster cluster) + throws PolicyValidationException, PartitionValidationException { // FIXME fix the following code to correctly update // AutoscalerContext context = AutoscalerContext.getInstance(); if (null == cluster) { @@ -91,11 +93,11 @@ public class ClusterMonitorFactory { } AutoscalePolicy policy = - PolicyManager.getInstance() - .getAutoscalePolicy(autoscalePolicyName); + PolicyManager.getInstance() + .getAutoscalePolicy(autoscalePolicyName); DeploymentPolicy deploymentPolicy = - PolicyManager.getInstance() - .getDeploymentPolicy(deploymentPolicyName); + PolicyManager.getInstance() + .getDeploymentPolicy(deploymentPolicyName); if (deploymentPolicy == null) { String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName; @@ -106,8 +108,8 @@ public class ClusterMonitorFactory { Partition[] allPartitions = deploymentPolicy.getAllPartitions(); if (allPartitions == null) { String msg = - "Deployment Policy's Partitions are null. Policy name: " + - deploymentPolicyName; + "Deployment Policy's Partitions are null. Policy name: " + + deploymentPolicyName; log.error(msg); throw new PolicyValidationException(msg); } @@ -115,98 +117,100 @@ public class ClusterMonitorFactory { CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy); VMServiceClusterMonitor clusterMonitor = - new VMServiceClusterMonitor(cluster.getClusterId(), - cluster.getServiceName(), - deploymentPolicy, policy); + new VMServiceClusterMonitor(cluster.getClusterId(), + cluster.getServiceName(), + deploymentPolicy, policy); clusterMonitor.setStatus(ClusterStatus.Created); - - for (PartitionGroup partitionGroup: deploymentPolicy.getPartitionGroups()){ + + for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) { NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(), - partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions()); + partitionGroup.getPartitionAlgo(), + partitionGroup.getPartitions()); - for(Partition partition: partitionGroup.getPartitions()){ + for (Partition partition : partitionGroup.getPartitions()) { PartitionContext partitionContext = new PartitionContext(partition); partitionContext.setServiceName(cluster.getServiceName()); partitionContext.setProperties(cluster.getProperties()); partitionContext.setNetworkPartitionId(partitionGroup.getId()); - - for (Member member: cluster.getMembers()){ + + for (Member member : cluster.getMembers()) { String memberId = member.getMemberId(); - if(member.getPartitionId().equalsIgnoreCase(partition.getId())){ + if (member.getPartitionId().equalsIgnoreCase(partition.getId())) { MemberContext memberContext = new MemberContext(); memberContext.setClusterId(member.getClusterId()); memberContext.setMemberId(memberId); memberContext.setPartition(partition); memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties())); - - if(MemberStatus.Activated.equals(member.getStatus())){ + + if (MemberStatus.Activated.equals(member.getStatus())) { partitionContext.addActiveMember(memberContext); // networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); // partitionContext.incrementCurrentActiveMemberCount(1); - } else if(MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())){ + } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) { partitionContext.addPendingMember(memberContext); // networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); - } else if(MemberStatus.Suspended.equals(member.getStatus())){ + } else if (MemberStatus.Suspended.equals(member.getStatus())) { // partitionContext.addFaultyMember(memberId); } partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); - if(log.isInfoEnabled()){ + if (log.isInfoEnabled()) { log.info(String.format("Member stat context has been added: [member] %s", memberId)); } } } networkPartitionContext.addPartitionContext(partitionContext); - if(log.isInfoEnabled()){ + if (log.isInfoEnabled()) { log.info(String.format("Partition context has been added: [partition] %s", - partitionContext.getPartitionId())); + partitionContext.getPartitionId())); } } clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext); - if(log.isInfoEnabled()){ + if (log.isInfoEnabled()) { log.info(String.format("Network partition context has been added: [network partition] %s", - networkPartitionContext.getId())); + networkPartitionContext.getId())); } } - - + + // find lb reference type java.util.Properties props = cluster.getProperties(); - - if(props.containsKey(Constants.LOAD_BALANCER_REF)) { + + if (props.containsKey(Constants.LOAD_BALANCER_REF)) { String value = props.getProperty(Constants.LOAD_BALANCER_REF); clusterMonitor.setLbReferenceType(value); - if(log.isDebugEnabled()) { - log.debug("Set the lb reference type: "+value); + if (log.isDebugEnabled()) { + log.debug("Set the lb reference type: " + value); } } - + // set hasPrimary property // hasPrimary is true if there are primary members available in that cluster clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY))); - log.info("VMServiceClusterMonitor created: "+clusterMonitor.toString()); + log.info("VMServiceClusterMonitor created: " + clusterMonitor.toString()); return clusterMonitor; } - + private static Properties convertMemberPropsToMemberContextProps( - java.util.Properties properties) { - Properties props = new Properties(); - for (Map.Entry<Object, Object> e : properties.entrySet() ) { - Property prop = new Property(); - prop.setName((String)e.getKey()); - prop.setValue((String)e.getValue()); - props.addProperties(prop); - } - return props; - } - - - private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster) throws PolicyValidationException, PartitionValidationException { + java.util.Properties properties) { + Properties props = new Properties(); + for (Map.Entry<Object, Object> e : properties.entrySet()) { + Property prop = new Property(); + prop.setName((String) e.getKey()); + prop.setValue((String) e.getValue()); + props.addProperties(prop); + } + return props; + } + + + private static VMLbClusterMonitor getVMLbClusterMonitor(Cluster cluster) + throws PolicyValidationException, PartitionValidationException { // FIXME fix the following code to correctly update // AutoscalerContext context = AutoscalerContext.getInstance(); if (null == cluster) { @@ -222,11 +226,11 @@ public class ClusterMonitorFactory { } AutoscalePolicy policy = - PolicyManager.getInstance() - .getAutoscalePolicy(autoscalePolicyName); + PolicyManager.getInstance() + .getAutoscalePolicy(autoscalePolicyName); DeploymentPolicy deploymentPolicy = - PolicyManager.getInstance() - .getDeploymentPolicy(deploymentPolicyName); + PolicyManager.getInstance() + .getDeploymentPolicy(deploymentPolicyName); if (deploymentPolicy == null) { String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName; @@ -236,21 +240,21 @@ public class ClusterMonitorFactory { String clusterId = cluster.getClusterId(); VMLbClusterMonitor clusterMonitor = - new VMLbClusterMonitor(clusterId, - cluster.getServiceName(), - deploymentPolicy, policy); + new VMLbClusterMonitor(clusterId, + cluster.getServiceName(), + deploymentPolicy, policy); clusterMonitor.setStatus(ClusterStatus.Created); // partition group = network partition context for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) { NetworkPartitionLbHolder networkPartitionLbHolder = - PartitionManager.getInstance() - .getNetworkPartitionLbHolder(partitionGroup.getId()); + PartitionManager.getInstance() + .getNetworkPartitionLbHolder(partitionGroup.getId()); // PartitionManager.getInstance() // .getNetworkPartitionLbHolder(partitionGroup.getId()); // FIXME pick a random partition Partition partition = - partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)]; + partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)]; PartitionContext partitionContext = new PartitionContext(partition); partitionContext.setServiceName(cluster.getServiceName()); partitionContext.setProperties(cluster.getProperties()); @@ -258,7 +262,8 @@ public class ClusterMonitorFactory { partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(), - partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions()) ; + partitionGroup.getPartitionAlgo(), + partitionGroup.getPartitions()); for (Member member : cluster.getMembers()) { String memberId = member.getMemberId(); if (member.getNetworkPartitionId().equalsIgnoreCase(networkPartitionContext.getId())) { @@ -280,23 +285,23 @@ public class ClusterMonitorFactory { } partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); - if(log.isInfoEnabled()){ + if (log.isInfoEnabled()) { log.info(String.format("Member stat context has been added: [member] %s", memberId)); } } } networkPartitionContext.addPartitionContext(partitionContext); - + // populate lb cluster id in network partition context. java.util.Properties props = cluster.getProperties(); // get service type of load balanced cluster String loadBalancedServiceType = props.getProperty(Constants.LOAD_BALANCED_SERVICE_TYPE); - - if(props.containsKey(Constants.LOAD_BALANCER_REF)) { + + if (props.containsKey(Constants.LOAD_BALANCER_REF)) { String value = props.getProperty(Constants.LOAD_BALANCER_REF); - + if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) { networkPartitionLbHolder.setDefaultLbClusterId(clusterId); @@ -317,13 +322,17 @@ public class ClusterMonitorFactory { clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext); } - log.info("VMLbClusterMonitor created: "+clusterMonitor.toString()); + log.info("VMLbClusterMonitor created: " + clusterMonitor.toString()); return clusterMonitor; } - - private static DockerServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) { - if (null == cluster) { + /** + * @param cluster - the cluster which needs to be monitored + * @return - the cluster monitor + */ + private static KubernetesServiceClusterMonitor getDockerServiceClusterMonitor(Cluster cluster) { + + if (null == cluster) { return null; } @@ -335,42 +344,43 @@ public class ClusterMonitorFactory { AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(autoscalePolicyName); java.util.Properties props = cluster.getProperties(); String kubernetesHostClusterID = props.getProperty(StratosConstants.KUBERNETES_CLUSTER_ID); - KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID, - cluster.getClusterId()); - - DockerServiceClusterMonitor dockerClusterMonitor = new DockerServiceClusterMonitor( - kubernetesClusterCtxt, - cluster.getClusterId(), - cluster.getServiceName(), - policy); - + KubernetesClusterContext kubernetesClusterCtxt = new KubernetesClusterContext(kubernetesHostClusterID, + cluster.getClusterId()); + + KubernetesServiceClusterMonitor dockerClusterMonitor = new KubernetesServiceClusterMonitor( + kubernetesClusterCtxt, + cluster.getClusterId(), + cluster.getServiceName(), + policy); + dockerClusterMonitor.setStatus(ClusterStatus.Created); - - for (Member member : cluster.getMembers()) { - String memberId = member.getMemberId(); - String clusterId = member.getClusterId(); - MemberContext memberContext = new MemberContext(); - memberContext.setMemberId(memberId); - memberContext.setClusterId(clusterId); - - if (MemberStatus.Activated.equals(member.getStatus())) { - dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext); - } else if (MemberStatus.Created.equals(member.getStatus()) - || MemberStatus.Starting.equals(member.getStatus())) { - dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext); - } - } + + //populate the members after restarting + for (Member member : cluster.getMembers()) { + String memberId = member.getMemberId(); + String clusterId = member.getClusterId(); + MemberContext memberContext = new MemberContext(); + memberContext.setMemberId(memberId); + memberContext.setClusterId(clusterId); + + if (MemberStatus.Activated.equals(member.getStatus())) { + dockerClusterMonitor.getKubernetesClusterCtxt().addActiveMember(memberContext); + } else if (MemberStatus.Created.equals(member.getStatus()) + || MemberStatus.Starting.equals(member.getStatus())) { + dockerClusterMonitor.getKubernetesClusterCtxt().addPendingMember(memberContext); + } + } // find lb reference type - if(props.containsKey(Constants.LOAD_BALANCER_REF)) { + if (props.containsKey(Constants.LOAD_BALANCER_REF)) { String value = props.getProperty(Constants.LOAD_BALANCER_REF); dockerClusterMonitor.setLbReferenceType(value); - if(log.isDebugEnabled()) { - log.debug("Set the lb reference type: "+value); + if (log.isDebugEnabled()) { + log.debug("Set the lb reference type: " + value); } } - - log.info("KubernetesServiceClusterMonitor created: "+ dockerClusterMonitor.toString()); + + log.info("KubernetesServiceClusterMonitor created: " + dockerClusterMonitor.toString()); return dockerClusterMonitor; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java deleted file mode 100644 index 2621690..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ContainerClusterMonitor.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.autoscaler.monitor; - -import org.apache.stratos.autoscaler.KubernetesClusterContext; -import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; -import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; -import org.apache.stratos.common.enums.ClusterType; - -/* - * Every container cluster monitor should extend this class - */ -public abstract class ContainerClusterMonitor extends AbstractClusterMonitor { - - private KubernetesClusterContext kubernetesClusterCtxt; - protected AutoscalePolicy autoscalePolicy; - - protected ContainerClusterMonitor(String clusterId, String serviceId, ClusterType clusterType, - KubernetesClusterContext kubernetesClusterContext, - AutoscalerRuleEvaluator autoscalerRuleEvaluator, AutoscalePolicy autoscalePolicy){ - - super(clusterId, serviceId, clusterType, autoscalerRuleEvaluator); - this.kubernetesClusterCtxt = kubernetesClusterContext; - this.autoscalePolicy = autoscalePolicy; - } - - public KubernetesClusterContext getKubernetesClusterCtxt() { - return kubernetesClusterCtxt; - } - - public void setKubernetesClusterCtxt( - KubernetesClusterContext kubernetesClusterCtxt) { - this.kubernetesClusterCtxt = kubernetesClusterCtxt; - } - - public AutoscalePolicy getAutoscalePolicy() { - return autoscalePolicy; - } - - public void setAutoscalePolicy(AutoscalePolicy autoscalePolicy) { - this.autoscalePolicy = autoscalePolicy; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/31056109/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java deleted file mode 100644 index 850a295..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/DockerServiceClusterMonitor.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.autoscaler.monitor; - -import java.util.Properties; - -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.KubernetesClusterContext; -import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; -import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; -import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; -import org.apache.stratos.autoscaler.util.AutoScalerConstants; -import org.apache.stratos.autoscaler.util.ConfUtil; -import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; -import org.apache.stratos.common.constants.StratosConstants; -import org.apache.stratos.common.enums.ClusterType; -import org.apache.stratos.messaging.domain.topology.ClusterStatus; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; - -/* - * It is monitoring a kubernetes service cluster periodically. - */ -public final class DockerServiceClusterMonitor extends ContainerClusterMonitor{ - - private static final Log log = LogFactory.getLog(DockerServiceClusterMonitor.class); - - private String lbReferenceType; - private int numberOfReplicasInServiceCluster = 0; - int retryInterval = 60000; - - public DockerServiceClusterMonitor(KubernetesClusterContext kubernetesClusterCtxt, - String serviceClusterID, String serviceId, AutoscalePolicy autoscalePolicy) { - super(serviceClusterID, serviceId, ClusterType.DockerServiceCluster, kubernetesClusterCtxt, - new AutoscalerRuleEvaluator(), autoscalePolicy); - readConfigurations(); - } - - @Override - public void run() { - try { - // TODO make this configurable, - // this is the delay the min check of normal cluster monitor to wait - // until LB monitor is added - Thread.sleep(60000); - } catch (InterruptedException ignore) { - } - - while (!isDestroyed()) { - if (log.isDebugEnabled()) { - log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString()); - } - try { - if (!ClusterStatus.In_Maintenance.equals(getStatus())) { - monitor(); - } else { - if (log.isDebugEnabled()) { - log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in " - + ClusterStatus.In_Maintenance + " mode......"); - } - } - } catch (Exception e) { - log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(), - e); - } - try { - Thread.sleep(getMonitorInterval()); - } catch (InterruptedException ignore) { - } - } - } - - @Override - protected void monitor() { - - // is container created successfully? - boolean success = false; - String kubernetesClusterId = getKubernetesClusterCtxt().getKubernetesClusterID(); - - try { - TopologyManager.acquireReadLock(); - Properties props = TopologyManager.getTopology().getService(getServiceId()).getCluster(getClusterId()).getProperties(); - int minReplicas = Integer.parseInt(props.getProperty(StratosConstants.KUBERNETES_MIN_REPLICAS)); - - int nonTerminatedMembers = getKubernetesClusterCtxt().getActiveMembers().size() + getKubernetesClusterCtxt().getPendingMembers().size(); - - if (nonTerminatedMembers == 0) { - - while (!success) { - try { - - MemberContext memberContext = CloudControllerClient.getInstance().createContainer(kubernetesClusterId, getClusterId()); - if(null != memberContext) { - getKubernetesClusterCtxt().addPendingMember(memberContext); - success = true; - numberOfReplicasInServiceCluster = minReplicas; - if(log.isDebugEnabled()){ - log.debug(String.format("Pending member added, [member] %s [kub cluster] %s", - memberContext.getMemberId(), getKubernetesClusterCtxt().getKubernetesClusterID())); - } - } else { - if (log.isDebugEnabled()) { - log.debug("Returned member context is null, did not add to pending members"); - } - } - } catch (Throwable e) { - if (log.isDebugEnabled()) { - String message = "Cannot create a container, will retry in "+(retryInterval/1000)+"s"; - log.debug(message, e); - } - } - - try { - Thread.sleep(retryInterval); - } catch (InterruptedException e1) { - } - } - } - } finally { - TopologyManager.releaseReadLock(); - } - } - - @Override - public void destroy() { - getMinCheckKnowledgeSession().dispose(); - getScaleCheckKnowledgeSession().dispose(); - setDestroyed(true); - if(log.isDebugEnabled()) { - log.debug("KubernetesServiceClusterMonitor Drools session has been disposed. "+this.toString()); - } - } - - @Override - protected void readConfigurations () { - XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); - int monitorInterval = conf.getInt(AutoScalerConstants.AUTOSCALER_MONITOR_INTERVAL, 90000); - setMonitorInterval(monitorInterval); - if (log.isDebugEnabled()) { - log.debug("KubernetesServiceClusterMonitor task interval: " + getMonitorInterval()); - } - } - - @Override - public String toString() { - return "KubernetesServiceClusterMonitor " - + "[ kubernetesHostClusterId=" + getKubernetesClusterCtxt().getKubernetesClusterID() - + ", clusterId=" + getClusterId() - + ", serviceId=" + getServiceId() + "]"; - } - - public String getLbReferenceType() { - return lbReferenceType; - } - - public void setLbReferenceType(String lbReferenceType) { - this.lbReferenceType = lbReferenceType; - } -} \ No newline at end of file
