Repository: stratos Updated Branches: refs/heads/master 02966ab58 -> ab73437f8
Renaming usage data publisher to BAMUsageDataPublisher Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/ab73437f Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/ab73437f Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/ab73437f Branch: refs/heads/master Commit: ab73437f839108969d704c83e7a8fbbe659e6b7c Parents: 02966ab Author: Imesh Gunaratne <[email protected]> Authored: Sat Mar 7 17:10:34 2015 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Sat Mar 7 17:10:57 2015 +0530 ---------------------------------------------------------------------- .../publisher/StatisticsDataPublisher.java | 215 ------------------- .../messaging/topology/TopologyBuilder.java | 10 +- .../impl/CloudControllerServiceUtil.java | 4 +- .../services/impl/InstanceCreator.java | 4 +- .../publisher/BAMUsageDataPublisher.java | 213 ++++++++++++++++++ 5 files changed, 222 insertions(+), 224 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/ab73437f/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/StatisticsDataPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/StatisticsDataPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/StatisticsDataPublisher.java deleted file mode 100644 index c2d51a2..0000000 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/publisher/StatisticsDataPublisher.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.stratos.cloud.controller.messaging.publisher; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.stratos.cloud.controller.config.CloudControllerConfig; -import org.apache.stratos.cloud.controller.context.CloudControllerContext; -import org.apache.stratos.cloud.controller.domain.IaasProvider; -import org.apache.stratos.cloud.controller.domain.InstanceMetadata; -import org.apache.stratos.cloud.controller.exception.CloudControllerException; -import org.apache.stratos.cloud.controller.domain.Cartridge; -import org.apache.stratos.cloud.controller.domain.MemberContext; -import org.apache.stratos.cloud.controller.iaases.JcloudsIaas; -import org.apache.stratos.cloud.controller.util.CloudControllerConstants; -import org.jclouds.compute.domain.NodeMetadata; -import org.wso2.carbon.base.ServerConfiguration; -import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; -import org.wso2.carbon.databridge.agent.thrift.exception.AgentException; -import org.wso2.carbon.databridge.commons.Attribute; -import org.wso2.carbon.databridge.commons.AttributeType; -import org.wso2.carbon.databridge.commons.Event; -import org.wso2.carbon.databridge.commons.StreamDefinition; -import org.wso2.carbon.utils.CarbonUtils; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -/** - * Statistics data publisher for publishing instance statistics to BAM. - */ -public class StatisticsDataPublisher { - - private static final Log log = LogFactory.getLog(StatisticsDataPublisher.class); - private static AsyncDataPublisher dataPublisher; - private static StreamDefinition streamDefinition; - private static final String cloudControllerEventStreamVersion = "1.0.0"; - - public static void publish(String memberId, - String partitionId, - String networkId, - String clusterId, - String serviceName, - String status, - InstanceMetadata metadata) { - if(!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()){ - return; - } - log.debug(CloudControllerConstants.DATA_PUB_TASK_NAME+" cycle started."); - - if(dataPublisher==null){ - createDataPublisher(); - - //If we cannot create a data publisher we should give up - //this means data will not be published - if(dataPublisher == null){ - log.error("Data Publisher cannot be created or found."); - release(); - return; - } - } - - MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId); - String cartridgeType = memberContext.getCartridgeType(); - Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); - - //Construct the data to be published - List<Object> payload = new ArrayList<Object>(); - // Payload values - payload.add(memberId); - payload.add(serviceName); - payload.add(clusterId); - payload.add(handleNull(memberContext.getLbClusterId())); - payload.add(handleNull(partitionId)); - payload.add(handleNull(networkId)); - if (cartridge != null) { - payload.add(handleNull(String.valueOf(cartridge.isMultiTenant()))); - } else { - payload.add(""); - } - payload.add(handleNull(memberContext.getPartition().getProvider())); - payload.add(handleNull(status)); - - if(metadata != null) { - payload.add(metadata.getHostname()); - payload.add(metadata.getHypervisor()); - payload.add(String.valueOf(metadata.getRam())); - payload.add(metadata.getImageId()); - payload.add(metadata.getLoginPort()); - payload.add(metadata.getOperatingSystemName()); - payload.add(metadata.getOperatingSystemVersion()); - payload.add(metadata.getOperatingSystemArchitecture()); - payload.add(String.valueOf(metadata.isOperatingSystem64bit())); - } else { - payload.add(""); - payload.add(""); - payload.add(""); - payload.add(""); - payload.add(0); - payload.add(""); - payload.add(""); - payload.add(""); - payload.add(""); - } - - payload.add(handleNull(Arrays.toString(memberContext.getPrivateIPs()))); - payload.add(handleNull(Arrays.toString(memberContext.getPublicIPs()))); - payload.add(handleNull(Arrays.toString(memberContext.getAllocatedIPs()))); - - Event event = new Event(); - event.setPayloadData(payload.toArray()); - event.setArbitraryDataMap(new HashMap<String, String>()); - - try { - if (log.isDebugEnabled()) { - log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion())); - } - dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event); - } catch (AgentException e) { - if (log.isErrorEnabled()) { - log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e); - } - } - } - - private static void release(){ - CloudControllerContext.getInstance().setPublisherRunning(false); - } - - private static StreamDefinition initializeStream() throws Exception { - streamDefinition = new StreamDefinition( - CloudControllerConstants.CLOUD_CONTROLLER_EVENT_STREAM, - cloudControllerEventStreamVersion); - streamDefinition.setNickName("cloud.controller"); - streamDefinition.setDescription("Instances booted up by the Cloud Controller"); - // Payload definition - List<Attribute> payloadData = new ArrayList<Attribute>(); - payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.CARTRIDGE_TYPE_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.LB_CLUSTER_ID_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.IAAS_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.STATUS_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.HOST_NAME_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.HYPERVISOR_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.RAM_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.IMAGE_ID_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.LOGIN_PORT_COL, AttributeType.INT)); - payloadData.add(new Attribute(CloudControllerConstants.OS_NAME_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.OS_VERSION_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.OS_ARCH_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.OS_BIT_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.PRIV_IP_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.PUB_IP_COL, AttributeType.STRING)); - payloadData.add(new Attribute(CloudControllerConstants.ALLOCATE_IP_COL, AttributeType.STRING)); - streamDefinition.setPayloadData(payloadData); - return streamDefinition; - } - - - private static void createDataPublisher(){ - //creating the agent - - ServerConfiguration serverConfig = CarbonUtils.getServerConfiguration(); - String trustStorePath = serverConfig.getFirstProperty("Security.TrustStore.Location"); - String trustStorePassword = serverConfig.getFirstProperty("Security.TrustStore.Password"); - String bamServerUrl = serverConfig.getFirstProperty("BamServerURL"); - String adminUsername = CloudControllerConfig.getInstance().getDataPubConfig().getBamUsername(); - String adminPassword = CloudControllerConfig.getInstance().getDataPubConfig().getBamPassword(); - - System.setProperty("javax.net.ssl.trustStore", trustStorePath); - System.setProperty("javax.net.ssl.trustStorePassword", trustStorePassword); - - - try { - dataPublisher = new AsyncDataPublisher("tcp://" + bamServerUrl + "", adminUsername, adminPassword); - CloudControllerContext.getInstance().setDataPublisher(dataPublisher); - initializeStream(); - dataPublisher.addStreamDefinition(streamDefinition); - } catch (Exception e) { - String msg = "Unable to create a data publisher to " + bamServerUrl + - ". Usage Agent will not function properly. "; - log.error(msg, e); - throw new CloudControllerException(msg, e); - } - } - - private static String handleNull(String val) { - if (val == null) { - return ""; - } - return val; - } -} http://git-wip-us.apache.org/repos/asf/stratos/blob/ab73437f/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java index aa4dce3..9f88ac5 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@ -24,7 +24,7 @@ import org.apache.stratos.cloud.controller.context.CloudControllerContext; import org.apache.stratos.cloud.controller.domain.*; import org.apache.stratos.cloud.controller.exception.InvalidCartridgeTypeException; import org.apache.stratos.cloud.controller.exception.InvalidMemberException; -import org.apache.stratos.cloud.controller.messaging.publisher.StatisticsDataPublisher; +import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; import org.apache.stratos.cloud.controller.messaging.publisher.TopologyEventPublisher; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.messaging.domain.application.ClusterDataHolder; @@ -467,7 +467,7 @@ public class TopologyBuilder { TopologyEventPublisher.sendMemberInitializedEvent(memberContext); //publishing data - StatisticsDataPublisher.publish(memberContext.getMemberId(), + BAMUsageDataPublisher.publish(memberContext.getMemberId(), memberContext.getPartition().getId(), memberContext.getNetworkPartitionId(), memberContext.getClusterId(), @@ -530,7 +530,7 @@ public class TopologyBuilder { //memberStartedEvent. TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); //publishing data - StatisticsDataPublisher.publish(instanceStartedEvent.getMemberId(), + BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(), instanceStartedEvent.getPartitionId(), instanceStartedEvent.getNetworkPartitionId(), instanceStartedEvent.getClusterId(), @@ -633,7 +633,7 @@ public class TopologyBuilder { TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); // Publish statistics data - StatisticsDataPublisher.publish(memberActivatedEvent.getMemberId(), + BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), memberActivatedEvent.getPartitionId(), memberActivatedEvent.getNetworkPartitionId(), memberActivatedEvent.getClusterId(), @@ -695,7 +695,7 @@ public class TopologyBuilder { } TopologyEventPublisher.sendMemberReadyToShutdownEvent(memberReadyToShutdownEvent); //publishing data - StatisticsDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), + BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), instanceReadyToShutdownEvent.getPartitionId(), instanceReadyToShutdownEvent.getNetworkPartitionId(), instanceReadyToShutdownEvent.getClusterId(), http://git-wip-us.apache.org/repos/asf/stratos/blob/ab73437f/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java index 9ba8876..23f49fe 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java @@ -29,7 +29,7 @@ import org.apache.stratos.cloud.controller.exception.InvalidIaasProviderExceptio import org.apache.stratos.cloud.controller.exception.InvalidPartitionException; import org.apache.stratos.cloud.controller.iaases.Iaas; import org.apache.stratos.cloud.controller.iaases.PartitionValidator; -import org.apache.stratos.cloud.controller.messaging.publisher.StatisticsDataPublisher; +import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.common.constants.StratosConstants; @@ -66,7 +66,7 @@ public class CloudControllerServiceUtil { partitionId, memberContext.getMemberId()); // Publish statistics to BAM - StatisticsDataPublisher.publish(memberContext.getMemberId(), + BAMUsageDataPublisher.publish(memberContext.getMemberId(), partitionId, memberContext.getNetworkPartitionId(), memberContext.getClusterId(), http://git-wip-us.apache.org/repos/asf/stratos/blob/ab73437f/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java index 64aa0a0..36e81df 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java @@ -26,7 +26,7 @@ import org.apache.stratos.cloud.controller.context.CloudControllerContext; import org.apache.stratos.cloud.controller.domain.*; import org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException; import org.apache.stratos.cloud.controller.iaases.Iaas; -import org.apache.stratos.cloud.controller.messaging.publisher.StatisticsDataPublisher; +import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; import org.apache.stratos.messaging.domain.topology.MemberStatus; @@ -79,7 +79,7 @@ public class InstanceCreator implements Runnable { TopologyBuilder.handleMemberInitializedEvent(memberContext); // Publish instance creation statistics to BAM - StatisticsDataPublisher.publish( + BAMUsageDataPublisher.publish( memberContext.getMemberId(), memberContext.getPartition().getId(), memberContext.getNetworkPartitionId(), http://git-wip-us.apache.org/repos/asf/stratos/blob/ab73437f/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java new file mode 100644 index 0000000..588cb01 --- /dev/null +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.cloud.controller.statistics.publisher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.cloud.controller.config.CloudControllerConfig; +import org.apache.stratos.cloud.controller.context.CloudControllerContext; +import org.apache.stratos.cloud.controller.domain.InstanceMetadata; +import org.apache.stratos.cloud.controller.exception.CloudControllerException; +import org.apache.stratos.cloud.controller.domain.Cartridge; +import org.apache.stratos.cloud.controller.domain.MemberContext; +import org.apache.stratos.cloud.controller.util.CloudControllerConstants; +import org.wso2.carbon.base.ServerConfiguration; +import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher; +import org.wso2.carbon.databridge.agent.thrift.exception.AgentException; +import org.wso2.carbon.databridge.commons.Attribute; +import org.wso2.carbon.databridge.commons.AttributeType; +import org.wso2.carbon.databridge.commons.Event; +import org.wso2.carbon.databridge.commons.StreamDefinition; +import org.wso2.carbon.utils.CarbonUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +/** + * Usage data publisher for publishing instance usage data to BAM. + */ +public class BAMUsageDataPublisher { + + private static final Log log = LogFactory.getLog(BAMUsageDataPublisher.class); + + private static AsyncDataPublisher dataPublisher; + private static StreamDefinition streamDefinition; + private static final String cloudControllerEventStreamVersion = "1.0.0"; + + public static void publish(String memberId, + String partitionId, + String networkId, + String clusterId, + String serviceName, + String status, + InstanceMetadata metadata) { + if(!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()){ + return; + } + log.debug(CloudControllerConstants.DATA_PUB_TASK_NAME+" cycle started."); + + if(dataPublisher==null){ + createDataPublisher(); + + //If we cannot create a data publisher we should give up + //this means data will not be published + if(dataPublisher == null){ + log.error("Data Publisher cannot be created or found."); + release(); + return; + } + } + + MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId); + String cartridgeType = memberContext.getCartridgeType(); + Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); + + //Construct the data to be published + List<Object> payload = new ArrayList<Object>(); + // Payload values + payload.add(memberId); + payload.add(serviceName); + payload.add(clusterId); + payload.add(handleNull(memberContext.getLbClusterId())); + payload.add(handleNull(partitionId)); + payload.add(handleNull(networkId)); + if (cartridge != null) { + payload.add(handleNull(String.valueOf(cartridge.isMultiTenant()))); + } else { + payload.add(""); + } + payload.add(handleNull(memberContext.getPartition().getProvider())); + payload.add(handleNull(status)); + + if(metadata != null) { + payload.add(metadata.getHostname()); + payload.add(metadata.getHypervisor()); + payload.add(String.valueOf(metadata.getRam())); + payload.add(metadata.getImageId()); + payload.add(metadata.getLoginPort()); + payload.add(metadata.getOperatingSystemName()); + payload.add(metadata.getOperatingSystemVersion()); + payload.add(metadata.getOperatingSystemArchitecture()); + payload.add(String.valueOf(metadata.isOperatingSystem64bit())); + } else { + payload.add(""); + payload.add(""); + payload.add(""); + payload.add(""); + payload.add(0); + payload.add(""); + payload.add(""); + payload.add(""); + payload.add(""); + } + + payload.add(handleNull(Arrays.toString(memberContext.getPrivateIPs()))); + payload.add(handleNull(Arrays.toString(memberContext.getPublicIPs()))); + payload.add(handleNull(Arrays.toString(memberContext.getAllocatedIPs()))); + + Event event = new Event(); + event.setPayloadData(payload.toArray()); + event.setArbitraryDataMap(new HashMap<String, String>()); + + try { + if (log.isDebugEnabled()) { + log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion())); + } + dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event); + } catch (AgentException e) { + if (log.isErrorEnabled()) { + log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e); + } + } + } + + private static void release(){ + CloudControllerContext.getInstance().setPublisherRunning(false); + } + + private static StreamDefinition initializeStream() throws Exception { + streamDefinition = new StreamDefinition( + CloudControllerConstants.CLOUD_CONTROLLER_EVENT_STREAM, + cloudControllerEventStreamVersion); + streamDefinition.setNickName("cloud.controller"); + streamDefinition.setDescription("Instances booted up by the Cloud Controller"); + // Payload definition + List<Attribute> payloadData = new ArrayList<Attribute>(); + payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.CARTRIDGE_TYPE_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.LB_CLUSTER_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.IAAS_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.STATUS_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.HOST_NAME_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.HYPERVISOR_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.RAM_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.IMAGE_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.LOGIN_PORT_COL, AttributeType.INT)); + payloadData.add(new Attribute(CloudControllerConstants.OS_NAME_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.OS_VERSION_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.OS_ARCH_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.OS_BIT_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.PRIV_IP_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.PUB_IP_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.ALLOCATE_IP_COL, AttributeType.STRING)); + streamDefinition.setPayloadData(payloadData); + return streamDefinition; + } + + + private static void createDataPublisher(){ + //creating the agent + + ServerConfiguration serverConfig = CarbonUtils.getServerConfiguration(); + String trustStorePath = serverConfig.getFirstProperty("Security.TrustStore.Location"); + String trustStorePassword = serverConfig.getFirstProperty("Security.TrustStore.Password"); + String bamServerUrl = serverConfig.getFirstProperty("BamServerURL"); + String adminUsername = CloudControllerConfig.getInstance().getDataPubConfig().getBamUsername(); + String adminPassword = CloudControllerConfig.getInstance().getDataPubConfig().getBamPassword(); + + System.setProperty("javax.net.ssl.trustStore", trustStorePath); + System.setProperty("javax.net.ssl.trustStorePassword", trustStorePassword); + + + try { + dataPublisher = new AsyncDataPublisher("tcp://" + bamServerUrl + "", adminUsername, adminPassword); + CloudControllerContext.getInstance().setDataPublisher(dataPublisher); + initializeStream(); + dataPublisher.addStreamDefinition(streamDefinition); + } catch (Exception e) { + String msg = "Unable to create a data publisher to " + bamServerUrl + + ". Usage Agent will not function properly. "; + log.error(msg, e); + throw new CloudControllerException(msg, e); + } + } + + private static String handleNull(String val) { + if (val == null) { + return ""; + } + return val; + } +}
