Rename PartitionContext.java to CLusterLevelPartitionContext.java
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/a19c5a80 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/a19c5a80 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/a19c5a80 Branch: refs/heads/master Commit: a19c5a80ad9ef8a6f714139db2968fd24102ca68 Parents: 8683ac9 Author: Lahiru Sandaruwan <[email protected]> Authored: Thu Nov 27 15:27:03 2014 +0530 Committer: Lahiru Sandaruwan <[email protected]> Committed: Thu Nov 27 15:27:03 2014 +0530 ---------------------------------------------------------------------- .../autoscaler/ClusterContextFactory.java | 37 +- .../ClusterLevelPartitionContext.java | 722 +++++++++++++++++++ .../autoscaler/NetworkPartitionContext.java | 12 +- .../stratos/autoscaler/PartitionContext.java | 722 ------------------- .../monitor/cluster/VMClusterMonitor.java | 42 +- .../monitor/cluster/VMLbClusterMonitor.java | 5 +- .../cluster/VMServiceClusterMonitor.java | 3 +- .../rule/AutoscalerRuleEvaluator.java | 8 +- .../autoscaler/rule/RuleTasksDelegator.java | 42 +- .../status/processor/StatusChecker.java | 14 +- .../cluster/ClusterStatusActiveProcessor.java | 8 +- .../cluster/ClusterStatusInActiveProcessor.java | 4 +- .../ClusterStatusTerminatedProcessor.java | 6 +- .../stratos/autoscaler/TestMinimumRule.java | 2 +- .../autoscaler/TestObsoletedMemberRule.java | 6 +- .../src/test/resources/autoscaler.drl | 4 +- .../resources/test-minimum-autoscaler-rule.drl | 4 +- .../test-terminating-obsoleted-members-rule.drl | 4 +- .../src/main/conf/drools/dependent-scaling.drl | 2 +- .../src/main/conf/drools/mincheck.drl | 4 +- .../src/main/conf/drools/obsoletecheck.drl | 6 +- .../src/main/conf/drools/scaling.drl | 8 +- .../src/main/conf/drools/terminateall.drl | 4 +- .../main/conf/drools/terminatedependency.drl | 4 +- 24 files changed, 833 insertions(+), 840 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/a19c5a80/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContextFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContextFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContextFactory.java index 317e7a2..4e17b2a 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContextFactory.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContextFactory.java @@ -37,7 +37,6 @@ import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.MemberStatus; -import org.apache.stratos.messaging.util.*; import org.apache.stratos.messaging.util.Constants; import java.util.HashMap; @@ -96,10 +95,10 @@ public class ClusterContextFactory { partitionGroup.getPartitions()); for (Partition partition : partitionGroup.getPartitions()) { - PartitionContext partitionContext = new PartitionContext(partition); - partitionContext.setServiceName(cluster.getServiceName()); - partitionContext.setProperties(cluster.getProperties()); - partitionContext.setNetworkPartitionId(partitionGroup.getId()); + ClusterLevelPartitionContext clusterMonitorPartitionContext = new ClusterLevelPartitionContext(partition); + clusterMonitorPartitionContext.setServiceName(cluster.getServiceName()); + clusterMonitorPartitionContext.setProperties(cluster.getProperties()); + clusterMonitorPartitionContext.setNetworkPartitionId(partitionGroup.getId()); for (Member member : cluster.getMembers()) { String memberId = member.getMemberId(); @@ -116,7 +115,7 @@ public class ClusterContextFactory { String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString()); log.debug(msg); } - partitionContext.addActiveMember(memberContext); + clusterMonitorPartitionContext.addActiveMember(memberContext); // networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); // partitionContext.incrementCurrentActiveMemberCount(1); @@ -125,23 +124,23 @@ public class ClusterContextFactory { String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString()); log.debug(msg); } - partitionContext.addPendingMember(memberContext); + clusterMonitorPartitionContext.addPendingMember(memberContext); // networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); } else if (MemberStatus.Suspended.equals(member.getStatus())) { // partitionContext.addFaultyMember(memberId); } - partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); if (log.isInfoEnabled()) { log.info(String.format("Member stat context has been added: [member] %s", memberId)); } } } - networkPartitionContext.addPartitionContext(partitionContext); + networkPartitionContext.addPartitionContext(clusterMonitorPartitionContext); if (log.isInfoEnabled()) { log.info(String.format("Partition context has been added: [partition] %s", - partitionContext.getPartitionId())); + clusterMonitorPartitionContext.getPartitionId())); } } @@ -201,11 +200,11 @@ public class ClusterContextFactory { // FIXME pick a random partition Partition partition = partitionGroup.getPartitions()[new Random().nextInt(partitionGroup.getPartitions().length)]; - PartitionContext partitionContext = new PartitionContext(partition); - partitionContext.setServiceName(cluster.getServiceName()); - partitionContext.setProperties(cluster.getProperties()); - partitionContext.setNetworkPartitionId(networkPartitionId); - partitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions + ClusterLevelPartitionContext clusterMonitorPartitionContext = new ClusterLevelPartitionContext(partition); + clusterMonitorPartitionContext.setServiceName(cluster.getServiceName()); + clusterMonitorPartitionContext.setProperties(cluster.getProperties()); + clusterMonitorPartitionContext.setNetworkPartitionId(networkPartitionId); + clusterMonitorPartitionContext.setMinimumMemberCount(1);//Here it hard codes the minimum value as one for LB cartridge partitions NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(networkPartitionId, partitionGroup.getPartitionAlgo(), @@ -224,7 +223,7 @@ public class ClusterContextFactory { String msg = String.format("Active member loaded from topology and added to active member list, %s", member.toString()); log.debug(msg); } - partitionContext.addActiveMember(memberContext); + clusterMonitorPartitionContext.addActiveMember(memberContext); // networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); // partitionContext.incrementCurrentActiveMemberCount(1); } else if (MemberStatus.Created.equals(member.getStatus()) || @@ -233,20 +232,20 @@ public class ClusterContextFactory { String msg = String.format("Pending member loaded from topology and added to pending member list, %s", member.toString()); log.debug(msg); } - partitionContext.addPendingMember(memberContext); + clusterMonitorPartitionContext.addPendingMember(memberContext); // networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1); } else if (MemberStatus.Suspended.equals(member.getStatus())) { // partitionContext.addFaultyMember(memberId); } - partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); if (log.isInfoEnabled()) { log.info(String.format("Member stat context has been added: [member] %s", memberId)); } } } - networkPartitionContext.addPartitionContext(partitionContext); + networkPartitionContext.addPartitionContext(clusterMonitorPartitionContext); // populate lb cluster id in network partition context. java.util.Properties props = cluster.getProperties(); http://git-wip-us.apache.org/repos/asf/stratos/blob/a19c5a80/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterLevelPartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterLevelPartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterLevelPartitionContext.java new file mode 100644 index 0000000..01f0b8e --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterLevelPartitionContext.java @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.util.ConfUtil; +import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; +import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; + +import java.io.Serializable; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.stratos.common.constants.StratosConstants; + +/** + * This is an object that inserted to the rules engine. + * Holds information about a partition. + * + * + */ + +public class ClusterLevelPartitionContext implements Serializable{ + + private static final long serialVersionUID = -2920388667345980487L; + private static final Log log = LogFactory.getLog(ClusterLevelPartitionContext.class); + private String partitionId; + private String serviceName; + private String networkPartitionId; + private Partition partition; + private int minimumMemberCount = 0; + private int pendingMembersFailureCount = 0; + private final int PENDING_MEMBER_FAILURE_THRESHOLD = 5; + + // properties + private Properties properties; + + // 15 mints as the default + private long pendingMemberExpiryTime = 900000; + // pending members + private List<MemberContext> pendingMembers; + + // 1 day as default + private long obsoltedMemberExpiryTime = 1*24*60*60*1000; + + // 30 mints as default + private long terminationPendingMemberExpiryTime = 1800000; + + // members to be terminated + private Map<String, MemberContext> obsoletedMembers; + + // active members + private List<MemberContext> activeMembers; + + // termination pending members, member is added to this when Autoscaler send grace fully shut down event + private List<MemberContext> terminationPendingMembers; + + //member id: time that member is moved to termination pending status + private Map<String, Long> terminationPendingStartedTime; + + //Keep statistics come from CEP + private Map<String, MemberStatsContext> memberStatsContexts; + + // for the use of tests + public ClusterLevelPartitionContext(long memberExpiryTime) { + + this.activeMembers = new ArrayList<MemberContext>(); + this.terminationPendingMembers = new ArrayList<MemberContext>(); + pendingMemberExpiryTime = memberExpiryTime; + } + + public ClusterLevelPartitionContext(Partition partition) { + this.setPartition(partition); + this.minimumMemberCount = partition.getPartitionMin(); + this.partitionId = partition.getId(); + this.pendingMembers = new ArrayList<MemberContext>(); + this.activeMembers = new ArrayList<MemberContext>(); + this.terminationPendingMembers = new ArrayList<MemberContext>(); + this.obsoletedMembers = new ConcurrentHashMap<String, MemberContext>(); + memberStatsContexts = new ConcurrentHashMap<String, MemberStatsContext>(); + + terminationPendingStartedTime = new HashMap<String, Long>(); + // check if a different value has been set for expiryTime + XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); + pendingMemberExpiryTime = conf.getLong(StratosConstants.PENDING_VM_MEMBER_EXPIRY_TIMEOUT, 900000); + obsoltedMemberExpiryTime = conf.getLong(StratosConstants.OBSOLETED_VM_MEMBER_EXPIRY_TIMEOUT, 86400000); + if (log.isDebugEnabled()) { + log.debug("Member expiry time is set to: " + pendingMemberExpiryTime); + log.debug("Member obsoleted expiry time is set to: " + obsoltedMemberExpiryTime); + } + + Thread th = new Thread(new PendingMemberWatcher(this)); + th.start(); + Thread th2 = new Thread(new ObsoletedMemberWatcher(this)); + th2.start(); + Thread th3 = new Thread(new TerminationPendingMemberWatcher(this)); + th3.start(); + } + + public long getTerminationPendingStartedTimeOfMember(String memberId) { + return terminationPendingStartedTime.get(memberId); + } + + public List<MemberContext> getPendingMembers() { + return pendingMembers; + } + + public void setPendingMembers(List<MemberContext> pendingMembers) { + this.pendingMembers = pendingMembers; + } + + public int getActiveMemberCount() { + return activeMembers.size(); + } + + public void setActiveMembers(List<MemberContext> activeMembers) { + this.activeMembers = activeMembers; + } + + public String getPartitionId() { + return partitionId; + } + public void setPartitionId(String partitionId) { + this.partitionId = partitionId; + } + + public int getMinimumMemberCount() { + return minimumMemberCount; + } + + public void setMinimumMemberCount(int minimumMemberCount) { + this.minimumMemberCount = minimumMemberCount; + } + + public Partition getPartition() { + return partition; + } + + public void setPartition(Partition partition) { + this.partition = partition; + } + + public void addPendingMember(MemberContext ctxt) { + this.pendingMembers.add(ctxt); + } + + public boolean removePendingMember(String id) { + if (id == null) { + return false; + } + synchronized (pendingMembers) { + for (Iterator<MemberContext> iterator = pendingMembers.iterator(); iterator.hasNext(); ) { + MemberContext pendingMember = (MemberContext) iterator.next(); + if (id.equals(pendingMember.getMemberId())) { + iterator.remove(); + return true; + } + + } + } + + return false; + } + + public void movePendingMemberToActiveMembers(String memberId) { + if (memberId == null) { + return; + } + synchronized (pendingMembers) { + Iterator<MemberContext> iterator = pendingMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext pendingMember = iterator.next(); + if (pendingMember == null) { + iterator.remove(); + continue; + } + if (memberId.equals(pendingMember.getMemberId())) { + // member is activated + // remove from pending list + iterator.remove(); + // add to the activated list + this.activeMembers.add(pendingMember); + pendingMembersFailureCount = 0; + if (log.isDebugEnabled()) { + log.debug(String.format("Pending member is removed and added to the " + + "activated member list. [Member Id] %s", memberId)); + } + break; + } + } + } + } + + public boolean activeMemberAvailable(String memberId) { + for (MemberContext activeMember : activeMembers) { + if (memberId.equals(activeMember.getMemberId())) { + return true; + } + } + return false; + } + + public boolean pendingMemberAvailable(String memberId) { + + for (MemberContext pendingMember : pendingMembers) { + if (memberId.equals(pendingMember.getMemberId())) { + return true; + } + } + return false; + } + + public void moveActiveMemberToTerminationPendingMembers(String memberId) { + if (memberId == null) { + return; + } + synchronized (activeMembers) { + Iterator<MemberContext> iterator = activeMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext activeMember = iterator.next(); + if (activeMember == null) { + iterator.remove(); + continue; + } + if (memberId.equals(activeMember.getMemberId())) { + // member is activated + // remove from pending list + iterator.remove(); + // add to the activated list + this.terminationPendingMembers.add(activeMember); + if (log.isDebugEnabled()) { + log.debug(String.format("Active member is removed and added to the " + + "termination pending member list. [Member Id] %s", memberId)); + } + break; + } + } + } + } + + /** + * Check the member lists for the provided member ID and move the member to the obsolete list + * + * @param memberId The member ID of the member to search + */ + public void moveMemberToObsoleteList(String memberId) { + if (memberId == null) { + return; + } + + // check active member list + Iterator<MemberContext> activeMemberIterator = activeMembers.listIterator(); + MemberContext removedMember = this.removeMemberFrom(activeMemberIterator, memberId); + if (removedMember != null) { + this.addObsoleteMember(removedMember); + removedMember.setObsoleteInitTime(System.currentTimeMillis()); + if (log.isDebugEnabled()) { + log.debug(String.format("Active member is removed and added to the " + + "obsolete member list. [Member Id] %s", memberId)); + } + + return; + } + + // check pending member list + Iterator<MemberContext> pendingMemberIterator = pendingMembers.listIterator(); + removedMember = this.removeMemberFrom(pendingMemberIterator, memberId); + if (removedMember != null) { + this.addObsoleteMember(removedMember); + removedMember.setObsoleteInitTime(System.currentTimeMillis()); + if (log.isDebugEnabled()) { + log.debug(String.format("Pending member is removed and added to the " + + "obsolete member list. [Member Id] %s", memberId)); + } + + return; + } + + // check termination pending member list + Iterator<MemberContext> terminationPendingMembersIterator = terminationPendingMembers.listIterator(); + removedMember = this.removeMemberFrom(terminationPendingMembersIterator, memberId); + if (removedMember != null) { + this.addObsoleteMember(removedMember); + removedMember.setObsoleteInitTime(System.currentTimeMillis()); + if (log.isDebugEnabled()) { + log.debug(String.format("Termination Pending member is removed and added to the " + + "obsolete member list. [Member Id] %s", memberId)); + } + } + } + + /** + * Removes the {@link org.apache.stratos.cloud.controller.stub.pojo.MemberContext} object mapping + * to the specified member id from the specified MemberContext collection + * + * @param iterator The {@link java.util.Iterator} for the collection containing {@link org.apache.stratos.cloud.controller.stub.pojo.MemberContext} + * objects + * @param memberId Member Id {@link String} for the {@link org.apache.stratos.cloud.controller.stub.pojo.MemberContext} + * to be removed + * @return {@link org.apache.stratos.cloud.controller.stub.pojo.MemberContext} object if + * object found and removed, null if otherwise. + */ + private MemberContext removeMemberFrom(Iterator<MemberContext> iterator, String memberId) { + while (iterator.hasNext()) { + MemberContext activeMember = iterator.next(); + if (activeMember == null) { + iterator.remove(); + continue; + } + if (memberId.equals(activeMember.getMemberId())) { + iterator.remove(); + return activeMember; + } + } + + return null; + } + + public void addActiveMember(MemberContext ctxt) { + this.activeMembers.add(ctxt); + } + + public void removeActiveMember(MemberContext ctxt) { + this.activeMembers.remove(ctxt); + } + + public boolean removeTerminationPendingMember(String memberId) { + boolean terminationPendingMemberAvailable = false; + synchronized (terminationPendingMembers) { + for (MemberContext memberContext : terminationPendingMembers) { + if (memberContext.getMemberId().equals(memberId)) { + terminationPendingMemberAvailable = true; + terminationPendingMembers.remove(memberContext); + break; + } + } + } + return terminationPendingMemberAvailable; + } + + public long getObsoltedMemberExpiryTime() { + return obsoltedMemberExpiryTime; + } + + public void setObsoltedMemberExpiryTime(long obsoltedMemberExpiryTime) { + this.obsoltedMemberExpiryTime = obsoltedMemberExpiryTime; + } + + public void addObsoleteMember(MemberContext ctxt) { + this.obsoletedMembers.put(ctxt.getMemberId(), ctxt); + } + + public boolean removeObsoleteMember(String memberId) { + if(this.obsoletedMembers.remove(memberId) == null) { + return false; + } + return true; + } + + public long getPendingMemberExpiryTime() { + return pendingMemberExpiryTime; + } + + public void setPendingMemberExpiryTime(long pendingMemberExpiryTime) { + this.pendingMemberExpiryTime = pendingMemberExpiryTime; + } + + public Map<String, MemberContext> getObsoletedMembers() { + return obsoletedMembers; + } + + public void setObsoletedMembers(Map<String, MemberContext> obsoletedMembers) { + this.obsoletedMembers = obsoletedMembers; + } + + public String getNetworkPartitionId() { + return networkPartitionId; + } + + public void setNetworkPartitionId(String networkPartitionId) { + this.networkPartitionId = networkPartitionId; + } + + + public Map<String, MemberStatsContext> getMemberStatsContexts() { + return memberStatsContexts; + } + + public MemberStatsContext getMemberStatsContext(String memberId) { + return memberStatsContexts.get(memberId); + } + + public void addMemberStatsContext(MemberStatsContext ctxt) { + this.memberStatsContexts.put(ctxt.getMemberId(), ctxt); + } + + public void removeMemberStatsContext(String memberId) { + this.memberStatsContexts.remove(memberId); + } + + public MemberStatsContext getPartitionCtxt(String id) { + return this.memberStatsContexts.get(id); + } + +// public boolean memberExist(String memberId) { +// return memberStatsContexts.containsKey(memberId); +// } + + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public List<MemberContext> getTerminationPendingMembers() { + return terminationPendingMembers; + } + + public void setTerminationPendingMembers(List<MemberContext> terminationPendingMembers) { + this.terminationPendingMembers = terminationPendingMembers; + } + + public int getTotalMemberCount() { + + return activeMembers.size() + pendingMembers.size() + terminationPendingMembers.size(); + } + + public int getNonTerminatedMemberCount() { + return activeMembers.size() + pendingMembers.size(); + } + + public List<MemberContext> getActiveMembers() { + return activeMembers; + } + + public boolean removeActiveMemberById(String memberId) { + boolean removeActiveMember = false; + synchronized (activeMembers) { + Iterator<MemberContext> iterator = activeMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext memberContext = iterator.next(); + if(memberId.equals(memberContext.getMemberId())){ + iterator.remove(); + removeActiveMember = true; + + break; + } + } + } + return removeActiveMember; + } + + public boolean activeMemberExist(String memberId) { + + for (MemberContext memberContext: activeMembers) { + if(memberId.equals(memberContext.getMemberId())){ + return true; + } + } + return false; + } + + public int getAllMemberForTerminationCount () { + int count = activeMembers.size() + pendingMembers.size() + terminationPendingMembers.size(); + if (log.isDebugEnabled()) { + log.debug("PartitionContext:getAllMemberForTerminationCount:size:" + count); + } + return count; + } + + // Map<String, MemberStatsContext> getMemberStatsContexts().keySet() + public Set<String> getAllMemberForTermination () { + + List<MemberContext> merged = new ArrayList<MemberContext>(); + + + merged.addAll(activeMembers); + merged.addAll(pendingMembers); + merged.addAll(terminationPendingMembers); + + Set<String> results = new HashSet<String>(merged.size()); + + for (MemberContext ctx: merged) { + results.add(ctx.getMemberId()); + } + + + if (log.isDebugEnabled()) { + log.debug("PartitionContext:getAllMemberForTermination:size:" + results.size()); + } + + //MemberContext x = new MemberContext(); + //x.getMemberId() + + return results; + } + + public void movePendingTerminationMemberToObsoleteMembers(String memberId) { + + log.info("Starting the moving of termination pending to obsolete for [member] " + memberId); + if (memberId == null) { + return; + } + Iterator<MemberContext> iterator = terminationPendingMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext terminationPendingMember = iterator.next(); + if (terminationPendingMember == null) { + iterator.remove(); + continue; + } + if (memberId.equals(terminationPendingMember.getMemberId())) { + + log.info("Found termination pending member and trying to move [member] " + memberId + " to obsolete list"); + // member is pending termination + // remove from pending termination list + iterator.remove(); + // add to the obsolete list + this.obsoletedMembers.put(memberId, terminationPendingMember); + + terminationPendingStartedTime.put(memberId, System.currentTimeMillis()); + + if (log.isDebugEnabled()) { + log.debug(String.format("Termination pending member is removed and added to the " + + "obsolete member list. [Member Id] %s", memberId)); + } + break; + } + } + } + + public MemberContext getPendingTerminationMember(String memberId) { + for (MemberContext memberContext : terminationPendingMembers) { + if (memberId.equals(memberContext.getMemberId())) { + return memberContext; + } + } + return null; + } + + public long getTerminationPendingMemberExpiryTime() { + return terminationPendingMemberExpiryTime; + } + + public void movePendingMemberToObsoleteMembers(String memberId) { + if (memberId == null) { + return; + } + Iterator<MemberContext> iterator = pendingMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext pendingMember = iterator.next(); + if (pendingMember == null) { + iterator.remove(); + continue; + } + if (memberId.equals(pendingMember.getMemberId())) { + + // remove from pending list + iterator.remove(); + // add to the obsolete list + this.obsoletedMembers.put(memberId, pendingMember); + if (log.isDebugEnabled()) { + log.debug(String.format("Pending member is removed and added to the " + + "obsolete member list. [Member Id] %s", memberId)); + } + break; + } + } + + } + + private class PendingMemberWatcher implements Runnable { + private ClusterLevelPartitionContext ctxt; + + public PendingMemberWatcher(ClusterLevelPartitionContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + + while (true) { + long expiryTime = ctxt.getPendingMemberExpiryTime(); + List<MemberContext> pendingMembers = ctxt.getPendingMembers(); + + synchronized (pendingMembers) { + Iterator<MemberContext> iterator = pendingMembers.listIterator(); + while ( iterator.hasNext()) { + MemberContext pendingMember = iterator.next(); + + if (pendingMember == null) { + continue; + } + long pendingTime = System.currentTimeMillis() - pendingMember.getInitTime(); + if (pendingTime >= expiryTime) { + + + iterator.remove(); + log.info("Pending state of member: " + pendingMember.getMemberId() + + " is expired. " + "Adding as an obsoleted member."); + // member should be terminated + ctxt.addObsoleteMember(pendingMember); + pendingMembersFailureCount++; + if( pendingMembersFailureCount > PENDING_MEMBER_FAILURE_THRESHOLD){ + setPendingMemberExpiryTime(expiryTime * 2);//Doubles the expiry time after the threshold of failure exceeded + //TODO Implement an alerting system: STRATOS-369 + } + } + } + } + + try { + // TODO find a constant + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + } + } + + } + + private class ObsoletedMemberWatcher implements Runnable { + private ClusterLevelPartitionContext ctxt; + + public ObsoletedMemberWatcher(ClusterLevelPartitionContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + while (true) { + long obsoltedMemberExpiryTime = ctxt.getObsoltedMemberExpiryTime(); + Map<String, MemberContext> obsoletedMembers = ctxt.getObsoletedMembers(); + + Iterator<Entry<String, MemberContext>> iterator = obsoletedMembers.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, MemberContext> pairs = iterator.next(); + MemberContext obsoleteMember = (MemberContext) pairs.getValue(); + if (obsoleteMember == null) { + continue; + } + long obsoleteTime = System.currentTimeMillis() - obsoleteMember.getInitTime(); + if (obsoleteTime >= obsoltedMemberExpiryTime) { + iterator.remove(); + } + } + try { + // TODO find a constant + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + } + } + } + + /** + * This thread is responsible for moving member to obsolete list if pending termination timeout happens + */ + private class TerminationPendingMemberWatcher implements Runnable { + private ClusterLevelPartitionContext ctxt; + + public TerminationPendingMemberWatcher(ClusterLevelPartitionContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + + while (true) { + long terminationPendingMemberExpiryTime = ctxt.getTerminationPendingMemberExpiryTime(); + + Iterator<MemberContext> iterator = ctxt.getTerminationPendingMembers().listIterator(); + while (iterator.hasNext()) { + + MemberContext terminationPendingMember = iterator.next(); + if (terminationPendingMember == null){ + continue; + } + long terminationPendingTime = System.currentTimeMillis() + - ctxt.getTerminationPendingStartedTimeOfMember(terminationPendingMember.getMemberId()); + if (terminationPendingTime >= terminationPendingMemberExpiryTime) { + log.info("Moving [member] " + terminationPendingMember.getMemberId() + partitionId); + iterator.remove(); + obsoletedMembers.put(terminationPendingMember.getMemberId(), terminationPendingMember); + } + } + try { + // TODO find a constant + Thread.sleep(15000); + } catch (InterruptedException ignore) {} + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/a19c5a80/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java index cadca37..8fc7bf7 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java @@ -72,7 +72,7 @@ public class NetworkPartitionContext implements Serializable{ private int currentPartitionIndex; //partitions of this network partition - private final Map<String, PartitionContext> partitionCtxts; + private final Map<String, ClusterLevelPartitionContext> partitionCtxts; public NetworkPartitionContext(String id, String partitionAlgo, Partition[] partitions) { @@ -84,7 +84,7 @@ public class NetworkPartitionContext implements Serializable{ } else { this.partitions = Arrays.copyOf(partitions, partitions.length); } - partitionCtxts = new HashMap<String, PartitionContext>(); + partitionCtxts = new HashMap<String, ClusterLevelPartitionContext>(); requestsInFlight = new RequestsInFlight(); loadAverage = new LoadAverage(); memoryConsumption = new MemoryConsumption(); @@ -359,16 +359,16 @@ public class NetworkPartitionContext implements Serializable{ return id; } - public Map<String, PartitionContext> getPartitionCtxts() { + public Map<String, ClusterLevelPartitionContext> getPartitionCtxts() { return partitionCtxts; } - public PartitionContext getPartitionCtxt(String partitionId) { + public ClusterLevelPartitionContext getPartitionCtxt(String partitionId) { return partitionCtxts.get(partitionId); } - public void addPartitionContext(PartitionContext partitionContext) { - partitionCtxts.put(partitionContext.getPartitionId(), partitionContext); + public void addPartitionContext(ClusterLevelPartitionContext clusterMonitorPartitionContext) { + partitionCtxts.put(clusterMonitorPartitionContext.getPartitionId(), clusterMonitorPartitionContext); } public String getPartitionAlgorithm() { http://git-wip-us.apache.org/repos/asf/stratos/blob/a19c5a80/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java deleted file mode 100644 index 162ae98..0000000 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java +++ /dev/null @@ -1,722 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.autoscaler; - -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.autoscaler.util.ConfUtil; -import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; -import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; - -import java.io.Serializable; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.stratos.common.constants.StratosConstants; - -/** - * This is an object that inserted to the rules engine. - * Holds information about a partition. - * - * - */ - -public class PartitionContext implements Serializable{ - - private static final long serialVersionUID = -2920388667345980487L; - private static final Log log = LogFactory.getLog(PartitionContext.class); - private String partitionId; - private String serviceName; - private String networkPartitionId; - private Partition partition; - private int minimumMemberCount = 0; - private int pendingMembersFailureCount = 0; - private final int PENDING_MEMBER_FAILURE_THRESHOLD = 5; - - // properties - private Properties properties; - - // 15 mints as the default - private long pendingMemberExpiryTime = 900000; - // pending members - private List<MemberContext> pendingMembers; - - // 1 day as default - private long obsoltedMemberExpiryTime = 1*24*60*60*1000; - - // 30 mints as default - private long terminationPendingMemberExpiryTime = 1800000; - - // members to be terminated - private Map<String, MemberContext> obsoletedMembers; - - // active members - private List<MemberContext> activeMembers; - - // termination pending members, member is added to this when Autoscaler send grace fully shut down event - private List<MemberContext> terminationPendingMembers; - - //member id: time that member is moved to termination pending status - private Map<String, Long> terminationPendingStartedTime; - - //Keep statistics come from CEP - private Map<String, MemberStatsContext> memberStatsContexts; - - // for the use of tests - public PartitionContext(long memberExpiryTime) { - - this.activeMembers = new ArrayList<MemberContext>(); - this.terminationPendingMembers = new ArrayList<MemberContext>(); - pendingMemberExpiryTime = memberExpiryTime; - } - - public PartitionContext(Partition partition) { - this.setPartition(partition); - this.minimumMemberCount = partition.getPartitionMin(); - this.partitionId = partition.getId(); - this.pendingMembers = new ArrayList<MemberContext>(); - this.activeMembers = new ArrayList<MemberContext>(); - this.terminationPendingMembers = new ArrayList<MemberContext>(); - this.obsoletedMembers = new ConcurrentHashMap<String, MemberContext>(); - memberStatsContexts = new ConcurrentHashMap<String, MemberStatsContext>(); - - terminationPendingStartedTime = new HashMap<String, Long>(); - // check if a different value has been set for expiryTime - XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); - pendingMemberExpiryTime = conf.getLong(StratosConstants.PENDING_VM_MEMBER_EXPIRY_TIMEOUT, 900000); - obsoltedMemberExpiryTime = conf.getLong(StratosConstants.OBSOLETED_VM_MEMBER_EXPIRY_TIMEOUT, 86400000); - if (log.isDebugEnabled()) { - log.debug("Member expiry time is set to: " + pendingMemberExpiryTime); - log.debug("Member obsoleted expiry time is set to: " + obsoltedMemberExpiryTime); - } - - Thread th = new Thread(new PendingMemberWatcher(this)); - th.start(); - Thread th2 = new Thread(new ObsoletedMemberWatcher(this)); - th2.start(); - Thread th3 = new Thread(new TerminationPendingMemberWatcher(this)); - th3.start(); - } - - public long getTerminationPendingStartedTimeOfMember(String memberId) { - return terminationPendingStartedTime.get(memberId); - } - - public List<MemberContext> getPendingMembers() { - return pendingMembers; - } - - public void setPendingMembers(List<MemberContext> pendingMembers) { - this.pendingMembers = pendingMembers; - } - - public int getActiveMemberCount() { - return activeMembers.size(); - } - - public void setActiveMembers(List<MemberContext> activeMembers) { - this.activeMembers = activeMembers; - } - - public String getPartitionId() { - return partitionId; - } - public void setPartitionId(String partitionId) { - this.partitionId = partitionId; - } - - public int getMinimumMemberCount() { - return minimumMemberCount; - } - - public void setMinimumMemberCount(int minimumMemberCount) { - this.minimumMemberCount = minimumMemberCount; - } - - public Partition getPartition() { - return partition; - } - - public void setPartition(Partition partition) { - this.partition = partition; - } - - public void addPendingMember(MemberContext ctxt) { - this.pendingMembers.add(ctxt); - } - - public boolean removePendingMember(String id) { - if (id == null) { - return false; - } - synchronized (pendingMembers) { - for (Iterator<MemberContext> iterator = pendingMembers.iterator(); iterator.hasNext(); ) { - MemberContext pendingMember = (MemberContext) iterator.next(); - if (id.equals(pendingMember.getMemberId())) { - iterator.remove(); - return true; - } - - } - } - - return false; - } - - public void movePendingMemberToActiveMembers(String memberId) { - if (memberId == null) { - return; - } - synchronized (pendingMembers) { - Iterator<MemberContext> iterator = pendingMembers.listIterator(); - while (iterator.hasNext()) { - MemberContext pendingMember = iterator.next(); - if (pendingMember == null) { - iterator.remove(); - continue; - } - if (memberId.equals(pendingMember.getMemberId())) { - // member is activated - // remove from pending list - iterator.remove(); - // add to the activated list - this.activeMembers.add(pendingMember); - pendingMembersFailureCount = 0; - if (log.isDebugEnabled()) { - log.debug(String.format("Pending member is removed and added to the " + - "activated member list. [Member Id] %s", memberId)); - } - break; - } - } - } - } - - public boolean activeMemberAvailable(String memberId) { - for (MemberContext activeMember : activeMembers) { - if (memberId.equals(activeMember.getMemberId())) { - return true; - } - } - return false; - } - - public boolean pendingMemberAvailable(String memberId) { - - for (MemberContext pendingMember : pendingMembers) { - if (memberId.equals(pendingMember.getMemberId())) { - return true; - } - } - return false; - } - - public void moveActiveMemberToTerminationPendingMembers(String memberId) { - if (memberId == null) { - return; - } - synchronized (activeMembers) { - Iterator<MemberContext> iterator = activeMembers.listIterator(); - while (iterator.hasNext()) { - MemberContext activeMember = iterator.next(); - if (activeMember == null) { - iterator.remove(); - continue; - } - if (memberId.equals(activeMember.getMemberId())) { - // member is activated - // remove from pending list - iterator.remove(); - // add to the activated list - this.terminationPendingMembers.add(activeMember); - if (log.isDebugEnabled()) { - log.debug(String.format("Active member is removed and added to the " + - "termination pending member list. [Member Id] %s", memberId)); - } - break; - } - } - } - } - - /** - * Check the member lists for the provided member ID and move the member to the obsolete list - * - * @param memberId The member ID of the member to search - */ - public void moveMemberToObsoleteList(String memberId) { - if (memberId == null) { - return; - } - - // check active member list - Iterator<MemberContext> activeMemberIterator = activeMembers.listIterator(); - MemberContext removedMember = this.removeMemberFrom(activeMemberIterator, memberId); - if (removedMember != null) { - this.addObsoleteMember(removedMember); - removedMember.setObsoleteInitTime(System.currentTimeMillis()); - if (log.isDebugEnabled()) { - log.debug(String.format("Active member is removed and added to the " + - "obsolete member list. [Member Id] %s", memberId)); - } - - return; - } - - // check pending member list - Iterator<MemberContext> pendingMemberIterator = pendingMembers.listIterator(); - removedMember = this.removeMemberFrom(pendingMemberIterator, memberId); - if (removedMember != null) { - this.addObsoleteMember(removedMember); - removedMember.setObsoleteInitTime(System.currentTimeMillis()); - if (log.isDebugEnabled()) { - log.debug(String.format("Pending member is removed and added to the " + - "obsolete member list. [Member Id] %s", memberId)); - } - - return; - } - - // check termination pending member list - Iterator<MemberContext> terminationPendingMembersIterator = terminationPendingMembers.listIterator(); - removedMember = this.removeMemberFrom(terminationPendingMembersIterator, memberId); - if (removedMember != null) { - this.addObsoleteMember(removedMember); - removedMember.setObsoleteInitTime(System.currentTimeMillis()); - if (log.isDebugEnabled()) { - log.debug(String.format("Termination Pending member is removed and added to the " + - "obsolete member list. [Member Id] %s", memberId)); - } - } - } - - /** - * Removes the {@link org.apache.stratos.cloud.controller.stub.pojo.MemberContext} object mapping - * to the specified member id from the specified MemberContext collection - * - * @param iterator The {@link java.util.Iterator} for the collection containing {@link org.apache.stratos.cloud.controller.stub.pojo.MemberContext} - * objects - * @param memberId Member Id {@link String} for the {@link org.apache.stratos.cloud.controller.stub.pojo.MemberContext} - * to be removed - * @return {@link org.apache.stratos.cloud.controller.stub.pojo.MemberContext} object if - * object found and removed, null if otherwise. - */ - private MemberContext removeMemberFrom(Iterator<MemberContext> iterator, String memberId) { - while (iterator.hasNext()) { - MemberContext activeMember = iterator.next(); - if (activeMember == null) { - iterator.remove(); - continue; - } - if (memberId.equals(activeMember.getMemberId())) { - iterator.remove(); - return activeMember; - } - } - - return null; - } - - public void addActiveMember(MemberContext ctxt) { - this.activeMembers.add(ctxt); - } - - public void removeActiveMember(MemberContext ctxt) { - this.activeMembers.remove(ctxt); - } - - public boolean removeTerminationPendingMember(String memberId) { - boolean terminationPendingMemberAvailable = false; - synchronized (terminationPendingMembers) { - for (MemberContext memberContext : terminationPendingMembers) { - if (memberContext.getMemberId().equals(memberId)) { - terminationPendingMemberAvailable = true; - terminationPendingMembers.remove(memberContext); - break; - } - } - } - return terminationPendingMemberAvailable; - } - - public long getObsoltedMemberExpiryTime() { - return obsoltedMemberExpiryTime; - } - - public void setObsoltedMemberExpiryTime(long obsoltedMemberExpiryTime) { - this.obsoltedMemberExpiryTime = obsoltedMemberExpiryTime; - } - - public void addObsoleteMember(MemberContext ctxt) { - this.obsoletedMembers.put(ctxt.getMemberId(), ctxt); - } - - public boolean removeObsoleteMember(String memberId) { - if(this.obsoletedMembers.remove(memberId) == null) { - return false; - } - return true; - } - - public long getPendingMemberExpiryTime() { - return pendingMemberExpiryTime; - } - - public void setPendingMemberExpiryTime(long pendingMemberExpiryTime) { - this.pendingMemberExpiryTime = pendingMemberExpiryTime; - } - - public Map<String, MemberContext> getObsoletedMembers() { - return obsoletedMembers; - } - - public void setObsoletedMembers(Map<String, MemberContext> obsoletedMembers) { - this.obsoletedMembers = obsoletedMembers; - } - - public String getNetworkPartitionId() { - return networkPartitionId; - } - - public void setNetworkPartitionId(String networkPartitionId) { - this.networkPartitionId = networkPartitionId; - } - - - public Map<String, MemberStatsContext> getMemberStatsContexts() { - return memberStatsContexts; - } - - public MemberStatsContext getMemberStatsContext(String memberId) { - return memberStatsContexts.get(memberId); - } - - public void addMemberStatsContext(MemberStatsContext ctxt) { - this.memberStatsContexts.put(ctxt.getMemberId(), ctxt); - } - - public void removeMemberStatsContext(String memberId) { - this.memberStatsContexts.remove(memberId); - } - - public MemberStatsContext getPartitionCtxt(String id) { - return this.memberStatsContexts.get(id); - } - -// public boolean memberExist(String memberId) { -// return memberStatsContexts.containsKey(memberId); -// } - - - public Properties getProperties() { - return properties; - } - - public void setProperties(Properties properties) { - this.properties = properties; - } - - public String getServiceName() { - return serviceName; - } - - public void setServiceName(String serviceName) { - this.serviceName = serviceName; - } - - public List<MemberContext> getTerminationPendingMembers() { - return terminationPendingMembers; - } - - public void setTerminationPendingMembers(List<MemberContext> terminationPendingMembers) { - this.terminationPendingMembers = terminationPendingMembers; - } - - public int getTotalMemberCount() { - - return activeMembers.size() + pendingMembers.size() + terminationPendingMembers.size(); - } - - public int getNonTerminatedMemberCount() { - return activeMembers.size() + pendingMembers.size(); - } - - public List<MemberContext> getActiveMembers() { - return activeMembers; - } - - public boolean removeActiveMemberById(String memberId) { - boolean removeActiveMember = false; - synchronized (activeMembers) { - Iterator<MemberContext> iterator = activeMembers.listIterator(); - while (iterator.hasNext()) { - MemberContext memberContext = iterator.next(); - if(memberId.equals(memberContext.getMemberId())){ - iterator.remove(); - removeActiveMember = true; - - break; - } - } - } - return removeActiveMember; - } - - public boolean activeMemberExist(String memberId) { - - for (MemberContext memberContext: activeMembers) { - if(memberId.equals(memberContext.getMemberId())){ - return true; - } - } - return false; - } - - public int getAllMemberForTerminationCount () { - int count = activeMembers.size() + pendingMembers.size() + terminationPendingMembers.size(); - if (log.isDebugEnabled()) { - log.debug("PartitionContext:getAllMemberForTerminationCount:size:" + count); - } - return count; - } - - // Map<String, MemberStatsContext> getMemberStatsContexts().keySet() - public Set<String> getAllMemberForTermination () { - - List<MemberContext> merged = new ArrayList<MemberContext>(); - - - merged.addAll(activeMembers); - merged.addAll(pendingMembers); - merged.addAll(terminationPendingMembers); - - Set<String> results = new HashSet<String>(merged.size()); - - for (MemberContext ctx: merged) { - results.add(ctx.getMemberId()); - } - - - if (log.isDebugEnabled()) { - log.debug("PartitionContext:getAllMemberForTermination:size:" + results.size()); - } - - //MemberContext x = new MemberContext(); - //x.getMemberId() - - return results; - } - - public void movePendingTerminationMemberToObsoleteMembers(String memberId) { - - log.info("Starting the moving of termination pending to obsolete for [member] " + memberId); - if (memberId == null) { - return; - } - Iterator<MemberContext> iterator = terminationPendingMembers.listIterator(); - while (iterator.hasNext()) { - MemberContext terminationPendingMember = iterator.next(); - if (terminationPendingMember == null) { - iterator.remove(); - continue; - } - if (memberId.equals(terminationPendingMember.getMemberId())) { - - log.info("Found termination pending member and trying to move [member] " + memberId + " to obsolete list"); - // member is pending termination - // remove from pending termination list - iterator.remove(); - // add to the obsolete list - this.obsoletedMembers.put(memberId, terminationPendingMember); - - terminationPendingStartedTime.put(memberId, System.currentTimeMillis()); - - if (log.isDebugEnabled()) { - log.debug(String.format("Termination pending member is removed and added to the " + - "obsolete member list. [Member Id] %s", memberId)); - } - break; - } - } - } - - public MemberContext getPendingTerminationMember(String memberId) { - for (MemberContext memberContext : terminationPendingMembers) { - if (memberId.equals(memberContext.getMemberId())) { - return memberContext; - } - } - return null; - } - - public long getTerminationPendingMemberExpiryTime() { - return terminationPendingMemberExpiryTime; - } - - public void movePendingMemberToObsoleteMembers(String memberId) { - if (memberId == null) { - return; - } - Iterator<MemberContext> iterator = pendingMembers.listIterator(); - while (iterator.hasNext()) { - MemberContext pendingMember = iterator.next(); - if (pendingMember == null) { - iterator.remove(); - continue; - } - if (memberId.equals(pendingMember.getMemberId())) { - - // remove from pending list - iterator.remove(); - // add to the obsolete list - this.obsoletedMembers.put(memberId, pendingMember); - if (log.isDebugEnabled()) { - log.debug(String.format("Pending member is removed and added to the " + - "obsolete member list. [Member Id] %s", memberId)); - } - break; - } - } - - } - - private class PendingMemberWatcher implements Runnable { - private PartitionContext ctxt; - - public PendingMemberWatcher(PartitionContext ctxt) { - this.ctxt = ctxt; - } - - @Override - public void run() { - - while (true) { - long expiryTime = ctxt.getPendingMemberExpiryTime(); - List<MemberContext> pendingMembers = ctxt.getPendingMembers(); - - synchronized (pendingMembers) { - Iterator<MemberContext> iterator = pendingMembers.listIterator(); - while ( iterator.hasNext()) { - MemberContext pendingMember = iterator.next(); - - if (pendingMember == null) { - continue; - } - long pendingTime = System.currentTimeMillis() - pendingMember.getInitTime(); - if (pendingTime >= expiryTime) { - - - iterator.remove(); - log.info("Pending state of member: " + pendingMember.getMemberId() + - " is expired. " + "Adding as an obsoleted member."); - // member should be terminated - ctxt.addObsoleteMember(pendingMember); - pendingMembersFailureCount++; - if( pendingMembersFailureCount > PENDING_MEMBER_FAILURE_THRESHOLD){ - setPendingMemberExpiryTime(expiryTime * 2);//Doubles the expiry time after the threshold of failure exceeded - //TODO Implement an alerting system: STRATOS-369 - } - } - } - } - - try { - // TODO find a constant - Thread.sleep(15000); - } catch (InterruptedException ignore) { - } - } - } - - } - - private class ObsoletedMemberWatcher implements Runnable { - private PartitionContext ctxt; - - public ObsoletedMemberWatcher(PartitionContext ctxt) { - this.ctxt = ctxt; - } - - @Override - public void run() { - while (true) { - long obsoltedMemberExpiryTime = ctxt.getObsoltedMemberExpiryTime(); - Map<String, MemberContext> obsoletedMembers = ctxt.getObsoletedMembers(); - - Iterator<Entry<String, MemberContext>> iterator = obsoletedMembers.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<String, MemberContext> pairs = iterator.next(); - MemberContext obsoleteMember = (MemberContext) pairs.getValue(); - if (obsoleteMember == null) { - continue; - } - long obsoleteTime = System.currentTimeMillis() - obsoleteMember.getInitTime(); - if (obsoleteTime >= obsoltedMemberExpiryTime) { - iterator.remove(); - } - } - try { - // TODO find a constant - Thread.sleep(15000); - } catch (InterruptedException ignore) { - } - } - } - } - - /** - * This thread is responsible for moving member to obsolete list if pending termination timeout happens - */ - private class TerminationPendingMemberWatcher implements Runnable { - private PartitionContext ctxt; - - public TerminationPendingMemberWatcher(PartitionContext ctxt) { - this.ctxt = ctxt; - } - - @Override - public void run() { - - while (true) { - long terminationPendingMemberExpiryTime = ctxt.getTerminationPendingMemberExpiryTime(); - - Iterator<MemberContext> iterator = ctxt.getTerminationPendingMembers().listIterator(); - while (iterator.hasNext()) { - - MemberContext terminationPendingMember = iterator.next(); - if (terminationPendingMember == null){ - continue; - } - long terminationPendingTime = System.currentTimeMillis() - - ctxt.getTerminationPendingStartedTimeOfMember(terminationPendingMember.getMemberId()); - if (terminationPendingTime >= terminationPendingMemberExpiryTime) { - log.info("Moving [member] " + terminationPendingMember.getMemberId() + partitionId); - iterator.remove(); - obsoletedMembers.put(terminationPendingMember.getMemberId(), terminationPendingMember); - } - } - try { - // TODO find a constant - Thread.sleep(15000); - } catch (InterruptedException ignore) {} - } - } - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/a19c5a80/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java index f22d2d2..558c612 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java @@ -307,7 +307,7 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { Member member = getMemberByMemberId(memberId); String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -328,7 +328,7 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { Member member = getMemberByMemberId(memberId); String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -355,7 +355,7 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { Member member = getMemberByMemberId(memberId); String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -376,7 +376,7 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { Member member = getMemberByMemberId(memberId); String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -397,7 +397,7 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { Member member = getMemberByMemberId(memberId); String networkPartitionId = getNetworkPartitionIdByMemberId(memberId); NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - PartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); + ClusterLevelPartitionContext partitionCtxt = networkPartitionCtxt.getPartitionCtxt(member.getPartitionId()); MemberStatsContext memberStatsContext = partitionCtxt.getMemberStatsContext(memberId); if (null == memberStatsContext) { if (log.isDebugEnabled()) { @@ -432,7 +432,7 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { NetworkPartitionContext nwPartitionCtxt; nwPartitionCtxt = getNetworkPartitionCtxt(instanceId, member); String partitionId = getPartitionOfMember(memberId); - PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); + ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); if (!partitionCtxt.activeMemberExist(memberId)) { if (log.isDebugEnabled()) { log.debug(String.format("Could not find the active member in partition context, " @@ -470,14 +470,14 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { String partitionId = memberActivatedEvent.getPartitionId(); String memberId = memberActivatedEvent.getMemberId(); NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - PartitionContext partitionContext; - partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId); - partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + ClusterLevelPartitionContext clusterMonitorPartitionContext; + clusterMonitorPartitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId); + clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); if (log.isInfoEnabled()) { log.info(String.format("Member stat context has been added successfully: " + "[member] %s", memberId)); } - partitionContext.movePendingMemberToActiveMembers(memberId); + clusterMonitorPartitionContext.movePendingMemberToActiveMembers(memberId); StatusChecker.getInstance().onMemberStatusChange(memberActivatedEvent.getClusterId()); } @@ -490,13 +490,13 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { String memberId = maintenanceModeEvent.getMemberId(); String instanceId = maintenanceModeEvent.getInstanceId(); NetworkPartitionContext networkPartitionCtxt = getNetworkPartitionCtxt(instanceId, networkPartitionId); - PartitionContext partitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId); - partitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); + ClusterLevelPartitionContext clusterMonitorPartitionContext = networkPartitionCtxt.getPartitionCtxt(partitionId); + clusterMonitorPartitionContext.addMemberStatsContext(new MemberStatsContext(memberId)); if (log.isDebugEnabled()) { log.debug(String.format("Member has been moved as pending termination: " + "[member] %s", memberId)); } - partitionContext.moveActiveMemberToTerminationPendingMembers(memberId); + clusterMonitorPartitionContext.moveActiveMemberToTerminationPendingMembers(memberId); } @Override @@ -510,7 +510,7 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { // start a new member in the same Partition String memberId = memberReadyToShutdownEvent.getMemberId(); String partitionId = getPartitionOfMember(memberId); - PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); + ClusterLevelPartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId); // terminate the shutdown ready member //CloudControllerClient ccClient = CloudControllerClient.getInstance(); @@ -554,23 +554,23 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { String instanceId = memberTerminatedEvent.getInstanceId(); String partitionId = memberTerminatedEvent.getPartitionId(); NetworkPartitionContext networkPartitionContext = getNetworkPartitionCtxt(instanceId, networkPartitionId); - PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(partitionId); - partitionContext.removeMemberStatsContext(memberId); + ClusterLevelPartitionContext clusterMonitorPartitionContext = networkPartitionContext.getPartitionCtxt(partitionId); + clusterMonitorPartitionContext.removeMemberStatsContext(memberId); - if (partitionContext.removeTerminationPendingMember(memberId)) { + if (clusterMonitorPartitionContext.removeTerminationPendingMember(memberId)) { if (log.isDebugEnabled()) { log.debug(String.format("Member is removed from termination pending members list: " + "[member] %s", memberId)); } - } else if (partitionContext.removePendingMember(memberId)) { + } else if (clusterMonitorPartitionContext.removePendingMember(memberId)) { if (log.isDebugEnabled()) { log.debug(String.format("Member is removed from pending members list: " + "[member] %s", memberId)); } - } else if (partitionContext.removeActiveMemberById(memberId)) { + } else if (clusterMonitorPartitionContext.removeActiveMemberById(memberId)) { log.warn(String.format("Member is in the wrong list and it is removed from " + "active members list: %s", memberId)); - } else if (partitionContext.removeObsoleteMember(memberId)) { + } else if (clusterMonitorPartitionContext.removeObsoleteMember(memberId)) { log.warn(String.format("Obsolete member has either been terminated or its obsolete time out has expired and" + " it is removed from obsolete members list: %s", memberId)); } else { @@ -642,7 +642,7 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { public void run() { for (NetworkPartitionContext networkPartitionContext : getAllNetworkPartitionCtxts().values()) { - for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) { + for (ClusterLevelPartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) { //if (log.isDebugEnabled()) { log.info("Starting to terminate all members in cluster [" + getClusterId() + "] Network Partition [ " + networkPartitionContext.getId() + " ], Partition [ " + http://git-wip-us.apache.org/repos/asf/stratos/blob/a19c5a80/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java index aa8ec30..d22d20c 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java @@ -21,7 +21,6 @@ package org.apache.stratos.autoscaler.monitor.cluster; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.logging.Log; @@ -30,13 +29,11 @@ import org.apache.stratos.autoscaler.*; import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent; import org.apache.stratos.autoscaler.partition.PartitionManager; import org.apache.stratos.autoscaler.policy.PolicyManager; -import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; import org.apache.stratos.autoscaler.policy.model.DeploymentPolicy; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; import org.apache.stratos.autoscaler.util.AutoScalerConstants; import org.apache.stratos.autoscaler.util.ConfUtil; import org.apache.stratos.common.constants.StratosConstants; -import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; /** @@ -92,7 +89,7 @@ public class VMLbClusterMonitor extends VMClusterMonitor { for (NetworkPartitionContext networkPartitionContext : getNetworkPartitionCtxts(instanceIdToClusterCtxtEntry.getKey()).values()) { // minimum check per partition - for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts() + for (ClusterLevelPartitionContext partitionContext : networkPartitionContext.getPartitionCtxts() .values()) { if (partitionContext != null) { http://git-wip-us.apache.org/repos/asf/stratos/blob/a19c5a80/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java index 6fcafea..37b7eda 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java @@ -35,7 +35,6 @@ import org.apache.stratos.cloud.controller.stub.pojo.Property; import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.applications.ApplicationStatus; import org.apache.stratos.messaging.domain.applications.GroupStatus; -import org.apache.stratos.messaging.domain.topology.ClusterStatus; import java.util.ArrayList; import java.util.List; @@ -122,7 +121,7 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { // store primary members in the network partition context List<String> primaryMemberListInNetworkPartition = new ArrayList<String>(); //minimum check per partition - for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) { + for (ClusterLevelPartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) { // store primary members in the partition context List<String> primaryMemberListInPartition = new ArrayList<String>(); // get active primary members in this partition context http://git-wip-us.apache.org/repos/asf/stratos/blob/a19c5a80/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java index 15894ef..e5b6ce1 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java @@ -21,8 +21,8 @@ package org.apache.stratos.autoscaler.rule; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.ClusterLevelPartitionContext; import org.apache.stratos.autoscaler.NetworkPartitionLbHolder; -import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.partition.PartitionManager; import org.apache.stratos.common.constants.StratosConstants; import org.drools.KnowledgeBase; @@ -214,8 +214,8 @@ public class AutoscalerRuleEvaluator { return ksession; } - public static String getLbClusterId(PartitionContext partitionContext, String nwpartitionId) { - Properties props = partitionContext.getProperties(); + public static String getLbClusterId(ClusterLevelPartitionContext clusterMonitorPartitionContext, String nwpartitionId) { + Properties props = clusterMonitorPartitionContext.getProperties(); String value = (String) props.get(org.apache.stratos.messaging.util.Constants.LOAD_BALANCER_REF); @@ -229,7 +229,7 @@ public class AutoscalerRuleEvaluator { if (value.equals(org.apache.stratos.messaging.util.Constants.DEFAULT_LOAD_BALANCER)) { lbClusterId = networkPartitionLbHolder.getDefaultLbClusterId(); } else if (value.equals(org.apache.stratos.messaging.util.Constants.SERVICE_AWARE_LOAD_BALANCER)) { - String serviceName = partitionContext.getServiceName(); + String serviceName = clusterMonitorPartitionContext.getServiceName(); lbClusterId = networkPartitionLbHolder.getLBClusterIdOfService(serviceName); } return lbClusterId;
