auto-scaler now reacts to MemberActivated, Terminated, ClusterRemoved Events
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/fcf1dc7f Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/fcf1dc7f Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/fcf1dc7f Branch: refs/heads/master Commit: fcf1dc7fa41f1ce6939ad645918076542f694f32 Parents: 3ba57aa Author: Nirmal Fernando <[email protected]> Authored: Tue Dec 3 21:53:37 2013 +0530 Committer: Nirmal Fernando <[email protected]> Committed: Tue Dec 3 21:53:37 2013 +0530 ---------------------------------------------------------------------- .../stratos/autoscaler/ClusterContext.java | 5 +-- .../stratos/autoscaler/ClusterMonitor.java | 9 ++++-- .../stratos/autoscaler/PartitionContext.java | 6 ++++ .../cloud/controller/CloudControllerClient.java | 18 +++++++++++ .../rule/AutoscalerRuleEvaluator.java | 16 +++++---- .../processors/AutoscalerTopologyReceiver.java | 34 ++++++++------------ .../stratos/autoscaler/util/AutoscalerUtil.java | 3 +- .../src/test/resources/autoscaler.drl | 4 ++- 8 files changed, 61 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/fcf1dc7f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java index 8f6da21..2e9acf4 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java @@ -267,11 +267,12 @@ public class ClusterContext { this.memberPartitionMap.put(memberId, partitionId); } - public void removeMemberPartition(String memberId){ - this.memberPartitionMap.remove(memberId); + public String removeMemberPartition(String memberId){ + return this.memberPartitionMap.remove(memberId); } public String getPartitonOfMember(String memberId){ return this.memberPartitionMap.get(memberId); } + } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/fcf1dc7f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java index ce12904..9e81c84 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java @@ -115,7 +115,9 @@ public class ClusterMonitor implements Runnable{ public void run() { while (!isDestroyed()) { - log.debug("Cluster monitor is running.."); + if (log.isDebugEnabled()) { + log.debug("Cluster monitor is running.."); + } try { minInstanceCountCheck(); } catch (Exception e) { @@ -147,10 +149,13 @@ public class ClusterMonitor implements Runnable{ } } } - + public void destroy() { ksession.dispose(); setDestroyed(true); + if(log.isDebugEnabled()) { + log.debug("Cluster Monitor Drools session has been disposed."); + } } public boolean isDestroyed() { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/fcf1dc7f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java index 1740363..3ff6c01 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java @@ -89,9 +89,15 @@ public class PartitionContext { // live count + pending count return currentMemberCount + pendingMembers.size(); } + public void incrementCurrentMemberCount(int count) { + this.currentMemberCount += count; } + + public void decrementCurrentMemberCount(int count) { + this.currentMemberCount -= count; + } public int getMinimumMemberCount() { return minimumMemberCount; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/fcf1dc7f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java index 6eed713..1531c88 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java @@ -30,6 +30,7 @@ import org.apache.stratos.autoscaler.util.ConfUtil; import org.apache.stratos.cloud.controller.deployment.partition.Partition; import org.apache.stratos.cloud.controller.stub.CloudControllerServiceIllegalArgumentExceptionException; import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidCartridgeTypeExceptionException; +import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidClusterExceptionException; import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidMemberExceptionException; import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidPartitionExceptionException; import org.apache.stratos.cloud.controller.stub.CloudControllerServiceStub; @@ -140,6 +141,23 @@ public class CloudControllerClient { } } + public void terminateAllInstances(String clusterId) throws TerminationException { + try { + stub.terminateAllInstances(clusterId); + + } catch (RemoteException e) { + String msg = "Error occurred in cloud controller side while terminating instance"; + log.error(msg, e); + throw new TerminationException(msg, e); + + } catch (CloudControllerServiceInvalidClusterExceptionException e) { + log.error(e.getMessage()); + throw new TerminationException(e); + } catch (CloudControllerServiceIllegalArgumentExceptionException e) { + log.error(e.getMessage()); + throw new TerminationException(e); + } + } public void terminate(String memberId) throws TerminationException { //call CC terminate method http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/fcf1dc7f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java index a81906e..7e267b4 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java @@ -122,18 +122,20 @@ public class AutoscalerRuleEvaluator { public static void delegateTerminate(String memberId) { try { -// int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount(); -// log.info("Current member count is " + currentMemberCount ); -// if(currentMemberCount > partition.getPartitionMembersMin()) { -// AutoscalerContext.getInstance().getClusterContext(clusterId).decreaseMemberCount(); - //FIXME -// cloudControllerClient.terminate(partition, clusterId); -// } CloudControllerClient.getInstance().terminate(memberId); } catch (Throwable e) { log.error("Cannot terminate instance", e); } } + + public static void delegateTerminateAll(String clusterId) { + try { + + CloudControllerClient.getInstance().terminateAllInstances(clusterId); + } catch (Throwable e) { + log.error("Cannot terminate instance", e); + } + } // public boolean delegateSpawn(Partition partition, String clusterId, int memberCountToBeIncreased) { // CloudControllerClient cloudControllerClient = new CloudControllerClient(); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/fcf1dc7f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java index 6b64514..7a2601b 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.AutoscalerContext; import org.apache.stratos.autoscaler.ClusterContext; import org.apache.stratos.autoscaler.ClusterMonitor; +import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.exception.PartitionValidationException; import org.apache.stratos.autoscaler.exception.PolicyValidationException; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; @@ -158,9 +159,13 @@ public class AutoscalerTopologyReceiver implements Runnable { try { TopologyManager.acquireReadLock(); MemberTerminatedEvent e = (MemberTerminatedEvent) event; - ClusterContext clusCtx = AutoscalerContext.getInstance() - .getClusterContext(e.getClusterId()); - clusCtx.removeMemberPartition(e.getMemberId()); + ClusterMonitor monitor = AutoscalerRuleEvaluator.getInstance().getMonitor(e.getClusterId()); + ClusterContext clusCtx = monitor.getClusterCtxt(); + String partitionId = clusCtx.removeMemberPartition(e.getMemberId()); + if (partitionId != null) { + PartitionContext partCtxt = monitor.getPartitionCtxt(partitionId); + partCtxt.decrementCurrentMemberCount(1); + } } finally { TopologyManager.releaseReadLock(); } @@ -171,29 +176,17 @@ public class AutoscalerTopologyReceiver implements Runnable { processorChain.addEventListener(new MemberActivatedEventListener() { @Override protected void onEvent(Event event) { -// try { -// TopologyManager.acquireReadLock(); -// -// // Add cluster to the context when its first member is activated -// MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent)event; -// Cluster cluster = findCluster(memberActivatedEvent.getClusterId()); -// if(cluster == null) { -// if(log.isErrorEnabled()) { -// log.error(String.format("Cluster not found in topology: [cluster] %s", memberActivatedEvent.getClusterId())); -// } -// } -// addClusterToContext(cluster); -// } -// finally { -// TopologyManager.releaseReadLock(); -// } try { TopologyManager.acquireReadLock(); MemberActivatedEvent e = (MemberActivatedEvent)event; - ClusterContext clusCtx = AutoscalerContext.getInstance().getClusterContext(e.getClusterId()); + ClusterMonitor monitor = AutoscalerRuleEvaluator.getInstance().getMonitor(e.getClusterId()); + ClusterContext clusCtx = monitor.getClusterCtxt(); clusCtx.addMemberpartition(e.getMemberId(), e.getPartitionId()); + PartitionContext partCtxt = monitor.getPartitionCtxt(e.getPartitionId()); + partCtxt.incrementCurrentMemberCount(1); + } finally{ TopologyManager.releaseReadLock(); @@ -280,6 +273,7 @@ public class AutoscalerTopologyReceiver implements Runnable { private void removeClusterFromContext(String clusterId) { ClusterMonitor monitor = AutoscalerRuleEvaluator.getInstance().removeMonitor(clusterId); +// monitor.unsubscribe(); monitor.destroy(); if(log.isDebugEnabled()) { log.debug(String.format("Cluster monitor has been removed: [cluster] %s ", clusterId)); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/fcf1dc7f/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java index 03b1ddd..98048a6 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java @@ -62,8 +62,7 @@ public class AutoscalerUtil { * @throws PolicyValidationException * @throws PartitionValidationException */ - public static ClusterContext - getClusterContext(Cluster cluster) throws PolicyValidationException, PartitionValidationException { + public static ClusterContext getClusterContext(Cluster cluster) throws PolicyValidationException, PartitionValidationException { // FIXME fix the following code to correctly update // AutoscalerContext context = AutoscalerContext.getInstance(); if (null == cluster) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/fcf1dc7f/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl b/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl index 7881efe..ba50959 100644 --- a/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl +++ b/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl @@ -49,6 +49,7 @@ global org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator $evaluator; global org.apache.stratos.messaging.domain.topology.Topology $topology; global java.util.Map partitionCtxts; global java.lang.String clusterId; +global java.lang.boolean clusterRemoved; import function org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator.delegateSpawn; import function org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator.delegateTerminate; @@ -76,4 +77,5 @@ dialect "mvel" eval($ctxt.removeObsoleteMember(memberId)) then delegateTerminate(memberId); -end \ No newline at end of file +end +
