Repository: stratos Updated Branches: refs/heads/docker-grouping-merge 70044149d -> 368138888
adding locks when reading cluster and fixing cluster reset issue Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/36813888 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/36813888 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/36813888 Branch: refs/heads/docker-grouping-merge Commit: 36813888881d035554b32e7322d46cd770d52fcd Parents: 7004414 Author: reka <[email protected]> Authored: Sun Nov 9 22:20:01 2014 +0530 Committer: reka <[email protected]> Committed: Sun Nov 9 22:20:01 2014 +0530 ---------------------------------------------------------------------- .../cluster/VMServiceClusterMonitor.java | 29 +++++++++----------- .../status/checker/StatusChecker.java | 22 ++++++++++----- .../topology/ClusterResetMessageProcessor.java | 28 ++++--------------- 3 files changed, 34 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/36813888/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java index a2164f5..7684bb8 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java @@ -18,10 +18,6 @@ */ package org.apache.stratos.autoscaler.monitor.cluster; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,8 +26,6 @@ import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.VMServiceClusterContext; import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher; import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent; -import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; -import org.apache.stratos.autoscaler.policy.model.DeploymentPolicy; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; import org.apache.stratos.autoscaler.util.AutoScalerConstants; import org.apache.stratos.autoscaler.util.ConfUtil; @@ -43,6 +37,9 @@ import org.apache.stratos.messaging.domain.applications.ApplicationStatus; import org.apache.stratos.messaging.domain.applications.GroupStatus; import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import java.util.ArrayList; +import java.util.List; + /** * Is responsible for monitoring a service cluster. This runs periodically * and perform minimum instance check and scaling check using the underlying @@ -56,10 +53,10 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { public VMServiceClusterMonitor(String clusterId, VMServiceClusterContext vmServiceClusterContext) { super(clusterId, new AutoscalerRuleEvaluator( - StratosConstants.VM_MIN_CHECK_DROOL_FILE, - StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE, - StratosConstants.VM_SCALE_CHECK_DROOL_FILE), vmServiceClusterContext - ); + StratosConstants.VM_MIN_CHECK_DROOL_FILE, + StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE, + StratosConstants.VM_SCALE_CHECK_DROOL_FILE), vmServiceClusterContext + ); readConfigurations(); } @@ -67,9 +64,9 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { public void run() { while (!isDestroyed()) { try { - if ((getStatus().getCode() <= ClusterStatus.Active.getCode()) || - (getStatus() == ClusterStatus.Inactive && !hasDependent) || - !this.hasFaultyMember) { + if (((getStatus().getCode() <= ClusterStatus.Active.getCode()) || + (getStatus() == ClusterStatus.Inactive && !hasDependent)) && !this.hasFaultyMember + && !stop) { if (log.isDebugEnabled()) { log.debug("Cluster monitor is running.. " + this.toString()); } @@ -148,7 +145,7 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { , minCheckFactHandle, partitionContext); obsoleteCheckFactHandle = AutoscalerRuleEvaluator.evaluateObsoleteCheck(getObsoleteCheckKnowledgeSession(), - obsoleteCheckFactHandle, partitionContext); + obsoleteCheckFactHandle, partitionContext); //checking the status of the cluster @@ -183,8 +180,8 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { @Override public String toString() { return "VMServiceClusterMonitor [clusterId=" + getClusterId() + - ", lbReferenceType=" + lbReferenceType + - ", hasPrimary=" + hasPrimary + " ]"; + ", lbReferenceType=" + lbReferenceType + + ", hasPrimary=" + hasPrimary + " ]"; } public String getLbReferenceType() { http://git-wip-us.apache.org/repos/asf/stratos/blob/36813888/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java index ba6dbaa..e07c946 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java @@ -422,14 +422,22 @@ public class StatusChecker { ClusterStatus status) { boolean clusterStat = false; for (Map.Entry<String, ClusterDataHolder> clusterDataHolderEntry : clusterData.entrySet()) { - Service service = TopologyManager.getTopology().getService(clusterDataHolderEntry.getValue().getServiceType()); - Cluster cluster = service.getCluster(clusterDataHolderEntry.getValue().getClusterId()); - if (cluster.getStatus() == status) { - clusterStat = true; - } else { - clusterStat = false; - return clusterStat; + String serviceName = clusterDataHolderEntry.getValue().getServiceType(); + String clusterId = clusterDataHolderEntry.getValue().getClusterId(); + TopologyManager.acquireReadLockForCluster(serviceName, clusterId); + try { + Service service = TopologyManager.getTopology().getService(serviceName); + Cluster cluster = service.getCluster(clusterId); + if (cluster.getStatus() == status) { + clusterStat = true; + } else { + clusterStat = false; + return clusterStat; + } + } finally { + TopologyManager.releaseReadLockForCluster(serviceName, clusterId); } + } return clusterStat; } http://git-wip-us.apache.org/repos/asf/stratos/blob/36813888/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java index 3cfb2dc..4e7b461 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterResetMessageProcessor.java @@ -99,17 +99,6 @@ public class ClusterResetMessageProcessor extends MessageProcessor { } } - // Validate event properties - /*Cluster cluster = event.getCluster(); - - if(cluster == null) { - String msg = "Cluster object of cluster created event is null."; - log.error(msg); - throw new RuntimeException(msg); - } - if (cluster.getHostNames().isEmpty()) { - throw new RuntimeException("Host name/s not found in cluster created event"); - }*/ // Validate event against the existing topology Service service = topology.getService(event.getServiceName()); if (service == null) { @@ -119,26 +108,21 @@ public class ClusterResetMessageProcessor extends MessageProcessor { } return false; } - if (service.clusterExists(event.getClusterId())) { + Cluster cluster = service.getCluster(event.getClusterId()); + + if (cluster == null) { if (log.isWarnEnabled()) { - log.warn(String.format("Cluster already exists in service: [service] %s [cluster] %s", event.getServiceName(), + log.warn(String.format("Cluster not exists in service: [service] %s [cluster] %s", event.getServiceName(), event.getClusterId())); } } else { - // Apply changes to the topology - Cluster cluster = service.getCluster(event.getClusterId()); if (!cluster.isStateTransitionValid(ClusterStatus.Created)) { - log.error("Invalid State Transition from " + cluster.getStatus() + " to " + ClusterStatus.Created + " " + - "for cluster " + cluster.getClusterId()); + log.error("Invalid State Transition from " + cluster.getStatus() + " to " + ClusterStatus.Created); } cluster.setStatus(ClusterStatus.Created); - if (log.isInfoEnabled()) { - log.info(String.format("Cluster reset as Created: %s", - cluster.toString())); - } - } + } // Notify event listeners notifyEventListeners(event); return true;
