Merging VMServiceClusterMonitor into VMClusterMonitor as the abstraction is 
no longer required


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7797f1e5
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7797f1e5
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7797f1e5

Branch: refs/heads/master
Commit: 7797f1e5b6b4f3a772c9edf6fb17aa05acf9e586
Parents: fdb84e7
Author: Lahiru Sandaruwan <[email protected]>
Authored: Tue Dec 2 07:33:47 2014 +0530
Committer: Lahiru Sandaruwan <[email protected]>
Committed: Tue Dec 2 07:36:07 2014 +0530

----------------------------------------------------------------------
 .../monitor/cluster/ClusterMonitorFactory.java  |   7 +-
 .../monitor/cluster/VMClusterMonitor.java       | 287 ++++++++++++++++++-
 .../autoscaler/rule/RuleTasksDelegator.java     |   7 +-
 3 files changed, 286 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/7797f1e5/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
index 1159fc5..65ab2a2 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitorFactory.java
@@ -26,7 +26,6 @@ import 
org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
 import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
 import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.MemberStatus;
 
@@ -52,20 +51,20 @@ public class ClusterMonitorFactory {
 //        } else if (cluster.isLbCluster()) {
 //            clusterMonitor = getVMLbClusterMonitor(cluster);
         } else {
-            clusterMonitor = getVMServiceClusterMonitor(cluster);
+            clusterMonitor = getVMClusterMonitor(cluster);
         }
 
         return clusterMonitor;
     }
 
-    private static VMServiceClusterMonitor getVMServiceClusterMonitor(Cluster 
cluster)
+    private static VMClusterMonitor getVMClusterMonitor(Cluster cluster)
             throws PolicyValidationException, PartitionValidationException {
 
         if (null == cluster) {
             return null;
         }
 
-        VMServiceClusterMonitor clusterMonitor = new 
VMServiceClusterMonitor(cluster.getServiceName(), cluster.getClusterId());
+        VMClusterMonitor clusterMonitor = new 
VMClusterMonitor(cluster.getServiceName(), cluster.getClusterId());
 
         // find lb reference type
         java.util.Properties props = cluster.getProperties();

http://git-wip-us.apache.org/repos/asf/stratos/blob/7797f1e5/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
index 69e7ca0..d902b7e 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/VMClusterMonitor.java
@@ -18,10 +18,9 @@
  */
 package org.apache.stratos.autoscaler.monitor.cluster;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
+import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.client.CloudControllerClient;
@@ -30,12 +29,23 @@ import 
org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
 import org.apache.stratos.autoscaler.context.member.MemberStatsContext;
 import 
org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
 import 
org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext;
+import 
org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
 import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
 import org.apache.stratos.autoscaler.exception.cartridge.TerminationException;
+import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent;
+import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import 
org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder;
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
 import org.apache.stratos.autoscaler.status.processor.StatusChecker;
+import org.apache.stratos.autoscaler.util.AutoScalerConstants;
+import org.apache.stratos.autoscaler.util.AutoscalerUtil;
+import org.apache.stratos.autoscaler.util.ConfUtil;
 import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
 import org.apache.stratos.common.Properties;
+import org.apache.stratos.common.Property;
+import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
+import org.apache.stratos.messaging.domain.applications.GroupStatus;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Service;
@@ -53,14 +63,21 @@ import 
org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
  * and perform minimum instance check and scaling check using the underlying
  * rules engine.
  */
-abstract public class VMClusterMonitor extends AbstractClusterMonitor {
+public class VMClusterMonitor extends AbstractClusterMonitor {
 
     private static final Log log = LogFactory.getLog(VMClusterMonitor.class);
     private Map<String, ClusterLevelNetworkPartitionContext> 
networkPartitionIdToClusterLevelNetworkPartitionCtxts;
-
-    protected VMClusterMonitor(String serviceType, String clusterId, 
AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
-        super(serviceType, clusterId, autoscalerRuleEvaluator);
+    private boolean hasPrimary;
+    private float scalingFactorBasedOnDependencies = 1.0f;
+
+    protected VMClusterMonitor(String serviceType, String clusterId) {
+        super(serviceType, clusterId, new AutoscalerRuleEvaluator(
+                StratosConstants.VM_MIN_CHECK_DROOL_FILE,
+                StratosConstants.VM_OBSOLETE_CHECK_DROOL_FILE,
+                StratosConstants.VM_SCALE_CHECK_DROOL_FILE));
         this.networkPartitionIdToClusterLevelNetworkPartitionCtxts = new 
HashMap<String, ClusterLevelNetworkPartitionContext>();
+
+        readConfigurations();
     }
 
     public void addClusterLevelNWPartitionContext 
(ClusterLevelNetworkPartitionContext clusterLevelNWPartitionCtxt) {
@@ -95,10 +112,266 @@ abstract public class VMClusterMonitor extends 
AbstractClusterMonitor {
                         " [network partition] %s", networkPartitionId));
             }
         }
+    }
+
+    @Override
+    public void run() {
+        while (!isDestroyed()) {
+            try {
+                /* TODO ***********if  (((getStatus().getCode() <= 
ClusterStatus.Active.getCode()) ||
+                        (getStatus() == ClusterStatus.Inactive && 
!hasStartupDependents)) && !this.hasFaultyMember
+                        && !stop) {*/
+                if (log.isDebugEnabled()) {
+                    log.debug("Cluster monitor is running.. " + 
this.toString());
+                }
+                monitor();
+                /*} else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Cluster monitor is suspended as the cluster 
is in " +
+                                ClusterStatus.Inactive + " mode......");
+                    }
+                }*/
+            } catch (Exception e) {
+                log.error("Cluster monitor: Monitor failed." + 
this.toString(), e);
+            }
+            try {
+                Thread.sleep(getMonitorIntervalMilliseconds());
+            } catch (InterruptedException ignore) {
+            }
+        }
+
+
+    }
+
+    private boolean isPrimaryMember(MemberContext memberContext) {
+        Properties props = 
AutoscalerUtil.toCommonProperties(memberContext.getProperties());
+        if (log.isDebugEnabled()) {
+            log.debug(" Properties [" + props + "] ");
+        }
+        if (props != null && props.getProperties() != null) {
+            for (Property prop : props.getProperties()) {
+                if (prop.getName().equals("PRIMARY")) {
+                    if (Boolean.parseBoolean(prop.getValue())) {
+                        log.debug("Adding member id [" + 
memberContext.getMemberId() + "] " +
+                                "member instance id [" + 
memberContext.getInstanceId() + "] as a primary member");
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+    public void monitor() {
+        final Collection<ClusterLevelNetworkPartitionContext> 
clusterLevelNetworkPartitionContexts =
+                ((VMClusterContext) 
this.clusterContext).getNetworkPartitionCtxts().values();
+        Runnable monitoringRunnable = new Runnable() {
+            @Override
+            public void run() {
+                for (ClusterLevelNetworkPartitionContext 
networkPartitionContext :
+                        clusterLevelNetworkPartitionContexts) {
+                    // store primary members in the network partition context
+                    List<String> primaryMemberListInNetworkPartition = new 
ArrayList<String>();
+                    //minimum check per partition
+                    for (ClusterInstanceContext instanceContext : 
networkPartitionContext.
+                            getClusterInstanceContextMap().values()) {
+                        //FIXME to check the status of the instance
+                        if (true) {
+                            for (ClusterLevelPartitionContext partitionContext 
:
+                                    instanceContext.getPartitionCtxts()) {
+                                // store primary members in the partition 
context
+                                List<String> primaryMemberListInPartition = 
new ArrayList<String>();
+                                // get active primary members in this 
partition context
+                                for (MemberContext memberContext : 
partitionContext.getActiveMembers()) {
+                                    if (isPrimaryMember(memberContext)) {
+                                        
primaryMemberListInPartition.add(memberContext.getMemberId());
+                                    }
+                                }
+
+                                // get pending primary members in this 
partition context
+                                for (MemberContext memberContext : 
partitionContext.getPendingMembers()) {
+                                    if (isPrimaryMember(memberContext)) {
+                                        
primaryMemberListInPartition.add(memberContext.getMemberId());
+                                    }
+                                }
+                                
primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition);
+                                
getMinCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+//                                
getMinCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
+                                
getMinCheckKnowledgeSession().setGlobal("isPrimary", hasPrimary);
+                                
getMinCheckKnowledgeSession().setGlobal("instanceId",
+                                        instanceContext.getId());
+
+                                if (log.isDebugEnabled()) {
+                                    log.debug(String.format("Running minimum 
check for partition %s ",
+                                            
partitionContext.getPartitionId()));
+                                }
+
+                                minCheckFactHandle = AutoscalerRuleEvaluator.
+                                        
evaluateMinCheck(getMinCheckKnowledgeSession()
+                                                , minCheckFactHandle, 
partitionContext);
+
+                                obsoleteCheckFactHandle = 
AutoscalerRuleEvaluator.
+                                        
evaluateObsoleteCheck(getObsoleteCheckKnowledgeSession(),
+                                                obsoleteCheckFactHandle, 
partitionContext);
+
+                                //checking the status of the cluster
+
+                                boolean rifReset = 
instanceContext.isRifReset();
+                                boolean memoryConsumptionReset = 
instanceContext.isMemoryConsumptionReset();
+                                boolean loadAverageReset = 
instanceContext.isLoadAverageReset();
+
+                                if (log.isDebugEnabled()) {
+                                    log.debug("flag of rifReset: " + rifReset 
+ " flag of memoryConsumptionReset" + memoryConsumptionReset
+                                            + " flag of loadAverageReset" + 
loadAverageReset);
+                                }
+                                if (rifReset || memoryConsumptionReset || 
loadAverageReset) {
+
+
+                                    VMClusterContext vmClusterContext = 
(VMClusterContext) clusterContext;
+
+                                    
getScaleCheckKnowledgeSession().setGlobal("instance", instanceContext);
+                                    
getScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId());
+                                    
getScaleCheckKnowledgeSession().setGlobal("autoscalePolicy", 
vmClusterContext.getAutoscalePolicy());
+                                    
getScaleCheckKnowledgeSession().setGlobal("rifReset", rifReset);
+                                    
getScaleCheckKnowledgeSession().setGlobal("mcReset", memoryConsumptionReset);
+                                    
getScaleCheckKnowledgeSession().setGlobal("laReset", loadAverageReset);
+//                                    
getScaleCheckKnowledgeSession().setGlobal("lbRef", lbReferenceType);
+                                    
getScaleCheckKnowledgeSession().setGlobal("isPrimary", false);
+                                    
getScaleCheckKnowledgeSession().setGlobal("primaryMembers", 
primaryMemberListInNetworkPartition);
+
+                                    if (log.isDebugEnabled()) {
+                                        log.debug(String.format("Running scale 
check for network partition %s ", networkPartitionContext.getId()));
+                                        log.debug(" Primary members : " + 
primaryMemberListInNetworkPartition);
+                                    }
+
+                                    scaleCheckFactHandle = 
AutoscalerRuleEvaluator.evaluateScaleCheck(getScaleCheckKnowledgeSession()
+                                            , scaleCheckFactHandle, 
networkPartitionContext);
+
+                                    instanceContext.setRifReset(false);
+                                    
instanceContext.setMemoryConsumptionReset(false);
+                                    instanceContext.setLoadAverageReset(false);
+                                } else if (log.isDebugEnabled()) {
+                                    log.debug(String.format("Scale rule will 
not run since the LB statistics have not received before this " +
+                                            "cycle for network partition %s", 
networkPartitionContext.getId()));
+                                }
+                            }
+
+                        }
+                    }
+                }
+
+            }
+        };
+        monitoringRunnable.run();
+    }
+
+    @Override
+    protected void readConfigurations() {
+        XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration();
+        int monitorInterval = 
conf.getInt(AutoScalerConstants.VMService_Cluster_MONITOR_INTERVAL, 90000);
+        setMonitorIntervalMilliseconds(monitorInterval);
+        if (log.isDebugEnabled()) {
+            log.debug("VMServiceClusterMonitor task interval set to : " + 
getMonitorIntervalMilliseconds());
+        }
+    }
+
+    @Override
+    public void destroy() {
+        getMinCheckKnowledgeSession().dispose();
+        getObsoleteCheckKnowledgeSession().dispose();
+        getScaleCheckKnowledgeSession().dispose();
+        setDestroyed(true);
+        stopScheduler();
+        if (log.isDebugEnabled()) {
+            log.debug("VMServiceClusterMonitor Drools session has been 
disposed. " + this.toString());
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "VMServiceClusterMonitor [clusterId=" + getClusterId() +
+//                ", lbReferenceType=" + lbReferenceType +
+                ", hasPrimary=" + hasPrimary + " ]";
+    }
+
+//    public String getLbReferenceType() {
+//        return lbReferenceType;
+//    }
+//
+//    public void setLbReferenceType(String lbReferenceType) {
+//        this.lbReferenceType = lbReferenceType;
+//    }
+
+    public boolean isHasPrimary() {
+        return hasPrimary;
+    }
+
+    public void setHasPrimary(boolean hasPrimary) {
+        this.hasPrimary = hasPrimary;
+    }
+
+    @Override
+    public void onChildStatusEvent(MonitorStatusEvent statusEvent) {
+
+    }
+
+    @Override
+    public void onParentStatusEvent(MonitorStatusEvent statusEvent) {
+        String instanceId = statusEvent.getInstanceId();
+        // send the ClusterTerminating event
+        if (statusEvent.getStatus() == GroupStatus.Terminating || 
statusEvent.getStatus() ==
+                ApplicationStatus.Terminating) {
+            if (log.isInfoEnabled()) {
+                log.info("Publishing Cluster terminating event for 
[application]: " + appId +
+                        " [cluster]: " + this.getClusterId());
+            }
+            
ClusterStatusEventPublisher.sendClusterTerminatingEvent(getAppId(), 
getServiceId(), getClusterId(), instanceId);
+        }
+    }
+
+    @Override
+    public void onChildScalingEvent(MonitorScalingEvent scalingEvent) {
 
     }
 
     @Override
+    public void onParentScalingEvent(MonitorScalingEvent scalingEvent) {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Parent scaling event received to [cluster]: " + 
this.getClusterId()
+                    + ", [network partition]: " + 
scalingEvent.getNetworkPartitionId()
+                    + ", [event] " + scalingEvent.getId() + ", [group 
instance] " + scalingEvent.getInstanceId());
+        }
+
+        this.scalingFactorBasedOnDependencies = scalingEvent.getFactor();
+        VMClusterContext vmClusterContext = (VMClusterContext) clusterContext;
+        String instanceId = scalingEvent.getInstanceId();
+
+        ClusterInstanceContext clusterLevelNetworkPartitionContext =
+                
getClusterInstanceContext(scalingEvent.getNetworkPartitionId(), instanceId);
+
+
+        //TODO get min instance count from instance context
+        float requiredInstanceCount = 0 ;/* = 
clusterLevelNetworkPartitionContext.getMinInstanceCount() * 
scalingFactorBasedOnDependencies;*/
+        int roundedRequiredInstanceCount = 
getRoundedInstanceCount(requiredInstanceCount,
+                
vmClusterContext.getAutoscalePolicy().getInstanceRoundingFactor());
+        
clusterLevelNetworkPartitionContext.setRequiredInstanceCountBasedOnDependencies(roundedRequiredInstanceCount);
+
+        getDependentScaleCheckKnowledgeSession().setGlobal("clusterId", 
getClusterId());
+        getDependentScaleCheckKnowledgeSession().setGlobal("scalingFactor", 
scalingFactorBasedOnDependencies);
+        
getDependentScaleCheckKnowledgeSession().setGlobal("instanceRoundingFactor",
+                
vmClusterContext.getAutoscalePolicy().getInstanceRoundingFactor());
+
+        dependentScaleCheckFactHandle = 
AutoscalerRuleEvaluator.evaluateScaleCheck(getScaleCheckKnowledgeSession()
+                , scaleCheckFactHandle, clusterLevelNetworkPartitionContext);
+
+    }
+
+    public void sendClusterScalingEvent(String networkPartitionId, float 
factor) {
+
+        MonitorStatusEventBuilder.handleClusterScalingEvent(this.parent, 
networkPartitionId, factor, this.id);
+    }
+    @Override
     public void handleGradientOfLoadAverageEvent(
             GradientOfLoadAverageEvent gradientOfLoadAverageEvent) {
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/7797f1e5/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
index bb4d87f..e4c82ba 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
@@ -37,7 +37,6 @@ import 
org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionCont
 import org.apache.stratos.autoscaler.exception.cartridge.TerminationException;
 import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.VMServiceClusterMonitor;
 //import 
org.apache.stratos.autoscaler.pojo.policy.deployment.partition.PartitionManager;
 import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
 import org.apache.stratos.common.constants.StratosConstants;
@@ -222,10 +221,10 @@ public class RuleTasksDelegator {
 
         //Notify parent for checking scaling dependencies
         AbstractClusterMonitor clusterMonitor = 
AutoscalerContext.getInstance().getClusterMonitor(clusterId);
-        if (clusterMonitor instanceof VMServiceClusterMonitor) {
+        if (clusterMonitor instanceof VMClusterMonitor) {
 
-            VMServiceClusterMonitor vmServiceClusterMonitor = 
(VMServiceClusterMonitor) clusterMonitor;
-            
vmServiceClusterMonitor.sendClusterScalingEvent(networkPartitionId, factor);
+            VMClusterMonitor vmClusterMonitor = (VMClusterMonitor) 
clusterMonitor;
+            vmClusterMonitor.sendClusterScalingEvent(networkPartitionId, 
factor);
         }
 
     }

Reply via email to