Repository: stratos Updated Branches: refs/heads/stratos-4.1.x 6496b05a3 -> 509bd6bdc
Fixing jira issue STRATOS-1629 - DAS publishers created each time when an event is published from stratos to DAS Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/65aee8a8 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/65aee8a8 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/65aee8a8 Branch: refs/heads/stratos-4.1.x Commit: 65aee8a81ffec254486e88b5c53eefa58f06dea8 Parents: af13aeb Author: Thanuja <[email protected]> Authored: Mon Nov 23 11:27:12 2015 +0530 Committer: Thanuja <[email protected]> Committed: Mon Nov 23 11:28:38 2015 +0530 ---------------------------------------------------------------------- .../monitor/cluster/ClusterMonitor.java | 21 +++++++++--- .../messaging/topology/TopologyBuilder.java | 34 +++----------------- .../src/main/conf/drools/dependent-scaling.drl | 8 ++--- .../src/main/conf/drools/mincheck.drl | 9 ++---- .../src/main/conf/drools/scaling.drl | 8 ++--- .../src/test/resources/common/scaling.drl | 8 ++--- 6 files changed, 31 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java index 28c2d95..5976279 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java @@ -41,6 +41,8 @@ import org.apache.stratos.autoscaler.monitor.events.ScalingEvent; import org.apache.stratos.autoscaler.monitor.events.ScalingUpBeyondMaxEvent; import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder; import org.apache.stratos.autoscaler.rule.RuleTasksDelegator; +import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory; +import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher; import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusActiveProcessor; import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusInactiveProcessor; import org.apache.stratos.autoscaler.status.processor.cluster.ClusterStatusTerminatedProcessor; @@ -51,6 +53,7 @@ import org.apache.stratos.cloud.controller.stub.domain.MemberContext; import org.apache.stratos.common.Properties; import org.apache.stratos.common.client.CloudControllerServiceClient; import org.apache.stratos.common.constants.StratosConstants; +import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType; import org.apache.stratos.common.threading.StratosThreadPool; import org.apache.stratos.messaging.domain.application.ApplicationStatus; import org.apache.stratos.messaging.domain.application.GroupStatus; @@ -98,7 +101,8 @@ public class ClusterMonitor extends Monitor { private boolean hasScalingDependents; private boolean groupScalingEnabledSubtree; private String deploymentPolicyId; - + private ScalingDecisionPublisher scalingDecisionPublisher = + AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS); public ClusterMonitor(Cluster cluster, boolean hasScalingDependents, boolean groupScalingEnabledSubtree, String deploymentPolicyId) { @@ -337,6 +341,9 @@ public class ClusterMonitor extends Monitor { instanceContext.getMinCheckKnowledgeSession().setGlobal("algorithmName", paritionAlgo); + instanceContext.getMinCheckKnowledgeSession().setGlobal("scalingDecisionPublisher", + scalingDecisionPublisher); + if (log.isDebugEnabled()) { log.debug(String.format("Running minimum check for [cluster instance] %s, " + "[cluster id] %s", @@ -350,7 +357,7 @@ public class ClusterMonitor extends Monitor { if (log.isDebugEnabled()) { log.debug(String.format("Running maximum check for [cluster instance] %s, " + - "[cluster id] %s", instanceContext.getId(), clusterId)); + "[cluster id] %s", instanceContext.getId(), clusterId)); } instanceContext.setMaxCheckFactHandle(evaluate(instanceContext. getMaxCheckKnowledgeSession(), @@ -376,6 +383,8 @@ public class ClusterMonitor extends Monitor { clusterContext.getAutoscalePolicy()); instanceContext.getScaleCheckKnowledgeSession().setGlobal("arspiReset", averageRequestServedPerInstanceReset); + instanceContext.getScaleCheckKnowledgeSession().setGlobal("scalingDecisionPublisher", + scalingDecisionPublisher); if (log.isDebugEnabled()) { log.debug("Running scale check, [Is rif Reset] " + rifReset + ", " + "[Is memoryConsumption Reset] " + memoryConsumptionReset + ", " + @@ -552,8 +561,12 @@ public class ClusterMonitor extends Monitor { clusterInstanceContext.setRequiredInstanceCountBasedOnDependencies(roundedRequiredInstanceCount); clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("clusterId", getClusterId()); - clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("roundedRequiredInstanceCount", roundedRequiredInstanceCount); - clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("algorithmName", clusterInstanceContext.getPartitionAlgorithm()); + clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("roundedRequiredInstanceCount", + roundedRequiredInstanceCount); + clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("algorithmName", + clusterInstanceContext.getPartitionAlgorithm()); + clusterInstanceContext.getDependentScaleCheckKnowledgeSession().setGlobal("scalingDecisionPublisher", + scalingDecisionPublisher); if (log.isDebugEnabled()) { log.debug(String.format("Running dependent scale check for [cluster instance] %s, " + http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java index da38337..90fa8bd 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@ -57,6 +57,11 @@ import java.util.*; */ public class TopologyBuilder { private static final Log log = LogFactory.getLog(TopologyBuilder.class); + private static MemberInformationPublisher memInfoPublisher = CloudControllerPublisherFactory. + createMemberInformationPublisher(StatisticsPublisherType.WSO2DAS); + + private static MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. + createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); public static void handleServiceCreated(List<Cartridge> cartridgeList) throws RegistryException { Service service; @@ -352,9 +357,6 @@ public class TopologyBuilder { //member created time Long timestamp = System.currentTimeMillis(); //publishing member status to DAS - MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. - createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); - if (memStatusPublisher.isEnabled()) { if (log.isDebugEnabled()) { log.debug("Publishing Member Status to DAS"); @@ -363,8 +365,6 @@ public class TopologyBuilder { memberContext.getClusterInstanceId(), memberContext.getCartridgeType(), memberContext.getNetworkPartitionId(), memberContext.getPartition().getId(), memberContext.getMemberId(), MemberStatus.Created.toString()); - } else { - log.warn("Member Status Publisher is not enabled"); } } finally { @@ -435,11 +435,6 @@ public class TopologyBuilder { Long timestamp = System.currentTimeMillis(); TopologyEventPublisher.sendMemberInitializedEvent(memberContext); //publishing member information and status to DAS - MemberInformationPublisher memInfoPublisher = CloudControllerPublisherFactory. - createMemberInformationPublisher(StatisticsPublisherType.WSO2DAS); - - MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. - createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); if (memInfoPublisher.isEnabled()) { if (log.isInfoEnabled()) { @@ -460,8 +455,6 @@ public class TopologyBuilder { memberContext.getClusterInstanceId(), memberContext.getCartridgeType(), memberContext.getNetworkPartitionId(), memberContext.getPartition().getId(), memberContext.getMemberId(), MemberStatus.Initialized.toString()); - } else { - log.warn("Member status publisher is not enabled"); } } } finally { @@ -519,9 +512,6 @@ public class TopologyBuilder { //memberStartedEvent. TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); //publishing member status to DAS - MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. - createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); - if (memStatusPublisher.isEnabled()) { if (log.isDebugEnabled()) { log.debug("Publishing Member Status to DAS"); @@ -533,8 +523,6 @@ public class TopologyBuilder { instanceStartedEvent.getNetworkPartitionId(), instanceStartedEvent.getPartitionId(), instanceStartedEvent.getMemberId(), MemberStatus.Starting.toString()); - } else { - log.warn("Member Status Publisher is not enabled"); } } } finally { @@ -632,8 +620,6 @@ public class TopologyBuilder { TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); //publishing member status to DAS - MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. - createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); if (memStatusPublisher.isEnabled()) { if (log.isDebugEnabled()) { log.debug("Publishing Member Status to DAS"); @@ -643,8 +629,6 @@ public class TopologyBuilder { memberActivatedEvent.getClusterInstanceId(), memberActivatedEvent.getServiceName(), memberActivatedEvent.getNetworkPartitionId(), memberActivatedEvent.getPartitionId(), memberActivatedEvent.getMemberId(), MemberStatus.Active.toString()); - } else { - log.warn("Member Status Publisher is not enabled"); } } } finally { @@ -700,8 +684,6 @@ public class TopologyBuilder { } TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); //publishing member status to DAS. - MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. - createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); if (memStatusPublisher.isEnabled()) { if (log.isDebugEnabled()) { log.debug("Publishing Member Status to DAS"); @@ -713,8 +695,6 @@ public class TopologyBuilder { instanceReadyToShutdownEvent.getNetworkPartitionId(), instanceReadyToShutdownEvent.getPartitionId(), instanceReadyToShutdownEvent.getMemberId(), MemberStatus.ReadyToShutDown.toString()); - } else { - log.warn("Member Status Publisher is not enabled"); } //termination of particular instance will be handled by autoscaler } @@ -814,8 +794,6 @@ public class TopologyBuilder { partitionId, properties, groupAlias); //publishing member status to DAS. - MemberStatusPublisher memStatusPublisher = CloudControllerPublisherFactory. - createMemberStatusPublisher(StatisticsPublisherType.WSO2DAS); if (memStatusPublisher.isEnabled()) { if (log.isDebugEnabled()) { log.debug("Publishing Member Status to DAS"); @@ -823,8 +801,6 @@ public class TopologyBuilder { memStatusPublisher.publish(timestamp, applicationId, member.getClusterId(), clusterAlias, member.getClusterInstanceId(), member.getServiceName(), member.getNetworkPartitionId(), member.getPartitionId(), member.getMemberId(), MemberStatus.Terminated.toString()); - } else { - log.warn("Member Status Publisher is not enabled"); } } http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl ---------------------------------------------------------------------- diff --git a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl index c4a1141..72c5536 100644 --- a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl +++ b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl @@ -28,15 +28,13 @@ import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadThresholds; import org.apache.stratos.autoscaler.pojo.policy.autoscale.MemoryConsumption; import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadAverage; import java.util.UUID; -import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher; -import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory; -import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType; global org.apache.stratos.autoscaler.rule.RuleLog log; global java.lang.String clusterId; global Integer roundedRequiredInstanceCount; global org.apache.stratos.autoscaler.rule.RuleTasksDelegator delegator; global java.lang.String algorithmName; +global org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher scalingDecisionPublisher; rule "Dependent Scaling Rule" dialect "mvel" @@ -82,7 +80,7 @@ dialect "mvel" String scalingDecisionId = clusterId + "-" + UUID.randomUUID().toString(); long scalingTime = System.currentTimeMillis(); String scalingReason = "DEPENDENCY"; - ScalingDecisionPublisher scalingDecisionPublisher = AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS); + if (scalingDecisionPublisher.isEnabled()) { log.debug("Publishing scaling decision to DAS"); scalingDecisionPublisher.publish(scalingTime, scalingDecisionId, clusterId, @@ -90,8 +88,6 @@ dialect "mvel" 0, 0, 0, 0, 0, 0,0, 0, 0, additionalInstances + nonTerminatedMembers, 0, additionalInstances, scalingReason); - } else { - log.warn("Scaling decision publisher is not enabled"); } while(count != additionalInstances && partitionsAvailable) { http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl ---------------------------------------------------------------------- diff --git a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl index 5ce6d21..250676d 100755 --- a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl +++ b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl @@ -38,9 +38,6 @@ import org.apache.stratos.cloud.controller.stub.domain.Partition; import org.apache.stratos.cloud.controller.stub.domain.MemberContext; import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext; import java.util.UUID; -import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher; -import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory; -import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType; global org.apache.stratos.autoscaler.rule.RuleLog log; global org.apache.stratos.autoscaler.pojo.policy.PolicyManager manager; @@ -49,6 +46,7 @@ global org.apache.stratos.autoscaler.rule.RuleTasksDelegator delegator; global java.util.Map partitionCtxts; global java.lang.String clusterId; global java.lang.String algorithmName; +global org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher scalingDecisionPublisher; rule "Minimum Rule" dialect "mvel" @@ -74,7 +72,7 @@ dialect "mvel" String scalingDecisionId = clusterId + "-" + UUID.randomUUID().toString(); long scalingTime = System.currentTimeMillis(); String scalingReason = "MIN"; - ScalingDecisionPublisher scalingDecisionPublisher = AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS); + if (scalingDecisionPublisher.isEnabled()) { log.debug("Publishing scaling decision to DAS"); scalingDecisionPublisher.publish(scalingTime, scalingDecisionId, clusterId, @@ -83,9 +81,8 @@ dialect "mvel" 0, 0, 0, 0, 0, 0,0, 0, 0, clusterInstanceContext.getMinInstanceCount(), 0, additionalInstances, scalingReason); - } else { - log.warn("Scaling decision publisher is not enabled"); } + while(count != additionalInstances && partitionsAvailable){ ClusterLevelPartitionContext partitionContext = (ClusterLevelPartitionContext)partitionAlgorithm.getNextScaleUpPartitionContext(clusterInstanceContext.getPartitionCtxtsAsAnArray()); http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl ---------------------------------------------------------------------- diff --git a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl index 912685c..f367652 100644 --- a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl +++ b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl @@ -40,9 +40,6 @@ import org.apache.stratos.cloud.controller.stub.domain.Partition; import org.apache.stratos.cloud.controller.stub.domain.MemberContext; import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext; import java.util.UUID; -import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher; -import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory; -import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType; import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadAverage import org.apache.stratos.autoscaler.pojo.policy.autoscale.MemoryConsumption @@ -56,6 +53,7 @@ global java.lang.Boolean mcReset; global java.lang.Boolean laReset; global java.lang.Boolean arspiReset; global java.lang.String algorithmName; +global org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher scalingDecisionPublisher; rule "Scaling Rule" dialect "mvel" @@ -168,7 +166,7 @@ dialect "mvel" String scalingDecisionId = clusterId + "-" + UUID.randomUUID().toString(); long scalingTime = System.currentTimeMillis(); - ScalingDecisionPublisher scalingDecisionPublisher = AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS); + if (scalingDecisionPublisher.isEnabled()) { log.debug("Publishing scaling decision to DAS"); scalingDecisionPublisher.publish(scalingTime, scalingDecisionId, clusterId, @@ -178,8 +176,6 @@ dialect "mvel" laPredictedValue, laThreshold, numberOfInstancesReuquiredBasedOnLoadAverage, numberOfRequiredInstances, activeInstancesCount, additionalInstances, scalingReason); - } else { - log.warn("Scaling decision publisher is not enabled"); } while(count != additionalInstances && partitionsAvailable){ http://git-wip-us.apache.org/repos/asf/stratos/blob/65aee8a8/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl ---------------------------------------------------------------------- diff --git a/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl b/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl index 4fc73fd..c4af71c 100644 --- a/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl +++ b/products/stratos/modules/integration/test-integration/src/test/resources/common/scaling.drl @@ -40,9 +40,6 @@ import org.apache.stratos.cloud.controller.stub.domain.Partition; import org.apache.stratos.cloud.controller.stub.domain.MemberContext; import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext; import java.util.UUID; -import org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher; -import org.apache.stratos.autoscaler.statistics.publisher.AutoscalerPublisherFactory; -import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType; import org.apache.stratos.autoscaler.pojo.policy.autoscale.LoadAverage import org.apache.stratos.autoscaler.pojo.policy.autoscale.MemoryConsumption @@ -56,6 +53,7 @@ global java.lang.Boolean mcReset; global java.lang.Boolean laReset; global java.lang.Boolean arspiReset; global java.lang.String algorithmName; +global org.apache.stratos.autoscaler.statistics.publisher.ScalingDecisionPublisher scalingDecisionPublisher; rule "Scaling Rule" dialect "mvel" @@ -169,7 +167,7 @@ dialect "mvel" String scalingDecisionId = clusterId + "-" + UUID.randomUUID().toString(); long scalingTime = System.currentTimeMillis(); - ScalingDecisionPublisher scalingDecisionPublisher = AutoscalerPublisherFactory.createScalingDecisionPublisher(StatisticsPublisherType.WSO2DAS); + if (scalingDecisionPublisher.isEnabled()) { log.debug("Publishing scaling decision to DAS"); scalingDecisionPublisher.publish(scalingTime, scalingDecisionId, clusterId, @@ -179,8 +177,6 @@ dialect "mvel" laPredictedValue, laThreshold, numberOfInstancesReuquiredBasedOnLoadAverage, numberOfRequiredInstances, activeInstancesCount, additionalInstances, scalingReason); - } else { - log.warn("Scaling decision publisher is not enabled"); } while(count != additionalInstances && partitionsAvailable){
