Adding Metering and Monitoring Service Implementation

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/1eeead43
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/1eeead43
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/1eeead43

Branch: refs/heads/data-publisher-integration
Commit: 1eeead432f756d6327dbf038c4bcbbc4da2d0ead
Parents: 00ed5fa
Author: Thanuja <[email protected]>
Authored: Wed Jul 29 18:51:34 2015 +0530
Committer: Thanuja <[email protected]>
Committed: Wed Jul 29 18:51:34 2015 +0530

----------------------------------------------------------------------
 .../client/AutoscalerCloudControllerClient.java |  16 +-
 .../autoscaler/rule/RuleTasksDelegator.java     |  23 +-
 .../publisher/HealthStatisticsNotifier.java     |  10 +-
 .../messaging/topology/TopologyBuilder.java     |  70 ++-
 .../impl/CloudControllerServiceImpl.java        |   6 +-
 .../impl/CloudControllerServiceUtil.java        |  15 +-
 .../services/impl/InstanceCreator.java          |  18 +-
 .../publisher/BAMUsageDataPublisher.java        |  44 +-
 .../util/CloudControllerConstants.java          |   4 +
 .../common/constants/StratosConstants.java      |   8 +-
 .../publisher/HealthStatisticsPublisher.java    |   3 +-
 .../publisher/InFlightRequestPublisher.java     |   4 +-
 .../cep/WSO2CEPHealthStatisticsPublisher.java   |   9 +-
 .../cep/WSO2CEPInFlightRequestPublisher.java    |   6 +-
 .../LoadBalancerStatisticsNotifier.java         |   3 +-
 .../publisher/MockHealthStatisticsNotifier.java |   3 +
 .../modules/healthstatspublisher/healthstats.py |   5 +-
 .../HealthStatsEventFormatter.xml               |  30 ++
 .../eventformatters/RIFEventFormatter.xml       |  31 ++
 .../DASDefaultWSO2EventOutputAdaptor.xml        |  29 ++
 .../streamdefinitions/stream-manager-config.xml | 486 ++++++++++---------
 extensions/das/README.md                        |  10 +
 .../CloudControllerEventReceiver.xml            |  29 ++
 .../eventreceivers/HealthStatsEventReceiver.xml |  29 ++
 .../eventreceivers/RIFEventReceiver.xml         |  29 ++
 .../eventsink/cartridge_agent_health_stats.xml  |  85 ++++
 .../artifacts/eventsink/in_flight_requests.xml  |  64 +++
 .../org_apache_stratos_cloud_controller.xml     | 211 ++++++++
 .../cartridge_agent_health_stats_1.0.0.json     |  40 ++
 .../eventstreams/in_flight_requests_1.0.0.json  |  28 ++
 ...g.apache.stratos.cloud.controller_1.0.0.json | 112 +++++
 extensions/das/artifacts/sparkscript/CCEvent    |  18 +
 extensions/das/pom.xml                          |  40 ++
 extensions/das/spark-udf/pom.xml                |  36 ++
 .../das/extension/spark/udf/TimeUDF.java        |  49 ++
 extensions/pom.xml                              |   4 +-
 .../src/main/conf/drools/dependent-scaling.drl  |   4 +-
 .../src/main/conf/drools/mincheck.drl           |   5 +-
 .../src/main/conf/drools/scaling.drl            |   7 +-
 39 files changed, 1311 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
index f944a9f..c65a5f7 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java
@@ -84,7 +84,8 @@ public class AutoscalerCloudControllerClient {
     public synchronized MemberContext startInstance(PartitionRef partition,
                                                     String clusterId, String 
clusterInstanceId,
                                                     String networkPartitionId, 
boolean isPrimary,
-                                                    int minMemberCount) throws 
SpawningException {
+                                                    int minMemberCount, String 
autoscalingReason,
+                                                    long scalingTime) throws 
SpawningException {
         try {
             if (log.isInfoEnabled()) {
                 log.info(String.format("Trying to spawn an instance via cloud 
controller: " +
@@ -115,8 +116,18 @@ public class AutoscalerCloudControllerClient {
             minCountProp.setName(StratosConstants.MIN_COUNT);
             minCountProp.setValue(String.valueOf(minMemberCount));
 
+            Property autoscalingReasonProp = new Property();
+            autoscalingReasonProp.setName(StratosConstants.SCALING_REASON);
+            autoscalingReasonProp.setValue(autoscalingReason);
+
+            Property scalingTimeProp = new Property();
+            scalingTimeProp.setName(StratosConstants.SCALING_TIME);
+            scalingTimeProp.setValue(String.valueOf(scalingTime));
+
             memberContextProps.addProperty(isPrimaryProp);
             memberContextProps.addProperty(minCountProp);
+            memberContextProps.addProperty(autoscalingReasonProp);
+            memberContextProps.addProperty(scalingTimeProp);
             
instanceContext.setProperties(AutoscalerUtil.toStubProperties(memberContextProps));
 
             long startTime = System.currentTimeMillis();
@@ -228,7 +239,8 @@ public class AutoscalerCloudControllerClient {
     public void terminateAllInstances(String clusterId) throws RemoteException,
             CloudControllerServiceInvalidClusterExceptionException {
         if (log.isInfoEnabled()) {
-            log.info(String.format("Terminating all instances of cluster via 
cloud controller: [cluster] %s", clusterId));
+            log.info(String.format("Terminating all instances of cluster via 
cloud controller: " +
+                    "[cluster] %s", clusterId));
         }
         long startTime = System.currentTimeMillis();
         stub.terminateInstances(clusterId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
index 51443a1..733ce57 100644
--- 
a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
+++ 
b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java
@@ -36,7 +36,6 @@ import 
org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetwo
 import 
org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
 import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
 import org.apache.stratos.cloud.controller.stub.domain.MemberContext;
-import org.apache.stratos.common.client.CloudControllerServiceClient;
 import org.apache.stratos.common.constants.StratosConstants;
 
 /**
@@ -48,7 +47,8 @@ public class RuleTasksDelegator {
 
     private static final Log log = LogFactory.getLog(RuleTasksDelegator.class);
 
-    public double getPredictedValueForNextMinute(float average, float 
gradient, float secondDerivative, int timeInterval) {
+    public double getPredictedValueForNextMinute(float average, float 
gradient, float secondDerivative,
+                                                 int timeInterval) {
         double predictedValue;
 //        s = u * t + 0.5 * a * t * t
         if (log.isDebugEnabled()) {
@@ -175,9 +175,11 @@ public class RuleTasksDelegator {
      * @param clusterId                      Cluster id
      * @param clusterInstanceId              Instance id
      * @param isPrimary                      Is a primary member
+     * @param autoscalingReason              scaling reason for member
+     * @param scalingTime                    scaling time
      */
     public void delegateSpawn(ClusterLevelPartitionContext 
clusterMonitorPartitionContext, String clusterId,
-                              String clusterInstanceId, boolean isPrimary) {
+                              String clusterInstanceId, boolean isPrimary, 
String autoscalingReason, long scalingTime) {
 
         try {
             String nwPartitionId = 
clusterMonitorPartitionContext.getNetworkPartitionId();
@@ -199,14 +201,15 @@ public class RuleTasksDelegator {
                                     clusterId,
                                     clusterInstanceId, 
clusterMonitorPartitionContext.getNetworkPartitionId(),
                                     isPrimary,
-                                    minimumCountOfNetworkPartition);
+                                    minimumCountOfNetworkPartition, 
autoscalingReason, scalingTime);
             if (memberContext != null) {
                 ClusterLevelPartitionContext partitionContext = 
clusterInstanceContext.
                         
getPartitionCtxt(clusterMonitorPartitionContext.getPartitionId());
                 partitionContext.addPendingMember(memberContext);
                 partitionContext.addMemberStatsContext(new 
MemberStatsContext(memberContext.getMemberId()));
                 if (log.isDebugEnabled()) {
-                    log.debug(String.format("Pending member added, [member] %s 
[partition] %s", memberContext.getMemberId(),
+                    log.debug(String.format("Pending member added, [member] %s 
[partition] %s",
+                            memberContext.getMemberId(),
                             memberContext.getPartition().getId()));
                 }
 
@@ -245,7 +248,8 @@ public class RuleTasksDelegator {
         clusterMonitor.sendScalingOverMaxEvent(networkPartitionId, instanceId);
     }
 
-    public void delegateScalingDownBeyondMinNotification(String clusterId, 
String networkPartitionId, String instanceId) {
+    public void delegateScalingDownBeyondMinNotification(String clusterId, 
String networkPartitionId,
+                                                         String instanceId) {
         if (log.isDebugEnabled()) {
             log.debug("Scaling down lower min notification is going to the 
[parentInstance] " + instanceId);
         }
@@ -268,8 +272,8 @@ public class RuleTasksDelegator {
                 
clusterMonitorPartitionContext.removeMemberStatsContext(memberId);
             } else if 
(clusterMonitorPartitionContext.pendingMemberAvailable(memberId)) {
 
-                log.info(String.format("[scale-down] Moving pending member to 
termination pending list [member id] %s " +
-                                "[partition] %s [network partition] %s", 
memberId,
+                log.info(String.format("[scale-down] Moving pending member to 
termination pending list " +
+                                "[member id] %s " + "[partition] %s [network 
partition] %s", memberId,
                         clusterMonitorPartitionContext.getPartitionId(),
                         
clusterMonitorPartitionContext.getNetworkPartitionId()));
                 
clusterMonitorPartitionContext.movePendingMemberToObsoleteMembers(memberId);
@@ -280,7 +284,8 @@ public class RuleTasksDelegator {
         }
     }
 
-    public void delegateTerminateDependency(ClusterLevelPartitionContext 
clusterMonitorPartitionContext, String memberId) {
+    public void delegateTerminateDependency(ClusterLevelPartitionContext 
clusterMonitorPartitionContext,
+                                            String memberId) {
         try {
             //calling SM to send the instance notification event.
             if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
index 74c5156..5ab2ebf 100644
--- 
a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
+++ 
b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java
@@ -51,7 +51,8 @@ public class HealthStatisticsNotifier implements Runnable {
             File pluginFile = new File(pluginFileName);
             if ((pluginFile != null)
                     && (pluginFile.exists())) {
-                List<Class> pluginClass = 
PluginLoader.loadPluginClassesFromJar(pluginFile, 
IHealthStatisticsReader.class);
+                List<Class> pluginClass = 
PluginLoader.loadPluginClassesFromJar(pluginFile,
+                        IHealthStatisticsReader.class);
                 if (!pluginClass.isEmpty()) {
                     try {
                         log.trace("Instantiating new instance of plugin type " 
+ pluginClass);
@@ -63,7 +64,8 @@ public class HealthStatisticsNotifier implements Runnable {
                     }
                 }
             } else {
-                log.error("Plugin not found or malformed: " + pluginFileName + 
((pluginFile == null) ? " NULL" : "Doesn't exist"));
+                log.error("Plugin not found or malformed: " + pluginFileName + 
((pluginFile == null) ? " NULL" :
+                        "Doesn't exist"));
             }
         }
         if (this.statsReader == null) {
@@ -95,7 +97,7 @@ public class HealthStatisticsNotifier implements Runnable {
                         if (log.isDebugEnabled()) {
                             log.debug(String.format("Publishing memory 
consumption: %f", stats.getMemoryUsage()));
                         }
-                        statsPublisher.publish(
+                        statsPublisher.publish(System.currentTimeMillis(),
                                 
CartridgeAgentConfiguration.getInstance().getClusterId(),
                                 
CartridgeAgentConfiguration.getInstance().getClusterInstanceId(),
                                 
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),
@@ -108,7 +110,7 @@ public class HealthStatisticsNotifier implements Runnable {
                         if (log.isDebugEnabled()) {
                             log.debug(String.format("Publishing load average: 
%f", stats.getProcessorUsage()));
                         }
-                        statsPublisher.publish(
+                        statsPublisher.publish(System.currentTimeMillis(),
                                 
CartridgeAgentConfiguration.getInstance().getClusterId(),
                                 
CartridgeAgentConfiguration.getInstance().getClusterInstanceId(),
                                 
CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(),

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index f04a11f..419c711 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -31,6 +31,7 @@ import 
org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPubl
 import 
org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.common.Property;
+import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.messaging.domain.application.ClusterDataHolder;
 import org.apache.stratos.messaging.domain.instance.ClusterInstance;
 import org.apache.stratos.messaging.domain.topology.*;
@@ -67,7 +68,8 @@ public class TopologyBuilder {
             TopologyManager.acquireWriteLock();
             for (Cartridge cartridge : cartridgeList) {
                 if (!topology.serviceExists(cartridge.getType())) {
-                    ServiceType serviceType = cartridge.isMultiTenant() ? 
ServiceType.MultiTenant : ServiceType.SingleTenant;
+                    ServiceType serviceType = cartridge.isMultiTenant() ? 
ServiceType.MultiTenant :
+                            ServiceType.SingleTenant;
                     service = new Service(cartridge.getType(), serviceType);
                     Properties properties = new Properties();
 
@@ -199,14 +201,14 @@ public class TopologyBuilder {
         }
 
         log.debug("Creating cluster port mappings: [appication-id] " + appId);
-        for(Cluster cluster : appClusters) {
+        for (Cluster cluster : appClusters) {
             String cartridgeType = cluster.getServiceName();
             Cartridge cartridge = 
CloudControllerContext.getInstance().getCartridge(cartridgeType);
-            if(cartridge == null) {
+            if (cartridge == null) {
                 throw new CloudControllerException("Cartridge not found: 
[cartridge-type] " + cartridgeType);
             }
 
-            for(PortMapping portMapping : cartridge.getPortMappings()) {
+            for (PortMapping portMapping : cartridge.getPortMappings()) {
                 ClusterPortMapping clusterPortMapping = new 
ClusterPortMapping(appId,
                         cluster.getClusterId(), portMapping.getName(), 
portMapping.getProtocol(), portMapping.getPort(),
                         portMapping.getProxyPort());
@@ -406,6 +408,11 @@ public class TopologyBuilder {
         String partitionId = memberContext.getPartition().getId();
         String lbClusterId = memberContext.getLbClusterId();
         long initTime = memberContext.getInitTime();
+        String autoscalingReason = memberContext.getProperties().getProperty(
+                StratosConstants.SCALING_REASON).getValue();
+        long scalingTime = 
Long.parseLong(memberContext.getProperties().getProperty(
+                StratosConstants.SCALING_TIME).getValue());
+
 
         if (cluster.memberExists(memberId)) {
             log.warn(String.format("Member %s already exists", memberId));
@@ -421,6 +428,19 @@ public class TopologyBuilder {
             
member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties()));
             cluster.addMember(member);
             TopologyManager.updateTopology(topology);
+            //member created time
+            Long timeStamp = System.currentTimeMillis();
+            //publishing to BAM
+            BAMUsageDataPublisher
+                    .publish(memberContext.getMemberId(),
+                            memberContext.getPartition().getId(),
+                            memberContext.getNetworkPartitionId(),
+                            memberContext.getClusterId(),
+                            memberContext.getClusterInstanceId(),
+                            memberContext.getCartridgeType(),
+                            MemberStatus.Created.toString(),
+                            timeStamp, autoscalingReason,
+                            scalingTime, null);
         } finally {
             TopologyManager.releaseWriteLock();
         }
@@ -479,16 +499,18 @@ public class TopologyBuilder {
                 log.info("Member status updated to initialized");
 
                 TopologyManager.updateTopology(topology);
-
+                //member initialized time
+                Long timeStamp = System.currentTimeMillis();
                 
TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
                 //publishing data
                 BAMUsageDataPublisher.publish(memberContext.getMemberId(),
                         memberContext.getPartition().getId(),
                         memberContext.getNetworkPartitionId(),
+                        memberContext.getClusterInstanceId(),
                         memberContext.getClusterId(),
                         memberContext.getCartridgeType(),
                         MemberStatus.Initialized.toString(),
-                        null);
+                        timeStamp, null, null, null);
             }
         } finally {
             TopologyManager.releaseWriteLock();
@@ -542,16 +564,19 @@ public class TopologyBuilder {
                     log.info("member started event adding status started");
 
                     TopologyManager.updateTopology(topology);
+                    //member started time
+                    Long timeStamp = System.currentTimeMillis();
                     //memberStartedEvent.
                     
TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
                     //publishing data
                     
BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
                             instanceStartedEvent.getPartitionId(),
                             instanceStartedEvent.getNetworkPartitionId(),
+                            instanceStartedEvent.getClusterInstanceId(),
                             instanceStartedEvent.getClusterId(),
                             instanceStartedEvent.getServiceName(),
                             MemberStatus.Starting.toString(),
-                            null);
+                            timeStamp, null, null, null);
                 }
             } finally {
                 TopologyManager.releaseWriteLock();
@@ -602,7 +627,8 @@ public class TopologyBuilder {
             TopologyManager.acquireWriteLock();
             // try update lifecycle state
             if (!member.isStateTransitionValid(MemberStatus.Active)) {
-                log.error("Invalid state transition from [" + 
member.getStatus() + "] to [" + MemberStatus.Active + "]");
+                log.error("Invalid state transition from [" + 
member.getStatus() + "] to [" +
+                        MemberStatus.Active + "]");
                 return;
             } else {
                 member.setStatus(MemberStatus.Active);
@@ -644,7 +670,8 @@ public class TopologyBuilder {
                 
memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP());
                 
memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs());
                 TopologyManager.updateTopology(topology);
-
+                //member activated time
+                Long timeStamp = System.currentTimeMillis();
                 // Publish member activated event
                 
TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
 
@@ -652,10 +679,11 @@ public class TopologyBuilder {
                 
BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
                         memberActivatedEvent.getPartitionId(),
                         memberActivatedEvent.getNetworkPartitionId(),
+                        memberActivatedEvent.getClusterInstanceId(),
                         memberActivatedEvent.getClusterId(),
                         memberActivatedEvent.getServiceName(),
                         MemberStatus.Active.toString(),
-                        null);
+                        timeStamp, null, null, null);
             }
         } finally {
             TopologyManager.releaseWriteLock();
@@ -694,6 +722,8 @@ public class TopologyBuilder {
                 instanceReadyToShutdownEvent.getMemberId(),
                 instanceReadyToShutdownEvent.getNetworkPartitionId(),
                 instanceReadyToShutdownEvent.getPartitionId());
+        //member ReadyToShutDown state change time
+        Long timeStamp = null;
         try {
             TopologyManager.acquireWriteLock();
 
@@ -706,6 +736,7 @@ public class TopologyBuilder {
             log.info("Member Ready to shut down event adding status started");
 
             TopologyManager.updateTopology(topology);
+            timeStamp = System.currentTimeMillis();
         } finally {
             TopologyManager.releaseWriteLock();
         }
@@ -714,10 +745,11 @@ public class TopologyBuilder {
         
BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
                 instanceReadyToShutdownEvent.getPartitionId(),
                 instanceReadyToShutdownEvent.getNetworkPartitionId(),
+                instanceReadyToShutdownEvent.getClusterInstanceId(),
                 instanceReadyToShutdownEvent.getClusterId(),
                 instanceReadyToShutdownEvent.getServiceName(),
                 MemberStatus.ReadyToShutDown.toString(),
-                null);
+                timeStamp, null, null, null);
         //termination of particular instance will be handled by autoscaler
     }
 
@@ -834,7 +866,8 @@ public class TopologyBuilder {
         }
     }
 
-    public static void 
handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent 
clusterStatusClusterActivatedEvent) {
+    public static void 
handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent
+                                                           
clusterStatusClusterActivatedEvent) {
 
         Topology topology = TopologyManager.getTopology();
         Service service = 
topology.getService(clusterStatusClusterActivatedEvent.getServiceName());
@@ -888,7 +921,8 @@ public class TopologyBuilder {
             } else {
                 log.error(String.format("Cluster state transition is not 
valid: [cluster-id] %s " +
                                 " [instance-id] %s [current-status] %s 
[status-requested] %s",
-                        clusterStatusClusterActivatedEvent.getClusterId(), 
clusterStatusClusterActivatedEvent.getInstanceId(),
+                        clusterStatusClusterActivatedEvent.getClusterId(),
+                        clusterStatusClusterActivatedEvent.getInstanceId(),
                         context.getStatus(), status));
                 return;
             }
@@ -997,8 +1031,8 @@ public class TopologyBuilder {
                 cluster.removeInstanceContext(event.getInstanceId());
                 TopologyManager.updateTopology(topology);
                 //publishing data
-                ClusterInstanceTerminatedEvent clusterTerminatedEvent = new 
ClusterInstanceTerminatedEvent(event.getAppId(),
-                        event.getServiceName(), event.getClusterId(), 
event.getInstanceId());
+                ClusterInstanceTerminatedEvent clusterTerminatedEvent = new 
ClusterInstanceTerminatedEvent(
+                        event.getAppId(), event.getServiceName(), 
event.getClusterId(), event.getInstanceId());
 
                 
TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent);
             } else {
@@ -1041,15 +1075,15 @@ public class TopologyBuilder {
                 log.info("Cluster Terminating started for " + 
cluster.getClusterId());
                 TopologyManager.updateTopology(topology);
                 //publishing data
-                ClusterInstanceTerminatingEvent clusterTerminaingEvent = new 
ClusterInstanceTerminatingEvent(event.getAppId(),
-                        event.getServiceName(), event.getClusterId(), 
event.getInstanceId());
+                ClusterInstanceTerminatingEvent clusterTerminaingEvent = new 
ClusterInstanceTerminatingEvent(
+                        event.getAppId(), event.getServiceName(), 
event.getClusterId(), event.getInstanceId());
 
                 
TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent);
 
                 // Remove kubernetes services if available
                 ClusterContext clusterContext =
                         
CloudControllerContext.getInstance().getClusterContext(event.getClusterId());
-                
if(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
+                if 
(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) {
                     KubernetesIaas.removeKubernetesServices(event.getAppId(), 
event.getClusterId());
                 }
             } else {

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 4d51cc1..2b19b05 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -447,13 +447,13 @@ public class CloudControllerServiceImpl implements 
CloudControllerService {
                 clusterContext.setVolumes(volumes);
             }
 
-            // Handle member created event
-            TopologyBuilder.handleMemberCreatedEvent(memberContext);
-
             // Persist member context
             
CloudControllerContext.getInstance().addMemberContext(memberContext);
             CloudControllerContext.getInstance().persist();
 
+            // Handle member created event
+            TopologyBuilder.handleMemberCreatedEvent(memberContext);
+
             // Start instance in a new thread
             if (log.isDebugEnabled()) {
                 log.debug(String.format("Starting instance creator thread: 
[cluster] %s [cluster-instance] %s " +

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
index 37580eb..e7be3a6 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
@@ -65,18 +65,21 @@ public class CloudControllerServiceUtil {
         TopologyBuilder.handleMemberTerminated(memberContext.getCartridgeType(),
                 memberContext.getClusterId(), memberContext.getNetworkPartitionId(),
                 partitionId, memberContext.getMemberId());
-
+        //member terminated time
+        Long timeStamp = System.currentTimeMillis();
         // Publish statistics to BAM
         BAMUsageDataPublisher.publish(memberContext.getMemberId(),
                 partitionId,
                 memberContext.getNetworkPartitionId(),
+                memberContext.getClusterInstanceId(),
                 memberContext.getClusterId(),
                 memberContext.getCartridgeType(),
                 MemberStatus.Terminated.toString(),
-                null);
+                timeStamp, null, null, null);
 
         // Remove member context
-        CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(), memberContext.getMemberId());
+        CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(),
+                memberContext.getMemberId());
 
         // Persist cloud controller context
         CloudControllerContext.getInstance().persist();
@@ -87,7 +90,8 @@ public class CloudControllerServiceUtil {
         return isValid;
     }
 
-    public static IaasProvider validatePartitionAndGetIaasProvider(Partition partition, IaasProvider iaasProvider) throws InvalidPartitionException {
+    public static IaasProvider validatePartitionAndGetIaasProvider(Partition partition, IaasProvider iaasProvider)
+            throws InvalidPartitionException {
         if (iaasProvider != null) {
             // if this is a IaaS based partition
             Iaas iaas = iaasProvider.getIaas();
@@ -104,7 +108,8 @@ public class CloudControllerServiceUtil {
         }
     }
 
-    public static boolean validatePartition(Partition partition, IaasProvider iaasProvider) throws InvalidPartitionException {
+    public static boolean validatePartition(Partition partition, IaasProvider iaasProvider)
+            throws InvalidPartitionException {
         validatePartitionAndGetIaasProvider(partition, iaasProvider);
         return true;
     }

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
index 77cfea2..c0dbf57 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
@@ -27,8 +27,6 @@ import org.apache.stratos.cloud.controller.domain.*;
 import org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException;
 import org.apache.stratos.cloud.controller.iaases.Iaas;
 import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
-import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
 
 import java.util.concurrent.locks.Lock;
 
@@ -68,7 +66,8 @@ public class InstanceCreator implements Runnable {
             memberContext = startInstance(iaas, memberContext, payload);
 
             if (log.isInfoEnabled()) {
-                log.info(String.format("Instance started successfully: [cartridge-type] %s [cluster-id] %s [instance-id] %s " +
+                log.info(String.format("Instance started successfully: [cartridge-type] %s [cluster-id] %s " +
+                                "[instance-id] %s " +
                                 "[default-private-ip] %s [default-public-ip] %s",
                         memberContext.getCartridgeType(), memberContext.getClusterId(),
                         memberContext.getInstanceId(), memberContext.getDefaultPrivateIP(),
@@ -84,16 +83,6 @@ public class InstanceCreator implements Runnable {
 
             // Update topology
             TopologyBuilder.handleMemberInitializedEvent(memberContext);
-
-            // Publish instance creation statistics to BAM
-            BAMUsageDataPublisher.publish(
-                    memberContext.getMemberId(),
-                    memberContext.getPartition().getId(),
-                    memberContext.getNetworkPartitionId(),
-                    memberContext.getClusterId(),
-                    memberContext.getCartridgeType(),
-                    MemberStatus.Initialized.toString(),
-                    memberContext.getInstanceMetadata());
         } catch (Exception e) {
             String message = String.format("Could not start instance: [cartridge-type] %s [cluster-id] %s",
                     memberContext.getCartridgeType(), memberContext.getClusterId());
@@ -105,7 +94,8 @@ public class InstanceCreator implements Runnable {
         }
     }
 
-    private MemberContext startInstance(Iaas iaas, MemberContext memberContext, byte[] payload) throws CartridgeNotFoundException {
+    private MemberContext startInstance(Iaas iaas, MemberContext memberContext, byte[] payload) throws
+            CartridgeNotFoundException {
         memberContext = iaas.startInstance(memberContext, payload);
 
         // Validate instance id

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
index d5aabbd..690bc59 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
@@ -52,12 +52,31 @@ public class BAMUsageDataPublisher {
     private static StreamDefinition streamDefinition;
     private static final String cloudControllerEventStreamVersion = "1.0.0";
 
+    /**
+     * Publish events to BAM
+     *
+     * @param memberId          member id
+     * @param partitionId       partition id
+     * @param networkId         network partition id
+     * @param clusterId         cluster id
+     * @param clusterInstanceId cluster instance id
+     * @param serviceName       service name
+     * @param status            member status
+     * @param timeStamp         time
+     * @param autoscalingReason scaling reason related to member
+     * @param scalingTime       scaling time
+     * @param metadata          meta-data
+     */
     public static void publish(String memberId,
                                String partitionId,
                                String networkId,
                                String clusterId,
+                               String clusterInstanceId,
                                String serviceName,
                                String status,
+                               Long timeStamp,
+                               String autoscalingReason,
+                               Long scalingTime,
                                InstanceMetadata metadata) {
         if (!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()) {
             return;
@@ -79,16 +98,23 @@ public class BAMUsageDataPublisher {
         MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId);
         String cartridgeType = memberContext.getCartridgeType();
         Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType);
+        String instanceType = CloudControllerContext.getInstance().getIaasProviderOfPartition(cartridgeType,
+                partitionId).getProperty(CloudControllerConstants.INSTANCE_TYPE);
 
         //Construct the data to be published
         List<Object> payload = new ArrayList<Object>();
         // Payload values
+        payload.add(timeStamp);
         payload.add(memberId);
         payload.add(serviceName);
         payload.add(clusterId);
+        payload.add(clusterInstanceId);
         payload.add(handleNull(memberContext.getLbClusterId()));
         payload.add(handleNull(partitionId));
         payload.add(handleNull(networkId));
+        payload.add(handleNull(instanceType));
+        payload.add(handleNull(autoscalingReason));
+        payload.add(handleNull(scalingTime));
         if (cartridge != null) {
             payload.add(handleNull(String.valueOf(cartridge.isMultiTenant())));
         } else {
@@ -129,12 +155,14 @@ public class BAMUsageDataPublisher {
 
         try {
             if (log.isDebugEnabled()) {
-                log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
+                log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(),
+                        streamDefinition.getVersion()));
             }
             dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
         } catch (AgentException e) {
             if (log.isErrorEnabled()) {
-                log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e);
+                log.error(String.format("Could not publish BAM event: [stream] %s [version] %s",
+                        streamDefinition.getName(), streamDefinition.getVersion()), e);
             }
         }
     }
@@ -151,12 +179,17 @@ public class BAMUsageDataPublisher {
         streamDefinition.setDescription("Instances booted up by the Cloud Controller");
         // Payload definition
         List<Attribute> payloadData = new ArrayList<Attribute>();
+        payloadData.add(new Attribute(CloudControllerConstants.TIME_STAMP, AttributeType.LONG));
         payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.CARTRIDGE_TYPE_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_INSTANCE_ID_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.LB_CLUSTER_ID_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_COL, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.INSTANCE_TYPE, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.SCALING_REASON, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.SCALING_TIME, AttributeType.LONG));
         payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.IAAS_COL, AttributeType.STRING));
         payloadData.add(new Attribute(CloudControllerConstants.STATUS_COL, AttributeType.STRING));
@@ -210,4 +243,11 @@ public class BAMUsageDataPublisher {
         }
         return val;
     }
+
+    private static Long handleNull(Long val) {
+        if (val == null) {
+            return -1L;
+        }
+        return val;
+    }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
index 5e6115f..2cb0c31 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java
@@ -103,6 +103,7 @@ public final class CloudControllerConstants {
     public static final String MEMBER_ID_COL = "memberId";
     public static final String CARTRIDGE_TYPE_COL = "cartridgeType";
     public static final String CLUSTER_ID_COL = "clusterId";
+    public static final String CLUSTER_INSTANCE_ID_COL = "clusterInstanceId";
     public static final String PARTITION_ID_COL = "partitionId";
     public static final String NETWORK_ID_COL = "networkId";
     public static final String ALIAS_COL = "alias";
@@ -122,6 +123,9 @@ public final class CloudControllerConstants {
     public static final String PRIV_IP_COL = "privateIPAddresses";
     public static final String PUB_IP_COL = "publicIPAddresses";
     public static final String ALLOCATE_IP_COL = "allocateIPAddresses";
+    public static final String TIME_STAMP = "timeStamp";
+    public static final String SCALING_REASON = "scalingReason";
+    public static final String SCALING_TIME = "scalingTime";
 
     /**
      * Properties

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
index 1275f5c..af46cfe 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java
@@ -91,7 +91,8 @@ public class StratosConstants {
 
     // metering constants
     public static final String THROTTLING_ALL_ACTION = "all_actions";
-    public static final String THROTTLING_IN_DATA_ACTION = "in_data_action"; //this covers registry capacity + registry bandwidth
+    public static final String THROTTLING_IN_DATA_ACTION =
+            "in_data_action"; //this covers registry capacity + registry bandwidth
     public static final String THROTTLING_OUT_DATA_ACTION = "out_data_action"; //this covers registry bandwidth
     public static final String THROTTLING_ADD_USER_ACTION = "add_user_action";
     public static final String THROTTLING_SERVICE_IN_BANDWIDTH_ACTION = "service_in_bandwith_action";
@@ -158,6 +159,8 @@ public class StratosConstants {
     public static final String MAX_CHECK_DROOL_FILE = "maxcheck.drl";
     public static final String OBSOLETE_CHECK_DROOL_FILE = "obsoletecheck.drl";
     public static final String MIN_COUNT = "MIN_COUNT";
+    public static final String SCALING_REASON = "SCALING_REASON";
+    public static final String SCALING_TIME = "SCALING_TIME";
 
     // Policy and definition related constants
     public static final int PUBLIC_DEFINITION = 0;
@@ -165,7 +168,8 @@ public class StratosConstants {
     // member expiry timeout constants
     public static final String PENDING_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingMemberExpiryTimeout";
     public static final String OBSOLETED_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.obsoletedMemberExpiryTimeout";
-    public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingTerminationMemberExpiryTimeout";
+    public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT =
+            "autoscaler.member.pendingTerminationMemberExpiryTimeout";
 
     public static final String FILTER_VALUE_SEPARATOR = ",";
     public static final String TOPOLOGY_APPLICATION_FILTER = "stratos.topology.application.filter";

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
index dd7ddd4..95b04ff 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java
@@ -27,6 +27,7 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
     /**
      * Publish health statistics to complex event processor.
      *
+     * @param timeStamp          time
      * @param clusterId          Cluster id of the member
      * @param clusterInstanceId  Cluster instance id of the member
      * @param networkPartitionId Network partition id of the member
@@ -35,6 +36,6 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher {
      * @param health             Health type: memory_consumption | load_average
      * @param value              Health type value
      */
-    void publish(String clusterId, String clusterInstanceId, String networkPartitionId,
+    void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
                  String memberId, String partitionId, String health, double value);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
index 289be8b..af9c8e9 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java
@@ -27,10 +27,12 @@ public interface InFlightRequestPublisher extends StatisticsPublisher {
     /**
      * Publish in-flight request count.
      *
+     * @param timeStamp            time
      * @param clusterId            Cluster id
      * @param clusterInstanceId    Cluster instance id
      * @param networkPartitionId   Network partition id of the cluster
      * @param inFlightRequestCount In-flight request count of the cluster
      */
-    void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount);
+    void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+                 int inFlightRequestCount);
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
index 1dc4240..d5c9265 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java
@@ -52,6 +52,7 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
 
             // Set payload definition
             List<Attribute> payloadData = new ArrayList<Attribute>();
+            payloadData.add(new Attribute("time_stamp", AttributeType.LONG));
             payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
             payloadData.add(new Attribute("cluster_instance_id", AttributeType.STRING));
             payloadData.add(new Attribute("network_partition_id", AttributeType.STRING));
@@ -70,6 +71,7 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
     /**
      * Publish health statistics to cep.
      *
+     * @param timeStamp
      * @param clusterId
      * @param clusterInstanceId
      * @param networkPartitionId
@@ -79,13 +81,16 @@ public class WSO2CEPHealthStatisticsPublisher extends WSO2CEPStatisticsPublisher
      * @param value
      */
     @Override
-    public void publish(String clusterId, String clusterInstanceId, String networkPartitionId, String memberId, String partitionId, String health, double value) {
+    public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+                        String memberId, String partitionId, String health, double value) {
         if (log.isDebugEnabled()) {
-            log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s [partition] %s [member] %s [health] %s [value] %f",
+            log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s " +
+                            "[partition] %s [member] %s [health] %s [value] %f",
                     clusterId, networkPartitionId, partitionId, memberId, health, value));
         }
         // Set payload values
         List<Object> payload = new ArrayList<Object>();
+        payload.add(timeStamp);
         payload.add(clusterId);
         payload.add(clusterInstanceId);
         payload.add(networkPartitionId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
index 2ed8883..f51eb91 100644
--- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java
@@ -51,6 +51,7 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher
             List<Attribute> payloadData = new ArrayList<Attribute>();
 
             // Set payload definition
+            payloadData.add(new Attribute("time_stamp", AttributeType.LONG));
             payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
             payloadData.add(new Attribute("cluster_instance_id", AttributeType.STRING));
             payloadData.add(new Attribute("network_partition_id", AttributeType.STRING));
@@ -65,15 +66,18 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher
     /**
      * Publish in-flight request count of a cluster.
      *
+     * @param timeStamp
      * @param clusterId
      * @param clusterInstanceId
      * @param networkPartitionId
      * @param inFlightRequestCount
      */
     @Override
-    public void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount) {
+    public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId,
+                        int inFlightRequestCount) {
         // Set payload values
         List<Object> payload = new ArrayList<Object>();
+        payload.add(timeStamp);
         payload.add(clusterId);
         payload.add(clusterInstanceId);
         payload.add(networkPartitionId);

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
index dc2233d..1dd12c7 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
@@ -81,7 +81,8 @@ public class LoadBalancerStatisticsNotifier implements Runnable {
                         for (Cluster cluster : service.getClusters()) {
                             // Publish in-flight request count of load balancer's network partition
                             int requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId());
-                            inFlightRequestPublisher.publish(cluster.getClusterId(), clusterInstanceId,
+                            inFlightRequestPublisher.publish(System.currentTimeMillis(), cluster.getClusterId(),
+                                    clusterInstanceId,
                                     networkPartitionId, requestCount);
 
                             if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
index c2d1c6c..0dc5e67 100644
--- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
+++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java
@@ -69,6 +69,7 @@ public class MockHealthStatisticsNotifier implements Runnable {
                         mockMemberContext.getMemberId(), memoryConsumption));
             }
             healthStatisticsPublisher.publish(
+                    System.currentTimeMillis(),
                     mockMemberContext.getClusterId(),
                     mockMemberContext.getClusterInstanceId(),
                     mockMemberContext.getNetworkPartitionId(),
@@ -93,6 +94,7 @@ public class MockHealthStatisticsNotifier implements Runnable {
                         mockMemberContext.getMemberId(), loadAvereage));
             }
             healthStatisticsPublisher.publish(
+                    System.currentTimeMillis(),
                     mockMemberContext.getClusterId(),
                     mockMemberContext.getClusterInstanceId(),
                     mockMemberContext.getNetworkPartitionId(),
@@ -116,6 +118,7 @@ public class MockHealthStatisticsNotifier implements Runnable {
                         mockMemberContext.getMemberId(), requestsInFlight));
             }
             inFlightRequestPublisher.publish(
+                    System.currentTimeMillis(),
                     mockMemberContext.getClusterId(),
                     mockMemberContext.getClusterInstanceId(),
                     mockMemberContext.getNetworkPartitionId(),

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
index 9753c3e..aae9e9d 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
@@ -17,7 +17,7 @@
 
 from threading import Thread
 import multiprocessing
-
+import time
 import psutil
 
 from abstracthealthstatisticspublisher import *
@@ -124,6 +124,7 @@ class HealthStatisticsPublisher:
         stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION
 
         # stream_def.add_payloaddata_attribute()
+        stream_def.add_payloaddata_attribute("time_stamp", StreamDefinition.LONG)
         stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING)
         stream_def.add_payloaddata_attribute("cluster_instance_id", StreamDefinition.STRING)
         stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING)
@@ -141,6 +142,7 @@ class HealthStatisticsPublisher:
         """
 
         event = ThriftEvent()
+        event.payloadData.append(int(round(time.time() * 1000)))
         event.payloadData.append(self.cartridge_agent_config.cluster_id)
         event.payloadData.append(self.cartridge_agent_config.cluster_instance_id)
         event.payloadData.append(self.cartridge_agent_config.network_partition_id)
@@ -159,6 +161,7 @@ class HealthStatisticsPublisher:
         """
 
         event = ThriftEvent()
+        event.payloadData.append(int(round(time.time() * 1000)))
         event.payloadData.append(self.cartridge_agent_config.cluster_id)
         event.payloadData.append(self.cartridge_agent_config.cluster_instance_id)
         event.payloadData.append(self.cartridge_agent_config.network_partition_id)

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml
----------------------------------------------------------------------
diff --git 
a/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml 
b/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml
new file mode 100644
index 0000000..bcef15f
--- /dev/null
+++ b/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<eventFormatter name="HealthStatsEventFormatter"
+                statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventformatter">
+    <from streamName="cartridge_agent_health_stats" version="1.0.0"/>
+    <mapping customMapping="disable" type="wso2event"/>
+    <to eventAdaptorName="DASDefaultWSO2EventOutputAdaptor" eventAdaptorType="wso2event">
+        <property name="stream">cartridge_agent_health_stats</property>
+        <property name="version">1.0.0</property>
+    </to>
+</eventFormatter>

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml
----------------------------------------------------------------------
diff --git a/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml 
b/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml
new file mode 100644
index 0000000..3cfd4a9
--- /dev/null
+++ b/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<eventFormatter name="RIFEventFormatter" statistics="disable"
+                trace="enable" xmlns="http://wso2.org/carbon/eventformatter">
+    <from streamName="in_flight_requests" version="1.0.0"/>
+    <mapping customMapping="disable" type="wso2event"/>
+    <to eventAdaptorName="DASDefaultWSO2EventOutputAdaptor" eventAdaptorType="wso2event">
+        <property name="stream">in_flight_requests</property>
+        <property name="version">1.0.0</property>
+    </to>
+</eventFormatter>
+

http://git-wip-us.apache.org/repos/asf/stratos/blob/1eeead43/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml
----------------------------------------------------------------------
diff --git 
a/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml
 
b/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml
new file mode 100755
index 0000000..5cec300
--- /dev/null
+++ 
b/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<outputEventAdaptor name="DASDefaultWSO2EventOutputAdaptor"
+                    statistics="disable" trace="disable" type="wso2event"
+                    xmlns="http://wso2.org/carbon/eventadaptormanager">
+    <property name="username">admin</property>
+    <property name="receiverURL">tcp://localhost:7612</property>
+    <property name="password">admin</property>
+    <property name="authenticatorURL">ssl://localhost:7712</property>
+</outputEventAdaptor>

Reply via email to