Repository: stratos Updated Branches: refs/heads/docker-grouping-merge 71e8ac293 -> 21ed21233
http://git-wip-us.apache.org/repos/asf/stratos/blob/8b66cbac/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java index f043f51..fa9736e 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java @@ -22,17 +22,17 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.exception.InvalidArgumentException; -import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher; import org.apache.stratos.autoscaler.monitor.Monitor; +import org.apache.stratos.autoscaler.monitor.MonitorStatusEventBuilder; import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent; import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent; import org.apache.stratos.autoscaler.monitor.events.MonitorTerminateAllEvent; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; import org.apache.stratos.cloud.controller.stub.pojo.Properties; -import org.apache.stratos.messaging.domain.topology.ApplicationStatus; import org.apache.stratos.messaging.domain.topology.ClusterStatus; -import org.apache.stratos.messaging.domain.topology.GroupStatus; import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent; import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent; import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent; @@ -62,10 +62,13 @@ import org.drools.runtime.rule.FactHandle; * Every cluster monitor, which are monitoring a cluster, should extend this class. */ public abstract class AbstractClusterMonitor extends Monitor implements Runnable { + + private static final Log log = LogFactory.getLog(AbstractClusterMonitor.class); private String clusterId; private String serviceId; - protected ClusterStatus status; + private String appId; + private ClusterStatus status; private int monitoringIntervalMilliseconds; protected FactHandle minCheckFactHandle; @@ -76,6 +79,7 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable private AutoscalerRuleEvaluator autoscalerRuleEvaluator; protected boolean hasFaultyMember = false; + protected boolean stop = false; private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); @@ -88,6 +92,7 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable this.autoscalerRuleEvaluator = autoscalerRuleEvaluator; this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession(); this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession(); + this.status = ClusterStatus.Created; } protected abstract void readConfigurations(); @@ -178,14 +183,35 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable this.clusterId = clusterId; } - public void setStatus(ClusterStatus status) { - this.status = status; - } - public ClusterStatus getStatus() { return status; } + public void setStatus(ClusterStatus status) { + + //if(this.status != status) { + this.status = status; + /** + * notifying the parent monitor about the state change + * If the cluster in_active and if it is a in_dependent cluster, + * then won't send the notification to parent. + */ + if (status == ClusterStatus.Inactive && !this.hasDependent) { + log.info("[Cluster] " + clusterId + "is not notifying the parent, " + + "since it is identified as the independent unit"); + + } else if (status == ClusterStatus.Terminating) { + // notify parent + log.info("[Cluster] " + clusterId + " is not notifying the parent, " + + "since it is in Terminating State"); + + } else { + MonitorStatusEventBuilder.handleClusterStatusEvent(this.parent, this.status, this.clusterId); + } + //} + + } + public String getServiceId() { return serviceId; } @@ -252,15 +278,19 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable AutoscalerRuleEvaluator autoscalerRuleEvaluator) { this.autoscalerRuleEvaluator = autoscalerRuleEvaluator; } + + public String getAppId() { + return this.appId; + } @Override public void onParentEvent(MonitorStatusEvent statusEvent) { // send the ClusterTerminating event - if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() == - ApplicationStatus.Terminating) { - StatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId); - } +// if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() == +// ApplicationStatus.Terminating) { +// StatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId); +// } } @Override @@ -287,4 +317,12 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable } public abstract void terminateAllMembers(); + + public boolean isStop() { + return stop; + } + + public void setStop(boolean stop) { + this.stop = stop; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/8b66cbac/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java index 2615651..1f17daa 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/KubernetesServiceClusterMonitor.java @@ -18,6 +18,7 @@ */ package org.apache.stratos.autoscaler.monitor.cluster; +import java.util.Arrays; import java.util.List; import org.apache.commons.configuration.XMLConfiguration; @@ -33,8 +34,6 @@ import org.apache.stratos.cloud.controller.stub.pojo.Property; import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.topology.ClusterStatus; -import edu.emory.mathcs.backport.java.util.Arrays; - /* * It is monitoring a kubernetes service cluster periodically. */ http://git-wip-us.apache.org/repos/asf/stratos/blob/8b66cbac/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java index 3365503..19d3704 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java @@ -27,11 +27,13 @@ import org.apache.stratos.autoscaler.NetworkPartitionContext; import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient; import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; +import org.apache.stratos.autoscaler.exception.InvalidArgumentException; import org.apache.stratos.autoscaler.exception.TerminationException; import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; import org.apache.stratos.autoscaler.status.checker.StatusChecker; import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; +import org.apache.stratos.cloud.controller.stub.pojo.Properties; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Service; @@ -552,6 +554,11 @@ abstract public class VMClusterMonitor extends AbstractClusterMonitor { ClusterRemovedEvent clusterRemovedEvent) { } + + @Override + public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException { + + } private String getNetworkPartitionIdByMemberId(String memberId) { for (Service service : TopologyManager.getTopology().getServices()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/8b66cbac/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java index 8a0959c..3c82bdd 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMLbClusterMonitor.java @@ -28,14 +28,12 @@ import org.apache.stratos.autoscaler.NetworkPartitionContext; import org.apache.stratos.autoscaler.NetworkPartitionLbHolder; import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; -import org.apache.stratos.autoscaler.exception.InvalidArgumentException; import org.apache.stratos.autoscaler.partition.PartitionManager; import org.apache.stratos.autoscaler.policy.PolicyManager; import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; import org.apache.stratos.autoscaler.util.AutoScalerConstants; import org.apache.stratos.autoscaler.util.ConfUtil; -import org.apache.stratos.cloud.controller.stub.pojo.Properties; import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.topology.ClusterStatus; import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; @@ -68,7 +66,7 @@ public class VMLbClusterMonitor extends VMClusterMonitor { log.debug("Cluster monitor is running.. " + this.toString()); } try { - if (!ClusterStatus.Inactive.equals(status)) { + if (!ClusterStatus.Inactive.equals(getStatus())) { monitor(); } else { if (log.isDebugEnabled()) { @@ -173,9 +171,4 @@ public class VMLbClusterMonitor extends VMClusterMonitor { return "VMLbClusterMonitor [clusterId=" + getClusterId() + ", serviceId=" + getServiceId() + "]"; } - @Override - public void handleDynamicUpdates(Properties properties) throws InvalidArgumentException { - // TODO - - } } http://git-wip-us.apache.org/repos/asf/stratos/blob/8b66cbac/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java index dc97dc7..cc351de 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMServiceClusterMonitor.java @@ -28,10 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.NetworkPartitionContext; import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy; -import org.apache.stratos.autoscaler.exception.InvalidArgumentException; -import org.apache.stratos.autoscaler.exception.TerminationException; import org.apache.stratos.autoscaler.grouping.topic.ClusterStatusEventPublisher; -import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor; import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent; import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy; import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator; @@ -40,10 +37,10 @@ import org.apache.stratos.autoscaler.util.ConfUtil; import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; import org.apache.stratos.cloud.controller.stub.pojo.Properties; import org.apache.stratos.cloud.controller.stub.pojo.Property; -import org.apache.stratos.messaging.domain.applications.ApplicationStatus; import org.apache.stratos.common.constants.StratosConstants; -import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.domain.applications.ApplicationStatus; import org.apache.stratos.messaging.domain.applications.GroupStatus; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; /** * Is responsible for monitoring a service cluster. This runs periodically @@ -66,22 +63,23 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { new ConcurrentHashMap<String, NetworkPartitionContext>()); readConfigurations(); } - - private static void terminateMember(String memberId) { - try { - CloudControllerClient.getInstance().terminate(memberId); - - } catch (TerminationException e) { - log.error("Unable to terminate member [member id ] " + memberId, e); - } - } + +//TODO why this method? +// private static void terminateMember(String memberId) { +// try { +// CloudControllerClient.getInstance().terminate(memberId); +// +// } catch (TerminationException e) { +// log.error("Unable to terminate member [member id ] " + memberId, e); +// } +// } @Override public void run() { while (!isDestroyed()) { try { - if ((this.status.getCode() <= ClusterStatus.Active.getCode()) || - (this.status == ClusterStatus.Inactive && !hasDependent) || + if ((getStatus().getCode() <= ClusterStatus.Active.getCode()) || + (getStatus() == ClusterStatus.Inactive && !hasDependent) || !this.hasFaultyMember) { if (log.isDebugEnabled()) { log.debug("Cluster monitor is running.. " + this.toString()); @@ -97,56 +95,57 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { log.error("Cluster monitor: Monitor failed." + this.toString(), e); } try { - Thread.sleep(monitorInterval); + Thread.sleep(getMonitorIntervalMilliseconds()); } catch (InterruptedException ignore) { } } } - - @Override - public void terminateAllMembers() { - - Thread memberTerminator = new Thread(new Runnable() { - public void run() { - - for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) { - for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) { - //if (log.isDebugEnabled()) { - log.info("Starting to terminate all members in Network Partition [ " + - networkPartitionContext.getId() + " ], Partition [ " + - partitionContext.getPartitionId() + " ]"); - // } - // need to terminate active, pending and obsolete members - - // active members - for (MemberContext activeMemberCtxt : partitionContext.getActiveMembers()) { - log.info("Terminating active member [member id] " + activeMemberCtxt.getMemberId()); - terminateMember(activeMemberCtxt.getMemberId()); - } - - // pending members - for (MemberContext pendingMemberCtxt : partitionContext.getPendingMembers()) { - log.info("Terminating pending member [member id] " + pendingMemberCtxt.getMemberId()); - terminateMember(pendingMemberCtxt.getMemberId()); - } - - // obsolete members - for (String obsoleteMemberId : partitionContext.getObsoletedMembers()) { - log.info("Terminating obsolete member [member id] " + obsoleteMemberId); - terminateMember(obsoleteMemberId); - } - -// terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll -// (terminateAllKnowledgeSession, terminateAllFactHandle, partitionContext); - } - } - } - }, "Member Terminator - [cluster id] " + this.clusterId); - - memberTerminator.start(); - } + +//TODO why this method? +// @Override +// public void terminateAllMembers() { +// +// Thread memberTerminator = new Thread(new Runnable() { +// public void run() { +// +// for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) { +// for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) { +// //if (log.isDebugEnabled()) { +// log.info("Starting to terminate all members in Network Partition [ " + +// networkPartitionContext.getId() + " ], Partition [ " + +// partitionContext.getPartitionId() + " ]"); +// // } +// // need to terminate active, pending and obsolete members +// +// // active members +// for (MemberContext activeMemberCtxt : partitionContext.getActiveMembers()) { +// log.info("Terminating active member [member id] " + activeMemberCtxt.getMemberId()); +// terminateMember(activeMemberCtxt.getMemberId()); +// } +// +// // pending members +// for (MemberContext pendingMemberCtxt : partitionContext.getPendingMembers()) { +// log.info("Terminating pending member [member id] " + pendingMemberCtxt.getMemberId()); +// terminateMember(pendingMemberCtxt.getMemberId()); +// } +// +// // obsolete members +// for (String obsoleteMemberId : partitionContext.getObsoletedMembers()) { +// log.info("Terminating obsolete member [member id] " + obsoleteMemberId); +// terminateMember(obsoleteMemberId); +// } +// +//// terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll +//// (terminateAllKnowledgeSession, terminateAllFactHandle, partitionContext); +// } +// } +// } +// }, "Member Terminator - [cluster id] " + this.clusterId); +// +// memberTerminator.start(); +// } private boolean isPrimaryMember(MemberContext memberContext) { Properties props = memberContext.getProperties(); @@ -191,16 +190,16 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { } } primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition); - minCheckKnowledgeSession.setGlobal("clusterId", clusterId); - minCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType); - minCheckKnowledgeSession.setGlobal("isPrimary", hasPrimary); + getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); + getMinCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType); + getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary); if (log.isDebugEnabled()) { log.debug(String.format("Running minimum check for partition %s ", partitionContext.getPartitionId())); } - minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(minCheckKnowledgeSession + minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(getMinCheckKnowledgeSession() , minCheckFactHandle, partitionContext); //checking the status of the cluster @@ -246,25 +245,6 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { } } - private boolean isPrimaryMember(MemberContext memberContext) { - Properties props = memberContext.getProperties(); - if (log.isDebugEnabled()) { - log.debug(" Properties [" + props + "] "); - } - if (props != null && props.getProperties() != null) { - for (Property prop : props.getProperties()) { - if (prop.getName().equals("PRIMARY")) { - if (Boolean.parseBoolean(prop.getValue())) { - log.debug("Adding member id [" + memberContext.getMemberId() + "] " + - "member instance id [" + memberContext.getInstanceId() + "] as a primary member"); - return true; - } - } - } - } - return false; - } - @Override protected void readConfigurations() { XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); @@ -320,7 +300,7 @@ public class VMServiceClusterMonitor extends VMClusterMonitor { // send the ClusterTerminating event if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() == ApplicationStatus.Terminating) { - ClusterStatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId); + ClusterStatusEventPublisher.sendClusterTerminatingEvent(getAppId(), getServiceId(), getClusterId()); } } } http://git-wip-us.apache.org/repos/asf/stratos/blob/8b66cbac/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java index d828309..1b9d44d 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java @@ -18,24 +18,28 @@ */ package org.apache.stratos.autoscaler.status.checker; +import java.util.Map; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.autoscaler.AutoscalerContext; import org.apache.stratos.autoscaler.NetworkPartitionContext; import org.apache.stratos.autoscaler.PartitionContext; import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder; -import org.apache.stratos.autoscaler.applications.topic.ApplicationsEventPublisher; import org.apache.stratos.autoscaler.grouping.topic.ClusterStatusEventPublisher; -import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor; -import org.apache.stratos.messaging.domain.applications.*; -import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher; import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor; -import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.domain.applications.Application; +import org.apache.stratos.messaging.domain.applications.ApplicationStatus; +import org.apache.stratos.messaging.domain.applications.ClusterDataHolder; +import org.apache.stratos.messaging.domain.applications.Group; +import org.apache.stratos.messaging.domain.applications.GroupStatus; +import org.apache.stratos.messaging.domain.applications.ParentComponent; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.ClusterStatus; +import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.message.receiver.applications.ApplicationManager; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; -import java.util.Map; - /** * This will be used to evaluate the status of a group * and notify the interested parties about the status changes.
