Repository: stratos
Updated Branches:
  refs/heads/master 02966ab58 -> ab73437f8


Renaming usage data publisher to BAMUsageDataPublisher


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/ab73437f
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/ab73437f
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/ab73437f

Branch: refs/heads/master
Commit: ab73437f839108969d704c83e7a8fbbe659e6b7c
Parents: 02966ab
Author: Imesh Gunaratne <[email protected]>
Authored: Sat Mar 7 17:10:34 2015 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Sat Mar 7 17:10:57 2015 +0530

----------------------------------------------------------------------
 .../publisher/StatisticsDataPublisher.java      | 215 -------------------
 .../messaging/topology/TopologyBuilder.java     |  10 +-
 .../impl/CloudControllerServiceUtil.java        |   4 +-
 .../services/impl/InstanceCreator.java          |   4 +-
 .../publisher/BAMUsageDataPublisher.java        | 213 ++++++++++++++++++
 5 files changed, 222 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/ab73437f/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/StatisticsDataPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/StatisticsDataPublisher.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/StatisticsDataPublisher.java
deleted file mode 100644
index c2d51a2..0000000
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/StatisticsDataPublisher.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.cloud.controller.messaging.publisher;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
-import org.apache.stratos.cloud.controller.context.CloudControllerContext;
-import org.apache.stratos.cloud.controller.domain.IaasProvider;
-import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
-import org.apache.stratos.cloud.controller.exception.CloudControllerException;
-import org.apache.stratos.cloud.controller.domain.Cartridge;
-import org.apache.stratos.cloud.controller.domain.MemberContext;
-import org.apache.stratos.cloud.controller.iaases.JcloudsIaas;
-import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
-import org.jclouds.compute.domain.NodeMetadata;
-import org.wso2.carbon.base.ServerConfiguration;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
-import org.wso2.carbon.databridge.commons.Attribute;
-import org.wso2.carbon.databridge.commons.AttributeType;
-import org.wso2.carbon.databridge.commons.Event;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-import org.wso2.carbon.utils.CarbonUtils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- *  Statistics data publisher for publishing instance statistics to BAM.
- */
-public class StatisticsDataPublisher {
-    
-    private static final Log log = 
LogFactory.getLog(StatisticsDataPublisher.class);
-    private static AsyncDataPublisher dataPublisher;
-    private static StreamDefinition streamDefinition;
-    private static final String cloudControllerEventStreamVersion = "1.0.0";
-
-    public static void publish(String memberId,
-                               String partitionId,
-                               String networkId,
-                               String clusterId,
-                               String serviceName,
-                               String status,
-                               InstanceMetadata metadata) {
-        if(!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()){
-            return;
-        }
-        log.debug(CloudControllerConstants.DATA_PUB_TASK_NAME+" cycle 
started.");
-
-        if(dataPublisher==null){
-            createDataPublisher();
-
-            //If we cannot create a data publisher we should give up
-            //this means data will not be published
-            if(dataPublisher == null){
-                log.error("Data Publisher cannot be created or found.");
-                release();
-                return;
-            }
-        }
-
-        MemberContext memberContext = 
CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId);
-        String cartridgeType = memberContext.getCartridgeType();
-        Cartridge cartridge = 
CloudControllerContext.getInstance().getCartridge(cartridgeType);
-        
-        //Construct the data to be published
-        List<Object> payload = new ArrayList<Object>();
-        // Payload values
-        payload.add(memberId);
-        payload.add(serviceName);
-        payload.add(clusterId);
-        payload.add(handleNull(memberContext.getLbClusterId()));
-        payload.add(handleNull(partitionId));
-        payload.add(handleNull(networkId));
-               if (cartridge != null) {
-                       
payload.add(handleNull(String.valueOf(cartridge.isMultiTenant())));
-               } else {
-                       payload.add("");
-               }
-        payload.add(handleNull(memberContext.getPartition().getProvider()));
-        payload.add(handleNull(status));
-
-        if(metadata != null) {
-            payload.add(metadata.getHostname());
-            payload.add(metadata.getHypervisor());
-            payload.add(String.valueOf(metadata.getRam()));
-            payload.add(metadata.getImageId());
-            payload.add(metadata.getLoginPort());
-            payload.add(metadata.getOperatingSystemName());
-            payload.add(metadata.getOperatingSystemVersion());
-            payload.add(metadata.getOperatingSystemArchitecture());
-            payload.add(String.valueOf(metadata.isOperatingSystem64bit()));
-        } else {
-            payload.add("");
-            payload.add("");
-            payload.add("");
-            payload.add("");
-            payload.add(0);
-            payload.add("");
-            payload.add("");
-            payload.add("");
-            payload.add("");
-        }
-
-        
payload.add(handleNull(Arrays.toString(memberContext.getPrivateIPs())));
-        payload.add(handleNull(Arrays.toString(memberContext.getPublicIPs())));
-        
payload.add(handleNull(Arrays.toString(memberContext.getAllocatedIPs())));
-
-        Event event = new Event();
-        event.setPayloadData(payload.toArray());
-        event.setArbitraryDataMap(new HashMap<String, String>());
-
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Publishing BAM event: [stream] %s 
[version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
-            }
-            dataPublisher.publish(streamDefinition.getName(), 
streamDefinition.getVersion(), event);
-        } catch (AgentException e) {
-            if (log.isErrorEnabled()) {
-                log.error(String.format("Could not publish BAM event: [stream] 
%s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), 
e);
-            }
-        }
-    }
-    
-    private static void release(){
-        CloudControllerContext.getInstance().setPublisherRunning(false);
-    }
-    
-    private static StreamDefinition initializeStream() throws Exception {
-        streamDefinition = new StreamDefinition(
-                CloudControllerConstants.CLOUD_CONTROLLER_EVENT_STREAM,
-                cloudControllerEventStreamVersion);
-        streamDefinition.setNickName("cloud.controller");
-        streamDefinition.setDescription("Instances booted up by the Cloud 
Controller");
-        // Payload definition
-        List<Attribute> payloadData = new ArrayList<Attribute>();
-        payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, 
AttributeType.STRING));
-        payloadData.add(new 
Attribute(CloudControllerConstants.CARTRIDGE_TYPE_COL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, 
AttributeType.STRING));
-        payloadData.add(new 
Attribute(CloudControllerConstants.LB_CLUSTER_ID_COL, AttributeType.STRING));
-        payloadData.add(new 
Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_COL, 
AttributeType.STRING));
-        payloadData.add(new 
Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.IAAS_COL, 
AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.STATUS_COL, 
AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.HOST_NAME_COL, 
AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.HYPERVISOR_COL, 
AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.RAM_COL, 
AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.IMAGE_ID_COL, 
AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.LOGIN_PORT_COL, 
AttributeType.INT));
-        payloadData.add(new Attribute(CloudControllerConstants.OS_NAME_COL, 
AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.OS_VERSION_COL, 
AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.OS_ARCH_COL, 
AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.OS_BIT_COL, 
AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.PRIV_IP_COL, 
AttributeType.STRING));
-        payloadData.add(new Attribute(CloudControllerConstants.PUB_IP_COL, 
AttributeType.STRING));
-        payloadData.add(new 
Attribute(CloudControllerConstants.ALLOCATE_IP_COL, AttributeType.STRING));
-        streamDefinition.setPayloadData(payloadData);
-        return streamDefinition;
-    }
-
-
-    private static void createDataPublisher(){
-        //creating the agent
-
-        ServerConfiguration serverConfig =  
CarbonUtils.getServerConfiguration();
-        String trustStorePath = 
serverConfig.getFirstProperty("Security.TrustStore.Location");
-        String trustStorePassword = 
serverConfig.getFirstProperty("Security.TrustStore.Password");
-        String bamServerUrl = serverConfig.getFirstProperty("BamServerURL");
-        String adminUsername = 
CloudControllerConfig.getInstance().getDataPubConfig().getBamUsername();
-        String adminPassword = 
CloudControllerConfig.getInstance().getDataPubConfig().getBamPassword();
-
-        System.setProperty("javax.net.ssl.trustStore", trustStorePath);
-        System.setProperty("javax.net.ssl.trustStorePassword", 
trustStorePassword);
-
-
-        try {
-            dataPublisher = new AsyncDataPublisher("tcp://" +  bamServerUrl + 
"", adminUsername, adminPassword);
-            
CloudControllerContext.getInstance().setDataPublisher(dataPublisher);
-            initializeStream();
-            dataPublisher.addStreamDefinition(streamDefinition);
-        } catch (Exception e) {
-            String msg = "Unable to create a data publisher to " + 
bamServerUrl +
-                    ". Usage Agent will not function properly. ";
-            log.error(msg, e);
-            throw new CloudControllerException(msg, e);
-        }
-    }
-    
-    private static String handleNull(String val) {
-        if (val == null) {
-            return "";
-        }
-        return val;
-    }
-}

http://git-wip-us.apache.org/repos/asf/stratos/blob/ab73437f/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index aa4dce3..9f88ac5 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -24,7 +24,7 @@ import 
org.apache.stratos.cloud.controller.context.CloudControllerContext;
 import org.apache.stratos.cloud.controller.domain.*;
 import 
org.apache.stratos.cloud.controller.exception.InvalidCartridgeTypeException;
 import org.apache.stratos.cloud.controller.exception.InvalidMemberException;
-import 
org.apache.stratos.cloud.controller.messaging.publisher.StatisticsDataPublisher;
+import 
org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
 import 
org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPublisher;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.messaging.domain.application.ClusterDataHolder;
@@ -467,7 +467,7 @@ public class TopologyBuilder {
 
                 
TopologyEventPublisher.sendMemberInitializedEvent(memberContext);
                 //publishing data
-                StatisticsDataPublisher.publish(memberContext.getMemberId(),
+                BAMUsageDataPublisher.publish(memberContext.getMemberId(),
                         memberContext.getPartition().getId(),
                         memberContext.getNetworkPartitionId(),
                         memberContext.getClusterId(),
@@ -530,7 +530,7 @@ public class TopologyBuilder {
                     //memberStartedEvent.
                     
TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent);
                     //publishing data
-                    
StatisticsDataPublisher.publish(instanceStartedEvent.getMemberId(),
+                    
BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(),
                             instanceStartedEvent.getPartitionId(),
                             instanceStartedEvent.getNetworkPartitionId(),
                             instanceStartedEvent.getClusterId(),
@@ -633,7 +633,7 @@ public class TopologyBuilder {
                 
TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent);
 
                 // Publish statistics data
-                
StatisticsDataPublisher.publish(memberActivatedEvent.getMemberId(),
+                
BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(),
                         memberActivatedEvent.getPartitionId(),
                         memberActivatedEvent.getNetworkPartitionId(),
                         memberActivatedEvent.getClusterId(),
@@ -695,7 +695,7 @@ public class TopologyBuilder {
         }
         
TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
         //publishing data
-        
StatisticsDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
+        
BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(),
                 instanceReadyToShutdownEvent.getPartitionId(),
                 instanceReadyToShutdownEvent.getNetworkPartitionId(),
                 instanceReadyToShutdownEvent.getClusterId(),

http://git-wip-us.apache.org/repos/asf/stratos/blob/ab73437f/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
index 9ba8876..23f49fe 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java
@@ -29,7 +29,7 @@ import 
org.apache.stratos.cloud.controller.exception.InvalidIaasProviderExceptio
 import org.apache.stratos.cloud.controller.exception.InvalidPartitionException;
 import org.apache.stratos.cloud.controller.iaases.Iaas;
 import org.apache.stratos.cloud.controller.iaases.PartitionValidator;
-import 
org.apache.stratos.cloud.controller.messaging.publisher.StatisticsDataPublisher;
+import 
org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
 import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
 import org.apache.stratos.cloud.controller.util.CloudControllerUtil;
 import org.apache.stratos.common.constants.StratosConstants;
@@ -66,7 +66,7 @@ public class CloudControllerServiceUtil {
                 partitionId, memberContext.getMemberId());
 
         // Publish statistics to BAM
-        StatisticsDataPublisher.publish(memberContext.getMemberId(),
+        BAMUsageDataPublisher.publish(memberContext.getMemberId(),
                 partitionId,
                 memberContext.getNetworkPartitionId(),
                 memberContext.getClusterId(),

http://git-wip-us.apache.org/repos/asf/stratos/blob/ab73437f/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
index 64aa0a0..36e81df 100644
--- 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java
@@ -26,7 +26,7 @@ import 
org.apache.stratos.cloud.controller.context.CloudControllerContext;
 import org.apache.stratos.cloud.controller.domain.*;
 import 
org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException;
 import org.apache.stratos.cloud.controller.iaases.Iaas;
-import 
org.apache.stratos.cloud.controller.messaging.publisher.StatisticsDataPublisher;
+import 
org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher;
 import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder;
 import org.apache.stratos.messaging.domain.topology.MemberStatus;
 
@@ -79,7 +79,7 @@ public class InstanceCreator implements Runnable {
             TopologyBuilder.handleMemberInitializedEvent(memberContext);
 
             // Publish instance creation statistics to BAM
-            StatisticsDataPublisher.publish(
+            BAMUsageDataPublisher.publish(
                     memberContext.getMemberId(),
                     memberContext.getPartition().getId(),
                     memberContext.getNetworkPartitionId(),

http://git-wip-us.apache.org/repos/asf/stratos/blob/ab73437f/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
new file mode 100644
index 0000000..588cb01
--- /dev/null
+++ 
b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cloud.controller.statistics.publisher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.config.CloudControllerConfig;
+import org.apache.stratos.cloud.controller.context.CloudControllerContext;
+import org.apache.stratos.cloud.controller.domain.InstanceMetadata;
+import org.apache.stratos.cloud.controller.exception.CloudControllerException;
+import org.apache.stratos.cloud.controller.domain.Cartridge;
+import org.apache.stratos.cloud.controller.domain.MemberContext;
+import org.apache.stratos.cloud.controller.util.CloudControllerConstants;
+import org.wso2.carbon.base.ServerConfiguration;
+import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
+import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+import org.wso2.carbon.utils.CarbonUtils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ *  Usage data publisher for publishing instance usage data to BAM.
+ */
+public class BAMUsageDataPublisher {
+    
+    private static final Log log = 
LogFactory.getLog(BAMUsageDataPublisher.class);
+
+    private static AsyncDataPublisher dataPublisher;
+    private static StreamDefinition streamDefinition;
+    private static final String cloudControllerEventStreamVersion = "1.0.0";
+
+    public static void publish(String memberId,
+                               String partitionId,
+                               String networkId,
+                               String clusterId,
+                               String serviceName,
+                               String status,
+                               InstanceMetadata metadata) {
+        if(!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()){
+            return;
+        }
+        log.debug(CloudControllerConstants.DATA_PUB_TASK_NAME+" cycle 
started.");
+
+        if(dataPublisher==null){
+            createDataPublisher();
+
+            //If we cannot create a data publisher we should give up
+            //this means data will not be published
+            if(dataPublisher == null){
+                log.error("Data Publisher cannot be created or found.");
+                release();
+                return;
+            }
+        }
+
+        MemberContext memberContext = 
CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId);
+        String cartridgeType = memberContext.getCartridgeType();
+        Cartridge cartridge = 
CloudControllerContext.getInstance().getCartridge(cartridgeType);
+        
+        //Construct the data to be published
+        List<Object> payload = new ArrayList<Object>();
+        // Payload values
+        payload.add(memberId);
+        payload.add(serviceName);
+        payload.add(clusterId);
+        payload.add(handleNull(memberContext.getLbClusterId()));
+        payload.add(handleNull(partitionId));
+        payload.add(handleNull(networkId));
+               if (cartridge != null) {
+                       
payload.add(handleNull(String.valueOf(cartridge.isMultiTenant())));
+               } else {
+                       payload.add("");
+               }
+        payload.add(handleNull(memberContext.getPartition().getProvider()));
+        payload.add(handleNull(status));
+
+        if(metadata != null) {
+            payload.add(metadata.getHostname());
+            payload.add(metadata.getHypervisor());
+            payload.add(String.valueOf(metadata.getRam()));
+            payload.add(metadata.getImageId());
+            payload.add(metadata.getLoginPort());
+            payload.add(metadata.getOperatingSystemName());
+            payload.add(metadata.getOperatingSystemVersion());
+            payload.add(metadata.getOperatingSystemArchitecture());
+            payload.add(String.valueOf(metadata.isOperatingSystem64bit()));
+        } else {
+            payload.add("");
+            payload.add("");
+            payload.add("");
+            payload.add("");
+            payload.add(0);
+            payload.add("");
+            payload.add("");
+            payload.add("");
+            payload.add("");
+        }
+
+        
payload.add(handleNull(Arrays.toString(memberContext.getPrivateIPs())));
+        payload.add(handleNull(Arrays.toString(memberContext.getPublicIPs())));
+        
payload.add(handleNull(Arrays.toString(memberContext.getAllocatedIPs())));
+
+        Event event = new Event();
+        event.setPayloadData(payload.toArray());
+        event.setArbitraryDataMap(new HashMap<String, String>());
+
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Publishing BAM event: [stream] %s 
[version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
+            }
+            dataPublisher.publish(streamDefinition.getName(), 
streamDefinition.getVersion(), event);
+        } catch (AgentException e) {
+            if (log.isErrorEnabled()) {
+                log.error(String.format("Could not publish BAM event: [stream] 
%s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), 
e);
+            }
+        }
+    }
+    
+    private static void release(){
+        CloudControllerContext.getInstance().setPublisherRunning(false);
+    }
+    
+    private static StreamDefinition initializeStream() throws Exception {
+        streamDefinition = new StreamDefinition(
+                CloudControllerConstants.CLOUD_CONTROLLER_EVENT_STREAM,
+                cloudControllerEventStreamVersion);
+        streamDefinition.setNickName("cloud.controller");
+        streamDefinition.setDescription("Instances booted up by the Cloud 
Controller");
+        // Payload definition
+        List<Attribute> payloadData = new ArrayList<Attribute>();
+        payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, 
AttributeType.STRING));
+        payloadData.add(new 
Attribute(CloudControllerConstants.CARTRIDGE_TYPE_COL, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, 
AttributeType.STRING));
+        payloadData.add(new 
Attribute(CloudControllerConstants.LB_CLUSTER_ID_COL, AttributeType.STRING));
+        payloadData.add(new 
Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_COL, 
AttributeType.STRING));
+        payloadData.add(new 
Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.IAAS_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.STATUS_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.HOST_NAME_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.HYPERVISOR_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.RAM_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.IMAGE_ID_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.LOGIN_PORT_COL, 
AttributeType.INT));
+        payloadData.add(new Attribute(CloudControllerConstants.OS_NAME_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.OS_VERSION_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.OS_ARCH_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.OS_BIT_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.PRIV_IP_COL, 
AttributeType.STRING));
+        payloadData.add(new Attribute(CloudControllerConstants.PUB_IP_COL, 
AttributeType.STRING));
+        payloadData.add(new 
Attribute(CloudControllerConstants.ALLOCATE_IP_COL, AttributeType.STRING));
+        streamDefinition.setPayloadData(payloadData);
+        return streamDefinition;
+    }
+
+
+    private static void createDataPublisher(){
+        //creating the agent
+
+        ServerConfiguration serverConfig =  
CarbonUtils.getServerConfiguration();
+        String trustStorePath = 
serverConfig.getFirstProperty("Security.TrustStore.Location");
+        String trustStorePassword = 
serverConfig.getFirstProperty("Security.TrustStore.Password");
+        String bamServerUrl = serverConfig.getFirstProperty("BamServerURL");
+        String adminUsername = 
CloudControllerConfig.getInstance().getDataPubConfig().getBamUsername();
+        String adminPassword = 
CloudControllerConfig.getInstance().getDataPubConfig().getBamPassword();
+
+        System.setProperty("javax.net.ssl.trustStore", trustStorePath);
+        System.setProperty("javax.net.ssl.trustStorePassword", 
trustStorePassword);
+
+
+        try {
+            dataPublisher = new AsyncDataPublisher("tcp://" +  bamServerUrl + 
"", adminUsername, adminPassword);
+            
CloudControllerContext.getInstance().setDataPublisher(dataPublisher);
+            initializeStream();
+            dataPublisher.addStreamDefinition(streamDefinition);
+        } catch (Exception e) {
+            String msg = "Unable to create a data publisher to " + 
bamServerUrl +
+                    ". Usage Agent will not function properly. ";
+            log.error(msg, e);
+            throw new CloudControllerException(msg, e);
+        }
+    }
+    
+    private static String handleNull(String val) {
+        if (val == null) {
+            return "";
+        }
+        return val;
+    }
+}

Reply via email to