deprecating the StatusChecker and using processor chain
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/3f45f636 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/3f45f636 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/3f45f636 Branch: refs/heads/master Commit: 3f45f636a62b6f5859fd8ef1da4c700c2cc0cb28 Parents: be385de Author: reka <[email protected]> Authored: Wed Dec 3 14:09:59 2014 +0530 Committer: reka <[email protected]> Committed: Wed Dec 3 14:09:59 2014 +0530 ---------------------------------------------------------------------- .../monitor/cluster/VMClusterMonitor.java | 96 ++-- .../component/ParentComponentMonitor.java | 11 +- .../status/processor/StatusChecker.java | 520 ------------------- .../stratos/autoscaler/util/StatusChecker.java | 519 ++++++++++++++++++ .../manager/client/AutoscalerServiceClient.java | 4 + .../rest/endpoint/api/StratosApiV41Utils.java | 20 +- .../src/main/resources/AutoScalerService.wsdl | 2 +- 7 files changed, 588 insertions(+), 584 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/3f45f636/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java index 4bba330..c5b0a8e 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java @@ -18,8 +18,6 @@ */ package org.apache.stratos.autoscaler.monitor.cluster; -import 
java.util.*; - import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,8 +25,8 @@ import org.apache.stratos.autoscaler.client.CloudControllerClient; import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext; import org.apache.stratos.autoscaler.context.cluster.VMClusterContext; import org.apache.stratos.autoscaler.context.member.MemberStatsContext; -import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext; import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext; +import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext; import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.exception.InvalidArgumentException; import org.apache.stratos.autoscaler.exception.cartridge.TerminationException; @@ -36,10 +34,13 @@ import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent; import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent; import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; -import org.apache.stratos.autoscaler.status.processor.StatusChecker; +import org.apache.stratos.autoscaler.util.StatusChecker; +import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiveProcessor; +import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInActiveProcessor; import org.apache.stratos.autoscaler.util.AutoScalerConstants; import org.apache.stratos.autoscaler.util.AutoscalerUtil; import org.apache.stratos.autoscaler.util.ConfUtil; +import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; import org.apache.stratos.cloud.controller.stub.domain.MemberContext; import org.apache.stratos.common.Properties; import 
org.apache.stratos.common.Property; @@ -50,14 +51,11 @@ import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.event.health.stat.*; -import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; -import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; -import org.apache.stratos.messaging.event.topology.MemberMaintenanceModeEvent; -import org.apache.stratos.messaging.event.topology.MemberReadyToShutdownEvent; -import org.apache.stratos.messaging.event.topology.MemberStartedEvent; -import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent; +import org.apache.stratos.messaging.event.topology.*; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import java.util.*; + /** * Is responsible for monitoring a service cluster. This runs periodically * and perform minimum instance check and scaling check using the underlying @@ -80,11 +78,20 @@ public class VMClusterMonitor extends AbstractClusterMonitor { readConfigurations(); } - public void addClusterLevelNWPartitionContext (ClusterLevelNetworkPartitionContext clusterLevelNWPartitionCtxt) { + private static void terminateMember(String memberId) { + try { + CloudControllerClient.getInstance().terminate(memberId); + + } catch (TerminationException e) { + log.error("Unable to terminate member [member id ] " + memberId, e); + } + } + + public void addClusterLevelNWPartitionContext(ClusterLevelNetworkPartitionContext clusterLevelNWPartitionCtxt) { networkPartitionIdToClusterLevelNetworkPartitionCtxts.put(clusterLevelNWPartitionCtxt.getId(), clusterLevelNWPartitionCtxt); } - public ClusterLevelNetworkPartitionContext getClusterLevelNWPartitionContext (String nwPartitionId) { + public ClusterLevelNetworkPartitionContext getClusterLevelNWPartitionContext(String nwPartitionId) { return 
networkPartitionIdToClusterLevelNetworkPartitionCtxts.get(nwPartitionId); } @@ -255,7 +262,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { instanceContext.setLoadAverageReset(false); } else if (log.isDebugEnabled()) { log.debug(String.format("Scale rule will not run since the LB statistics have not " + - "received before this cycle for network partition %s", + "received before this cycle for network partition %s", networkPartitionContext.getId())); } @@ -291,13 +298,6 @@ public class VMClusterMonitor extends AbstractClusterMonitor { } } - @Override - public String toString() { - return "VMClusterMonitor [clusterId=" + getClusterId() + -// ", lbReferenceType=" + lbReferenceType + - ", hasPrimary=" + hasPrimary + " ]"; - } - // public String getLbReferenceType() { // return lbReferenceType; // } @@ -306,6 +306,13 @@ public class VMClusterMonitor extends AbstractClusterMonitor { // this.lbReferenceType = lbReferenceType; // } + @Override + public String toString() { + return "VMClusterMonitor [clusterId=" + getClusterId() + +// ", lbReferenceType=" + lbReferenceType + + ", hasPrimary=" + hasPrimary + " ]"; + } + public boolean isHasPrimary() { return hasPrimary; } @@ -356,7 +363,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { //TODO get min instance count from instance context - float requiredInstanceCount = 0 ;/* = clusterLevelNetworkPartitionContext.getMinInstanceCount() * scalingFactorBasedOnDependencies;*/ + float requiredInstanceCount = 0;/* = clusterLevelNetworkPartitionContext.getMinInstanceCount() * scalingFactorBasedOnDependencies;*/ int roundedRequiredInstanceCount = getRoundedInstanceCount(requiredInstanceCount, vmClusterContext.getAutoscalePolicy().getInstanceRoundingFactor()); clusterLevelNetworkPartitionContext.setRequiredInstanceCountBasedOnDependencies(roundedRequiredInstanceCount); @@ -375,6 +382,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { 
MonitorStatusEventBuilder.handleClusterScalingEvent(this.parent, networkPartitionId, factor, this.id); } + @Override public void handleGradientOfLoadAverageEvent( GradientOfLoadAverageEvent gradientOfLoadAverageEvent) { @@ -412,7 +420,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { + "[network-partition] %s [value] %s", clusterId, networkPartitionId, value)); } ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(instanceId, - networkPartitionId); + networkPartitionId); if (null != clusterLevelNetworkPartitionContext) { clusterLevelNetworkPartitionContext.setLoadAverageSecondDerivative(value); } else { @@ -511,7 +519,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { ClusterInstanceContext clusterLevelNetworkPartitionContext = getClusterInstanceContext(instanceId, networkPartitionId); - if(null != clusterLevelNetworkPartitionContext){ + if (null != clusterLevelNetworkPartitionContext) { clusterLevelNetworkPartitionContext.setAverageRequestsServedPerInstance(floatValue); } else { @@ -612,7 +620,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, networkPartitionId); ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( - member.getPartitionId()); + member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -635,7 +643,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, networkPartitionId); ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( - member.getPartitionId()); + member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if 
(log.isDebugEnabled()) { @@ -664,7 +672,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, networkPartitionId); ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( - member.getPartitionId()); + member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -687,7 +695,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, networkPartitionId); ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( - member.getPartitionId()); + member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -711,7 +719,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, networkPartitionId); ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt( - member.getPartitionId()); + member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -727,6 +735,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { public void handleMemberFaultEvent(MemberFaultEvent memberFaultEvent) { String memberId = memberFaultEvent.getMemberId(); + String clusterId = memberFaultEvent.getClusterId(); Member member = getMemberByMemberId(memberId); String instanceId = memberFaultEvent.getInstanceId(); String networkPartitionId = memberFaultEvent.getNetworkPartitionId(); @@ -762,13 +771,12 @@ public class VMClusterMonitor extends AbstractClusterMonitor { 
partitionCtxt.moveMemberToObsoleteList(memberId); } if (log.isInfoEnabled()) { - String clusterId = memberFaultEvent.getClusterId(); log.info(String.format("Faulty member is added to obsolete list and removed from the active members list: " + "[member] %s [partition] %s [cluster] %s ", memberId, partitionId, clusterId)); } - StatusChecker.getInstance().onMemberFaultEvent(memberFaultEvent.getClusterId(), - partitionId, instanceId); + ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process( + ClusterStatusInActiveProcessor.class.getName(), clusterId, instanceId); } @Override @@ -782,6 +790,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { MemberActivatedEvent memberActivatedEvent) { String instanceId = memberActivatedEvent.getInstanceId(); + String clusterId = memberActivatedEvent.getClusterId(); String networkPartitionId = memberActivatedEvent.getNetworkPartitionId(); String partitionId = memberActivatedEvent.getPartitionId(); String memberId = memberActivatedEvent.getMemberId(); @@ -795,7 +804,8 @@ public class VMClusterMonitor extends AbstractClusterMonitor { + "[member] %s", memberId)); } clusterLevelPartitionContext.movePendingMemberToActiveMembers(memberId); - StatusChecker.getInstance().onMemberStatusChange(memberActivatedEvent.getClusterId()); + ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().process( + ClusterStatusActiveProcessor.class.getName(), clusterId, instanceId); } @Override @@ -809,7 +819,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { ClusterInstanceContext networkPartitionCtxt = getClusterInstanceContext(instanceId, networkPartitionId); ClusterLevelPartitionContext clusterMonitorPartitionContext = networkPartitionCtxt. 
- getPartitionCtxt(partitionId); + getPartitionCtxt(partitionId); clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); if (log.isDebugEnabled()) { log.debug(String.format("Member has been moved as pending termination: " @@ -855,7 +865,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { if (log.isInfoEnabled()) { log.info(String.format("Member is terminated and removed from the active members list: [member] %s " + - "[partition] %s [cluster] %s ", memberId, partitionId, clusterId)); + "[partition] %s [cluster] %s ", memberId, partitionId, clusterId)); } } catch (Exception e) { String msg = "Error processing event " + e.getLocalizedMessage(); @@ -863,7 +873,6 @@ public class VMClusterMonitor extends AbstractClusterMonitor { } } - @Override public void handleMemberTerminatedEvent( MemberTerminatedEvent memberTerminatedEvent) { @@ -964,7 +973,7 @@ public class VMClusterMonitor extends AbstractClusterMonitor { public void run() { for (ClusterLevelNetworkPartitionContext networkPartitionContext : getAllNetworkPartitionCtxts().values()) { - for(ClusterInstanceContext instanceContext : networkPartitionContext.getClusterInstanceContextMap().values()) { + for (ClusterInstanceContext instanceContext : networkPartitionContext.getClusterInstanceContextMap().values()) { for (ClusterLevelPartitionContext partitionContext : instanceContext.getPartitionCtxts()) { //if (log.isDebugEnabled()) { log.info("Starting to terminate all members in cluster [" + getClusterId() + "] Network Partition [ " + @@ -1004,28 +1013,19 @@ public class VMClusterMonitor extends AbstractClusterMonitor { } public Map<String, ClusterLevelNetworkPartitionContext> getAllNetworkPartitionCtxts() { - return ((VMClusterContext)this.clusterContext).getNetworkPartitionCtxts(); + return ((VMClusterContext) this.clusterContext).getNetworkPartitionCtxts(); } public ClusterInstanceContext getClusterInstanceContext(String networkPartitionId, String instanceId) { 
Map<String, ClusterLevelNetworkPartitionContext> clusterLevelNetworkPartitionContextMap = - ((VMClusterContext)this.clusterContext).getNetworkPartitionCtxts(); + ((VMClusterContext) this.clusterContext).getNetworkPartitionCtxts(); ClusterLevelNetworkPartitionContext networkPartitionContext = clusterLevelNetworkPartitionContextMap.get(networkPartitionId); return networkPartitionContext.getClusterInstanceContextMap().get(instanceId); } - private static void terminateMember(String memberId) { - try { - CloudControllerClient.getInstance().terminate(memberId); - - } catch (TerminationException e) { - log.error("Unable to terminate member [member id ] " + memberId, e); - } - } - public Collection<ClusterLevelNetworkPartitionContext> getNetworkPartitionCtxts() { - return ((VMClusterContext)this.clusterContext).getNetworkPartitionCtxts().values(); + return ((VMClusterContext) this.clusterContext).getNetworkPartitionCtxts().values(); } http://git-wip-us.apache.org/repos/asf/stratos/blob/3f45f636/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java index 0ea9462..136cb9f 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java @@ -41,7 +41,6 @@ import org.apache.stratos.autoscaler.monitor.Monitor; import org.apache.stratos.autoscaler.monitor.MonitorFactory; import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor; 
import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder; -import org.apache.stratos.autoscaler.status.processor.StatusChecker; import org.apache.stratos.autoscaler.util.ServiceReferenceHolder; import org.apache.stratos.messaging.domain.applications.GroupStatus; import org.apache.stratos.messaging.domain.applications.ParentComponent; @@ -261,8 +260,8 @@ public abstract class ParentComponentMonitor extends Monitor { } boolean startDep; - if(!aliasToActiveMonitorsMap.containsKey(eventId) || !pendingMonitorsList.contains(eventId)) { - startDep = startDependency(eventId, instanceId); + if (!aliasToActiveMonitorsMap.containsKey(eventId) || !pendingMonitorsList.contains(eventId)) { + startDep = startDependency(eventId, instanceId); } else { startDep = startDependencyByInstanceCreation(eventId, instanceId); } @@ -289,7 +288,8 @@ public abstract class ParentComponentMonitor extends Monitor { terminationList = this.startupDependencyTree.getTerminationDependencies(eventId); //Need to notify the parent about the status change from Active-->InActive if (this.parent != null) { - StatusChecker.getInstance().onChildStatusChange(eventId, this.id, this.appId, instanceId); + ServiceReferenceHolder.getInstance().getGroupStatusProcessorChain(). + process(this.id, this.appId, instanceId); } //TODO checking whether terminating them in reverse order, // TODO if so can handle it in the parent event. @@ -387,7 +387,8 @@ public abstract class ParentComponentMonitor extends Monitor { log.error("Error while starting the monitor upon termination" + e); } } else { - StatusChecker.getInstance().onChildStatusChange(eventId, this.id, this.appId, instanceId); + ServiceReferenceHolder.getInstance().getGroupStatusProcessorChain(). 
+ process(this.id, this.appId, instanceId); log.info("Checking the status of group/application as no dependent found..."); } http://git-wip-us.apache.org/repos/asf/stratos/blob/3f45f636/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/StatusChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/StatusChecker.java deleted file mode 100644 index 7d7ed31..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/StatusChecker.java +++ /dev/null @@ -1,520 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.stratos.autoscaler.status.processor; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.context.AutoscalerContext; -import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext; -import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext; -import org.apache.stratos.autoscaler.applications.ApplicationHolder; -import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder; -import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; -import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; -import org.apache.stratos.messaging.domain.applications.*; -import org.apache.stratos.messaging.domain.instance.ClusterInstance; -import org.apache.stratos.messaging.domain.instance.GroupInstance; -import org.apache.stratos.messaging.domain.topology.Cluster; -import org.apache.stratos.messaging.domain.topology.ClusterStatus; -import org.apache.stratos.messaging.domain.topology.Service; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; - -import java.util.Map; - - -/** - * This will be used to evaluate the status of a group - * and notify the interested parties about the status changes. - */ -public class StatusChecker { - private static final Log log = LogFactory.getLog(StatusChecker.class); - - - private StatusChecker() { - - } - - public static StatusChecker getInstance() { - return Holder.INSTANCE; - } - - /** - * Calculating whether the cluster has all min instances as active and send the - * ClusterActivatedEvent. 
- * - * @param clusterId id of the cluster - */ - public void onMemberStatusChange(final String clusterId) { - Runnable group = new Runnable() { - public void run() { - VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId); - boolean clusterActive = false; - if (monitor != null) { - clusterActive = clusterActive(monitor); - - } - log.info("Status processor running for [cluster] " + clusterId + - " the status [clusterActive] " + clusterActive); - // if active then notify upper layer - if (clusterActive) { - //send event to cluster status topic - monitor.setHasFaultyMember(false); - if (log.isInfoEnabled()) { - log.info("Publishing Cluster activated event for [application]: " - + monitor.getAppId() + " [cluster]: " + clusterId); - } - ClusterStatusEventPublisher.sendClusterActivatedEvent(monitor.getAppId(), - monitor.getServiceId(), monitor.getClusterId()); - } - } - }; - Thread groupThread = new Thread(group); - groupThread.start(); - } - - /** - * This will calculate the status of the cluster upon a member termination. 
- * The possible states which cluster can change upon member termination are - * Active --> InActive, Terminating-->Terminated, Terminating-->Reset(Created) - * - * @param clusterId id of the cluster - */ - public void onMemberTermination(final String clusterId, final String instanceId) { - Runnable group = new Runnable() { - public void run() { - VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId); - boolean clusterMonitorHasMembers = clusterMonitorHasMembers(monitor); - boolean clusterActive = clusterActive(monitor); - - try { - TopologyManager.acquireReadLockForCluster(monitor.getServiceId(), monitor.getClusterId()); - Service service = TopologyManager.getTopology().getService(monitor.getServiceId()); - Cluster cluster; - String appId = monitor.getAppId(); - if (service != null) { - cluster = service.getCluster(monitor.getClusterId()); - if (cluster != null) { - try { - ApplicationHolder.acquireReadLock(); - Application application = ApplicationHolder.getApplications().getApplication(appId); - //if all members removed from the cluster and cluster is in terminating, - // either it has to be terminated or Reset - if (!clusterMonitorHasMembers && cluster.getStatus(null) == ClusterStatus.Terminating) { - if (application.getStatus(null) == ApplicationStatus.Terminating) { - if (log.isInfoEnabled()) { - log.info("Publishing Cluster terminated event for [application]: " + appId + - " [cluster]: " + clusterId); - } - ClusterStatusEventPublisher.sendClusterTerminatedEvent(appId, monitor.getServiceId(), - monitor.getClusterId(), instanceId); - } else { - if (log.isInfoEnabled()) { - log.info("Publishing Cluster created event for [application]: " + appId + - " [cluster]: " + clusterId); - } - ClusterStatusEventPublisher.sendClusterResetEvent(appId, monitor.getServiceId(), - monitor.getClusterId(), instanceId); - } - - } else { - //if the cluster is not active and, if it is in Active state - if (!clusterActive && 
cluster.getStatus(null) == ClusterStatus.Active) { - if (log.isInfoEnabled()) { - log.info("Publishing Cluster in-activate event for [application]: " - + monitor.getAppId() + " [cluster]: " + clusterId); - } - ClusterStatusEventPublisher.sendClusterInActivateEvent(monitor.getAppId(), - monitor.getServiceId(), clusterId, instanceId); - } else { - log.info("Cluster has non terminated [members] and in the [status] " - + cluster.getStatus(null).toString()); - } - } - } finally { - ApplicationHolder.releaseReadLock(); - } - } - } - - - } finally { - TopologyManager.releaseReadLockForCluster(monitor.getServiceId(), monitor.getClusterId()); - - } - } - }; - Thread groupThread = new Thread(group); - groupThread.start(); - - } - - /** - * Calculate whether the cluster is active based on the minimum count available in each partition - * - * @param monitor Cluster monitor which has the member - * @return whether cluster is active or not - */ - private boolean clusterActive(VMClusterMonitor monitor) { - boolean clusterActive = false; - /*for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext : monitor.getAllNetworkPartitionCtxts().values()) { - //minimum check per partition - for (ClusterLevelPartitionContext clusterMonitorPartitionContext : clusterLevelNetworkPartitionContext.getPartitionCtxts().values()) { - if (clusterMonitorPartitionContext.getMinimumMemberCount() == clusterMonitorPartitionContext.getActiveMemberCount()) { - clusterActive = true; - } else if (clusterMonitorPartitionContext.getActiveMemberCount() > clusterMonitorPartitionContext.getMinimumMemberCount()) { - log.info("cluster already activated..."); - clusterActive = true; - } else { - return false; - } - } - }*/ - return clusterActive; - } - - /** - * Find out whether cluster monitor has any non terminated members - * - * @param monitor the cluster monitor - * @return whether has members or not - */ - private boolean clusterMonitorHasMembers(VMClusterMonitor monitor) { - boolean 
hasMember = false; - for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext : monitor.getAllNetworkPartitionCtxts().values()) { - //minimum check per partition - /*for (ClusterLevelPartitionContext partitionContext : clusterLevelNetworkPartitionContext.getPartitionCtxts().values()) { - if (partitionContext.getNonTerminatedMemberCount() > 0) { - hasMember = true; - } else { - hasMember = false; - } - }*/ - } - return hasMember; - } - - /** - * This will calculate the status of the cluster upon a member fault event - * - * @param clusterId id of the cluster - * @param partitionId is to decide in which partition has less members while others have active members - */ - public void onMemberFaultEvent(final String clusterId, final String partitionId, final String instanceId) { - Runnable group = new Runnable() { - public void run() { - VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId); - boolean clusterInActive = getClusterInactive(monitor, partitionId); - String appId = monitor.getAppId(); - if (clusterInActive) { - //if the monitor is dependent, temporarily pausing it - if (monitor.hasStartupDependents()) { - monitor.setHasFaultyMember(true); - } - if (log.isInfoEnabled()) { - log.info("Publishing Cluster in-activate event for [application]: " - + monitor.getAppId() + " [cluster]: " + clusterId); - } - //send cluster In-Active event to cluster status topic - ClusterStatusEventPublisher.sendClusterInActivateEvent(appId, - monitor.getServiceId(), clusterId, instanceId); - - } else { - boolean clusterActive = clusterActive(monitor); - if (clusterActive) { - if (log.isInfoEnabled()) { - log.info("Publishing Cluster active event for [application]: " - + monitor.getAppId() + " [cluster]: " + clusterId); - } - ClusterStatusEventPublisher.sendClusterActivatedEvent(appId, monitor.getServiceId(), clusterId); - } - } - - } - }; - Thread groupThread = new Thread(group); - groupThread.start(); - } - - 
/** - * This will calculate whether all the minimum of partition in a cluster satisfy in order - * to decide on the cluster status. - * - * @param monitor Cluster monitor of which the status needs to be calculated - * @param partitionId partition which got the faulty member - * @return whether cluster inActive or not - */ - private boolean getClusterInactive(VMClusterMonitor monitor, String partitionId) { - boolean clusterInActive = false; - for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext : monitor.getAllNetworkPartitionCtxts().values()) { - /*for (ClusterLevelPartitionContext partition : clusterLevelNetworkPartitionContext.getPartitionCtxts().values()) { - if (partitionId.equals(partition.getPartitionId()) && - partition.getActiveMemberCount() <= partition.getMinimumMemberCount()) { - clusterInActive = true; - return clusterInActive; - } - }*/ - - } - return clusterInActive; - } - - /** - * This will use to calculate whether all children of a particular component is active by traversing Top - * - * @param appId application id - * @param idOfComponent id of the component to which calculate the status - * @param idOfChild children of the component as groups - */ - public void onChildStatusChange(String idOfChild, String idOfComponent, String appId, String instanceId) { - ParentComponent component; - Map<String, Group> groups; - Map<String, ClusterDataHolder> clusterData; - - if (log.isInfoEnabled()) { - log.info("StatusChecker calculating the status for the group [ " + idOfChild + " ]"); - } - - try { - ApplicationHolder.acquireWriteLock(); - if (idOfComponent.equals(appId)) { - //it is an application - component = ApplicationHolder.getApplications(). - getApplication(appId); - } else { - //it is a group - component = ApplicationHolder.getApplications(). 
- getApplication(appId).getGroupRecursively(idOfComponent); - } - groups = component.getAliasToGroupMap(); - clusterData = component.getClusterDataMap(); - - if(component.isGroupScalingEnabled()) { - //TODO - handleStateWithGroupScalingEnabled(); - } else { - handleStateChangeGroupScalingDisabled(component, appId, instanceId, groups, clusterData); - } - } finally { - ApplicationHolder.releaseWriteLock(); - - } - - } - - private void handleStateWithGroupScalingEnabled() { - - } - - private void handleStateChangeGroupScalingDisabled(ParentComponent component, String appId, - String instanceId, - Map<String, Group> groups, - Map<String, ClusterDataHolder> clusterData) { - if (groups.isEmpty() && getAllClusterInSameState(clusterData, ClusterStatus.Active, instanceId) || - clusterData.isEmpty() && getAllGroupInSameState(groups, GroupStatus.Active, instanceId) || - getAllClusterInSameState(clusterData, ClusterStatus.Active, instanceId) && - getAllGroupInSameState(groups, GroupStatus.Active, instanceId)) { - //send activation event - if (component instanceof Application) { - //send application activated event - if (((Application) component).getStatus(null) != ApplicationStatus.Active) { - log.info("sending app activate: " + appId); - ApplicationBuilder.handleApplicationActivatedEvent(appId, instanceId); - } - } else if (component instanceof Group) { - //send activation to the parent - if (((Group) component).getStatus(null) != GroupStatus.Active) { - log.info("sending group activate: " + component.getUniqueIdentifier()); - ApplicationBuilder.handleGroupActivatedEvent(appId, component.getUniqueIdentifier(), instanceId); - } - } - } else if (groups.isEmpty() && getAllClusterInSameState(clusterData, ClusterStatus.Terminated, instanceId) || - clusterData.isEmpty() && getAllGroupInSameState(groups, GroupStatus.Terminated, instanceId) || - getAllClusterInSameState(clusterData, ClusterStatus.Terminated, instanceId) && - getAllGroupInSameState(groups, GroupStatus.Terminated, 
instanceId)) { - //send the terminated event - if (component instanceof Application) { - log.info("sending app terminated: " + appId); - ApplicationBuilder.handleApplicationTerminatedEvent(appId, null); - } else if (component instanceof Group) { - //send activation to the parent - if (((Group) component).getStatus(null) != GroupStatus.Terminated) { - log.info("sending group terminated : " + component.getUniqueIdentifier()); - ApplicationBuilder.handleGroupTerminatedEvent(appId, component.getUniqueIdentifier(), instanceId); - } - } - } else if (groups.isEmpty() && getAllClusterInSameState(clusterData, ClusterStatus.Created, instanceId) || - clusterData.isEmpty() && getAllGroupInSameState(groups, GroupStatus.Created, instanceId) || - getAllClusterInSameState(clusterData, ClusterStatus.Created, instanceId) && - getAllGroupInSameState(groups, GroupStatus.Created, instanceId)) { - if (component instanceof Application) { - log.info("[Application] " + appId + "couldn't change to Created, since it is" + - "already in " + ((Application) component).getStatus(null).toString()); - } else if (component instanceof Group) { - //send activation to the parent - if (((Group) component).getStatus(null) != GroupStatus.Created) { - log.info("sending group created : " + component.getUniqueIdentifier()); - ApplicationBuilder.handleGroupCreatedEvent(appId, component.getUniqueIdentifier(), instanceId); - } - } - } else if (groups.isEmpty() && getAllClusterInactive(clusterData) || - clusterData.isEmpty() && getAllGroupInActive(groups) || - getAllClusterInactive(clusterData) || getAllGroupInActive(groups)) { - //send the in activation event - if (component instanceof Application) { - //send application activated event - log.warn("Application can't be in in-active : " + appId); - //StatusEventPublisher.sendApplicationInactivatedEvent(appId); - } else if (component instanceof Group) { - //send activation to the parent - if (((Group) component).getStatus(null) != GroupStatus.Inactive) { - 
log.info("sending group in-active: " + component.getUniqueIdentifier()); - ApplicationBuilder.handleGroupInActivateEvent(appId, component.getUniqueIdentifier(), instanceId); - } - } - } else { - if (component instanceof Application) { - //send application activated event - log.warn("Application can't be in in-active : " + appId); - //StatusEventPublisher.sendApplicationInactivatedEvent(appId); - } else if (component instanceof Group) { - //send activation to the parent - if (((Group) component).getStatus(null) != GroupStatus.Inactive) { - log.info("sending group in-active: " + component.getUniqueIdentifier()); - ApplicationBuilder.handleGroupInActivateEvent(appId, component.getUniqueIdentifier(), "test*****"); - } - } - } - } - - private boolean getAllInstancesOfGroupActive(Group group) { - int activeGroupInstances = 0; - for(GroupInstance context : group.getInstanceIdToInstanceContextMap().values()) { - if(context.getStatus() == GroupStatus.Active) { - activeGroupInstances++; - } - } - - return false; - } - - /** - * Find out whether all the any group is inActive - * - * @param groups groups of a group/application - * @return whether inActive or not - */ - private boolean getAllGroupInActive(Map<String, Group> groups) { - boolean groupStat = false; - for (Group group : groups.values()) { - if (group.getStatus(null) == GroupStatus.Inactive) { - groupStat = true; - return groupStat; - } else { - groupStat = false; - } - } - return groupStat; - } - - /** - * Find out whether all the groups of a group in the same state or not - * - * @param groups groups of a group/application - * @param status the state to check in all groups - * @return whether groups in the given state or not - */ - private boolean getAllGroupInSameState(Map<String, Group> groups, GroupStatus status, String instanceId) { - boolean groupStat = false; - for (Group group : groups.values()) { - GroupInstance context = group.getInstanceContexts(instanceId); - if(context != null) { - 
if(context.getStatus() == status) { - groupStat = true; - } else { - groupStat = false; - return groupStat; - } - } else { - groupStat = false; - return groupStat; - } - } - return groupStat; - } - - - /** - * Find out whether any of the clusters of a group in the InActive state - * - * @param clusterData clusters of the group - * @return whether inActive or not - */ - private boolean getAllClusterInactive(Map<String, ClusterDataHolder> clusterData) { - boolean clusterStat = false; - for (Map.Entry<String, ClusterDataHolder> clusterDataHolderEntry : clusterData.entrySet()) { - Service service = TopologyManager.getTopology().getService(clusterDataHolderEntry.getValue().getServiceType()); - Cluster cluster = service.getCluster(clusterDataHolderEntry.getValue().getClusterId()); - if (cluster.getStatus(null) == ClusterStatus.Inactive) { - clusterStat = true; - return clusterStat; - } else { - clusterStat = false; - - } - } - return clusterStat; - } - - /** - * Find out whether all the clusters of a group are in the same state - * - * @param clusterData clusters of the group - * @param status the status to check of the group - * @return whether all groups in the same state or not - */ - private boolean getAllClusterInSameState(Map<String, ClusterDataHolder> clusterData, - ClusterStatus status, String instanceId) { - boolean clusterStat = false; - for (Map.Entry<String, ClusterDataHolder> clusterDataHolderEntry : clusterData.entrySet()) { - String serviceName = clusterDataHolderEntry.getValue().getServiceType(); - String clusterId = clusterDataHolderEntry.getValue().getClusterId(); - TopologyManager.acquireReadLockForCluster(serviceName, clusterId); - try { - Service service = TopologyManager.getTopology().getService(serviceName); - Cluster cluster = service.getCluster(clusterId); - ClusterInstance context = cluster.getInstanceContexts(instanceId); - if (context.getStatus() == status) { - clusterStat = true; - } else { - clusterStat = false; - return clusterStat; - } - } 
finally { - TopologyManager.releaseReadLockForCluster(serviceName, clusterId); - } - - } - return clusterStat; - } - - private static class Holder { - private static final StatusChecker INSTANCE = new StatusChecker(); - } - -} http://git-wip-us.apache.org/repos/asf/stratos/blob/3f45f636/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/StatusChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/StatusChecker.java new file mode 100644 index 0000000..d0a993e --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/StatusChecker.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.stratos.autoscaler.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.context.AutoscalerContext; +import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext; +import org.apache.stratos.autoscaler.applications.ApplicationHolder; +import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder; +import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; +import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; +import org.apache.stratos.messaging.domain.applications.*; +import org.apache.stratos.messaging.domain.instance.ClusterInstance; +import org.apache.stratos.messaging.domain.instance.GroupInstance; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; + +import java.util.Map; + + +/** + * This will be used to evaluate the status of a group + * and notify the interested parties about the status changes. + */ +public class StatusChecker { + private static final Log log = LogFactory.getLog(StatusChecker.class); + + + private StatusChecker() { + + } + + public static StatusChecker getInstance() { + return Holder.INSTANCE; + } + + /** + * Calculating whether the cluster has all min instances as active and send the + * ClusterActivatedEvent. 
+ * + * @param clusterId id of the cluster + */ + public void onMemberStatusChange(final String clusterId) { + Runnable group = new Runnable() { + public void run() { + VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId); + boolean clusterActive = false; + if (monitor != null) { + clusterActive = clusterActive(monitor); + + } + log.info("Status processor running for [cluster] " + clusterId + + " the status [clusterActive] " + clusterActive); + // if active then notify upper layer + if (clusterActive) { + //send event to cluster status topic + monitor.setHasFaultyMember(false); + if (log.isInfoEnabled()) { + log.info("Publishing Cluster activated event for [application]: " + + monitor.getAppId() + " [cluster]: " + clusterId); + } + ClusterStatusEventPublisher.sendClusterActivatedEvent(monitor.getAppId(), + monitor.getServiceId(), monitor.getClusterId()); + } + } + }; + Thread groupThread = new Thread(group); + groupThread.start(); + } + + /** + * This will calculate the status of the cluster upon a member termination. 
+ * The possible states which cluster can change upon member termination are + * Active --> InActive, Terminating-->Terminated, Terminating-->Reset(Created) + * + * @param clusterId id of the cluster + */ + public void onMemberTermination(final String clusterId, final String instanceId) { + Runnable group = new Runnable() { + public void run() { + VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId); + boolean clusterMonitorHasMembers = clusterMonitorHasMembers(monitor); + boolean clusterActive = clusterActive(monitor); + + try { + TopologyManager.acquireReadLockForCluster(monitor.getServiceId(), monitor.getClusterId()); + Service service = TopologyManager.getTopology().getService(monitor.getServiceId()); + Cluster cluster; + String appId = monitor.getAppId(); + if (service != null) { + cluster = service.getCluster(monitor.getClusterId()); + if (cluster != null) { + try { + ApplicationHolder.acquireReadLock(); + Application application = ApplicationHolder.getApplications().getApplication(appId); + //if all members removed from the cluster and cluster is in terminating, + // either it has to be terminated or Reset + if (!clusterMonitorHasMembers && cluster.getStatus(null) == ClusterStatus.Terminating) { + if (application.getStatus(null) == ApplicationStatus.Terminating) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster terminated event for [application]: " + appId + + " [cluster]: " + clusterId); + } + ClusterStatusEventPublisher.sendClusterTerminatedEvent(appId, monitor.getServiceId(), + monitor.getClusterId(), instanceId); + } else { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster created event for [application]: " + appId + + " [cluster]: " + clusterId); + } + ClusterStatusEventPublisher.sendClusterResetEvent(appId, monitor.getServiceId(), + monitor.getClusterId(), instanceId); + } + + } else { + //if the cluster is not active and, if it is in Active state + if (!clusterActive && 
cluster.getStatus(null) == ClusterStatus.Active) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster in-activate event for [application]: " + + monitor.getAppId() + " [cluster]: " + clusterId); + } + ClusterStatusEventPublisher.sendClusterInActivateEvent(monitor.getAppId(), + monitor.getServiceId(), clusterId, instanceId); + } else { + log.info("Cluster has non terminated [members] and in the [status] " + + cluster.getStatus(null).toString()); + } + } + } finally { + ApplicationHolder.releaseReadLock(); + } + } + } + + + } finally { + TopologyManager.releaseReadLockForCluster(monitor.getServiceId(), monitor.getClusterId()); + + } + } + }; + Thread groupThread = new Thread(group); + groupThread.start(); + + } + + /** + * Calculate whether the cluster is active based on the minimum count available in each partition + * + * @param monitor Cluster monitor which has the member + * @return whether cluster is active or not + */ + private boolean clusterActive(VMClusterMonitor monitor) { + boolean clusterActive = false; + /*for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext : monitor.getAllNetworkPartitionCtxts().values()) { + //minimum check per partition + for (ClusterLevelPartitionContext clusterMonitorPartitionContext : clusterLevelNetworkPartitionContext.getPartitionCtxts().values()) { + if (clusterMonitorPartitionContext.getMinimumMemberCount() == clusterMonitorPartitionContext.getActiveMemberCount()) { + clusterActive = true; + } else if (clusterMonitorPartitionContext.getActiveMemberCount() > clusterMonitorPartitionContext.getMinimumMemberCount()) { + log.info("cluster already activated..."); + clusterActive = true; + } else { + return false; + } + } + }*/ + return clusterActive; + } + + /** + * Find out whether cluster monitor has any non terminated members + * + * @param monitor the cluster monitor + * @return whether has members or not + */ + private boolean clusterMonitorHasMembers(VMClusterMonitor monitor) { + boolean 
hasMember = false; + for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext : monitor.getAllNetworkPartitionCtxts().values()) { + //minimum check per partition + /*for (ClusterLevelPartitionContext partitionContext : clusterLevelNetworkPartitionContext.getPartitionCtxts().values()) { + if (partitionContext.getNonTerminatedMemberCount() > 0) { + hasMember = true; + } else { + hasMember = false; + } + }*/ + } + return hasMember; + } + + /** + * This will calculate the status of the cluster upon a member fault event + * + * @param clusterId id of the cluster + * @param partitionId is to decide in which partition has less members while others have active members + */ + public void onMemberFaultEvent(final String clusterId, final String partitionId, final String instanceId) { + Runnable group = new Runnable() { + public void run() { + VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId); + boolean clusterInActive = getClusterInactive(monitor, partitionId); + String appId = monitor.getAppId(); + if (clusterInActive) { + //if the monitor is dependent, temporarily pausing it + if (monitor.hasStartupDependents()) { + monitor.setHasFaultyMember(true); + } + if (log.isInfoEnabled()) { + log.info("Publishing Cluster in-activate event for [application]: " + + monitor.getAppId() + " [cluster]: " + clusterId); + } + //send cluster In-Active event to cluster status topic + ClusterStatusEventPublisher.sendClusterInActivateEvent(appId, + monitor.getServiceId(), clusterId, instanceId); + + } else { + boolean clusterActive = clusterActive(monitor); + if (clusterActive) { + if (log.isInfoEnabled()) { + log.info("Publishing Cluster active event for [application]: " + + monitor.getAppId() + " [cluster]: " + clusterId); + } + ClusterStatusEventPublisher.sendClusterActivatedEvent(appId, monitor.getServiceId(), clusterId); + } + } + + } + }; + Thread groupThread = new Thread(group); + groupThread.start(); + } + + 
/** + * This will calculate whether all the minimum of partition in a cluster satisfy in order + * to decide on the cluster status. + * + * @param monitor Cluster monitor of which the status needs to be calculated + * @param partitionId partition which got the faulty member + * @return whether cluster inActive or not + */ + private boolean getClusterInactive(VMClusterMonitor monitor, String partitionId) { + boolean clusterInActive = false; + for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext : monitor.getAllNetworkPartitionCtxts().values()) { + /*for (ClusterLevelPartitionContext partition : clusterLevelNetworkPartitionContext.getPartitionCtxts().values()) { + if (partitionId.equals(partition.getPartitionId()) && + partition.getActiveMemberCount() <= partition.getMinimumMemberCount()) { + clusterInActive = true; + return clusterInActive; + } + }*/ + + } + return clusterInActive; + } + + /** + * This will use to calculate whether all children of a particular component is active by traversing Top + * + * @param appId application id + * @param idOfComponent id of the component to which calculate the status + * @param idOfChild children of the component as groups + */ + public void onChildStatusChange(String idOfChild, String idOfComponent, String appId, String instanceId) { + ParentComponent component; + Map<String, Group> groups; + Map<String, ClusterDataHolder> clusterData; + + if (log.isInfoEnabled()) { + log.info("StatusChecker calculating the status for the group [ " + idOfChild + " ]"); + } + + try { + ApplicationHolder.acquireWriteLock(); + if (idOfComponent.equals(appId)) { + //it is an application + component = ApplicationHolder.getApplications(). + getApplication(appId); + } else { + //it is a group + component = ApplicationHolder.getApplications(). 
+ getApplication(appId).getGroupRecursively(idOfComponent); + } + groups = component.getAliasToGroupMap(); + clusterData = component.getClusterDataMap(); + + if(component.isGroupScalingEnabled()) { + //TODO + handleStateWithGroupScalingEnabled(); + } else { + handleStateChangeGroupScalingDisabled(component, appId, instanceId, groups, clusterData); + } + } finally { + ApplicationHolder.releaseWriteLock(); + + } + + } + + private void handleStateWithGroupScalingEnabled() { + + } + + private void handleStateChangeGroupScalingDisabled(ParentComponent component, String appId, + String instanceId, + Map<String, Group> groups, + Map<String, ClusterDataHolder> clusterData) { + if (groups.isEmpty() && getAllClusterInSameState(clusterData, ClusterStatus.Active, instanceId) || + clusterData.isEmpty() && getAllGroupInSameState(groups, GroupStatus.Active, instanceId) || + getAllClusterInSameState(clusterData, ClusterStatus.Active, instanceId) && + getAllGroupInSameState(groups, GroupStatus.Active, instanceId)) { + //send activation event + if (component instanceof Application) { + //send application activated event + if (((Application) component).getStatus(null) != ApplicationStatus.Active) { + log.info("sending app activate: " + appId); + ApplicationBuilder.handleApplicationActivatedEvent(appId, instanceId); + } + } else if (component instanceof Group) { + //send activation to the parent + if (((Group) component).getStatus(null) != GroupStatus.Active) { + log.info("sending group activate: " + component.getUniqueIdentifier()); + ApplicationBuilder.handleGroupActivatedEvent(appId, component.getUniqueIdentifier(), instanceId); + } + } + } else if (groups.isEmpty() && getAllClusterInSameState(clusterData, ClusterStatus.Terminated, instanceId) || + clusterData.isEmpty() && getAllGroupInSameState(groups, GroupStatus.Terminated, instanceId) || + getAllClusterInSameState(clusterData, ClusterStatus.Terminated, instanceId) && + getAllGroupInSameState(groups, GroupStatus.Terminated, 
instanceId)) { + //send the terminated event + if (component instanceof Application) { + log.info("sending app terminated: " + appId); + ApplicationBuilder.handleApplicationTerminatedEvent(appId, null); + } else if (component instanceof Group) { + //send activation to the parent + if (((Group) component).getStatus(null) != GroupStatus.Terminated) { + log.info("sending group terminated : " + component.getUniqueIdentifier()); + ApplicationBuilder.handleGroupTerminatedEvent(appId, component.getUniqueIdentifier(), instanceId); + } + } + } else if (groups.isEmpty() && getAllClusterInSameState(clusterData, ClusterStatus.Created, instanceId) || + clusterData.isEmpty() && getAllGroupInSameState(groups, GroupStatus.Created, instanceId) || + getAllClusterInSameState(clusterData, ClusterStatus.Created, instanceId) && + getAllGroupInSameState(groups, GroupStatus.Created, instanceId)) { + if (component instanceof Application) { + log.info("[Application] " + appId + "couldn't change to Created, since it is" + + "already in " + ((Application) component).getStatus(null).toString()); + } else if (component instanceof Group) { + //send activation to the parent + if (((Group) component).getStatus(null) != GroupStatus.Created) { + log.info("sending group created : " + component.getUniqueIdentifier()); + ApplicationBuilder.handleGroupCreatedEvent(appId, component.getUniqueIdentifier(), instanceId); + } + } + } else if (groups.isEmpty() && getAllClusterInactive(clusterData) || + clusterData.isEmpty() && getAllGroupInActive(groups) || + getAllClusterInactive(clusterData) || getAllGroupInActive(groups)) { + //send the in activation event + if (component instanceof Application) { + //send application activated event + log.warn("Application can't be in in-active : " + appId); + //StatusEventPublisher.sendApplicationInactivatedEvent(appId); + } else if (component instanceof Group) { + //send activation to the parent + if (((Group) component).getStatus(null) != GroupStatus.Inactive) { + 
log.info("sending group in-active: " + component.getUniqueIdentifier()); + ApplicationBuilder.handleGroupInActivateEvent(appId, component.getUniqueIdentifier(), instanceId); + } + } + } else { + if (component instanceof Application) { + //send application activated event + log.warn("Application can't be in in-active : " + appId); + //StatusEventPublisher.sendApplicationInactivatedEvent(appId); + } else if (component instanceof Group) { + //send activation to the parent + if (((Group) component).getStatus(null) != GroupStatus.Inactive) { + log.info("sending group in-active: " + component.getUniqueIdentifier()); + ApplicationBuilder.handleGroupInActivateEvent(appId, component.getUniqueIdentifier(), "test*****"); + } + } + } + } + + private boolean getAllInstancesOfGroupActive(Group group) { + int activeGroupInstances = 0; + for(GroupInstance context : group.getInstanceIdToInstanceContextMap().values()) { + if(context.getStatus() == GroupStatus.Active) { + activeGroupInstances++; + } + } + + return false; + } + + /** + * Find out whether all the any group is inActive + * + * @param groups groups of a group/application + * @return whether inActive or not + */ + private boolean getAllGroupInActive(Map<String, Group> groups) { + boolean groupStat = false; + for (Group group : groups.values()) { + if (group.getStatus(null) == GroupStatus.Inactive) { + groupStat = true; + return groupStat; + } else { + groupStat = false; + } + } + return groupStat; + } + + /** + * Find out whether all the groups of a group in the same state or not + * + * @param groups groups of a group/application + * @param status the state to check in all groups + * @return whether groups in the given state or not + */ + private boolean getAllGroupInSameState(Map<String, Group> groups, GroupStatus status, String instanceId) { + boolean groupStat = false; + for (Group group : groups.values()) { + GroupInstance context = group.getInstanceContexts(instanceId); + if(context != null) { + 
if(context.getStatus() == status) { + groupStat = true; + } else { + groupStat = false; + return groupStat; + } + } else { + groupStat = false; + return groupStat; + } + } + return groupStat; + } + + + /** + * Find out whether any of the clusters of a group in the InActive state + * + * @param clusterData clusters of the group + * @return whether inActive or not + */ + private boolean getAllClusterInactive(Map<String, ClusterDataHolder> clusterData) { + boolean clusterStat = false; + for (Map.Entry<String, ClusterDataHolder> clusterDataHolderEntry : clusterData.entrySet()) { + Service service = TopologyManager.getTopology().getService(clusterDataHolderEntry.getValue().getServiceType()); + Cluster cluster = service.getCluster(clusterDataHolderEntry.getValue().getClusterId()); + if (cluster.getStatus(null) == ClusterStatus.Inactive) { + clusterStat = true; + return clusterStat; + } else { + clusterStat = false; + + } + } + return clusterStat; + } + + /** + * Find out whether all the clusters of a group are in the same state + * + * @param clusterData clusters of the group + * @param status the status to check of the group + * @return whether all groups in the same state or not + */ + private boolean getAllClusterInSameState(Map<String, ClusterDataHolder> clusterData, + ClusterStatus status, String instanceId) { + boolean clusterStat = false; + for (Map.Entry<String, ClusterDataHolder> clusterDataHolderEntry : clusterData.entrySet()) { + String serviceName = clusterDataHolderEntry.getValue().getServiceType(); + String clusterId = clusterDataHolderEntry.getValue().getClusterId(); + TopologyManager.acquireReadLockForCluster(serviceName, clusterId); + try { + Service service = TopologyManager.getTopology().getService(serviceName); + Cluster cluster = service.getCluster(clusterId); + ClusterInstance context = cluster.getInstanceContexts(instanceId); + if (context.getStatus() == status) { + clusterStat = true; + } else { + clusterStat = false; + return clusterStat; + } + } 
finally { + TopologyManager.releaseReadLockForCluster(serviceName, clusterId); + } + + } + return clusterStat; + } + + private static class Holder { + private static final StatusChecker INSTANCE = new StatusChecker(); + } + +} http://git-wip-us.apache.org/repos/asf/stratos/blob/3f45f636/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/client/AutoscalerServiceClient.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/client/AutoscalerServiceClient.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/client/AutoscalerServiceClient.java index c41b5fc..8687a4b 100644 --- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/client/AutoscalerServiceClient.java +++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/client/AutoscalerServiceClient.java @@ -170,6 +170,10 @@ public class AutoscalerServiceClient { return deploymentPolicies; } + public void unDeployDeploymentPolicy(String applicationId) throws RemoteException { + stub.undeployDeploymentPolicy(applicationId); + } + // public void checkLBExistenceAgainstPolicy(String clusterId, String deploymentPolicyId) throws RemoteException, // AutoScalerServiceNonExistingLBExceptionException { // stub.checkLBExistenceAgainstPolicy(clusterId, deploymentPolicyId); http://git-wip-us.apache.org/repos/asf/stratos/blob/3f45f636/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV41Utils.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV41Utils.java b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV41Utils.java index 9ac723d..02d23bf 100644 --- 
a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV41Utils.java +++ b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/api/StratosApiV41Utils.java @@ -23,20 +23,14 @@ import org.apache.axis2.context.ConfigurationContext; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.stub.Properties; import org.apache.stratos.autoscaler.stub.pojo.ApplicationContext; import org.apache.stratos.autoscaler.stub.*; import org.apache.stratos.autoscaler.stub.deployment.policy.DeploymentPolicy; import org.apache.stratos.autoscaler.stub.exception.InvalidKubernetesGroupException; +import org.apache.stratos.cloud.controller.stub.*; import org.apache.stratos.cloud.controller.stub.domain.CartridgeConfig; import org.apache.stratos.cloud.controller.stub.domain.CartridgeInfo; -import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidCartridgeTypeExceptionException; -import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidKubernetesGroupExceptionException; -import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidKubernetesHostExceptionException; -import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidKubernetesMasterExceptionException; -import org.apache.stratos.cloud.controller.stub.CloudControllerServiceNonExistingKubernetesGroupExceptionException; -import org.apache.stratos.cloud.controller.stub.CloudControllerServiceNonExistingKubernetesHostExceptionException; -import org.apache.stratos.cloud.controller.stub.CloudControllerServiceNonExistingKubernetesMasterExceptionException; -import org.apache.stratos.cloud.controller.stub.CloudControllerServiceUnregisteredCartridgeExceptionException; import org.apache.stratos.common.Property; import org.apache.stratos.manager.client.AutoscalerServiceClient; 
import org.apache.stratos.manager.client.CloudControllerServiceClient; @@ -244,8 +238,14 @@ public class StratosApiV41Utils { } public static void undeployDeploymentPolicy(String applicationId) throws RestAPIException { - CloudControllerServiceClient cloudControllerServiceClient = getCloudControllerServiceClient(); - if (cloudControllerServiceClient != null) { + AutoscalerServiceClient autoscalerServiceClient = getAutoscalerServiceClient(); + if (autoscalerServiceClient != null) { + try { + autoscalerServiceClient.unDeployDeploymentPolicy(applicationId); + } catch (RemoteException e) { + log.error("Error while unDeploying the Deployment Policy for " + applicationId); + throw new RestAPIException(e); + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/3f45f636/service-stubs/org.apache.stratos.autoscaler.service.stub/src/main/resources/AutoScalerService.wsdl ---------------------------------------------------------------------- diff --git a/service-stubs/org.apache.stratos.autoscaler.service.stub/src/main/resources/AutoScalerService.wsdl b/service-stubs/org.apache.stratos.autoscaler.service.stub/src/main/resources/AutoScalerService.wsdl index 7ce19ef..c0fca11 100644 --- a/service-stubs/org.apache.stratos.autoscaler.service.stub/src/main/resources/AutoScalerService.wsdl +++ b/service-stubs/org.apache.stratos.autoscaler.service.stub/src/main/resources/AutoScalerService.wsdl @@ -2100,4 +2100,4 @@ <http:address location="https://10.100.1.142:9443/services/AutoScalerService.AutoScalerServiceHttpsEndpoint/"></http:address> </wsdl:port> </wsdl:service> -</wsdl:definitions> \ No newline at end of file +</wsdl:definitions>
