Publishing events for Metering Service Implementation
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/76c7724b Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/76c7724b Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/76c7724b Branch: refs/heads/master Commit: 76c7724b6f248e48b14ba2ea69e6e919291b4194 Parents: 00ed5fa Author: Thanuja <[email protected]> Authored: Wed Jul 22 16:28:56 2015 +0530 Committer: Thanuja <[email protected]> Committed: Wed Jul 22 16:28:56 2015 +0530 ---------------------------------------------------------------------- .../client/AutoscalerCloudControllerClient.java | 391 +-- .../autoscaler/rule/RuleTasksDelegator.java | 690 ++-- .../messaging/topology/TopologyBuilder.java | 2002 ++++++----- .../impl/CloudControllerServiceImpl.java | 3155 +++++++++--------- .../impl/CloudControllerServiceUtil.java | 132 +- .../services/impl/InstanceCreator.java | 11 - .../publisher/BAMUsageDataPublisher.java | 343 +- .../util/CloudControllerConstants.java | 504 ++- .../common/constants/StratosConstants.java | 375 +-- .../src/main/conf/drools/dependent-scaling.drl | 5 +- .../src/main/conf/drools/scaling.drl | 6 +- 11 files changed, 3802 insertions(+), 3812 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/76c7724b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java index f944a9f..67fd956 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java @@ -48,194 +48,205 @@ import java.util.List; */ public class AutoscalerCloudControllerClient { - private static final Log log = LogFactory.getLog(AutoscalerCloudControllerClient.class); - - private static CloudControllerServiceStub stub; - - /* An instance of a CloudControllerClient is created when the class is loaded. - * Since the class is loaded only once, it is guaranteed that an object of - * CloudControllerClient is created only once. Hence it is singleton. - */ - private static class InstanceHolder { - private static final AutoscalerCloudControllerClient INSTANCE = new AutoscalerCloudControllerClient(); - } - - public static AutoscalerCloudControllerClient getInstance() { - return InstanceHolder.INSTANCE; - } - - private AutoscalerCloudControllerClient() { - try { - XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); - int port = conf.getInt("autoscaler.cloudController.port", AutoscalerConstants.CLOUD_CONTROLLER_DEFAULT_PORT); - String hostname = conf.getString("autoscaler.cloudController.hostname", "localhost"); - String epr = "https://" + hostname + ":" + port + "/" + AutoscalerConstants.CLOUD_CONTROLLER_SERVICE_SFX; - int cloudControllerClientTimeout = conf.getInt("autoscaler.cloudController.clientTimeout", 180000); - - stub = new CloudControllerServiceStub(epr); - stub._getServiceClient().getOptions().setProperty(HTTPConstants.SO_TIMEOUT, cloudControllerClientTimeout); - stub._getServiceClient().getOptions().setProperty(HTTPConstants.CONNECTION_TIMEOUT, - cloudControllerClientTimeout); - } catch (Exception e) { - log.error("Could not initialize cloud controller client", e); - } - } - - public synchronized MemberContext startInstance(PartitionRef partition, - String clusterId, String clusterInstanceId, - String networkPartitionId, boolean isPrimary, - int minMemberCount) throws SpawningException { - try { - if (log.isInfoEnabled()) { - log.info(String.format("Trying to spawn an instance via cloud controller: " + - "[cluster] %s [partition] %s [network-partition-id] %s", - clusterId, partition.getId(), networkPartitionId)); - } - - XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); - long expiryTime = conf.getLong(StratosConstants.OBSOLETED_MEMBER_EXPIRY_TIMEOUT, 86400000); - if (log.isDebugEnabled()) { - log.debug("Member obsolete expiry time is set to: " + expiryTime); - } - - InstanceContext instanceContext = new InstanceContext(); - instanceContext.setClusterId(clusterId); - instanceContext.setClusterInstanceId(clusterInstanceId); - instanceContext.setPartition(AutoscalerObjectConverter.convertPartitionToCCPartition(partition)); - instanceContext.setInitTime(System.currentTimeMillis()); - instanceContext.setObsoleteExpiryTime(expiryTime); - instanceContext.setNetworkPartitionId(networkPartitionId); - - Properties memberContextProps = new Properties(); - Property isPrimaryProp = new Property(); - isPrimaryProp.setName("PRIMARY"); - isPrimaryProp.setValue(String.valueOf(isPrimary)); - - Property minCountProp = new Property(); - minCountProp.setName(StratosConstants.MIN_COUNT); - minCountProp.setValue(String.valueOf(minMemberCount)); - - memberContextProps.addProperty(isPrimaryProp); - memberContextProps.addProperty(minCountProp); - instanceContext.setProperties(AutoscalerUtil.toStubProperties(memberContextProps)); - - long startTime = System.currentTimeMillis(); - MemberContext memberContext = stub.startInstance(instanceContext); - - if (log.isDebugEnabled()) { - long endTime = System.currentTimeMillis(); - log.debug(String.format("Service call startInstance() returned in %dms", (endTime - startTime))); - } - return memberContext; - } catch (CloudControllerServiceCartridgeNotFoundExceptionException e) { - String message = e.getFaultMessage().getCartridgeNotFoundException().getMessage(); - log.error(message, e); - throw new SpawningException(message, e); - } catch (RemoteException e) { - log.error(e.getMessage(), e); - throw new SpawningException(e.getMessage(), e); - } catch (CloudControllerServiceInvalidIaasProviderExceptionException e) { - String message = e.getFaultMessage().getInvalidIaasProviderException().getMessage(); - log.error(message, e); - throw new SpawningException(message, e); - } catch (CloudControllerServiceCloudControllerExceptionException e) { - String message = e.getMessage(); - log.error(message, e); - throw new SpawningException(message, e); - } - } - - public synchronized void createApplicationClusters(String appId, - ApplicationClusterContext[] applicationClusterContexts) { - List<org.apache.stratos.cloud.controller.stub.domain.ApplicationClusterContext> contextDTOs = - new ArrayList<org.apache.stratos.cloud.controller.stub.domain.ApplicationClusterContext>(); - if (applicationClusterContexts != null) { - for (ApplicationClusterContext applicationClusterContext : applicationClusterContexts) { - if (applicationClusterContext != null) { - org.apache.stratos.cloud.controller.stub.domain.ApplicationClusterContext dto = - new org.apache.stratos.cloud.controller.stub.domain.ApplicationClusterContext(); - dto.setClusterId(applicationClusterContext.getClusterId()); - dto.setAutoscalePolicyName(applicationClusterContext.getAutoscalePolicyName()); - dto.setDeploymentPolicyName(applicationClusterContext.getDeploymentPolicyName()); - dto.setCartridgeType(applicationClusterContext.getCartridgeType()); - dto.setHostName(applicationClusterContext.getHostName()); - dto.setTenantRange(applicationClusterContext.getTenantRange()); - dto.setTextPayload(applicationClusterContext.getTextPayload()); - dto.setProperties(AutoscalerUtil.toStubProperties(applicationClusterContext.getProperties())); - dto.setDependencyClusterIds(applicationClusterContext.getDependencyClusterIds()); - if (applicationClusterContext.getPersistenceContext() != null) { - dto.setVolumeRequired(true); - dto.setVolumes(convertVolumesToStubVolumes( - applicationClusterContext.getPersistenceContext().getVolumes())); - } - contextDTOs.add(dto); - } - } - } - - org.apache.stratos.cloud.controller.stub.domain.ApplicationClusterContext[] applicationClusterContextDTOs = - new org.apache.stratos.cloud.controller.stub.domain.ApplicationClusterContext[contextDTOs.size()]; - contextDTOs.toArray(applicationClusterContextDTOs); - try { - stub.createApplicationClusters(appId, applicationClusterContextDTOs); - } catch (RemoteException e) { - String msg = e.getMessage(); - log.error(msg, e); - } catch (CloudControllerServiceApplicationClusterRegistrationExceptionException e) { - String msg = e.getMessage(); - log.error(msg, e); - } - } - - - private Volume[] convertVolumesToStubVolumes(VolumeContext[] volumeContexts) { - - ArrayList<Volume> volumes = new ArrayList<Volume>(); - for (VolumeContext volumeContext : volumeContexts) { - Volume volume = new Volume(); - volume.setRemoveOntermination(volumeContext.isRemoveOntermination()); - volume.setMappingPath(volumeContext.getMappingPath()); - volume.setId(volumeContext.getId()); - volume.setDevice(volumeContext.getDevice()); - volume.setIaasType(volumeContext.getIaasType()); - volume.setSnapshotId(volumeContext.getSnapshotId()); - volume.setVolumeId(volumeContext.getVolumeId()); - volume.setSize(volumeContext.getSize()); - volumes.add(volume); - } - return volumes.toArray(new Volume[volumes.size()]); - } - - public void terminateInstance(String memberId) throws Exception { - if (log.isInfoEnabled()) { - log.info(String.format("Terminating instance via cloud controller: [member] %s", memberId)); - } - long startTime = System.currentTimeMillis(); - stub.terminateInstance(memberId); - if (log.isDebugEnabled()) { - long endTime = System.currentTimeMillis(); - log.debug(String.format("Service call terminateInstance() returned in %dms", (endTime - startTime))); - } - } - - public void terminateInstanceForcefully(String memberId) throws Exception { - if (log.isDebugEnabled()) { - log.debug(String.format("Terminating instance forcefully via cloud controller: [member] %s", memberId)); - } - stub.terminateInstanceForcefully(memberId); - } - - public void terminateAllInstances(String clusterId) throws RemoteException, - CloudControllerServiceInvalidClusterExceptionException { - if (log.isInfoEnabled()) { - log.info(String.format("Terminating all instances of cluster via cloud controller: [cluster] %s", clusterId)); - } - long startTime = System.currentTimeMillis(); - stub.terminateInstances(clusterId); - - if (log.isDebugEnabled()) { - long endTime = System.currentTimeMillis(); - log.debug(String.format("Service call terminateInstances() returned in %dms", (endTime - startTime))); - } - } + private static final Log log = LogFactory.getLog(AutoscalerCloudControllerClient.class); + + private static CloudControllerServiceStub stub; + + /* An instance of a CloudControllerClient is created when the class is loaded. + * Since the class is loaded only once, it is guaranteed that an object of + * CloudControllerClient is created only once. Hence it is singleton. + */ + private static class InstanceHolder { + private static final AutoscalerCloudControllerClient INSTANCE = new AutoscalerCloudControllerClient(); + } + + public static AutoscalerCloudControllerClient getInstance() { + return InstanceHolder.INSTANCE; + } + + private AutoscalerCloudControllerClient() { + try { + XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); + int port = + conf.getInt("autoscaler.cloudController.port", AutoscalerConstants.CLOUD_CONTROLLER_DEFAULT_PORT); + String hostname = conf.getString("autoscaler.cloudController.hostname", "localhost"); + String epr = "https://" + hostname + ":" + port + "/" + AutoscalerConstants.CLOUD_CONTROLLER_SERVICE_SFX; + int cloudControllerClientTimeout = conf.getInt("autoscaler.cloudController.clientTimeout", 180000); + + stub = new CloudControllerServiceStub(epr); + stub._getServiceClient().getOptions().setProperty(HTTPConstants.SO_TIMEOUT, cloudControllerClientTimeout); + stub._getServiceClient().getOptions() + .setProperty(HTTPConstants.CONNECTION_TIMEOUT, cloudControllerClientTimeout); + } catch (Exception e) { + log.error("Could not initialize cloud controller client", e); + } + } + + public synchronized MemberContext startInstance(PartitionRef partition, String clusterId, String clusterInstanceId, + String networkPartitionId, boolean isPrimary, int minMemberCount, + String autoscalingReason, long scalingTime) + throws SpawningException { + try { + if (log.isInfoEnabled()) { + log.info(String.format("Trying to spawn an instance via cloud controller: " + + "[cluster] %s [partition] %s [network-partition-id] %s", clusterId, + partition.getId(), networkPartitionId)); + } + + XMLConfiguration conf = ConfUtil.getInstance(null).getConfiguration(); + long expiryTime = conf.getLong(StratosConstants.OBSOLETED_MEMBER_EXPIRY_TIMEOUT, 86400000); + if (log.isDebugEnabled()) { + log.debug("Member obsolete expiry time is set to: " + expiryTime); + } + + InstanceContext instanceContext = new InstanceContext(); + instanceContext.setClusterId(clusterId); + instanceContext.setClusterInstanceId(clusterInstanceId); + instanceContext.setPartition(AutoscalerObjectConverter.convertPartitionToCCPartition(partition)); + instanceContext.setInitTime(System.currentTimeMillis()); + instanceContext.setObsoleteExpiryTime(expiryTime); + instanceContext.setNetworkPartitionId(networkPartitionId); + + Properties memberContextProps = new Properties(); + Property isPrimaryProp = new Property(); + isPrimaryProp.setName("PRIMARY"); + isPrimaryProp.setValue(String.valueOf(isPrimary)); + + Property minCountProp = new Property(); + minCountProp.setName(StratosConstants.MIN_COUNT); + minCountProp.setValue(String.valueOf(minMemberCount)); + + Property autoscalingReasonProp = new Property(); + autoscalingReasonProp.setName(StratosConstants.SCALING_REASON); + autoscalingReasonProp.setValue(autoscalingReason); + + Property scalingTimeProp = new Property(); + scalingTimeProp.setName(StratosConstants.SCALING_TIME); + scalingTimeProp.setValue(String.valueOf(scalingTime)); + + memberContextProps.addProperty(isPrimaryProp); + memberContextProps.addProperty(minCountProp); + memberContextProps.addProperty(autoscalingReasonProp); + memberContextProps.addProperty(scalingTimeProp); + instanceContext.setProperties(AutoscalerUtil.toStubProperties(memberContextProps)); + + long startTime = System.currentTimeMillis(); + MemberContext memberContext = stub.startInstance(instanceContext); + + if (log.isDebugEnabled()) { + long endTime = System.currentTimeMillis(); + log.debug(String.format("Service call startInstance() returned in %dms", (endTime - startTime))); + } + return memberContext; + } catch (CloudControllerServiceCartridgeNotFoundExceptionException e) { + String message = e.getFaultMessage().getCartridgeNotFoundException().getMessage(); + log.error(message, e); + throw new SpawningException(message, e); + } catch (RemoteException e) { + log.error(e.getMessage(), e); + throw new SpawningException(e.getMessage(), e); + } catch (CloudControllerServiceInvalidIaasProviderExceptionException e) { + String message = e.getFaultMessage().getInvalidIaasProviderException().getMessage(); + log.error(message, e); + throw new SpawningException(message, e); + } catch (CloudControllerServiceCloudControllerExceptionException e) { + String message = e.getMessage(); + log.error(message, e); + throw new SpawningException(message, e); + } + } + + public synchronized void createApplicationClusters(String appId, + ApplicationClusterContext[] applicationClusterContexts) { + List<org.apache.stratos.cloud.controller.stub.domain.ApplicationClusterContext> contextDTOs = + new ArrayList<org.apache.stratos.cloud.controller.stub.domain.ApplicationClusterContext>(); + if (applicationClusterContexts != null) { + for (ApplicationClusterContext applicationClusterContext : applicationClusterContexts) { + if (applicationClusterContext != null) { + org.apache.stratos.cloud.controller.stub.domain.ApplicationClusterContext dto = + new org.apache.stratos.cloud.controller.stub.domain.ApplicationClusterContext(); + dto.setClusterId(applicationClusterContext.getClusterId()); + dto.setAutoscalePolicyName(applicationClusterContext.getAutoscalePolicyName()); + dto.setDeploymentPolicyName(applicationClusterContext.getDeploymentPolicyName()); + dto.setCartridgeType(applicationClusterContext.getCartridgeType()); + dto.setHostName(applicationClusterContext.getHostName()); + dto.setTenantRange(applicationClusterContext.getTenantRange()); + dto.setTextPayload(applicationClusterContext.getTextPayload()); + dto.setProperties(AutoscalerUtil.toStubProperties(applicationClusterContext.getProperties())); + dto.setDependencyClusterIds(applicationClusterContext.getDependencyClusterIds()); + if (applicationClusterContext.getPersistenceContext() != null) { + dto.setVolumeRequired(true); + dto.setVolumes(convertVolumesToStubVolumes( + applicationClusterContext.getPersistenceContext().getVolumes())); + } + contextDTOs.add(dto); + } + } + } + + org.apache.stratos.cloud.controller.stub.domain.ApplicationClusterContext[] applicationClusterContextDTOs = + new org.apache.stratos.cloud.controller.stub.domain.ApplicationClusterContext[contextDTOs.size()]; + contextDTOs.toArray(applicationClusterContextDTOs); + try { + stub.createApplicationClusters(appId, applicationClusterContextDTOs); + } catch (RemoteException e) { + String msg = e.getMessage(); + log.error(msg, e); + } catch (CloudControllerServiceApplicationClusterRegistrationExceptionException e) { + String msg = e.getMessage(); + log.error(msg, e); + } + } + + private Volume[] convertVolumesToStubVolumes(VolumeContext[] volumeContexts) { + + ArrayList<Volume> volumes = new ArrayList<Volume>(); + for (VolumeContext volumeContext : volumeContexts) { + Volume volume = new Volume(); + volume.setRemoveOntermination(volumeContext.isRemoveOntermination()); + volume.setMappingPath(volumeContext.getMappingPath()); + volume.setId(volumeContext.getId()); + volume.setDevice(volumeContext.getDevice()); + volume.setIaasType(volumeContext.getIaasType()); + volume.setSnapshotId(volumeContext.getSnapshotId()); + volume.setVolumeId(volumeContext.getVolumeId()); + volume.setSize(volumeContext.getSize()); + volumes.add(volume); + } + return volumes.toArray(new Volume[volumes.size()]); + } + + public void terminateInstance(String memberId) throws Exception { + if (log.isInfoEnabled()) { + log.info(String.format("Terminating instance via cloud controller: [member] %s", memberId)); + } + long startTime = System.currentTimeMillis(); + stub.terminateInstance(memberId); + if (log.isDebugEnabled()) { + long endTime = System.currentTimeMillis(); + log.debug(String.format("Service call terminateInstance() returned in %dms", (endTime - startTime))); + } + } + + public void terminateInstanceForcefully(String memberId) throws Exception { + if (log.isDebugEnabled()) { + log.debug(String.format("Terminating instance forcefully via cloud controller: [member] %s", memberId)); + } + stub.terminateInstanceForcefully(memberId); + } + + public void terminateAllInstances(String clusterId) + throws RemoteException, CloudControllerServiceInvalidClusterExceptionException { + if (log.isInfoEnabled()) { + log.info(String.format("Terminating all instances of cluster via cloud controller: [cluster] %s", + clusterId)); + } + long startTime = System.currentTimeMillis(); + stub.terminateInstances(clusterId); + + if (log.isDebugEnabled()) { + long endTime = System.currentTimeMillis(); + log.debug(String.format("Service call terminateInstances() returned in %dms", (endTime - startTime))); + } + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/76c7724b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java index 51443a1..5627b8c 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java @@ -36,7 +36,6 @@ import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetwo import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher; import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; import org.apache.stratos.cloud.controller.stub.domain.MemberContext; -import org.apache.stratos.common.client.CloudControllerServiceClient; import org.apache.stratos.common.constants.StratosConstants; /** @@ -44,347 +43,350 @@ import org.apache.stratos.common.constants.StratosConstants; */ public class RuleTasksDelegator { - private static boolean arspiIsSet = false; - - private static final Log log = LogFactory.getLog(RuleTasksDelegator.class); - - public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative, int timeInterval) { - double predictedValue; -// s = u * t + 0.5 * a * t * t - if (log.isDebugEnabled()) { - log.debug(String.format("Predicting the value, [average]: %s , [gradient]: %s , [second derivative] " + - ": %s , [time intervals]: %s ", average, gradient, secondDerivative, timeInterval)); - } - predictedValue = average + gradient * timeInterval + 0.5 * secondDerivative * timeInterval * timeInterval; - - return predictedValue; - } - - - public int getNumberOfInstancesRequiredBasedOnRif(float rifPredictedValue, float rifThreshold) { - - if (rifThreshold != 0) { - - float requiredNumberOfInstances = rifPredictedValue / rifThreshold; - return (int) Math.ceil(requiredNumberOfInstances); - } else { - log.error("Request in flight threshold is Zero"); - return 0; - } - - } - - public int getNumberOfInstancesRequiredBasedOnMemoryConsumption(float threshold, double predictedValue, - int min, int max) { - double numberOfAdditionalInstancesRequired = 0; - if (predictedValue != threshold) { - - float scalingRange = 100 - threshold; - int instanceRange = max - min; - - if (instanceRange != 0) { - - float gradient = scalingRange / instanceRange; - numberOfAdditionalInstancesRequired = (predictedValue - threshold) / gradient; - } - - if (predictedValue < threshold) { - //Since predicted-value is less, it can be scale-down - return min - 1; - } - } - - return (int) Math.ceil(min + numberOfAdditionalInstancesRequired); - } - - public int getNumberOfInstancesRequiredBasedOnLoadAverage(float threshold, double predictedValue, - int min) { - - double numberOfInstances; - if (threshold != 0) { - - numberOfInstances = (min * predictedValue) / threshold; - return (int) Math.ceil(numberOfInstances); - } - - return min; - } - - public int getMaxNumberOfInstancesRequired(int numberOfInstancesRequiredBasedOnRif, - int numberOfInstancesRequiredBasedOnMemoryConsumption, - boolean mcReset, int numberOfInstancesReuquiredBasedOnLoadAverage, - boolean laReset) { - int numberOfInstances = 0; - - int rifBasedRequiredInstances = 0; - int mcBasedRequiredInstances = 0; - int laBasedRequiredInstances = 0; - if (arspiIsSet) { - rifBasedRequiredInstances = numberOfInstancesRequiredBasedOnRif; - } - if (mcReset) { - mcBasedRequiredInstances = numberOfInstancesRequiredBasedOnMemoryConsumption; - } - if (laReset) { - laBasedRequiredInstances = numberOfInstancesReuquiredBasedOnLoadAverage; - } - numberOfInstances = Math.max(Math.max(numberOfInstancesRequiredBasedOnMemoryConsumption, - numberOfInstancesReuquiredBasedOnLoadAverage), numberOfInstancesRequiredBasedOnRif); - return numberOfInstances; - } - - public PartitionAlgorithm getPartitionAlgorithm(String partitionAlgorithm) { - - PartitionAlgorithm autoscaleAlgorithm = null; - //FIXME to not parse for algo when partition is chosen by the parent - - if (partitionAlgorithm == null) { - //Send one after another as default - partitionAlgorithm = StratosConstants.PARTITION_ONE_AFTER_ANOTHER_ALGORITHM_ID; - } - if (log.isDebugEnabled()) { - log.debug(String.format("Retrieving partition algorithm [Partition algorithm]: %s", partitionAlgorithm)); - } - if (StratosConstants.PARTITION_ROUND_ROBIN_ALGORITHM_ID.equals(partitionAlgorithm)) { - autoscaleAlgorithm = new RoundRobin(); - } else if (StratosConstants.PARTITION_ONE_AFTER_ANOTHER_ALGORITHM_ID.equals(partitionAlgorithm)) { - autoscaleAlgorithm = new OneAfterAnother(); - } else { - if (log.isErrorEnabled()) { - log.error(String.format("Partition algorithm %s could not be identified !", partitionAlgorithm)); - } - } - return autoscaleAlgorithm; - } - - public void delegateInstanceCleanup(String memberId) { - try { - // send the instance notification event. - InstanceNotificationPublisher.getInstance().sendInstanceCleanupEventForMember(memberId); - log.info("Instance clean up event sent for [member] " + memberId); - - } catch (Exception e) { - log.error("Cannot terminate instance", e); - } - } - - /** - * Invoked from drools to start an instance. - * - * @param clusterMonitorPartitionContext Cluster monitor partition context - * @param clusterId Cluster id - * @param clusterInstanceId Instance id - * @param isPrimary Is a primary member - */ - public void delegateSpawn(ClusterLevelPartitionContext clusterMonitorPartitionContext, String clusterId, - String clusterInstanceId, boolean isPrimary) { - - try { - String nwPartitionId = clusterMonitorPartitionContext.getNetworkPartitionId(); - - // Calculate accumulation of minimum counts of all the partition of current network partition - int minimumCountOfNetworkPartition; - ClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId); - ClusterContext clusterContext = clusterMonitor.getClusterContext(); - ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = - clusterContext.getNetworkPartitionCtxt(nwPartitionId); - ClusterInstanceContext clusterInstanceContext = - (ClusterInstanceContext) clusterLevelNetworkPartitionContext. - getInstanceContext(clusterInstanceId); - minimumCountOfNetworkPartition = clusterInstanceContext.getMinInstanceCount(); - - MemberContext memberContext = - AutoscalerCloudControllerClient.getInstance() - .startInstance(clusterMonitorPartitionContext.getPartition(), - clusterId, - clusterInstanceId, clusterMonitorPartitionContext.getNetworkPartitionId(), - isPrimary, - minimumCountOfNetworkPartition); - if (memberContext != null) { - ClusterLevelPartitionContext partitionContext = clusterInstanceContext. - getPartitionCtxt(clusterMonitorPartitionContext.getPartitionId()); - partitionContext.addPendingMember(memberContext); - partitionContext.addMemberStatsContext(new MemberStatsContext(memberContext.getMemberId())); - if (log.isDebugEnabled()) { - log.debug(String.format("Pending member added, [member] %s [partition] %s", memberContext.getMemberId(), - memberContext.getPartition().getId())); - } - - } else { - if (log.isErrorEnabled()) { - log.error("Member context returned from cloud controller is null"); - } - } - } catch (Exception e) { - String message = String.format("Could not start instance: [cluster-id] %s [instance-id] %s", - clusterId, clusterInstanceId); - log.error(message, e); - throw new RuntimeException(message, e); - } - } - - public void delegateScalingDependencyNotification(String clusterId, String networkPartitionId, String instanceId, - int requiredInstanceCount, int minimumInstanceCount) { - - if (log.isDebugEnabled()) { - log.debug("Scaling dependent notification is going to the [parentInstance] " + instanceId); - } - //Notify parent for checking scaling dependencies - ClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId); - float fMinimumInstanceCount = minimumInstanceCount; - float factor = requiredInstanceCount / fMinimumInstanceCount; - clusterMonitor.sendClusterScalingEvent(networkPartitionId, instanceId, factor); - } - - public void delegateScalingOverMaxNotification(String clusterId, String networkPartitionId, String instanceId) { - if (log.isDebugEnabled()) { - log.debug("Scaling max out notification is going to the [parentInstance] " + instanceId); - } - //Notify parent for checking scaling dependencies - ClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId); - clusterMonitor.sendScalingOverMaxEvent(networkPartitionId, instanceId); - } - - public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId, String instanceId) { - if (log.isDebugEnabled()) { - log.debug("Scaling down lower min notification is going to the [parentInstance] " + instanceId); - } - //Notify parent for checking scaling dependencies - ClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId); - clusterMonitor.sendScalingDownBeyondMinEvent(networkPartitionId, instanceId); - } - - public void delegateTerminate(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) { - - try { - //Moving member to pending termination list - if (clusterMonitorPartitionContext.activeMemberAvailable(memberId)) { - - log.info(String.format("[scale-down] Moving active member to termination pending list [member id] %s " + - "[partition] %s [network partition] %s", memberId, - clusterMonitorPartitionContext.getPartitionId(), - clusterMonitorPartitionContext.getNetworkPartitionId())); - clusterMonitorPartitionContext.moveActiveMemberToTerminationPendingMembers(memberId); - clusterMonitorPartitionContext.removeMemberStatsContext(memberId); - } else if (clusterMonitorPartitionContext.pendingMemberAvailable(memberId)) { - - log.info(String.format("[scale-down] Moving pending member to termination pending list [member id] %s " + - "[partition] %s [network partition] %s", memberId, - clusterMonitorPartitionContext.getPartitionId(), - clusterMonitorPartitionContext.getNetworkPartitionId())); - clusterMonitorPartitionContext.movePendingMemberToObsoleteMembers(memberId); - clusterMonitorPartitionContext.removeMemberStatsContext(memberId); - } - } catch (Exception e) { - log.error("[scale-down] Cannot move member to termination pending list ", e); - } - } - - public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) { - try { - //calling SM to send the instance notification event. - if (log.isDebugEnabled()) { - log.debug("delegateTerminateDependency:memberId:" + memberId); - } - //InstanceNotificationClient.getInstance().sendMemberCleanupEvent(memberId); - //partitionContext.moveActiveMemberToTerminationPendingMembers(memberId); - //CloudControllerClient.getInstance().terminate(memberId); - } catch (Exception e) { - log.error("Cannot terminate instance", e); - } - } - - public void terminateObsoleteInstance(String memberId) { - try { - AutoscalerCloudControllerClient.getInstance().terminateInstance(memberId); - } catch (Exception e) { - log.error("Cannot terminate instance", e); - } - } - - //Grouping - public void delegateTerminateAll(String clusterId) { - try { - if (log.isDebugEnabled()) { - log.debug("delegateTerminateAll - begin"); - } - AutoscalerCloudControllerClient.getInstance().terminateAllInstances(clusterId); - if (log.isDebugEnabled()) { - log.debug("delegateTerminateAll - done"); - } - } catch (Exception e) { - log.error("Cannot terminate instance", e); - } - } - - public int getPredictedReplicasForStat(int minReplicas, float statUpperLimit, float statPredictedValue) { - if (statUpperLimit == 0) { - return 0; - } - float predictedValue = ((minReplicas / statUpperLimit) * statPredictedValue); - return (int) Math.ceil(predictedValue); - } - - public double getLoadAveragePredictedValue(ClusterInstanceContext clusterInstanceContext) { - double loadAveragePredicted = 0.0d; - int totalMemberCount = 0; - for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) { - for (MemberStatsContext memberStatsContext : partitionContext.getMemberStatsContexts().values()) { - - float memberAverageLoadAverage = memberStatsContext.getLoadAverage().getAverage(); - float memberGredientLoadAverage = memberStatsContext.getLoadAverage().getGradient(); - float memberSecondDerivativeLoadAverage = memberStatsContext.getLoadAverage().getSecondDerivative(); - - double memberPredictedLoadAverage = getPredictedValueForNextMinute(memberAverageLoadAverage, - memberGredientLoadAverage, memberSecondDerivativeLoadAverage, 1); - - if (log.isDebugEnabled()) { - log.debug(String.format("[cluster-instance-id] %s [member-id] %s " + - "[predicted load average] %s " - , clusterInstanceContext.getId(), memberStatsContext.getMemberId() - , memberPredictedLoadAverage)); - } - loadAveragePredicted += memberPredictedLoadAverage; - ++totalMemberCount; - } - } - - if (totalMemberCount > 0) { - log.debug("Predicted load average : " + loadAveragePredicted / totalMemberCount); - return loadAveragePredicted / totalMemberCount; - } else { - return 0; - } - } - - public double getMemoryConsumptionPredictedValue(ClusterInstanceContext clusterInstanceContext) { - double memoryConsumptionPredicted = 0.0d; - int totalMemberCount = 0; - for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) { - for (MemberStatsContext memberStatsContext : partitionContext.getMemberStatsContexts().values()) { - - float memberMemoryConsumptionAverage = memberStatsContext.getMemoryConsumption().getAverage(); - float memberMemoryConsumptionGredient = memberStatsContext.getMemoryConsumption().getGradient(); - float memberMemoryConsumptionSecondDerivative = memberStatsContext.getMemoryConsumption().getSecondDerivative(); - - double memberPredictedMemoryConsumption = getPredictedValueForNextMinute(memberMemoryConsumptionAverage, - memberMemoryConsumptionGredient, memberMemoryConsumptionSecondDerivative, 1); - - if (log.isDebugEnabled()) { - log.debug(String.format("[member-id] %s [predicted memory consumption] %s ", - memberStatsContext.getMemberId() - , memberPredictedMemoryConsumption)); - } - memoryConsumptionPredicted += memberPredictedMemoryConsumption; - ++totalMemberCount; - } - } - - if (totalMemberCount > 0) { - log.debug("Predicted memory consumption : " + memoryConsumptionPredicted / totalMemberCount); - return memoryConsumptionPredicted / totalMemberCount; - } else { - return 0; - } - } + private static boolean arspiIsSet = false; + + private static final Log log = LogFactory.getLog(RuleTasksDelegator.class); + + public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative, + int timeInterval) { + double predictedValue; + // s = u * t + 0.5 * a * t * t + if (log.isDebugEnabled()) { + log.debug(String.format("Predicting the value, [average]: %s , [gradient]: %s , [second derivative] " + + ": %s , [time intervals]: %s ", average, gradient, secondDerivative, timeInterval)); + } + predictedValue = average + gradient * timeInterval + 0.5 * secondDerivative * timeInterval * timeInterval; + + return predictedValue; + } + + public int getNumberOfInstancesRequiredBasedOnRif(float rifPredictedValue, float rifThreshold) { + + if (rifThreshold != 0) { + + float requiredNumberOfInstances = rifPredictedValue / rifThreshold; + return (int) Math.ceil(requiredNumberOfInstances); + } else { + log.error("Request in flight threshold is Zero"); + return 0; + } + + } + + public int getNumberOfInstancesRequiredBasedOnMemoryConsumption(float threshold, double predictedValue, int min, + int max) { + double numberOfAdditionalInstancesRequired = 0; + if (predictedValue != threshold) { + + float scalingRange = 100 - threshold; + int instanceRange = max - min; + + if (instanceRange != 0) { + + float gradient = scalingRange / instanceRange; + numberOfAdditionalInstancesRequired = (predictedValue - threshold) / gradient; + } + + if (predictedValue < threshold) { + //Since predicted-value is less, it can be scale-down + return min - 1; + } + } + + return (int) Math.ceil(min + numberOfAdditionalInstancesRequired); + } + + public int getNumberOfInstancesRequiredBasedOnLoadAverage(float threshold, double predictedValue, int min) { + + double numberOfInstances; + if (threshold != 0) { + + numberOfInstances = (min * predictedValue) / threshold; + return (int) Math.ceil(numberOfInstances); + } + + return min; + } + + public int getMaxNumberOfInstancesRequired(int numberOfInstancesRequiredBasedOnRif, + int numberOfInstancesRequiredBasedOnMemoryConsumption, boolean mcReset, + int numberOfInstancesReuquiredBasedOnLoadAverage, boolean laReset) { + int numberOfInstances = 0; + + int rifBasedRequiredInstances = 0; + int mcBasedRequiredInstances = 0; + int laBasedRequiredInstances = 0; + if (arspiIsSet) { + rifBasedRequiredInstances = numberOfInstancesRequiredBasedOnRif; + } + if (mcReset) { + mcBasedRequiredInstances = numberOfInstancesRequiredBasedOnMemoryConsumption; + } + if (laReset) { + laBasedRequiredInstances = numberOfInstancesReuquiredBasedOnLoadAverage; + } + numberOfInstances = Math.max(Math.max(numberOfInstancesRequiredBasedOnMemoryConsumption, + numberOfInstancesReuquiredBasedOnLoadAverage), + numberOfInstancesRequiredBasedOnRif); + return numberOfInstances; + } + + public PartitionAlgorithm getPartitionAlgorithm(String partitionAlgorithm) { + + PartitionAlgorithm autoscaleAlgorithm = null; + //FIXME to not parse for algo when partition is chosen by the parent + + if (partitionAlgorithm == null) { + //Send one after another as default + partitionAlgorithm = StratosConstants.PARTITION_ONE_AFTER_ANOTHER_ALGORITHM_ID; + } + if (log.isDebugEnabled()) { + log.debug(String.format("Retrieving partition algorithm [Partition algorithm]: %s", partitionAlgorithm)); + } + if (StratosConstants.PARTITION_ROUND_ROBIN_ALGORITHM_ID.equals(partitionAlgorithm)) { + autoscaleAlgorithm = new RoundRobin(); + } else if (StratosConstants.PARTITION_ONE_AFTER_ANOTHER_ALGORITHM_ID.equals(partitionAlgorithm)) { + autoscaleAlgorithm = new OneAfterAnother(); + } else { + if (log.isErrorEnabled()) { + log.error(String.format("Partition algorithm %s could not be identified !", partitionAlgorithm)); + } + } + return autoscaleAlgorithm; + } + + public void delegateInstanceCleanup(String memberId) { + try { + // send the instance notification event. + InstanceNotificationPublisher.getInstance().sendInstanceCleanupEventForMember(memberId); + log.info("Instance clean up event sent for [member] " + memberId); + + } catch (Exception e) { + log.error("Cannot terminate instance", e); + } + } + + /** + * Invoked from drools to start an instance. + * + * @param clusterMonitorPartitionContext Cluster monitor partition context + * @param clusterId Cluster id + * @param clusterInstanceId Instance id + * @param isPrimary Is a primary member + */ + public void delegateSpawn(ClusterLevelPartitionContext clusterMonitorPartitionContext, String clusterId, + String clusterInstanceId, boolean isPrimary, String autoscalingReason, long scalingTime) { + + try { + String nwPartitionId = clusterMonitorPartitionContext.getNetworkPartitionId(); + + // Calculate accumulation of minimum counts of all the partition of current network partition + int minimumCountOfNetworkPartition; + ClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId); + ClusterContext clusterContext = clusterMonitor.getClusterContext(); + ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext = + clusterContext.getNetworkPartitionCtxt(nwPartitionId); + ClusterInstanceContext clusterInstanceContext = + (ClusterInstanceContext) clusterLevelNetworkPartitionContext. + getInstanceContext( + clusterInstanceId); + minimumCountOfNetworkPartition = clusterInstanceContext.getMinInstanceCount(); + + MemberContext memberContext = AutoscalerCloudControllerClient.getInstance().startInstance( + clusterMonitorPartitionContext.getPartition(), clusterId, clusterInstanceId, + clusterMonitorPartitionContext.getNetworkPartitionId(), isPrimary, minimumCountOfNetworkPartition, + autoscalingReason, scalingTime); + if (memberContext != null) { + ClusterLevelPartitionContext partitionContext = clusterInstanceContext. + getPartitionCtxt( + clusterMonitorPartitionContext + .getPartitionId()); + partitionContext.addPendingMember(memberContext); + partitionContext.addMemberStatsContext(new MemberStatsContext(memberContext.getMemberId())); + if (log.isDebugEnabled()) { + log.debug(String.format("Pending member added, [member] %s [partition] %s", + memberContext.getMemberId(), memberContext.getPartition().getId())); + } + + } else { + if (log.isErrorEnabled()) { + log.error("Member context returned from cloud controller is null"); + } + } + } catch (Exception e) { + String message = String.format("Could not start instance: [cluster-id] %s [instance-id] %s", clusterId, + clusterInstanceId); + log.error(message, e); + throw new RuntimeException(message, e); + } + } + + public void delegateScalingDependencyNotification(String clusterId, String networkPartitionId, String instanceId, + int requiredInstanceCount, int minimumInstanceCount) { + + if (log.isDebugEnabled()) { + log.debug("Scaling dependent notification is going to the [parentInstance] " + instanceId); + } + //Notify parent for checking scaling dependencies + ClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId); + float fMinimumInstanceCount = minimumInstanceCount; + float factor = requiredInstanceCount / fMinimumInstanceCount; + clusterMonitor.sendClusterScalingEvent(networkPartitionId, instanceId, factor); + } + + public void delegateScalingOverMaxNotification(String clusterId, String networkPartitionId, String instanceId) { + if (log.isDebugEnabled()) { + log.debug("Scaling max out notification is going to the [parentInstance] " + instanceId); + } + //Notify parent for checking scaling dependencies + ClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId); + clusterMonitor.sendScalingOverMaxEvent(networkPartitionId, instanceId); + } + + public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId, + String instanceId) { + if (log.isDebugEnabled()) { + log.debug("Scaling down lower min notification is going to the [parentInstance] " + instanceId); + } + //Notify parent for checking scaling dependencies + ClusterMonitor clusterMonitor = AutoscalerContext.getInstance().getClusterMonitor(clusterId); + clusterMonitor.sendScalingDownBeyondMinEvent(networkPartitionId, instanceId); + } + + public void delegateTerminate(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) { + + try { + //Moving member to pending termination list + if (clusterMonitorPartitionContext.activeMemberAvailable(memberId)) { + + log.info(String.format("[scale-down] Moving active member to termination pending list [member id] %s " + + "[partition] %s [network partition] %s", memberId, + clusterMonitorPartitionContext.getPartitionId(), + clusterMonitorPartitionContext.getNetworkPartitionId())); + clusterMonitorPartitionContext.moveActiveMemberToTerminationPendingMembers(memberId); + clusterMonitorPartitionContext.removeMemberStatsContext(memberId); + } else if (clusterMonitorPartitionContext.pendingMemberAvailable(memberId)) { + + log.info(String.format( + "[scale-down] Moving pending member to termination pending list [member id] %s " + + "[partition] %s [network partition] %s", memberId, + clusterMonitorPartitionContext.getPartitionId(), + clusterMonitorPartitionContext.getNetworkPartitionId())); + clusterMonitorPartitionContext.movePendingMemberToObsoleteMembers(memberId); + clusterMonitorPartitionContext.removeMemberStatsContext(memberId); + } + } catch (Exception e) { + log.error("[scale-down] Cannot move member to termination pending list ", e); + } + } + + public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext, + String memberId) { + try { + //calling SM to send the instance notification event. + if (log.isDebugEnabled()) { + log.debug("delegateTerminateDependency:memberId:" + memberId); + } + //InstanceNotificationClient.getInstance().sendMemberCleanupEvent(memberId); + //partitionContext.moveActiveMemberToTerminationPendingMembers(memberId); + //CloudControllerClient.getInstance().terminate(memberId); + } catch (Exception e) { + log.error("Cannot terminate instance", e); + } + } + + public void terminateObsoleteInstance(String memberId) { + try { + AutoscalerCloudControllerClient.getInstance().terminateInstance(memberId); + } catch (Exception e) { + log.error("Cannot terminate instance", e); + } + } + + //Grouping + public void delegateTerminateAll(String clusterId) { + try { + if (log.isDebugEnabled()) { + log.debug("delegateTerminateAll - begin"); + } + AutoscalerCloudControllerClient.getInstance().terminateAllInstances(clusterId); + if (log.isDebugEnabled()) { + log.debug("delegateTerminateAll - done"); + } + } catch (Exception e) { + log.error("Cannot terminate instance", e); + } + } + + public int getPredictedReplicasForStat(int minReplicas, float statUpperLimit, float statPredictedValue) { + if (statUpperLimit == 0) { + return 0; + } + float predictedValue = ((minReplicas / statUpperLimit) * statPredictedValue); + return (int) Math.ceil(predictedValue); + } + + public double getLoadAveragePredictedValue(ClusterInstanceContext clusterInstanceContext) { + double loadAveragePredicted = 0.0d; + int totalMemberCount = 0; + for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) { + for (MemberStatsContext memberStatsContext : partitionContext.getMemberStatsContexts().values()) { + + float memberAverageLoadAverage = memberStatsContext.getLoadAverage().getAverage(); + float memberGredientLoadAverage = memberStatsContext.getLoadAverage().getGradient(); + float memberSecondDerivativeLoadAverage = memberStatsContext.getLoadAverage().getSecondDerivative(); + + double memberPredictedLoadAverage = + getPredictedValueForNextMinute(memberAverageLoadAverage, memberGredientLoadAverage, + memberSecondDerivativeLoadAverage, 1); + + if (log.isDebugEnabled()) { + log.debug(String.format("[cluster-instance-id] %s [member-id] %s " + "[predicted load average] %s ", + clusterInstanceContext.getId(), memberStatsContext.getMemberId(), + memberPredictedLoadAverage)); + } + loadAveragePredicted += memberPredictedLoadAverage; + ++totalMemberCount; + } + } + + if (totalMemberCount > 0) { + log.debug("Predicted load average : " + loadAveragePredicted / totalMemberCount); + return loadAveragePredicted / totalMemberCount; + } else { + return 0; + } + } + + public double getMemoryConsumptionPredictedValue(ClusterInstanceContext clusterInstanceContext) { + double memoryConsumptionPredicted = 0.0d; + int totalMemberCount = 0; + for (ClusterLevelPartitionContext partitionContext : clusterInstanceContext.getPartitionCtxts()) { + for (MemberStatsContext memberStatsContext : partitionContext.getMemberStatsContexts().values()) { + + float memberMemoryConsumptionAverage = memberStatsContext.getMemoryConsumption().getAverage(); + float memberMemoryConsumptionGredient = memberStatsContext.getMemoryConsumption().getGradient(); + float memberMemoryConsumptionSecondDerivative = + memberStatsContext.getMemoryConsumption().getSecondDerivative(); + + double memberPredictedMemoryConsumption = + getPredictedValueForNextMinute(memberMemoryConsumptionAverage, memberMemoryConsumptionGredient, + memberMemoryConsumptionSecondDerivative, 1); + + if (log.isDebugEnabled()) { + log.debug(String.format("[member-id] %s [predicted memory consumption] %s ", + memberStatsContext.getMemberId(), memberPredictedMemoryConsumption)); + } + memoryConsumptionPredicted += memberPredictedMemoryConsumption; + ++totalMemberCount; + } + } + + if (totalMemberCount > 0) { + log.debug("Predicted memory consumption : " + memoryConsumptionPredicted / totalMemberCount); + return memoryConsumptionPredicted / totalMemberCount; + } else { + return 0; + } + } }
