Adding Metering and Monitoring Service Implementation
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/1eeead43 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/1eeead43 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/1eeead43 Branch: refs/heads/data-publisher-integration Commit: 1eeead432f756d6327dbf038c4bcbbc4da2d0ead Parents: 00ed5fa Author: Thanuja <[email protected]> Authored: Wed Jul 29 18:51:34 2015 +0530 Committer: Thanuja <[email protected]> Committed: Wed Jul 29 18:51:34 2015 +0530 ---------------------------------------------------------------------- .../client/AutoscalerCloudControllerClient.java | 16 +- .../autoscaler/rule/RuleTasksDelegator.java | 23 +- .../publisher/HealthStatisticsNotifier.java | 10 +- .../messaging/topology/TopologyBuilder.java | 70 ++- .../impl/CloudControllerServiceImpl.java | 6 +- .../impl/CloudControllerServiceUtil.java | 15 +- .../services/impl/InstanceCreator.java | 18 +- .../publisher/BAMUsageDataPublisher.java | 44 +- .../util/CloudControllerConstants.java | 4 + .../common/constants/StratosConstants.java | 8 +- .../publisher/HealthStatisticsPublisher.java | 3 +- .../publisher/InFlightRequestPublisher.java | 4 +- .../cep/WSO2CEPHealthStatisticsPublisher.java | 9 +- .../cep/WSO2CEPInFlightRequestPublisher.java | 6 +- .../LoadBalancerStatisticsNotifier.java | 3 +- .../publisher/MockHealthStatisticsNotifier.java | 3 + .../modules/healthstatspublisher/healthstats.py | 5 +- .../HealthStatsEventFormatter.xml | 30 ++ .../eventformatters/RIFEventFormatter.xml | 31 ++ .../DASDefaultWSO2EventOutputAdaptor.xml | 29 ++ .../streamdefinitions/stream-manager-config.xml | 486 ++++++++++--------- extensions/das/README.md | 10 + .../CloudControllerEventReceiver.xml | 29 ++ .../eventreceivers/HealthStatsEventReceiver.xml | 29 ++ .../eventreceivers/RIFEventReceiver.xml | 29 ++ .../eventsink/cartridge_agent_health_stats.xml | 85 ++++ .../artifacts/eventsink/in_flight_requests.xml | 64 +++ .../org_apache_stratos_cloud_controller.xml | 211 ++++++++ .../cartridge_agent_health_stats_1.0.0.json | 40 ++ .../eventstreams/in_flight_requests_1.0.0.json | 28 ++ ...g.apache.stratos.cloud.controller_1.0.0.json | 112 +++++ extensions/das/artifacts/sparkscript/CCEvent | 18 + extensions/das/pom.xml | 40 ++ extensions/das/spark-udf/pom.xml | 36 ++ .../das/extension/spark/udf/TimeUDF.java | 49 ++ extensions/pom.xml | 4 +- .../src/main/conf/drools/dependent-scaling.drl | 4 +- .../src/main/conf/drools/mincheck.drl | 5 +- .../src/main/conf/drools/scaling.drl | 7 +- 39 files changed, 1311 insertions(+), 312 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java index f944a9f..c65a5f7 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java @@ -84,7 +84,8 @@ public class AutoscalerCloudControllerClient { public synchronized MemberContext startInstance(PartitionRef partition, String clusterId, String clusterInstanceId, String networkPartitionId, boolean isPrimary, - int minMemberCount) throws SpawningException { + int minMemberCount, String autoscalingReason, + long scalingTime) throws SpawningException { try { if (log.isInfoEnabled()) { log.info(String.format("Trying to spawn an instance via cloud controller: " + @@ -115,8 +116,18 @@ public class AutoscalerCloudControllerClient { minCountProp.setName(StratosConstants.MIN_COUNT); minCountProp.setValue(String.valueOf(minMemberCount)); + Property autoscalingReasonProp = new Property(); + autoscalingReasonProp.setName(StratosConstants.SCALING_REASON); + autoscalingReasonProp.setValue(autoscalingReason); + + Property scalingTimeProp = new Property(); + scalingTimeProp.setName(StratosConstants.SCALING_TIME); + scalingTimeProp.setValue(String.valueOf(scalingTime)); + memberContextProps.addProperty(isPrimaryProp); memberContextProps.addProperty(minCountProp); + memberContextProps.addProperty(autoscalingReasonProp); + memberContextProps.addProperty(scalingTimeProp); instanceContext.setProperties(AutoscalerUtil.toStubProperties(memberContextProps)); long startTime = System.currentTimeMillis(); @@ -228,7 +239,8 @@ public class AutoscalerCloudControllerClient { public void terminateAllInstances(String clusterId) throws RemoteException, CloudControllerServiceInvalidClusterExceptionException { if (log.isInfoEnabled()) { - log.info(String.format("Terminating all instances of cluster via cloud controller: [cluster] %s", clusterId)); + log.info(String.format("Terminating all instances of cluster via cloud controller: " + + "[cluster] %s", clusterId)); } long startTime = System.currentTimeMillis(); stub.terminateInstances(clusterId); http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java index 51443a1..733ce57 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java @@ -36,7 +36,6 @@ import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetwo import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher; import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; import org.apache.stratos.cloud.controller.stub.domain.MemberContext; -import org.apache.stratos.common.client.CloudControllerServiceClient; import org.apache.stratos.common.constants.StratosConstants; /** @@ -48,7 +47,8 @@ public class RuleTasksDelegator { private static final Log log = LogFactory.getLog(RuleTasksDelegator.class); - public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative, int timeInterval) { + public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative, + int timeInterval) { double predictedValue; // s = u * t + 0.5 * a * t * t if (log.isDebugEnabled()) { @@ -175,9 +175,11 @@ public class RuleTasksDelegator { * @param clusterId Cluster id * @param clusterInstanceId Instance id * @param isPrimary Is a primary member + * @param autoscalingReason scaling reason for member + * @param scalingTime scaling time */ public void delegateSpawn(ClusterLevelPartitionContext clusterMonitorPartitionContext, String clusterId, - String clusterInstanceId, boolean isPrimary) { + String clusterInstanceId, boolean isPrimary, String autoscalingReason, long scalingTime) { try { String nwPartitionId = clusterMonitorPartitionContext.getNetworkPartitionId(); @@ -199,14 +201,15 @@ public class RuleTasksDelegator { clusterId, clusterInstanceId, clusterMonitorPartitionContext.getNetworkPartitionId(), isPrimary, - minimumCountOfNetworkPartition); + minimumCountOfNetworkPartition, autoscalingReason, scalingTime); if (memberContext != null) { ClusterLevelPartitionContext partitionContext = clusterInstanceContext. getPartitionCtxt(clusterMonitorPartitionContext.getPartitionId()); partitionContext.addPendingMember(memberContext); partitionContext.addMemberStatsContext(new MemberStatsContext(memberContext.getMemberId())); if (log.isDebugEnabled()) { - log.debug(String.format("Pending member added, [member] %s [partition] %s", memberContext.getMemberId(), + log.debug(String.format("Pending member added, [member] %s [partition] %s", + memberContext.getMemberId(), memberContext.getPartition().getId())); } @@ -245,7 +248,8 @@ public class RuleTasksDelegator { clusterMonitor.sendScalingOverMaxEvent(networkPartitionId, instanceId); } - public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId, String instanceId) { + public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId, + String instanceId) { if (log.isDebugEnabled()) { log.debug("Scaling down lower min notification is going to the [parentInstance] " + instanceId); } @@ -268,8 +272,8 @@ public class RuleTasksDelegator { clusterMonitorPartitionContext.removeMemberStatsContext(memberId); } else if (clusterMonitorPartitionContext.pendingMemberAvailable(memberId)) { - log.info(String.format("[scale-down] Moving pending member to termination pending list [member id] %s " + - "[partition] %s [network partition] %s", memberId, + log.info(String.format("[scale-down] Moving pending member to termination pending list " + + "[member id] %s " + "[partition] %s [network partition] %s", memberId, clusterMonitorPartitionContext.getPartitionId(), clusterMonitorPartitionContext.getNetworkPartitionId())); clusterMonitorPartitionContext.movePendingMemberToObsoleteMembers(memberId); @@ -280,7 +284,8 @@ public class RuleTasksDelegator { } } - public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) { + public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext, + String memberId) { try { //calling SM to send the instance notification event. if (log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java index 74c5156..5ab2ebf 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java @@ -51,7 +51,8 @@ public class HealthStatisticsNotifier implements Runnable { File pluginFile = new File(pluginFileName); if ((pluginFile != null) && (pluginFile.exists())) { - List<Class> pluginClass = PluginLoader.loadPluginClassesFromJar(pluginFile, IHealthStatisticsReader.class); + List<Class> pluginClass = PluginLoader.loadPluginClassesFromJar(pluginFile, + IHealthStatisticsReader.class); if (!pluginClass.isEmpty()) { try { log.trace("Instantiating new instance of plugin type " + pluginClass); @@ -63,7 +64,8 @@ public class HealthStatisticsNotifier implements Runnable { } } } else { - log.error("Plugin not found or malformed: " + pluginFileName + ((pluginFile == null) ? " NULL" : "Doesn't exist")); + log.error("Plugin not found or malformed: " + pluginFileName + ((pluginFile == null) ? " NULL" : + "Doesn't exist")); } } if (this.statsReader == null) { @@ -95,7 +97,7 @@ public class HealthStatisticsNotifier implements Runnable { if (log.isDebugEnabled()) { log.debug(String.format("Publishing memory consumption: %f", stats.getMemoryUsage())); } - statsPublisher.publish( + statsPublisher.publish(System.currentTimeMillis(), CartridgeAgentConfiguration.getInstance().getClusterId(), CartridgeAgentConfiguration.getInstance().getClusterInstanceId(), CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(), @@ -108,7 +110,7 @@ public class HealthStatisticsNotifier implements Runnable { if (log.isDebugEnabled()) { log.debug(String.format("Publishing load average: %f", stats.getProcessorUsage())); } - statsPublisher.publish( + statsPublisher.publish(System.currentTimeMillis(), CartridgeAgentConfiguration.getInstance().getClusterId(), CartridgeAgentConfiguration.getInstance().getClusterInstanceId(), CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(), http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java index f04a11f..419c711 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@ -31,6 +31,7 @@ import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPubl import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.common.Property; +import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.application.ClusterDataHolder; import org.apache.stratos.messaging.domain.instance.ClusterInstance; import org.apache.stratos.messaging.domain.topology.*; @@ -67,7 +68,8 @@ public class TopologyBuilder { TopologyManager.acquireWriteLock(); for (Cartridge cartridge : cartridgeList) { if (!topology.serviceExists(cartridge.getType())) { - ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant; + ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant : + ServiceType.SingleTenant; service = new Service(cartridge.getType(), serviceType); Properties properties = new Properties(); @@ -199,14 +201,14 @@ public class TopologyBuilder { } log.debug("Creating cluster port mappings: [appication-id] " + appId); - for(Cluster cluster : appClusters) { + for (Cluster cluster : appClusters) { String cartridgeType = cluster.getServiceName(); Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); - if(cartridge == null) { + if (cartridge == null) { throw new CloudControllerException("Cartridge not found: [cartridge-type] " + cartridgeType); } - for(PortMapping portMapping : cartridge.getPortMappings()) { + for (PortMapping portMapping : cartridge.getPortMappings()) { ClusterPortMapping clusterPortMapping = new ClusterPortMapping(appId, cluster.getClusterId(), portMapping.getName(), portMapping.getProtocol(), portMapping.getPort(), portMapping.getProxyPort()); @@ -406,6 +408,11 @@ public class TopologyBuilder { String partitionId = memberContext.getPartition().getId(); String lbClusterId = memberContext.getLbClusterId(); long initTime = memberContext.getInitTime(); + String autoscalingReason = memberContext.getProperties().getProperty( + StratosConstants.SCALING_REASON).getValue(); + long scalingTime = Long.parseLong(memberContext.getProperties().getProperty( + StratosConstants.SCALING_TIME).getValue()); + if (cluster.memberExists(memberId)) { log.warn(String.format("Member %s already exists", memberId)); @@ -421,6 +428,19 @@ public class TopologyBuilder { member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties())); cluster.addMember(member); TopologyManager.updateTopology(topology); + //member created time + Long timeStamp = System.currentTimeMillis(); + //publishing to BAM + BAMUsageDataPublisher + .publish(memberContext.getMemberId(), + memberContext.getPartition().getId(), + memberContext.getNetworkPartitionId(), + memberContext.getClusterId(), + memberContext.getClusterInstanceId(), + memberContext.getCartridgeType(), + MemberStatus.Created.toString(), + timeStamp, autoscalingReason, + scalingTime, null); } finally { TopologyManager.releaseWriteLock(); } @@ -479,16 +499,18 @@ public class TopologyBuilder { log.info("Member status updated to initialized"); TopologyManager.updateTopology(topology); - + //member intialized time + Long timeStamp = System.currentTimeMillis(); TopologyEventPublisher.sendMemberInitializedEvent(memberContext); //publishing data BAMUsageDataPublisher.publish(memberContext.getMemberId(), memberContext.getPartition().getId(), memberContext.getNetworkPartitionId(), + memberContext.getClusterInstanceId(), memberContext.getClusterId(), memberContext.getCartridgeType(), MemberStatus.Initialized.toString(), - null); + timeStamp, null, null, null); } } finally { TopologyManager.releaseWriteLock(); @@ -542,16 +564,19 @@ public class TopologyBuilder { log.info("member started event adding status started"); TopologyManager.updateTopology(topology); + //member started time + Long timeStamp = System.currentTimeMillis(); //memberStartedEvent. TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); //publishing data BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(), instanceStartedEvent.getPartitionId(), instanceStartedEvent.getNetworkPartitionId(), + instanceStartedEvent.getClusterInstanceId(), instanceStartedEvent.getClusterId(), instanceStartedEvent.getServiceName(), MemberStatus.Starting.toString(), - null); + timeStamp, null, null, null); } } finally { TopologyManager.releaseWriteLock(); @@ -602,7 +627,8 @@ public class TopologyBuilder { TopologyManager.acquireWriteLock(); // try update lifecycle state if (!member.isStateTransitionValid(MemberStatus.Active)) { - log.error("Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]"); + log.error("Invalid state transition from [" + member.getStatus() + "] to [" + + MemberStatus.Active + "]"); return; } else { member.setStatus(MemberStatus.Active); @@ -644,7 +670,8 @@ public class TopologyBuilder { memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP()); memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs()); TopologyManager.updateTopology(topology); - + //member activated time + Long timeStamp = System.currentTimeMillis(); // Publish member activated event TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); @@ -652,10 +679,11 @@ public class TopologyBuilder { BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), memberActivatedEvent.getPartitionId(), memberActivatedEvent.getNetworkPartitionId(), + memberActivatedEvent.getClusterInstanceId(), memberActivatedEvent.getClusterId(), memberActivatedEvent.getServiceName(), MemberStatus.Active.toString(), - null); + timeStamp, null, null, null); } } finally { TopologyManager.releaseWriteLock(); @@ -694,6 +722,8 @@ public class TopologyBuilder { instanceReadyToShutdownEvent.getMemberId(), instanceReadyToShutdownEvent.getNetworkPartitionId(), instanceReadyToShutdownEvent.getPartitionId()); + //member ReadyToShutDown state change time + Long timeStamp = null; try { TopologyManager.acquireWriteLock(); @@ -706,6 +736,7 @@ public class TopologyBuilder { log.info("Member Ready to shut down event adding status started"); TopologyManager.updateTopology(topology); + timeStamp = System.currentTimeMillis(); } finally { TopologyManager.releaseWriteLock(); } @@ -714,10 +745,11 @@ public class TopologyBuilder { BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), instanceReadyToShutdownEvent.getPartitionId(), instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getClusterInstanceId(), instanceReadyToShutdownEvent.getClusterId(), instanceReadyToShutdownEvent.getServiceName(), MemberStatus.ReadyToShutDown.toString(), - null); + timeStamp, null, null, null); //termination of particular instance will be handled by autoscaler } @@ -834,7 +866,8 @@ public class TopologyBuilder { } } - public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) { + public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent + clusterStatusClusterActivatedEvent) { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName()); @@ -888,7 +921,8 @@ public class TopologyBuilder { } else { log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + " [instance-id] %s [current-status] %s [status-requested] %s", - clusterStatusClusterActivatedEvent.getClusterId(), clusterStatusClusterActivatedEvent.getInstanceId(), + clusterStatusClusterActivatedEvent.getClusterId(), + clusterStatusClusterActivatedEvent.getInstanceId(), context.getStatus(), status)); return; } @@ -997,8 +1031,8 @@ public class TopologyBuilder { cluster.removeInstanceContext(event.getInstanceId()); TopologyManager.updateTopology(topology); //publishing data - ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent(event.getAppId(), - event.getServiceName(), event.getClusterId(), event.getInstanceId()); + ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent( + event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId()); TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent); } else { @@ -1041,15 +1075,15 @@ public class TopologyBuilder { log.info("Cluster Terminating started for " + cluster.getClusterId()); TopologyManager.updateTopology(topology); //publishing data - ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent(event.getAppId(), - event.getServiceName(), event.getClusterId(), event.getInstanceId()); + ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent( + event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId()); TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent); // Remove kubernetes services if available ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(event.getClusterId()); - if(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) { + if (StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) { KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId()); } } else { http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java index 4d51cc1..2b19b05 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java @@ -447,13 +447,13 @@ public class CloudControllerServiceImpl implements CloudControllerService { clusterContext.setVolumes(volumes); } - // Handle member created event - TopologyBuilder.handleMemberCreatedEvent(memberContext); - // Persist member context CloudControllerContext.getInstance().addMemberContext(memberContext); CloudControllerContext.getInstance().persist(); + // Handle member created event + TopologyBuilder.handleMemberCreatedEvent(memberContext); + // Start instance in a new thread if (log.isDebugEnabled()) { log.debug(String.format("Starting instance creator thread: [cluster] %s [cluster-instance] %s " + http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java index 37580eb..e7be3a6 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java @@ -65,18 +65,21 @@ public class CloudControllerServiceUtil { TopologyBuilder.handleMemberTerminated(memberContext.getCartridgeType(), memberContext.getClusterId(), memberContext.getNetworkPartitionId(), partitionId, memberContext.getMemberId()); - + //member terminated time + Long timeStamp = System.currentTimeMillis(); // Publish statistics to BAM BAMUsageDataPublisher.publish(memberContext.getMemberId(), partitionId, memberContext.getNetworkPartitionId(), + memberContext.getClusterInstanceId(), memberContext.getClusterId(), memberContext.getCartridgeType(), MemberStatus.Terminated.toString(), - null); + timeStamp, null, null, null); // Remove member context - CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(), memberContext.getMemberId()); + CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(), + memberContext.getMemberId()); // Persist cloud controller context CloudControllerContext.getInstance().persist(); @@ -87,7 +90,8 @@ public class CloudControllerServiceUtil { return isValid; } - public static IaasProvider validatePartitionAndGetIaasProvider(Partition partition, IaasProvider iaasProvider) throws InvalidPartitionException { + public static IaasProvider validatePartitionAndGetIaasProvider(Partition partition, IaasProvider iaasProvider) + throws InvalidPartitionException { if (iaasProvider != null) { // if this is a IaaS based partition Iaas iaas = iaasProvider.getIaas(); @@ -104,7 +108,8 @@ public class CloudControllerServiceUtil { } } - public static boolean validatePartition(Partition partition, IaasProvider iaasProvider) throws InvalidPartitionException { + public static boolean validatePartition(Partition partition, IaasProvider iaasProvider) + throws InvalidPartitionException { validatePartitionAndGetIaasProvider(partition, iaasProvider); return true; } http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java index 77cfea2..c0dbf57 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java @@ -27,8 +27,6 @@ import org.apache.stratos.cloud.controller.domain.*; import org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException; import org.apache.stratos.cloud.controller.iaases.Iaas; import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; -import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; -import org.apache.stratos.messaging.domain.topology.MemberStatus; import java.util.concurrent.locks.Lock; @@ -68,7 +66,8 @@ public class InstanceCreator implements Runnable { memberContext = startInstance(iaas, memberContext, payload); if (log.isInfoEnabled()) { - log.info(String.format("Instance started successfully: [cartridge-type] %s [cluster-id] %s [instance-id] %s " + + log.info(String.format("Instance started successfully: [cartridge-type] %s [cluster-id] %s " + + "[instance-id] %s " + "[default-private-ip] %s [default-public-ip] %s", memberContext.getCartridgeType(), memberContext.getClusterId(), memberContext.getInstanceId(), memberContext.getDefaultPrivateIP(), @@ -84,16 +83,6 @@ public class InstanceCreator implements Runnable { // Update topology TopologyBuilder.handleMemberInitializedEvent(memberContext); - - // Publish instance creation statistics to BAM - BAMUsageDataPublisher.publish( - memberContext.getMemberId(), - memberContext.getPartition().getId(), - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - memberContext.getCartridgeType(), - MemberStatus.Initialized.toString(), - memberContext.getInstanceMetadata()); } catch (Exception e) { String message = String.format("Could not start instance: [cartridge-type] %s [cluster-id] %s", memberContext.getCartridgeType(), memberContext.getClusterId()); @@ -105,7 +94,8 @@ public class InstanceCreator implements Runnable { } } - private MemberContext startInstance(Iaas iaas, MemberContext memberContext, byte[] payload) throws CartridgeNotFoundException { + private MemberContext startInstance(Iaas iaas, MemberContext memberContext, byte[] payload) throws + CartridgeNotFoundException { memberContext = iaas.startInstance(memberContext, payload); // Validate instance id http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java index d5aabbd..690bc59 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java @@ -52,12 +52,31 @@ public class BAMUsageDataPublisher { private static StreamDefinition streamDefinition; private static final String cloudControllerEventStreamVersion = "1.0.0"; + /** + * Publish events to BAM + * + * @param memberId member id + * @param partitionId partition id + * @param networkId network partition id + * @param clusterId cluster id + * @param clusterInstanceId cluster instance id + * @param serviceName service name + * @param status member status + * @param timeStamp time + * @param autoscalingReason scaling reason related to member + * @param scalingTime scaling time + * @param metadata meta-data + */ public static void publish(String memberId, String partitionId, String networkId, String clusterId, + String clusterInstanceId, String serviceName, String status, + Long timeStamp, + String autoscalingReason, + Long scalingTime, InstanceMetadata metadata) { if (!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()) { return; @@ -79,16 +98,23 @@ public class BAMUsageDataPublisher { MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId); String cartridgeType = memberContext.getCartridgeType(); Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); + String instanceType = CloudControllerContext.getInstance().getIaasProviderOfPartition(cartridgeType, + partitionId).getProperty(CloudControllerConstants.INSTANCE_TYPE); //Construct the data to be published List<Object> payload = new ArrayList<Object>(); // Payload values + payload.add(timeStamp); payload.add(memberId); payload.add(serviceName); payload.add(clusterId); + payload.add(clusterInstanceId); payload.add(handleNull(memberContext.getLbClusterId())); payload.add(handleNull(partitionId)); payload.add(handleNull(networkId)); + payload.add(handleNull(instanceType)); + payload.add(handleNull(autoscalingReason)); + payload.add(handleNull(scalingTime)); if (cartridge != null) { payload.add(handleNull(String.valueOf(cartridge.isMultiTenant()))); } else { @@ -129,12 +155,14 @@ public class BAMUsageDataPublisher { try { if (log.isDebugEnabled()) { - log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion())); + log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), + streamDefinition.getVersion())); } dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event); } catch (AgentException e) { if (log.isErrorEnabled()) { - log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e); + log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", + streamDefinition.getName(), streamDefinition.getVersion()), e); } } } @@ -151,12 +179,17 @@ public class BAMUsageDataPublisher { streamDefinition.setDescription("Instances booted up by the Cloud Controller"); // Payload definition List<Attribute> payloadData = new ArrayList<Attribute>(); + payloadData.add(new Attribute(CloudControllerConstants.TIME_STAMP, AttributeType.LONG)); payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.CARTRIDGE_TYPE_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_INSTANCE_ID_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.LB_CLUSTER_ID_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.INSTANCE_TYPE, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.SCALING_REASON, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.SCALING_TIME, AttributeType.LONG)); payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.IAAS_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.STATUS_COL, AttributeType.STRING)); @@ -210,4 +243,11 @@ public class BAMUsageDataPublisher { } return val; } + + private static Long handleNull(Long val) { + if (val == null) { + return -1L; + } + return val; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java index 5e6115f..2cb0c31 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java @@ -103,6 +103,7 @@ public final class CloudControllerConstants { public static final String MEMBER_ID_COL = "memberId"; public static final String CARTRIDGE_TYPE_COL = "cartridgeType"; public static final String CLUSTER_ID_COL = "clusterId"; + public static final String CLUSTER_INSTANCE_ID_COL = "clusterInstanceId"; public static final String PARTITION_ID_COL = "partitionId"; public static final String NETWORK_ID_COL = "networkId"; public static final String ALIAS_COL = "alias"; @@ -122,6 +123,9 @@ public final class CloudControllerConstants { public static final String PRIV_IP_COL = "privateIPAddresses"; public static final String PUB_IP_COL = "publicIPAddresses"; public static final String ALLOCATE_IP_COL = "allocateIPAddresses"; + public static final String TIME_STAMP = "timeStamp"; + public static final String SCALING_REASON = "scalingReason"; + public static final String SCALING_TIME = "scalingTime"; /** * Properties http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java index 1275f5c..af46cfe 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java @@ -91,7 +91,8 @@ public class StratosConstants { // metering constants public static final String THROTTLING_ALL_ACTION = "all_actions"; - public static final String THROTTLING_IN_DATA_ACTION = "in_data_action"; //this covers registry capacity + registry bandwidth + public static final String THROTTLING_IN_DATA_ACTION = + "in_data_action"; //this covers registry capacity + registry bandwidth public static final String THROTTLING_OUT_DATA_ACTION = "out_data_action"; //this covers registry bandwidth public static final String THROTTLING_ADD_USER_ACTION = "add_user_action"; public static final String THROTTLING_SERVICE_IN_BANDWIDTH_ACTION = "service_in_bandwith_action"; @@ -158,6 +159,8 @@ public class StratosConstants { public static final String MAX_CHECK_DROOL_FILE = "maxcheck.drl"; public static final String OBSOLETE_CHECK_DROOL_FILE = "obsoletecheck.drl"; public static final String MIN_COUNT = "MIN_COUNT"; + public static final String SCALING_REASON = "SCALING_REASON"; + public static final String SCALING_TIME = "SCALING_TIME"; // Policy and definition related constants public static final int PUBLIC_DEFINITION = 0; @@ -165,7 +168,8 @@ public class StratosConstants { // member expiry timeout constants public static final String PENDING_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingMemberExpiryTimeout"; public static final String OBSOLETED_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.obsoletedMemberExpiryTimeout"; - public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingTerminationMemberExpiryTimeout"; + public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT = + "autoscaler.member.pendingTerminationMemberExpiryTimeout"; public static final String FILTER_VALUE_SEPARATOR = ","; public static final String TOPOLOGY_APPLICATION_FILTER = "stratos.topology.application.filter"; http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java index dd7ddd4..95b04ff 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java @@ -27,6 +27,7 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher { /** * Publish health statistics to complex event processor. * + * @param timeStamp time * @param clusterId Cluster id of the member * @param clusterInstanceId Cluster instance id of the member * @param networkPartitionId Network partition id of the member @@ -35,6 +36,6 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher { * @param health Health type: memory_consumption | load_average * @param value Health type value */ - void publish(String clusterId, String clusterInstanceId, String networkPartitionId, + void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId, String memberId, String partitionId, String health, double value); } http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java index 289be8b..af9c8e9 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java @@ -27,10 +27,12 @@ public interface InFlightRequestPublisher extends StatisticsPublisher { /** * Publish in-flight request count. * + * @param timeStamp time * @param clusterId Cluster id * @param clusterInstanceId Cluster instance id * @param networkPartitionId Network partition id of the cluster * @param inFlightRequestCount In-flight request count of the cluster */ - void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount); + void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId, + int inFlightRequestCount); } http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java index 1dc4240..d5c9265 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java @@ -52,6 +52,7 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher // Set payload definition List<Attribute> payloadData = new ArrayList<Attribute>(); + payloadData.add(new Attribute("time_stamp", AttributeType.LONG)); payloadData.add(new Attribute("cluster_id", AttributeType.STRING)); payloadData.add(new Attribute("cluster_instance_id", AttributeType.STRING)); payloadData.add(new Attribute("network_partition_id", AttributeType.STRING)); @@ -70,6 +71,7 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher /** * Publish health statistics to cep. * + * @param timeStamp * @param clusterId * @param clusterInstanceId * @param networkPartitionId @@ -79,13 +81,16 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher * @param value */ @Override - public void publish(String clusterId, String clusterInstanceId, String networkPartitionId, String memberId, String partitionId, String health, double value) { + public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId, + String memberId, String partitionId, String health, double value) { if (log.isDebugEnabled()) { - log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s [partition] %s [member] %s [health] %s [value] %f", + log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s " + + "[partition] %s [member] %s [health] %s [value] %f", clusterId, networkPartitionId, partitionId, memberId, health, value)); } // Set payload values List<Object> payload = new ArrayList<Object>(); + payload.add(timeStamp); payload.add(clusterId); payload.add(clusterInstanceId); payload.add(networkPartitionId); http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java index 2ed8883..f51eb91 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java @@ -51,6 +51,7 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher List<Attribute> payloadData = new ArrayList<Attribute>(); // Set payload definition + payloadData.add(new Attribute("time_stamp", AttributeType.LONG)); payloadData.add(new Attribute("cluster_id", AttributeType.STRING)); payloadData.add(new Attribute("cluster_instance_id", AttributeType.STRING)); payloadData.add(new Attribute("network_partition_id", AttributeType.STRING)); @@ -65,15 +66,18 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher /** * Publish in-flight request count of a cluster. * + * @param timeStamp * @param clusterId * @param clusterInstanceId * @param networkPartitionId * @param inFlightRequestCount */ @Override - public void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount) { + public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId, + int inFlightRequestCount) { // Set payload values List<Object> payload = new ArrayList<Object>(); + payload.add(timeStamp); payload.add(clusterId); payload.add(clusterInstanceId); payload.add(networkPartitionId); http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java index dc2233d..1dd12c7 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java @@ -81,7 +81,8 @@ public class LoadBalancerStatisticsNotifier implements Runnable { for (Cluster cluster : service.getClusters()) { // Publish in-flight request count of load balancer's network partition int requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId()); - inFlightRequestPublisher.publish(cluster.getClusterId(), clusterInstanceId, + inFlightRequestPublisher.publish(System.currentTimeMillis(), cluster.getClusterId(), + clusterInstanceId, networkPartitionId, requestCount); if (log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java index c2d1c6c..0dc5e67 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java @@ -69,6 +69,7 @@ public class MockHealthStatisticsNotifier implements Runnable { mockMemberContext.getMemberId(), memoryConsumption)); } healthStatisticsPublisher.publish( + System.currentTimeMillis(), mockMemberContext.getClusterId(), mockMemberContext.getClusterInstanceId(), mockMemberContext.getNetworkPartitionId(), @@ -93,6 +94,7 @@ public class MockHealthStatisticsNotifier implements Runnable { mockMemberContext.getMemberId(), loadAvereage)); } healthStatisticsPublisher.publish( + System.currentTimeMillis(), mockMemberContext.getClusterId(), mockMemberContext.getClusterInstanceId(), mockMemberContext.getNetworkPartitionId(), @@ -116,6 +118,7 @@ public class MockHealthStatisticsNotifier implements Runnable { mockMemberContext.getMemberId(), requestsInFlight)); } inFlightRequestPublisher.publish( + System.currentTimeMillis(), mockMemberContext.getClusterId(), mockMemberContext.getClusterInstanceId(), mockMemberContext.getNetworkPartitionId(), http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py index 9753c3e..aae9e9d 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py @@ -17,7 +17,7 @@ from threading import Thread import multiprocessing - +import time import psutil from abstracthealthstatisticspublisher import * @@ -124,6 +124,7 @@ class HealthStatisticsPublisher: stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION # stream_def.add_payloaddata_attribute() + stream_def.add_payloaddata_attribute("time_stamp", StreamDefinition.LONG) stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING) stream_def.add_payloaddata_attribute("cluster_instance_id", StreamDefinition.STRING) stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING) @@ -141,6 +142,7 @@ class HealthStatisticsPublisher: """ event = ThriftEvent() + event.payloadData.append(int(round(time.time() * 1000))) event.payloadData.append(self.cartridge_agent_config.cluster_id) event.payloadData.append(self.cartridge_agent_config.cluster_instance_id) event.payloadData.append(self.cartridge_agent_config.network_partition_id) @@ -159,6 +161,7 @@ class HealthStatisticsPublisher: """ event = ThriftEvent() + event.payloadData.append(int(round(time.time() * 1000))) event.payloadData.append(self.cartridge_agent_config.cluster_id) event.payloadData.append(self.cartridge_agent_config.cluster_instance_id) event.payloadData.append(self.cartridge_agent_config.network_partition_id) http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml b/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml new file mode 100644 index 0000000..bcef15f --- /dev/null +++ b/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml @@ -0,0 +1,30 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<eventFormatter name="HealthStatsEventFormatter" + statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventformatter"> + <from streamName="cartridge_agent_health_stats" version="1.0.0"/> + <mapping customMapping="disable" type="wso2event"/> + <to eventAdaptorName="DASDefaultWSO2EventOutputAdaptor" eventAdaptorType="wso2event"> + <property name="stream">cartridge_agent_health_stats</property> + <property name="version">1.0.0</property> + </to> +</eventFormatter> http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml b/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml new file mode 100644 index 0000000..3cfd4a9 --- /dev/null +++ b/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml @@ -0,0 +1,31 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<eventFormatter name="RIFEventFormatter" statistics="disable" + trace="enable" xmlns="http://wso2.org/carbon/eventformatter"> + <from streamName="in_flight_requests" version="1.0.0"/> + <mapping customMapping="disable" type="wso2event"/> + <to eventAdaptorName="DASDefaultWSO2EventOutputAdaptor" eventAdaptorType="wso2event"> + <property name="stream">in_flight_requests</property> + <property name="version">1.0.0</property> + </to> +</eventFormatter> + http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml ---------------------------------------------------------------------- diff --git a/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml b/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml new file mode 100755 index 0000000..5cec300 --- /dev/null +++ b/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml @@ -0,0 +1,29 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<outputEventAdaptor name="DASDefaultWSO2EventOutputAdaptor" + statistics="disable" trace="disable" type="wso2event" + xmlns="http://wso2.org/carbon/eventadaptormanager"> + <property name="username">admin</property> + <property name="receiverURL">tcp://localhost:7612</property> + <property name="password">admin</property> + <property name="authenticatorURL">ssl://localhost:7712</property> +</outputEventAdaptor>
