http://git-wip-us.apache.org/repos/asf/stratos/blob/c20d28c2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/ClusterLevelPartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/ClusterLevelPartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/ClusterLevelPartitionContext.java new file mode 100644 index 0000000..c044957 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/ClusterLevelPartitionContext.java @@ -0,0 +1,720 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler.context.partition; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.context.member.MemberStatsContext; +import org.apache.stratos.autoscaler.util.ConfUtil; +import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; +import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; + +import java.io.Serializable; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.stratos.common.constants.StratosConstants; + +/** + * This is an object that inserted to the rules engine. + * Holds information about a partition. + * + * + */ + +public class ClusterLevelPartitionContext extends PartitionContext implements Serializable{ + + private static final long serialVersionUID = -2920388667345980487L; + private static final Log log = LogFactory.getLog(ClusterLevelPartitionContext.class); + private String serviceName; + private String networkPartitionId; + private Partition partition; + private int minimumMemberCount = 0; + private int pendingMembersFailureCount = 0; + private final int PENDING_MEMBER_FAILURE_THRESHOLD = 5; + + // properties + private Properties properties; + + // 15 mints as the default + private long pendingMemberExpiryTime = 900000; + // pending members + private List<MemberContext> pendingMembers; + + // 1 day as default + private long obsoltedMemberExpiryTime = 1*24*60*60*1000; + + // 30 mints as default + private long terminationPendingMemberExpiryTime = 1800000; + + // members to be terminated + private Map<String, MemberContext> obsoletedMembers; + + // active members + private List<MemberContext> activeMembers; + + // termination pending members, member is added to this when Autoscaler send grace fully shut down event + private List<MemberContext> terminationPendingMembers; + + //member id: time that member is moved to termination pending status + private Map<String, Long> terminationPendingStartedTime; + + //Keep statistics come from CEP + private Map<String, MemberStatsContext> memberStatsContexts; + + // for the use of tests + public ClusterLevelPartitionContext(long memberExpiryTime) { + super(memberExpiryTime); + this.activeMembers = new ArrayList<MemberContext>(); + this.terminationPendingMembers = new ArrayList<MemberContext>(); + } + + public ClusterLevelPartitionContext(Partition partition) { + + super(partition); + this.minimumMemberCount = partition.getPartitionMin(); + this.pendingMembers = new ArrayList<MemberContext>(); + this.activeMembers = new ArrayList<MemberContext>(); + this.terminationPendingMembers = new ArrayList<MemberContext>(); + this.obsoletedMembers = new ConcurrentHashMap<String, MemberContext>(); + memberStatsContexts = new ConcurrentHashMap<String, MemberStatsContext>(); + + terminationPendingStartedTime = new HashMap<String, Long>(); + // check if a different value has been set for expiryTime + XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); + pendingMemberExpiryTime = conf.getLong(StratosConstants.PENDING_VM_MEMBER_EXPIRY_TIMEOUT, 900000); + obsoltedMemberExpiryTime = conf.getLong(StratosConstants.OBSOLETED_VM_MEMBER_EXPIRY_TIMEOUT, 86400000); + if (log.isDebugEnabled()) { + log.debug("Member expiry time is set to: " + pendingMemberExpiryTime); + log.debug("Member obsoleted expiry time is set to: " + obsoltedMemberExpiryTime); + } + + Thread th = new Thread(new PendingMemberWatcher(this)); + th.start(); + Thread th2 = new Thread(new ObsoletedMemberWatcher(this)); + th2.start(); + Thread th3 = new Thread(new TerminationPendingMemberWatcher(this)); + th3.start(); + } + + public long getTerminationPendingStartedTimeOfMember(String memberId) { + return terminationPendingStartedTime.get(memberId); + } + + public List<MemberContext> getPendingMembers() { + return pendingMembers; + } + + public void setPendingMembers(List<MemberContext> pendingMembers) { + this.pendingMembers = pendingMembers; + } + + public int getActiveMemberCount() { + return activeMembers.size(); + } + + public void setActiveMembers(List<MemberContext> activeMembers) { + this.activeMembers = activeMembers; + } + + public int getMinimumMemberCount() { + return minimumMemberCount; + } + + public void setMinimumMemberCount(int minimumMemberCount) { + this.minimumMemberCount = minimumMemberCount; + } + + public Partition getPartition() { + return partition; + } + + public void setPartition(Partition partition) { + this.partition = partition; + } + + public void addPendingMember(MemberContext ctxt) { + this.pendingMembers.add(ctxt); + } + + public boolean removePendingMember(String id) { + if (id == null) { + return false; + } + synchronized (pendingMembers) { + for (Iterator<MemberContext> iterator = pendingMembers.iterator(); iterator.hasNext(); ) { + MemberContext pendingMember = (MemberContext) iterator.next(); + if (id.equals(pendingMember.getMemberId())) { + iterator.remove(); + return true; + } + + } + } + + return false; + } + + public void movePendingMemberToActiveMembers(String memberId) { + if (memberId == null) { + return; + } + synchronized (pendingMembers) { + Iterator<MemberContext> iterator = pendingMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext pendingMember = iterator.next(); + if (pendingMember == null) { + iterator.remove(); + continue; + } + if (memberId.equals(pendingMember.getMemberId())) { + // member is activated + // remove from pending list + iterator.remove(); + // add to the activated list + this.activeMembers.add(pendingMember); + pendingMembersFailureCount = 0; + if (log.isDebugEnabled()) { + log.debug(String.format("Pending member is removed and added to the " + + "activated member list. [Member Id] %s", memberId)); + } + break; + } + } + } + } + + public boolean activeMemberAvailable(String memberId) { + for (MemberContext activeMember : activeMembers) { + if (memberId.equals(activeMember.getMemberId())) { + return true; + } + } + return false; + } + + public boolean pendingMemberAvailable(String memberId) { + + for (MemberContext pendingMember : pendingMembers) { + if (memberId.equals(pendingMember.getMemberId())) { + return true; + } + } + return false; + } + + public void moveActiveMemberToTerminationPendingMembers(String memberId) { + if (memberId == null) { + return; + } + synchronized (activeMembers) { + Iterator<MemberContext> iterator = activeMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext activeMember = iterator.next(); + if (activeMember == null) { + iterator.remove(); + continue; + } + if (memberId.equals(activeMember.getMemberId())) { + // member is activated + // remove from pending list + iterator.remove(); + // add to the activated list + this.terminationPendingMembers.add(activeMember); + if (log.isDebugEnabled()) { + log.debug(String.format("Active member is removed and added to the " + + "termination pending member list. [Member Id] %s", memberId)); + } + break; + } + } + } + } + + /** + * Check the member lists for the provided member ID and move the member to the obsolete list + * + * @param memberId The member ID of the member to search + */ + public void moveMemberToObsoleteList(String memberId) { + if (memberId == null) { + return; + } + + // check active member list + Iterator<MemberContext> activeMemberIterator = activeMembers.listIterator(); + MemberContext removedMember = this.removeMemberFrom(activeMemberIterator, memberId); + if (removedMember != null) { + this.addObsoleteMember(removedMember); + removedMember.setObsoleteInitTime(System.currentTimeMillis()); + if (log.isDebugEnabled()) { + log.debug(String.format("Active member is removed and added to the " + + "obsolete member list. [Member Id] %s", memberId)); + } + + return; + } + + // check pending member list + Iterator<MemberContext> pendingMemberIterator = pendingMembers.listIterator(); + removedMember = this.removeMemberFrom(pendingMemberIterator, memberId); + if (removedMember != null) { + this.addObsoleteMember(removedMember); + removedMember.setObsoleteInitTime(System.currentTimeMillis()); + if (log.isDebugEnabled()) { + log.debug(String.format("Pending member is removed and added to the " + + "obsolete member list. [Member Id] %s", memberId)); + } + + return; + } + + // check termination pending member list + Iterator<MemberContext> terminationPendingMembersIterator = terminationPendingMembers.listIterator(); + removedMember = this.removeMemberFrom(terminationPendingMembersIterator, memberId); + if (removedMember != null) { + this.addObsoleteMember(removedMember); + removedMember.setObsoleteInitTime(System.currentTimeMillis()); + if (log.isDebugEnabled()) { + log.debug(String.format("Termination Pending member is removed and added to the " + + "obsolete member list. [Member Id] %s", memberId)); + } + } + } + + /** + * Removes the {@link org.apache.stratos.cloud.controller.stub.pojo.MemberContext} object mapping + * to the specified member id from the specified MemberContext collection + * + * @param iterator The {@link java.util.Iterator} for the collection containing {@link org.apache.stratos.cloud.controller.stub.pojo.MemberContext} + * objects + * @param memberId Member Id {@link String} for the {@link org.apache.stratos.cloud.controller.stub.pojo.MemberContext} + * to be removed + * @return {@link org.apache.stratos.cloud.controller.stub.pojo.MemberContext} object if + * object found and removed, null if otherwise. + */ + private MemberContext removeMemberFrom(Iterator<MemberContext> iterator, String memberId) { + while (iterator.hasNext()) { + MemberContext activeMember = iterator.next(); + if (activeMember == null) { + iterator.remove(); + continue; + } + if (memberId.equals(activeMember.getMemberId())) { + iterator.remove(); + return activeMember; + } + } + + return null; + } + + public void addActiveMember(MemberContext ctxt) { + this.activeMembers.add(ctxt); + } + + public void removeActiveMember(MemberContext ctxt) { + this.activeMembers.remove(ctxt); + } + + public boolean removeTerminationPendingMember(String memberId) { + boolean terminationPendingMemberAvailable = false; + synchronized (terminationPendingMembers) { + for (MemberContext memberContext : terminationPendingMembers) { + if (memberContext.getMemberId().equals(memberId)) { + terminationPendingMemberAvailable = true; + terminationPendingMembers.remove(memberContext); + break; + } + } + } + return terminationPendingMemberAvailable; + } + + public long getObsoltedMemberExpiryTime() { + return obsoltedMemberExpiryTime; + } + + public void setObsoltedMemberExpiryTime(long obsoltedMemberExpiryTime) { + this.obsoltedMemberExpiryTime = obsoltedMemberExpiryTime; + } + + public void addObsoleteMember(MemberContext ctxt) { + this.obsoletedMembers.put(ctxt.getMemberId(), ctxt); + } + + public boolean removeObsoleteMember(String memberId) { + if(this.obsoletedMembers.remove(memberId) == null) { + return false; + } + return true; + } + + public long getPendingMemberExpiryTime() { + return pendingMemberExpiryTime; + } + + public void setPendingMemberExpiryTime(long pendingMemberExpiryTime) { + this.pendingMemberExpiryTime = pendingMemberExpiryTime; + } + + public Map<String, MemberContext> getObsoletedMembers() { + return obsoletedMembers; + } + + public void setObsoletedMembers(Map<String, MemberContext> obsoletedMembers) { + this.obsoletedMembers = obsoletedMembers; + } + + public String getNetworkPartitionId() { + return networkPartitionId; + } + + public void setNetworkPartitionId(String networkPartitionId) { + this.networkPartitionId = networkPartitionId; + } + + + public Map<String, MemberStatsContext> getMemberStatsContexts() { + return memberStatsContexts; + } + + public MemberStatsContext getMemberStatsContext(String memberId) { + return memberStatsContexts.get(memberId); + } + + public void addMemberStatsContext(MemberStatsContext ctxt) { + this.memberStatsContexts.put(ctxt.getMemberId(), ctxt); + } + + public void removeMemberStatsContext(String memberId) { + this.memberStatsContexts.remove(memberId); + } + + public MemberStatsContext getPartitionCtxt(String id) { + return this.memberStatsContexts.get(id); + } + +// public boolean memberExist(String memberId) { +// return memberStatsContexts.containsKey(memberId); +// } + + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public List<MemberContext> getTerminationPendingMembers() { + return terminationPendingMembers; + } + + public void setTerminationPendingMembers(List<MemberContext> terminationPendingMembers) { + this.terminationPendingMembers = terminationPendingMembers; + } + + public int getTotalMemberCount() { + + return activeMembers.size() + pendingMembers.size() + terminationPendingMembers.size(); + } + + public int getNonTerminatedMemberCount() { + return activeMembers.size() + pendingMembers.size(); + } + + public List<MemberContext> getActiveMembers() { + return activeMembers; + } + + public boolean removeActiveMemberById(String memberId) { + boolean removeActiveMember = false; + synchronized (activeMembers) { + Iterator<MemberContext> iterator = activeMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext memberContext = iterator.next(); + if(memberId.equals(memberContext.getMemberId())){ + iterator.remove(); + removeActiveMember = true; + + break; + } + } + } + return removeActiveMember; + } + + public boolean activeMemberExist(String memberId) { + + for (MemberContext memberContext: activeMembers) { + if(memberId.equals(memberContext.getMemberId())){ + return true; + } + } + return false; + } + + public int getAllMemberForTerminationCount () { + int count = activeMembers.size() + pendingMembers.size() + terminationPendingMembers.size(); + if (log.isDebugEnabled()) { + log.debug("PartitionContext:getAllMemberForTerminationCount:size:" + count); + } + return count; + } + + // Map<String, MemberStatsContext> getMemberStatsContexts().keySet() + public Set<String> getAllMemberForTermination () { + + List<MemberContext> merged = new ArrayList<MemberContext>(); + + + merged.addAll(activeMembers); + merged.addAll(pendingMembers); + merged.addAll(terminationPendingMembers); + + Set<String> results = new HashSet<String>(merged.size()); + + for (MemberContext ctx: merged) { + results.add(ctx.getMemberId()); + } + + + if (log.isDebugEnabled()) { + log.debug("PartitionContext:getAllMemberForTermination:size:" + results.size()); + } + + //MemberContext x = new MemberContext(); + //x.getMemberId() + + return results; + } + + public void movePendingTerminationMemberToObsoleteMembers(String memberId) { + + log.info("Starting the moving of termination pending to obsolete for [member] " + memberId); + if (memberId == null) { + return; + } + Iterator<MemberContext> iterator = terminationPendingMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext terminationPendingMember = iterator.next(); + if (terminationPendingMember == null) { + iterator.remove(); + continue; + } + if (memberId.equals(terminationPendingMember.getMemberId())) { + + log.info("Found termination pending member and trying to move [member] " + memberId + " to obsolete list"); + // member is pending termination + // remove from pending termination list + iterator.remove(); + // add to the obsolete list + this.obsoletedMembers.put(memberId, terminationPendingMember); + + terminationPendingStartedTime.put(memberId, System.currentTimeMillis()); + + if (log.isDebugEnabled()) { + log.debug(String.format("Termination pending member is removed and added to the " + + "obsolete member list. [Member Id] %s", memberId)); + } + break; + } + } + } + + public MemberContext getPendingTerminationMember(String memberId) { + for (MemberContext memberContext : terminationPendingMembers) { + if (memberId.equals(memberContext.getMemberId())) { + return memberContext; + } + } + return null; + } + + public long getTerminationPendingMemberExpiryTime() { + return terminationPendingMemberExpiryTime; + } + + public void movePendingMemberToObsoleteMembers(String memberId) { + if (memberId == null) { + return; + } + Iterator<MemberContext> iterator = pendingMembers.listIterator(); + while (iterator.hasNext()) { + MemberContext pendingMember = iterator.next(); + if (pendingMember == null) { + iterator.remove(); + continue; + } + if (memberId.equals(pendingMember.getMemberId())) { + + // remove from pending list + iterator.remove(); + // add to the obsolete list + this.obsoletedMembers.put(memberId, pendingMember); + if (log.isDebugEnabled()) { + log.debug(String.format("Pending member is removed and added to the " + + "obsolete member list. [Member Id] %s", memberId)); + } + break; + } + } + + } + + @Override + public int getCurrentElementCount() { + //TODO find and return correct member instance count + return 0; + } + + private class PendingMemberWatcher implements Runnable { + private ClusterLevelPartitionContext ctxt; + + public PendingMemberWatcher(ClusterLevelPartitionContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + + while (true) { + long expiryTime = ctxt.getPendingMemberExpiryTime(); + List<MemberContext> pendingMembers = ctxt.getPendingMembers(); + + synchronized (pendingMembers) { + Iterator<MemberContext> iterator = pendingMembers.listIterator(); + while ( iterator.hasNext()) { + MemberContext pendingMember = iterator.next(); + + if (pendingMember == null) { + continue; + } + long pendingTime = System.currentTimeMillis() - pendingMember.getInitTime(); + if (pendingTime >= expiryTime) { + + + iterator.remove(); + log.info("Pending state of member: " + pendingMember.getMemberId() + + " is expired. " + "Adding as an obsoleted member."); + // member should be terminated + ctxt.addObsoleteMember(pendingMember); + pendingMembersFailureCount++; + if( pendingMembersFailureCount > PENDING_MEMBER_FAILURE_THRESHOLD){ + setPendingMemberExpiryTime(expiryTime * 2);//Doubles the expiry time after the threshold of failure exceeded + //TODO Implement an alerting system: STRATOS-369 + } + } + } + } + + try { + // TODO find a constant + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + } + } + + } + + private class ObsoletedMemberWatcher implements Runnable { + private ClusterLevelPartitionContext ctxt; + + public ObsoletedMemberWatcher(ClusterLevelPartitionContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + while (true) { + long obsoltedMemberExpiryTime = ctxt.getObsoltedMemberExpiryTime(); + Map<String, MemberContext> obsoletedMembers = ctxt.getObsoletedMembers(); + + Iterator<Entry<String, MemberContext>> iterator = obsoletedMembers.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, MemberContext> pairs = iterator.next(); + MemberContext obsoleteMember = (MemberContext) pairs.getValue(); + if (obsoleteMember == null) { + continue; + } + long obsoleteTime = System.currentTimeMillis() - obsoleteMember.getInitTime(); + if (obsoleteTime >= obsoltedMemberExpiryTime) { + iterator.remove(); + } + } + try { + // TODO find a constant + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + } + } + } + + /** + * This thread is responsible for moving member to obsolete list if pending termination timeout happens + */ + private class TerminationPendingMemberWatcher implements Runnable { + private ClusterLevelPartitionContext ctxt; + + public TerminationPendingMemberWatcher(ClusterLevelPartitionContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + + while (true) { + long terminationPendingMemberExpiryTime = ctxt.getTerminationPendingMemberExpiryTime(); + + Iterator<MemberContext> iterator = ctxt.getTerminationPendingMembers().listIterator(); + while (iterator.hasNext()) { + + MemberContext terminationPendingMember = iterator.next(); + if (terminationPendingMember == null){ + continue; + } + long terminationPendingTime = System.currentTimeMillis() + - ctxt.getTerminationPendingStartedTimeOfMember(terminationPendingMember.getMemberId()); + if (terminationPendingTime >= terminationPendingMemberExpiryTime) { + log.info("Moving [member] " + terminationPendingMember.getMemberId() + partitionId); + iterator.remove(); + obsoletedMembers.put(terminationPendingMember.getMemberId(), terminationPendingMember); + } + } + try { + // TODO find a constant + Thread.sleep(15000); + } catch (InterruptedException ignore) {} + } + } + } +}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c20d28c2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/GroupLevelPartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/GroupLevelPartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/GroupLevelPartitionContext.java new file mode 100644 index 0000000..c403295 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/GroupLevelPartitionContext.java @@ -0,0 +1,739 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler.context.partition; + +import org.apache.commons.configuration.XMLConfiguration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.context.member.MemberStatsContext; +import org.apache.stratos.autoscaler.util.ConfUtil; +import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; +import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.messaging.domain.instance.Instance; + +import java.io.Serializable; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This is an object that inserted to the rules engine. + * Holds information about a partition. + */ + +public class GroupLevelPartitionContext implements Serializable { + + private static final long serialVersionUID = -2920388667345980487L; + private static final Log log = LogFactory.getLog(GroupLevelPartitionContext.class); + private final int PENDING_MEMBER_FAILURE_THRESHOLD = 5; + private String partitionId; + private String serviceName; + private String networkPartitionId; + private Partition partition; + private int minimumInstanceCount = 0; + private int pendingInstancesFailureCount = 0; + // properties + private Properties properties; + + // 15 mints as the default + private long pendingInstanceExpiryTime = 900000; + // pending instances + private List<Instance> pendingInstances; + + // 1 day as default + private long obsoltedInstanceExpiryTime = 1 * 24 * 60 * 60 * 1000; + + // 30 mints as default + private long terminationPendingInstanceExpiryTime = 1800000; + + // instances to be terminated + private Map<String, Instance> obsoletedInstances; + + // active instances + private List<Instance> activeInstances; + + // termination pending instances, instance is added to this when Autoscaler send grace fully shut down event + private List<Instance> terminationPendingInstances; + + //instance id: time that instance is moved to termination pending status + private Map<String, Long> terminationPendingStartedTime; + + //Keep statistics come from CEP + private Map<String, MemberStatsContext> instanceStatsContexts; + + //group instances kept inside a partition + private Map<String, Instance> instanceIdToInstanceContextMap; + + // for the use of tests + public GroupLevelPartitionContext(long instanceExpiryTime) { + + this.activeInstances = new ArrayList<Instance>(); + this.terminationPendingInstances = new ArrayList<Instance>(); + pendingInstanceExpiryTime = instanceExpiryTime; + } + + public GroupLevelPartitionContext(Partition partition) { + this.setPartition(partition); + this.minimumInstanceCount = partition.getPartitionMin(); + this.partitionId = partition.getId(); + this.pendingInstances = new ArrayList<Instance>(); + this.activeInstances = new ArrayList<Instance>(); + this.terminationPendingInstances = new ArrayList<Instance>(); + this.obsoletedInstances = new ConcurrentHashMap<String, Instance>(); + instanceStatsContexts = new ConcurrentHashMap<String, MemberStatsContext>(); + instanceIdToInstanceContextMap = new HashMap<String, Instance>(); + + + terminationPendingStartedTime = new HashMap<String, Long>(); + // check if a different value has been set for expiryTime + XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); + pendingInstanceExpiryTime = conf.getLong(StratosConstants.PENDING_VM_MEMBER_EXPIRY_TIMEOUT, 900000); + obsoltedInstanceExpiryTime = conf.getLong(StratosConstants.OBSOLETED_VM_MEMBER_EXPIRY_TIMEOUT, 86400000); + if (log.isDebugEnabled()) { + log.debug("Instance expiry time is set to: " + pendingInstanceExpiryTime); + log.debug("Instance obsoleted expiry time is set to: " + obsoltedInstanceExpiryTime); + } + + /*FIXME Thread th = new Thread(new PendingInstanceWatcher(this)); + th.start(); + Thread th2 = new Thread(new ObsoletedInstanceWatcher(this)); + th2.start(); + Thread th3 = new Thread(new TerminationPendingInstanceWatcher(this)); + th3.start();*/ + } + + public long getTerminationPendingStartedTimeOfInstance(String instanceId) { + return terminationPendingStartedTime.get(instanceId); + } + + public Map<String, Instance> getInstanceIdToInstanceContextMap() { + return instanceIdToInstanceContextMap; + } + + public void setInstanceIdToInstanceContextMap(Map<String, Instance> instanceIdToInstanceContextMap) { + this.instanceIdToInstanceContextMap = instanceIdToInstanceContextMap; + } + + public void addInstanceContext(Instance context) { + this.instanceIdToInstanceContextMap.put(context.getInstanceId(), context); + + } + + public List<Instance> getPendingInstances() { + return pendingInstances; + } + + public void setPendingInstances(List<Instance> pendingInstances) { + this.pendingInstances = pendingInstances; + } + + public int getActiveInstanceCount() { + return activeInstances.size(); + } + + public String getPartitionId() { + return partitionId; + } + + public void setPartitionId(String partitionId) { + this.partitionId = partitionId; + } + + public int getMinimumInstanceCount() { + return minimumInstanceCount; + } + + public void setMinimumInstanceCount(int minimumInstanceCount) { + this.minimumInstanceCount = minimumInstanceCount; + } + + public Partition getPartition() { + return partition; + } + + public void setPartition(Partition partition) { + this.partition = partition; + } + + public void addPendingInstance(Instance ctxt) { + this.pendingInstances.add(ctxt); + } + + public boolean removePendingInstance(String id) { + if (id == null) { + return false; + } + synchronized (pendingInstances) { + for (Iterator<Instance> iterator = pendingInstances.iterator(); iterator.hasNext(); ) { + Instance pendingInstance = (Instance) iterator.next(); + if (id.equals(pendingInstance.getInstanceId())) { + iterator.remove(); + return true; + } + + } + } + + return false; + } + + public void movePendingInstanceToActiveInstances(String instanceId) { + if (instanceId == null) { + return; + } + synchronized (pendingInstances) { + Iterator<Instance> iterator = pendingInstances.listIterator(); + while (iterator.hasNext()) { + Instance pendingInstance = iterator.next(); + if (pendingInstance == null) { + iterator.remove(); + continue; + } + if (instanceId.equals(pendingInstance.getInstanceId())) { + // instance is activated + // remove from pending list + iterator.remove(); + // add to the activated list + this.activeInstances.add(pendingInstance); + pendingInstancesFailureCount = 0; + if (log.isDebugEnabled()) { + log.debug(String.format("Instance is removed and added to the " + + "activated Instance list. [Instance Id] %s", instanceId)); + } + break; + } + } + } + } + + public boolean activeInstanceAvailable(String instanceId) { + for (Instance activeInstance : activeInstances) { + if (instanceId.equals(activeInstance.getInstanceId())) { + return true; + } + } + return false; + } + + public boolean pendingInstanceAvailable(String instanceId) { + + for (Instance pendingInstance : pendingInstances) { + if (instanceId.equals(pendingInstance.getInstanceId())) { + return true; + } + } + return false; + } + + public void moveActiveInstanceToTerminationPendingInstances(String instanceId) { + if (instanceId == null) { + return; + } + synchronized (activeInstances) { + Iterator<Instance> iterator = activeInstances.listIterator(); + while (iterator.hasNext()) { + Instance activeInstance = iterator.next(); + if (activeInstance == null) { + iterator.remove(); + continue; + } + if (instanceId.equals(activeInstance.getInstanceId())) { + // instance is activated + // remove from pending list + iterator.remove(); + // add to the activated list + this.terminationPendingInstances.add(activeInstance); + if (log.isDebugEnabled()) { + log.debug(String.format("Active instance is removed and added to the " + + "termination pending instance list. [Instance Id] %s", instanceId)); + } + break; + } + } + } + } + + /** + * Removes the {@link org.apache.stratos.messaging.domain.instance.Instance} object mapping + * to the specified instance id from the specified InstanceContext collection + * + * @param iterator The {@link java.util.Iterator} for the collection containing + * {@link org.apache.stratos.messaging.domain.instance.Instance} + * objects + * @param instanceId Instance Id {@link String} for the + * {@link org.apache.stratos.messaging.domain.instance.Instance} + * to be removed + * @return {@link org.apache.stratos.messaging.domain.instance.Instance} object if + * object found and removed, null if otherwise. + */ + private Instance removeInstanceFrom(Iterator<Instance> iterator, String instanceId) { + while (iterator.hasNext()) { + Instance activeInstance = iterator.next(); + if (activeInstance == null) { + iterator.remove(); + continue; + } + if (instanceId.equals(activeInstance.getInstanceId())) { + iterator.remove(); + return activeInstance; + } + } + + return null; + } + + /** + * Check the instance lists for the provided instance ID and move the instance to the obsolete list + * + * @param ctxt The instance ID of the instance to search + *//* + public void moveInstanceToObsoleteList(String instanceId) { + if (instanceId == null) { + return; + } + + // check active instance list + Iterator<InstanceContext> activeInstanceIterator = activeInstances.listIterator(); + InstanceContext removedInstance = this.removeInstanceFrom(activeInstanceIterator, instanceId); + if (removedInstance != null) { + this.addObsoleteInstance(removedInstance); + removedInstance.setObsoleteInitTime(System.currentTimeMillis()); + if (log.isDebugEnabled()) { + log.debug(String.format("Active instance is removed and added to the " + + "obsolete instance list. [Instance Id] %s", instanceId)); + } + + return; + } + + // check pending instance list + Iterator<InstanceContext> pendingInstanceIterator = pendingInstances.listIterator(); + removedInstance = this.removeInstanceFrom(pendingInstanceIterator, instanceId); + if (removedInstance != null) { + this.addObsoleteInstance(removedInstance); + removedInstance.setObsoleteInitTime(System.currentTimeMillis()); + if (log.isDebugEnabled()) { + log.debug(String.format("Pending instance is removed and added to the " + + "obsolete instance list. [Instance Id] %s", instanceId)); + } + + return; + } + + // check termination pending instance list + Iterator<InstanceContext> terminationPendingInstancesIterator = terminationPendingInstances.listIterator(); + removedInstance = this.removeInstanceFrom(terminationPendingInstancesIterator, instanceId); + if (removedInstance != null) { + this.addObsoleteInstance(removedInstance); + removedInstance.setObsoleteInitTime(System.currentTimeMillis()); + if (log.isDebugEnabled()) { + log.debug(String.format("Termination Pending instance is removed and added to the " + + "obsolete instance list. [Instance Id] %s", instanceId)); + } + } + } +*/ + public void addActiveInstance(Instance ctxt) { + this.activeInstances.add(ctxt); + } + + public void removeActiveInstance(Instance ctxt) { + this.activeInstances.remove(ctxt); + } + + public boolean removeTerminationPendingInstance(String instanceId) { + boolean terminationPendingInstanceAvailable = false; + synchronized (terminationPendingInstances) { + for (Instance instanceContext : terminationPendingInstances) { + if (instanceContext.getInstanceId().equals(instanceId)) { + terminationPendingInstanceAvailable = true; + terminationPendingInstances.remove(instanceContext); + break; + } + } + } + return terminationPendingInstanceAvailable; + } + + public long getObsoltedInstanceExpiryTime() { + return obsoltedInstanceExpiryTime; + } + + public void setObsoltedInstanceExpiryTime(long obsoltedInstanceExpiryTime) { + this.obsoltedInstanceExpiryTime = obsoltedInstanceExpiryTime; + } + + public void addObsoleteInstance(Instance ctxt) { + this.obsoletedInstances.put(ctxt.getInstanceId(), ctxt); + } + + public boolean removeObsoleteInstance(String instanceId) { + if (this.obsoletedInstances.remove(instanceId) == null) { + return false; + } + return true; + } + + public long getPendingInstanceExpiryTime() { + return pendingInstanceExpiryTime; + } + + public void setPendingInstanceExpiryTime(long pendingInstanceExpiryTime) { + this.pendingInstanceExpiryTime = pendingInstanceExpiryTime; + } + + public Map<String, Instance> getObsoletedInstances() { + return obsoletedInstances; + } + + public void setObsoletedInstances(Map<String, Instance> obsoletedInstances) { + this.obsoletedInstances = obsoletedInstances; + } + + public String getNetworkPartitionId() { + return networkPartitionId; + } + + public void setNetworkPartitionId(String networkPartitionId) { + this.networkPartitionId = networkPartitionId; + } + + public Map<String, MemberStatsContext> getInstanceStatsContexts() { + return instanceStatsContexts; + } + + public MemberStatsContext getInstanceStatsContext(String instanceId) { + return instanceStatsContexts.get(instanceId); + } + + public void addInstanceStatsContext(MemberStatsContext ctxt) { + this.instanceStatsContexts.put(ctxt.getInstanceId(), ctxt); + } + + public void removeInstanceStatsContext(String instanceId) { + this.instanceStatsContexts.remove(instanceId); + } + + public MemberStatsContext getPartitionCtxt(String id) { + return this.instanceStatsContexts.get(id); + } + + public Properties getProperties() { + return properties; + } + +// public boolean instanceExist(String instanceId) { +// return instanceStatsContexts.containsKey(instanceId); +// } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public List<Instance> getTerminationPendingInstances() { + return terminationPendingInstances; + } + + public void setTerminationPendingInstances(List<Instance> terminationPendingInstances) { + this.terminationPendingInstances = terminationPendingInstances; + } + + public int getTotalInstanceCount() { + + return activeInstances.size() + pendingInstances.size() + terminationPendingInstances.size(); + } + + public int getNonTerminatedInstanceCount() { + return activeInstances.size() + pendingInstances.size(); + } + + public List<Instance> getActiveInstances() { + return activeInstances; + } + + public void setActiveInstances(List<Instance> activeInstances) { + this.activeInstances = activeInstances; + } + + public boolean removeActiveInstanceById(String instanceId) { + boolean removeActiveInstance = false; + synchronized (activeInstances) { + Iterator<Instance> iterator = activeInstances.listIterator(); + while (iterator.hasNext()) { + Instance instanceContext = iterator.next(); + if (instanceId.equals(instanceContext.getInstanceId())) { + iterator.remove(); + removeActiveInstance = true; + + break; + } + } + } + return removeActiveInstance; + } + + public boolean activeInstanceExist(String instanceId) { + + for (Instance instanceContext : activeInstances) { + if (instanceId.equals(instanceContext.getInstanceId())) { + return true; + } + } + return false; + } + + public int getAllInstanceForTerminationCount() { + int count = activeInstances.size() + pendingInstances.size() + terminationPendingInstances.size(); + if (log.isDebugEnabled()) { + log.debug("PartitionContext:getAllInstanceForTerminationCount:size:" + count); + } + return count; + } + + // Map<String, InstanceStatsContext> getInstanceStatsContexts().keySet() + public Set<String> getAllInstanceForTermination() { + + List<Instance> merged = new ArrayList<Instance>(); + + + merged.addAll(activeInstances); + merged.addAll(pendingInstances); + merged.addAll(terminationPendingInstances); + + Set<String> results = new HashSet<String>(merged.size()); + + for (Instance ctx : merged) { + results.add(ctx.getInstanceId()); + } + + + if (log.isDebugEnabled()) { + log.debug("PartitionContext:getAllInstanceForTermination:size:" + results.size()); + } + + //InstanceContext x = new InstanceContext(); + //x.getInstanceId() + + return results; + } + + public void movePendingTerminationInstanceToObsoleteInstances(String instanceId) { + + log.info("Starting the moving of termination pending to obsolete for [instance] " + instanceId); + if (instanceId == null) { + return; + } + Iterator<Instance> iterator = terminationPendingInstances.listIterator(); + while (iterator.hasNext()) { + Instance terminationPendingInstance = iterator.next(); + if (terminationPendingInstance == null) { + iterator.remove(); + continue; + } + if (instanceId.equals(terminationPendingInstance.getInstanceId())) { + + log.info("Found termination pending instance and trying to move [instance] " + instanceId + " to obsolete list"); + // instance is pending termination + // remove from pending termination list + iterator.remove(); + // add to the obsolete list + this.obsoletedInstances.put(instanceId, terminationPendingInstance); + + terminationPendingStartedTime.put(instanceId, System.currentTimeMillis()); + + if (log.isDebugEnabled()) { + log.debug(String.format("Termination pending instance is removed and added to the " + + "obsolete instance list. [Instance Id] %s", instanceId)); + } + break; + } + } + } + + public Instance getPendingTerminationInstance(String instanceId) { + for (Instance instanceContext : terminationPendingInstances) { + if (instanceId.equals(instanceContext.getInstanceId())) { + return instanceContext; + } + } + return null; + } + + public long getTerminationPendingInstanceExpiryTime() { + return terminationPendingInstanceExpiryTime; + } + + public void movePendingInstanceToObsoleteInstances(String instanceId) { + if (instanceId == null) { + return; + } + Iterator<Instance> iterator = pendingInstances.listIterator(); + while (iterator.hasNext()) { + Instance pendingInstance = iterator.next(); + if (pendingInstance == null) { + iterator.remove(); + continue; + } + if (instanceId.equals(pendingInstance.getInstanceId())) { + + // remove from pending list + iterator.remove(); + // add to the obsolete list + this.obsoletedInstances.put(instanceId, pendingInstance); + if (log.isDebugEnabled()) { + log.debug(String.format("Pending instance is removed and added to the " + + "obsolete instance list. [Instance Id] %s", instanceId)); + } + break; + } + } + + } + + /*private class PendingInstanceWatcher implements Runnable { + private ParentComponentLevelPartitionContext ctxt; + + public PendingInstanceWatcher(ParentComponentLevelPartitionContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + + while (true) { + long expiryTime = ctxt.getPendingInstanceExpiryTime(); + List<InstanceContext> pendingInstances = ctxt.getPendingInstances(); + + synchronized (pendingInstances) { + Iterator<InstanceContext> iterator = pendingInstances.listIterator(); + while ( iterator.hasNext()) { + InstanceContext pendingInstance = iterator.next(); + + if (pendingInstance == null) { + continue; + } + long pendingTime = System.currentTimeMillis() - pendingInstance.getInitTime(); + if (pendingTime >= expiryTime) { + + + iterator.remove(); + log.info("Pending state of instance: " + pendingInstance.getInstanceId() + + " is expired. " + "Adding as an obsoleted instance."); + // instance should be terminated + ctxt.addObsoleteInstance(pendingInstance); + pendingInstancesFailureCount++; + if( pendingInstancesFailureCount > PENDING_MEMBER_FAILURE_THRESHOLD){ + setPendingInstanceExpiryTime(expiryTime * 2);//Doubles the expiry time after the threshold of failure exceeded + //TODO Implement an alerting system: STRATOS-369 + } + } + } + } + + try { + // TODO find a constant + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + } + } + + } +*/ + /*private class ObsoletedInstanceWatcher implements Runnable { + private ParentComponentLevelPartitionContext ctxt; + + public ObsoletedInstanceWatcher(ParentComponentLevelPartitionContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + while (true) { + long obsoltedInstanceExpiryTime = ctxt.getObsoltedInstanceExpiryTime(); + Map<String, InstanceContext> obsoletedInstances = ctxt.getObsoletedInstances(); + + Iterator<Entry<String, InstanceContext>> iterator = obsoletedInstances.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, InstanceContext> pairs = iterator.next(); + InstanceContext obsoleteInstance = (InstanceContext) pairs.getValue(); + if (obsoleteInstance == null) { + continue; + } + long obsoleteTime = System.currentTimeMillis() - obsoleteInstance.getInitTime(); + if (obsoleteTime >= obsoltedInstanceExpiryTime) { + iterator.remove(); + } + } + try { + // TODO find a constant + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + } + } + }*/ + + /** + * This thread is responsible for moving instance to obsolete list if pending termination timeout happens + */ + private class TerminationPendingInstanceWatcher implements Runnable { + private GroupLevelPartitionContext ctxt; + + public TerminationPendingInstanceWatcher(GroupLevelPartitionContext ctxt) { + this.ctxt = ctxt; + } + + @Override + public void run() { + + while (true) { + long terminationPendingInstanceExpiryTime = ctxt.getTerminationPendingInstanceExpiryTime(); + + Iterator<Instance> iterator = ctxt.getTerminationPendingInstances().listIterator(); + while (iterator.hasNext()) { + + Instance terminationPendingInstance = iterator.next(); + if (terminationPendingInstance == null) { + continue; + } + long terminationPendingTime = System.currentTimeMillis() + - ctxt.getTerminationPendingStartedTimeOfInstance(terminationPendingInstance.getInstanceId()); + if (terminationPendingTime >= terminationPendingInstanceExpiryTime) { + log.info("Moving [instance] " + terminationPendingInstance.getInstanceId() + partitionId); + iterator.remove(); + obsoletedInstances.put(terminationPendingInstance.getInstanceId(), terminationPendingInstance); + } + } + try { + // TODO find a constant + Thread.sleep(15000); + } catch (InterruptedException ignore) { + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/c20d28c2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/PartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/PartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/PartitionContext.java new file mode 100644 index 0000000..7247e40 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/PartitionContext.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler.context.partition; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; + +import java.io.Serializable; +import java.util.*; +/** + * This is an object that inserted to the rules engine. + * Holds information about a partition. + * + * + */ + +public class PartitionContext implements Serializable{ + + private static final long serialVersionUID = -2920388667345980487L; + private static final Log log = LogFactory.getLog(ClusterLevelPartitionContext.class); + protected String partitionId; + private Partition partition; + + // properties + private Properties properties; + + // for the use of tests + public PartitionContext(long memberExpiryTime) { + + } + + public PartitionContext(Partition partition) { + + this.partition = partition; + } + + public Partition getPartition() { + return partition; + } + + public void setPartition(Partition partition) { + this.partition = partition; + } + + public String getPartitionId() { + return partitionId; + } + public void setPartitionId(String partitionId) { + this.partitionId = partitionId; + } + + public int getCurrentElementCount() { + //TODO find and return correct member instance count + return 0; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/c20d28c2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ApplicationLevelNetworkPartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ApplicationLevelNetworkPartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ApplicationLevelNetworkPartitionContext.java new file mode 100644 index 0000000..a65b764 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ApplicationLevelNetworkPartitionContext.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler.context.partition.network; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.instance.ApplicationInstance; + +import java.io.Serializable; +import java.util.Map; + +/** + * Holds runtime data of a network partition. + * + */ +public class ApplicationLevelNetworkPartitionContext extends NetworkPartitionContext implements Serializable { + private static final Log log = LogFactory.getLog(ApplicationLevelNetworkPartitionContext.class); + private final String id; + + //group instances kept inside a partition + private Map<String, ApplicationInstance> instanceIdToInstanceContextMap; + + public ApplicationLevelNetworkPartitionContext(String id) { + this.id = id; + } + + public Map<String, ApplicationInstance> getInstanceIdToInstanceContextMap() { + return instanceIdToInstanceContextMap; + } + + public void setInstanceIdToInstanceContextMap(Map<String, ApplicationInstance> instanceIdToInstanceContextMap) { + this.instanceIdToInstanceContextMap = instanceIdToInstanceContextMap; + } + + public void addInstanceContext(ApplicationInstance context) { + this.instanceIdToInstanceContextMap.put(context.getInstanceId(), context); + + } + + + public int hashCode() { + + final int prime = 31; + int result = 1; + result = prime * result + ((this.id == null) ? 0 : this.id.hashCode()); + return result; + + } + + public boolean equals(final Object obj) { + + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof ApplicationLevelNetworkPartitionContext)) { + return false; + } + final ApplicationLevelNetworkPartitionContext other = (ApplicationLevelNetworkPartitionContext) obj; + if (this.id == null) { + if (other.id != null) { + return false; + } + } else if (!this.id.equals(other.id)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "ApplicationNetworkPartitionContext [id=" + id + "]"; + } + + public String getId() { + return id; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/c20d28c2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ClusterLevelNetworkPartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ClusterLevelNetworkPartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ClusterLevelNetworkPartitionContext.java new file mode 100644 index 0000000..34c62e8 --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/ClusterLevelNetworkPartitionContext.java @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler.context.partition.network; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext; +import org.apache.stratos.autoscaler.policy.model.LoadAverage; +import org.apache.stratos.autoscaler.policy.model.MemoryConsumption; +import org.apache.stratos.autoscaler.policy.model.RequestsInFlight; +import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; +import org.apache.stratos.messaging.domain.instance.Instance; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Holds runtime data of a network partition. + * + */ +public class ClusterLevelNetworkPartitionContext extends NetworkPartitionContext implements Serializable { + + private static final Log log = LogFactory.getLog(ClusterLevelNetworkPartitionContext.class); + private static final long serialVersionUID = 572769304374110159L; + private final String id; + private int scaleDownRequestsCount = 0; + private float averageRequestsServedPerInstance; + private float requestsServedPerInstance; + + private int minInstanceCount = 0, maxInstanceCount = 0; + private int requiredInstanceCountBasedOnStats; + private int requiredInstanceCountBasedOnDependencies; + + private Map<String, Instance> instanceIdToInstanceContextMap; + + + private final String partitionAlgorithm; + + //boolean values to keep whether the requests in flight parameters are reset or not + private boolean rifReset = false, averageRifReset = false, gradientRifReset = false, secondDerivativeRifRest = false; + //boolean values to keep whether the memory consumption parameters are reset or not + private boolean memoryConsumptionReset = false, averageMemoryConsumptionReset = false, + gradientMemoryConsumptionReset = false, secondDerivativeMemoryConsumptionRest = false; + //boolean values to keep whether the load average parameters are reset or not + private boolean loadAverageReset = false, averageLoadAverageReset = false, gradientLoadAverageReset = false, + secondDerivativeLoadAverageRest = false; + //boolean values to keep whether average requests served per instance parameters are reset or not + private boolean averageRequestServedPerInstanceReset = false; + + private final Partition[] partitions; + + //Following information will keep events details + private RequestsInFlight requestsInFlight; + private MemoryConsumption memoryConsumption; + private LoadAverage loadAverage; + + //details required for partition selection algorithms + private int currentPartitionIndex; + + //partitions of this network partition + private final Map<String, ClusterLevelPartitionContext> partitionCtxts; + + public ClusterLevelNetworkPartitionContext(String id, String partitionAlgo, Partition[] partitions) { + + //super(id, partitionAlgo, partitions); + this.id = id; + this.partitionAlgorithm = partitionAlgo; + if (partitions == null) { + this.partitions = new Partition[0]; + } else { + this.partitions = Arrays.copyOf(partitions, partitions.length); + } + partitionCtxts = new HashMap<String, ClusterLevelPartitionContext>(); + requestsInFlight = new RequestsInFlight(); + loadAverage = new LoadAverage(); + memoryConsumption = new MemoryConsumption(); + for (Partition partition : partitions) { + minInstanceCount += partition.getPartitionMin(); + maxInstanceCount += partition.getPartitionMax(); + } + requiredInstanceCountBasedOnStats = minInstanceCount; + requiredInstanceCountBasedOnDependencies = minInstanceCount; + instanceIdToInstanceContextMap = new HashMap<String, Instance>(); + + } + + public int getMinInstanceCount() { + return minInstanceCount; + } + + public void setMinInstanceCount(int minInstanceCount) { + this.minInstanceCount = minInstanceCount; + } + + public int getMaxInstanceCount() { + return maxInstanceCount; + } + + public void setMaxInstanceCount(int maxInstanceCount) { + this.maxInstanceCount = maxInstanceCount; + } + + public int hashCode() { + + final int prime = 31; + int result = 1; + result = prime * result + ((this.id == null) ? 0 : this.id.hashCode()); + return result; + + } + + public boolean equals(final Object obj) { + + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof ClusterLevelNetworkPartitionContext)) { + return false; + } + final ClusterLevelNetworkPartitionContext other = (ClusterLevelNetworkPartitionContext) obj; + if (this.id == null) { + if (other.id != null) { + return false; + } + } else if (!this.id.equals(other.id)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "NetworkPartitionContext [id=" + id + "partitionAlgorithm=" + partitionAlgorithm + ", minInstanceCount=" + + minInstanceCount + ", maxInstanceCount=" + maxInstanceCount + "]"; + } + + public int getCurrentPartitionIndex() { + return currentPartitionIndex; + } + + public void setCurrentPartitionIndex(int currentPartitionIndex) { + this.currentPartitionIndex = currentPartitionIndex; + } + + public float getAverageRequestsServedPerInstance() { + return averageRequestsServedPerInstance; + } + + public void setAverageRequestsServedPerInstance(float averageRequestServedPerInstance) { + this.averageRequestsServedPerInstance = averageRequestServedPerInstance; + averageRequestServedPerInstanceReset = true; + + if (log.isDebugEnabled()) { + log.debug(String.format("Average Requesets Served Per Instance stats are reset, ready to do scale check [network partition] %s" + , this.id)); + + } + } + + public float getRequestsServedPerInstance() { + return requestsServedPerInstance; + } + + public float getAverageRequestsInFlight() { + return requestsInFlight.getAverage(); + } + + public void setAverageRequestsInFlight(float averageRequestsInFlight) { + requestsInFlight.setAverage(averageRequestsInFlight); + averageRifReset = true; + if (secondDerivativeRifRest && gradientRifReset) { + rifReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getRequestsInFlightSecondDerivative() { + return requestsInFlight.getSecondDerivative(); + } + + public void setRequestsInFlightSecondDerivative(float requestsInFlightSecondDerivative) { + requestsInFlight.setSecondDerivative(requestsInFlightSecondDerivative); + secondDerivativeRifRest = true; + if (averageRifReset && gradientRifReset) { + rifReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getRequestsInFlightGradient() { + return requestsInFlight.getGradient(); + } + + public void setRequestsInFlightGradient(float requestsInFlightGradient) { + requestsInFlight.setGradient(requestsInFlightGradient); + gradientRifReset = true; + if (secondDerivativeRifRest && averageRifReset) { + rifReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Requests in flights stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public boolean isRifReset() { + return rifReset; + } + + public void setRifReset(boolean rifReset) { + this.rifReset = rifReset; + this.averageRifReset = rifReset; + this.gradientRifReset = rifReset; + this.secondDerivativeRifRest = rifReset; + } + + + public float getAverageMemoryConsumption() { + return memoryConsumption.getAverage(); + } + + public void setAverageMemoryConsumption(float averageMemoryConsumption) { + memoryConsumption.setAverage(averageMemoryConsumption); + averageMemoryConsumptionReset = true; + if (secondDerivativeMemoryConsumptionRest && gradientMemoryConsumptionReset) { + memoryConsumptionReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Memory consumption stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getMemoryConsumptionSecondDerivative() { + return memoryConsumption.getSecondDerivative(); + } + + public void setMemoryConsumptionSecondDerivative(float memoryConsumptionSecondDerivative) { + memoryConsumption.setSecondDerivative(memoryConsumptionSecondDerivative); + secondDerivativeMemoryConsumptionRest = true; + if (averageMemoryConsumptionReset && gradientMemoryConsumptionReset) { + memoryConsumptionReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Memory consumption stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getMemoryConsumptionGradient() { + return memoryConsumption.getGradient(); + } + + public void setMemoryConsumptionGradient(float memoryConsumptionGradient) { + memoryConsumption.setGradient(memoryConsumptionGradient); + gradientMemoryConsumptionReset = true; + if (secondDerivativeMemoryConsumptionRest && averageMemoryConsumptionReset) { + memoryConsumptionReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Memory consumption stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public boolean isMemoryConsumptionReset() { + return memoryConsumptionReset; + } + + public void setMemoryConsumptionReset(boolean memoryConsumptionReset) { + this.memoryConsumptionReset = memoryConsumptionReset; + this.averageMemoryConsumptionReset = memoryConsumptionReset; + this.gradientMemoryConsumptionReset = memoryConsumptionReset; + this.secondDerivativeMemoryConsumptionRest = memoryConsumptionReset; + } + + + public float getAverageLoadAverage() { + return loadAverage.getAverage(); + } + + public void setAverageLoadAverage(float averageLoadAverage) { + loadAverage.setAverage(averageLoadAverage); + averageLoadAverageReset = true; + if (secondDerivativeLoadAverageRest && gradientLoadAverageReset) { + loadAverageReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Load average stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getLoadAverageSecondDerivative() { + return loadAverage.getSecondDerivative(); + } + + public void setLoadAverageSecondDerivative(float loadAverageSecondDerivative) { + loadAverage.setSecondDerivative(loadAverageSecondDerivative); + secondDerivativeLoadAverageRest = true; + if (averageLoadAverageReset && gradientLoadAverageReset) { + loadAverageReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Load average stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public float getLoadAverageGradient() { + return loadAverage.getGradient(); + } + + public void setLoadAverageGradient(float loadAverageGradient) { + loadAverage.setGradient(loadAverageGradient); + gradientLoadAverageReset = true; + if (secondDerivativeLoadAverageRest && averageLoadAverageReset) { + loadAverageReset = true; + if (log.isDebugEnabled()) { + log.debug(String.format("Load average stats are reset, ready to do scale check [network partition] %s" + , this.id)); + } + } + } + + public boolean isLoadAverageReset() { + return loadAverageReset; + } + + public void setLoadAverageReset(boolean loadAverageReset) { + this.loadAverageReset = loadAverageReset; + this.averageLoadAverageReset = loadAverageReset; + this.gradientLoadAverageReset = loadAverageReset; + this.secondDerivativeLoadAverageRest = loadAverageReset; + } + + + public String getId() { + return id; + } + + public Map<String, ClusterLevelPartitionContext> getPartitionCtxts() { + return partitionCtxts; + } + + public ClusterLevelPartitionContext getPartitionCtxt(String partitionId) { + return partitionCtxts.get(partitionId); + } + + public void addPartitionContext(ClusterLevelPartitionContext partitionContext) { + partitionCtxts.put(partitionContext.getPartitionId(), partitionContext); + } + + public String getPartitionAlgorithm() { + return partitionAlgorithm; + } + + public Partition[] getPartitions() { + return partitions; + } + + public int getNonTerminatedMemberCountOfPartition(String partitionId) { + if (partitionCtxts.containsKey(partitionId)) { + return getPartitionCtxt(partitionId).getNonTerminatedMemberCount(); + } + return 0; + } + + public int getActiveMemberCount(String currentPartitionId) { + if (partitionCtxts.containsKey(currentPartitionId)) { + return getPartitionCtxt(currentPartitionId).getActiveMemberCount(); + } + return 0; + } + + public int getScaleDownRequestsCount() { + return scaleDownRequestsCount; + } + + public void resetScaleDownRequestsCount() { + this.scaleDownRequestsCount = 0; + } + + public void increaseScaleDownRequestsCount() { + this.scaleDownRequestsCount += 1; + } + + public float getRequiredInstanceCountBasedOnStats() { + return requiredInstanceCountBasedOnStats; + } + + public void setRequiredInstanceCountBasedOnStats(int requiredInstanceCountBasedOnStats) { + this.requiredInstanceCountBasedOnStats = requiredInstanceCountBasedOnStats; + } + + public int getRequiredInstanceCountBasedOnDependencies() { + return requiredInstanceCountBasedOnDependencies; + } + + public void setRequiredInstanceCountBasedOnDependencies(int requiredInstanceCountBasedOnDependencies) { + this.requiredInstanceCountBasedOnDependencies = requiredInstanceCountBasedOnDependencies; + } + + public Map<String, Instance> getInstanceIdToInstanceContextMap() { + return instanceIdToInstanceContextMap; + } + + public void setInstanceIdToInstanceContextMap(Map<String, Instance> instanceIdToInstanceContextMap) { + this.instanceIdToInstanceContextMap = instanceIdToInstanceContextMap; + } + + public void addInstanceContext(Instance context) { + this.instanceIdToInstanceContextMap.put(context.getInstanceId(), context); + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/c20d28c2/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/GroupLevelNetworkPartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/GroupLevelNetworkPartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/GroupLevelNetworkPartitionContext.java new file mode 100644 index 0000000..737248a --- /dev/null +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/partition/network/GroupLevelNetworkPartitionContext.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.autoscaler.context.partition.network; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.autoscaler.context.partition.GroupLevelPartitionContext; +import org.apache.stratos.cloud.controller.stub.deployment.partition.Partition; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Holds runtime data of a network partition. + * + */ +public class GroupLevelNetworkPartitionContext extends NetworkPartitionContext implements Serializable { + private static final Log log = LogFactory.getLog(GroupLevelNetworkPartitionContext.class); + private final String id; + private int scaleDownRequestsCount = 0; + private float averageRequestsServedPerInstance; + + private int minInstanceCount = 0, maxInstanceCount = 0; + private int requiredInstanceCountBasedOnStats; + private int requiredInstanceCountBasedOnDependencies; + + private final String partitionAlgorithm; + + private final Partition[] partitions; + + //details required for partition selection algorithms + private int currentPartitionIndex; + + //partitions of this network partition + private final Map<String, GroupLevelPartitionContext> partitionCtxts; + + public GroupLevelNetworkPartitionContext(String id, String partitionAlgo, Partition[] partitions) { + this.id = id; + this.partitionAlgorithm = partitionAlgo; + if (partitions == null) { + this.partitions = new Partition[0]; + } else { + this.partitions = Arrays.copyOf(partitions, partitions.length); + } + partitionCtxts = new HashMap<String, GroupLevelPartitionContext>(); + for (Partition partition : partitions) { + minInstanceCount += partition.getPartitionMin(); + maxInstanceCount += partition.getPartitionMax(); + } + requiredInstanceCountBasedOnStats = minInstanceCount; + requiredInstanceCountBasedOnDependencies = minInstanceCount; + + } + + public int getMinInstanceCount() { + return minInstanceCount; + } + + public void setMinInstanceCount(int minInstanceCount) { + this.minInstanceCount = minInstanceCount; + } + + public int getMaxInstanceCount() { + return maxInstanceCount; + } + + public void setMaxInstanceCount(int maxInstanceCount) { + this.maxInstanceCount = maxInstanceCount; + } + + public int hashCode() { + + final int prime = 31; + int result = 1; + result = prime * result + ((this.id == null) ? 0 : this.id.hashCode()); + return result; + + } + + public boolean equals(final Object obj) { + + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof GroupLevelNetworkPartitionContext)) { + return false; + } + final GroupLevelNetworkPartitionContext other = (GroupLevelNetworkPartitionContext) obj; + if (this.id == null) { + if (other.id != null) { + return false; + } + } else if (!this.id.equals(other.id)) { + return false; + } + return true; + } + + @Override + public String toString() { + return "NetworkPartitionContext [id=" + id + "partitionAlgorithm=" + partitionAlgorithm + ", minInstanceCount=" + + minInstanceCount + ", maxInstanceCount=" + maxInstanceCount + "]"; + } + + public int getCurrentPartitionIndex() { + return currentPartitionIndex; + } + + public void setCurrentPartitionIndex(int currentPartitionIndex) { + this.currentPartitionIndex = currentPartitionIndex; + } + + public String getId() { + return id; + } + + public Map<String, GroupLevelPartitionContext> getPartitionCtxts() { + return partitionCtxts; + } + + public GroupLevelPartitionContext getPartitionCtxt(String partitionId) { + return partitionCtxts.get(partitionId); + } + + public void addPartitionContext(GroupLevelPartitionContext partitionContext) { + partitionCtxts.put(partitionContext.getPartitionId(), partitionContext); + } + + public String getPartitionAlgorithm() { + return partitionAlgorithm; + } + + public Partition[] getPartitions() { + return partitions; + } + + public int getNonTerminatedMemberCountOfPartition(String partitionId) { + if (partitionCtxts.containsKey(partitionId)) { + return getPartitionCtxt(partitionId).getNonTerminatedInstanceCount(); + } + return 0; + } + + public int getActiveMemberCount(String currentPartitionId) { + if (partitionCtxts.containsKey(currentPartitionId)) { + return getPartitionCtxt(currentPartitionId).getActiveInstanceCount(); + } + return 0; + } + + public int getScaleDownRequestsCount() { + return scaleDownRequestsCount; + } + + public void resetScaleDownRequestsCount() { + this.scaleDownRequestsCount = 0; + } + + public void increaseScaleDownRequestsCount() { + this.scaleDownRequestsCount += 1; + } + + public float getRequiredInstanceCountBasedOnStats() { + return requiredInstanceCountBasedOnStats; + } + + public void setRequiredInstanceCountBasedOnStats(int requiredInstanceCountBasedOnStats) { + this.requiredInstanceCountBasedOnStats = requiredInstanceCountBasedOnStats; + } + + public int getRequiredInstanceCountBasedOnDependencies() { + return requiredInstanceCountBasedOnDependencies; + } + + public void setRequiredInstanceCountBasedOnDependencies(int requiredInstanceCountBasedOnDependencies) { + this.requiredInstanceCountBasedOnDependencies = requiredInstanceCountBasedOnDependencies; + } + + + +} \ No newline at end of file
